kafka

package
v0.0.2
Published: Sep 5, 2019 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetConsumerGroupName

func GetConsumerGroupName(deployment, jobName string, aresCluster string) string

GetConsumerGroupName returns the consumer group name that is used, or will be used, for the given deployment, job name, and Ares cluster.

func NewKafkaConsumer

func NewKafkaConsumer(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error)

NewKafkaConsumer creates a Kafka consumer using https://github.com/confluentinc/confluent-kafka-go.

Types

type KafkaConsumer

type KafkaConsumer struct {
	*kafkaConfluent.Consumer
	kafkaConfluent.ConfigMap
	sync.Mutex

	TopicArray []string
	Logger     *zap.Logger
	Scope      tally.Scope
	ErrCh      chan error
	MsgCh      chan consumer.Message

	// WARNING: The following channels should not be closed by the lib users
	CloseAttempted bool
	CloseErr       error
	CloseCh        chan struct{}
}

KafkaConsumer implements the consumer.Consumer interface.

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close() error

func (*KafkaConsumer) Closed

func (c *KafkaConsumer) Closed() <-chan struct{}

Closed returns a channel that unblocks when the consumer successfully shuts down.
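The shutdown behavior described above can be illustrated with a minimal sketch. It is not the package's implementation: the closer type, the shutdown hook, and the errAlreadyClosed value are hypothetical, and the behavior of a second Close call is an assumption. It only shows how a mutex-guarded flag and a channel that is closed on success give Closed its "unblocks after a successful shutdown" semantics.

```go
package main

import (
	"errors"
	"sync"
)

// errAlreadyClosed is a hypothetical error for a repeated Close call.
var errAlreadyClosed = errors.New("close already attempted")

// closer is a hypothetical stand-in for the shutdown state that
// KafkaConsumer exposes (CloseAttempted, CloseErr, CloseCh).
type closer struct {
	mu             sync.Mutex
	closeAttempted bool
	closeErr       error
	closeCh        chan struct{}
	shutdown       func() error // hypothetical hook that releases resources
}

func newCloser(shutdown func() error) *closer {
	return &closer{closeCh: make(chan struct{}), shutdown: shutdown}
}

// Close runs the shutdown hook at most once. The channel is closed only
// when the hook succeeds, so receivers on Closed() unblock only after a
// successful shutdown.
func (c *closer) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closeAttempted {
		return errAlreadyClosed
	}
	c.closeAttempted = true
	c.closeErr = c.shutdown()
	if c.closeErr == nil {
		close(c.closeCh)
	}
	return c.closeErr
}

// Closed returns a receive-only view of the channel, so callers can wait
// on it but cannot close it themselves.
func (c *closer) Closed() <-chan struct{} {
	return c.closeCh
}
```

Returning the channel as receive-only is one way to honor the warning in the struct definition that library users should not close it.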

func (*KafkaConsumer) CommitUpTo

func (c *KafkaConsumer) CommitUpTo(msg consumer.Message) error

CommitUpTo marks this message and all previous messages in the same partition as processed. The last processed offset for each partition is periodically flushed to ZooKeeper; on startup, consumers begin processing after the last stored offset.
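The "this message and all previous messages in the same partition" semantics can be pictured as a per-partition high-water mark. The sketch below is an illustration under assumptions, not the package's code: offsetTracker, topicPartition, commitUpTo, and nextOffset are hypothetical names, and the real consumer delegates offset storage to its underlying client.

```go
package main

import "sync"

// topicPartition identifies one partition of one topic (hypothetical type).
type topicPartition struct {
	topic     string
	partition int32
}

// offsetTracker keeps the highest processed offset for each partition.
type offsetTracker struct {
	mu        sync.Mutex
	processed map[topicPartition]int64
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{processed: make(map[topicPartition]int64)}
}

// commitUpTo marks offset, and implicitly every lower offset in the same
// partition, as processed. Older offsets never move the mark backwards.
func (t *offsetTracker) commitUpTo(topic string, partition int32, offset int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	key := topicPartition{topic: topic, partition: partition}
	if cur, ok := t.processed[key]; !ok || offset > cur {
		t.processed[key] = offset
	}
}

// nextOffset reports where a restarted consumer would resume: one past the
// last processed offset. ok is false when nothing has been committed yet.
func (t *offsetTracker) nextOffset(topic string, partition int32) (next int64, ok bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	cur, ok := t.processed[topicPartition{topic: topic, partition: partition}]
	if !ok {
		return 0, false
	}
	return cur + 1, true
}
```

Because only the highest offset per partition is kept, committing a later message covers every earlier message in that partition, while other partitions are unaffected.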

func (*KafkaConsumer) Errors

func (c *KafkaConsumer) Errors() <-chan error

Errors returns a channel of errors for the topic. To prevent deadlocks, users must read from the error channel.

All errors returned from this channel can be safely type-asserted to the consumer.Error interface, which provides structured access to the topic name and partition number.
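A rough sketch of draining an error channel and recovering the structured fields follows. The topicError interface and its Topic and Partition method names are assumptions standing in for consumer.Error, whose exact method set is not shown on this page; partitionError is a hypothetical concrete type used only to make the example self-contained.

```go
package main

import "fmt"

// topicError is a hypothetical stand-in for consumer.Error.
type topicError interface {
	error
	Topic() string
	Partition() int32
}

// partitionError is a hypothetical concrete error carrying its origin.
type partitionError struct {
	topic     string
	partition int32
	cause     string
}

func (e *partitionError) Error() string {
	return fmt.Sprintf("%s[%d]: %s", e.topic, e.partition, e.cause)
}
func (e *partitionError) Topic() string    { return e.topic }
func (e *partitionError) Partition() int32 { return e.partition }

// describe uses the two-value type assertion so that an unexpected error
// type is reported instead of causing a panic.
func describe(err error) string {
	if te, ok := err.(topicError); ok {
		return fmt.Sprintf("topic=%s partition=%d", te.Topic(), te.Partition())
	}
	return "unstructured: " + err.Error()
}

// drainErrors reads errCh until it is closed and returns one description
// per error. Reading continuously keeps the sender from blocking.
func drainErrors(errCh <-chan error) []string {
	var out []string
	for err := range errCh {
		out = append(out, describe(err))
	}
	return out
}
```

In a real program drainErrors would typically run in its own goroutine for the lifetime of the consumer, fed by the channel returned from Errors().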

func (*KafkaConsumer) Messages

func (c *KafkaConsumer) Messages() <-chan consumer.Message

Messages returns a channel of messages for the topic.

If the consumer is not configured with a nonzero buffer size, the Errors() channel must be read in conjunction with Messages() to prevent deadlocks.
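One common way to read both channels together is a single select loop, sketched below. The message type and the consume function are hypothetical stand-ins rather than part of this package; in real use the three channels would come from Messages(), Errors(), and Closed().

```go
package main

// message is a hypothetical stand-in for consumer.Message.
type message struct {
	partition int32
	offset    int64
	value     []byte
}

// consume services msgs and errs from one select statement, so a pending
// send on either unbuffered channel is always picked up and the producer
// side cannot stall on one channel while the reader waits on the other.
// It returns when done is closed.
func consume(msgs <-chan message, errs <-chan error, done <-chan struct{},
	handle func(message), onErr func(error)) {
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				msgs = nil // a nil channel is never selected again
				continue
			}
			handle(m)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			onErr(err)
		case <-done:
			return
		}
	}
}
```

Setting a closed channel to nil keeps the loop from spinning on zero values while still letting the remaining cases make progress.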

func (*KafkaConsumer) Name

func (c *KafkaConsumer) Name() string

Name returns the name of this consumer group.

func (*KafkaConsumer) Topics

func (c *KafkaConsumer) Topics() []string

Topics returns the names of the topics being consumed.

type KafkaMessage

type KafkaMessage struct {
	*kafkaConfluent.Message

	Consumer    consumer.Consumer
	ClusterName string
}

KafkaMessage implements the consumer.Message interface.

func (*KafkaMessage) Ack

func (m *KafkaMessage) Ack()

func (*KafkaMessage) Cluster

func (m *KafkaMessage) Cluster() string

func (*KafkaMessage) Key

func (m *KafkaMessage) Key() []byte

func (*KafkaMessage) Nack

func (m *KafkaMessage) Nack()

func (*KafkaMessage) Offset

func (m *KafkaMessage) Offset() int64

func (*KafkaMessage) Partition

func (m *KafkaMessage) Partition() int32

func (*KafkaMessage) Topic

func (m *KafkaMessage) Topic() string

func (*KafkaMessage) Value

func (m *KafkaMessage) Value() []byte
