kafka

package
v1.1.2
Published: Aug 17, 2020 License: MIT Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrNilChan error = fmt.Errorf("Source channel is nil")

ErrNilChan indicates that the Publisher has no source channel.

Functions

func NewPublisher

func NewPublisher(opts *Opts, src <-chan *sse.Event) (publisher.Publisher, error)

NewPublisher returns a Publisher initialized with the provided source channel and Kafka destination.

Types

type ConnectionOpts

type ConnectionOpts struct {
	Brokers []string
	Topic   string
}

ConnectionOpts wraps the information needed to connect to Kafka.

type Opts

type Opts struct {
	Broker string
	Topic  string
}

Opts holds configuration for the Kafka publisher.

type PrometheusCollector

type PrometheusCollector struct {
	Publisher *Publisher
}

PrometheusCollector reports stats from the kafka client to Prometheus

func (PrometheusCollector) Collect

func (k PrometheusCollector) Collect(ch chan<- prometheus.Metric)

Collect implements the Collect method of the prometheus.Collector interface.

func (PrometheusCollector) Describe

func (k PrometheusCollector) Describe(ch chan<- *prometheus.Desc)

Describe implements the Describe method of the prometheus.Collector interface.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher reads Events and writes them to Kafka.

func (*Publisher) GetResumeID

func (f *Publisher) GetResumeID() string

GetResumeID will try to get the latest message published to Kafka and extract a resume ID from it

func (*Publisher) ProcessEvent

func (f *Publisher) ProcessEvent(e *sse.Event) error

ProcessEvent writes a single event to Kafka.

func (*Publisher) ReadAndPublish

func (f *Publisher) ReadAndPublish() (int64, error)

ReadAndPublish will read Events from the input channel and write them to the kafka topic configured for this Publisher.

Calling ReadAndPublish() resets the processed message counter of the underlying Publisher. It returns the value of that counter once the Publisher's source channel is closed.

func (*Publisher) ValidateConnection

func (f *Publisher) ValidateConnection() error

ValidateConnection tests the connection to Kafka using the details given when creating the Publisher
