Documentation ¶
Overview ¶
Package nats implements the CloudEvents transport using NATS.
Index ¶
- Variables
- func NatsOptions(opts ...nats.Option) []nats.Option
- func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, ...) error
- type Consumer
- type ConsumerOption
- type Message
- type Protocol
- type ProtocolOption
- type QueueSubscriber
- type Receiver
- type RegularSubscriber
- type Sender
- type SenderOption
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")
var ErrSubscriptionAlreadyOpen = errors.New("subscription already open")
Functions ¶
func NatsOptions ¶
func NatsOptions(opts ...nats.Option) []nats.Option
NatsOptions is a helper function that groups variadic nats.Option values into a []nats.Option, which can then be passed to Sender, Consumer, or Protocol.
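The grouping idea can be sketched without the NATS client library. The example below is only an illustration: `options`, `Option`, `withName`, `withMaxReconnects`, `groupOptions`, and `apply` are hypothetical stand-ins, not part of this package or of nats.go.

```go
package main

import "fmt"

// options is a hypothetical stand-in for a client's connection settings.
type options struct {
	name          string
	maxReconnects int
}

// Option is a hypothetical functional option, similar in shape to nats.Option.
type Option func(*options) error

// withName and withMaxReconnects are illustrative options, not real library calls.
func withName(n string) Option {
	return func(o *options) error { o.name = n; return nil }
}

func withMaxReconnects(n int) Option {
	return func(o *options) error { o.maxReconnects = n; return nil }
}

// groupOptions mirrors the shape of NatsOptions: it turns a variadic
// argument list into a slice that can be passed around as a single value.
func groupOptions(opts ...Option) []Option {
	return opts
}

// apply runs each option in order against a fresh options value.
func apply(opts []Option) (options, error) {
	var o options
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return o, err
		}
	}
	return o, nil
}

func main() {
	grouped := groupOptions(withName("sender-1"), withMaxReconnects(5))
	o, err := apply(grouped)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(grouped), o.name, o.maxReconnects)
}
```

A helper of this shape keeps call sites readable when a constructor takes the connection options as a slice and its own options as a variadic tail.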
Types ¶
type Consumer ¶
type Consumer struct {
Receiver
Conn *nats.Conn
Subject string
Subscriber Subscriber
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(url, subject string, natsOpts []nats.Option, opts ...ConsumerOption) (*Consumer, error)
func NewConsumerFromConn ¶
func NewConsumerFromConn(conn *nats.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)
type ConsumerOption ¶
func WithQueueSubscriber ¶
func WithQueueSubscriber(queue string) ConsumerOption
WithQueueSubscriber configures the Consumer to join a queue group when subscribing.
type Message ¶
type Message struct {
Msg *nats.Msg
// contains filtered or unexported fields
}
Message implements binding.Message by wrapping an *nats.Msg. This message *can* be read several times safely.
func NewMessage ¶
func NewMessage(msg *nats.Msg) *Message
NewMessage wraps an *nats.Msg in a binding.Message. The returned message *can* be read several times safely.
func (*Message) ReadBinary ¶
func (*Message) ReadEncoding ¶
func (*Message) ReadStructured ¶
type Protocol ¶
type Protocol struct {
Conn *nats.Conn
Consumer *Consumer
Sender *Sender
// contains filtered or unexported fields
}
Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.
func NewProtocol ¶
func NewProtocol(url, sendSubject, receiveSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error)
NewProtocol creates a new NATS protocol.
func NewProtocolFromConn ¶
func NewProtocolFromConn(conn *nats.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)
type ProtocolOption ¶
ProtocolOption is the function signature required to be considered an nats.ProtocolOption.
func WithConsumerOptions ¶
func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption
func WithSenderOptions ¶
func WithSenderOptions(opts ...SenderOption) ProtocolOption
type QueueSubscriber ¶
type QueueSubscriber struct {
Queue string
}
QueueSubscriber creates queue subscriptions
func (*QueueSubscriber) Subscribe ¶
func (s *QueueSubscriber) Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
func NewReceiver ¶
func NewReceiver() *Receiver
func (*Receiver) MsgHandler ¶
func (r *Receiver) MsgHandler(msg *nats.Msg)
MsgHandler implements nats.MsgHandler and publishes each message onto the Receiver's internal incoming channel, from which it is delivered via r.Receive(ctx).
type RegularSubscriber ¶
type RegularSubscriber struct{}
RegularSubscriber creates regular subscriptions
func (*RegularSubscriber) Subscribe ¶
func (s *RegularSubscriber) Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Sender ¶
type Sender struct {
Conn *nats.Conn
Subject string
Transformers binding.TransformerFactories
// contains filtered or unexported fields
}
func NewSender ¶
func NewSender(url, subject string, natsOpts []nats.Option, opts ...SenderOption) (*Sender, error)
NewSender creates a new protocol.Sender that is responsible for opening and closing the NATS connection.
func NewSenderFromConn ¶
func NewSenderFromConn(conn *nats.Conn, subject string, opts ...SenderOption) (*Sender, error)
NewSenderFromConn creates a new protocol.Sender that leaves responsibility for opening and closing the NATS connection to the caller.
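The difference between the two constructors is one of connection ownership. The sketch below illustrates that idea with hypothetical stand-ins (`conn`, `dial`, `sender`, `newSender`, `newSenderFromConn`, and the `ownsConn` flag are all assumptions made for this example); it does not reproduce the package's code or talk to a real server.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for a client connection.
type conn struct{ closed bool }

func (c *conn) Close() { c.closed = true }

// dial is a placeholder for connecting to a server at url.
func dial(url string) (*conn, error) {
	if url == "" {
		return nil, errors.New("empty url")
	}
	return &conn{}, nil
}

type sender struct {
	conn     *conn
	subject  string
	ownsConn bool // true only when the sender opened the connection itself
}

// newSender opens its own connection and therefore owns it.
func newSender(url, subject string) (*sender, error) {
	c, err := dial(url)
	if err != nil {
		return nil, err
	}
	s := newSenderFromConn(c, subject)
	s.ownsConn = true
	return s, nil
}

// newSenderFromConn borrows a connection that the caller manages.
func newSenderFromConn(c *conn, subject string) *sender {
	return &sender{conn: c, subject: subject}
}

// Close closes the connection only if this sender opened it.
func (s *sender) Close() {
	if s.ownsConn {
		s.conn.Close()
	}
}

func main() {
	owned, err := newSender("nats://localhost:4222", "events")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	owned.Close()

	shared := &conn{}
	borrowed := newSenderFromConn(shared, "events")
	borrowed.Close()

	fmt.Println(owned.conn.closed, shared.closed)
}
```

With this split, a caller that shares one connection between a sender and a consumer can close it exactly once, after both are finished.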
type SenderOption ¶
func WithTransformer ¶
func WithTransformer(transformer binding.TransformerFactory) SenderOption
WithTransformer adds a transformer, which Sender uses while encoding a binding.Message to a nats.Msg.
type Subscriber ¶
type Subscriber interface {
Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)
}
The Subscriber interface allows the caller to configure how the subscription is created.