client

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2020 License: AGPL-3.0 Imports: 13 Imported by: 1

Documentation

Overview

Package client implements a kafka consumer that works with single or multi-part messages for OpenNMS Sink API messages.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaClient

type KafkaClient struct {
	Bootstrap   string      // The Kafka Server Bootstrap string.
	Topic       string      // The name of the Kafka Topic.
	GroupID     string      // The name of the Consumer Group ID.
	Parameters  Propertites // List of Kafka Consumer Parameters.
	IPC         string      // either 'rpc' or 'sink'.
	IsTelemetry bool        // true to treat payload as telemetry data (only when IPC='sink')
	// contains filtered or unexported fields
}

KafkaClient defines a simple Kafka consumer client.

func (*KafkaClient) Initialize

func (cli *KafkaClient) Initialize() error

Initialize builds the Kafka consumer object and the cache for chunk handling.

func (*KafkaClient) Start

func (cli *KafkaClient) Start(action ProcessSinkMessage)

Start registers the consumer for the chosen topic, and reads messages from it on an infinite loop. It is recommended to use it within a Go Routine as it is a blocking operation.

func (*KafkaClient) Stop

func (cli *KafkaClient) Stop()

Stop terminates the Kafka consumer and waits for the execution of all pending action handlers.

type KafkaConsumer

type KafkaConsumer interface {
	Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
	Poll(timeoutMs int) (event kafka.Event)
	CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
	Close() (err error)
}

KafkaConsumer creates an generic interface with the relevant methods from kafka.Consumer

type ProcessSinkMessage

type ProcessSinkMessage func(msg []byte)

ProcessSinkMessage defines the action to execute after successfully received a Sink message. It receives the payload as an array of bytes, and a wait group for synchronization purposes.

type Propertites

type Propertites []string

Propertites represents an array of string flags

func (*Propertites) Set

func (p *Propertites) Set(value string) error

Set stores a string flag in the array

func (*Propertites) String

func (p *Propertites) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL