Documentation
¶
Overview ¶
Package client implements a kafka consumer that works with single or multi-part messages for OpenNMS Sink API messages.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaClient ¶
type KafkaClient struct { Bootstrap string // The Kafka Server Bootstrap string. Topic string // The name of the Kafka Topic. GroupID string // The name of the Consumer Group ID. Parameters Propertites // List of Kafka Consumer Parameters. IPC string // either 'rpc' or 'sink'. IsTelemetry bool // true to treat payload as telemetry data (only when IPC='sink') // contains filtered or unexported fields }
KafkaClient defines a simple Kafka consumer client.
func (*KafkaClient) Initialize ¶
func (cli *KafkaClient) Initialize() error
Initialize builds the Kafka consumer object and the cache for chunk handling.
func (*KafkaClient) Start ¶
func (cli *KafkaClient) Start(action ProcessSinkMessage)
Start registers the consumer for the chosen topic, and reads messages from it on an infinite loop. It is recommended to use it within a Go Routine as it is a blocking operation.
func (*KafkaClient) Stop ¶
func (cli *KafkaClient) Stop()
Stop terminates the Kafka consumer and waits for the execution of all pending action handlers.
type KafkaConsumer ¶
type KafkaConsumer interface { Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error Poll(timeoutMs int) (event kafka.Event) CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error) Close() (err error) }
KafkaConsumer creates an generic interface with the relevant methods from kafka.Consumer
type ProcessSinkMessage ¶
type ProcessSinkMessage func(msg []byte)
ProcessSinkMessage defines the action to execute after successfully received a Sink message. It receives the payload as an array of bytes, and a wait group for synchronization purposes.
type Propertites ¶
type Propertites []string
Propertites represents an array of string flags
func (*Propertites) Set ¶
func (p *Propertites) Set(value string) error
Set stores a string flag in the array
func (*Propertites) String ¶
func (p *Propertites) String() string