Documentation ¶
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close()
- func (c *Client) Consume(messageHandler MessageHandler, topicsWithoutPrefix ...string) error
- func (c *Client) Enable(enabled bool)
- func (c *Client) NewEvent(action string, message string) *Event
- func (c *Client) PublishEvent(event *Event, topic string) (int32, int64, error)
- func (c *Client) PublishMessage(message []byte, topic string) (int32, int64, error)
- func (c *Client) Usage()
- func (c *Client) Verbose(enabled bool)
- type ClientConfig
- type Event
- type MessageHandler
- type SaramaConsumer
- type XDGSCRAMClient
Constants ¶
const EventVersion = "1.0"
EventVersion is used in the Kafka record header.
Variables ¶
var (
	SHA256 scram.HashGeneratorFcn = sha256.New
	SHA512 scram.HashGeneratorFcn = sha512.New
)
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
	// Public
	Config *ClientConfig
	// contains filtered or unexported fields
}
func NewClient ¶
func NewClient() *Client
NewClient creates a new client that is configured automatically via envconfig and uses argv[0] as the default clientId.
func NewClientFromConfig ¶
func NewClientFromConfig(config *ClientConfig) *Client
NewClientFromConfig creates a new client (note that producers will be initialized on demand, so no errors are expected)
func NewClientWithId ¶
NewClientWithId creates a new client with the given clientId and the default config.
func (*Client) Close ¶
func (c *Client) Close()
Close closes the client; if the syncProducer is initialized, it is closed as well.
func (*Client) Consume ¶
func (c *Client) Consume(messageHandler MessageHandler, topicsWithoutPrefix ...string) error
Consume is a blocking function that reads messages from the given topics. TODO: add a consumeOnce function or flag to prevent an infinite loop.
func (*Client) Enable ¶
Enable enables or disables all communication. When disabled, functions can still be called but will only log. The default value is true.
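The enable switch can be pictured as a simple guard in front of every send. The following is a minimal sketch under assumptions, not the package's implementation: `gatedPublisher`, `Publish`, and the `sent` and `logged` fields are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// gatedPublisher is a hypothetical stand-in for the Client.
// It only illustrates the "log instead of send" behaviour.
type gatedPublisher struct {
	enabled bool
	sent    []string // messages that were actually handed to the transport
	logged  []string // messages that were only logged while disabled
}

// newGatedPublisher starts enabled, matching the documented default of true.
func newGatedPublisher() *gatedPublisher {
	return &gatedPublisher{enabled: true}
}

// Enable switches communication on or off.
func (p *gatedPublisher) Enable(enabled bool) {
	p.enabled = enabled
}

// Publish sends the message when enabled and only logs it otherwise.
func (p *gatedPublisher) Publish(msg string) {
	if !p.enabled {
		p.logged = append(p.logged, msg)
		fmt.Printf("communication disabled, not sending: %s\n", msg)
		return
	}
	p.sent = append(p.sent, msg)
}

func main() {
	p := newGatedPublisher()
	p.Publish("first")
	p.Enable(false)
	p.Publish("second")
	fmt.Println(len(p.sent), len(p.logged))
}
```

A switch like this lets callers keep their publish calls in place in environments without a broker, such as local development.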
func (*Client) PublishEvent ¶
PublishEvent expects an Event struct, which it serializes as JSON before pushing it to the topic.
func (*Client) PublishMessage ¶
PublishMessage expects a byte message, which it pushes to the topic. This is the actual handler to which other publish functions such as PublishEvent delegate. See also https://github.com/Shopify/sarama/blob/master/tools/kafka-console-producer/kafka-console-producer.go
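The delegation from PublishEvent to PublishMessage might look roughly like the sketch below. It rests on several assumptions: the fields of `sketchEvent` are hypothetical (the real Event's fields are not shown in this documentation), `publishMessage` only prints instead of talking to Kafka, and the two numeric return values are assumed to be partition and offset.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sketchEvent is a hypothetical event shape; the real Event may differ.
type sketchEvent struct {
	Action  string `json:"action"`
	Message string `json:"message"`
}

// encodeEvent serializes the event as JSON.
func encodeEvent(event *sketchEvent) ([]byte, error) {
	return json.Marshal(event)
}

// publishMessage stands in for the byte-level publish function.
// It prints the payload and returns placeholder values
// (assumed here to be partition and offset).
func publishMessage(message []byte, topic string) (int32, int64, error) {
	fmt.Printf("topic=%s payload=%s\n", topic, message)
	return 0, 0, nil
}

// publishEvent serializes first, then delegates to publishMessage.
func publishEvent(event *sketchEvent, topic string) (int32, int64, error) {
	payload, err := encodeEvent(event)
	if err != nil {
		return 0, 0, err
	}
	return publishMessage(payload, topic)
}

func main() {
	event := &sketchEvent{Action: "created", Message: "hello"}
	if _, _, err := publishEvent(event, "orders"); err != nil {
		fmt.Println("publish failed:", err)
	}
}
```

Keeping a single byte-level function at the bottom means every higher-level publish helper shares the same error handling and producer setup.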
type ClientConfig ¶
type ClientConfig struct {
	Brokers         string        `required:"true" desc:"Comma separated List of brokers" split_words:"true"`
	SaslUsername    string        `required:"true" desc:"User for SASL Auth" split_words:"true"`
	SaslPassword    string        `required:"true" desc:"Password for SASL Auth" split_words:"true"`
	SaslMechanism   string        `default:"SCRAM-SHA-256" required:"true" desc:"SASL Mechanism" split_words:"true"`
	TlsEnabled      bool          `default:"true" desc:"TLS Encryption active" split_words:"true"`
	SaslEnabled     bool          `default:"true" desc:"Use SASL Authentication" split_words:"true"`
	TopicPrefix     string        `default:"" desc:"Optional prefix, prepended to topic name and empty by default" split_words:"true"`
	Enabled         bool          `default:"true" desc:"Communication Enabled" split_words:"true"`
	Verbose         bool          `default:"false" desc:"Verbose Logging" split_words:"true"`
	ClientId        string        `default:"" desc:"ClientId, will be also used as default source" split_words:"true"`
	OffsetMode      string        `default:"newest" desc:"Default offset for consumer, values: newest or oldest" split_words:"true"`
	KafkaVersion    string        `default:"2.6.0" desc:"Version of Kafka, important for initiating consumer group"`
	ConsumerTimeout time.Duration `desc:"Duration how long the consumer is looping (default: forever)"`
}
ClientConfig is derived from envconfig.
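Two of the config fields are plain strings that need a little processing before use: Brokers is a comma-separated list, and TopicPrefix is prepended to topic names. The sketch below shows one way this could be done. The helper names `splitBrokers` and `withPrefix` are hypothetical, and trimming whitespace and dropping empty entries is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBrokers turns a comma-separated broker string into a slice.
// Assumption: surrounding whitespace is trimmed and empty entries are dropped.
func splitBrokers(brokers string) []string {
	var out []string
	for _, b := range strings.Split(brokers, ",") {
		if b = strings.TrimSpace(b); b != "" {
			out = append(out, b)
		}
	}
	return out
}

// withPrefix prepends the prefix to every topic name.
// An empty prefix leaves the names unchanged.
func withPrefix(prefix string, topics ...string) []string {
	out := make([]string, 0, len(topics))
	for _, t := range topics {
		out = append(out, prefix+t)
	}
	return out
}

func main() {
	fmt.Println(splitBrokers("broker-1:9092, broker-2:9092"))
	fmt.Println(withPrefix("dev-", "orders", "payments"))
}
```

Under this reading, callers always pass unprefixed topic names, and the same code can target different environments by changing only the prefix.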
func NewConfig ¶
func NewConfig() *ClientConfig
type MessageHandler ¶
type MessageHandler func(message *sarama.ConsumerMessage)
MessageHandler is the custom handler attached to the SaramaConsumer; it is supposed to be passed in by the caller.
type SaramaConsumer ¶
type SaramaConsumer struct {
// contains filtered or unexported fields
}
SaramaConsumer represents a Sarama consumer group consumer
func (*SaramaConsumer) Cleanup ¶
func (consumer *SaramaConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*SaramaConsumer) ConsumeClaim ¶
func (consumer *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
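The consumer loop in ConsumeClaim can be pictured as a range over a message channel that hands each message to the caller's handler. The sketch below uses stand-in types so that it runs without the sarama dependency: `message`, `handler`, and `consumeLoop` are hypothetical names, not part of this package or of sarama. A real handler would typically also mark each message as consumed on the session.

```go
package main

import "fmt"

// message is a hypothetical stand-in for sarama.ConsumerMessage.
type message struct {
	Topic string
	Value []byte
}

// handler mirrors the shape of MessageHandler, using the stand-in type.
type handler func(m *message)

// consumeLoop passes every message to handle until the channel is closed
// and returns how many messages were handled.
func consumeLoop(messages <-chan *message, handle handler) int {
	n := 0
	for m := range messages {
		handle(m)
		n++
	}
	return n
}

func main() {
	ch := make(chan *message, 2)
	ch <- &message{Topic: "orders", Value: []byte("a")}
	ch <- &message{Topic: "orders", Value: []byte("b")}
	close(ch) // closing the channel ends the loop

	n := consumeLoop(ch, func(m *message) {
		fmt.Printf("%s: %s\n", m.Topic, m.Value)
	})
	fmt.Println("handled", n)
}
```

The loop ends only when the channel is closed, which is why a function built on it blocks until the consumer session finishes.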
func (*SaramaConsumer) Setup ¶
func (consumer *SaramaConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
Begin calls HashGeneratorFcn.NewClient which constructs a SCRAM client component based on a given hash.Hash factory receiver
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool