Documentation
¶
Overview ¶
Package nats_jetstream implements the CloudEvent transport implementation using NATS JetStream.
Index ¶
- Variables
- func NatsOptions(opts ...nats.Option) []nats.Option
- func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, ...) (nats.Header, error)
- type Consumer
- type ConsumerOption
- type Message
- func (m *Message) Finish(err error) error
- func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{})
- func (m *Message) GetExtension(name string) interface{}
- func (m *Message) GetVersion() spec.Version
- func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error
- func (m *Message) ReadEncoding() binding.Encoding
- func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error
- type Protocol
- type ProtocolOption
- type QueueSubscriber
- type Receiver
- type RegularSubscriber
- type Sender
- type SenderOption
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")
Functions ¶
func NatsOptions ¶
func NatsOptions(opts ...nats.Option) []nats.Option
NatsOptions is a helper function to group a variadic nats.ProtocolOption into []nats.Option that can be used by either Sender, Consumer or Protocol
func WriteMsg ¶
func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) (nats.Header, error)
WriteMsg fills the provided writer with the bindings.Message m. Using context you can tweak the encoding processing (more details on binding.Write documentation). The nats.Header returned is not deep-copied. The header values should be deep-copied to an event object.
Types ¶
type Consumer ¶
type Consumer struct { Receiver Conn *nats.Conn Jsm nats.JetStreamContext Subject string Subscriber Subscriber SubOpt []nats.SubOpt // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error)
func NewConsumerFromConn ¶
func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error)
type ConsumerOption ¶
func WithQueueSubscriber ¶
func WithQueueSubscriber(queue string) ConsumerOption
WithQueueSubscriber configures the Consumer to join a queue group when subscribing
type Message ¶
type Message struct { Msg *nats.Msg // contains filtered or unexported fields }
Message implements binding.Message by wrapping an *nats.Msg. This message *can* be read several times safely
func NewMessage ¶
func NewMessage(msg *nats.Msg) *Message
NewMessage wraps an *nats.Msg in a binding.Message. The returned message *can* be read several times safely The default encoding returned is EncodingStructured unless the NATS message contains a specversion header.
func (*Message) Finish ¶
Finish *must* be called when message from a Receiver can be forgotten by the receiver.
func (*Message) GetAttribute ¶ added in v2.15.0
GetAttribute implements binding.MessageMetadataReader
func (*Message) GetExtension ¶ added in v2.15.0
GetExtension implements binding.MessageMetadataReader
func (*Message) GetVersion ¶ added in v2.15.0
GetVersion looks for specVersion header and returns a Version object
func (*Message) ReadBinary ¶
ReadBinary transfers a binary-mode event to an BinaryWriter.
func (*Message) ReadEncoding ¶
ReadEncoding return the type of the message Encoding.
func (*Message) ReadStructured ¶
ReadStructured transfers a structured-mode event to a StructuredWriter.
type Protocol ¶
type Protocol struct { Conn *nats.Conn Consumer *Consumer Sender *Sender // contains filtered or unexported fields }
Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.
func NewProtocol ¶
func NewProtocol(url, stream, sendSubject, receiveSubject string, natsOpts []nats.Option, jsOps []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error)
NewProtocol creates a new NATS protocol.
func NewProtocolFromConn ¶
func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject string, jsOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error)
type ProtocolOption ¶
ProtocolOption is the function signature required to be considered an nats.ProtocolOption.
func WithConsumerOptions ¶ added in v2.15.0
func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption
func WithSenderOptions ¶ added in v2.15.0
func WithSenderOptions(opts ...SenderOption) ProtocolOption
type QueueSubscriber ¶
type QueueSubscriber struct {
Queue string
}
QueueSubscriber creates queue subscriptions
func (*QueueSubscriber) Subscribe ¶
func (s *QueueSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
func NewReceiver ¶
func NewReceiver() *Receiver
NewReceiver creates a new protocol.Receiver responsible for receiving messages.
func (*Receiver) MsgHandler ¶
func (r *Receiver) MsgHandler(msg *nats.Msg)
MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered via r.Receive(ctx)
type RegularSubscriber ¶
type RegularSubscriber struct { }
RegularSubscriber creates regular subscriptions
func (*RegularSubscriber) Subscribe ¶
func (s *RegularSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Sender ¶
type Sender struct { Jsm nats.JetStreamContext Conn *nats.Conn Subject string Stream string // contains filtered or unexported fields }
func NewSender ¶
func NewSender(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, opts ...SenderOption) (*Sender, error)
NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection
func NewSenderFromConn ¶
func NewSenderFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, opts ...SenderOption) (*Sender, error)
NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS connection to the caller
type SenderOption ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
}
The Subscriber interface allows us to configure how the subscription is created