Documentation ¶
Index ¶
- func CreateCommonSaramaConfig(version string, props CommonProperties) (*sarama.Config, error)
- func NewSaramaAdmin(props *properties.Client) core.Admin
- func NewSaramaConsumerClient(globalProps *properties.Client) (sarama.Client, error)
- func NewSaramaProducerClient(globalProps *properties.Client) (sarama.Client, error)
- type CommonProperties
- type ConsumerGroupHandler
- func (cg *ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error
- func (cg *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (cg *ConsumerGroupHandler) MarkUnready()
- func (cg *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error
- func (cg *ConsumerGroupHandler) WaitForReady() chan bool
- type DebugLogger
- type SaramaAdmin
- type SaramaAsyncProducer
- type SaramaConsumer
- type SaramaConsumers
- type SaramaMapper
- func (p SaramaMapper) PtrToCoreHeaders(headers []*sarama.RecordHeader) []core.MessageHeader
- func (p SaramaMapper) ToCoreConsumerMessage(msg *sarama.ConsumerMessage) *core.ConsumerMessage
- func (p SaramaMapper) ToCoreHeaders(headers []sarama.RecordHeader) []core.MessageHeader
- func (p SaramaMapper) ToCoreMessage(msg *sarama.ProducerMessage) *core.Message
- func (p SaramaMapper) ToSaramaHeaders(headers []core.MessageHeader) []sarama.RecordHeader
- type SaramaSyncProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateCommonSaramaConfig ¶
func CreateCommonSaramaConfig(version string, props CommonProperties) (*sarama.Config, error)
func NewSaramaAdmin ¶
func NewSaramaAdmin(props *properties.Client) core.Admin
func NewSaramaConsumerClient ¶
func NewSaramaConsumerClient(globalProps *properties.Client) (sarama.Client, error)
func NewSaramaProducerClient ¶
func NewSaramaProducerClient(globalProps *properties.Client) (sarama.Client, error)
Types ¶
type CommonProperties ¶
type CommonProperties interface { GetClientId() string GetSecurityProtocol() string GetTls() *properties.Tls }
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func NewConsumerGroupHandler ¶
func NewConsumerGroupHandler(client sarama.Client, handler core.ConsumerHandler, mapper *SaramaMapper) *ConsumerGroupHandler
func (*ConsumerGroupHandler) Cleanup ¶
func (cg *ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (cg *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerGroupHandler) MarkUnready ¶
func (cg *ConsumerGroupHandler) MarkUnready()
func (*ConsumerGroupHandler) Setup ¶
func (cg *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error
func (*ConsumerGroupHandler) WaitForReady ¶
func (cg *ConsumerGroupHandler) WaitForReady() chan bool
type DebugLogger ¶
type DebugLogger struct { }
func NewDebugLogger ¶
func NewDebugLogger() *DebugLogger
func (DebugLogger) Print ¶
func (l DebugLogger) Print(v ...interface{})
func (DebugLogger) Printf ¶
func (l DebugLogger) Printf(format string, v ...interface{})
func (DebugLogger) Println ¶
func (l DebugLogger) Println(v ...interface{})
type SaramaAdmin ¶
type SaramaAdmin struct {
// contains filtered or unexported fields
}
func (SaramaAdmin) CreateTopics ¶
func (s SaramaAdmin) CreateTopics(configurations []core.TopicConfiguration) error
func (SaramaAdmin) DeleteGroups ¶
func (s SaramaAdmin) DeleteGroups(groupIds []string) error
func (SaramaAdmin) DeleteTopics ¶
func (s SaramaAdmin) DeleteTopics(topics []string) error
type SaramaAsyncProducer ¶
type SaramaAsyncProducer struct {
// contains filtered or unexported fields
}
func NewSaramaAsyncProducer ¶
func NewSaramaAsyncProducer(client sarama.Client, mapper *SaramaMapper) (*SaramaAsyncProducer, error)
func (*SaramaAsyncProducer) Close ¶
func (p *SaramaAsyncProducer) Close() error
func (*SaramaAsyncProducer) Errors ¶
func (p *SaramaAsyncProducer) Errors() <-chan *core.ProducerError
func (*SaramaAsyncProducer) Send ¶
func (p *SaramaAsyncProducer) Send(m *core.Message)
func (*SaramaAsyncProducer) Successes ¶
func (p *SaramaAsyncProducer) Successes() <-chan *core.Message
type SaramaConsumer ¶
type SaramaConsumer struct {
// contains filtered or unexported fields
}
func NewSaramaConsumer ¶
func NewSaramaConsumer( mapper *SaramaMapper, clientProps *properties.Client, topicConsumer *properties.TopicConsumer, handler core.ConsumerHandler, ) (*SaramaConsumer, error)
func (*SaramaConsumer) Start ¶
func (c *SaramaConsumer) Start(ctx context.Context)
func (*SaramaConsumer) Stop ¶
func (c *SaramaConsumer) Stop()
func (*SaramaConsumer) WaitForReady ¶
func (c *SaramaConsumer) WaitForReady() chan bool
type SaramaConsumers ¶
type SaramaConsumers struct {
// contains filtered or unexported fields
}
func NewSaramaConsumers ¶
func NewSaramaConsumers( clientProps *properties.Client, consumerProps *properties.KafkaConsumer, mapper *SaramaMapper, handlers []core.ConsumerHandler, ) (*SaramaConsumers, error)
func (*SaramaConsumers) Start ¶
func (s *SaramaConsumers) Start(ctx context.Context)
func (*SaramaConsumers) Stop ¶
func (s *SaramaConsumers) Stop()
func (SaramaConsumers) WaitForReady ¶
func (s SaramaConsumers) WaitForReady() chan bool
type SaramaMapper ¶
type SaramaMapper struct { }
func NewSaramaMapper ¶
func NewSaramaMapper() *SaramaMapper
func (SaramaMapper) PtrToCoreHeaders ¶
func (p SaramaMapper) PtrToCoreHeaders(headers []*sarama.RecordHeader) []core.MessageHeader
func (SaramaMapper) ToCoreConsumerMessage ¶
func (p SaramaMapper) ToCoreConsumerMessage(msg *sarama.ConsumerMessage) *core.ConsumerMessage
func (SaramaMapper) ToCoreHeaders ¶
func (p SaramaMapper) ToCoreHeaders(headers []sarama.RecordHeader) []core.MessageHeader
func (SaramaMapper) ToCoreMessage ¶
func (p SaramaMapper) ToCoreMessage(msg *sarama.ProducerMessage) *core.Message
func (SaramaMapper) ToSaramaHeaders ¶
func (p SaramaMapper) ToSaramaHeaders(headers []core.MessageHeader) []sarama.RecordHeader
type SaramaSyncProducer ¶
type SaramaSyncProducer struct {
// contains filtered or unexported fields
}
func NewSaramaSyncProducer ¶
func NewSaramaSyncProducer(client sarama.Client, mapper *SaramaMapper) (*SaramaSyncProducer, error)
func (*SaramaSyncProducer) Close ¶
func (s *SaramaSyncProducer) Close() error
Click to show internal directories.
Click to hide internal directories.