courier

v0.6.0
Published: Apr 26, 2024 License: MIT Imports: 24 Imported by: 7

README

Courier Golang Client Library

Introduction

The Courier Golang client library is an opinionated wrapper over the paho MQTT library that adds features on top of it.

Head over to Documentation to get started.

Find the end-to-end courier example here.

Features

  • Supports MQTT v3.1.1
  • Flexible Encoder/Decoder support from Go type to MQTT payload conversion and back
  • Middleware chaining
  • OpenTelemetry support

Usage

go get -u github.com/gojek/courier-go
Contributing Guide

Read our contributing guide to learn about our development process, how to propose bugfixes and improvements, and how to build and test your changes to Courier Go Client.

Release Process

This repo uses Golang submodules. To make a new release, follow the release process described in the RELEASING doc exactly.

License

Courier Go Client is MIT licensed.

Documentation

Overview

Package courier contains the client that can be used to interact with the courier infrastructure to publish messages to, and subscribe to messages from, other clients.

Variables

var (
	// ErrConnectTimeout indicates connection timeout while connecting to broker.
	ErrConnectTimeout = errors.New("client timed out while trying to connect to the broker")
	// ErrPublishTimeout indicates publish timeout.
	ErrPublishTimeout = errors.New("publish timeout")
	// ErrSubscribeTimeout indicates subscribe timeout.
	ErrSubscribeTimeout = errors.New("subscribe timeout")
	// ErrUnsubscribeTimeout indicates unsubscribe timeout.
	ErrUnsubscribeTimeout = errors.New("unsubscribe timeout")
	// ErrSubscribeMultipleTimeout indicates multiple subscribe timeout.
	ErrSubscribeMultipleTimeout = errors.New("subscribe multiple timeout")
)
var ErrClientNotInitialized = errors.New("courier: client not initialized")

ErrClientNotInitialized is returned when the client is not initialized

var ResumeSubscriptions = resumeSubscriptions{}

ResumeSubscriptions allows resuming of stored (un)subscribe messages when connecting but not reconnecting if CleanSession is false. Otherwise, these messages are discarded.

var UseMultiConnectionMode = multiConnMode{}

UseMultiConnectionMode configures the client to use multiple connections when available.

This is useful when working with shared subscriptions, where multiple connections can be created to subscribe from the same application.

Functions

func ExponentialStartStrategy

func ExponentialStartStrategy(ctx context.Context, c interface{ Start() error }, opts ...StartOption)

ExponentialStartStrategy keeps calling Client.Start in the background and retries on error. It never exits unless the context used to invoke it is cancelled. It does NOT stop the client; that is the responsibility of the caller.
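
The retry behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the starter interface mirrors the interface{ Start() error } parameter, while flakyClient, retryStart, the initial wait and the doubling factor are assumptions made for the example. Unlike ExponentialStartStrategy, the sketch blocks the caller instead of running in the background.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// starter mirrors the interface{ Start() error } parameter documented above.
type starter interface{ Start() error }

// flakyClient is a hypothetical stand-in that fails a fixed number of times.
type flakyClient struct {
	failures int
	attempts int
}

func (f *flakyClient) Start() error {
	f.attempts++
	if f.attempts <= f.failures {
		return errors.New("broker unavailable")
	}
	return nil
}

// retryStart calls Start until it succeeds or ctx is cancelled, doubling the
// wait after each failure up to maxWait. The initial wait and the doubling
// factor are assumptions of this sketch, not values taken from the library.
func retryStart(ctx context.Context, c starter, initial, maxWait time.Duration, onRetry func(error)) error {
	wait := initial
	for {
		err := c.Start()
		if err == nil {
			return nil
		}
		if onRetry != nil {
			onRetry(err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
		wait *= 2
		if wait > maxWait {
			wait = maxWait
		}
	}
}

// runDemo returns the number of Start attempts and the number of reported retries.
func runDemo() (attempts, retries int) {
	c := &flakyClient{failures: 2}
	_ = retryStart(context.Background(), c, time.Millisecond, 4*time.Millisecond, func(error) { retries++ })
	return c.attempts, retries
}

func main() {
	attempts, retries := runDemo()
	fmt.Println("attempts:", attempts, "retries:", retries)
}
```

Cancelling the context is the only way out of the loop while Start keeps failing, which matches the "never exits unless the context is cancelled" wording above.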

func Version

func Version() string

Version can be used to get the current courier library version

func WaitForConnection

func WaitForConnection(c ConnectionInformer, waitFor time.Duration, tick time.Duration) bool

WaitForConnection checks whether the Client is connected. It calls ConnectionInformer.IsConnected after every tick, and waitFor is the maximum duration it can block. It returns true only when ConnectionInformer.IsConnected returns true.
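
A polling loop with these semantics might look like the following sketch. It is written against a local connectionInformer interface with the same method as ConnectionInformer; waitForConnection and connectsAfter are hypothetical names, and the real function may be implemented differently.

```go
package main

import (
	"fmt"
	"time"
)

// connectionInformer has the same method set as the documented ConnectionInformer.
type connectionInformer interface{ IsConnected() bool }

// connectsAfter is a hypothetical informer that reports connected from its n-th call on.
type connectsAfter struct{ n, calls int }

func (c *connectsAfter) IsConnected() bool {
	c.calls++
	return c.calls >= c.n
}

// waitForConnection polls IsConnected on every tick and gives up after waitFor.
func waitForConnection(c connectionInformer, waitFor, tick time.Duration) bool {
	deadline := time.NewTimer(waitFor)
	defer deadline.Stop()

	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	for {
		select {
		case <-deadline.C:
			return false
		case <-ticker.C:
			if c.IsConnected() {
				return true
			}
		}
	}
}

// demo runs one informer that connects quickly and one that never connects.
func demo() (connected, neverConnected bool) {
	connected = waitForConnection(&connectsAfter{n: 3}, 500*time.Millisecond, time.Millisecond)
	neverConnected = waitForConnection(&connectsAfter{n: 1 << 30}, 20*time.Millisecond, time.Millisecond)
	return connected, neverConnected
}

func main() {
	fmt.Println(demo())
}
```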

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client allows to communicate with an MQTT broker

func NewClient

func NewClient(opts ...ClientOption) (*Client, error)

NewClient creates the Client struct with the provided ClientOption values. It can return an error when prometheus.DefaultRegisterer has already been used to register the collected metrics.

Example
c, err := courier.NewClient(
	courier.WithUsername("username"),
	courier.WithPassword("password"),
	courier.WithAddress("localhost", 1883),
)

if err != nil {
	panic(err)
}

if err := c.Start(); err != nil {
	panic(err)
}

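// Cancel ctx on SIGINT/SIGTERM. A single signal sent on a channel would be
// received by only one reader, so a context is used to notify both the
// publishing goroutine and the main flow.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

go func() {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case t := <-tick.C:
			msg := map[string]interface{}{
				"time": t.UnixNano(),
			}
			if err := c.Publish(context.Background(), "topic", msg, courier.QOSOne); err != nil {
				fmt.Printf("Publish() error = %s\n", err)
			} else {
				fmt.Println("Publish() success")
			}
		case <-ctx.Done():
			return
		}
	}
}()

// Block until a shutdown signal arrives, then disconnect from the broker.
<-ctx.Done()
c.Stop()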

func (*Client) InfoHandler added in v0.5.3

func (c *Client) InfoHandler() http.Handler

InfoHandler returns a http.Handler that exposes the connected clients information

func (*Client) IsConnected

func (c *Client) IsConnected() bool

IsConnected checks whether the client is connected to the broker

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error

Publish allows to publish messages to an MQTT broker

func (*Client) Run added in v0.4.0

func (c *Client) Run(ctx context.Context) error

Run will start running the Client. This makes Client compatible with github.com/gojekfarm/xrun package. https://pkg.go.dev/github.com/gojekfarm/xrun

func (*Client) Start

func (c *Client) Start() error

Start will attempt to connect to the broker.

func (*Client) Stop

func (c *Client) Stop()

Stop will disconnect from the broker and finish up any pending work on internal communication workers. This can only block until the period configured with the ClientOption WithGracefulShutdownPeriod.

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error

Subscribe allows to subscribe to messages from an MQTT broker

func (*Client) SubscribeMultiple

func (c *Client) SubscribeMultiple(
	ctx context.Context,
	topicsWithQos map[string]QOSLevel,
	callback MessageHandler,
) error

SubscribeMultiple allows to subscribe to messages on multiple topics from an MQTT broker

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ctx context.Context, topics ...string) error

Unsubscribe removes any subscription to messages from an MQTT broker

func (*Client) UsePublisherMiddleware

func (c *Client) UsePublisherMiddleware(mwf ...PublisherMiddlewareFunc)

UsePublisherMiddleware appends a PublisherMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip messages. They are executed in the order that they are applied to the Client.
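
The chaining idea can be illustrated without a broker. The sketch below declares local stand-ins with the same signatures as Publisher, PublisherFunc and PublisherMiddlewareFunc from this page; Option is reduced to an empty interface, and chain and tag are hypothetical helpers. How the Client composes its chain internally is an assumption here: the sketch simply wraps the base publisher so that the first middleware applied runs first.

```go
package main

import (
	"context"
	"fmt"
)

// Option is a placeholder for the package's Option type.
type Option interface{}

// Publisher, PublisherFunc and PublisherMiddlewareFunc copy the documented signatures.
type Publisher interface {
	Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error
}

type PublisherFunc func(context.Context, string, interface{}, ...Option) error

func (f PublisherFunc) Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error {
	return f(ctx, topic, message, opts...)
}

type PublisherMiddlewareFunc func(Publisher) Publisher

// chain wraps base so that the first middleware in mws is the outermost one
// and therefore runs first on every Publish call.
func chain(base Publisher, mws ...PublisherMiddlewareFunc) Publisher {
	for i := len(mws) - 1; i >= 0; i-- {
		base = mws[i](base)
	}
	return base
}

// tag returns a middleware that records its name, then calls the next Publisher.
func tag(name string, trace *[]string) PublisherMiddlewareFunc {
	return func(next Publisher) Publisher {
		return PublisherFunc(func(ctx context.Context, topic string, msg interface{}, opts ...Option) error {
			*trace = append(*trace, name)
			return next.Publish(ctx, topic, msg, opts...)
		})
	}
}

// demo publishes once through two middlewares and returns the call order.
func demo() []string {
	var trace []string
	base := PublisherFunc(func(_ context.Context, topic string, _ interface{}, _ ...Option) error {
		trace = append(trace, "publish:"+topic)
		return nil
	})
	p := chain(base, tag("logging", &trace), tag("metrics", &trace))
	_ = p.Publish(context.Background(), "orders", "hello")
	return trace
}

func main() {
	fmt.Println(demo())
}
```

A middleware can skip a message by returning without calling next.Publish, or modify it by passing a different message to the next Publisher.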

func (*Client) UseSubscriberMiddleware

func (c *Client) UseSubscriberMiddleware(mwf ...SubscriberMiddlewareFunc)

UseSubscriberMiddleware appends a SubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip subscriptions. They are executed in the order that they are applied to the Client.

func (*Client) UseUnsubscriberMiddleware

func (c *Client) UseUnsubscriberMiddleware(mwf ...UnsubscriberMiddlewareFunc)

UseUnsubscriberMiddleware appends a UnsubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip subscriptions. They are executed in the order that they are applied to the Client.

type ClientInfoEmitter added in v0.5.3

type ClientInfoEmitter interface {
	Emit(ctx context.Context, meta ClientMeta)
}

ClientInfoEmitter emits broker info. This can be called concurrently, implementations should be concurrency safe.
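
Because Emit can be called concurrently, an implementation needs its own synchronisation. A minimal sketch, assuming a simplified local ClientMeta (only one of its fields is reproduced) and a hypothetical latestEmitter type that keeps the most recent value under a mutex:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ClientMeta is a reduced local copy of the documented struct.
type ClientMeta struct {
	MultiConnMode bool
}

// ClientInfoEmitter copies the documented interface.
type ClientInfoEmitter interface {
	Emit(ctx context.Context, meta ClientMeta)
}

// latestEmitter is a hypothetical emitter that remembers the last ClientMeta
// and counts calls. The mutex makes concurrent Emit calls safe.
type latestEmitter struct {
	mu    sync.Mutex
	count int
	last  ClientMeta
}

func (e *latestEmitter) Emit(_ context.Context, meta ClientMeta) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.count++
	e.last = meta
}

// snapshot returns the number of Emit calls and the last value seen.
func (e *latestEmitter) snapshot() (int, ClientMeta) {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.count, e.last
}

// demo calls Emit from 50 goroutines and returns the recorded call count.
func demo() int {
	e := &latestEmitter{}
	var emitter ClientInfoEmitter = e

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			emitter.Emit(context.Background(), ClientMeta{MultiConnMode: true})
		}()
	}
	wg.Wait()

	n, _ := e.snapshot()
	return n
}

func main() {
	fmt.Println("emit calls:", demo())
}
```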

type ClientInfoEmitterConfig added in v0.5.3

type ClientInfoEmitterConfig struct {
	// Interval is the interval at which the broker info emitter emits broker info.
	Interval time.Duration
	Emitter  ClientInfoEmitter
}

ClientInfoEmitterConfig is used to configure the broker info emitter.

type ClientMeta added in v0.5.3

type ClientMeta struct {
	MultiConnMode bool
	Clients       []MQTTClientInfo
	Subscriptions map[string]QOSLevel
}

ClientMeta contains information about the internal MQTT client(s)

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

ClientOption allows to configure the behaviour of a Client.

func WithAddress added in v0.1.1

func WithAddress(host string, port uint16) ClientOption

WithAddress sets the broker address to be used. To establish a TLS connection, use WithTLS Option along with this. Default values for hostname is "127.0.0.1" and for port is 1883.

func WithAutoReconnect

func WithAutoReconnect(autoReconnect bool) ClientOption

WithAutoReconnect sets whether the automatic reconnection logic should be used when the connection is lost. Even if it is disabled, the handler set with WithOnConnectionLost is still called.

func WithCleanSession

func WithCleanSession(cleanSession bool) ClientOption

WithCleanSession will set the "clean session" flag in the connect message when this client connects to an MQTT broker. By setting this flag, you are indicating that no messages saved by the broker for this client should be delivered. Any messages that were going to be sent by this client before disconnecting but didn't, will not be sent upon connecting to the broker.

func WithClientID

func WithClientID(clientID string) ClientOption

WithClientID sets the clientID to be used while connecting to an MQTT broker. According to the MQTT v3.1 specification, a client id must be no longer than 23 characters.

func WithConnectTimeout

func WithConnectTimeout(duration time.Duration) ClientOption

WithConnectTimeout limits how long the client will wait when trying to open a connection to an MQTT server before timing out. A duration of 0 never times out. Default 15 seconds.

func WithCredentialFetcher added in v0.5.0

func WithCredentialFetcher(fetcher CredentialFetcher) ClientOption

WithCredentialFetcher sets the specified CredentialFetcher.

func WithCustomDecoder

func WithCustomDecoder(decoderFunc DecoderFunc) ClientOption

WithCustomDecoder allows to decode message bytes into the desired object.

func WithCustomEncoder

func WithCustomEncoder(encoderFunc EncoderFunc) ClientOption

WithCustomEncoder allows to transform objects into the desired message bytes.

func WithExponentialStartOptions added in v0.5.0

func WithExponentialStartOptions(options ...StartOption) ClientOption

WithExponentialStartOptions configures the client to use ExponentialStartStrategy along with the passed StartOption(s) when using the Client.Run method.

func WithGracefulShutdownPeriod

func WithGracefulShutdownPeriod(duration time.Duration) ClientOption

WithGracefulShutdownPeriod sets the limit that is allowed for existing work to be completed.

func WithKeepAlive

func WithKeepAlive(duration time.Duration) ClientOption

WithKeepAlive will set the amount of time (in seconds) that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server.

Deprecated: Use KeepAlive instead.

func WithLogger added in v0.5.0

func WithLogger(l Logger) ClientOption

WithLogger sets the Logger to use for the client.

func WithMaintainOrder

func WithMaintainOrder(maintainOrder bool) ClientOption

WithMaintainOrder will set the message routing to guarantee order within each QoS level. By default, this value is true. If set to false (recommended), this flag indicates that messages can be delivered asynchronously from the client to the application and possibly arrive out of order. Specifically, the message handler is called in its own goroutine. Note that setting this to true does not guarantee in-order delivery (this is subject to broker settings like "max_inflight_messages=1"), and if it is true, the MessageHandler callback must not block.

func WithMaxReconnectInterval

func WithMaxReconnectInterval(duration time.Duration) ClientOption

WithMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts when the connection is lost.

func WithOnConnect

func WithOnConnect(handler OnConnectHandler) ClientOption

WithOnConnect will set the OnConnectHandler callback to be called when the client is connected. Both at initial connection time and upon automatic reconnect.

func WithOnConnectionLost

func WithOnConnectionLost(handler OnConnectionLostHandler) ClientOption

WithOnConnectionLost will set the OnConnectionLostHandler callback to be executed in the case where the client unexpectedly loses connection with the MQTT broker.

func WithOnReconnect

func WithOnReconnect(handler OnReconnectHandler) ClientOption

WithOnReconnect sets the OnReconnectHandler callback to be executed prior to the client attempting a reconnect to the MQTT broker.

func WithPassword

func WithPassword(password string) ClientOption

WithPassword sets the password to be used while connecting to an MQTT broker.

func WithPersistence

func WithPersistence(store Store) ClientOption

WithPersistence configures the store to be used by the broker. The default is in-memory persistence with mqtt.MemoryStore.

func WithResolver added in v0.1.1

func WithResolver(resolver Resolver) ClientOption

WithResolver sets the specified Resolver.

func WithTCPAddress deprecated

func WithTCPAddress(host string, port uint16) ClientOption

WithTCPAddress sets the broker address to be used. Default values for hostname is "127.0.0.1" and for port is 1883.

Deprecated: This Option used to work with plain TCP connections, it's now possible to use TLS with WithAddress and WithTLS combination.

func WithTLS added in v0.1.1

func WithTLS(tlsConfig *tls.Config) ClientOption

WithTLS sets the TLS configuration to be used while connecting to an MQTT broker.

func WithUseBase64Decoder

func WithUseBase64Decoder() ClientOption

WithUseBase64Decoder configures a json decoder with a base64.StdEncoding wrapped decoder which decodes base64 encoded message bytes into the passed object.
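
The wrapping described here can be reproduced with the standard library: a base64 stream decoder feeds a JSON decoder. The sketch below is an illustration under assumptions, not the option's source code. Decoder copies the interface documented on this page, while base64JSONDecoder and the order type are hypothetical.

```go
package main

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Decoder copies the documented Decoder interface.
type Decoder interface {
	Decode(v interface{}) error
}

// base64JSONDecoder has the shape of a DecoderFunc: it base64-decodes the
// incoming bytes on the fly and parses the result as JSON.
func base64JSONDecoder(_ context.Context, r io.Reader) Decoder {
	return json.NewDecoder(base64.NewDecoder(base64.StdEncoding, r))
}

// order is a hypothetical payload type.
type order struct {
	ID int `json:"id"`
}

// demo encodes a JSON document as base64 and decodes it back into a struct.
func demo() (int, error) {
	payload := base64.StdEncoding.EncodeToString([]byte(`{"id":42}`))

	var o order
	err := base64JSONDecoder(context.Background(), strings.NewReader(payload)).Decode(&o)
	return o.ID, err
}

func main() {
	id, err := demo()
	fmt.Println(id, err)
}
```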

func WithUsername

func WithUsername(username string) ClientOption

WithUsername sets the username to be used while connecting to an MQTT broker.

func WithWriteTimeout

func WithWriteTimeout(duration time.Duration) ClientOption

WithWriteTimeout limits how long the client will wait when trying to publish, subscribe or unsubscribe on topic when a context deadline is not set while calling Publisher.Publish, Subscriber.Subscribe, Subscriber.SubscribeMultiple or Unsubscriber.Unsubscribe.
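
The "only when a context deadline is not set" rule can be expressed as a small helper. This is a sketch of the idea using only the context package; withWriteTimeout is a hypothetical name and the 10 second value is an arbitrary choice for the example, not a documented default.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// withWriteTimeout returns ctx unchanged when it already carries a deadline,
// and otherwise a child context that expires after d.
func withWriteTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	if _, ok := ctx.Deadline(); ok {
		return ctx, func() {}
	}
	return context.WithTimeout(ctx, d)
}

// demo reports whether the fallback timeout was applied to a context without
// a deadline, and whether a caller-supplied deadline was left untouched.
func demo() (fallbackApplied, callerDeadlineKept bool) {
	const writeTimeout = 10 * time.Second // arbitrary value for the example

	ctx1, cancel1 := withWriteTimeout(context.Background(), writeTimeout)
	defer cancel1()
	_, fallbackApplied = ctx1.Deadline()

	parent, cancelParent := context.WithTimeout(context.Background(), time.Second)
	defer cancelParent()
	ctx2, cancel2 := withWriteTimeout(parent, writeTimeout)
	defer cancel2()

	d1, _ := parent.Deadline()
	d2, _ := ctx2.Deadline()
	callerDeadlineKept = d1.Equal(d2)

	return fallbackApplied, callerDeadlineKept
}

func main() {
	fmt.Println(demo())
}
```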

type ConnectRetryInterval added in v0.5.2

type ConnectRetryInterval time.Duration

ConnectRetryInterval allows to configure the interval between connection retries. Default value is 10 seconds.

type ConnectionInformer

type ConnectionInformer interface {
	// IsConnected checks whether the client is connected to the broker
	IsConnected() bool
}

ConnectionInformer can be used to get information about the connection

type Credential added in v0.5.0

type Credential struct {
	Username string
	Password string
}

Credential is a <username,password> pair.

type CredentialFetcher added in v0.5.0

type CredentialFetcher interface {
	Credentials(context.Context) (*Credential, error)
}

CredentialFetcher is an interface that allows to fetch credentials for a client.
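
As an illustration, a fetcher could read the credentials from a key lookup such as os.LookupEnv. Credential and CredentialFetcher below are local copies of the documented types; envFetcher and the COURIER_USERNAME and COURIER_PASSWORD keys are hypothetical names chosen for this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
)

// Credential and CredentialFetcher copy the documented types.
type Credential struct {
	Username string
	Password string
}

type CredentialFetcher interface {
	Credentials(context.Context) (*Credential, error)
}

// envFetcher is a hypothetical fetcher backed by a lookup function, so that
// os.LookupEnv can be swapped for a fake in tests.
type envFetcher struct {
	lookup func(string) (string, bool)
}

func (f envFetcher) Credentials(ctx context.Context) (*Credential, error) {
	// Respect cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	user, okUser := f.lookup("COURIER_USERNAME")
	pass, okPass := f.lookup("COURIER_PASSWORD")
	if !okUser || !okPass {
		return nil, errors.New("credentials are not set")
	}
	return &Credential{Username: user, Password: pass}, nil
}

// demo returns the fetched username and whether a missing value produced an error.
func demo() (string, bool) {
	env := map[string]string{"COURIER_USERNAME": "svc", "COURIER_PASSWORD": "s3cret"}
	var f CredentialFetcher = envFetcher{lookup: func(k string) (string, bool) {
		v, ok := env[k]
		return v, ok
	}}
	cred, err := f.Credentials(context.Background())
	if err != nil {
		return "", false
	}

	empty := envFetcher{lookup: func(string) (string, bool) { return "", false }}
	_, missingErr := empty.Credentials(context.Background())
	return cred.Username, missingErr != nil
}

func main() {
	fmt.Println(demo())

	// With the real environment the fetcher would be built like this.
	_ = envFetcher{lookup: os.LookupEnv}
}
```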

type Decoder

type Decoder interface {
	// Decode decodes message bytes into the passed object
	Decode(v interface{}) error
}

Decoder helps to decode message bytes into the desired object

func DefaultDecoderFunc added in v0.2.0

func DefaultDecoderFunc(_ context.Context, r io.Reader) Decoder

DefaultDecoderFunc is a DecoderFunc that uses a json.Decoder as the Decoder.

type DecoderFunc

type DecoderFunc func(context.Context, io.Reader) Decoder

DecoderFunc is used to create a Decoder from io.Reader stream of message bytes before calling MessageHandler; the context.Context value may be used to select appropriate Decoder.

type Encoder

type Encoder interface {
	// Encode takes any object and encodes it into bytes
	Encode(v interface{}) error
}

Encoder helps in transforming objects to message bytes

func DefaultEncoderFunc added in v0.2.0

func DefaultEncoderFunc(_ context.Context, w io.Writer) Encoder

DefaultEncoderFunc is a EncoderFunc that uses a json.Encoder as the Encoder.

type EncoderFunc

type EncoderFunc func(context.Context, io.Writer) Encoder

EncoderFunc is used to create an Encoder from io.Writer; the context.Context value may be used to select appropriate Encoder.
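
To show how the context value can select an Encoder, the sketch below picks between JSON and XML based on a value stored in the context. Encoder and EncoderFunc are local copies of the documented types; formatKey, selectingEncoder, encode and the reading type are hypothetical and exist only for this example.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
)

// Encoder and EncoderFunc copy the documented types.
type Encoder interface {
	Encode(v interface{}) error
}

type EncoderFunc func(context.Context, io.Writer) Encoder

// formatKey is a hypothetical context key used to request an output format.
type formatKey struct{}

// selectingEncoder returns an XML encoder when the context asks for "xml"
// and falls back to JSON otherwise.
func selectingEncoder(ctx context.Context, w io.Writer) Encoder {
	if f, _ := ctx.Value(formatKey{}).(string); f == "xml" {
		return xml.NewEncoder(w)
	}
	return json.NewEncoder(w)
}

// reading is a hypothetical payload type.
type reading struct {
	XMLName xml.Name `xml:"reading" json:"-"`
	Celsius int      `xml:"celsius" json:"celsius"`
}

// encode runs an EncoderFunc against an in-memory buffer.
func encode(ctx context.Context, ef EncoderFunc, v interface{}) (string, error) {
	var buf bytes.Buffer
	if err := ef(ctx, &buf).Encode(v); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// demo encodes the same value once as JSON and once as XML.
func demo() (jsonOut, xmlOut string) {
	r := reading{Celsius: 21}
	jsonOut, _ = encode(context.Background(), selectingEncoder, r)
	xmlCtx := context.WithValue(context.Background(), formatKey{}, "xml")
	xmlOut, _ = encode(xmlCtx, selectingEncoder, r)
	return jsonOut, xmlOut
}

func main() {
	j, x := demo()
	fmt.Print(j)
	fmt.Println(x)
}
```

A function with this shape could be passed to WithCustomEncoder; a matching DecoderFunc would be built the same way from json.NewDecoder and xml.NewDecoder.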

type KeepAlive added in v0.5.1

type KeepAlive time.Duration

KeepAlive will set the amount of time that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server. Default value is 60 seconds. Note: Practically, when KeepAlive >= 10s, the client will check every 5s, if it needs to send a PING. In other cases, the client will check every KeepAlive/2.
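
The note about check intervals amounts to a two-branch rule. The function below only restates that rule as code for a few sample values; pingCheckInterval is a hypothetical name and is not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// pingCheckInterval restates the documented rule: with KeepAlive >= 10s the
// client checks every 5s, otherwise every KeepAlive/2.
func pingCheckInterval(keepAlive time.Duration) time.Duration {
	if keepAlive >= 10*time.Second {
		return 5 * time.Second
	}
	return keepAlive / 2
}

func main() {
	for _, ka := range []time.Duration{60 * time.Second, 10 * time.Second, 4 * time.Second} {
		fmt.Printf("KeepAlive=%v -> check every %v\n", ka, pingCheckInterval(ka))
	}
}
```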

type Logger added in v0.5.0

type Logger interface {
	Info(ctx context.Context, msg string, attrs map[string]any)
	Error(ctx context.Context, err error, attrs map[string]any)
}

Logger is the interface that wraps the Info and Error methods.
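
A Logger can be backed by the standard log package. In this sketch the Logger interface is a local copy of the one above, and stdLogger and its output format are assumptions made for the example.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log"
)

// Logger copies the documented interface.
type Logger interface {
	Info(ctx context.Context, msg string, attrs map[string]any)
	Error(ctx context.Context, err error, attrs map[string]any)
}

// stdLogger is a hypothetical adapter around *log.Logger.
type stdLogger struct {
	l *log.Logger
}

func (s stdLogger) Info(_ context.Context, msg string, attrs map[string]any) {
	s.l.Printf("INFO %s %v", msg, attrs)
}

func (s stdLogger) Error(_ context.Context, err error, attrs map[string]any) {
	s.l.Printf("ERROR %v %v", err, attrs)
}

// demo logs one info and one error line into a buffer and returns the text.
func demo() string {
	var buf bytes.Buffer
	var lg Logger = stdLogger{l: log.New(&buf, "courier ", 0)}

	lg.Info(context.Background(), "connected", map[string]any{"host": "localhost"})
	lg.Error(context.Background(), errors.New("connection lost"), nil)
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```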

type MQTTClientInfo added in v0.5.3

type MQTTClientInfo struct {
	Addresses     []TCPAddress `json:"addresses"`
	ClientID      string       `json:"client_id"`
	Username      string       `json:"username"`
	ResumeSubs    bool         `json:"resume_subs"`
	CleanSession  bool         `json:"clean_session"`
	AutoReconnect bool         `json:"auto_reconnect"`
	Connected     bool         `json:"connected"`
	// Subscriptions contains the topics the client is subscribed to
	// Note: Currently, this field only holds shared subscriptions.
	Subscriptions []string `json:"subscriptions,omitempty"`
}

MQTTClientInfo contains information about the internal MQTT client

type Message

type Message struct {
	ID        int
	Topic     string
	Duplicate bool
	Retained  bool
	QoS       QOSLevel
	// contains filtered or unexported fields
}

Message represents the entity that is being relayed via the courier MQTT brokers from Publisher(s) to Subscriber(s).

func NewMessageWithDecoder

func NewMessageWithDecoder(
	payloadDecoder Decoder,
) *Message

NewMessageWithDecoder is a helper to create Message, ideally payloadDecoder should not be mutated once created.

func (*Message) DecodePayload

func (m *Message) DecodePayload(v interface{}) error

DecodePayload can decode the message payload bytes into the desired object.

type MessageHandler

type MessageHandler func(context.Context, PubSub, *Message)

MessageHandler is the type that all callbacks being passed to Subscriber must satisfy.

type OnConnectHandler

type OnConnectHandler func(PubSub)

OnConnectHandler is a callback that is called when the client state changes from disconnected to connected. Both at initial connection and on reconnection

type OnConnectionLostHandler

type OnConnectionLostHandler func(error)

OnConnectionLostHandler is a callback type which can be set to be executed upon an unintended disconnection from the MQTT broker. Disconnects caused by calling Disconnect or ForceDisconnect will not cause the callback set with WithOnConnectionLost to execute.

type OnReconnectHandler

type OnReconnectHandler func(PubSub)

OnReconnectHandler is invoked prior to reconnecting after the initial connection is lost

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option changes behaviour of Publisher.Publish, Subscriber.Subscribe calls.

type PubSub

PubSub exposes all the operational functionalities of Client with Publisher, Subscriber, Unsubscriber and ConnectionInformer

type Publisher

type Publisher interface {
	// Publish allows to publish messages to an MQTT broker
	Publish(ctx context.Context, topic string, message interface{}, options ...Option) error
}

Publisher defines behaviour of an MQTT publisher that can send messages.

type PublisherFunc

type PublisherFunc func(context.Context, string, interface{}, ...Option) error

PublisherFunc defines signature of a Publish function.

func (PublisherFunc) Publish

func (f PublisherFunc) Publish(
	ctx context.Context,
	topic string,
	message interface{},
	opts ...Option,
) error

Publish implements Publisher interface on PublisherFunc.

type PublisherMiddlewareFunc

type PublisherMiddlewareFunc func(Publisher) Publisher

PublisherMiddlewareFunc functions are closures that intercept Publisher.Publish calls.

func (PublisherMiddlewareFunc) Middleware

func (pmw PublisherMiddlewareFunc) Middleware(publisher Publisher) Publisher

Middleware allows PublisherMiddlewareFunc to implement the publishMiddleware interface.

type QOSLevel

type QOSLevel uint8

QOSLevel is an agreement between the sender of a message and the receiver of a message that defines the guarantee of delivery for a specific message

const (
	// QOSZero denotes at most once message delivery
	QOSZero QOSLevel = 0
	// QOSOne denotes at least once message delivery
	QOSOne QOSLevel = 1
	// QOSTwo denotes exactly once message delivery
	QOSTwo QOSLevel = 2
)

type Resolver added in v0.1.1

type Resolver interface {
	// UpdateChan returns a channel where TCPAddress updates can be received.
	UpdateChan() <-chan []TCPAddress
	// Done returns a channel which is closed when the Resolver is no longer running.
	Done() <-chan struct{}
}

Resolver sends TCPAddress updates on the channel returned by UpdateChan().
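
A minimal Resolver might publish a fixed list of brokers once and close its Done channel when stopped. TCPAddress and Resolver below are local copies of the documented types (struct tags omitted); staticResolver, newStaticResolver and Stop are hypothetical.

```go
package main

import "fmt"

// TCPAddress and Resolver copy the documented types.
type TCPAddress struct {
	Host string
	Port uint16
}

type Resolver interface {
	UpdateChan() <-chan []TCPAddress
	Done() <-chan struct{}
}

// staticResolver is a hypothetical Resolver that sends one fixed address list.
type staticResolver struct {
	updates chan []TCPAddress
	done    chan struct{}
}

// Compile-time check that staticResolver satisfies Resolver.
var _ Resolver = (*staticResolver)(nil)

func newStaticResolver(addrs ...TCPAddress) *staticResolver {
	r := &staticResolver{
		updates: make(chan []TCPAddress, 1), // buffered so the constructor does not block
		done:    make(chan struct{}),
	}
	r.updates <- addrs
	return r
}

func (r *staticResolver) UpdateChan() <-chan []TCPAddress { return r.updates }

func (r *staticResolver) Done() <-chan struct{} { return r.done }

// Stop marks the resolver as no longer running by closing the Done channel.
func (r *staticResolver) Stop() { close(r.done) }

// demo reads the single update, stops the resolver and checks that Done is closed.
func demo() (int, bool) {
	r := newStaticResolver(
		TCPAddress{Host: "broker-1", Port: 1883},
		TCPAddress{Host: "broker-2", Port: 1883},
	)
	addrs := <-r.UpdateChan()
	r.Stop()
	_, open := <-r.Done()
	return len(addrs), !open
}

func main() {
	fmt.Println(demo())
}
```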

type Retained

type Retained bool

Retained is an option used with Publisher.Publish call

type SharedSubscriptionPredicate added in v0.5.0

type SharedSubscriptionPredicate func(topic string) bool

SharedSubscriptionPredicate allows to configure the predicate function that determines whether a topic is a shared subscription topic.

type StartOption

type StartOption func(*startOptions)

StartOption can be used to customise behaviour of ExponentialStartStrategy

func WithMaxInterval

func WithMaxInterval(interval time.Duration) StartOption

WithMaxInterval sets the maximum interval the retry logic will wait before attempting another Client.Start. The default is 30 seconds.

func WithOnRetry

func WithOnRetry(retryFunc func(error)) StartOption

WithOnRetry sets the func which is called when there is an error in the previous Client.Start attempt
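
StartOption follows the functional options pattern. The sketch below shows how such options are typically applied on top of defaults. The field names inside startOptions and the resolve helper are assumptions, since the real struct is unexported; the 30 second default comes from the WithMaxInterval description above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// startOptions is a hypothetical version of the unexported options struct.
type startOptions struct {
	maxInterval time.Duration
	onRetry     func(error)
}

// StartOption copies the documented type shape.
type StartOption func(*startOptions)

// WithMaxInterval and WithOnRetry are local re-creations of the documented options.
func WithMaxInterval(interval time.Duration) StartOption {
	return func(o *startOptions) { o.maxInterval = interval }
}

func WithOnRetry(retryFunc func(error)) StartOption {
	return func(o *startOptions) { o.onRetry = retryFunc }
}

// resolve starts from the defaults and applies each option in order.
func resolve(opts ...StartOption) startOptions {
	o := startOptions{maxInterval: 30 * time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// resolvedMaxInterval exposes the resulting interval for inspection.
func resolvedMaxInterval(opts ...StartOption) time.Duration {
	return resolve(opts...).maxInterval
}

func main() {
	fmt.Println("default:", resolvedMaxInterval())
	fmt.Println("custom:", resolvedMaxInterval(WithMaxInterval(5*time.Second)))

	o := resolve(WithOnRetry(func(err error) { fmt.Println("retrying after:", err) }))
	o.onRetry(errors.New("broker unavailable"))
}
```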

type Store

type Store = mqtt.Store

Store is an interface which can be used to provide implementations for message persistence.

[IMPORTANT] When implementing a store with shared storage (e.g. Redis) across multiple application instances, ensure that the keys are namespaced for each application instance; otherwise there will be collisions. The messages are identified based on the message id from the MQTT packets, which has values in the range (0, 2^16). This coincides with the maximum number of in-flight messages.
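
The namespacing advice can be illustrated with a simplified key-value wrapper. This sketch does not implement mqtt.Store; sharedStore stands in for shared storage such as Redis, and namespacedStore, the instance names and the "msg-1" key are hypothetical. It only shows how prefixing keys with an instance identifier keeps two instances from overwriting each other.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedStore stands in for storage shared by several application instances.
type sharedStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (s *sharedStore) put(key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func (s *sharedStore) get(key string) ([]byte, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.data[key]
	return v, ok
}

// namespacedStore prefixes every key with the instance name before touching
// the shared backend.
type namespacedStore struct {
	instance string
	backend  *sharedStore
}

func (n namespacedStore) key(k string) string { return n.instance + ":" + k }

func (n namespacedStore) Put(k string, v []byte) { n.backend.put(n.key(k), v) }

func (n namespacedStore) Get(k string) ([]byte, bool) { return n.backend.get(n.key(k)) }

// demo writes the same message key from two instances and reads both back.
func demo() (string, string) {
	backend := &sharedStore{data: map[string][]byte{}}
	a := namespacedStore{instance: "instance-a", backend: backend}
	b := namespacedStore{instance: "instance-b", backend: backend}

	a.Put("msg-1", []byte("from a"))
	b.Put("msg-1", []byte("from b"))

	va, _ := a.Get("msg-1")
	vb, _ := b.Get("msg-1")
	return string(va), string(vb)
}

func main() {
	fmt.Println(demo())
}
```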

func NewMemoryStore

func NewMemoryStore() Store

NewMemoryStore returns a pointer to a new instance of mqtt.MemoryStore. The instance is not initialized or ready to use until Open() has been called on it.

type Subscriber

type Subscriber interface {
	// Subscribe allows to subscribe to messages from an MQTT broker
	Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error

	// SubscribeMultiple allows to subscribe to messages on multiple topics from an MQTT broker
	SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
}

Subscriber defines behaviour of an MQTT subscriber that can create subscriptions.

type SubscriberFuncs

type SubscriberFuncs struct {
	// contains filtered or unexported fields
}

SubscriberFuncs defines signature of a Subscribe function.

func NewSubscriberFuncs

func NewSubscriberFuncs(
	subscribeFunc func(context.Context, string, MessageHandler, ...Option) error,
	subscribeMultipleFunc func(context.Context, map[string]QOSLevel, MessageHandler) error,
) SubscriberFuncs

NewSubscriberFuncs is a helper function to create SubscriberFuncs

func (SubscriberFuncs) Subscribe

func (s SubscriberFuncs) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error

Subscribe implements Subscriber interface on SubscriberFuncs.

func (SubscriberFuncs) SubscribeMultiple

func (s SubscriberFuncs) SubscribeMultiple(
	ctx context.Context,
	topicsWithQos map[string]QOSLevel,
	callback MessageHandler,
) error

SubscribeMultiple implements Subscriber interface on SubscriberFuncs.

type SubscriberMiddlewareFunc

type SubscriberMiddlewareFunc func(Subscriber) Subscriber

SubscriberMiddlewareFunc functions are closures that intercept Subscriber.Subscribe calls.

func (SubscriberMiddlewareFunc) Middleware

func (smw SubscriberMiddlewareFunc) Middleware(subscriber Subscriber) Subscriber

Middleware allows SubscriberMiddlewareFunc to implement the subscribeMiddleware interface.

type TCPAddress added in v0.1.1

type TCPAddress struct {
	Host string `json:"host"`
	Port uint16 `json:"port"`
}

TCPAddress specifies Host and Port for remote broker

func (TCPAddress) String added in v0.5.0

func (t TCPAddress) String() string

type Unsubscriber

type Unsubscriber interface {
	// Unsubscribe removes any subscription to messages from an MQTT broker
	Unsubscribe(ctx context.Context, topics ...string) error
}

Unsubscriber defines behaviour of an MQTT client that can remove subscriptions.

type UnsubscriberFunc

type UnsubscriberFunc func(context.Context, ...string) error

UnsubscriberFunc defines signature of a Unsubscribe function.

func (UnsubscriberFunc) Unsubscribe

func (f UnsubscriberFunc) Unsubscribe(ctx context.Context, topics ...string) error

Unsubscribe implements Unsubscriber interface on UnsubscriberFunc.

type UnsubscriberMiddlewareFunc

type UnsubscriberMiddlewareFunc func(Unsubscriber) Unsubscriber

UnsubscriberMiddlewareFunc functions are closures that intercept Unsubscriber.Unsubscribe calls.

func (UnsubscriberMiddlewareFunc) Middleware

func (usmw UnsubscriberMiddlewareFunc) Middleware(unsubscriber Unsubscriber) Unsubscriber

Middleware allows UnsubscriberMiddlewareFunc to implement the unsubscribeMiddleware interface.

Directories

Path Synopsis
otelcourier module
slog module
xds module
