Documentation ¶
Overview ¶
Package courier contains the client used to interact with the courier infrastructure and to publish/subscribe to messages from other clients.
Index ¶
- Variables
- func ExponentialStartStrategy(ctx context.Context, c interface{ ... }, opts ...StartOption)
- func Version() string
- func WaitForConnection(c ConnectionInformer, waitFor time.Duration, tick time.Duration) bool
- type Client
- func (c *Client) InfoHandler() http.Handler
- func (c *Client) IsConnected() bool
- func (c *Client) Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error
- func (c *Client) Run(ctx context.Context) error
- func (c *Client) Start() error
- func (c *Client) Stop()
- func (c *Client) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
- func (c *Client) SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, ...) error
- func (c *Client) Unsubscribe(ctx context.Context, topics ...string) error
- func (c *Client) UsePublisherMiddleware(mwf ...PublisherMiddlewareFunc)
- func (c *Client) UseSubscriberMiddleware(mwf ...SubscriberMiddlewareFunc)
- func (c *Client) UseUnsubscriberMiddleware(mwf ...UnsubscriberMiddlewareFunc)
- type ClientInfoEmitter
- type ClientInfoEmitterConfig
- type ClientMeta
- type ClientOption
- func WithAddress(host string, port uint16) ClientOption
- func WithAutoReconnect(autoReconnect bool) ClientOption
- func WithCleanSession(cleanSession bool) ClientOption
- func WithClientID(clientID string) ClientOption
- func WithConnectTimeout(duration time.Duration) ClientOption
- func WithCredentialFetcher(fetcher CredentialFetcher) ClientOption
- func WithCustomDecoder(decoderFunc DecoderFunc) ClientOption
- func WithCustomEncoder(encoderFunc EncoderFunc) ClientOption
- func WithExponentialStartOptions(options ...StartOption) ClientOption
- func WithGracefulShutdownPeriod(duration time.Duration) ClientOption
- func WithKeepAlive(duration time.Duration) ClientOption
- func WithLogger(l Logger) ClientOption
- func WithMaintainOrder(maintainOrder bool) ClientOption
- func WithMaxReconnectInterval(duration time.Duration) ClientOption
- func WithOnConnect(handler OnConnectHandler) ClientOption
- func WithOnConnectionLost(handler OnConnectionLostHandler) ClientOption
- func WithOnReconnect(handler OnReconnectHandler) ClientOption
- func WithPassword(password string) ClientOption
- func WithPersistence(store Store) ClientOption
- func WithResolver(resolver Resolver) ClientOption
- func WithTCPAddress(host string, port uint16) ClientOption (deprecated)
- func WithTLS(tlsConfig *tls.Config) ClientOption
- func WithUseBase64Decoder() ClientOption
- func WithUsername(username string) ClientOption
- func WithWriteTimeout(duration time.Duration) ClientOption
- type ConnectRetryInterval
- type ConnectionInformer
- type Credential
- type CredentialFetcher
- type Decoder
- type DecoderFunc
- type Encoder
- type EncoderFunc
- type KeepAlive
- type Logger
- type MQTTClientInfo
- type Message
- type MessageHandler
- type OnConnectHandler
- type OnConnectionLostHandler
- type OnReconnectHandler
- type Option
- type PubSub
- type Publisher
- type PublisherFunc
- type PublisherMiddlewareFunc
- type QOSLevel
- type Resolver
- type Retained
- type SharedSubscriptionPredicate
- type StartOption
- type Store
- type Subscriber
- type SubscriberFuncs
- type SubscriberMiddlewareFunc
- type TCPAddress
- type Unsubscriber
- type UnsubscriberFunc
- type UnsubscriberMiddlewareFunc
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrConnectTimeout indicates connection timeout while connecting to broker.
	ErrConnectTimeout = errors.New("client timed out while trying to connect to the broker")
	// ErrPublishTimeout indicates publish timeout.
	ErrPublishTimeout = errors.New("publish timeout")
	// ErrSubscribeTimeout indicates subscribe timeout.
	ErrSubscribeTimeout = errors.New("subscribe timeout")
	// ErrUnsubscribeTimeout indicates unsubscribe timeout.
	ErrUnsubscribeTimeout = errors.New("unsubscribe timeout")
	// ErrSubscribeMultipleTimeout indicates multiple subscribe timeout.
	ErrSubscribeMultipleTimeout = errors.New("subscribe multiple timeout")
)
var ErrClientNotInitialized = errors.New("courier: client not initialized")
ErrClientNotInitialized is returned when the client is not initialized.
var ResumeSubscriptions = resumeSubscriptions{}
ResumeSubscriptions allows stored (un)subscribe messages to be resumed on connect (but not on reconnect) when CleanSession is false. Otherwise, these messages are discarded.
var UseMultiConnectionMode = multiConnMode{}
UseMultiConnectionMode configures the client to use multiple connections when available.
This is useful with shared subscriptions, where multiple connections can be created to subscribe from the same application.
Functions ¶
func ExponentialStartStrategy ¶
func ExponentialStartStrategy(ctx context.Context, c interface{ Start() error }, opts ...StartOption)
ExponentialStartStrategy keeps calling Client.Start in the background and retries on error. It never exits unless the context passed to it is cancelled. It does NOT stop the client; that is the responsibility of the caller.
func WaitForConnection ¶
func WaitForConnection(c ConnectionInformer, waitFor time.Duration, tick time.Duration) bool
WaitForConnection checks whether the Client is connected. It calls ConnectionInformer.IsConnected after every tick and blocks for at most waitFor. It returns true only when ConnectionInformer.IsConnected returns true.
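A polling loop with this shape could look like the sketch below. The names connectionInformer, fakeConn and waitForConnection are local stand-ins, and checking once before the first tick is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// connectionInformer mirrors the ConnectionInformer interface documented on this page.
type connectionInformer interface{ IsConnected() bool }

// fakeConn is a hypothetical informer whose state can be flipped from another goroutine.
type fakeConn struct{ connected atomic.Bool }

func (f *fakeConn) IsConnected() bool { return f.connected.Load() }

// waitForConnection (hypothetical) polls c every tick and gives up once waitFor has elapsed.
func waitForConnection(c connectionInformer, waitFor, tick time.Duration) bool {
	// Assumption: check once up front so an already-connected client returns immediately.
	if c.IsConnected() {
		return true
	}
	deadline := time.After(waitFor)
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for {
		select {
		case <-deadline:
			return false
		case <-ticker.C:
			if c.IsConnected() {
				return true
			}
		}
	}
}

func main() {
	conn := &fakeConn{}
	// Simulate the broker connection coming up shortly after startup.
	time.AfterFunc(20*time.Millisecond, func() { conn.connected.Store(true) })
	fmt.Println(waitForConnection(conn, time.Second, 5*time.Millisecond))

	// A connection that never comes up makes the call give up after waitFor.
	fmt.Println(waitForConnection(&fakeConn{}, 30*time.Millisecond, 5*time.Millisecond))
}
```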
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client communicates with an MQTT broker.
func NewClient ¶
func NewClient(opts ...ClientOption) (*Client, error)
NewClient creates a Client with the ClientOption(s) provided. It can return an error when prometheus.DefaultRegisterer has already been used to register the collected metrics.
Example ¶
c, err := courier.NewClient(
	courier.WithUsername("username"),
	courier.WithPassword("password"),
	courier.WithAddress("localhost", 1883),
)
if err != nil {
	panic(err)
}

if err := c.Start(); err != nil {
	panic(err)
}

stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, []os.Signal{os.Interrupt, syscall.SIGTERM}...)

// done tells the publishing goroutine to exit. Only the main goroutine reads
// stopCh, so a single signal cannot be consumed by the publisher and leave
// main blocked.
done := make(chan struct{})

go func() {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()
	for {
		select {
		case t := <-tick.C:
			msg := map[string]interface{}{
				"time": t.UnixNano(),
			}
			if err := c.Publish(context.Background(), "topic", msg, courier.QOSOne); err != nil {
				fmt.Printf("Publish() error = %s\n", err)
			} else {
				fmt.Println("Publish() success")
			}
		case <-done:
			return
		}
	}
}()

<-stopCh
close(done)
c.Stop()
Output:
func (*Client) InfoHandler ¶ added in v0.5.3
func (c *Client) InfoHandler() http.Handler
InfoHandler returns an http.Handler that exposes information about the connected clients.
func (*Client) IsConnected ¶
func (c *Client) IsConnected() bool
IsConnected checks whether the client is connected to the broker.
func (*Client) Publish ¶
func (c *Client) Publish(ctx context.Context, topic string, message interface{}, opts ...Option) error
Publish publishes messages to an MQTT broker.
func (*Client) Run ¶ added in v0.4.0
func (c *Client) Run(ctx context.Context) error
Run will start running the Client. This makes Client compatible with the github.com/gojekfarm/xrun package (https://pkg.go.dev/github.com/gojekfarm/xrun).
func (*Client) Stop ¶
func (c *Client) Stop()
Stop disconnects from the broker and finishes any pending work on internal communication workers. It blocks for at most the period configured with the ClientOption WithGracefulShutdownPeriod.
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
Subscribe subscribes to messages from an MQTT broker.
func (*Client) SubscribeMultiple ¶
func (c *Client) SubscribeMultiple(
	ctx context.Context,
	topicsWithQos map[string]QOSLevel,
	callback MessageHandler,
) error
SubscribeMultiple subscribes to messages on multiple topics from an MQTT broker.
func (*Client) Unsubscribe ¶
func (c *Client) Unsubscribe(ctx context.Context, topics ...string) error
Unsubscribe removes any subscription to messages from an MQTT broker.
func (*Client) UsePublisherMiddleware ¶
func (c *Client) UsePublisherMiddleware(mwf ...PublisherMiddlewareFunc)
UsePublisherMiddleware appends a PublisherMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip messages. They are executed in the order that they are applied to the Client.
func (*Client) UseSubscriberMiddleware ¶
func (c *Client) UseSubscriberMiddleware(mwf ...SubscriberMiddlewareFunc)
UseSubscriberMiddleware appends a SubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip subscriptions. They are executed in the order that they are applied to the Client.
func (*Client) UseUnsubscriberMiddleware ¶
func (c *Client) UseUnsubscriberMiddleware(mwf ...UnsubscriberMiddlewareFunc)
UseUnsubscriberMiddleware appends an UnsubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip unsubscribe calls. They are executed in the order that they are applied to the Client.
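The three Use*Middleware methods share one idea: each middleware wraps the next handler, and the first one applied runs first. The sketch below shows that ordering with local stand-in types. The names publisher, publisherFunc, middleware, chain and tag are illustrative only, and the way the Client builds its chain internally is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// publisher is a local stand-in for the Publisher interface (options omitted).
type publisher interface {
	Publish(ctx context.Context, topic string, message interface{}) error
}

// publisherFunc adapts a plain function to the publisher interface.
type publisherFunc func(ctx context.Context, topic string, message interface{}) error

func (f publisherFunc) Publish(ctx context.Context, topic string, message interface{}) error {
	return f(ctx, topic, message)
}

// middleware is a local stand-in for PublisherMiddlewareFunc.
type middleware func(publisher) publisher

// chain wraps base so that mws[0] runs first, then mws[1], and so on.
func chain(base publisher, mws ...middleware) publisher {
	for i := len(mws) - 1; i >= 0; i-- {
		base = mws[i](base)
	}
	return base
}

// tag returns a middleware that records its name before calling the next publisher.
func tag(name string, trace *[]string) middleware {
	return func(next publisher) publisher {
		return publisherFunc(func(ctx context.Context, topic string, message interface{}) error {
			*trace = append(*trace, name)
			return next.Publish(ctx, topic, message)
		})
	}
}

// run builds a two-middleware chain, publishes once, and returns the call order.
func run() []string {
	var trace []string
	base := publisherFunc(func(_ context.Context, topic string, _ interface{}) error {
		trace = append(trace, "publish:"+topic)
		return nil
	})
	p := chain(base, tag("first", &trace), tag("second", &trace))
	_ = p.Publish(context.Background(), "topic", "hello")
	return trace
}

func main() {
	fmt.Println(run()) // prints [first second publish:topic]
}
```

A middleware can skip a call by returning without invoking next.Publish, which is how the "skip" behaviour mentioned above can be achieved.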
type ClientInfoEmitter ¶ added in v0.5.3
type ClientInfoEmitter interface {
Emit(ctx context.Context, meta ClientMeta)
}
ClientInfoEmitter emits broker info. Emit can be called concurrently, so implementations should be concurrency safe.
type ClientInfoEmitterConfig ¶ added in v0.5.3
type ClientInfoEmitterConfig struct {
	// Interval is the interval at which the broker info emitter emits broker info.
	Interval time.Duration
	Emitter  ClientInfoEmitter
}
ClientInfoEmitterConfig is used to configure the broker info emitter.
type ClientMeta ¶ added in v0.5.3
type ClientMeta struct {
	MultiConnMode bool
	Clients       []MQTTClientInfo
	Subscriptions map[string]QOSLevel
}
ClientMeta contains information about the internal MQTT client(s)
type ClientOption ¶
type ClientOption interface {
// contains filtered or unexported methods
}
ClientOption configures the behaviour of a Client.
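An option interface with unexported methods is commonly implemented with the functional options pattern. The sketch below shows one way this can work. The names clientConfig, clientOption, optionFunc, withAddress, withConnectTimeout and newConfig are hypothetical, and the package's internal layout may differ. The defaults are the ones stated on this page (127.0.0.1, 1883 and 15 seconds).

```go
package main

import (
	"fmt"
	"time"
)

// clientConfig is a hypothetical holder for the settings the options mutate.
type clientConfig struct {
	host           string
	port           uint16
	connectTimeout time.Duration
}

// clientOption has an unexported method, so only this package can implement it.
type clientOption interface {
	apply(*clientConfig)
}

// optionFunc adapts a plain function to the clientOption interface.
type optionFunc func(*clientConfig)

func (f optionFunc) apply(c *clientConfig) { f(c) }

func withAddress(host string, port uint16) clientOption {
	return optionFunc(func(c *clientConfig) {
		c.host, c.port = host, port
	})
}

func withConnectTimeout(d time.Duration) clientOption {
	return optionFunc(func(c *clientConfig) { c.connectTimeout = d })
}

// newConfig starts from the documented defaults and applies options in order.
func newConfig(opts ...clientOption) clientConfig {
	cfg := clientConfig{host: "127.0.0.1", port: 1883, connectTimeout: 15 * time.Second}
	for _, o := range opts {
		o.apply(&cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withAddress("broker.local", 8883), withConnectTimeout(5*time.Second))
	fmt.Printf("%s:%d timeout=%s\n", cfg.host, cfg.port, cfg.connectTimeout)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.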
func WithAddress ¶ added in v0.1.1
func WithAddress(host string, port uint16) ClientOption
WithAddress sets the broker address to be used. To establish a TLS connection, use the WithTLS option along with this one. The default hostname is "127.0.0.1" and the default port is 1883.
func WithAutoReconnect ¶
func WithAutoReconnect(autoReconnect bool) ClientOption
WithAutoReconnect sets whether the automatic reconnection logic should be used when the connection is lost. Even when it is disabled, the handler set with WithOnConnectionLost is still called.
func WithCleanSession ¶
func WithCleanSession(cleanSession bool) ClientOption
WithCleanSession will set the "clean session" flag in the connect message when this client connects to an MQTT broker. By setting this flag, you are indicating that no messages saved by the broker for this client should be delivered. Any messages that were going to be sent by this client before disconnecting but didn't, will not be sent upon connecting to the broker.
func WithClientID ¶
func WithClientID(clientID string) ClientOption
WithClientID sets the clientID to be used while connecting to an MQTT broker. According to the MQTT v3.1 specification, a client id must be no longer than 23 characters.
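A caller may want to check the length limit before passing an ID to WithClientID. The helper below is a hypothetical sketch and not part of the package. The upper bound of 23 comes from the text above, while the lower bound of one character is taken from the MQTT v3.1 specification.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// maxClientIDLen is the MQTT v3.1 limit mentioned in the WithClientID documentation.
const maxClientIDLen = 23

// validClientID (hypothetical) reports whether id is between 1 and 23 characters long.
func validClientID(id string) bool {
	n := utf8.RuneCountInString(id)
	return n >= 1 && n <= maxClientIDLen
}

func main() {
	fmt.Println(validClientID("courier-client-01"))
	fmt.Println(validClientID("this-client-id-is-way-too-long"))
}
```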
func WithConnectTimeout ¶
func WithConnectTimeout(duration time.Duration) ClientOption
WithConnectTimeout limits how long the client will wait when trying to open a connection to an MQTT server before timing out. A duration of 0 never times out. The default is 15 seconds.
func WithCredentialFetcher ¶ added in v0.5.0
func WithCredentialFetcher(fetcher CredentialFetcher) ClientOption
WithCredentialFetcher sets the specified CredentialFetcher.
func WithCustomDecoder ¶
func WithCustomDecoder(decoderFunc DecoderFunc) ClientOption
WithCustomDecoder sets the DecoderFunc used to decode message bytes into the desired object.
func WithCustomEncoder ¶
func WithCustomEncoder(encoderFunc EncoderFunc) ClientOption
WithCustomEncoder sets the EncoderFunc used to transform objects into the desired message bytes.
func WithExponentialStartOptions ¶ added in v0.5.0
func WithExponentialStartOptions(options ...StartOption) ClientOption
WithExponentialStartOptions configures the client to use ExponentialStartStrategy along with the passed StartOption(s) when using the Client.Run method.
func WithGracefulShutdownPeriod ¶
func WithGracefulShutdownPeriod(duration time.Duration) ClientOption
WithGracefulShutdownPeriod sets the limit that is allowed for existing work to be completed.
func WithKeepAlive ¶
func WithKeepAlive(duration time.Duration) ClientOption
WithKeepAlive will set the amount of time (in seconds) that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server.
Deprecated: Use KeepAlive instead.
func WithLogger ¶ added in v0.5.0
func WithLogger(l Logger) ClientOption
WithLogger sets the Logger to use for the client.
func WithMaintainOrder ¶
func WithMaintainOrder(maintainOrder bool) ClientOption
WithMaintainOrder sets the message routing to guarantee order within each QoS level. By default, this value is true. If set to false (recommended), messages can be delivered asynchronously from the client to the application and may arrive out of order; specifically, the message handler is called in its own goroutine. Note that setting this to true does not guarantee in-order delivery (this is subject to broker settings like "max_inflight_messages=1"), and when it is true the MessageHandler callback must not block.
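The trade-off can be pictured with a toy dispatcher. This is only a sketch of the idea: dispatch and collector are hypothetical names, and the real client routes messages through its own internals. With maintainOrder set, each handler call finishes before the next message is delivered, so a blocking handler stalls delivery. Without it, each call runs in its own goroutine and completion order is not guaranteed.

```go
package main

import (
	"fmt"
	"sync"
)

// collector is a hypothetical handler target that is safe for concurrent use.
type collector struct {
	mu    sync.Mutex
	items []string
}

func (c *collector) add(s string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = append(c.items, s)
}

// dispatch (hypothetical) delivers msgs to handler either inline, preserving
// order, or in one goroutine per message, where order is not guaranteed.
func dispatch(msgs []string, handler func(string), maintainOrder bool) {
	var wg sync.WaitGroup
	for _, m := range msgs {
		if maintainOrder {
			handler(m) // a slow handler here delays every later message
			continue
		}
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			handler(m)
		}(m)
	}
	wg.Wait()
}

func main() {
	ordered := &collector{}
	dispatch([]string{"a", "b", "c"}, ordered.add, true)
	fmt.Println("ordered:", ordered.items)

	unordered := &collector{}
	dispatch([]string{"a", "b", "c"}, unordered.add, false)
	fmt.Println("handled:", len(unordered.items))
}
```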
func WithMaxReconnectInterval ¶
func WithMaxReconnectInterval(duration time.Duration) ClientOption
WithMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts when the connection is lost.
func WithOnConnect ¶
func WithOnConnect(handler OnConnectHandler) ClientOption
WithOnConnect sets the OnConnectHandler callback to be called when the client is connected, both at initial connection time and upon automatic reconnect.
func WithOnConnectionLost ¶
func WithOnConnectionLost(handler OnConnectionLostHandler) ClientOption
WithOnConnectionLost will set the OnConnectionLostHandler callback to be executed in the case where the client unexpectedly loses connection with the MQTT broker.
func WithOnReconnect ¶
func WithOnReconnect(handler OnReconnectHandler) ClientOption
WithOnReconnect sets the OnReconnectHandler callback to be executed prior to the client attempting a reconnect to the MQTT broker.
func WithPassword ¶
func WithPassword(password string) ClientOption
WithPassword sets the password to be used while connecting to an MQTT broker.
func WithPersistence ¶
func WithPersistence(store Store) ClientOption
WithPersistence configures the store to be used by the broker. The default persistence is in-memory persistence with mqtt.MemoryStore.
func WithResolver ¶ added in v0.1.1
func WithResolver(resolver Resolver) ClientOption
WithResolver sets the specified Resolver.
func WithTCPAddress ¶ deprecated
func WithTCPAddress(host string, port uint16) ClientOption
WithTCPAddress sets the broker address to be used. The default hostname is "127.0.0.1" and the default port is 1883.
Deprecated: This option used to work with plain TCP connections. It is now possible to use TLS by combining WithAddress and WithTLS.
func WithTLS ¶ added in v0.1.1
func WithTLS(tlsConfig *tls.Config) ClientOption
WithTLS sets the TLS configuration to be used while connecting to an MQTT broker.
func WithUseBase64Decoder ¶
func WithUseBase64Decoder() ClientOption
WithUseBase64Decoder configures a json decoder with a base64.StdEncoding wrapped decoder which decodes base64 encoded message bytes into the passed object.
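The wrapping described above can be reproduced with the standard library alone. The sketch below is an illustration of the technique, not the package's code: event, decodeBase64JSON and decodePayload are hypothetical names.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// event is a hypothetical payload type.
type event struct {
	ID int `json:"id"`
}

// decodeBase64JSON reads base64 text from r, decodes it with base64.StdEncoding,
// and feeds the resulting bytes to a JSON decoder that fills v.
func decodeBase64JSON(r io.Reader, v interface{}) error {
	return json.NewDecoder(base64.NewDecoder(base64.StdEncoding, r)).Decode(v)
}

// decodePayload is a convenience wrapper for string payloads.
func decodePayload(payload string) (event, error) {
	var e event
	err := decodeBase64JSON(strings.NewReader(payload), &e)
	return e, err
}

func main() {
	payload := base64.StdEncoding.EncodeToString([]byte(`{"id":7}`))
	e, err := decodePayload(payload)
	fmt.Println(payload, e.ID, err)
}
```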
func WithUsername ¶
func WithUsername(username string) ClientOption
WithUsername sets the username to be used while connecting to an MQTT broker.
func WithWriteTimeout ¶
func WithWriteTimeout(duration time.Duration) ClientOption
WithWriteTimeout limits how long the client will wait when trying to publish, subscribe or unsubscribe on a topic when a context deadline is not set while calling Publisher.Publish, Subscriber.Subscribe, Subscriber.SubscribeMultiple or Unsubscriber.Unsubscribe.
type ConnectRetryInterval ¶ added in v0.5.2
ConnectRetryInterval configures the interval between connection retries. The default value is 10 seconds.
type ConnectionInformer ¶
type ConnectionInformer interface {
	// IsConnected checks whether the client is connected to the broker
	IsConnected() bool
}
ConnectionInformer can be used to get information about the connection
type Credential ¶ added in v0.5.0
Credential is a <username,password> pair.
type CredentialFetcher ¶ added in v0.5.0
type CredentialFetcher interface {
Credentials(context.Context) (*Credential, error)
}
CredentialFetcher is an interface for fetching credentials for a client.
type Decoder ¶
type Decoder interface {
	// Decode decodes message bytes into the passed object
	Decode(v interface{}) error
}
Decoder helps to decode message bytes into the desired object
type DecoderFunc ¶
DecoderFunc is used to create a Decoder from io.Reader stream of message bytes before calling MessageHandler; the context.Context value may be used to select appropriate Decoder.
type Encoder ¶
type Encoder interface {
	// Encode takes any object and encodes it into bytes
	Encode(v interface{}) error
}
Encoder helps in transforming objects to message bytes
type EncoderFunc ¶
EncoderFunc is used to create an Encoder from io.Writer; the context.Context value may be used to select appropriate Encoder.
type KeepAlive ¶ added in v0.5.1
KeepAlive sets the amount of time that the client should wait before sending a PING request to the broker. This allows the client to know that the connection to the server has not been lost. The default value is 60 seconds.
Note: In practice, when KeepAlive >= 10s, the client checks every 5s whether it needs to send a PING. In other cases, the client checks every KeepAlive/2.
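The rule in the note can be written as a small function. pingCheckInterval is a hypothetical name used only to restate the arithmetic. It is not an exported function of this package.

```go
package main

import (
	"fmt"
	"time"
)

// pingCheckInterval (hypothetical) returns how often the client checks whether
// a PING is due: every 5s when keepAlive >= 10s, otherwise every keepAlive/2.
func pingCheckInterval(keepAlive time.Duration) time.Duration {
	if keepAlive >= 10*time.Second {
		return 5 * time.Second
	}
	return keepAlive / 2
}

func main() {
	for _, ka := range []time.Duration{60 * time.Second, 10 * time.Second, 4 * time.Second} {
		fmt.Printf("KeepAlive=%s -> check every %s\n", ka, pingCheckInterval(ka))
	}
}
```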
type Logger ¶ added in v0.5.0
type Logger interface {
	Info(ctx context.Context, msg string, attrs map[string]any)
	Error(ctx context.Context, err error, attrs map[string]any)
}
Logger is the interface that wraps the Info and Error methods.
type MQTTClientInfo ¶ added in v0.5.3
type MQTTClientInfo struct {
	Addresses     []TCPAddress `json:"addresses"`
	ClientID      string       `json:"client_id"`
	Username      string       `json:"username"`
	ResumeSubs    bool         `json:"resume_subs"`
	CleanSession  bool         `json:"clean_session"`
	AutoReconnect bool         `json:"auto_reconnect"`
	Connected     bool         `json:"connected"`
	// Subscriptions contains the topics the client is subscribed to
	// Note: Currently, this field only holds shared subscriptions.
	Subscriptions []string `json:"subscriptions,omitempty"`
}
MQTTClientInfo contains information about the internal MQTT client
type Message ¶
type Message struct {
	ID        int
	Topic     string
	Duplicate bool
	Retained  bool
	QoS       QOSLevel
	// contains filtered or unexported fields
}
Message represents the entity that is being relayed via the courier MQTT brokers from Publisher(s) to Subscriber(s).
func NewMessageWithDecoder ¶
NewMessageWithDecoder is a helper to create a Message. Ideally, payloadDecoder should not be mutated once created.
func (*Message) DecodePayload ¶
DecodePayload can decode the message payload bytes into the desired object.
type MessageHandler ¶
MessageHandler is the type that all callbacks being passed to Subscriber must satisfy.
type OnConnectHandler ¶
type OnConnectHandler func(PubSub)
OnConnectHandler is a callback that is called when the client state changes from disconnected to connected, both at initial connection and on reconnection.
type OnConnectionLostHandler ¶
type OnConnectionLostHandler func(error)
OnConnectionLostHandler is a callback type which can be set to be executed upon an unintended disconnection from the MQTT broker. Disconnects caused by calling Disconnect or ForceDisconnect will not cause the WithOnConnectionLost callback to execute.
type OnReconnectHandler ¶
type OnReconnectHandler func(PubSub)
OnReconnectHandler is invoked prior to reconnecting after the initial connection is lost
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option changes behaviour of Publisher.Publish, Subscriber.Subscribe calls.
type PubSub ¶
type PubSub interface {
	Publisher
	Subscriber
	Unsubscriber
	ConnectionInformer
}
PubSub exposes all the operational functionalities of Client with Publisher, Subscriber, Unsubscriber and ConnectionInformer
type Publisher ¶
type Publisher interface {
	// Publish publishes messages to an MQTT broker
	Publish(ctx context.Context, topic string, message interface{}, options ...Option) error
}
Publisher defines behaviour of an MQTT publisher that can send messages.
type PublisherFunc ¶
PublisherFunc defines signature of a Publish function.
type PublisherMiddlewareFunc ¶
PublisherMiddlewareFunc functions are closures that intercept Publisher.Publish calls.
func (PublisherMiddlewareFunc) Middleware ¶
func (pmw PublisherMiddlewareFunc) Middleware(publisher Publisher) Publisher
Middleware allows PublisherMiddlewareFunc to implement the publishMiddleware interface.
type QOSLevel ¶
type QOSLevel uint8
QOSLevel is an agreement between the sender of a message and the receiver of a message that defines the guarantee of delivery for a specific message
type Resolver ¶ added in v0.1.1
type Resolver interface {
	// UpdateChan returns a channel where TCPAddress updates can be received.
	UpdateChan() <-chan []TCPAddress
	// Done returns a channel which is closed when the Resolver is no longer running.
	Done() <-chan struct{}
}
Resolver sends TCPAddress updates on the channel returned by UpdateChan().
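A resolver that publishes a fixed address list once could be shaped like the sketch below. The types tcpAddress and staticResolver are local stand-ins: the field types of TCPAddress are assumed from the WithAddress signature, and the Stop method is an addition for the example rather than part of the Resolver interface.

```go
package main

import "fmt"

// tcpAddress is a local stand-in for TCPAddress; the field types are assumptions.
type tcpAddress struct {
	Host string
	Port uint16
}

// staticResolver (hypothetical) emits one address list and then stays idle
// until Stop is called.
type staticResolver struct {
	updates chan []tcpAddress
	done    chan struct{}
}

func newStaticResolver(addrs ...tcpAddress) *staticResolver {
	r := &staticResolver{
		updates: make(chan []tcpAddress, 1), // buffered so the constructor does not block
		done:    make(chan struct{}),
	}
	r.updates <- addrs
	return r
}

// UpdateChan returns the channel on which address updates are delivered.
func (r *staticResolver) UpdateChan() <-chan []tcpAddress { return r.updates }

// Done returns a channel that is closed once the resolver has stopped.
func (r *staticResolver) Done() <-chan struct{} { return r.done }

// Stop marks the resolver as no longer running. It must be called at most once.
func (r *staticResolver) Stop() { close(r.done) }

func main() {
	r := newStaticResolver(tcpAddress{Host: "localhost", Port: 1883})
	fmt.Println(<-r.UpdateChan())
	r.Stop()
	<-r.Done()
	fmt.Println("resolver stopped")
}
```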
type SharedSubscriptionPredicate ¶ added in v0.5.0
SharedSubscriptionPredicate configures the predicate function that determines whether a topic is a shared subscription topic.
type StartOption ¶
type StartOption func(*startOptions)
StartOption can be used to customise behaviour of ExponentialStartStrategy
func WithMaxInterval ¶
func WithMaxInterval(interval time.Duration) StartOption
WithMaxInterval sets the maximum interval the retry logic will wait before attempting another Client.Start. The default is 30 seconds.
func WithOnRetry ¶
func WithOnRetry(retryFunc func(error)) StartOption
WithOnRetry sets the func which is called when there is an error in the previous Client.Start attempt
type Store ¶
Store is an interface which can be used to provide implementations for message persistence.
[IMPORTANT] When implementing a store backed by shared storage (e.g. Redis) across multiple application instances, ensure that the keys are namespaced per application instance; otherwise there will be collisions. Messages are identified by the message ID from the MQTT packets, which has values in the range (0, 2^16); this coincides with the maximum number of in-flight messages.
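The collision risk follows from every instance drawing message IDs from the same small range. A hypothetical key builder such as the one below keeps instances apart. The "courier:" prefix, the key layout and the namespacedKey name are assumptions for illustration, and a plain map stands in for the shared storage.

```go
package main

import "fmt"

// namespacedKey (hypothetical) prefixes the MQTT message ID with an
// identifier that is unique per application instance.
func namespacedKey(instanceID string, messageID uint16) string {
	return fmt.Sprintf("courier:%s:%d", instanceID, messageID)
}

func main() {
	shared := map[string]string{} // stands in for a shared store such as Redis

	// Two instances persist a packet with the same message ID.
	shared[namespacedKey("orders-1", 42)] = "packet from orders-1"
	shared[namespacedKey("orders-2", 42)] = "packet from orders-2"

	// Both entries survive because the keys differ.
	fmt.Println(len(shared))
	fmt.Println(shared[namespacedKey("orders-1", 42)])
}
```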
func NewMemoryStore ¶
func NewMemoryStore() Store
NewMemoryStore returns a pointer to a new instance of mqtt.MemoryStore. The instance is not initialized or ready to use until Open() has been called on it.
type Subscriber ¶
type Subscriber interface {
	// Subscribe subscribes to messages from an MQTT broker
	Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
	// SubscribeMultiple subscribes to messages on multiple topics from an MQTT broker
	SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
}
Subscriber defines behaviour of an MQTT subscriber that can create subscriptions.
type SubscriberFuncs ¶
type SubscriberFuncs struct {
// contains filtered or unexported fields
}
SubscriberFuncs defines signature of a Subscribe function.
func NewSubscriberFuncs ¶
func NewSubscriberFuncs(
	subscribeFunc func(context.Context, string, MessageHandler, ...Option) error,
	subscribeMultipleFunc func(context.Context, map[string]QOSLevel, MessageHandler) error,
) SubscriberFuncs
NewSubscriberFuncs is a helper function to create SubscriberFuncs
func (SubscriberFuncs) Subscribe ¶
func (s SubscriberFuncs) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
Subscribe implements Subscriber interface on SubscriberFuncs.
func (SubscriberFuncs) SubscribeMultiple ¶
func (s SubscriberFuncs) SubscribeMultiple(
	ctx context.Context,
	topicsWithQos map[string]QOSLevel,
	callback MessageHandler,
) error
SubscribeMultiple implements Subscriber interface on SubscriberFuncs.
type SubscriberMiddlewareFunc ¶
type SubscriberMiddlewareFunc func(Subscriber) Subscriber
SubscriberMiddlewareFunc functions are closures that intercept Subscriber.Subscribe calls.
func (SubscriberMiddlewareFunc) Middleware ¶
func (smw SubscriberMiddlewareFunc) Middleware(subscriber Subscriber) Subscriber
Middleware allows SubscriberMiddlewareFunc to implement the subscribeMiddleware interface.
type TCPAddress ¶ added in v0.1.1
TCPAddress specifies Host and Port for remote broker
func (TCPAddress) String ¶ added in v0.5.0
func (t TCPAddress) String() string
type Unsubscriber ¶
type Unsubscriber interface {
	// Unsubscribe removes any subscription to messages from an MQTT broker
	Unsubscribe(ctx context.Context, topics ...string) error
}
Unsubscriber defines behaviour of an MQTT client that can remove subscriptions.
type UnsubscriberFunc ¶
UnsubscriberFunc defines the signature of an Unsubscribe function.
func (UnsubscriberFunc) Unsubscribe ¶
func (f UnsubscriberFunc) Unsubscribe(ctx context.Context, topics ...string) error
Unsubscribe implements Unsubscriber interface on UnsubscriberFunc.
type UnsubscriberMiddlewareFunc ¶
type UnsubscriberMiddlewareFunc func(Unsubscriber) Unsubscriber
UnsubscriberMiddlewareFunc functions are closures that intercept Unsubscriber.Unsubscribe calls.
func (UnsubscriberMiddlewareFunc) Middleware ¶
func (usmw UnsubscriberMiddlewareFunc) Middleware(unsubscriber Unsubscriber) Unsubscriber
Middleware allows UnsubscriberMiddlewareFunc to implement the unsubscribeMiddleware interface.
Source Files ¶
- alias.go
- client.go
- client_credentials.go
- client_options.go
- client_publish.go
- client_resolver.go
- client_subscribe.go
- client_telemetry.go
- client_unsubscribe.go
- decoder.go
- doc.go
- encoder.go
- errors.go
- exec.go
- exp_starter.go
- http.go
- interface.go
- log.go
- message.go
- metrics.go
- options.go
- publisher.go
- subscriber.go
- types.go
- unsubscriber.go
- utils.go
- version.go