Documentation ¶
Overview ¶
Package stan implements the CloudEvents transport using NATS Streaming (STAN).
Index ¶
- Variables
- func StanOptions(opts ...stan.Option) []stan.Option
- func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, ...) error
- type Consumer
- type ConsumerOption
- type Message
- type MessageOption
- type Protocol
- type ProtocolOption
- type QueueSubscriber
- type Receiver
- type ReceiverOption
- type RegularSubscriber
- type Sender
- type SenderOption
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")
Functions ¶
func StanOptions ¶
func StanOptions(opts ...stan.Option) []stan.Option
StanOptions is a helper function to group a variadic stan.Option into []stan.Option
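The grouping behavior can be pictured with a small stand-in sketch. `Option` below is a hypothetical placeholder for `stan.Option`, and `groupOptions` and `withSetting` are illustrative names rather than the package's implementation. The sketch only shows how a variadic parameter becomes a slice that can be passed wherever a `[]stan.Option` argument is expected, such as the `stanOpts` parameter of NewConsumer, NewSender, and NewProtocol.

```go
package main

import "fmt"

// Option is a hypothetical stand-in for stan.Option, used so the sketch
// compiles without the NATS Streaming client library.
type Option func(cfg map[string]string)

// groupOptions mirrors the documented shape of StanOptions: it accepts a
// variadic list of options and hands it back as a slice.
func groupOptions(opts ...Option) []Option {
	return opts
}

// withSetting is an illustrative option constructor (not part of any real API).
func withSetting(key, value string) Option {
	return func(cfg map[string]string) { cfg[key] = value }
}

func main() {
	opts := groupOptions(
		withSetting("url", "nats://localhost:4222"),
		withSetting("name", "demo"),
	)

	// Apply every grouped option to a configuration value.
	cfg := map[string]string{}
	for _, opt := range opts {
		opt(cfg)
	}
	fmt.Println(len(opts), cfg["url"], cfg["name"])
}
```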
Types ¶
type Consumer ¶
type Consumer struct {
    Receiver

    Conn               stan.Conn
    Subject            string
    Subscriber         Subscriber
    UnsubscribeOnClose bool
    // contains filtered or unexported fields
}
Consumer is responsible for managing STAN subscriptions and makes messages available via the Receiver interface.
Consumer implements the following interfaces:
- protocol.Opener
- protocol.Closer
- protocol.Receiver
func NewConsumer ¶
func NewConsumer(clusterID, clientID, subject string, stanOpts []stan.Option, opts ...ConsumerOption) (*Consumer, error)
func NewConsumerFromConn ¶
func NewConsumerFromConn(conn stan.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)
type ConsumerOption ¶
func WithQueueSubscriber ¶
func WithQueueSubscriber(queue string) ConsumerOption
WithQueueSubscriber configures the transport to create a queue subscription instead of a standard subscription.
func WithSubscriptionOptions ¶
func WithSubscriptionOptions(opts ...stan.SubscriptionOption) ConsumerOption
WithSubscriptionOptions sets options to configure the STAN subscription.
func WithUnsubscribeOnClose ¶
func WithUnsubscribeOnClose() ConsumerOption
WithUnsubscribeOnClose configures the Consumer to unsubscribe when OpenInbound context is cancelled or when Consumer.Close() is invoked. This causes durable subscriptions to be forgotten by the STAN service and recreated durable subscriptions will act like they are newly created.
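The consumer options above follow the functional-options style. The sketch below is a minimal illustration under stated assumptions: this page does not show the definition of `ConsumerOption`, so `consumerOption` is assumed to be a function that mutates the consumer and may return an error, `consumer` is a trimmed-down hypothetical stand-in for `Consumer`, and rejecting an empty queue name with the "invalid queue name" error is a guess at when `ErrInvalidQueueName` applies.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidQueueName mirrors the message of the package's ErrInvalidQueueName.
var errInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")

// consumer is a hypothetical, trimmed-down stand-in for Consumer.
type consumer struct {
	queueGroup         string // empty means a regular subscription
	unsubscribeOnClose bool
}

// consumerOption is an assumed shape for ConsumerOption.
type consumerOption func(*consumer) error

// withQueueSubscriber switches the consumer to a queue subscription.
// Rejecting an empty name is an assumption made for this sketch.
func withQueueSubscriber(queue string) consumerOption {
	return func(c *consumer) error {
		if queue == "" {
			return errInvalidQueueName
		}
		c.queueGroup = queue
		return nil
	}
}

// withUnsubscribeOnClose asks the consumer to unsubscribe when it is closed.
func withUnsubscribeOnClose() consumerOption {
	return func(c *consumer) error {
		c.unsubscribeOnClose = true
		return nil
	}
}

// newConsumer applies the options in order and stops at the first error.
func newConsumer(opts ...consumerOption) (*consumer, error) {
	c := &consumer{}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newConsumer(withQueueSubscriber("workers"), withUnsubscribeOnClose())
	fmt.Println(c.queueGroup, c.unsubscribeOnClose, err)

	_, err = newConsumer(withQueueSubscriber(""))
	fmt.Println(err)
}
```

Returning an error from each option lets the constructor report a bad setting before any subscription is attempted.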
type Message ¶
type Message struct {
    Msg *stan.Msg
    // contains filtered or unexported fields
}
Message implements binding.Message by wrapping a *stan.Msg. This message *can* be read several times safely.
func NewMessage ¶
func NewMessage(msg *stan.Msg, opts ...MessageOption) (*Message, error)
NewMessage wraps a *stan.Msg in a binding.Message. The returned message *can* be read several times safely.
func (*Message) ReadBinary ¶
func (*Message) ReadEncoding ¶
func (*Message) ReadStructured ¶
type MessageOption ¶
func WithManualAcks ¶
func WithManualAcks() MessageOption
type Protocol ¶
type Protocol struct {
    Conn stan.Conn

    Consumer *Consumer
    Sender   *Sender
    // contains filtered or unexported fields
}
Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a STAN client and a STAN handler.
func NewProtocol ¶
func NewProtocol(clusterID, clientID, sendSubject, receiveSubject string, stanOpts []stan.Option, opts ...ProtocolOption) (*Protocol, error)
NewProtocol creates a new STAN protocol and manages the lifecycle of the connection.
func NewProtocolFromConn ¶
func NewProtocolFromConn(conn stan.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)
NewProtocolFromConn creates a new STAN protocol but leaves managing the lifecycle of the connection up to the caller
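The difference between the two constructors is connection ownership. The sketch below illustrates that idea only; `fakeConn`, `protocol`, the `ownsConn` flag, and `shutdown` are hypothetical names, and this page does not show how the package tracks ownership internally.

```go
package main

import "fmt"

// fakeConn is a hypothetical stand-in for stan.Conn that records Close calls.
type fakeConn struct{ closed bool }

func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// protocol is a trimmed-down stand-in for Protocol.
type protocol struct {
	conn     *fakeConn
	ownsConn bool // assumed bookkeeping: true when the protocol opened the connection
}

// newProtocol opens its own connection, so it is responsible for closing it.
func newProtocol() *protocol {
	return &protocol{conn: &fakeConn{}, ownsConn: true}
}

// newProtocolFromConn borrows a connection that the caller keeps managing.
func newProtocolFromConn(conn *fakeConn) *protocol {
	return &protocol{conn: conn}
}

// shutdown closes the connection only when the protocol owns it.
func (p *protocol) shutdown() error {
	if p.ownsConn {
		return p.conn.Close()
	}
	return nil
}

func main() {
	owned := newProtocol()
	_ = owned.shutdown()

	shared := &fakeConn{}
	borrowed := newProtocolFromConn(shared)
	_ = borrowed.shutdown()

	fmt.Println(owned.conn.closed, shared.closed)
}
```

With a borrowed connection, the caller remains responsible for calling Close on it once every sender and consumer that shares it has finished.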
func (*Protocol) OpenInbound ¶
OpenInbound implements Opener.OpenInbound
type ProtocolOption ¶
func WithConsumerOptions ¶
func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption
func WithSenderOptions ¶
func WithSenderOptions(opts ...SenderOption) ProtocolOption
type QueueSubscriber ¶
type QueueSubscriber struct {
QueueGroup string
}
QueueSubscriber creates queue subscriptions
func (*QueueSubscriber) Subscribe ¶
func (s *QueueSubscriber) Subscribe(conn stan.Conn, subject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver implements protocol.Receiver for STAN subscriptions
func NewReceiver ¶
func NewReceiver(opts ...ReceiverOption) (*Receiver, error)
func (*Receiver) MsgHandler ¶
func (r *Receiver) MsgHandler(msg *stan.Msg)
MsgHandler implements stan.MsgHandler. This function is passed to the call to stan.Conn.Subscribe so that incoming messages can be streamed to callers of Receive().
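One common way to bridge a subscription callback and a blocking Receive call is a channel handoff. The following is a minimal sketch of that pattern, not the package's code: `msg` is a hypothetical stand-in for `*stan.Msg`, and `receiver`, `msgHandler`, `receive`, and `roundTrip` are illustrative names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// msg is a hypothetical stand-in for *stan.Msg.
type msg struct{ data string }

// receiver is a simplified stand-in for Receiver.
type receiver struct {
	incoming chan *msg
}

func newReceiver() *receiver {
	return &receiver{incoming: make(chan *msg)}
}

// msgHandler has the shape of a subscription callback. It blocks until the
// message has been picked up by receive.
func (r *receiver) msgHandler(m *msg) {
	r.incoming <- m
}

// receive waits for the next message or for the context to end.
func (r *receiver) receive(ctx context.Context) (*msg, error) {
	select {
	case m := <-r.incoming:
		return m, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// roundTrip delivers one payload through the handler on a separate goroutine
// (playing the role of the subscription) and reads it back via receive.
func roundTrip(payload string) (string, error) {
	r := newReceiver()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	go r.msgHandler(&msg{data: payload})

	m, err := r.receive(ctx)
	if err != nil {
		return "", err
	}
	return m.data, nil
}

func main() {
	got, err := roundTrip("hello")
	fmt.Println(got, err)
}
```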
type ReceiverOption ¶
func WithMessageOptions ¶
func WithMessageOptions(opts ...MessageOption) ReceiverOption
type RegularSubscriber ¶
type RegularSubscriber struct { }
RegularSubscriber creates regular subscriptions
func (*RegularSubscriber) Subscribe ¶
func (s *RegularSubscriber) Subscribe(conn stan.Conn, subject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Sender ¶
type Sender struct {
    Conn    stan.Conn
    Subject string
    // contains filtered or unexported fields
}
func NewSender ¶
func NewSender(clusterID, clientID, subject string, stanOpts []stan.Option, opts ...SenderOption) (*Sender, error)
NewSender creates a new protocol.Sender responsible for opening and closing the STAN connection
func NewSenderFromConn ¶
func NewSenderFromConn(conn stan.Conn, subject string, opts ...SenderOption) (*Sender, error)
NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the STAN connection to the caller
type SenderOption ¶
type Subscriber ¶
type Subscriber interface {
    Subscribe(conn stan.Conn, subject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error)
}
The Subscriber interface allows us to configure how the subscription is created
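The role of this interface can be sketched as a small strategy pattern: the consumer calls one method, and the chosen implementation decides whether a regular or a queue subscription is created. The types below are simplified stand-ins (callbacks, subscription options, and return values are omitted), and `conn`, `subscriber`, and `open` are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for stan.Conn that records which
// subscription style was requested.
type conn struct{ calls []string }

func (c *conn) Subscribe(subject string) {
	c.calls = append(c.calls, "subscribe:"+subject)
}

func (c *conn) QueueSubscribe(subject, group string) {
	c.calls = append(c.calls, "queue:"+subject+":"+group)
}

// subscriber is a reduced version of the Subscriber interface.
type subscriber interface {
	subscribe(c *conn, subject string)
}

// regularSubscriber creates a regular subscription.
type regularSubscriber struct{}

func (regularSubscriber) subscribe(c *conn, subject string) {
	c.Subscribe(subject)
}

// queueSubscriber creates a queue subscription for its queue group.
type queueSubscriber struct{ queueGroup string }

func (s queueSubscriber) subscribe(c *conn, subject string) {
	c.QueueSubscribe(subject, s.queueGroup)
}

// open stands in for the consumer code that starts a subscription without
// knowing which kind it is.
func open(c *conn, s subscriber, subject string) {
	s.subscribe(c, subject)
}

func main() {
	c := &conn{}
	open(c, regularSubscriber{}, "orders")
	open(c, queueSubscriber{queueGroup: "workers"}, "orders")
	fmt.Println(c.calls)
}
```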