Documentation ¶
Index ¶
- Constants
- Variables
- func NewBufferedSubscriber(conn *MQTTConnection, buffSize int, autoDelete bool, ...) message.Subscriber
- func NewDynamicSubscriber(cfg *Configuration, qos Qos, autoDelete bool, logger watermill.LoggerAdapter, ...) message.Subscriber
- func NewErrPublisher(err error) message.Publisher
- func NewOnlinePublisher(conn *MQTTConnection, qos Qos, ackTimeout time.Duration, ...) message.Publisher
- func NewPublisher(conn *MQTTConnection, qos Qos, logger watermill.LoggerAdapter, ...) message.Publisher
- func NewRateLimiter(pub message.Publisher, count int64, duration time.Duration) message.Publisher
- func NewSubscriber(conn *MQTTConnection, qos Qos, autoDelete bool, logger watermill.LoggerAdapter, ...) message.Subscriber
- func NewSyncPublisher(conn *MQTTConnection, qos Qos, ackTimeout time.Duration, ...) message.Publisher
- func NewTrace(logger watermill.LoggerAdapter, prefixes []string) message.HandlerMiddleware
- func NullPublisher() message.Publisher
- func RetainFromCtx(ctx context.Context) bool
- func SetQosToCtx(ctx context.Context, qos Qos) context.Context
- func SetRetainToCtx(ctx context.Context, retain bool) context.Context
- func SetTopicToCtx(ctx context.Context, topic string) context.Context
- func TopicFromCtx(ctx context.Context) (string, bool)
- type ClientIDProvider
- type Configuration
- type ConnectFuture
- type ConnectionListener
- type Credentials
- type MQTTConnection
- func (c *MQTTConnection) AddConnectionListener(listener ConnectionListener)
- func (c *MQTTConnection) AuthErrRetries() int64
- func (c *MQTTConnection) ClientID() string
- func (c *MQTTConnection) Connect() ConnectFuture
- func (c *MQTTConnection) ConnectBackoff() backoff.BackOff
- func (c *MQTTConnection) Disconnect()
- func (c *MQTTConnection) RemoveConnectionListener(listener ConnectionListener)
- func (c *MQTTConnection) URL() string
- type Marshaller
- type Qos
- type SubscriptionManager
Constants ¶
const (
// TopicEmpty defines empty topic.
TopicEmpty = ""
)
Variables ¶
var ( // ErrClosed defines a closed publisher or subscriber channel error. ErrClosed = errors.New("publisher/subscriber closed") // ErrTimeout defines a publish timeout error. ErrTimeout = errors.New("publish timeout") // ErrNotConnected defines error on not connected state. ErrNotConnected = errors.New("not connected") )
Functions ¶
func NewBufferedSubscriber ¶
func NewBufferedSubscriber(conn *MQTTConnection, buffSize int, autoDelete bool, logger watermill.LoggerAdapter, marshaller Marshaller, ) message.Subscriber
NewBufferedSubscriber creates a buffered message subscriber.
func NewDynamicSubscriber ¶
func NewDynamicSubscriber(cfg *Configuration, qos Qos, autoDelete bool, logger watermill.LoggerAdapter, marshaller Marshaller, clientIDProvider ClientIDProvider, ) message.Subscriber
NewDynamicSubscriber creates a dynamic message subscriber.
func NewErrPublisher ¶
NewErrPublisher creates publisher that fails on every publish with specified error
func NewOnlinePublisher ¶
func NewOnlinePublisher(conn *MQTTConnection, qos Qos, ackTimeout time.Duration, logger watermill.LoggerAdapter, marshaller Marshaller, ) message.Publisher
NewOnlinePublisher creates publisher for the provided connection in online mode.
func NewPublisher ¶
func NewPublisher(conn *MQTTConnection, qos Qos, logger watermill.LoggerAdapter, marshaller Marshaller, ) message.Publisher
NewPublisher creates a publisher for a MQTT connection.
func NewRateLimiter ¶
NewRateLimiter creates the RateLimiter as message publisher.
func NewSubscriber ¶
func NewSubscriber(conn *MQTTConnection, qos Qos, autoDelete bool, logger watermill.LoggerAdapter, marshaller Marshaller, ) message.Subscriber
NewSubscriber creates a message subscriber.
func NewSyncPublisher ¶
func NewSyncPublisher(conn *MQTTConnection, qos Qos, ackTimeout time.Duration, logger watermill.LoggerAdapter, marshaller Marshaller, ) message.Publisher
NewSyncPublisher creates a synchronized publisher for a MQTT connection.
func NewTrace ¶
func NewTrace(logger watermill.LoggerAdapter, prefixes []string) message.HandlerMiddleware
NewTrace creates the tracing as middleware handler.
func NullPublisher ¶
NullPublisher creates publisher that discards all published messages
func RetainFromCtx ¶
RetainFromCtx returns the retain value.
func SetQosToCtx ¶
SetQosToCtx adds the QoS to the context.
func SetRetainToCtx ¶
SetRetainToCtx adds the retain value to the context.
func SetTopicToCtx ¶
SetTopicToCtx adds the topic to the provided context.
Types ¶
type ClientIDProvider ¶
ClientIDProvider defines the subscriber function per topic.
type Configuration ¶
type Configuration struct { URL string CleanSession bool NoOpStore bool ConnectTimeout time.Duration KeepAliveTimeout time.Duration ConnectRetryInterval time.Duration ExternalReconnect bool AuthErrRetries int64 BackoffMultiplier float64 MaxReconnectInterval time.Duration MinReconnectInterval time.Duration Credentials Credentials WillTopic string WillMessage []byte WillQos Qos WillRetain bool TLSConfig *tls.Config }
Configuration defines the MQTT connection system data.
func NewMQTTClientConfig ¶
func NewMQTTClientConfig(broker string) (*Configuration, error)
NewMQTTClientConfig returns the configuration a broker.
func (*Configuration) Validate ¶
func (c *Configuration) Validate() error
Validate validates the configuration data.
type ConnectFuture ¶
type ConnectFuture interface { Done() <-chan struct{} Error() error }
ConnectFuture defines connection behavior.
type ConnectionListener ¶
ConnectionListener is used to notify on connection state changes.
type Credentials ¶
Credentials defines the connection user name and password.
type MQTTConnection ¶
type MQTTConnection struct {
// contains filtered or unexported fields
}
MQTTConnection holds a MQTT connection data and manages the communication.
func NewMQTTConnection ¶
func NewMQTTConnection( config *Configuration, clientID string, logger watermill.LoggerAdapter, ) (*MQTTConnection, error)
NewMQTTConnection creates a local MQTT connection.
func NewMQTTConnectionCredentialsProvider ¶
func NewMQTTConnectionCredentialsProvider( config *Configuration, clientID string, logger watermill.LoggerAdapter, prov func() (string, string), ) (*MQTTConnection, error)
NewMQTTConnectionCredentialsProvider creates a local MQTT connection with specific credentials provider.
func (*MQTTConnection) AddConnectionListener ¶
func (c *MQTTConnection) AddConnectionListener(listener ConnectionListener)
AddConnectionListener adds a connection listener.
func (*MQTTConnection) AuthErrRetries ¶
func (c *MQTTConnection) AuthErrRetries() int64
AuthErrRetries returns number of retry attempts on authorization failure
func (*MQTTConnection) ClientID ¶
func (c *MQTTConnection) ClientID() string
ClientID returns the connection client ID.
func (*MQTTConnection) Connect ¶
func (c *MQTTConnection) Connect() ConnectFuture
Connect opens the connection.
func (*MQTTConnection) ConnectBackoff ¶
func (c *MQTTConnection) ConnectBackoff() backoff.BackOff
ConnectBackoff creates the connection backoff.
func (*MQTTConnection) Disconnect ¶
func (c *MQTTConnection) Disconnect()
Disconnect closes the connection.
func (*MQTTConnection) RemoveConnectionListener ¶
func (c *MQTTConnection) RemoveConnectionListener(listener ConnectionListener)
RemoveConnectionListener removes a connection listener.
func (*MQTTConnection) URL ¶
func (c *MQTTConnection) URL() string
URL returns the connection address.
type Marshaller ¶
type Marshaller interface { Marshal(*message.Message) ([]byte, error) Unmarshal(mid uint16, payload []byte) (*message.Message, error) }
Marshaller interface is used to enable marshall and unmarshal functionality for messages.
func NewDefaultMarshaller ¶
func NewDefaultMarshaller() Marshaller
NewDefaultMarshaller returns the default marshaller.
type SubscriptionManager ¶
type SubscriptionManager interface { // Add adds a subscription for the provided topic. Returns true if it doesn't exist and was successfully subscribed. Add(topic string) bool // Remove removes a subscription for the provided topic. Returns true if it does exist and was successfully unsubscribed. Remove(topic string) bool // RemoveAll removes all subscriptions. RemoveAll() // ForwardTo specifies the connection to which should be the messages forwarded. ForwardTo(conn *MQTTConnection) }
SubscriptionManager is responsible for subscriptions management.
func NewSubscriptionManager ¶
func NewSubscriptionManager() SubscriptionManager
NewSubscriptionManager creates subscriptions manager.