stan

package
v2.0.0-RC3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2020 License: Apache-2.0 Imports: 10 Imported by: 3

Documentation

Overview

Package stan implements the CloudEvents transport using NATS Streaming (STAN).

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")

Functions

func StanOptions

func StanOptions(opts ...stan.Option) []stan.Option

StanOptions is a helper function that groups variadic stan.Option arguments into a []stan.Option.

func WriteMsg

func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) error

WriteMsg fills the provided writer with the binding.Message m. The context can be used to tweak the encoding process (see the binding.Write documentation for details).

Types

type Consumer

type Consumer struct {
	Receiver

	Conn               stan.Conn
	Subject            string
	Subscriber         Subscriber
	UnsubscribeOnClose bool
	// contains filtered or unexported fields
}

Consumer is responsible for managing STAN subscriptions and makes messages available via the Receiver interface.

Consumer implements the following interfaces:

- protocol.Opener
- protocol.Closer
- protocol.Receiver

func NewConsumer

func NewConsumer(clusterID, clientID, subject string, stanOpts []stan.Option, opts ...ConsumerOption) (*Consumer, error)

func NewConsumerFromConn

func NewConsumerFromConn(conn stan.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close(_ context.Context) error

Close implements Closer.Close. This method only closes the connection if the Consumer opened it. Subscriptions are closed or unsubscribed depending on the UnsubscribeOnClose field.

func (*Consumer) OpenInbound

func (c *Consumer) OpenInbound(ctx context.Context) error

OpenInbound implements Opener.OpenInbound.

type ConsumerOption

type ConsumerOption func(*Consumer) error

func WithQueueSubscriber

func WithQueueSubscriber(queue string) ConsumerOption

WithQueueSubscriber configures the transport to create a queue subscription instead of a standard subscription.

func WithSubscriptionOptions

func WithSubscriptionOptions(opts ...stan.SubscriptionOption) ConsumerOption

WithSubscriptionOptions sets options to configure the STAN subscription.

func WithUnsubscribeOnClose

func WithUnsubscribeOnClose() ConsumerOption

WithUnsubscribeOnClose configures the Consumer to unsubscribe when OpenInbound context is cancelled or when Consumer.Close() is invoked. This causes durable subscriptions to be forgotten by the STAN service and recreated durable subscriptions will act like they are newly created.

type Message

type Message struct {
	Msg *stan.Msg
	// contains filtered or unexported fields
}

Message implements binding.Message by wrapping a *stan.Msg. This message *can* be read several times safely.

func NewMessage

func NewMessage(msg *stan.Msg, opts ...MessageOption) (*Message, error)

NewMessage wraps a *stan.Msg in a binding.Message. The returned message *can* be read several times safely.

func (*Message) Finish

func (m *Message) Finish(err error) error

func (*Message) ReadBinary

func (m *Message) ReadBinary(context.Context, binding.BinaryWriter) error

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type MessageOption

type MessageOption func(*Message) error

func WithManualAcks

func WithManualAcks() MessageOption

type Protocol

type Protocol struct {
	Conn stan.Conn

	Consumer *Consumer

	Sender *Sender
	// contains filtered or unexported fields
}

Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a STAN client and a STAN handler.

func NewProtocol

func NewProtocol(clusterID, clientID, sendSubject, receiveSubject string, stanOpts []stan.Option, opts ...ProtocolOption) (*Protocol, error)

NewProtocol creates a new STAN protocol including managing the lifecycle of the connection

func NewProtocolFromConn

func NewProtocolFromConn(conn stan.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)

NewProtocolFromConn creates a new STAN protocol but leaves managing the lifecycle of the connection up to the caller

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) (err error)

Close implements Closer.Close

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

OpenInbound implements Opener.OpenInbound

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

Send implements Sender.Send

type ProtocolOption

type ProtocolOption func(*Protocol) error

func WithConsumerOptions

func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption

func WithSenderOptions

func WithSenderOptions(opts ...SenderOption) ProtocolOption

type QueueSubscriber

type QueueSubscriber struct {
	QueueGroup string
}

QueueSubscriber creates queue subscriptions

func (*QueueSubscriber) Subscribe

func (s *QueueSubscriber) Subscribe(conn stan.Conn, subject string, cb stan.MsgHandler,
	opts ...stan.SubscriptionOption) (stan.Subscription, error)

Subscribe implements Subscriber.Subscribe

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver implements protocol.Receiver for STAN subscriptions

func NewReceiver

func NewReceiver(opts ...ReceiverOption) (*Receiver, error)

func (*Receiver) MsgHandler

func (r *Receiver) MsgHandler(msg *stan.Msg)

MsgHandler implements stan.MsgHandler. This function is passed to the call to stan.Conn.Subscribe so that messages can be streamed for delivery via Receive().

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive. This should probably not be invoked directly by applications or library code, but instead invoked via Protocol.Receive.

type ReceiverOption

type ReceiverOption func(*Receiver) error

func WithMessageOptions

func WithMessageOptions(opts ...MessageOption) ReceiverOption

type RegularSubscriber

type RegularSubscriber struct {
}

RegularSubscriber creates regular subscriptions

func (*RegularSubscriber) Subscribe

func (s *RegularSubscriber) Subscribe(conn stan.Conn, subject string, cb stan.MsgHandler,
	opts ...stan.SubscriptionOption) (stan.Subscription, error)

Subscribe implements Subscriber.Subscribe

type Sender

type Sender struct {
	Conn    stan.Conn
	Subject string
	// contains filtered or unexported fields
}

func NewSender

func NewSender(clusterID, clientID, subject string, stanOpts []stan.Option, opts ...SenderOption) (*Sender, error)

NewSender creates a new protocol.Sender responsible for opening and closing the STAN connection

func NewSenderFromConn

func NewSenderFromConn(conn stan.Conn, subject string, opts ...SenderOption) (*Sender, error)

NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the STAN connection to the caller

func (*Sender) Close

func (s *Sender) Close(_ context.Context) error

Close implements Closer.Close. This method only closes the connection if the Sender opened it.

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error)

type SenderOption

type SenderOption func(*Sender) error

type Subscriber

type Subscriber interface {
	Subscribe(conn stan.Conn, subject string, cb stan.MsgHandler,
		opts ...stan.SubscriptionOption) (stan.Subscription, error)
}

The Subscriber interface allows us to configure how the subscription is created

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL