client

package
v2.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2020 License: Apache-2.0 Imports: 15 Imported by: 6

Documentation

Overview

Package client implements the synchronous and asynchronous kafka Producers and the kafka Consumer.

Index

Examples

Constants

View Source
const (
	// Hash scheme (messages with the same key always end up on the same partition)
	Hash = "hash"
	// Random scheme (random partition is always used)
	Random = "random"
	// Manual scheme (partitions are manually set in the provided message's partition field)
	Manual = "manual"
)

Partitioner schemes

Variables

This section is empty.

Functions

func NewClient

func NewClient(config *Config, partitioner string) (sarama.Client, error)

NewClient initializes new sarama client instance from provided config and with defined partitioner

Types

type AsyncProducer

type AsyncProducer struct {
	logging.Logger
	Config    *Config
	Client    sarama.Client
	Producer  sarama.AsyncProducer
	Partition int32

	sync.Mutex
	// contains filtered or unexported fields
}

AsyncProducer allows to publish message to kafka using asynchronous API. The message using SendMsgToPartition and SendMsgByte function returns do not block. The status whether message was sent successfully or not is delivered using channels specified in config structure.

Example
log := logrus.DefaultLogger()

//init config
config := NewConfig(logrus.DefaultLogger())
config.SetBrokers("localhost:9091", "localhost:9092")
config.SetSendSuccess(true)
config.SetSuccessChan(make(chan *ProducerMessage))
config.SetSendError(true)
config.SetErrorChan(make(chan *ProducerError))

// init client
sClient, err := NewClient(config, Hash)
if err != nil {
	return
}

// init producer
producer, err := NewAsyncProducer(config, sClient, Hash, nil)
if err != nil {
	log.Errorf("NewAsyncProducer errored: %v\n", err)
	return
}

// send a message
producer.SendMsgByte("test-topic", []byte("key"), []byte("test message"), nil)

select {
case msg := <-config.SuccessChan:
	log.Info("message sent successfully - ", msg)
case err := <-config.ErrorChan:
	log.Error("message errored - ", err)
}

// close producer and release resources
err = producer.Close(true)
if err != nil {
	log.Errorf("AsyncProducer close errored: %v\n", err)
	return
}
log.Info("AsyncProducer closed")
Output:

func GetAsyncProducerMock

func GetAsyncProducerMock(t mocks.ErrorReporter) (*AsyncProducer, *mocks.AsyncProducer)

GetAsyncProducerMock returns mocked implementation of async producer that doesn't need connection to Kafka broker and can be used for testing purposes.

func NewAsyncProducer

func NewAsyncProducer(config *Config, sClient sarama.Client, partitioner string, wg *sync.WaitGroup) (*AsyncProducer, error)

NewAsyncProducer returns a new AsyncProducer instance. Producer is created from provided sarama client which can be nil; in that case a new client will be created. Also the partitioner is set here. Note: provided sarama client partitioner should match the one used in config.

func (*AsyncProducer) Close

func (ref *AsyncProducer) Close(async ...bool) error

Close closes the client and producer

func (*AsyncProducer) GetCloseChannel

func (ref *AsyncProducer) GetCloseChannel() <-chan struct{}

GetCloseChannel returns a channel that is closed on asyncProducer cleanup

func (*AsyncProducer) IsClosed

func (ref *AsyncProducer) IsClosed() bool

IsClosed returns the "closed" status

func (*AsyncProducer) SendMsgByte

func (ref *AsyncProducer) SendMsgByte(topic string, key []byte, msg []byte, metadata interface{})

SendMsgByte sends an async message to Kafka.

func (*AsyncProducer) SendMsgToPartition

func (ref *AsyncProducer) SendMsgToPartition(topic string, partition int32, key Encoder, msg Encoder, metadata interface{})

SendMsgToPartition sends an async message to Kafka

func (*AsyncProducer) WaitForClose

func (ref *AsyncProducer) WaitForClose()

WaitForClose returns when the producer is closed

type Config

type Config struct {
	logging.Logger
	// Config extends the sarama-cluster.Config with the kafkaclient namespace
	*cluster.Config
	// Context Package carries deadlines, cancelation signals, and other values.
	// see: http://golang.org/x/net/context
	Context context.Context
	// Cancel is a function that can be call, e.g. config.Cancel(), to cancel and close
	// the producer/consumer
	Cancel context.CancelFunc
	// Brokers contains "{domain:port}" array of Kafka brokers.
	// This list of brokers is used by the kafkaclient to determine the 'lead' broker for each topic
	// and the 'lead' consumer for each topic. If only one broker is supplied then it will be used to
	// communicate with the other brokers.
	// REQUIRED: PRODUCER AND CONSUMER.
	Brokers []string
	// GroupID contains the name of the consumer's group.
	// REQUIRED: CONSUMER.
	GroupID string
	// Debug determines if debug code should be 'turned-on'.
	// DEFAULT: false. OPTIONAL.
	Debug bool
	// Topics contains the topics that a consumer should retrieve messages for.
	// REQUIRED: CONSUMER.
	Topics []string
	// Partition is the partition. Used when configuring partitions manually.
	Partition int32
	// Partitioner is the method used to determine a topic's partition.
	// REQUIRED: PRODUCER. DEFAULT: HASH
	Partitioner sarama.PartitionerConstructor
	// InitialOffset indicates the initial offset that should be used when a consumer is initialized and begins reading
	// the Kafka message log for the topic. If the offset was previously committed then the committed offset is used
	// rather than the initial offset.
	// REQUIRED: CONSUMER
	InitialOffset int64
	// RequiredAcks is the level of acknowledgement reliability needed from the broker
	// REQUIRED: PRODUCER. DEFAULT(Async) WaitForLocal DEFAULT(Sync) WaitForAll
	RequiredAcks RequiredAcks
	// RecvNotification indicates that a Consumer return "Notification" messages after it has rebalanced.
	// REQUIRED: CONSUMER. DEFAULT: false.
	RecvNotification bool
	// NotificationChan function called when a "Notification" message is received by a consumer.
	// REQUIRED: CONSUMER if 'RecvNotification=true'
	RecvNotificationChan chan *cluster.Notification
	// RecvError indicates that "receive" errors should not be ignored and should be returned to the consumer.
	// REQUIRED: CONSUMER. DEFAULT: true.
	RecvError bool
	// RecvErrorChan channel is for delivery of "Error" messages received by the consumer.
	// REQUIRED: CONSUMER if 'RecvError=true'
	RecvErrorChan chan error
	// MessageChan channel is used for delivery of consumer messages.
	// REQUIRED: CONSUMER
	RecvMessageChan chan *ConsumerMessage
	// SendSuccess indicates that the Async Producer should return "Success" messages when a message
	//  has been successfully received by the Kafka.
	// REQUIRED: CONSUMER. DEFAULT: false.
	SendSuccess bool
	// SuccessChan is used for delivery of message when a "Success" is returned by Async Producer.
	// REQUIRED: PRODUCER if 'SendSuccess=true'
	SuccessChan chan *ProducerMessage
	// SendError indicates that an Async Producer should return "Error" messages when a message transmission to Kafka
	// failed.
	// REQUIRED: CONSUMER. DEFAULT: true.
	SendError bool
	// ErrorChan is used for delivery of "Error" message if an error is returned by Async Producer.
	// REQUIRED: PRODUCER if 'SendError=true'
	ErrorChan chan *ProducerError
}

Config struct provides the configuration for a Producer (Sync or Async) and Consumer.

func NewConfig

func NewConfig(log logging.Logger) *Config

NewConfig return a new Config object.

func (*Config) ConsumerConfig

func (ref *Config) ConsumerConfig() *cluster.Config

ConsumerConfig sets the Config.ConsumerConfig field

func (*Config) ProducerConfig

func (ref *Config) ProducerConfig() *sarama.Config

ProducerConfig sets the Config.ProducerConfig field

func (*Config) SetAcks

func (ref *Config) SetAcks(acks RequiredAcks)

SetAcks sets the Config.RequiredAcks field

func (*Config) SetBrokers

func (ref *Config) SetBrokers(brokers ...string)

SetBrokers sets the Config.Brokers field

func (*Config) SetDebug

func (ref *Config) SetDebug(val bool)

SetDebug sets the Config.Debug field

func (*Config) SetErrorChan

func (ref *Config) SetErrorChan(val chan *ProducerError)

SetErrorChan sets the Config.ErrorChan field

func (*Config) SetGroup

func (ref *Config) SetGroup(id string)

SetGroup sets the Config.GroupID field

func (*Config) SetInitialOffset

func (ref *Config) SetInitialOffset(offset int64)

SetInitialOffset sets the Config.InitialOffset field

func (*Config) SetPartition

func (ref *Config) SetPartition(val int32)

SetPartition sets the Config.SetPartition field

func (*Config) SetPartitioner

func (ref *Config) SetPartitioner(val string)

SetPartitioner sets the Config.SetPartitioner field

func (*Config) SetRecvError

func (ref *Config) SetRecvError(val bool)

SetRecvError sets the Config.RecvError field

func (*Config) SetRecvErrorChan

func (ref *Config) SetRecvErrorChan(val chan error)

SetRecvErrorChan sets the Config.RecvErrorChan field

func (*Config) SetRecvMessageChan

func (ref *Config) SetRecvMessageChan(val chan *ConsumerMessage)

SetRecvMessageChan sets the Config.RecvMessageChan field

func (*Config) SetRecvNotification

func (ref *Config) SetRecvNotification(val bool)

SetRecvNotification sets the Config.RecvNotification field

func (*Config) SetRecvNotificationChan

func (ref *Config) SetRecvNotificationChan(val chan *cluster.Notification)

SetRecvNotificationChan sets the Config.RecvNotificationChan field

func (*Config) SetSendError

func (ref *Config) SetSendError(val bool)

SetSendError sets the Config.SendError field

func (*Config) SetSendSuccess

func (ref *Config) SetSendSuccess(val bool)

SetSendSuccess sets the Config.SendSuccess field

func (*Config) SetSuccessChan

func (ref *Config) SetSuccessChan(val chan *ProducerMessage)

SetSuccessChan sets the Config.SuccessChan field

func (*Config) SetTLS

func (ref *Config) SetTLS(tlsConfig *tls.Config) (err error)

SetTLS sets the TLS configuration

func (*Config) SetTopics

func (ref *Config) SetTopics(topics string)

SetTopics sets the Config.Topics field

func (*Config) ValidateAsyncProducerConfig

func (ref *Config) ValidateAsyncProducerConfig() error

ValidateAsyncProducerConfig validates config for an Async Producer

func (*Config) ValidateConsumerConfig

func (ref *Config) ValidateConsumerConfig() error

ValidateConsumerConfig validates config for Consumer

func (*Config) ValidateSyncProducerConfig

func (ref *Config) ValidateSyncProducerConfig() error

ValidateSyncProducerConfig validates config for a Sync Producer

type Consumer

type Consumer struct {
	logging.Logger
	Config    *Config
	SConsumer sarama.Consumer
	Consumer  clusterConsumer

	sync.Mutex
	// contains filtered or unexported fields
}

Consumer allows to consume message belonging to specified set of kafka topics.

Example
//init config
config := NewConfig(logrus.DefaultLogger())
config.SetBrokers("localhost:9091,localhost:9092")
config.SetRecvNotification(true)
config.SetRecvNotificationChan(make(chan *cluster.Notification))
config.SetRecvError(true)
config.SetRecvErrorChan(make(chan error))
config.SetRecvMessageChan(make(chan *ConsumerMessage))
config.SetTopics("topic1,topic2,topic3")
config.SetGroup("test-group")

// init consumer with message handlers
consumer, err := NewConsumer(config, nil)
if err != nil {
	log.Errorf("NewConsumer Error: %v", err)
}

go watchChannels(consumer, config)

// wait for consumer to finish receiving messages
consumer.WaitForClose()
log.Info("consumer closed")
// do something
Output:

func GetConsumerMock

func GetConsumerMock(t mocks.ErrorReporter) *Consumer

GetConsumerMock returns mocked implementation of consumer that doesn't need connection to kafka cluster.

func NewConsumer

func NewConsumer(config *Config, wg *sync.WaitGroup) (*Consumer, error)

NewConsumer returns a Consumer instance. If startHandlers is set to true, reading of messages, errors and notifications is started using new consumer. Otherwise, only instance is returned

func (*Consumer) Close

func (ref *Consumer) Close() error

Close closes the client and consumer

func (*Consumer) CommitOffsets

func (ref *Consumer) CommitOffsets() error

CommitOffsets manually commits marked offsets

func (*Consumer) GetCloseChannel

func (ref *Consumer) GetCloseChannel() <-chan struct{}

GetCloseChannel returns a channel that is closed on asyncProducer cleanup

func (*Consumer) IsClosed

func (ref *Consumer) IsClosed() bool

IsClosed returns the "closed" status

func (*Consumer) MarkOffset

func (ref *Consumer) MarkOffset(msg *ConsumerMessage, metadata string)

MarkOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.

Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.

func (*Consumer) MarkPartitionOffset

func (ref *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)

MarkPartitionOffset marks an offset of the provided topic/partition as processed. See MarkOffset for additional explanation.

func (*Consumer) PrintNotification

func (ref *Consumer) PrintNotification(note map[string][]int32)

PrintNotification print the topics and partitions

func (*Consumer) StartConsumerHandlers

func (ref *Consumer) StartConsumerHandlers()

StartConsumerHandlers starts required handlers using bsm/sarama consumer. Used when partitioner set in config is non-manual

func (*Consumer) StartConsumerManualHandlers

func (ref *Consumer) StartConsumerManualHandlers(partitionConsumer sarama.PartitionConsumer)

StartConsumerManualHandlers starts required handlers using sarama partition consumer. Used when partitioner set in config is manual

func (*Consumer) Subscriptions

func (ref *Consumer) Subscriptions() map[string][]int32

Subscriptions returns the consumed topics and partitions

func (*Consumer) WaitForClose

func (ref *Consumer) WaitForClose()

WaitForClose waits for the consumer to close

type ConsumerMessage

type ConsumerMessage struct {
	Key, Value, PrevValue []byte
	Topic                 string
	Partition             int32
	Offset                int64
	Timestamp             time.Time
}

ConsumerMessage encapsulates a Kafka message returned by the consumer.

func (*ConsumerMessage) GetKey

func (cm *ConsumerMessage) GetKey() string

GetKey returns the key associated with the message.

func (*ConsumerMessage) GetOffset

func (cm *ConsumerMessage) GetOffset() int64

GetOffset returns the offset associated with the message

func (*ConsumerMessage) GetPartition

func (cm *ConsumerMessage) GetPartition() int32

GetPartition returns the partition associated with the message

func (*ConsumerMessage) GetPrevValue

func (cm *ConsumerMessage) GetPrevValue() []byte

GetPrevValue returns the previous value associated with the message.

func (*ConsumerMessage) GetTopic

func (cm *ConsumerMessage) GetTopic() string

GetTopic returns the topic associated with the message

func (*ConsumerMessage) GetValue

func (cm *ConsumerMessage) GetValue() []byte

GetValue returns the value associated with the message.

type Encoder

type Encoder interface {
	sarama.Encoder
}

Encoder defines an interface that is used as argument of producer functions. It wraps the sarama.Encoder

type ProducerError

type ProducerError struct {
	*ProducerMessage
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

func (*ProducerError) Error

func (ref *ProducerError) Error() error

func (*ProducerError) String

func (ref *ProducerError) String() string

type ProducerMessage

type ProducerMessage struct {
	// The Kafka topic for this message.
	Topic string
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
}

ProducerMessage is the collection of elements passed to the Producer in order to send a message.

func (*ProducerMessage) GetKey

func (pm *ProducerMessage) GetKey() string

GetKey returns the key associated with the message.

func (*ProducerMessage) GetOffset

func (pm *ProducerMessage) GetOffset() int64

GetOffset returns the offset associated with the message.

func (*ProducerMessage) GetPartition

func (pm *ProducerMessage) GetPartition() int32

GetPartition returns the partition associated with the message.

func (*ProducerMessage) GetPrevValue

func (pm *ProducerMessage) GetPrevValue() []byte

GetPrevValue returns nil for the producer

func (*ProducerMessage) GetTopic

func (pm *ProducerMessage) GetTopic() string

GetTopic returns the topic associated with the message.

func (*ProducerMessage) GetValue

func (pm *ProducerMessage) GetValue() []byte

GetValue returns the content of the message.

func (*ProducerMessage) String

func (pm *ProducerMessage) String() string

type ProtoConsumerMessage

type ProtoConsumerMessage struct {
	*ConsumerMessage
	// contains filtered or unexported fields
}

ProtoConsumerMessage encapsulates a Kafka message returned by the consumer and provides means to unmarshal the value into proto.Message.

func NewProtoConsumerMessage

func NewProtoConsumerMessage(msg *ConsumerMessage, serializer keyval.Serializer) *ProtoConsumerMessage

NewProtoConsumerMessage creates new instance of ProtoConsumerMessage

func (*ProtoConsumerMessage) GetKey

func (cm *ProtoConsumerMessage) GetKey() string

GetKey returns the key associated with the message.

func (*ProtoConsumerMessage) GetOffset

func (cm *ProtoConsumerMessage) GetOffset() int64

GetOffset returns the offset associated with the message.

func (*ProtoConsumerMessage) GetPartition

func (cm *ProtoConsumerMessage) GetPartition() int32

GetPartition returns the partition associated with the message.

func (*ProtoConsumerMessage) GetPrevValue

func (cm *ProtoConsumerMessage) GetPrevValue(msg proto.Message) (prevValueExist bool, err error)

GetPrevValue returns the previous value associated with the latest message.

func (*ProtoConsumerMessage) GetTopic

func (cm *ProtoConsumerMessage) GetTopic() string

GetTopic returns the topic associated with the message.

func (*ProtoConsumerMessage) GetValue

func (cm *ProtoConsumerMessage) GetValue(msg proto.Message) error

GetValue returns the value associated with the message.

type ProtoProducerMessage

type ProtoProducerMessage struct {
	*ProducerMessage
	Serializer keyval.Serializer
}

ProtoProducerMessage is wrapper of a producer message that simplify work with proto-modelled data.

func (*ProtoProducerMessage) GetKey

func (ppm *ProtoProducerMessage) GetKey() string

GetKey returns the key associated with the message.

func (*ProtoProducerMessage) GetOffset

func (ppm *ProtoProducerMessage) GetOffset() int64

GetOffset returns the offset associated with the message.

func (*ProtoProducerMessage) GetPartition

func (ppm *ProtoProducerMessage) GetPartition() int32

GetPartition returns the partition associated with the message.

func (*ProtoProducerMessage) GetPrevValue

func (ppm *ProtoProducerMessage) GetPrevValue(msg proto.Message) (prevValueExist bool, err error)

GetPrevValue for producer returns false (value does not exist)

func (*ProtoProducerMessage) GetTopic

func (ppm *ProtoProducerMessage) GetTopic() string

GetTopic returns the topic associated with the message.

func (*ProtoProducerMessage) GetValue

func (ppm *ProtoProducerMessage) GetValue(msg proto.Message) error

GetValue unmarshalls the content of the msg into provided structure.

type ProtoProducerMessageErr

type ProtoProducerMessageErr struct {
	*ProtoProducerMessage
	Err error
}

ProtoProducerMessageErr represents a proto-modelled message that was not published successfully.

func (*ProtoProducerMessageErr) Error

func (pme *ProtoProducerMessageErr) Error() error

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid except AcksUnset.

const (
	// AcksUnset indicates that no valid value has been set
	AcksUnset RequiredAcks = -32768
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)

type SyncProducer

type SyncProducer struct {
	logging.Logger
	Config    *Config
	Client    sarama.Client
	Producer  sarama.SyncProducer
	Partition int32

	sync.Mutex
	// contains filtered or unexported fields
}

SyncProducer allows to publish messages to kafka using synchronous API.

Example
// init config
config := NewConfig(logrus.DefaultLogger())
config.ProducerConfig().Producer.RequiredAcks = sarama.WaitForAll
config.SetBrokers("localhost:9091", "localhost:9092")

// init client
sClient, err := NewClient(config, Hash)
if err != nil {
	return
}

// init producer
producer, err := NewSyncProducer(config, sClient, Hash, nil)
if err != nil {
	log.Errorf("NewSyncProducer errored: %v\n", err)
	return
}

// send message
_, err = producer.SendMsgByte("test-topic", nil, []byte("test message"))
if err != nil {
	log.Errorf("SendMsg errored: %v", err)
}

// close producer and release resources
err = producer.Close()
if err != nil {
	log.Errorf("SyncProducer close errored: %v", err)
	return
}
log.Info("SyncProducer closed")
Output:

func GetSyncProducerMock

func GetSyncProducerMock(t mocks.ErrorReporter) (*SyncProducer, *mocks.SyncProducer)

GetSyncProducerMock returns mocked implementation of sync producer that doesn't need connection to Kafka broker and can be used for testing purposes.

func NewSyncProducer

func NewSyncProducer(config *Config, sClient sarama.Client, partitioner string, wg *sync.WaitGroup) (*SyncProducer, error)

NewSyncProducer returns a new SyncProducer instance. Producer is created from provided sarama client which can be nil; in that case, a new client is created. Also the partitioner is set here. Note: provided sarama client partitioner should match the one used in config.

func (*SyncProducer) Close

func (ref *SyncProducer) Close() error

Close closes the client and producer

func (*SyncProducer) GetCloseChannel

func (ref *SyncProducer) GetCloseChannel() <-chan struct{}

GetCloseChannel returns a channel that is closed on asyncProducer cleanup

func (*SyncProducer) IsClosed

func (ref *SyncProducer) IsClosed() bool

IsClosed returns the "closed" status

func (*SyncProducer) SendMsgByte

func (ref *SyncProducer) SendMsgByte(topic string, key []byte, msg []byte) (*ProducerMessage, error)

SendMsgByte sends a message to Kafka

func (*SyncProducer) SendMsgToPartition

func (ref *SyncProducer) SendMsgToPartition(topic string, partition int32, key sarama.Encoder, msg sarama.Encoder) (*ProducerMessage, error)

SendMsgToPartition sends a message to Kafka

func (*SyncProducer) WaitForClose

func (ref *SyncProducer) WaitForClose()

WaitForClose returns when the producer is closed

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL