Documentation ¶
Index ¶
- func NewConsumerGroupHandler(queue *chan KafkaMessage) sarama.ConsumerGroupHandler
- type Client
- func (c *Client) ChangeSubscribedTopics(newTopicRegex *regexp.Regexp)
- func (c *Client) Close() error
- func (c *Client) EnqueueMessage(msg KafkaMessage) (err error)
- func (c *Client) GSub(ctx context.Context, cncl context.CancelFunc)
- func (c *Client) GetMessages() <-chan KafkaMessage
- func (c *Client) Ready() bool
- type ConsumerGroupHandler
- type KafkaMessage
- type NewClientOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewConsumerGroupHandler ¶
func NewConsumerGroupHandler(queue *chan KafkaMessage) sarama.ConsumerGroupHandler
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewKafkaClient ¶
func NewKafkaClient(opts NewClientOptions) (client *Client, err error)
func (*Client) ChangeSubscribedTopics ¶
func (*Client) EnqueueMessage ¶
func (c *Client) EnqueueMessage(msg KafkaMessage) (err error)
func (*Client) GetMessages ¶
func (c *Client) GetMessages() <-chan KafkaMessage
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (ConsumerGroupHandler) Cleanup ¶
func (c ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error
func (ConsumerGroupHandler) ConsumeClaim ¶
func (c ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (ConsumerGroupHandler) Setup ¶
func (c ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error
type KafkaMessage ¶
func ToKafkaMessage ¶
func ToKafkaMessage(topic string, message interface{}) (KafkaMessage, error)
Click to show internal directories.
Click to hide internal directories.