Documentation ¶
Overview ¶
Package messaging is the parent package for implementations of various messaging clients, e.g. Kafka.
Index ¶
- Constants
- func ParseOpts(opts ...interface{}) (time.Duration, logging.Logger)
- func ToProtoMsgChan(ch chan ProtoMessage, opts ...interface{}) func(ProtoMessage)
- func ToProtoMsgErrChan(ch chan ProtoMessageErr, opts ...interface{}) func(ProtoMessageErr)
- type Mux
- type OffsetHandler
- type ProtoMessage
- type ProtoMessageErr
- type ProtoPartitionWatcher
- type ProtoPublisher
- type ProtoWatcher
- type WithLoggerOpt
- type WithTimeoutOpt
Constants ¶
const DefaultMsgTimeout = 2 * time.Second
DefaultMsgTimeout for delivery of notification
Variables ¶
This section is empty.
Functions ¶
func ToProtoMsgChan ¶
func ToProtoMsgChan(ch chan ProtoMessage, opts ...interface{}) func(ProtoMessage)
ToProtoMsgChan allows to receive messages through channel instead of callback.
func ToProtoMsgErrChan ¶
func ToProtoMsgErrChan(ch chan ProtoMessageErr, opts ...interface{}) func(ProtoMessageErr)
ToProtoMsgErrChan allows to receive error messages through channel instead of callback.
Types ¶
type Mux ¶
type Mux interface { // Creates new Kafka synchronous publisher sending messages to given topic. // Partitioner has to be set to 'hash' (default) or 'random' scheme, // otherwise an error is thrown. NewSyncPublisher(connName string, topic string) (ProtoPublisher, error) // Creates new Kafka synchronous publisher sending messages to given topic // and partition. Partitioner has to be set to 'manual' scheme, // otherwise an error is thrown. NewSyncPublisherToPartition(connName string, topic string, partition int32) (ProtoPublisher, error) // Creates new Kafka asynchronous publisher sending messages to given topic. // Partitioner has to be set to 'hash' (default) or 'random' scheme, // otherwise an error is thrown. NewAsyncPublisher(connName string, topic string, successClb func(ProtoMessage), errorClb func(err ProtoMessageErr)) (ProtoPublisher, error) // Creates new Kafka asynchronous publisher sending messages to given topic // and partition. Partitioner has to be set to 'manual' scheme, // otherwise an error is thrown. NewAsyncPublisherToPartition(connName string, topic string, partition int32, successClb func(ProtoMessage), errorClb func(err ProtoMessageErr)) (ProtoPublisher, error) // Initializes new watcher which can start/stop watching on topic, NewWatcher(subscriberName string) ProtoWatcher // Initializes new watcher which can start/stop watching on topic, // eventually partition and offset. NewPartitionWatcher(subscriberName string) ProtoPartitionWatcher // Disabled if the plugin config was not found. Disabled() (disabled bool) }
Mux defines API for the plugins that use access to kafka brokers.
type OffsetHandler ¶
type OffsetHandler interface { // MarkOffset marks the message received by a consumer as processed. MarkOffset(msg ProtoMessage, metadata string) // CommitOffsets manually commits marked offsets. CommitOffsets() error }
OffsetHandler allows to mark offset or commit
type ProtoMessage ¶
type ProtoMessage interface { keyval.ProtoKvPair // GetTopic returns the name of the topic from which the message // was consumed. GetTopic() string // GetTopic returns the index of the partition from which the message // was consumed. GetPartition() int32 GetOffset() int64 }
ProtoMessage exposes parameters of a single message received from messaging system.
type ProtoMessageErr ¶
type ProtoMessageErr interface { ProtoMessage // Error returns an error instance describing the cause of the failed // delivery. Error() error }
ProtoMessageErr represents a message that was not published successfully to a messaging system.
type ProtoPartitionWatcher ¶
type ProtoPartitionWatcher interface { OffsetHandler // WatchPartition starts consuming specific <partition> of a selected <topic> // from a given <offset>. Offset is the oldest message index consumed, // all previously published messages are ignored. // Callback <msgCallback> is called for each delivered message. WatchPartition(msgCallback func(ProtoMessage), topic string, partition int32, offset int64) error // StopWatchPartition cancels the previously created subscription // for consuming a given <topic>/<partition>/<offset>. // Return error if such a combination is not subscribed StopWatchPartition(topic string, partition int32, offset int64) error }
ProtoPartitionWatcher allows to subscribe for receiving of messages published to selected topics, partitions and offsets
type ProtoPublisher ¶
type ProtoPublisher interface { datasync.KeyProtoValWriter }
ProtoPublisher allows to publish a message of type proto.Message into messaging system.
type ProtoWatcher ¶
type ProtoWatcher interface { OffsetHandler // Watch starts consuming all selected <topics>. // Returns error if 'manual' partitioner scheme is chosen // Callback <msgCallback> is called for each delivered message. Watch(msgCallback func(ProtoMessage), topics ...string) error // StopWatch cancels the previously created subscription for consuming // a given <topic>. StopWatch(topic string) error }
ProtoWatcher allows to subscribe for receiving of messages published to selected topics.
type WithLoggerOpt ¶
type WithLoggerOpt struct {
// contains filtered or unexported fields
}
WithLoggerOpt defines a logger that logs if delivery of notification is unsuccessful.
func WithLogger ¶
func WithLogger(logger logging.Logger) *WithLoggerOpt
WithLogger creates an option for ToChan function that specifies a logger to be used.
type WithTimeoutOpt ¶
type WithTimeoutOpt struct {
// contains filtered or unexported fields
}
WithTimeoutOpt defines the maximum time allocated to deliver a notification.
func WithTimeout ¶
func WithTimeout(timeout time.Duration) *WithTimeoutOpt
WithTimeout creates an option for ToChan function that defines a timeout for notification delivery.
Directories ¶
Path | Synopsis |
---|---|
Package kafka implements a client for the Kafka broker.
|
Package kafka implements a client for the Kafka broker. |
client
Package client implements the synchronous and asynchronous kafka Producers and the kafka Consumer.
|
Package client implements the synchronous and asynchronous kafka Producers and the kafka Consumer. |
mux
Package mux implements the session multiplexer that allows multiple plugins to share a single connection to a Kafka broker.
|
Package mux implements the session multiplexer that allows multiple plugins to share a single connection to a Kafka broker. |