Documentation ¶
Index ¶
- Variables
- func AddXOriginIfMissing(message *[]sarama.RecordHeader) error
- func AddXTrace(message *[]sarama.RecordHeader, value string) error
- func GetKafkaStats() (sent uint64, received uint64, sentBytesA uint64, recvBytesA uint64)
- func MessageToConsumerMessage(message *Message) *sarama.ConsumerMessage
- func MessageToProducerMessage(message *Message) *sarama.ProducerMessage
- func NewConsumerGroupHandler(queue chan Message, automark bool) (sarama.ConsumerGroupHandler, error)
- func ProducerMessageToConsumerMessage(message *sarama.ProducerMessage) (*sarama.ConsumerMessage, error)
- type Client
- func (c *Client) AbortTxn() error
- func (c *Client) AddMessageToTxn(msg *Message) error
- func (c *Client) BeginTxn() error
- func (c *Client) ChangeSubscribedTopics(newTopicRegex *regexp.Regexp)
- func (c *Client) Close() error
- func (c *Client) Closed() bool
- func (c *Client) CommitTxn() error
- func (c *Client) EnqueueMessage(msg Message) (err error)
- func (c *Client) GetConsumerErrorsChannel() <-chan error
- func (c *Client) GetMessages() <-chan Message
- func (c *Client) GetProducerErrorsChannel() <-chan *sarama.ProducerError
- func (c *Client) GetProducerSuccessesChannel() <-chan *sarama.ProducerMessage
- func (c *Client) GetQueueLength() int
- func (c *Client) MarkMessage(msg *Message) error
- func (c *Client) Ready() bool
- func (c *Client) TopicCreator(topic string) (err error)
- type ConsumerGroupHandler
- func (c *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error
- func (c *ConsumerGroupHandler) Commit() error
- func (c *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *ConsumerGroupHandler) MarkMessage(msg *sarama.ConsumerMessage, metadata string) error
- func (c *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error
- type Message
- type NewClientOptions
- type SenderTag
- type TraceValue
Constants ¶
This section is empty.
Variables ¶
var MicroserviceName, _ = env.GetAsString("MICROSERVICE_NAME", false, "")
var SerialNumber, _ = env.GetAsString("SERIAL_NUMBER", false, "")
Functions ¶
func AddXOriginIfMissing ¶ added in v1.4.0
func AddXOriginIfMissing(message *[]sarama.RecordHeader) error
func GetKafkaStats ¶ added in v1.2.3
GetKafkaStats returns the number of sent and received messages and approximate bytes.
func MessageToConsumerMessage ¶ added in v1.6.0
func MessageToConsumerMessage(message *Message) *sarama.ConsumerMessage
func MessageToProducerMessage ¶ added in v1.6.0
func MessageToProducerMessage(message *Message) *sarama.ProducerMessage
func NewConsumerGroupHandler ¶
func NewConsumerGroupHandler(queue chan Message, automark bool) (sarama.ConsumerGroupHandler, error)
func ProducerMessageToConsumerMessage ¶ added in v1.6.0
func ProducerMessageToConsumerMessage(message *sarama.ProducerMessage) (*sarama.ConsumerMessage, error)
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewKafkaClient ¶
func NewKafkaClient(opts *NewClientOptions) (client *Client, err error)
func (*Client) AddMessageToTxn ¶ added in v1.6.0
func (*Client) ChangeSubscribedTopics ¶
func (*Client) EnqueueMessage ¶
func (*Client) GetConsumerErrorsChannel ¶ added in v1.6.0
Consumer
func (*Client) GetMessages ¶
func (*Client) GetProducerErrorsChannel ¶ added in v1.6.0
func (c *Client) GetProducerErrorsChannel() <-chan *sarama.ProducerError
func (*Client) GetProducerSuccessesChannel ¶ added in v1.6.0
func (c *Client) GetProducerSuccessesChannel() <-chan *sarama.ProducerMessage
func (*Client) GetQueueLength ¶ added in v1.8.0
func (*Client) MarkMessage ¶ added in v1.11.0
func (*Client) TopicCreator ¶ added in v1.3.1
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (*ConsumerGroupHandler) Cleanup ¶
func (c *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error
func (*ConsumerGroupHandler) Commit ¶ added in v1.11.0
func (c *ConsumerGroupHandler) Commit() error
Commit commits all marked messages to the broker. This is a blocking operation.
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (c *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerGroupHandler) MarkMessage ¶ added in v1.11.0
func (c *ConsumerGroupHandler) MarkMessage(msg *sarama.ConsumerMessage, metadata string) error
MarkMessage marks a message as consumed.
func (*ConsumerGroupHandler) Setup ¶
func (c *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error
type Message ¶ added in v1.2.0
type Message struct { Topic string Value []byte Header map[string][]byte Key []byte Offset int64 Partition int32 }
func ProducerMessageToMessage ¶ added in v1.6.0
func ProducerMessageToMessage(prodMsg *sarama.ProducerMessage) (*Message, error)
func ToKafkaMessage ¶
type NewClientOptions ¶
type NewClientOptions struct { ListenTopicRegex *regexp.Regexp SenderTag SenderTag ConsumerGroupId string TransactionalID string ClientID string Brokers []string StartOffset int64 OpenDeadLine time.Duration Partitions int32 ReplicationFactor int16 EnableTLS bool AutoCommit bool ProducerReturnSuccesses bool AutoMark bool }
NewClientOptions are the options for creating a new kafka client. ListenTopicRegex is the regex to match topics to listen to. ConsumerName is the name of the consumer group (group.id). Brokers is the list of brokers to connect to. StartOffset is the offset to start consuming from. Partitions is the number of partitions to create for new topics. ReplicationFactor is the replication factor to use for new topics. EnableTLS enables TLS for the connection. SenderTag controls the sender tagging feature. ClientID is the client ID to use for the connection, only relevant for debugging. AutoCommit enables auto-commit (Default: true) [Note: AutoCommit only commits Marked Messages. You shouldn't need to set this to false] OpenDeadLine is the deadline, until connection to the brokers must be established. ProducerReturnSuccesses enables the success output channel of back to the user. TransactionalID is required for transaction of producers, e.g.,BeginTxn(). AutoMark enabled automatic marking of messages as consumed. [Set this to false if you want to manually mark messages as consumed]
type SenderTag ¶ added in v1.4.0
type SenderTag struct { OverwriteSerialNumber *string OverwriteMicroserviceName *string Enabled bool }
SenderTag controls the sender tagging feature. Enabled enables the feature. OverwriteSerialNumber overwrites the serial number with the given value, else the SERIAL_NUMBER env variable is used. OverwriteMicroserviceName overwrites the microservice name with the given value, else the MICROSERVICE_NAME env variable is used.
type TraceValue ¶ added in v1.4.0
func GetTrace ¶ added in v1.4.0
func GetTrace(message *[]sarama.RecordHeader, key string) *TraceValue