messaging

package module
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2022 License: MIT Imports: 4 Imported by: 0

README

messaging-wrapper-nats

Wrapper library around the messaging technology NATS

This is a very thin wrapper around https://github.com/nats-io/nats.go, mainly meant to:

  • provide sensible default options for our project
  • help with creating the connection to the messaging infrastructure
  • Add encoding/decoding support for the payload

This library does not abstract away from the NATS specifics. It also allows to access the underlying upstream contexts to allow for advanced usage of the underlying library without adding explicit support here.

This library supports the JetStream protocol, which provides reliability on top of NATS and pure NATS. In most cases durable messages (e.g. JetStream) should be preferred. However, JetStream requires some application-specific setting up of Streams and Consumers. TODO: Explain how to determine correct Streams and Consumers setup.

For Request-Reply style communication pure NATS must be used. Reliability can be archived by retrying the request on failure.

Example usage

Publishing:

type complexData struct {
    name string
    whatever int
    ...
}

func publish() error {
    messagingConfig := ...
    mc := NewMessagingConnector(messagingConfig)
    if err := mc.Connect(); err != nil {
        return err
    }
    _, err := mc.PublishDurable("subject1", "payload1") //payload1 is a string
    if err != nil {
        return err
    }

    _, err :=  mc.PublishDurable("subject2", 123) //payload2 is a number
    if err != nil {
        return err
    }

    _, err := mc.PublishDurable("subject3", &complexData{
        name: "asd",
        whatever: 5,
        ...
    }) //payload3 is a dict

    if err != nil {
        return err
    }
}

Subscribing:

type complexData struct {
    name string
    whatever int
    ...
}

func subscribe() {
    messagingConfig := ...
    mc := NewMessagingConnector(messagingConfig)
    if err := mc.Connect(); err != nil {
        return err
    }

    sub, err := mc.SubscribeDurableSync("Sample-Stream", "Sample-Consumer")
    if err != nil {
        return err
    }
    msg, err := sub.NextMsg(timeout)
    if err != nil {
        return err
    }
    if err := sub.Unsubscribe(); err != nil {
        return err
    }
    var payload *string // payload1 is a string
    if err := mc.Encoder().Decode(msg.Subject, msg.Data, payload); err != nil {
        return err
    }

    sub, err := mc.SubscribeDurableAsync("Sample-Stream", "Sample-Consumer", func(m *nats.Msg) {
        var payload *complexData // payload3 is a dict
        if err := mc.Encoder().Decode(msg.Subject, msg.Data, payload); err != nil {
            //somehow handle
        }
    })
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	ServerURL                            string                        `default:"nats://127.0.0.1:4222" split_words:"true" desc:"The url of the NATS server"`
	RequiredStreamsConfigurationAction   PersistentConfigurationAction `` /* 147-byte string literal not displayed */
	RequiredConsumersConfigurationAction PersistentConfigurationAction `` /* 149-byte string literal not displayed */
}

Config is the configuration used by a messaging connector. These are intended to be set by the ops people deploying the application. For options set by the calling application see Options. It is made to be processed by https://github.com/kelseyhightower/envconfig but could also be filled manually

type MsgCon

type MsgCon struct {
	// contains filtered or unexported fields
}

MsgCon is a messaging connector able to connect to the messaging infrastructure

func NewMessagingConnector

func NewMessagingConnector(config Config) MsgCon

NewMessagingConnector creates a new messaging connector with the given config

It does not yet actually connect to the messaging infrastructure. Use Connect for that.

func (*MsgCon) Config

func (mc *MsgCon) Config() Config

func (*MsgCon) Connect

func (mc *MsgCon) Connect() error

Connect actually connects this messaging connector instance to the messaging infrastructure.

To customize the connection please call SetOptions.

This message can only be called once per messaging connector. Returns an error when called a second time.

func (*MsgCon) Connected

func (mc *MsgCon) Connected() bool

func (MsgCon) DefaultDurablePubOptions

func (mc MsgCon) DefaultDurablePubOptions() []nats.PubOpt

DefaultDurablePubOptions returns some sensible default PubOpts for the JetStream Publish Functions

func (MsgCon) DefaultDurableSubOptions

func (mc MsgCon) DefaultDurableSubOptions() []nats.SubOpt

DefaultDurableSubOptions returns some sensible default SubOpts for the JetStream Sublish Functions

func (*MsgCon) Disconnect

func (mc *MsgCon) Disconnect() error

Disconnect this messaging connector. After this is called, there is no way to reconnect with this messaging connector.

func (*MsgCon) Disconnected

func (mc *MsgCon) Disconnected() bool

func (*MsgCon) Encoder

func (mc *MsgCon) Encoder() nats.Encoder

func (*MsgCon) JetStreamContext

func (mc *MsgCon) JetStreamContext() *nats.JetStreamContext

func (*MsgCon) NatsContext

func (mc *MsgCon) NatsContext() *nats.Conn

func (*MsgCon) Options

func (mc *MsgCon) Options() options

func (MsgCon) Publish

func (mc MsgCon) Publish(subject string, payload interface{}) error

Publish publishes a message with the given payload on the given subject.

The payload may be anything that can be encoded with the selected encoder.

This method does not use JetStream, therefore no QOS is guaranteed. For reliable messaging see PublishDurable

Returns an error when encoding fails

For more information on the behavior, parameters, and return value see nats.Conn.Publish

func (MsgCon) PublishDurable

func (mc MsgCon) PublishDurable(subject string, payload interface{}) (*nats.PubAck, error)

Publish publishes a message with the given payload on the given subject.

The payload may be anything that can be encoded with the selected encoder.

This method does use JetStream and the messages are therefore durable. However you must make sure the Streams and Consumers are setup correctly.

Returns an error when encoding fails

For more information on the behavior, parameters, and return value see nats.JetStreamContext.Publish

func (MsgCon) Request

func (mc MsgCon) Request(subject string, payload interface{}, timeout time.Duration) (*nats.Msg, error)

Request will do a request with the given payload to the given subject and wait the given amount of time for the response.

The payload may be anything that can be encoded with the selected encoder.

This method does not use JetStream, therefore no QOS is guaranteed. Request-reply is not supported by JetStream. Reliability in request-reply can be achived by retrying the request on failure.

Returns an error when encoding fails

For more information on the behavior, parameters, and return value see nats.Conn.Request

func (*MsgCon) SetOptions

func (mc *MsgCon) SetOptions(opts ...Option) error

SetOptions sets the given options on the messaging connector.

Options are ment to be set by the calling application. For configuration supplied during deployment see Config.

Options set will be overwritten, not merged.

This method must be called becore calling Connect. Returns an error if the messaging connector is already connected.

Example setting some options: mc := NewMessagingConnector(...) mc.SetOptions(WithRequiredStreams(...), WithAdditionalNatsOptions(...), ...)

func (MsgCon) SubscribeAsync

func (mc MsgCon) SubscribeAsync(subject string, handler nats.MsgHandler) (*nats.Subscription, error)

SubscribeAsync subscribes to a subject, retrieving messages asnchronously.

To decode the payload use MsgCon.Encoder.Decode

This method does not use JetStream, therefore no QOS is guaranteed. For reliable messaging see SubscribeDurableAsync

For more information on the behavior, parameters, and return value see nats.Conn.Subscribe

func (MsgCon) SubscribeDurableAsync

func (mc MsgCon) SubscribeDurableAsync(streamName string, consumerName string, handler nats.MsgHandler) (*nats.Subscription, error)

SubscribeDurableAsync durably subscribes to a subject, retrieving messages asnchronously. It uses the preconfigured consumer with the given name on the stream with the given name. The subject is defined by the configuration of the stream.

To decode the payload use MsgCon.Encoder.Decode

This method does use JetStream and the messages are therefore durable. However you must make sure the Streams and Consumers are setup correctly.

For more information on the behavior, parameters, and return value see nats.JetStreamContext.Subscribe

func (MsgCon) SubscribeDurableSync

func (mc MsgCon) SubscribeDurableSync(streamName string, consumerName string) (*nats.Subscription, error)

SubscribeDurableSync durably subscribes to a subject, retrieving messages synchronously. It uses the preconfigured consumer with the given name on the stream with the given name. The subject is defined by the configuration of the stream.

To decode the payload use MsgCon.Encoder.Decode

This method does use JetStream and the messages are therefore durable. However you must make sure the Streams and Consumers are setup correctly.

For more information on the behavior, parameters, and return value see nats.JetStreamContext.SubscribeSync

func (MsgCon) SubscribeSync

func (mc MsgCon) SubscribeSync(subject string) (*nats.Subscription, error)

SubscribeSync subscribes to a subject, retrieving messages synchronously.

To decode the payload use MsgCon.Encoder.Decode

This method does not use JetStream, therefore no QOS is guaranteed. For reliable messaging see SubscribeDurableSync

For more information on the behavior, parameters, and return value see nats.Conn.SubscribeSync

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is one option that can be set for a messaging connector

func WithAdditionalJetStreamOptions

func WithAdditionalJetStreamOptions(additionalJetStreamOptions ...nats.JSOpt) Option

WithAdditionalJetStreamOptions returns the corresponding Option for passing the given additional options the the nats.Conn.JetStream call

func WithAdditionalNatsOptions

func WithAdditionalNatsOptions(additionalNatsOptions ...nats.Option) Option

WithAdditionalNatsOptions returns the corresponding Option for passing the given additional options to the nats.Connect call

func WithEncoder

func WithEncoder(encoder nats.Encoder) Option

WithEncoder returns the corresponding Option for using the given encoder for encoding payloads

func WithRequiredConsumers

func WithRequiredConsumers(streamName string, requiredConsumers ...nats.ConsumerConfig) Option

WithRequiredConsumers returns the corresponding Option indicating that consumers with the given config should be configured for the given stream during Connect

Consumers are part of JetStream, the protocol on top of NATS with durability gurantees. Streams should be properly configured when using one of the durable subscribe functions.

This will only overwrite the list of required consumers for the given stream name

func WithRequiredStreams

func WithRequiredStreams(requiredStreams ...nats.StreamConfig) Option

WithRequiredStreams returns the corresponding Option indicating that streams with the given config should be configured during Connect

Streams are part of JetStream, the protocol on top of NATS with durability gurantees. Streams should be properly configured when using one of the durable publish or subscribe functions.

type PersistentConfigurationAction

type PersistentConfigurationAction string

PersistentConfigurationAction is a type indictaing what to do with configuration options persisted in the nats infrastructure

const (
	// Do not configure instances of this option ever
	DoNotTouch PersistentConfigurationAction = "DoNotTouch"
	// Create required instances of this option when they are not present but do not update existing instances
	CreateIfMissing PersistentConfigurationAction = "CreateIfMissing"
	// Always update the configuration of the required instances of this option
	AlwaysUpdate PersistentConfigurationAction = "AlwaysUpdate"
)

func (*PersistentConfigurationAction) Decode

func (action *PersistentConfigurationAction) Decode(value string) error

func (PersistentConfigurationAction) IsValid

func (a PersistentConfigurationAction) IsValid() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL