nats_jetstream

package module
v2.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2024 License: Apache-2.0 Imports: 13 Imported by: 12

Documentation

Overview

Package nats_jetstream implements the CloudEvent transport implementation using NATS JetStream.

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")

Functions

func NatsOptions

func NatsOptions(opts ...nats.Option) []nats.Option

NatsOptions is a helper function to group a variadic nats.ProtocolOption into []nats.Option that can be used by either Sender, Consumer or Protocol

func WriteMsg

func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) (nats.Header, error)

WriteMsg fills the provided writer with the bindings.Message m. Using context you can tweak the encoding processing (more details on binding.Write documentation). The nats.Header returned is not deep-copied. The header values should be deep-copied to an event object.

Types

type Consumer

type Consumer struct {
	Receiver

	Conn       *nats.Conn
	Jsm        nats.JetStreamContext
	Subject    string
	Subscriber Subscriber
	SubOpt     []nats.SubOpt
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error)

func NewConsumerFromConn

func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) OpenInbound

func (c *Consumer) OpenInbound(ctx context.Context) error

type ConsumerOption

type ConsumerOption func(*Consumer) error

func WithQueueSubscriber

func WithQueueSubscriber(queue string) ConsumerOption

WithQueueSubscriber configures the Consumer to join a queue group when subscribing

type Message

type Message struct {
	Msg *nats.Msg
	// contains filtered or unexported fields
}

Message implements binding.Message by wrapping an *nats.Msg. This message *can* be read several times safely

func NewMessage

func NewMessage(msg *nats.Msg) *Message

NewMessage wraps an *nats.Msg in a binding.Message. The returned message *can* be read several times safely The default encoding returned is EncodingStructured unless the NATS message contains a specversion header.

func (*Message) Finish

func (m *Message) Finish(err error) error

Finish *must* be called when message from a Receiver can be forgotten by the receiver.

func (*Message) GetAttribute added in v2.15.0

func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{})

GetAttribute implements binding.MessageMetadataReader

func (*Message) GetExtension added in v2.15.0

func (m *Message) GetExtension(name string) interface{}

GetExtension implements binding.MessageMetadataReader

func (*Message) GetVersion added in v2.15.0

func (m *Message) GetVersion() spec.Version

GetVersion looks for specVersion header and returns a Version object

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error

ReadBinary transfers a binary-mode event to an BinaryWriter.

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

ReadEncoding return the type of the message Encoding.

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

ReadStructured transfers a structured-mode event to a StructuredWriter.

type Protocol

type Protocol struct {
	Conn *nats.Conn

	Consumer *Consumer

	Sender *Sender
	// contains filtered or unexported fields
}

Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.

func NewProtocol

func NewProtocol(url, stream, sendSubject, receiveSubject string, natsOpts []nats.Option, jsOps []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error)

NewProtocol creates a new NATS protocol.

func NewProtocolFromConn

func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject string, jsOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error)

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) error

Close implements Closer.Close

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

Send implements Sender.Send

type ProtocolOption

type ProtocolOption func(*Protocol) error

ProtocolOption is the function signature required to be considered an nats.ProtocolOption.

func WithConsumerOptions added in v2.15.0

func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption

func WithSenderOptions added in v2.15.0

func WithSenderOptions(opts ...SenderOption) ProtocolOption

type QueueSubscriber

type QueueSubscriber struct {
	Queue string
}

QueueSubscriber creates queue subscriptions

func (*QueueSubscriber) Subscribe

func (s *QueueSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)

Subscribe implements Subscriber.Subscribe

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func NewReceiver

func NewReceiver() *Receiver

NewReceiver creates a new protocol.Receiver responsible for receiving messages.

func (*Receiver) MsgHandler

func (r *Receiver) MsgHandler(msg *nats.Msg)

MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered via r.Receive(ctx)

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive.

type RegularSubscriber

type RegularSubscriber struct {
}

RegularSubscriber creates regular subscriptions

func (*RegularSubscriber) Subscribe

func (s *RegularSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)

Subscribe implements Subscriber.Subscribe

type Sender

type Sender struct {
	Jsm     nats.JetStreamContext
	Conn    *nats.Conn
	Subject string
	Stream  string
	// contains filtered or unexported fields
}

func NewSender

func NewSender(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, opts ...SenderOption) (*Sender, error)

NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection

func NewSenderFromConn

func NewSenderFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, opts ...SenderOption) (*Sender, error)

NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS connection to the caller

func (*Sender) Close

func (s *Sender) Close(_ context.Context) error

Close implements Closer.Close This method only closes the connection if the Sender opened it

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error)

Close implements Sender.Sender Sender sends messages.

type SenderOption

type SenderOption func(*Sender) error

type Subscriber

type Subscriber interface {
	Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
}

The Subscriber interface allows us to configure how the subscription is created

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL