connector

package
v0.1.0-M4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2023 License: Apache-2.0, EPL-2.0 Imports: 22 Imported by: 12

Documentation

Index

Constants

View Source
const (
	// TopicEmpty defines the empty topic, i.e. the empty string.
	TopicEmpty = ""
)

Variables

View Source
var (
	// ErrClosed defines the error for a closed publisher or subscriber channel.
	ErrClosed = errors.New("publisher/subscriber closed")
	// ErrTimeout defines the error for a publish timeout.
	ErrTimeout = errors.New("publish timeout")
	// ErrNotConnected defines the error for the not connected state.
	ErrNotConnected = errors.New("not connected")
)

Functions

func NewBufferedSubscriber

func NewBufferedSubscriber(conn *MQTTConnection,
	buffSize int,
	autoDelete bool,
	logger watermill.LoggerAdapter,
	marshaller Marshaller,
) message.Subscriber

NewBufferedSubscriber creates a buffered message subscriber.

func NewDynamicSubscriber

func NewDynamicSubscriber(cfg *Configuration,
	qos Qos,
	autoDelete bool,
	logger watermill.LoggerAdapter,
	marshaller Marshaller,
	clientIDProvider ClientIDProvider,
) message.Subscriber

NewDynamicSubscriber creates a dynamic message subscriber.

func NewErrPublisher

func NewErrPublisher(err error) message.Publisher

NewErrPublisher creates a publisher that fails on every publish with the specified error.

func NewOnlinePublisher

func NewOnlinePublisher(conn *MQTTConnection,
	qos Qos,
	ackTimeout time.Duration,
	logger watermill.LoggerAdapter,
	marshaller Marshaller,
) message.Publisher

NewOnlinePublisher creates publisher for the provided connection in online mode.

func NewPublisher

func NewPublisher(conn *MQTTConnection,
	qos Qos,
	logger watermill.LoggerAdapter,
	marshaller Marshaller,
) message.Publisher

NewPublisher creates a publisher for an MQTT connection.

func NewRateLimiter

func NewRateLimiter(pub message.Publisher, count int64, duration time.Duration) message.Publisher

NewRateLimiter creates the RateLimiter as message publisher.

func NewSubscriber

func NewSubscriber(conn *MQTTConnection,
	qos Qos,
	autoDelete bool,
	logger watermill.LoggerAdapter,
	marshaller Marshaller,
) message.Subscriber

NewSubscriber creates a message subscriber.

func NewSyncPublisher

func NewSyncPublisher(conn *MQTTConnection,
	qos Qos,
	ackTimeout time.Duration,
	logger watermill.LoggerAdapter,
	marshaller Marshaller,
) message.Publisher

NewSyncPublisher creates a synchronized publisher for an MQTT connection.

func NewTrace

func NewTrace(logger watermill.LoggerAdapter, prefixes []string) message.HandlerMiddleware

NewTrace creates the tracing as middleware handler.

func NullPublisher

func NullPublisher() message.Publisher

NullPublisher creates a publisher that discards all published messages.

func RetainFromCtx

func RetainFromCtx(ctx context.Context) bool

RetainFromCtx returns the retain value.

func SetQosToCtx

func SetQosToCtx(ctx context.Context, qos Qos) context.Context

SetQosToCtx adds the QoS to the context.

func SetRetainToCtx

func SetRetainToCtx(ctx context.Context, retain bool) context.Context

SetRetainToCtx adds the retain value to the context.

func SetTopicToCtx

func SetTopicToCtx(ctx context.Context, topic string) context.Context

SetTopicToCtx adds the topic to the provided context.

func TopicFromCtx

func TopicFromCtx(ctx context.Context) (string, bool)

TopicFromCtx returns the topic.

Types

type ClientIDProvider

// ClientIDProvider is a function that takes a topic and returns a string,
// presumably the client ID to use for the subscriber of that topic
// (TODO confirm against NewDynamicSubscriber).
type ClientIDProvider func(topic string) string

ClientIDProvider defines the function that provides the subscriber client ID for a given topic.

type Configuration

// Configuration defines the MQTT connection system data.
type Configuration struct {
	// URL is presumably the broker address to connect to
	// (TODO confirm against NewMQTTClientConfig).
	URL string

	// Session and message store switches; their exact semantics are not
	// visible in this listing.
	CleanSession bool
	NoOpStore    bool

	// Connection timing settings.
	ConnectTimeout       time.Duration
	KeepAliveTimeout     time.Duration
	ConnectRetryInterval time.Duration

	// Reconnect settings. AuthErrRetries is presumably the value reported by
	// MQTTConnection.AuthErrRetries, i.e. the number of retry attempts on
	// authorization failure (TODO confirm).
	ExternalReconnect    bool
	AuthErrRetries       int64
	BackoffMultiplier    float64
	MaxReconnectInterval time.Duration
	MinReconnectInterval time.Duration

	// Credentials holds the connection user name and password.
	Credentials Credentials

	// Will* fields presumably describe the MQTT last-will message
	// (TODO confirm): its topic, payload, QoS and retain flag.
	WillTopic   string
	WillMessage []byte
	WillQos     Qos
	WillRetain  bool

	// TLSConfig is the TLS configuration; NOTE(review): nil handling is not
	// visible here.
	TLSConfig *tls.Config
}

Configuration defines the MQTT connection system data.

func NewMQTTClientConfig

func NewMQTTClientConfig(broker string) (*Configuration, error)

NewMQTTClientConfig returns the configuration for a broker.

func (*Configuration) Validate

func (c *Configuration) Validate() error

Validate validates the configuration data.

type ConnectFuture

// ConnectFuture defines connection behavior. It is the value returned by
// MQTTConnection.Connect.
type ConnectFuture interface {
	// Done returns a receive-only signal channel; presumably it is closed
	// once the connect attempt has finished (TODO confirm).
	Done() <-chan struct{}

	// Error returns the error associated with the connect attempt, if any.
	// NOTE(review): its value before Done is signaled is not visible here.
	Error() error
}

ConnectFuture defines connection behavior.

type ConnectionListener

// ConnectionListener is used to notify on connection state changes.
type ConnectionListener interface {
	// Connected receives the connection state together with an error value;
	// presumably err is set when the state change was caused by a failure
	// (TODO confirm).
	Connected(connected bool, err error)
}

ConnectionListener is used to notify on connection state changes.

type Credentials

// Credentials defines the connection user name and password.
type Credentials struct {
	// UserName is the connection user name.
	UserName string
	// Password is the connection password.
	Password string
}

Credentials defines the connection user name and password.

type MQTTConnection

// MQTTConnection holds the data of an MQTT connection and manages the
// communication.
type MQTTConnection struct {
	// contains filtered or unexported fields
}

MQTTConnection holds the data of an MQTT connection and manages the communication.

func NewMQTTConnection

func NewMQTTConnection(
	config *Configuration, clientID string, logger watermill.LoggerAdapter,
) (*MQTTConnection, error)

NewMQTTConnection creates a local MQTT connection.

func NewMQTTConnectionCredentialsProvider

func NewMQTTConnectionCredentialsProvider(
	config *Configuration, clientID string, logger watermill.LoggerAdapter, prov func() (string, string),
) (*MQTTConnection, error)

NewMQTTConnectionCredentialsProvider creates a local MQTT connection with specific credentials provider.

func (*MQTTConnection) AddConnectionListener

func (c *MQTTConnection) AddConnectionListener(listener ConnectionListener)

AddConnectionListener adds a connection listener.

func (*MQTTConnection) AuthErrRetries

func (c *MQTTConnection) AuthErrRetries() int64

AuthErrRetries returns the number of retry attempts on authorization failure.

func (*MQTTConnection) ClientID

func (c *MQTTConnection) ClientID() string

ClientID returns the connection client ID.

func (*MQTTConnection) Connect

func (c *MQTTConnection) Connect() ConnectFuture

Connect opens the connection.

func (*MQTTConnection) ConnectBackoff

func (c *MQTTConnection) ConnectBackoff() backoff.BackOff

ConnectBackoff creates the connection backoff.

func (*MQTTConnection) Disconnect

func (c *MQTTConnection) Disconnect()

Disconnect closes the connection.

func (*MQTTConnection) RemoveConnectionListener

func (c *MQTTConnection) RemoveConnectionListener(listener ConnectionListener)

RemoveConnectionListener removes a connection listener.

func (*MQTTConnection) URL

func (c *MQTTConnection) URL() string

URL returns the connection address.

type Marshaller

// Marshaller is the interface used to marshal and unmarshal messages.
type Marshaller interface {
	// Marshal converts the message to its byte payload, or returns an error.
	Marshal(*message.Message) ([]byte, error)
	// Unmarshal builds a message from the payload, or returns an error.
	// mid is presumably the MQTT message identifier (TODO confirm).
	Unmarshal(mid uint16, payload []byte) (*message.Message, error)
}

Marshaller is the interface used to marshal and unmarshal messages.

func NewDefaultMarshaller

func NewDefaultMarshaller() Marshaller

NewDefaultMarshaller returns the default marshaller.

type Qos

// Qos defines the quality of service. It is represented as a single byte.
type Qos byte

Qos defines the quality of service.

const (
	// QosAtMostOnce defines the at most once value (0, the first iota value).
	QosAtMostOnce Qos = iota
	// QosAtLeastOnce defines the at least once value (1, the next iota value).
	QosAtLeastOnce
)

func QosFromCtx

func QosFromCtx(ctx context.Context) (Qos, bool)

QosFromCtx returns the QoS.

type SubscriptionManager

// SubscriptionManager is responsible for subscriptions management.
type SubscriptionManager interface {
	// Add adds a subscription for the provided topic. Returns true if it doesn't exist and was successfully subscribed.
	Add(topic string) bool

	// Remove removes a subscription for the provided topic. Returns true if it does exist and was successfully unsubscribed.
	Remove(topic string) bool

	// RemoveAll removes all subscriptions.
	RemoveAll()

	// ForwardTo specifies the connection to which the messages should be forwarded.
	ForwardTo(conn *MQTTConnection)
}

SubscriptionManager is responsible for subscriptions management.

func NewSubscriptionManager

func NewSubscriptionManager() SubscriptionManager

NewSubscriptionManager creates subscriptions manager.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL