nats

package module
v2.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2024 License: Apache-2.0 Imports: 10 Imported by: 19

Documentation

Overview

Package nats implements the CloudEvent transport implementation using NATS.

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")

Functions

func NatsOptions

func NatsOptions(opts ...nats.Option) []nats.Option

NatsOptions is a helper function to group a variadic nats.ProtocolOption into []nats.Option that can be used by either Sender, Consumer or Protocol

func WriteMsg

func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) error

WriteMsg fills the provided writer with the bindings.Message m. Using context you can tweak the encoding processing (more details on binding.Write documentation).

Types

type Consumer

type Consumer struct {
	Receiver

	Conn       *nats.Conn
	Subject    string
	Subscriber Subscriber
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(url, subject string, natsOpts []nats.Option, opts ...ConsumerOption) (*Consumer, error)

func NewConsumerFromConn

func NewConsumerFromConn(conn *nats.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) OpenInbound

func (c *Consumer) OpenInbound(ctx context.Context) error

type ConsumerOption

type ConsumerOption func(*Consumer) error

func WithQueueSubscriber

func WithQueueSubscriber(queue string) ConsumerOption

WithQueueSubscriber configures the Consumer to join a queue group when subscribing

type Message

type Message struct {
	Msg *nats.Msg
	// contains filtered or unexported fields
}

Message implements binding.Message by wrapping an *nats.Msg. This message *can* be read several times safely

func NewMessage

func NewMessage(msg *nats.Msg) *Message

NewMessage wraps an *nats.Msg in a binding.Message. The returned message *can* be read several times safely

func (*Message) Finish

func (m *Message) Finish(err error) error

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type Protocol

type Protocol struct {
	Conn *nats.Conn

	Consumer *Consumer

	Sender *Sender
	// contains filtered or unexported fields
}

Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.

func NewProtocol

func NewProtocol(url, sendSubject, receiveSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error)

NewProtocol creates a new NATS protocol.

func NewProtocolFromConn

func NewProtocolFromConn(conn *nats.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) error

Close implements Closer.Close

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

Send implements Sender.Send

type ProtocolOption

type ProtocolOption func(*Protocol) error

ProtocolOption is the function signature required to be considered an nats.ProtocolOption.

func WithConsumerOptions

func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption

func WithSenderOptions

func WithSenderOptions(opts ...SenderOption) ProtocolOption

type QueueSubscriber

type QueueSubscriber struct {
	Queue string
}

QueueSubscriber creates queue subscriptions

func (*QueueSubscriber) Subscribe

func (s *QueueSubscriber) Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)

Subscribe implements Subscriber.Subscribe

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func NewReceiver

func NewReceiver() *Receiver

func (*Receiver) MsgHandler

func (r *Receiver) MsgHandler(msg *nats.Msg)

MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered via r.Receive(ctx)

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

type RegularSubscriber

type RegularSubscriber struct {
}

RegularSubscriber creates regular subscriptions

func (*RegularSubscriber) Subscribe

func (s *RegularSubscriber) Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)

Subscribe implements Subscriber.Subscribe

type Sender

type Sender struct {
	Conn    *nats.Conn
	Subject string
	// contains filtered or unexported fields
}

func NewSender

func NewSender(url, subject string, natsOpts []nats.Option, opts ...SenderOption) (*Sender, error)

NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection

func NewSenderFromConn

func NewSenderFromConn(conn *nats.Conn, subject string, opts ...SenderOption) (*Sender, error)

NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS connection to the caller

func (*Sender) Close

func (s *Sender) Close(_ context.Context) error

Close implements Closer.Close This method only closes the connection if the Sender opened it

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error)

type SenderOption

type SenderOption func(*Sender) error

type Subscriber

type Subscriber interface {
	Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)
}

The Subscriber interface allows us to configure how the subscription is created

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL