sikafka

package
v2.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AsyncProducerWithConfig

func AsyncProducerWithConfig(config *sarama.Config, brokers []string) (sarama.AsyncProducer, error)

func ConsumerGroupWithConfig

func ConsumerGroupWithConfig(config *sarama.Config, brokers []string, group string) (sarama.ConsumerGroup, error)

func DefaultAsyncProducer

func DefaultAsyncProducer(brokers []string) (sarama.AsyncProducer, error)

func DefaultConsumerGroup

func DefaultConsumerGroup(brokers []string, group string, version string, assignor string, oldest bool) (sarama.ConsumerGroup, error)

func DefaultSyncProducer

func DefaultSyncProducer(brokers []string) (sarama.SyncProducer, error)

func SyncProducerWithConfig

func SyncProducerWithConfig(config *sarama.Config, brokers []string) (sarama.SyncProducer, error)

Types

type AsyncProducer

type AsyncProducer struct {
	sarama.AsyncProducer
	// contains filtered or unexported fields
}

func NewAsyncProducer

func NewAsyncProducer(producer sarama.AsyncProducer, topic string) *AsyncProducer

func (*AsyncProducer) Produce

func (ap *AsyncProducer) Produce(key []byte, value []byte) (partition int32, offset int64, err error)

func (*AsyncProducer) ProduceWithMessage

func (ap *AsyncProducer) ProduceWithMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

func (*AsyncProducer) ProduceWithTopic

func (ap *AsyncProducer) ProduceWithTopic(topic string, key []byte, value []byte) (partition int32, offset int64, err error)

type CgConsumer

type CgConsumer struct {
	// contains filtered or unexported fields
}

CgConsumer represents a Sarama consumer group consumer

func NewCgConsumer

func NewCgConsumer(msgHandler MessageHandler) *CgConsumer

func (*CgConsumer) Cleanup

func (consumer *CgConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*CgConsumer) CloseReady

func (c *CgConsumer) CloseReady()

func (*CgConsumer) ConsumeClaim

func (consumer *CgConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*CgConsumer) MakeReady

func (c *CgConsumer) MakeReady()

func (*CgConsumer) Setup

func (consumer *CgConsumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

func (*CgConsumer) WaitReady

func (c *CgConsumer) WaitReady()

type Consumer

type Consumer interface {
	Setup(sarama.ConsumerGroupSession) error
	Cleanup(sarama.ConsumerGroupSession) error
	ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
	MakeReady()
	WaitReady()
	CloseReady()
}

type ConsumerGroup

type ConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(cg sarama.ConsumerGroup, consumer Consumer, topics []string) *ConsumerGroup

func (*ConsumerGroup) Finish

func (cg *ConsumerGroup) Finish() error

func (*ConsumerGroup) Start

func (cg *ConsumerGroup) Start() error

func (*ConsumerGroup) StartWith

func (cg *ConsumerGroup) StartWith(loaded chan bool) error

func (*ConsumerGroup) Stop

func (cg *ConsumerGroup) Stop() error

func (*ConsumerGroup) Toggle

func (cg *ConsumerGroup) Toggle()

type MessageHandler

type MessageHandler interface {
	Handle(msg *sarama.ConsumerMessage) error
}

type SyncProducer

type SyncProducer struct {
	sarama.SyncProducer
	// contains filtered or unexported fields
}

func NewSyncProducer

func NewSyncProducer(producer sarama.SyncProducer, topic string) *SyncProducer

func (*SyncProducer) Produce

func (sp *SyncProducer) Produce(key []byte, value []byte) (partition int32, offset int64, err error)

func (*SyncProducer) ProduceWithMessage

func (sp *SyncProducer) ProduceWithMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

func (*SyncProducer) ProduceWithTopic

func (sp *SyncProducer) ProduceWithTopic(topic string, key []byte, value []byte) (partition int32, offset int64, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL