impl

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateCommonSaramaConfig

func CreateCommonSaramaConfig(version string, props CommonProperties) (*sarama.Config, error)

func NewSaramaAdmin

func NewSaramaAdmin(props *properties.Client) core.Admin

func NewSaramaConsumerClient

func NewSaramaConsumerClient(globalProps *properties.Client) (sarama.Client, error)

func NewSaramaProducerClient

func NewSaramaProducerClient(globalProps *properties.Client) (sarama.Client, error)

Types

type CommonProperties

type CommonProperties interface {
	GetClientId() string
	GetSecurityProtocol() string
	GetTls() *properties.Tls
}

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

func NewConsumerGroupHandler

func NewConsumerGroupHandler(client sarama.Client, handler core.ConsumerHandler, mapper *SaramaMapper) *ConsumerGroupHandler

func (*ConsumerGroupHandler) Cleanup

func (*ConsumerGroupHandler) ConsumeClaim

func (*ConsumerGroupHandler) MarkUnready

func (cg *ConsumerGroupHandler) MarkUnready()

func (*ConsumerGroupHandler) Setup

func (*ConsumerGroupHandler) WaitForReady

func (cg *ConsumerGroupHandler) WaitForReady() chan bool

type DebugLogger

type DebugLogger struct {
}

func NewDebugLogger

func NewDebugLogger() *DebugLogger

func (DebugLogger) Print

func (l DebugLogger) Print(v ...interface{})

func (DebugLogger) Printf

func (l DebugLogger) Printf(format string, v ...interface{})

func (DebugLogger) Println

func (l DebugLogger) Println(v ...interface{})

type SaramaAdmin

type SaramaAdmin struct {
	// contains filtered or unexported fields
}

func (SaramaAdmin) CreateTopics

func (s SaramaAdmin) CreateTopics(configurations []core.TopicConfiguration) error

func (SaramaAdmin) DeleteGroups

func (s SaramaAdmin) DeleteGroups(groupIds []string) error

func (SaramaAdmin) DeleteTopics

func (s SaramaAdmin) DeleteTopics(topics []string) error

type SaramaAsyncProducer

type SaramaAsyncProducer struct {
	// contains filtered or unexported fields
}

func NewSaramaAsyncProducer

func NewSaramaAsyncProducer(client sarama.Client, mapper *SaramaMapper) (*SaramaAsyncProducer, error)

func (*SaramaAsyncProducer) Close

func (p *SaramaAsyncProducer) Close() error

func (*SaramaAsyncProducer) Errors

func (p *SaramaAsyncProducer) Errors() <-chan *core.ProducerError

func (*SaramaAsyncProducer) Send

func (p *SaramaAsyncProducer) Send(m *core.Message)

func (*SaramaAsyncProducer) Successes

func (p *SaramaAsyncProducer) Successes() <-chan *core.Message

type SaramaConsumer

type SaramaConsumer struct {
	// contains filtered or unexported fields
}

func NewSaramaConsumer

func NewSaramaConsumer(
	mapper *SaramaMapper,
	clientProps *properties.Client,
	topicConsumer *properties.TopicConsumer,
	handler core.ConsumerHandler,
) (*SaramaConsumer, error)

func (*SaramaConsumer) Start

func (c *SaramaConsumer) Start(ctx context.Context)

func (*SaramaConsumer) Stop

func (c *SaramaConsumer) Stop()

func (*SaramaConsumer) WaitForReady

func (c *SaramaConsumer) WaitForReady() chan bool

type SaramaConsumers

type SaramaConsumers struct {
	// contains filtered or unexported fields
}

func NewSaramaConsumers

func NewSaramaConsumers(
	clientProps *properties.Client,
	consumerProps *properties.KafkaConsumer,
	mapper *SaramaMapper,
	handlers []core.ConsumerHandler,
) (*SaramaConsumers, error)

func (*SaramaConsumers) Start

func (s *SaramaConsumers) Start(ctx context.Context)

func (*SaramaConsumers) Stop

func (s *SaramaConsumers) Stop()

func (SaramaConsumers) WaitForReady

func (s SaramaConsumers) WaitForReady() chan bool

type SaramaMapper

type SaramaMapper struct {
}

func NewSaramaMapper

func NewSaramaMapper() *SaramaMapper

func (SaramaMapper) PtrToCoreHeaders

func (p SaramaMapper) PtrToCoreHeaders(headers []*sarama.RecordHeader) []core.MessageHeader

func (SaramaMapper) ToCoreConsumerMessage

func (p SaramaMapper) ToCoreConsumerMessage(msg *sarama.ConsumerMessage) *core.ConsumerMessage

func (SaramaMapper) ToCoreHeaders

func (p SaramaMapper) ToCoreHeaders(headers []sarama.RecordHeader) []core.MessageHeader

func (SaramaMapper) ToCoreMessage

func (p SaramaMapper) ToCoreMessage(msg *sarama.ProducerMessage) *core.Message

func (SaramaMapper) ToSaramaHeaders

func (p SaramaMapper) ToSaramaHeaders(headers []core.MessageHeader) []sarama.RecordHeader

type SaramaSyncProducer

type SaramaSyncProducer struct {
	// contains filtered or unexported fields
}

func NewSaramaSyncProducer

func NewSaramaSyncProducer(client sarama.Client, mapper *SaramaMapper) (*SaramaSyncProducer, error)

func (*SaramaSyncProducer) Close

func (s *SaramaSyncProducer) Close() error

func (SaramaSyncProducer) Send

func (s SaramaSyncProducer) Send(m *core.Message) (partition int32, offset int64, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL