Documentation
¶
Index ¶
- Variables
- type Consumer
- func (c *Consumer) Assign(partitions []kafka.TopicPartition) error
- func (c *Consumer) Commit(topic string, partition uint32, offset kafka.Offset) error
- func (c *Consumer) Pause(partitions []kafka.TopicPartition) error
- func (c *Consumer) Poll() kafka.Event
- func (c *Consumer) Seek(topic string, partition uint32, offset kafka.Offset) error
- func (c *Consumer) Start() error
- func (c *Consumer) Stop()
- func (c *Consumer) Subscribe(topics ...string) error
- func (c *Consumer) Unassign() error
- type ConsumerConfig
- type Producer
- type ProducerConfig
- type PubSub
- type PubSubConfig
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultPartitionAssignmentStrategy is the default partition assignment strategy used by group members. DefaultPartitionAssignmentStrategy = "roundrobin,range" // DefaultClientRack is the default client rack identifier. Usually set to region name. DefaultClientRack = "" // DefaultPollInterval is the default maximum interval Poll call can block. DefaultPollInterval = 100 * time.Millisecond )
var ( // DefaultFlushTimeout is the default timeout duration for producer Flush call. DefaultFlushTimeout = 5 * time.Second )
var (
// DefaultPubSubEventsChanSize is the default size of events buffered channel.
DefaultPubSubEventsChanSize = 100
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is the Kafka consumer
func NewConsumer ¶
func NewConsumer(config ConsumerConfig) (*Consumer, error)
NewConsumer creates a new Consumer instance
func (*Consumer) Assign ¶
func (c *Consumer) Assign(partitions []kafka.TopicPartition) error
Assign the set of partitions to consume
func (*Consumer) Pause ¶
func (c *Consumer) Pause(partitions []kafka.TopicPartition) error
Pause consumption for the provided list of partitions
type ConsumerConfig ¶
type ConsumerConfig struct { // The list of Kafka brokers. // // Field value is required. Brokers []string `required:"true"` // Client group identifier. All clients sharing the same id belong to the same group. // // Field value is required. ConsumerGroup string `required:"true"` // The partition assignment strategy used by group members. // // Default value is set via DefaultPartitionAssignmentStrategy variable. PartitionAssignmentStrategy string // The maximum interval Poll call can block // // Default value is set via DefaultPollInterval variable. PollInterval time.Duration `required:"false" min:"0ms"` // A rack identifier for the client. // // Default value is set via DefaultClientRack variable. ClientRack string // Allows further configuration of underlying Kafka consumer. // // For advanced consumer configuration options, check librdkafka docs: // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md AdditionalConfig kafka.ConfigMap // Breaker enables tracking consumer error rate. // // Default value is set via DefaultKafkaBreaker variable. Breaker core.Breaker }
ConsumerConfig is the Kafka consumer configuration
func (ConsumerConfig) Get ¶
func (c ConsumerConfig) Get() (interface{}, error)
Get creates and returns the corresponding instance
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is the Kafka producer
func NewProducer ¶
func NewProducer(config ProducerConfig) (*Producer, error)
NewProducer creates a new Producer instance
func (*Producer) ProduceMessages ¶
ProduceMessages will write the messages to Kafka
type ProducerConfig ¶
type ProducerConfig struct { // The list of Kafka brokers. // // Field value is required. Brokers []string `required:"true"` // Maximum number of messages in one broker request. // // Default value is set by underlying Kafka producer. BatchSize int // Delay in milliseconds to wait for messages in the producer queue to accumulate before // constructing message batches to transmit to brokers. A higher value allows larger and // more effective (less overhead, improved compression) batches of messages to accumulate // at the expense of increased message delivery latency. // // Default value is set by underlying Kafka producer. BatchDelay time.Duration // The timeout duration for producer Flush call. // // Default value is set via DefaultFlushTimeout variable. FlushTimeout time.Duration `required:"false" min:"0ms"` // Allows further configuration of underlying Kafka producer. // // For advanced producer configuration options, check librdkafka docs: // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md AdditionalConfig kafka.ConfigMap // Breaker enables tracking producer error rate. // // Default value is set via DefaultKafkaBreaker variable. Breaker core.Breaker }
ProducerConfig is the Kafka producer configuration
func (ProducerConfig) Get ¶
func (c ProducerConfig) Get() (interface{}, error)
Get creates the corresponding instance
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub implements the publish–subscribe messaging pattern using a Kafka producer and consumer pair.
func NewPubSub ¶
func NewPubSub(config PubSubConfig) (*PubSub, error)
NewPubSub creates a new PubSub instance
type PubSubConfig ¶
type PubSubConfig struct { // The list of Kafka brokers. // // Field value is required. Brokers []string `required:"true"` // Prefix used to generate a unique consumer group identifier. // // Field value is required. ConsumerGroupPrefix string `required:"true"` // Size of events buffered channel. // // Default value is set via DefaultPubSubEventsChanSize variable. EventsChanSize int `min:"1"` // The maximum interval consumer Poll call can block // // Default value is set via DefaultPollInterval variable. PollInterval time.Duration `required:"false" min:"0ms"` // The timeout duration for producer Flush call. // // Default value is set via DefaultFlushTimeout variable. FlushTimeout time.Duration `required:"false" min:"0ms"` // Allows further configuration of underlying Kafka consumer. ConsumerAdditionalConfig kafka.ConfigMap // Allows further configuration of underlying Kafka producer. ProducerAdditionalConfig kafka.ConfigMap // Breaker enables tracking error rate. // // Default value is set via DefaultKafkaBreaker variable. Breaker core.Breaker }
PubSubConfig is the PubSub configuration
func (PubSubConfig) Get ¶
func (c PubSubConfig) Get() (interface{}, error)
Get creates the corresponding instance