messaging

package
v1.6.0
Published: Oct 4, 2018 License: Apache-2.0 Imports: 5 Imported by: 0

README

Messaging

This package provides a front-end for messaging systems. Currently, only Kafka is supported.

Documentation

Overview

Package messaging is the parent package for implementations of various messaging clients, e.g. Kafka.

Index

Constants

const DefaultMsgTimeout = 2 * time.Second

DefaultMsgTimeout is the default timeout for the delivery of a notification.

Variables

This section is empty.

Functions

func ParseOpts

func ParseOpts(opts ...interface{}) (time.Duration, logging.Logger)

ParseOpts returns the timeout and logger to be used, based on the given options.
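
A minimal sketch of how ParseOpts is meant to be consumed, assuming the import path github.com/ligato/cn-infra/messaging and a logrus-style Warn method on logging.Logger; deliverToChan is a hypothetical helper, not part of this package:

package msgexample

import (
	"time"

	"github.com/ligato/cn-infra/messaging"
)

// deliverToChan forwards a message to a channel, honouring the timeout and
// logger that ParseOpts extracts from the caller's variadic options.
func deliverToChan(ch chan messaging.ProtoMessage, msg messaging.ProtoMessage, opts ...interface{}) {
	timeout, logger := messaging.ParseOpts(opts...)
	select {
	case ch <- msg:
	case <-time.After(timeout):
		logger.Warn("timed out delivering the message to the channel")
	}
}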

func ToProtoMsgChan

func ToProtoMsgChan(ch chan ProtoMessage, opts ...interface{}) func(ProtoMessage)

ToProtoMsgChan allows receiving messages through a channel instead of a callback.

func ToProtoMsgErrChan

func ToProtoMsgErrChan(ch chan ProtoMessageErr, opts ...interface{}) func(ProtoMessageErr)

ToProtoMsgErrChan allows receiving error messages through a channel instead of a callback.
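
A short sketch of the channel-based style, assuming an already initialized messaging.Mux implementation (e.g. the Kafka plugin), the import path github.com/ligato/cn-infra/messaging, and placeholder subscriber/topic names:

package msgexample

import "github.com/ligato/cn-infra/messaging"

// consume passes the adapter returned by ToProtoMsgChan as the watch callback,
// so every consumed message is forwarded to msgCh instead of being handled
// inside a callback body.
func consume(mux messaging.Mux) error {
	msgCh := make(chan messaging.ProtoMessage, 10)

	watcher := mux.NewWatcher("my-subscriber")
	if err := watcher.Watch(messaging.ToProtoMsgChan(msgCh), "my-topic"); err != nil {
		return err
	}

	for msg := range msgCh {
		// handle each received message here
		_ = msg.GetTopic()
	}
	return nil
}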

Types

type Mux

type Mux interface {
	// Creates a new Kafka synchronous publisher sending messages to the given topic.
	// The partitioner has to be set to the 'hash' (default) or 'random' scheme,
	// otherwise an error is returned.
	NewSyncPublisher(connName string, topic string) (ProtoPublisher, error)

	// Creates a new Kafka synchronous publisher sending messages to the given topic
	// and partition. The partitioner has to be set to the 'manual' scheme,
	// otherwise an error is returned.
	NewSyncPublisherToPartition(connName string, topic string, partition int32) (ProtoPublisher, error)

	// Creates a new Kafka asynchronous publisher sending messages to the given topic.
	// The partitioner has to be set to the 'hash' (default) or 'random' scheme,
	// otherwise an error is returned.
	NewAsyncPublisher(connName string, topic string, successClb func(ProtoMessage), errorClb func(err ProtoMessageErr)) (ProtoPublisher, error)

	// Creates a new Kafka asynchronous publisher sending messages to the given topic
	// and partition. The partitioner has to be set to the 'manual' scheme,
	// otherwise an error is returned.
	NewAsyncPublisherToPartition(connName string, topic string, partition int32,
		successClb func(ProtoMessage), errorClb func(err ProtoMessageErr)) (ProtoPublisher, error)

	// Initializes a new watcher which can start/stop watching on topics.
	NewWatcher(subscriberName string) ProtoWatcher

	// Initializes a new watcher which can start/stop watching on a topic,
	// and optionally on a specific partition and offset.
	NewPartitionWatcher(subscriberName string) ProtoPartitionWatcher

	// Disabled returns true if the plugin config was not found.
	Disabled() (disabled bool)
}

Mux defines the API for plugins that need access to Kafka brokers.
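
A sketch of typical Mux usage, assuming mux comes from an initialized Kafka plugin, that datasync.KeyProtoValWriter exposes a Put(key string, data proto.Message, ...) method as in cn-infra, and that the connection, topic and key names below are placeholders:

package msgexample

import (
	"github.com/golang/protobuf/proto"

	"github.com/ligato/cn-infra/messaging"
)

// publishAndWatch creates one synchronous publisher and one watcher that share
// the same connection, then publishes a single message.
func publishAndWatch(mux messaging.Mux, data proto.Message) error {
	pub, err := mux.NewSyncPublisher("my-connection", "my-topic")
	if err != nil {
		return err // e.g. the partitioner is not set to the 'hash' or 'random' scheme
	}

	watcher := mux.NewWatcher("my-subscriber")
	if err := watcher.Watch(func(msg messaging.ProtoMessage) {
		// handle each consumed message here
	}, "my-topic"); err != nil {
		return err
	}

	// ProtoPublisher embeds datasync.KeyProtoValWriter, so publishing is a Put call.
	return pub.Put("my-key", data)
}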

type OffsetHandler added in v1.0.5

type OffsetHandler interface {
	// MarkOffset marks the message received by a consumer as processed.
	MarkOffset(msg ProtoMessage, metadata string)
	// CommitOffsets manually commits marked offsets.
	CommitOffsets() error
}

OffsetHandler allows marking the offset of a consumed message and committing the marked offsets.
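
A sketch of manual offset handling, assuming watcher was obtained from Mux.NewWatcher; in practice CommitOffsets would typically be called periodically or on shutdown rather than immediately after subscribing:

package msgexample

import "github.com/ligato/cn-infra/messaging"

// markAndCommit marks every processed message and then commits the marked
// offsets in one step.
func markAndCommit(watcher messaging.ProtoWatcher, topic string) error {
	if err := watcher.Watch(func(msg messaging.ProtoMessage) {
		// process msg, then mark it as handled (empty metadata string)
		watcher.MarkOffset(msg, "")
	}, topic); err != nil {
		return err
	}
	// commit all offsets marked so far
	return watcher.CommitOffsets()
}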

type ProtoMessage

type ProtoMessage interface {
	keyval.ProtoKvPair

	// GetTopic returns the name of the topic from which the message
	// was consumed.
	GetTopic() string

	// GetPartition returns the index of the partition from which the message
	// was consumed.
	GetPartition() int32
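	// GetOffset returns the offset within the partition at which the message
	// was stored.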
	GetOffset() int64
}

ProtoMessage exposes the parameters of a single message received from the messaging system.

type ProtoMessageErr

type ProtoMessageErr interface {
	ProtoMessage

	// Error returns an error instance describing the cause of the failed
	// delivery.
	Error() error
}

ProtoMessageErr represents a message that could not be successfully published to the messaging system.
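
A sketch of where ProtoMessageErr typically surfaces: as the argument of an asynchronous publisher's error callback. mux and log are assumed to be supplied by the surrounding plugin; the logrus-style Debugf/Errorf methods on logging.Logger and the connection/topic names are assumptions:

package msgexample

import (
	"github.com/ligato/cn-infra/logging"
	"github.com/ligato/cn-infra/messaging"
)

// newAsyncPublisher wires a success callback and an error callback that
// receives a ProtoMessageErr for every failed delivery.
func newAsyncPublisher(mux messaging.Mux, log logging.Logger) (messaging.ProtoPublisher, error) {
	return mux.NewAsyncPublisher("my-connection", "my-topic",
		func(msg messaging.ProtoMessage) {
			log.Debugf("delivered to %s/%d at offset %d", msg.GetTopic(), msg.GetPartition(), msg.GetOffset())
		},
		func(err messaging.ProtoMessageErr) {
			log.Errorf("delivery failed: %v", err.Error())
		})
}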

type ProtoPartitionWatcher added in v1.0.4

type ProtoPartitionWatcher interface {
	OffsetHandler
	// WatchPartition starts consuming specific <partition> of a selected <topic>
	// from a given <offset>. The offset marks the oldest message to be consumed;
	// all previously published messages are ignored.
	// Callback <msgCallback> is called for each delivered message.
	WatchPartition(msgCallback func(ProtoMessage), topic string, partition int32, offset int64) error

	// StopWatchPartition cancels the previously created subscription
	// for consuming a given <topic>/<partition>/<offset>.
	// Returns an error if such a combination is not subscribed.
	StopWatchPartition(topic string, partition int32, offset int64) error
}

ProtoPartitionWatcher allows subscribing to receive messages published to selected topics, partitions and offsets.
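
A sketch of partition-level watching, assuming pw comes from Mux.NewPartitionWatcher; the topic, partition and offset values below are placeholders:

package msgexample

import "github.com/ligato/cn-infra/messaging"

// watchPartition consumes a single partition from a fixed offset and later
// cancels exactly that topic/partition/offset subscription.
func watchPartition(pw messaging.ProtoPartitionWatcher) error {
	const (
		topic     = "my-topic"
		partition = int32(0)
		offset    = int64(42)
	)
	if err := pw.WatchPartition(func(msg messaging.ProtoMessage) {
		// only messages from partition 0 of my-topic with offset >= 42 arrive here
	}, topic, partition, offset); err != nil {
		return err
	}
	// ... later, stop consuming this particular subscription
	return pw.StopWatchPartition(topic, partition, offset)
}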

type ProtoPublisher

type ProtoPublisher interface {
	datasync.KeyProtoValWriter
}

ProtoPublisher allows publishing messages of type proto.Message to the messaging system.

type ProtoWatcher

type ProtoWatcher interface {
	OffsetHandler
	// Watch starts consuming all selected <topics>.
	// Returns an error if the 'manual' partitioner scheme is chosen.
	// Callback <msgCallback> is called for each delivered message.
	Watch(msgCallback func(ProtoMessage), topics ...string) error

	// StopWatch cancels the previously created subscription for consuming
	// a given <topic>.
	StopWatch(topic string) error
}

ProtoWatcher allows subscribing to receive messages published to selected topics.
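
A sketch of topic-level watching, assuming watcher comes from Mux.NewWatcher; the topic names are placeholders:

package msgexample

import "github.com/ligato/cn-infra/messaging"

// watchAndStop subscribes to two topics with a shared callback and later
// cancels the subscription for one of them.
func watchAndStop(watcher messaging.ProtoWatcher) error {
	handler := func(msg messaging.ProtoMessage) {
		// handle messages from either topic here
	}
	if err := watcher.Watch(handler, "topic-a", "topic-b"); err != nil {
		return err // e.g. the 'manual' partitioner scheme is configured
	}
	// ... later, stop consuming topic-a only
	return watcher.StopWatch("topic-a")
}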

type WithLoggerOpt

type WithLoggerOpt struct {
	// contains filtered or unexported fields
}

WithLoggerOpt defines a logger that logs when the delivery of a notification is unsuccessful.

func WithLogger

func WithLogger(logger logging.Logger) *WithLoggerOpt

WithLogger creates an option for the ToProtoMsgChan/ToProtoMsgErrChan functions that specifies the logger to be used.

type WithTimeoutOpt

type WithTimeoutOpt struct {
	// contains filtered or unexported fields
}

WithTimeoutOpt defines the maximum time allocated to deliver a notification.

func WithTimeout

func WithTimeout(timeout time.Duration) *WithTimeoutOpt

WithTimeout creates an option for the ToProtoMsgChan/ToProtoMsgErrChan functions that defines the timeout for notification delivery.
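
A sketch combining both options with ToProtoMsgChan, assuming logrus.DefaultLogger() is available from cn-infra's logging/logrus package:

package msgexample

import (
	"time"

	"github.com/ligato/cn-infra/logging/logrus"
	"github.com/ligato/cn-infra/messaging"
)

// callbackWithOptions builds a callback that forwards messages to msgCh;
// if the channel cannot accept a message within one second, the failure is
// logged instead of blocking forever.
func callbackWithOptions(msgCh chan messaging.ProtoMessage) func(messaging.ProtoMessage) {
	return messaging.ToProtoMsgChan(msgCh,
		messaging.WithTimeout(time.Second),
		messaging.WithLogger(logrus.DefaultLogger()),
	)
}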

Directories

Path          Synopsis
kafka         Package kafka implements a client for the Kafka broker.
kafka/client  Package client implements the synchronous and asynchronous kafka Producers and the kafka Consumer.
kafka/mux     Package mux implements the session multiplexer that allows multiple plugins to share a single connection to a Kafka broker.
