kafka

package
v0.0.0-...-7055b2f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2021 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultPartitionAssignmentStrategy is the default partition assignment strategy used by group members.
	DefaultPartitionAssignmentStrategy = "roundrobin,range"

	// DefaultClientRack is the default client rack identifier. Usually set to region name.
	DefaultClientRack = ""

	// DefaultPollInterval is the default maximum interval Poll call can block.
	DefaultPollInterval = 100 * time.Millisecond
)
View Source
var (
	// DefaultFlushTimeout is the default timeout duration for producer Flush call.
	DefaultFlushTimeout = 5 * time.Second
)
View Source
var (
	// DefaultPubSubEventsChanSize is the default size of events buffered channel.
	DefaultPubSubEventsChanSize = 100
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is the Kafka consumer

func NewConsumer

func NewConsumer(config ConsumerConfig) (*Consumer, error)

NewConsumer creates a new Consumer instance

func (*Consumer) Assign

func (c *Consumer) Assign(partitions []kafka.TopicPartition) error

Assign the set of partitions to consume

func (*Consumer) Commit

func (c *Consumer) Commit(topic string, partition uint32, offset kafka.Offset) error

Commit commits the offset for the provided topic partition

func (*Consumer) Pause

func (c *Consumer) Pause(partitions []kafka.TopicPartition) error

Pause consumption for the provided list of partitions

func (*Consumer) Poll

func (c *Consumer) Poll() kafka.Event

Poll the consumer for messages or events

func (*Consumer) Seek

func (c *Consumer) Seek(topic string, partition uint32, offset kafka.Offset) error

Seek seeks the given topic partition

func (*Consumer) Start

func (c *Consumer) Start() error

Start will start the consumer

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop will stop the consumer

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topics ...string) error

Subscribe subscribes to the provided list of topics

func (*Consumer) Unassign

func (c *Consumer) Unassign() error

Unassign the current set of partitions to consume

type ConsumerConfig

type ConsumerConfig struct {
	// The list of Kafka brokers.
	//
	// Field value is required.
	Brokers []string `required:"true"`

	// Client group identifier. All clients sharing the same id belong to the same group.
	//
	// Field value is required.
	ConsumerGroup string `required:"true"`

	// The partition assignment strategy used by group members.
	//
	// Default value is set via DefaultPartitionAssignmentStrategy variable.
	PartitionAssignmentStrategy string

	// The maximum interval Poll call can block.
	//
	// Default value is set via DefaultPollInterval variable.
	PollInterval time.Duration `required:"false" min:"0ms"`

	// A rack identifier for the client.
	//
	// Default value is set via DefaultClientRack variable.
	ClientRack string

	// Allows further configuration of underlying Kafka consumer.
	//
	// For advanced consumer configuration options, check librdkafka docs:
	// https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
	AdditionalConfig kafka.ConfigMap

	// Breaker enables tracking consumer error rate.
	//
	// Default value is set via DefaultKafkaBreaker variable.
	Breaker core.Breaker
}

ConsumerConfig is the Kafka consumer configuration

func (ConsumerConfig) Get

func (c ConsumerConfig) Get() (interface{}, error)

Get creates and returns the corresponding instance

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is the Kafka producer

func NewProducer

func NewProducer(config ProducerConfig) (*Producer, error)

NewProducer creates a new Producer instance

func (*Producer) Produce

func (p *Producer) Produce(topic string, key, value []byte) error

Produce will send a new message to specified topic

func (*Producer) ProduceMessages

func (p *Producer) ProduceMessages(topic string, partition uint32, messages ...core.Message) error

ProduceMessages will write the messages to Kafka

func (*Producer) Start

func (p *Producer) Start() error

Start will start the producer

func (*Producer) Stop

func (p *Producer) Stop()

Stop will stop the producer

type ProducerConfig

type ProducerConfig struct {
	// The list of Kafka brokers.
	//
	// Field value is required.
	Brokers []string `required:"true"`

	// Maximum number of messages in one broker request.
	//
	// Default value is set by underlying Kafka producer.
	BatchSize int

	// Delay in milliseconds to wait for messages in the producer queue to accumulate before
	// constructing message batches to transmit to brokers. A higher value allows larger and
	// more effective (less overhead, improved compression) batches of messages to accumulate
	// at the expense of increased message delivery latency.
	//
	// Default value is set by underlying Kafka producer.
	BatchDelay time.Duration

	// The timeout duration for producer Flush call.
	//
	// Default value is set via DefaultFlushTimeout variable.
	FlushTimeout time.Duration `required:"false" min:"0ms"`

	// Allows further configuration of underlying Kafka producer.
	//
	// For advanced producer configuration options, check librdkafka docs:
	// https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
	AdditionalConfig kafka.ConfigMap

	// Breaker enables tracking producer error rate.
	//
	// Default value is set via DefaultKafkaBreaker variable.
	Breaker core.Breaker
}

ProducerConfig is the Kafka producer configuration

func (ProducerConfig) Get

func (c ProducerConfig) Get() (interface{}, error)

Get creates the corresponding instance

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub implements the publish–subscribe messaging pattern using a Kafka producer and consumer pair.

func NewPubSub

func NewPubSub(config PubSubConfig) (*PubSub, error)

NewPubSub creates a new PubSub instance

func (*PubSub) Events

func (p *PubSub) Events() <-chan kafka.Event

Events is the channel of incoming events

func (*PubSub) Publish

func (p *PubSub) Publish(topic string, key, value []byte) error

Publish will send a new message to specified topic

func (*PubSub) Start

func (p *PubSub) Start() error

Start will start the consumer and producer

func (*PubSub) Stop

func (p *PubSub) Stop()

Stop will stop the consumer and producer

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(topic string) error

Subscribe subscribes to the provided topic

type PubSubConfig

type PubSubConfig struct {
	// The list of Kafka brokers.
	//
	// Field value is required.
	Brokers []string `required:"true"`

	// Prefix used to generate a unique consumer group identifier.
	//
	// Field value is required.
	ConsumerGroupPrefix string `required:"true"`

	// Size of events buffered channel.
	//
	// Default value is set via DefaultPubSubEventsChanSize variable.
	EventsChanSize int `min:"1"`

	// The maximum interval consumer Poll call can block.
	//
	// Default value is set via DefaultPollInterval variable.
	PollInterval time.Duration `required:"false" min:"0ms"`

	// The timeout duration for producer Flush call.
	//
	// Default value is set via DefaultFlushTimeout variable.
	FlushTimeout time.Duration `required:"false" min:"0ms"`

	// Allows further configuration of underlying Kafka consumer.
	ConsumerAdditionalConfig kafka.ConfigMap

	// Allows further configuration of underlying Kafka producer.
	ProducerAdditionalConfig kafka.ConfigMap

	// Breaker enables tracking error rate.
	//
	// Default value is set via DefaultKafkaBreaker variable.
	Breaker core.Breaker
}

PubSubConfig is the PubSub configuration

func (PubSubConfig) Get

func (c PubSubConfig) Get() (interface{}, error)

Get creates the corresponding instance

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL