Documentation ¶
Overview ¶
Package mux implements the session multiplexer that allows multiple plugins to share a single connection to a Kafka broker.
Index ¶
- Constants
- func ToBytesMsgChan(ch chan *client.ConsumerMessage, opts ...interface{}) func(*client.ConsumerMessage)
- func ToBytesProducerChan(ch chan *client.ProducerMessage, opts ...interface{}) func(*client.ProducerMessage)
- func ToBytesProducerErrChan(ch chan *client.ProducerError, opts ...interface{}) func(*client.ProducerError)
- type BytesConnection
- type BytesConnectionFields
- type BytesConnectionStr
- func (conn *BytesConnectionStr) ConsumeTopic(msgClb func(message *client.ConsumerMessage), topics ...string) error
- func (conn *BytesConnectionStr) NewAsyncPublisher(topic string, successClb func(*client.ProducerMessage), ...) (BytesPublisher, error)
- func (conn *BytesConnectionStr) NewSyncPublisher(topic string) (BytesPublisher, error)
- func (conn *BytesConnectionStr) SendAsyncByte(topic string, key []byte, value []byte, meta interface{}, ...)
- func (conn *BytesConnectionStr) SendAsyncMessage(topic string, key client.Encoder, value client.Encoder, meta interface{}, ...)
- func (conn *BytesConnectionStr) SendAsyncString(topic string, key string, value string, meta interface{}, ...)
- func (conn *BytesConnectionStr) SendSyncByte(topic string, key []byte, value []byte) (offset int64, err error)
- func (conn *BytesConnectionStr) SendSyncMessage(topic string, key client.Encoder, value client.Encoder) (offset int64, err error)
- func (conn *BytesConnectionStr) SendSyncString(topic string, key string, value string) (offset int64, err error)
- func (conn *BytesConnectionStr) StopConsuming(topic string) error
- type BytesManualConnection
- type BytesManualConnectionStr
- func (conn *BytesManualConnectionStr) ConsumePartition(msgClb func(message *client.ConsumerMessage), topic string, partition int32, ...) error
- func (conn *BytesManualConnectionStr) NewAsyncPublisherToPartition(topic string, partition int32, successClb func(*client.ProducerMessage), ...) (BytesPublisher, error)
- func (conn *BytesManualConnectionStr) NewSyncPublisherToPartition(topic string, partition int32) (BytesPublisher, error)
- func (conn *BytesManualConnectionStr) SendAsyncMessageToPartition(topic string, partition int32, key client.Encoder, value client.Encoder, ...)
- func (conn *BytesManualConnectionStr) SendAsyncStringToPartition(topic string, partition int32, key string, value string, meta interface{}, ...)
- func (conn *BytesManualConnectionStr) SendSyncMessageToPartition(topic string, partition int32, key client.Encoder, value client.Encoder) (offset int64, err error)
- func (conn *BytesManualConnectionStr) SendSyncStringToPartition(topic string, partition int32, key string, value string) (offset int64, err error)
- func (conn *BytesManualConnectionStr) StartPostInitConsumer(topic string, partition int32, offset int64) (*sarama.PartitionConsumer, error)
- func (conn *BytesManualConnectionStr) StopConsumingPartition(topic string, partition int32, offset int64) error
- type BytesPublisher
- type Config
- type Connection
- type ConsumerFactory
- type KafkaMock
- type ManualConnection
- type Multiplexer
- func InitMultiplexer(configFile string, name string, log logging.Logger) (*Multiplexer, error)
- func InitMultiplexerWithConfig(clientCfg *client.Config, hsClient sarama.Client, manClient sarama.Client, ...) (*Multiplexer, error)
- func NewMultiplexer(consumerFactory ConsumerFactory, producers multiplexerProducers, ...) *Multiplexer
- func (mux *Multiplexer) Close()
- func (mux *Multiplexer) NewBytesConnection(name string) *BytesConnectionStr
- func (mux *Multiplexer) NewBytesManualConnection(name string) *BytesManualConnectionStr
- func (mux *Multiplexer) NewProtoConnection(name string, serializer keyval.Serializer) *ProtoConnection
- func (mux *Multiplexer) NewProtoManualConnection(name string, serializer keyval.Serializer) *ProtoManualConnection
- func (mux *Multiplexer) Start() error
- type ProtoConnection
- func (conn *ProtoConnection) ConsumeTopic(msgClb func(messaging.ProtoMessage), topics ...string) error
- func (conn *ProtoConnection) NewAsyncPublisher(topic string, successClb func(messaging.ProtoMessage), ...) (messaging.ProtoPublisher, error)
- func (conn *ProtoConnection) NewSyncPublisher(topic string) (messaging.ProtoPublisher, error)
- func (conn *ProtoConnection) Watch(msgClb func(messaging.ProtoMessage), topics ...string) error
- type ProtoConnectionFields
- func (conn *ProtoConnectionFields) CommitOffsets() error
- func (conn *ProtoConnectionFields) MarkOffset(msg messaging.ProtoMessage, metadata string)
- func (conn *ProtoConnectionFields) StopConsuming(topic string) error
- func (conn *ProtoConnectionFields) StopConsumingPartition(topic string, partition int32, offset int64) error
- func (conn *ProtoConnectionFields) StopWatch(topic string) error
- func (conn *ProtoConnectionFields) StopWatchPartition(topic string, partition int32, offset int64) error
- type ProtoManualConnection
- func (conn *ProtoManualConnection) ConsumePartition(msgClb func(messaging.ProtoMessage), topic string, partition int32, ...) error
- func (conn *ProtoManualConnection) NewAsyncPublisherToPartition(topic string, partition int32, successClb func(messaging.ProtoMessage), ...) (messaging.ProtoPublisher, error)
- func (conn *ProtoManualConnection) NewSyncPublisherToPartition(topic string, partition int32) (messaging.ProtoPublisher, error)
- func (conn *ProtoManualConnection) StartPostInitConsumer(topic string, partition int32, offset int64) (*sarama.PartitionConsumer, error)
- func (conn *ProtoManualConnection) WatchPartition(msgClb func(messaging.ProtoMessage), topic string, partition int32, ...) error
Constants ¶
const ( // DefAddress default kafka address/port (if not specified in config) DefAddress = "127.0.0.1:9092" // DefPartition is used if no specific partition is set DefPartition = 0 // OffsetNewest is head offset which will be assigned to the new message produced to the partition OffsetNewest = sarama.OffsetNewest // OffsetOldest is oldest offset available on the partition OffsetOldest = sarama.OffsetOldest )
Variables ¶
This section is empty.
Functions ¶
func ToBytesMsgChan ¶
func ToBytesMsgChan(ch chan *client.ConsumerMessage, opts ...interface{}) func(*client.ConsumerMessage)
ToBytesMsgChan allows to receive ConsumerMessage through channel. This function can be used as an argument for ConsumeTopic call.
func ToBytesProducerChan ¶
func ToBytesProducerChan(ch chan *client.ProducerMessage, opts ...interface{}) func(*client.ProducerMessage)
ToBytesProducerChan allows to receive ProducerMessage through channel. This function can be used as an argument for methods publishing using async API.
func ToBytesProducerErrChan ¶
func ToBytesProducerErrChan(ch chan *client.ProducerError, opts ...interface{}) func(*client.ProducerError)
ToBytesProducerErrChan allows to receive ProducerMessage through channel. This function can be used as an argument for methods publishing using async API.
Types ¶
type BytesConnection ¶
type BytesConnection interface { // Creates new synchronous publisher allowing to publish kafka messages NewSyncPublisher(topic string) (BytesPublisher, error) // Creates new asynchronous publisher allowing to publish kafka messages NewAsyncPublisher(topic string, successClb func(*client.ProducerMessage), errorClb func(err *client.ProducerError)) (BytesPublisher, error) }
BytesConnection is interface for multiplexer with dynamic partitioner.
type BytesConnectionFields ¶
type BytesConnectionFields struct {
// contains filtered or unexported fields
}
BytesConnectionFields is an entity that provides access to shared producers/consumers of multiplexer
func (*BytesConnectionFields) CommitOffsets ¶
func (conn *BytesConnectionFields) CommitOffsets() error
CommitOffsets manually commits message offsets
func (*BytesConnectionFields) MarkOffset ¶
func (conn *BytesConnectionFields) MarkOffset(msg client.ConsumerMessage, metadata string)
MarkOffset marks the specified message as read
type BytesConnectionStr ¶
type BytesConnectionStr struct {
BytesConnectionFields
}
BytesConnectionStr represents connection built on hash-mode multiplexer
func (*BytesConnectionStr) ConsumeTopic ¶
func (conn *BytesConnectionStr) ConsumeTopic(msgClb func(message *client.ConsumerMessage), topics ...string) error
ConsumeTopic is called to start consuming of a topic. Function can be called until the multiplexer is started, it returns an error otherwise. The provided channel should be buffered, otherwise messages might be lost.
func (*BytesConnectionStr) NewAsyncPublisher ¶
func (conn *BytesConnectionStr) NewAsyncPublisher(topic string, successClb func(*client.ProducerMessage), errorClb func(err *client.ProducerError)) (BytesPublisher, error)
NewAsyncPublisher creates a new instance of bytesAsyncPublisherKafka that allows to publish async kafka messages using common messaging API
func (*BytesConnectionStr) NewSyncPublisher ¶
func (conn *BytesConnectionStr) NewSyncPublisher(topic string) (BytesPublisher, error)
NewSyncPublisher creates a new instance of bytesSyncPublisherKafka that allows to publish sync kafka messages using common messaging API
func (*BytesConnectionStr) SendAsyncByte ¶
func (conn *BytesConnectionStr) SendAsyncByte(topic string, key []byte, value []byte, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))
SendAsyncByte sends a message that uses byte encoder using the async API
func (*BytesConnectionStr) SendAsyncMessage ¶
func (conn *BytesConnectionStr) SendAsyncMessage(topic string, key client.Encoder, value client.Encoder, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))
SendAsyncMessage sends a message using the async API and default partitioner
func (*BytesConnectionStr) SendAsyncString ¶
func (conn *BytesConnectionStr) SendAsyncString(topic string, key string, value string, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))
SendAsyncString sends a message that uses string encoder using the async API
func (*BytesConnectionStr) SendSyncByte ¶
func (conn *BytesConnectionStr) SendSyncByte(topic string, key []byte, value []byte) (offset int64, err error)
SendSyncByte sends a message that uses byte encoder using the sync API
func (*BytesConnectionStr) SendSyncMessage ¶
func (conn *BytesConnectionStr) SendSyncMessage(topic string, key client.Encoder, value client.Encoder) (offset int64, err error)
SendSyncMessage sends a message using the sync API and default partitioner
func (*BytesConnectionStr) SendSyncString ¶
func (conn *BytesConnectionStr) SendSyncString(topic string, key string, value string) (offset int64, err error)
SendSyncString sends a message that uses string encoder using the sync API
func (*BytesConnectionStr) StopConsuming ¶
func (conn *BytesConnectionStr) StopConsuming(topic string) error
StopConsuming cancels the previously created subscription for consuming the topic.
type BytesManualConnection ¶
type BytesManualConnection interface { // Creates new synchronous publisher allowing to publish kafka messages to chosen partition NewSyncPublisherToPartition(topic string, partition int32) (BytesPublisher, error) // Creates new asynchronous publisher allowing to publish kafka messages to chosen partition NewAsyncPublisherToPartition(topic string, partition int32, successClb func(*client.ProducerMessage), errorClb func(err *client.ProducerError)) (BytesPublisher, error) }
BytesManualConnection is interface for multiplexer with manual partitioner.
type BytesManualConnectionStr ¶
type BytesManualConnectionStr struct {
BytesConnectionFields
}
BytesManualConnectionStr represents connection built on manual-mode multiplexer
func (*BytesManualConnectionStr) ConsumePartition ¶
func (conn *BytesManualConnectionStr) ConsumePartition(msgClb func(message *client.ConsumerMessage), topic string, partition int32, offset int64) error
ConsumePartition is called to start consuming given topic on partition with offset Function can be called until the multiplexer is started, it returns an error otherwise. The provided channel should be buffered, otherwise messages might be lost.
func (*BytesManualConnectionStr) NewAsyncPublisherToPartition ¶
func (conn *BytesManualConnectionStr) NewAsyncPublisherToPartition(topic string, partition int32, successClb func(*client.ProducerMessage), errorClb func(err *client.ProducerError)) (BytesPublisher, error)
NewAsyncPublisherToPartition creates a new instance of bytesAsyncPublisherKafka that allows to publish async kafka messages using common messaging API
func (*BytesManualConnectionStr) NewSyncPublisherToPartition ¶
func (conn *BytesManualConnectionStr) NewSyncPublisherToPartition(topic string, partition int32) (BytesPublisher, error)
NewSyncPublisherToPartition creates a new instance of bytesSyncPublisherKafka that allows to publish sync kafka messages using common messaging API
func (*BytesManualConnectionStr) SendAsyncMessageToPartition ¶
func (conn *BytesManualConnectionStr) SendAsyncMessageToPartition(topic string, partition int32, key client.Encoder, value client.Encoder, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))
SendAsyncMessageToPartition sends a message using the async API and default partitioner
func (*BytesManualConnectionStr) SendAsyncStringToPartition ¶
func (conn *BytesManualConnectionStr) SendAsyncStringToPartition(topic string, partition int32, key string, value string, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))
SendAsyncStringToPartition sends a message that uses string encoder using the async API to custom partition
func (*BytesManualConnectionStr) SendSyncMessageToPartition ¶
func (conn *BytesManualConnectionStr) SendSyncMessageToPartition(topic string, partition int32, key client.Encoder, value client.Encoder) (offset int64, err error)
SendSyncMessageToPartition sends a message using the sync API and default partitioner
func (*BytesManualConnectionStr) SendSyncStringToPartition ¶
func (conn *BytesManualConnectionStr) SendSyncStringToPartition(topic string, partition int32, key string, value string) (offset int64, err error)
SendSyncStringToPartition sends a message that uses string encoder using the sync API to custom partition
func (*BytesManualConnectionStr) StartPostInitConsumer ¶
func (conn *BytesManualConnectionStr) StartPostInitConsumer(topic string, partition int32, offset int64) (*sarama.PartitionConsumer, error)
StartPostInitConsumer allows to start a new partition consumer after mux is initialized
func (*BytesManualConnectionStr) StopConsumingPartition ¶
func (conn *BytesManualConnectionStr) StopConsumingPartition(topic string, partition int32, offset int64) error
StopConsumingPartition cancels the previously created subscription for consuming the topic, partition and offset
type BytesPublisher ¶
BytesPublisher allows to publish a message of type []bytes into messaging system.
type Config ¶
Config holds the settings for kafka multiplexer.
func ConfigFromFile ¶
ConfigFromFile loads the Kafka multiplexer configuration from the specified file. If the specified file is valid and contains valid configuration, the parsed configuration is returned; otherwise, an error is returned.
type Connection ¶
type Connection interface { messaging.ProtoWatcher // Creates new synchronous publisher allowing to publish kafka messages NewSyncPublisher(topic string) (messaging.ProtoPublisher, error) // Creates new asynchronous publisher allowing to publish kafka messages NewAsyncPublisher(topic string, successClb func(messaging.ProtoMessage), errorClb func(messaging.ProtoMessageErr)) (messaging.ProtoPublisher, error) }
Connection is interface for multiplexer with dynamic partitioner.
type ConsumerFactory ¶
ConsumerFactory produces a consumer for the selected topics in a specified consumer group. The reason why a function(factory) is passed to Multiplexer instead of consumer instance is that list of topics to be consumed has to be known on consumer initialization. Multiplexer calls the function once the list of topics to be consumed is selected.
type KafkaMock ¶
type KafkaMock struct { Mux *Multiplexer AsyncPub *mocks.AsyncProducer SyncPub *mocks.SyncProducer }
KafkaMock for the tests
type ManualConnection ¶
type ManualConnection interface { messaging.ProtoPartitionWatcher // Creates new synchronous publisher allowing to publish kafka messages to chosen partition NewSyncPublisherToPartition(topic string, partition int32) (messaging.ProtoPublisher, error) // Creates new asynchronous publisher allowing to publish kafka messages to chosen partition NewAsyncPublisherToPartition(topic string, partition int32, successClb func(messaging.ProtoMessage), errorClb func(messaging.ProtoMessageErr)) (messaging.ProtoPublisher, error) }
ManualConnection is interface for multiplexer with manual partitioner.
type Multiplexer ¶
type Multiplexer struct { logging.Logger // consumer used by the Multiplexer (bsm/sarama cluster) Consumer *client.Consumer // contains filtered or unexported fields }
Multiplexer encapsulates clients to kafka cluster (SyncProducer, AsyncProducer (both of them with 'hash' and 'manual' partitioner), consumer). It allows to create multiple Connections that use multiplexer's clients for communication with kafka cluster. The aim of Multiplexer is to decrease the number of connections needed. The set of topics to be consumed by Connections needs to be selected before the underlying consumer in Multiplexer is started. Once the Multiplexer's consumer has been started new topics can not be added.
func InitMultiplexer ¶
InitMultiplexer initialize and returns new kafka multiplexer based on the supplied config file. Name is used as groupId identification of consumer. Kafka allows to store last read offset for a groupId. This is leveraged to deliver unread messages after restart.
func InitMultiplexerWithConfig ¶
func InitMultiplexerWithConfig(clientCfg *client.Config, hsClient sarama.Client, manClient sarama.Client, name string, log logging.Logger) (*Multiplexer, error)
InitMultiplexerWithConfig initialize and returns new kafka multiplexer based on the supplied mux configuration. Name is used as groupId identification of consumer. Kafka allows to store last read offset for a groupId. This is leveraged to deliver unread messages after restart.
func NewMultiplexer ¶
func NewMultiplexer(consumerFactory ConsumerFactory, producers multiplexerProducers, clientCfg *client.Config, name string, log logging.Logger) *Multiplexer
NewMultiplexer creates new instance of Kafka Multiplexer
func (*Multiplexer) Close ¶
func (mux *Multiplexer) Close()
Close cleans up the resources used by the Multiplexer
func (*Multiplexer) NewBytesConnection ¶
func (mux *Multiplexer) NewBytesConnection(name string) *BytesConnectionStr
NewBytesConnection creates instance of the BytesConnectionStr that provides access to shared Multiplexer's clients with hash partitioner.
func (*Multiplexer) NewBytesManualConnection ¶
func (mux *Multiplexer) NewBytesManualConnection(name string) *BytesManualConnectionStr
NewBytesManualConnection creates instance of the BytesManualConnectionStr that provides access to shared Multiplexer's clients with manual partitioner.
func (*Multiplexer) NewProtoConnection ¶
func (mux *Multiplexer) NewProtoConnection(name string, serializer keyval.Serializer) *ProtoConnection
NewProtoConnection creates instance of the ProtoConnection that provides access to shared Multiplexer's clients with hash partitioner.
func (*Multiplexer) NewProtoManualConnection ¶
func (mux *Multiplexer) NewProtoManualConnection(name string, serializer keyval.Serializer) *ProtoManualConnection
NewProtoManualConnection creates instance of the ProtoConnectionFields that provides access to shared Multiplexer's clients with manual partitioner.
func (*Multiplexer) Start ¶
func (mux *Multiplexer) Start() error
Start should be called once all the Connections have been subscribed for topic consumption. An attempt to start consuming a topic after the multiplexer is started returns an error.
type ProtoConnection ¶
type ProtoConnection struct {
ProtoConnectionFields
}
ProtoConnection represents connection built on hash-mode multiplexer
func (*ProtoConnection) ConsumeTopic ¶
func (conn *ProtoConnection) ConsumeTopic(msgClb func(messaging.ProtoMessage), topics ...string) error
ConsumeTopic is called to start consuming given topics. Function can be called until the multiplexer is started, it returns an error otherwise. The provided channel should be buffered, otherwise messages might be lost.
func (*ProtoConnection) NewAsyncPublisher ¶
func (conn *ProtoConnection) NewAsyncPublisher(topic string, successClb func(messaging.ProtoMessage), errorClb func(messaging.ProtoMessageErr)) (messaging.ProtoPublisher, error)
NewAsyncPublisher creates a new instance of protoAsyncPublisherKafka that allows to publish sync kafka messages using common messaging API
func (*ProtoConnection) NewSyncPublisher ¶
func (conn *ProtoConnection) NewSyncPublisher(topic string) (messaging.ProtoPublisher, error)
NewSyncPublisher creates a new instance of protoSyncPublisherKafka that allows to publish sync kafka messages using common messaging API
func (*ProtoConnection) Watch ¶
func (conn *ProtoConnection) Watch(msgClb func(messaging.ProtoMessage), topics ...string) error
Watch is an alias for ConsumeTopic method. The alias was added in order to conform to messaging.Mux interface.
type ProtoConnectionFields ¶
type ProtoConnectionFields struct {
// contains filtered or unexported fields
}
ProtoConnectionFields is an entity that provides access to shared producers/consumers of multiplexer. The value of message are marshaled and unmarshaled to/from proto.message behind the scene.
func (*ProtoConnectionFields) CommitOffsets ¶
func (conn *ProtoConnectionFields) CommitOffsets() error
CommitOffsets manually commits message offsets
func (*ProtoConnectionFields) MarkOffset ¶
func (conn *ProtoConnectionFields) MarkOffset(msg messaging.ProtoMessage, metadata string)
MarkOffset marks the specified message as read
func (*ProtoConnectionFields) StopConsuming ¶
func (conn *ProtoConnectionFields) StopConsuming(topic string) error
StopConsuming cancels the previously created subscription for consuming the topic.
func (*ProtoConnectionFields) StopConsumingPartition ¶
func (conn *ProtoConnectionFields) StopConsumingPartition(topic string, partition int32, offset int64) error
StopConsumingPartition cancels the previously created subscription for consuming the topic, partition and offset
func (*ProtoConnectionFields) StopWatch ¶
func (conn *ProtoConnectionFields) StopWatch(topic string) error
StopWatch is an alias for StopConsuming method. The alias was added in order to conform to messaging.Mux interface.
func (*ProtoConnectionFields) StopWatchPartition ¶
func (conn *ProtoConnectionFields) StopWatchPartition(topic string, partition int32, offset int64) error
StopWatchPartition is an alias for StopConsumingPartition method. The alias was added in order to conform to messaging.Mux interface.
type ProtoManualConnection ¶
type ProtoManualConnection struct {
ProtoConnectionFields
}
ProtoManualConnection represents connection built on manual-mode multiplexer
func (*ProtoManualConnection) ConsumePartition ¶
func (conn *ProtoManualConnection) ConsumePartition(msgClb func(messaging.ProtoMessage), topic string, partition int32, offset int64) error
ConsumePartition is called to start consuming given topic on partition with offset Function can be called until the multiplexer is started, it returns an error otherwise. The provided channel should be buffered, otherwise messages might be lost.
func (*ProtoManualConnection) NewAsyncPublisherToPartition ¶
func (conn *ProtoManualConnection) NewAsyncPublisherToPartition(topic string, partition int32, successClb func(messaging.ProtoMessage), errorClb func(messaging.ProtoMessageErr)) (messaging.ProtoPublisher, error)
NewAsyncPublisherToPartition creates a new instance of protoManualAsyncPublisherKafka that allows to publish sync kafka messages using common messaging API.
func (*ProtoManualConnection) NewSyncPublisherToPartition ¶
func (conn *ProtoManualConnection) NewSyncPublisherToPartition(topic string, partition int32) (messaging.ProtoPublisher, error)
NewSyncPublisherToPartition creates a new instance of protoManualSyncPublisherKafka that allows to publish sync kafka messages using common messaging API
func (*ProtoManualConnection) StartPostInitConsumer ¶
func (conn *ProtoManualConnection) StartPostInitConsumer(topic string, partition int32, offset int64) (*sarama.PartitionConsumer, error)
StartPostInitConsumer allows to start a new partition consumer after mux is initialized. Created partition consumer is returned so it can be stored in subscription and closed if needed
func (*ProtoManualConnection) WatchPartition ¶
func (conn *ProtoManualConnection) WatchPartition(msgClb func(messaging.ProtoMessage), topic string, partition int32, offset int64) error
WatchPartition is an alias for ConsumePartition method. The alias was added in order to conform to messaging.Mux interface.