Documentation
¶
Index ¶
- func CreateKafkaAsyncProducer(ac *config.AppConfig) (sarama.AsyncProducer, error)
- func CreateKafkaConsumer(ac *config.AppConfig) (sarama.ConsumerGroup, error)
- func CreateKafkaProducer(ac *config.AppConfig) (sarama.SyncProducer, error)
- type KafkaClient
- func (kc *KafkaClient) Close()
- func (kc *KafkaClient) ConsumeMessages(ctx context.Context, topics []string, handler *MessageHandler)
- func (kc *KafkaClient) PostAsyncMessage(c *gin.Context)
- func (kc *KafkaClient) PostMessage(c *gin.Context)
- func (kc *KafkaClient) SendAsyncMessage(topic, key string, message []byte) error
- func (kc *KafkaClient) SendMessage(topic, key string, message []byte) error
- type Message
- type MessageBuffer
- type MessageHandler
- type MessageMetadata
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateKafkaAsyncProducer ¶
func CreateKafkaAsyncProducer(ac *config.AppConfig) (sarama.AsyncProducer, error)
CreateKafkaAsyncProducer creates a new Sarama AsyncProducer
func CreateKafkaConsumer ¶
func CreateKafkaConsumer(ac *config.AppConfig) (sarama.ConsumerGroup, error)
CreateKafkaConsumer creates a new Sarama ConsumerGroup
func CreateKafkaProducer ¶
func CreateKafkaProducer(ac *config.AppConfig) (sarama.SyncProducer, error)
CreateKafkaProducer creates a new Sarama SyncProducer
Types ¶
type KafkaClient ¶
type KafkaClient struct { Producer sarama.SyncProducer AsyncProducer sarama.AsyncProducer Consumer sarama.ConsumerGroup }
KafkaClient is a wrapper around a Sarama SyncProducer, AsyncProducer and ConsumerGroup
func NewKafkaClient ¶
func NewKafkaClient(ac *config.AppConfig) (*KafkaClient, error)
NewKafkaClient creates a new KafkaClient
func (*KafkaClient) ConsumeMessages ¶
func (kc *KafkaClient) ConsumeMessages(ctx context.Context, topics []string, handler *MessageHandler)
ConsumeMessages consumes messages from Kafka
func (*KafkaClient) PostAsyncMessage ¶
func (kc *KafkaClient) PostAsyncMessage(c *gin.Context)
PostAsyncMessage is a handler for POST /async-messages/:topic
func (*KafkaClient) PostMessage ¶
func (kc *KafkaClient) PostMessage(c *gin.Context)
PostMessage is a handler for POST /messages/:topic
func (*KafkaClient) SendAsyncMessage ¶
func (kc *KafkaClient) SendAsyncMessage(topic, key string, message []byte) error
SendAsyncMessage sends a message to Kafka asynchronously
func (*KafkaClient) SendMessage ¶
func (kc *KafkaClient) SendMessage(topic, key string, message []byte) error
SendMessage sends a message to Kafka
type Message ¶
type Message struct { Metadata MessageMetadata `json:"metadata"` Value string `json:"value"` Partition int32 `json:"partition"` Offset int64 `json:"offset"` }
Message represents a Kafka message
type MessageBuffer ¶
type MessageBuffer struct { MaxSize int // contains filtered or unexported fields }
MessageBuffer is a buffer of Kafka messages Concurrent access to the buffer is protected via a RWMutex
func (*MessageBuffer) GetMessages ¶
func (mb *MessageBuffer) GetMessages(c *gin.Context)
GetMessages returns the messages in the buffer returning it in JSON format
func (*MessageBuffer) SaveMessage ¶
func (mb *MessageBuffer) SaveMessage(msg Message)
SaveMessage saves a message to the buffer
type MessageHandler ¶
type MessageHandler struct { Ready chan bool // contains filtered or unexported fields }
MessageHandler is a Sarama consumer group handler
func NewMessageHandler ¶
func NewMessageHandler(buffer *MessageBuffer) *MessageHandler
NewMessageHandler creates a new MessageHandler
func (*MessageHandler) Cleanup ¶
func (c *MessageHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*MessageHandler) ConsumeClaim ¶
func (c *MessageHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*MessageHandler) Setup ¶
func (c *MessageHandler) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type MessageMetadata ¶
MessageMetadata represents metadata about a Kafka message