Documentation ¶
Index ¶
- func CopyDataChunkFile(consumer *Consumer, dst io.Writer)
- func MsgAddressPrintable(message interface{}) string
- func TasksProcessor(consumer *Consumer, processor func(*common.Task) error)
- type ConsumeMessageFunction
- type Consumer
- type ConsumerGroup
- func (c *ConsumerGroup) ConsumeLoop(consumeNewest bool, ack bool)
- func (c *ConsumerGroup) SetAddress(address *common.KafkaAddress) *ConsumerGroup
- func (c *ConsumerGroup) SetConsumeMessageFunction(consumeMessageFunction ConsumeMessageFunction) *ConsumerGroup
- func (c *ConsumerGroup) SetConsumerGroupHandler(handler sarama.ConsumerGroupHandler) *ConsumerGroup
- func (c *ConsumerGroup) SetContext(ctx context.Context) *ConsumerGroup
- func (c *ConsumerGroup) SetTopic(topic string) *ConsumerGroup
- type ConsumerGroupHandler
- type DataChunkTransport
- type Producer
- func (p *Producer) Close()
- func (p *Producer) CreateTopic() (error, error)
- func (p *Producer) GetTopic() string
- func (p *Producer) ListTopics() (map[string]sarama.TopicDetail, error)
- func (p *Producer) Send(data []byte) error
- func (p *Producer) SetAddress(address *common.KafkaAddress) *Producer
- func (p *Producer) SetTopic(topic string) *Producer
- type TaskTransport
- type Transport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CopyDataChunkFile ¶
CopyDataChunkFile
Types ¶
type ConsumeMessageFunction ¶
type ConsumeMessageFunction func(context.Context, *sarama.ConsumerMessage) bool
ConsumeMessageFunction specifies message consumeMessageFunction function
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer
func NewConsumer ¶
func NewConsumer(endpoint *common.KafkaEndpoint, address *common.KafkaAddress) *Consumer
NewConsumer
func NewConsumerConfig ¶
func NewConsumerConfig(cfg sections.KafkaConfigurator, topic string) *Consumer
NewConsumerConfig
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup
func NewConsumerGroup ¶
func NewConsumerGroup(endpoint *common.KafkaEndpoint, address *common.KafkaAddress, groupID string) *ConsumerGroup
NewConsumerGroup creates new consumer group
func NewConsumerGroupFromEndpoint ¶
func NewConsumerGroupFromEndpoint(cfg sections.KafkaConfigurator, groupID string) *ConsumerGroup
NewConsumerGroupFromEndpoint. IMPORTANT - you have to specify topic to read from either with
- SetAddress
- SetTopic
func (*ConsumerGroup) ConsumeLoop ¶
func (c *ConsumerGroup) ConsumeLoop(consumeNewest bool, ack bool)
ConsumeLoop runs an endless loop of kafka consumer
func (*ConsumerGroup) SetAddress ¶
func (c *ConsumerGroup) SetAddress(address *common.KafkaAddress) *ConsumerGroup
SetAddress - sets the full address - Topic and Partition
func (*ConsumerGroup) SetConsumeMessageFunction ¶
func (c *ConsumerGroup) SetConsumeMessageFunction(consumeMessageFunction ConsumeMessageFunction) *ConsumerGroup
SetConsumeMessageFunction sets function which will be called for each message received from Kafka
func (*ConsumerGroup) SetConsumerGroupHandler ¶
func (c *ConsumerGroup) SetConsumerGroupHandler(handler sarama.ConsumerGroupHandler) *ConsumerGroup
SetConsumerGroupHandler sets handler which performs setup, cleanup and message processing activities
func (*ConsumerGroup) SetContext ¶
func (c *ConsumerGroup) SetContext(ctx context.Context) *ConsumerGroup
SetContext - sets context to be used by MessageProcessor
func (*ConsumerGroup) SetTopic ¶
func (c *ConsumerGroup) SetTopic(topic string) *ConsumerGroup
SetTopic - sets address in simplified form - specified Topic and Partition 0
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
ConsumerGroupHandler instances are used to handle individual topic/partition claims. It also provides hooks for your consumer group session life-cycle and allow you to trigger logic before or after the consume loop(s).
PLEASE NOTE that handlers are likely be called from several goroutines concurrently, ensure that all state is safely protected against race conditions.
Implements sarama.ConsumerGroupHandler interface
func (*ConsumerGroupHandler) Cleanup ¶
func (*ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time. Part of sarama.ConsumerGroupHandler interface
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (h *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit. Part of sarama.ConsumerGroupHandler interface
func (*ConsumerGroupHandler) Setup ¶
func (*ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim. Part of sarama.ConsumerGroupHandler interface
type DataChunkTransport ¶
type DataChunkTransport struct {
Transport
}
func NewDataChunkTransport ¶
func NewDataChunkTransport(producer *Producer, consumer *Consumer, close bool) *DataChunkTransport
NewDataChunkTransport
func (*DataChunkTransport) Recv ¶
func (t *DataChunkTransport) Recv() (*common.DataPacket, error)
Recv
func (*DataChunkTransport) Send ¶
func (t *DataChunkTransport) Send(dataChunk *common.DataPacket) error
Send
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer
func NewProducer ¶
func NewProducer(endpoint *common.KafkaEndpoint, address *common.KafkaAddress) *Producer
NewProducer
func NewProducerConfig ¶
func NewProducerConfig(cfg sections.KafkaConfigurator) *Producer
NewProducerConfig
func (*Producer) ListTopics ¶
func (p *Producer) ListTopics() (map[string]sarama.TopicDetail, error)
ListTopics
func (*Producer) SetAddress ¶
func (p *Producer) SetAddress(address *common.KafkaAddress) *Producer
SetAddress
type TaskTransport ¶
type TaskTransport struct {
Transport
}
func NewTaskTransport ¶
func NewTaskTransport(producer *Producer, consumer *Consumer, close bool) *TaskTransport
NewTaskTransport