Documentation ¶
Index ¶
- Variables
- func AddXOriginIfMissing(message *[]sarama.RecordHeader) error
- func AddXTrace(message *[]sarama.RecordHeader, value string) error
- func GetKafkaStats() (sent uint64, received uint64, sentBytesA uint64, recvBytesA uint64)
- func MessageToConsumerMessage(message *Message) *sarama.ConsumerMessage
- func MessageToProducerMessage(message *Message) *sarama.ProducerMessage
- func NewConsumerGroupHandler(queue *chan Message) sarama.ConsumerGroupHandler
- func ProducerMessageToConsumerMessage(message *sarama.ProducerMessage) (*sarama.ConsumerMessage, error)
- type Client
- func (c *Client) AbortTxn() error
- func (c *Client) AddMessageToTxn(msg *Message) error
- func (c *Client) BeginTxn() error
- func (c *Client) ChangeSubscribedTopics(newTopicRegex *regexp.Regexp)
- func (c *Client) Close() error
- func (c *Client) Closed() bool
- func (c *Client) CommitTxn() error
- func (c *Client) EnqueueMessage(msg Message) (err error)
- func (c *Client) GetConsumerErrorsChannel() <-chan error
- func (c *Client) GetMessages() <-chan Message
- func (c *Client) GetProducerErrorsChannel() <-chan *sarama.ProducerError
- func (c *Client) GetProducerSuccessesChannel() <-chan *sarama.ProducerMessage
- func (c *Client) GetQueueLength() int
- func (c *Client) Ready() bool
- func (c *Client) TopicCreator(topic string) (err error)
- type ConsumerGroupHandler
- type Message
- type NewClientOptions
- type SenderTag
- type TraceValue
Constants ¶
This section is empty.
Variables ¶
var MicroserviceName, _ = env.GetAsString("MICROSERVICE_NAME", false, "")
var SerialNumber, _ = env.GetAsString("SERIAL_NUMBER", false, "")
Functions ¶
func AddXOriginIfMissing ¶ added in v1.4.0
func AddXOriginIfMissing(message *[]sarama.RecordHeader) error
func GetKafkaStats ¶ added in v1.2.3
func GetKafkaStats() (sent uint64, received uint64, sentBytesA uint64, recvBytesA uint64)
GetKafkaStats returns the number of sent and received messages and approximate bytes.
func MessageToConsumerMessage ¶ added in v1.6.0
func MessageToConsumerMessage(message *Message) *sarama.ConsumerMessage
func MessageToProducerMessage ¶ added in v1.6.0
func MessageToProducerMessage(message *Message) *sarama.ProducerMessage
func NewConsumerGroupHandler ¶
func NewConsumerGroupHandler(queue *chan Message) sarama.ConsumerGroupHandler
func ProducerMessageToConsumerMessage ¶ added in v1.6.0
func ProducerMessageToConsumerMessage(message *sarama.ProducerMessage) (*sarama.ConsumerMessage, error)
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewKafkaClient ¶
func NewKafkaClient(opts *NewClientOptions) (client *Client, err error)
func (*Client) AddMessageToTxn ¶ added in v1.6.0
func (c *Client) AddMessageToTxn(msg *Message) error
func (*Client) ChangeSubscribedTopics ¶
func (c *Client) ChangeSubscribedTopics(newTopicRegex *regexp.Regexp)
func (*Client) EnqueueMessage ¶
func (c *Client) EnqueueMessage(msg Message) (err error)
func (*Client) GetConsumerErrorsChannel ¶ added in v1.6.0
func (c *Client) GetConsumerErrorsChannel() <-chan error
Consumer
func (*Client) GetMessages ¶
func (c *Client) GetMessages() <-chan Message
func (*Client) GetProducerErrorsChannel ¶ added in v1.6.0
func (c *Client) GetProducerErrorsChannel() <-chan *sarama.ProducerError
func (*Client) GetProducerSuccessesChannel ¶ added in v1.6.0
func (c *Client) GetProducerSuccessesChannel() <-chan *sarama.ProducerMessage
func (*Client) GetQueueLength ¶ added in v1.8.0
func (c *Client) GetQueueLength() int
func (*Client) TopicCreator ¶ added in v1.3.1
func (c *Client) TopicCreator(topic string) (err error)
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (ConsumerGroupHandler) Cleanup ¶
func (c ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error
func (ConsumerGroupHandler) ConsumeClaim ¶
func (c ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (ConsumerGroupHandler) Setup ¶
func (c ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error
type Message ¶ added in v1.2.0
func ProducerMessageToMessage ¶ added in v1.6.0
func ProducerMessageToMessage(prodMsg *sarama.ProducerMessage) (*Message, error)
func ToKafkaMessage ¶
type NewClientOptions ¶
type NewClientOptions struct {
	ListenTopicRegex        *regexp.Regexp
	SenderTag               SenderTag
	ConsumerName            string
	Brokers                 []string
	StartOffset             int64
	Partitions              int32
	ReplicationFactor       int16
	EnableTLS               bool
	ClientID                string
	AutoCommit              bool
	OpenDeadLine            time.Duration
	ProducerReturnSuccesses bool
}
NewClientOptions are the options for creating a new Kafka client.
- ListenTopicRegex is the regex used to match the topics to listen to.
- SenderTag controls the sender tagging feature.
- ConsumerName is the name of the consumer group (group.id).
- Brokers is the list of brokers to connect to.
- StartOffset is the offset to start consuming from.
- Partitions is the number of partitions to create for new topics.
- ReplicationFactor is the replication factor to use for new topics.
- EnableTLS enables TLS for the connection.
- ClientID is the client ID to use for the connection; it is only relevant for debugging.
- AutoCommit enables auto-commit (default: true).
- OpenDeadLine is the deadline by which the connection to the brokers must be established.
- ProducerReturnSuccesses enables the producer success channel, so that successfully produced messages are reported back to the user.
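To make the role of ListenTopicRegex more concrete, the following is a minimal sketch of regex-based topic selection. It uses only the standard library and is not the package's implementation: the helper matchTopics, the pattern, and the topic names are all hypothetical. In real use, the compiled *regexp.Regexp would be assigned to ListenTopicRegex.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchTopics compiles pattern and returns the topics whose names match it.
// Hypothetical helper for illustration only; it is not part of the package.
func matchTopics(pattern string, topics []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("invalid topic pattern %q: %w", pattern, err)
	}
	var matched []string
	for _, topic := range topics {
		if re.MatchString(topic) {
			matched = append(matched, topic)
		}
	}
	return matched, nil
}

func main() {
	// Hypothetical pattern and topic names.
	topics := []string{"ia.raw.temperature", "ia.processed.temperature", "ia.raw.pressure"}
	matched, err := matchTopics(`^ia\.raw\..+$`, topics)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(matched)
}
```

Anchoring the pattern with ^ and $ avoids accidentally subscribing to topics that merely contain the intended prefix somewhere in their name.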
type SenderTag ¶ added in v1.4.0
type SenderTag struct {
	OverwriteSerialNumber     *string
	OverwriteMicroserviceName *string
	Enabled                   bool
}
SenderTag controls the sender tagging feature.
- Enabled enables the feature.
- OverwriteSerialNumber overwrites the serial number with the given value; otherwise the SERIAL_NUMBER env variable is used.
- OverwriteMicroserviceName overwrites the microservice name with the given value; otherwise the MICROSERVICE_NAME env variable is used.
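The override-or-environment fallback described for SenderTag can be sketched as follows. This is an illustration under stated assumptions, not the package's code: senderTag is a local stand-in that mirrors the documented fields, resolve is a hypothetical helper, and os.Getenv stands in for the env.GetAsString call the package uses for its variables.

```go
package main

import (
	"fmt"
	"os"
)

// senderTag mirrors the fields of the documented SenderTag struct.
// It is a local stand-in so the sketch compiles without the package.
type senderTag struct {
	OverwriteSerialNumber     *string
	OverwriteMicroserviceName *string
	Enabled                   bool
}

// resolve returns *override when an override is set, otherwise the value
// of the environment variable envKey (empty if the variable is unset).
// Hypothetical helper for illustration only.
func resolve(override *string, envKey string) string {
	if override != nil {
		return *override
	}
	return os.Getenv(envKey)
}

func main() {
	custom := "line-7-gateway" // hypothetical serial number
	tag := senderTag{Enabled: true, OverwriteSerialNumber: &custom}

	if tag.Enabled {
		// The serial number comes from the override; the microservice
		// name falls back to the MICROSERVICE_NAME env variable.
		fmt.Println("serial number:", resolve(tag.OverwriteSerialNumber, "SERIAL_NUMBER"))
		fmt.Println("microservice name:", resolve(tag.OverwriteMicroserviceName, "MICROSERVICE_NAME"))
	}
}
```

Pointer fields make it possible to tell "no override" (nil) apart from an override that is deliberately the empty string.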
type TraceValue ¶ added in v1.4.0
func GetTrace ¶ added in v1.4.0
func GetTrace(message *[]sarama.RecordHeader, key string) *TraceValue