jetstream

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2022 License: MIT Imports: 12 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GobMarshaler

type GobMarshaler struct{}

GobMarshaler is marshaller which is using Gob to marshal Watermill messages.

func (GobMarshaler) Marshal

func (GobMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error)

Marshal transforms a watermill message into gob format.

func (GobMarshaler) Unmarshal

func (GobMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error)

Unmarshal extracts a watermill message from a nats message.

type JSONMarshaler added in v0.0.4

type JSONMarshaler struct{}

JSONMarshaler uses encoding/json to marshal Watermill messages.

func (JSONMarshaler) Marshal added in v0.0.4

func (JSONMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error)

Marshal transforms a watermill message into JSON format.

func (JSONMarshaler) Unmarshal added in v0.0.4

func (JSONMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error)

Unmarshal extracts a watermill message from a nats message.

type Marshaler

type Marshaler interface {
	// Marshal transforms a watermill message into binary format.
	Marshal(topic string, msg *message.Message) (*nats.Msg, error)
}

type MarshalerUnmarshaler

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

type NATSMarshaler added in v0.0.4

type NATSMarshaler struct{}

NATSMarshaler uses NATS header to marshal directly between watermill and NATS formats. The watermill UUID is stored at _watermill_message_uuid

func (*NATSMarshaler) Marshal added in v0.0.4

func (*NATSMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error)

Marshal transforms a watermill message into JSON format.

func (*NATSMarshaler) Unmarshal added in v0.0.4

func (*NATSMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error)

Unmarshal extracts a watermill message from a nats message.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher provides the jetstream implementation for watermill publish operations

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new Publisher.

func NewPublisherWithNatsConn

func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisherWithNatsConn creates a new Publisher with the provided nats connection.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher and the underlying connection

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

Publish publishes message to NATS.

Publish will not return until an ack has been received from JetStream. When one of messages delivery fails - function is interrupted.

type PublisherConfig

type PublisherConfig struct {
	// URL is the NATS URL.
	URL string

	// NatsOptions are custom options for a connection.
	NatsOptions []nats.Option

	// JetstreamOptions are custom Jetstream options for a connection.
	JetstreamOptions []nats.JSOpt

	// Marshaler is marshaler used to marshal messages between watermill and wire formats
	Marshaler Marshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
	SubjectCalculator SubjectCalculator

	// AutoProvision bypasses client validation and provisioning of streams
	AutoProvision bool

	// PublishOptions are custom publish option to be used on all publication
	PublishOptions []nats.PubOpt

	// TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication
	TrackMsgId bool
}

PublisherConfig is the configuration to create a publisher

func (PublisherConfig) GetPublisherPublishConfig

func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig

GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established

func (PublisherConfig) Validate

func (c PublisherConfig) Validate() error

Validate ensures configuration is valid before use

type PublisherPublishConfig

type PublisherPublishConfig struct {
	// Marshaler is marshaler used to marshal messages between watermill and wire formats
	Marshaler Marshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
	SubjectCalculator SubjectCalculator

	// AutoProvision bypasses client validation and provisioning of streams
	AutoProvision bool

	// JetstreamOptions are custom Jetstream options for a connection.
	JetstreamOptions []nats.JSOpt

	// PublishOptions are custom publish option to be used on all publication
	PublishOptions []nats.PubOpt

	// TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication
	TrackMsgId bool
}

PublisherPublishConfig is the configuration subset needed for an individual publish call

type SubjectCalculator added in v0.0.2

type SubjectCalculator func(topic string) *Subjects

SubjectCalculator is a function used to calculate nats subject(s) for the given topic.

type Subjects added in v0.0.3

type Subjects struct {
	Primary    string
	Additional []string
}

Subjects contains nats subject detail (primary + all additional) for a given watermill topic.

func (*Subjects) All added in v0.0.3

func (s *Subjects) All() []string

All combines the primary and all additional subjects for use by the nats client on creation.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber provides the jetstream implementation for watermill subscribe operations

func NewSubscriber

func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new Subscriber.

func NewSubscriberWithNatsConn

func NewSubscriberWithNatsConn(conn *nats.Conn, config SubscriberSubscriptionConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriberWithNatsConn creates a new Subscriber with the provided nats connection.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes the publisher and the underlying connection. It will attempt to wait for in-flight messages to complete.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes messages from JetStream.

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

SubscribeInitialize offers a way to ensure the stream for a topic exists prior to subscribe

type SubscriberConfig

type SubscriberConfig struct {
	// URL is the URL to the broker
	URL string

	// QueueGroup is the JetStream queue group.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// It is recommended to set it with DurableName.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurableName) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	//
	// When QueueGroup is empty, subscribe without QueueGroup will be used.
	QueueGroup string

	// DurableName is the JetStream durable name.
	//
	// Subscriptions may also specify a “durable name” which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the JetStream server to track
	// the last acknowledged message for that ClientID + DurableName.
	DurableName string

	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int

	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration

	// How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
	// It is mapped to stan.AckWait option.
	AckWaitTimeout time.Duration

	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration

	// NatsOptions are custom []nats.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// 		nats.URL("nats://localhost:4222")
	NatsOptions []nats.Option

	// JetstreamOptions are custom Jetstream options for a connection.
	JetstreamOptions []nats.JSOpt

	// Unmarshaler is an unmarshaler used to unmarshaling messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler

	// SubscribeOptions defines nats options to be used when subscribing
	SubscribeOptions []nats.SubOpt

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
	SubjectCalculator SubjectCalculator

	// AutoProvision bypasses client validation and provisioning of streams
	AutoProvision bool

	// AckSync enables synchronous acknowledgement (needed for exactly once processing)
	AckSync bool
}

SubscriberConfig is the configuration to create a subscriber

func (*SubscriberConfig) GetSubscriberSubscriptionConfig added in v0.0.2

func (c *SubscriberConfig) GetSubscriberSubscriptionConfig() SubscriberSubscriptionConfig

GetSubscriberSubscriptionConfig gets the configuration subset needed for individual subscribe calls once a connection has been established

type SubscriberSubscriptionConfig

type SubscriberSubscriptionConfig struct {
	// Unmarshaler is an unmarshaler used to unmarshaling messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler
	// QueueGroup is the JetStream queue group.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// It is recommended to set it with DurableName.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurableName) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	//
	// When QueueGroup is empty, subscribe without QueueGroup will be used.
	QueueGroup string

	// DurableName is the JetStream durable name.
	//
	// Subscriptions may also specify a “durable name” which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the JetStream server to track
	// the last acknowledged message for that ClientID + DurableName.
	DurableName string

	// SubscribersCount determines wow much concurrent subscribers should be started.
	SubscribersCount int

	// How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
	// It is mapped to stan.AckWait option.
	AckWaitTimeout time.Duration

	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration

	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration

	// JetstreamOptions are custom Jetstream options for a connection.
	JetstreamOptions []nats.JSOpt

	// SubscribeOptions defines nats options to be used when subscribing
	SubscribeOptions []nats.SubOpt

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
	SubjectCalculator SubjectCalculator

	// AutoProvision bypasses client validation and provisioning of streams
	AutoProvision bool

	// AckSync enables synchronous acknowledgement (needed for exactly once processing)
	AckSync bool
}

SubscriberSubscriptionConfig is the configurationz

func (*SubscriberSubscriptionConfig) Validate

func (c *SubscriberSubscriptionConfig) Validate() error

Validate ensures configuration is valid before use

type Unmarshaler

type Unmarshaler interface {
	// Unmarshal extracts a watermill message from a nats message.
	Unmarshal(*nats.Msg) (*message.Message, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL