sikafka

package
v2.3.10 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AsyncProducerWithConfig

func AsyncProducerWithConfig(config *sarama.Config, brokers []string) (sarama.AsyncProducer, error)

Deprecated

func ConsumerGroupWithConfig

func ConsumerGroupWithConfig(config *sarama.Config, brokers []string, group string) (sarama.ConsumerGroup, error)

func DefaultAsyncProducer

func DefaultAsyncProducer(brokers []string) (sarama.AsyncProducer, error)

Deprecated

func DefaultConsumerGroup

func DefaultConsumerGroup(brokers []string, group string, version string, assignor string, oldest bool) (sarama.ConsumerGroup, error)

func DefaultSyncProducer

func DefaultSyncProducer(brokers []string) (sarama.SyncProducer, error)

Deprecated

func SyncProducerWithConfig

func SyncProducerWithConfig(config *sarama.Config, brokers []string) (sarama.SyncProducer, error)

Deprecated

Types

type AsyncProducer

type AsyncProducer struct {
	sarama.AsyncProducer
	// contains filtered or unexported fields
}

func NewAsyncProducer

func NewAsyncProducer(producer sarama.AsyncProducer, topic string) *AsyncProducer

func RetryableAsyncProducer added in v2.3.8

func RetryableAsyncProducer(brokers []string, version, topic string) (*AsyncProducer, error)

RetryableAsyncProducer returns an AsyncProducer with retry configured.

func (*AsyncProducer) Produce

func (ap *AsyncProducer) Produce(key []byte, value []byte) (partition int32, offset int64, err error)

func (*AsyncProducer) ProduceWithMessage

func (ap *AsyncProducer) ProduceWithMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

func (*AsyncProducer) ProduceWithTopic

func (ap *AsyncProducer) ProduceWithTopic(topic string, key []byte, value []byte) (partition int32, offset int64, err error)

type CgConsumer

type CgConsumer struct {
	// contains filtered or unexported fields
}

CgConsumer represents a Sarama consumer group consumer.

func NewCgConsumer

func NewCgConsumer(msgHandler MessageHandler) *CgConsumer

func (*CgConsumer) Cleanup

func (consumer *CgConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*CgConsumer) CloseReady

func (c *CgConsumer) CloseReady()

func (*CgConsumer) ConsumeClaim

func (consumer *CgConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*CgConsumer) MakeReady

func (c *CgConsumer) MakeReady()

func (*CgConsumer) Setup

func (consumer *CgConsumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*CgConsumer) WaitReady

func (c *CgConsumer) WaitReady() <-chan bool

type Consumer

type Consumer interface {
	Setup(sarama.ConsumerGroupSession) error
	Cleanup(sarama.ConsumerGroupSession) error
	ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
	MakeReady()
	WaitReady() <-chan bool
	CloseReady()
}

type ConsumerGroup

type ConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(cg sarama.ConsumerGroup, consumer Consumer, topics []string) *ConsumerGroup

func (*ConsumerGroup) Finish

func (cg *ConsumerGroup) Finish() error

func (*ConsumerGroup) Start

func (cg *ConsumerGroup) Start() error

func (*ConsumerGroup) StartWith

func (cg *ConsumerGroup) StartWith(loaded chan bool) error

func (*ConsumerGroup) Stop

func (cg *ConsumerGroup) Stop() error

func (*ConsumerGroup) Toggle

func (cg *ConsumerGroup) Toggle()

type MessageHandler

type MessageHandler interface {
	Handle(msg *sarama.ConsumerMessage) error
}

type SyncProducer

type SyncProducer struct {
	sarama.SyncProducer
	// contains filtered or unexported fields
}

func NewSyncProducer

func NewSyncProducer(producer sarama.SyncProducer, topic string, opts ...SyncProducerOption) *SyncProducer

func RetryableSyncProducer added in v2.3.7

func RetryableSyncProducer(brokers []string, version, topic string) (*SyncProducer, error)

RetryableSyncProducer returns a SyncProducer with retry configured.

func (*SyncProducer) Produce

func (sp *SyncProducer) Produce(key []byte, value []byte) (partition int32, offset int64, err error)

func (*SyncProducer) ProduceWithMessage

func (sp *SyncProducer) ProduceWithMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

func (*SyncProducer) ProduceWithTopic

func (sp *SyncProducer) ProduceWithTopic(topic string, key []byte, value []byte) (partition int32, offset int64, err error)

type SyncProducerOption added in v2.3.6

type SyncProducerOption interface {
	// contains filtered or unexported methods
}

type SyncProducerOptionFunc added in v2.3.6

type SyncProducerOptionFunc func(o *SyncProducer) error

func WithSyncProducerOptionRetyMax added in v2.3.6

func WithSyncProducerOptionRetyMax(retryMax uint16) SyncProducerOptionFunc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL