Documentation ¶
Index ¶
- func CreateConsumerSession(k *KP, wg *sync.WaitGroup, ctx context.Context) error
- func MarshalStringMessage(message string, retries int) string
- func UnmarshalStringMessage(message string) (string, int, error)
- type Config
- type ConsumerStruct
- func (consumer *ConsumerStruct) Cleanup(sarama.ConsumerGroupSession) error
- func (consumer *ConsumerStruct) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (Consumer *ConsumerStruct) GetReady() chan bool
- func (consumer *ConsumerStruct) Process(ctx context.Context, message *sarama.ConsumerMessage) error
- func (consumer *ConsumerStruct) SetReady(ready chan bool)
- func (consumer *ConsumerStruct) Setup(sarama.ConsumerGroupSession) error
- type KP
- type KPConsumer
- type KPProducer
- type KafkaConfig
- type KafkaProcessor
- type Producer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateConsumerSession ¶
func MarshalStringMessage ¶
Types ¶
type Config ¶
type Config struct {
KafkaConfig KafkaConfig `json:"kafka"`
}
type ConsumerStruct ¶
type ConsumerStruct struct { Processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error // contains filtered or unexported fields }
ConsumerStruct represents a Sarama consumer group consumer
func (*ConsumerStruct) Cleanup ¶
func (consumer *ConsumerStruct) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerStruct) ConsumeClaim ¶
func (consumer *ConsumerStruct) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerStruct) GetReady ¶
func (Consumer *ConsumerStruct) GetReady() chan bool
func (*ConsumerStruct) Process ¶
func (consumer *ConsumerStruct) Process(ctx context.Context, message *sarama.ConsumerMessage) error
func (*ConsumerStruct) SetReady ¶
func (consumer *ConsumerStruct) SetReady(ready chan bool)
func (*ConsumerStruct) Setup ¶
func (consumer *ConsumerStruct) Setup(sarama.ConsumerGroupSession) error
type KP ¶
type KP struct {
// contains filtered or unexported fields
}
type KPConsumer ¶
type KPConsumer interface { Setup(sarama.ConsumerGroupSession) error Cleanup(sarama.ConsumerGroupSession) error ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error GetReady() chan bool SetReady(chan bool) Process(ctx context.Context, message *sarama.ConsumerMessage) error }
func NewConsumer ¶
func NewConsumer(topic string, retryTopic string, deadLetterTopic string, retries int, processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error, onFailure *func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error, producer KPProducer, backoffPolicyTime time.Duration) KPConsumer
type KPProducer ¶
type KPProducer interface { GetProducer(kafkaConfig KafkaConfig) *Producer ProduceMessage(ctx context.Context, topic string, key string, message string) error }
func NewProducer ¶
func NewProducer(kafkaConfig KafkaConfig) KPProducer
type KafkaConfig ¶
type KafkaConfig struct {
KafkaBootstrapServers []string `json:"kafka_bootstrap"`
}
type KafkaProcessor ¶
type KafkaProcessor interface { Process(processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error) Start(ctx context.Context) error Stop() OnFailure(failure func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error) }
func NewKafkaProcessor ¶
func NewKafkaProcessor(topic string, retryTopic string, deadLetterTopic string, retries int, consumerGroup string, kafkaConfig KafkaConfig, backoffDuration time.Duration) KafkaProcessor
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func GetProducer ¶
func GetProducer(kafkaConfig KafkaConfig) *Producer
func (*Producer) GetProducer ¶
func (p *Producer) GetProducer(kafkaConfig KafkaConfig) *Producer
Source Files ¶
Click to show internal directories.
Click to hide internal directories.