Documentation ¶
Index ¶
- Constants
- func FromHeaders(sh []segmentio.Header) []kafka.Header
- func Mechanism(sasl kafka.SASLConfig) sasl.Mechanism
- func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer
- func NewTLSConfig(sasl kafka.SASLConfig) *tls.Config
- func ToHeaders(h []kafka.Header) []segmentio.Header
- type Consumer
- type MessageWrapper
- func (m *MessageWrapper) GetHeaders() map[string][]byte
- func (m *MessageWrapper) IncreaseRetryCount()
- func (m *MessageWrapper) IsExceedMaxRetryCount(maxRetry int) bool
- func (m *MessageWrapper) NewProduceTime()
- func (m *MessageWrapper) RouteMessageToTopic(topic string)
- func (m *MessageWrapper) To(increaseRetry bool) segmentio.Message
- type Producer
Constants ¶
View Source
const ( RetryHeaderKey = "x-retry-count" MessageProduceTimeHeaderKey = "x-produce-time" )
Variables ¶
This section is empty.
Functions ¶
func Mechanism ¶
func Mechanism(sasl kafka.SASLConfig) sasl.Mechanism
TODO: we can support `plain` authentication type link: https://github.com/segmentio/kafka-go#plain
func NewTLSConfig ¶
func NewTLSConfig(sasl kafka.SASLConfig) *tls.Config
Types ¶
type Consumer ¶
type Consumer interface { ReadMessage(ctx context.Context) (*MessageWrapper, error) Stop() }
type MessageWrapper ¶
func NewMessageWrapper ¶
func NewMessageWrapper(msg segmentio.Message) *MessageWrapper
func (*MessageWrapper) GetHeaders ¶
func (m *MessageWrapper) GetHeaders() map[string][]byte
func (*MessageWrapper) IncreaseRetryCount ¶
func (m *MessageWrapper) IncreaseRetryCount()
func (*MessageWrapper) IsExceedMaxRetryCount ¶
func (m *MessageWrapper) IsExceedMaxRetryCount(maxRetry int) bool
func (*MessageWrapper) NewProduceTime ¶
func (m *MessageWrapper) NewProduceTime()
func (*MessageWrapper) RouteMessageToTopic ¶
func (m *MessageWrapper) RouteMessageToTopic(topic string)
Click to show internal directories.
Click to hide internal directories.