mux

package
v1.0.3
This package is not in the latest version of its module.
Published: Sep 8, 2017 License: Apache-2.0 Imports: 15 Imported by: 20

README

Multiplexer

The multiplexer instance has access to the Kafka brokers. To share that access, it allows connections to be created. Two connection types are available: one supports messages of type []byte and the other messages of type proto.Message. Both allow the creation of several SyncPublishers and AsyncPublishers, which implement the BytesPublisher or ProtoPublisher interface, respectively. The connections also provide an API for consuming messages that implement the BytesMessage or ProtoMessage interface, respectively.

   
    +-----------------+                                  +---------------+
    |                 |                                  |               |
    |  Kafka brokers  |        +--------------+     +----| SyncPublisher |
    |                 |        |              |     |    |               |
    +--------^--------+    +---| Connection   <-----+    +---------------+
             |             |   |              |
   +---------+----------+  |   +--------------+
   |  Multiplexer       |  |
   |                    <--+
   | SyncProducer       <--+   +--------------+
   | AsyncProducer      |  |   |              |
   | Consumer           |  |   | Connection   <-----+    +----------------+
   |                    |  +---|              |     |    |                |
   |                    |      +--------------+     +----| AsyncPublisher |
   +--------------------+                                |                | 
                                                         +----------------+
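
The sharing pattern in the diagram can be sketched without Kafka at all. The following is a minimal illustration, not the package's implementation: sharedProducer, multiplexer, connection and publisher are hypothetical stand-ins, and the producer only records what it was asked to send.

```go
package main

import "sync"

// sharedProducer is a hypothetical stand-in for the single Kafka producer
// owned by the multiplexer; it only records "topic/key" for each message.
type sharedProducer struct {
	mu   sync.Mutex
	sent []string
}

func (p *sharedProducer) send(topic, key string, value []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sent = append(p.sent, topic+"/"+key)
}

// multiplexer owns the producer and hands out lightweight connections.
type multiplexer struct {
	producer *sharedProducer
}

// connection borrows the multiplexer's producer instead of opening its own.
type connection struct {
	name string
	mux  *multiplexer
}

func (m *multiplexer) newConnection(name string) *connection {
	return &connection{name: name, mux: m}
}

// publisher is bound to one topic and publishes through its connection.
type publisher struct {
	conn  *connection
	topic string
}

func (c *connection) newPublisher(topic string) *publisher {
	return &publisher{conn: c, topic: topic}
}

// Publish has the same shape as BytesPublisher.Publish.
func (p *publisher) Publish(key string, data []byte) error {
	p.conn.mux.producer.send(p.topic, key, data)
	return nil
}
```

Two connections created from the same multiplexer end up writing through the same producer, which is the point of the design: the number of broker connections does not grow with the number of plugins.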

Documentation

Overview

Package mux implements the session multiplexer that allows multiple plugins to share a single connection to a Kafka broker.

Constants

const (
	// DefAddress is the default Kafka address/port (used if not specified in the config).
	DefAddress = "127.0.0.1:9092"
	// DefPartition is used if no specific partition is set.
	DefPartition = 0
	// OffsetNewest is the head offset, i.e. the offset that will be assigned to the next message produced to the partition.
	OffsetNewest = sarama.OffsetNewest
	// OffsetOldest is the oldest offset available on the partition.
	OffsetOldest = sarama.OffsetOldest
)
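
As a rough illustration of how these constants are meant to be read, the sketch below mirrors their values locally (sarama defines OffsetNewest as -1 and OffsetOldest as -2) and shows a default-address fallback and a sentinel-offset resolution. brokerAddrs and resolveOffset are hypothetical helpers written for this example; in a real deployment the broker resolves the sentinels.

```go
package main

// Local copies of the constants above, so the sketch has no dependencies.
const (
	defAddress         = "127.0.0.1:9092"
	offsetNewest int64 = -1 // value of sarama.OffsetNewest
	offsetOldest int64 = -2 // value of sarama.OffsetOldest
)

// brokerAddrs returns the configured addresses, or the default address when
// none are configured. (Hypothetical helper.)
func brokerAddrs(configured []string) []string {
	if len(configured) == 0 {
		return []string{defAddress}
	}
	return configured
}

// resolveOffset maps a requested offset onto a partition that currently holds
// offsets [oldest, next). Sentinels are replaced, concrete offsets pass through.
// (Hypothetical helper.)
func resolveOffset(requested, oldest, next int64) int64 {
	switch requested {
	case offsetNewest:
		return next
	case offsetOldest:
		return oldest
	default:
		return requested
	}
}
```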

Variables

This section is empty.

Functions

func ToBytesMsgChan

func ToBytesMsgChan(ch chan *client.ConsumerMessage, opts ...interface{}) func(*client.ConsumerMessage)

ToBytesMsgChan returns a callback that delivers each received ConsumerMessage to the given channel. The returned function can be passed as the callback argument of a ConsumeTopic call.
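
A channel adapter of this kind can be sketched as follows. This is an illustration under assumptions, not the package's code: consumerMessage is a stand-in for client.ConsumerMessage, and the explicit timeout parameter replaces whatever the real function derives from opts. It also shows why a buffered channel matters: when nothing can receive within the timeout, the message is dropped.

```go
package main

import "time"

// consumerMessage is a hypothetical stand-in for client.ConsumerMessage.
type consumerMessage struct {
	Topic string
	Value []byte
}

// toMsgChan returns a callback that forwards each message to ch and gives up
// after timeout if the send cannot proceed. The timeout is an assumption of
// this sketch.
func toMsgChan(ch chan *consumerMessage, timeout time.Duration) func(*consumerMessage) {
	return func(msg *consumerMessage) {
		select {
		case ch <- msg:
			// Delivered (or queued in the channel buffer).
		case <-time.After(timeout):
			// Dropped: the buffer was full, or the channel was unbuffered
			// and no receiver was ready.
		}
	}
}
```

With a buffer of size one, a first message is queued immediately while a second one sent before anything is read is dropped after the timeout.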

func ToBytesProducerChan

func ToBytesProducerChan(ch chan *client.ProducerMessage, opts ...interface{}) func(*client.ProducerMessage)

ToBytesProducerChan returns a callback that delivers each ProducerMessage to the given channel. The returned function can be passed as the success callback to methods that publish using the async API.

func ToBytesProducerErrChan

func ToBytesProducerErrChan(ch chan *client.ProducerError, opts ...interface{}) func(*client.ProducerError)

ToBytesProducerErrChan returns a callback that delivers each ProducerError to the given channel. The returned function can be passed as the error callback to methods that publish using the async API.

Types

type BytesPublisher

type BytesPublisher interface {
	Publish(key string, data []byte) error
}

BytesPublisher allows publishing a message of type []byte into the messaging system.
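
Because the interface has a single method, calling code can be tested against a trivial in-memory implementation. The sketch below is one possible fake: memPublisher, newMemPublisher and publishAll are hypothetical names, and rejecting an empty key is a choice made for this example rather than documented behaviour.

```go
package main

import "errors"

// BytesPublisher is copied from the declaration above.
type BytesPublisher interface {
	Publish(key string, data []byte) error
}

// memPublisher is a hypothetical in-memory implementation for unit tests.
type memPublisher struct {
	records map[string][]byte
}

func newMemPublisher() *memPublisher {
	return &memPublisher{records: map[string][]byte{}}
}

// Publish stores a copy of data under key. Rejecting an empty key is an
// assumption of this fake.
func (p *memPublisher) Publish(key string, data []byte) error {
	if key == "" {
		return errors.New("empty key")
	}
	// Copy so later changes by the caller do not affect the stored record.
	p.records[key] = append([]byte(nil), data...)
	return nil
}

// publishAll is example calling code that depends only on the interface.
func publishAll(pub BytesPublisher, items map[string]string) error {
	for k, v := range items {
		if err := pub.Publish(k, []byte(v)); err != nil {
			return err
		}
	}
	return nil
}
```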

type Config

type Config struct {
	Addrs []string `json:"addrs"`
}

Config holds the settings for kafka multiplexer.

func ConfigFromFile

func ConfigFromFile(fpath string) (*Config, error)

ConfigFromFile loads the Kafka multiplexer configuration from the specified file. If the specified file is valid and contains valid configuration, the parsed configuration is returned; otherwise, an error is returned.
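
The json struct tag on Config indicates which field name the parser looks for. As a rough sketch, decoding a document with that shape could look like the code below. Treating the input as JSON and the helper name parseConfig are assumptions of this example; the file format actually accepted by ConfigFromFile is not stated here.

```go
package main

import "encoding/json"

// Config is copied from the declaration above.
type Config struct {
	Addrs []string `json:"addrs"`
}

// parseConfig decodes a JSON document into a Config and returns an error if
// the document is malformed. (Hypothetical helper; JSON input is assumed.)
func parseConfig(data []byte) (*Config, error) {
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}
```

A document such as {"addrs": ["10.0.0.1:9092", "10.0.0.2:9092"]} yields a Config with two broker addresses.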

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is an entity that provides access to shared producers/consumers of multiplexer.

func (*Connection) ConsumeTopic

func (conn *Connection) ConsumeTopic(msgClb func(message *client.ConsumerMessage), topics ...string) error

ConsumeTopic starts consuming the given topics. It can be called only before the multiplexer is started; afterwards it returns an error. If the callback forwards messages to a channel (for example, a callback created by ToBytesMsgChan), that channel should be buffered; otherwise messages might be lost.

func (*Connection) ConsumeTopicOnPartition added in v1.0.3

func (conn *Connection) ConsumeTopicOnPartition(msgClb func(message *client.ConsumerMessage), topic string, partition int32, offset int64) error

ConsumeTopicOnPartition starts consuming the given topic on the given partition, beginning at the given offset. It can be called only before the multiplexer is started; afterwards it returns an error. If the callback forwards messages to a channel (for example, a callback created by ToBytesMsgChan), that channel should be buffered; otherwise messages might be lost.

func (*Connection) NewAsyncPublisher

func (conn *Connection) NewAsyncPublisher(topic string, successClb func(*client.ProducerMessage), errorClb func(err *client.ProducerError)) (BytesPublisher, error)

NewAsyncPublisher creates a new instance of bytesAsyncPublisherKafka that publishes Kafka messages asynchronously through the common messaging API.

func (*Connection) NewAsyncPublisherToPartition added in v1.0.3

func (conn *Connection) NewAsyncPublisherToPartition(topic string, partition int32, successClb func(*client.ProducerMessage), errorClb func(err *client.ProducerError)) (BytesPublisher, error)

NewAsyncPublisherToPartition creates a new instance of bytesAsyncPublisherKafka that publishes Kafka messages asynchronously to the given partition through the common messaging API.

func (*Connection) NewSyncPublisher

func (conn *Connection) NewSyncPublisher(topic string) (BytesPublisher, error)

NewSyncPublisher creates a new instance of bytesSyncPublisherKafka that publishes Kafka messages synchronously through the common messaging API.

func (*Connection) NewSyncPublisherToPartition added in v1.0.3

func (conn *Connection) NewSyncPublisherToPartition(topic string, partition int32) (BytesPublisher, error)

NewSyncPublisherToPartition creates a new instance of bytesSyncPublisherKafka that publishes Kafka messages synchronously to the given partition through the common messaging API.

func (*Connection) SendAsyncByte

func (conn *Connection) SendAsyncByte(topic string, partition int32, key []byte, value []byte, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))

SendAsyncByte sends a message encoded with the byte encoder using the async API.

func (*Connection) SendAsyncMessage

func (conn *Connection) SendAsyncMessage(topic string, partition int32, key client.Encoder, value client.Encoder, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))

SendAsyncMessage sends a message using the async API.

func (*Connection) SendAsyncString

func (conn *Connection) SendAsyncString(topic string, partition int32, key string, value string, meta interface{}, successClb func(*client.ProducerMessage), errClb func(*client.ProducerError))

SendAsyncString sends a message encoded with the string encoder using the async API.
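
The async methods return immediately and report the outcome later through one of the two callbacks. The sketch below imitates that contract with a goroutine. It is illustrative only: asyncSender, producerMessage and producerError are hypothetical stand-ins, and failing on an empty topic is a rule invented for the example.

```go
package main

import (
	"errors"
	"sync"
)

// producerMessage and producerError are hypothetical stand-ins for
// client.ProducerMessage and client.ProducerError.
type producerMessage struct {
	Topic  string
	Key    string
	Value  []byte
	Offset int64
}

type producerError struct {
	Msg *producerMessage
	Err error
}

// asyncSender delivers in the background and invokes exactly one of the two
// callbacks per message.
type asyncSender struct {
	wg   sync.WaitGroup
	mu   sync.Mutex
	next int64 // next offset to hand out
}

func (s *asyncSender) sendAsync(topic, key string, value []byte,
	onSuccess func(*producerMessage), onError func(*producerError)) {
	msg := &producerMessage{Topic: topic, Key: key, Value: value}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		if topic == "" {
			onError(&producerError{Msg: msg, Err: errors.New("empty topic")})
			return
		}
		s.mu.Lock()
		msg.Offset = s.next
		s.next++
		s.mu.Unlock()
		onSuccess(msg)
	}()
}

// wait blocks until all pending sends have invoked their callback.
func (s *asyncSender) wait() { s.wg.Wait() }
```

Callbacks run on a different goroutine than the caller, so anything they touch needs the usual synchronization; forwarding to a channel, as ToBytesProducerChan and ToBytesProducerErrChan do, is one way to handle that.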

func (*Connection) SendSyncByte

func (conn *Connection) SendSyncByte(topic string, partition int32, key []byte, value []byte) (offset int64, err error)

SendSyncByte sends a message encoded with the byte encoder using the sync API.

func (*Connection) SendSyncMessage

func (conn *Connection) SendSyncMessage(topic string, partition int32, key client.Encoder, value client.Encoder) (offset int64, err error)

SendSyncMessage sends a message using the sync API.

func (*Connection) SendSyncString

func (conn *Connection) SendSyncString(topic string, partition int32, key string, value string) (offset int64, err error)

SendSyncString sends a message encoded with the string encoder using the sync API.
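
The sync methods block until the message is stored and return the offset it was assigned. As a minimal sketch of that contract, the in-memory stand-in below appends to a per-partition log and returns the index of the new record. memBroker and topicPartition are hypothetical names, and rejecting an empty topic is an assumption of the example.

```go
package main

import "errors"

// topicPartition identifies one partition log.
type topicPartition struct {
	topic     string
	partition int32
}

// memBroker is a hypothetical in-memory stand-in for a Kafka cluster.
type memBroker struct {
	logs map[topicPartition][][]byte
}

// sendSync appends value to the partition log and returns its offset.
// The key is accepted to mirror the signature shape but is not stored here.
func (b *memBroker) sendSync(topic string, partition int32, key, value []byte) (offset int64, err error) {
	if topic == "" {
		return 0, errors.New("empty topic")
	}
	if b.logs == nil {
		b.logs = make(map[topicPartition][][]byte)
	}
	tp := topicPartition{topic: topic, partition: partition}
	offset = int64(len(b.logs[tp]))
	b.logs[tp] = append(b.logs[tp], value)
	return offset, nil
}
```

Offsets are counted per partition, so the first message sent to each partition of a topic gets offset 0 in this sketch.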

func (*Connection) StopConsuming

func (conn *Connection) StopConsuming(topic string) error

StopConsuming cancels the previously created subscription for consuming the topic.

func (*Connection) StopConsumingPartition added in v1.0.3

func (conn *Connection) StopConsumingPartition(topic string, partition int32, offset int64) error

StopConsumingPartition cancels the previously created subscription for consuming the given topic, partition and offset.
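
The signature suggests that a partition subscription is identified by the combination of topic, partition and offset. The package's bookkeeping is unexported, so the following is only a guess at the idea: subKey and subscriptions are hypothetical, and so is the rule that a duplicate subscription is an error.

```go
package main

import "errors"

// subKey identifies one partition subscription.
type subKey struct {
	topic     string
	partition int32
	offset    int64
}

// subscriptions is a hypothetical registry of callbacks keyed by subKey.
type subscriptions struct {
	byKey map[subKey]func([]byte)
}

// add registers cb for the given triple and rejects duplicates.
func (s *subscriptions) add(cb func([]byte), topic string, partition int32, offset int64) error {
	k := subKey{topic: topic, partition: partition, offset: offset}
	if s.byKey == nil {
		s.byKey = make(map[subKey]func([]byte))
	}
	if _, dup := s.byKey[k]; dup {
		return errors.New("already subscribed")
	}
	s.byKey[k] = cb
	return nil
}

// stop removes the subscription registered under exactly the same triple.
func (s *subscriptions) stop(topic string, partition int32, offset int64) error {
	k := subKey{topic: topic, partition: partition, offset: offset}
	if _, ok := s.byKey[k]; !ok {
		return errors.New("no such subscription")
	}
	delete(s.byKey, k)
	return nil
}
```

Under this model, cancelling requires the same offset that was used when subscribing; a different offset refers to a different subscription.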

type ConsumerFactory

type ConsumerFactory func(topics []string, groupId string) (*client.Consumer, error)

ConsumerFactory produces a consumer for the selected topics in the specified consumer group. A factory function, rather than a consumer instance, is passed to the Multiplexer because the list of topics to be consumed has to be known when the consumer is initialized. The Multiplexer calls the function once the list of topics to be consumed has been selected.
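
The deferred-construction idea can be sketched in a few lines. The code below is a simplified model, not the Multiplexer itself: lazyMux, consumer and consumerFactory are hypothetical stand-ins. Topics are collected first, the factory is invoked once on start, and later subscriptions are rejected.

```go
package main

import "errors"

// consumer is a hypothetical stand-in for client.Consumer.
type consumer struct {
	topics  []string
	groupID string
}

// consumerFactory has the same shape as ConsumerFactory above.
type consumerFactory func(topics []string, groupID string) (*consumer, error)

// lazyMux collects topics and creates its consumer only when started.
type lazyMux struct {
	name     string
	factory  consumerFactory
	topics   []string
	consumer *consumer
}

// subscribe records a topic; it fails once the consumer exists.
func (m *lazyMux) subscribe(topic string) error {
	if m.consumer != nil {
		return errors.New("multiplexer already started")
	}
	m.topics = append(m.topics, topic)
	return nil
}

// start calls the factory with the complete topic list. The multiplexer name
// is used as the consumer group ID.
func (m *lazyMux) start() error {
	if m.consumer != nil {
		return errors.New("multiplexer already started")
	}
	c, err := m.factory(m.topics, m.name)
	if err != nil {
		return err
	}
	m.consumer = c
	return nil
}
```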

type KafkaMock

type KafkaMock struct {
	Mux      *Multiplexer
	AsyncPub *mocks.AsyncProducer
	SyncPub  *mocks.SyncProducer
}

KafkaMock bundles a Multiplexer with mocked sync and async producers for use in tests.

func Mock

func Mock(t *testing.T) *KafkaMock

Mock returns mock of Multiplexer that can be used for testing purposes.

type Multiplexer

type Multiplexer struct {
	logging.Logger
	// contains filtered or unexported fields
}

Multiplexer encapsulates the clients to the Kafka cluster (syncProducer, asyncProducer, consumer). It allows multiple Connections to be created that use the multiplexer's clients to communicate with the Kafka cluster. The aim of the Multiplexer is to decrease the number of connections needed. The set of topics to be consumed by Connections must be selected before the underlying consumer in the Multiplexer is started. Once the Multiplexer's consumer has been started, new topics cannot be added.

func InitMultiplexer

func InitMultiplexer(configFile string, name string, partitioner string, log logging.Logger) (*Multiplexer, error)

InitMultiplexer initializes and returns a new Kafka multiplexer based on the supplied config file. Name is used as the groupId of the consumer. Kafka can store the last read offset for a groupId; this is leveraged to deliver unread messages after a restart.

func InitMultiplexerWithConfig

func InitMultiplexerWithConfig(muxConfig *Config, name string, partitioner string, log logging.Logger) (*Multiplexer, error)

InitMultiplexerWithConfig initializes and returns a new Kafka multiplexer based on the supplied configuration. Name is used as the groupId of the consumer. Kafka can store the last read offset for a groupId; this is leveraged to deliver unread messages after a restart.

func NewMultiplexer

func NewMultiplexer(consumerFactory ConsumerFactory, syncP *client.SyncProducer, asyncP *client.AsyncProducer,
	partitioner string, name string, log logging.Logger) *Multiplexer

NewMultiplexer creates a new instance of the Kafka Multiplexer.

func (*Multiplexer) Close

func (mux *Multiplexer) Close()

Close cleans up the resources used by the Multiplexer.

func (*Multiplexer) NewConnection

func (mux *Multiplexer) NewConnection(name string) *Connection

NewConnection creates an instance of Connection that provides access to the Multiplexer's shared clients.

func (*Multiplexer) NewProtoConnection

func (mux *Multiplexer) NewProtoConnection(name string, serializer keyval.Serializer) *ProtoConnection

NewProtoConnection creates an instance of ProtoConnection that provides access to the Multiplexer's shared clients.

func (*Multiplexer) Start

func (mux *Multiplexer) Start() error

Start should be called once all the Connections have been subscribed for topic consumption. An attempt to start consuming a topic after the multiplexer is started returns an error.

type ProtoConnection

type ProtoConnection struct {
	// contains filtered or unexported fields
}

ProtoConnection is an entity that provides access to the shared producers/consumers of the multiplexer. Message values are marshaled to and unmarshaled from proto.Message behind the scenes.
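
The "behind the scenes" conversion amounts to wrapping a byte-level send with a serializer. The sketch below shows the layering with encoding/json standing in for protobuf so that it stays dependency-free. All names (serializer, jsonSerializer, bytesSender, typedConn, greeting) are hypothetical, and the serializer interface is only shaped like a marshal/unmarshal pair, not copied from keyval.Serializer.

```go
package main

import "encoding/json"

// serializer is a hypothetical marshal/unmarshal pair.
type serializer interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// jsonSerializer uses JSON in place of protobuf for this sketch.
type jsonSerializer struct{}

func (jsonSerializer) Marshal(v interface{}) ([]byte, error) { return json.Marshal(v) }

func (jsonSerializer) Unmarshal(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

// bytesSender is the byte-level send operation that the typed layer wraps.
type bytesSender func(topic, key string, value []byte) error

// typedConn marshals values before sending and unmarshals received bytes.
type typedConn struct {
	ser  serializer
	send bytesSender
}

// sendMessage marshals value and passes the bytes to the underlying sender.
func (c *typedConn) sendMessage(topic, key string, value interface{}) error {
	data, err := c.ser.Marshal(value)
	if err != nil {
		return err
	}
	return c.send(topic, key, data)
}

// decode is what a consumer-side callback would do with a raw message value.
func (c *typedConn) decode(raw []byte, into interface{}) error {
	return c.ser.Unmarshal(raw, into)
}

// greeting is an example payload type.
type greeting struct {
	Text string `json:"text"`
}
```

Callers of such a layer work with typed values on both the publishing and the consuming side, while the shared producers and consumers only ever see byte slices.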

func (*ProtoConnection) ConsumePartition added in v1.0.3

func (conn *ProtoConnection) ConsumePartition(msgClb func(messaging.ProtoMessage), topic string, partition int32, offset int64) error

ConsumePartition starts consuming the given topic on the given partition, beginning at the given offset. It can be called only before the multiplexer is started; afterwards it returns an error. If the callback forwards messages to a channel, that channel should be buffered; otherwise messages might be lost.

func (*ProtoConnection) ConsumeTopic

func (conn *ProtoConnection) ConsumeTopic(msgClb func(messaging.ProtoMessage), topics ...string) error

ConsumeTopic starts consuming the given topics. It can be called only before the multiplexer is started; afterwards it returns an error. If the callback forwards messages to a channel, that channel should be buffered; otherwise messages might be lost.

func (*ProtoConnection) ConsumeTopicOnPartition added in v1.0.3

func (conn *ProtoConnection) ConsumeTopicOnPartition(msgClb func(messaging.ProtoMessage), topic string, partition int32, offset int64) error

ConsumeTopicOnPartition starts consuming the given topic on the given partition, beginning at the given offset. It can be called only before the multiplexer is started; afterwards it returns an error. If the callback forwards messages to a channel, that channel should be buffered; otherwise messages might be lost.

func (*ProtoConnection) NewAsyncPublisher

func (conn *ProtoConnection) NewAsyncPublisher(topic string, successClb func(messaging.ProtoMessage), errorClb func(messaging.ProtoMessageErr)) (messaging.ProtoPublisher, error)

NewAsyncPublisher creates a new instance of protoAsyncPublisherKafka that publishes Kafka messages asynchronously through the common messaging API.

func (*ProtoConnection) NewAsyncPublisherToPartition added in v1.0.3

func (conn *ProtoConnection) NewAsyncPublisherToPartition(topic string, partition int32, successClb func(messaging.ProtoMessage), errorClb func(messaging.ProtoMessageErr)) (messaging.ProtoPublisher, error)

NewAsyncPublisherToPartition creates a new instance of protoAsyncPublisherKafka that publishes Kafka messages asynchronously to the given partition through the common messaging API.

func (*ProtoConnection) NewSyncPublisher

func (conn *ProtoConnection) NewSyncPublisher(topic string) (messaging.ProtoPublisher, error)

NewSyncPublisher creates a new instance of protoSyncPublisherKafka that publishes Kafka messages synchronously through the common messaging API.

func (*ProtoConnection) NewSyncPublisherToPartition added in v1.0.3

func (conn *ProtoConnection) NewSyncPublisherToPartition(topic string, partition int32) (messaging.ProtoPublisher, error)

NewSyncPublisherToPartition creates a new instance of protoSyncPublisherKafka that publishes Kafka messages synchronously to the given partition through the common messaging API.

func (*ProtoConnection) SendAsyncMessage

func (conn *ProtoConnection) SendAsyncMessage(topic string, partition int32, key string, value proto.Message, meta interface{}, successClb func(messaging.ProtoMessage), errClb func(messaging.ProtoMessageErr)) error

SendAsyncMessage sends a message using the async API.

func (*ProtoConnection) SendSyncMessage

func (conn *ProtoConnection) SendSyncMessage(topic string, partition int32, key string, value proto.Message) (offset int64, err error)

SendSyncMessage sends a message using the sync API.

func (*ProtoConnection) StopConsuming

func (conn *ProtoConnection) StopConsuming(topic string) error

StopConsuming cancels the previously created subscription for consuming the topic.

func (*ProtoConnection) StopConsumingPartition added in v1.0.3

func (conn *ProtoConnection) StopConsumingPartition(topic string, partition int32, offset int64) error

StopConsumingPartition cancels the previously created subscription for consuming the given topic, partition and offset.

func (*ProtoConnection) StopWatch

func (conn *ProtoConnection) StopWatch(topic string) error

StopWatch is an alias for StopConsuming method. The alias was added in order to conform to messaging.Mux interface.

func (*ProtoConnection) StopWatchPartition added in v1.0.3

func (conn *ProtoConnection) StopWatchPartition(topic string, partition int32, offset int64) error

StopWatchPartition is an alias for StopConsumingPartition method. The alias was added in order to conform to messaging.Mux interface.

func (*ProtoConnection) Watch

func (conn *ProtoConnection) Watch(msgClb func(messaging.ProtoMessage), topics ...string) error

Watch is an alias for ConsumeTopic method. The alias was added in order to conform to messaging.Mux interface.

func (*ProtoConnection) WatchPartition added in v1.0.3

func (conn *ProtoConnection) WatchPartition(msgClb func(messaging.ProtoMessage), topic string, partition int32, offset int64) error

WatchPartition is an alias for ConsumePartition method. The alias was added in order to conform to messaging.Mux interface.
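
Alias methods like these are a common way to satisfy an interface whose method names differ from the type's native ones. The following sketch shows the pattern in isolation. The watcher interface and conn type are hypothetical and much smaller than messaging.Mux and ProtoConnection.

```go
package main

// watcher is a hypothetical interface that uses Watch-style method names.
type watcher interface {
	Watch(cb func(string), topics ...string) error
	StopWatch(topic string) error
}

// conn is a hypothetical connection with Consume-style native method names.
type conn struct {
	subs map[string]func(string)
}

// ConsumeTopic registers cb for every listed topic.
func (c *conn) ConsumeTopic(cb func(string), topics ...string) error {
	if c.subs == nil {
		c.subs = make(map[string]func(string))
	}
	for _, t := range topics {
		c.subs[t] = cb
	}
	return nil
}

// StopConsuming removes the subscription for topic, if any.
func (c *conn) StopConsuming(topic string) error {
	delete(c.subs, topic)
	return nil
}

// Watch and StopWatch only delegate; they exist to satisfy watcher.
func (c *conn) Watch(cb func(string), topics ...string) error {
	return c.ConsumeTopic(cb, topics...)
}

func (c *conn) StopWatch(topic string) error {
	return c.StopConsuming(topic)
}

// Compile-time check that *conn satisfies watcher.
var _ watcher = (*conn)(nil)
```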
