kafka

package
v1.16.0
Warning

This package is not in the latest version of its module.

Published: Jan 10, 2023 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package kafka is the most complex backend as it has several different operating modes. Due to this, it should not be used as a "template" for other backends - most other backends are significantly simpler and follow an expected flow.


Constants

const (
	BackendName = "kafka"

	DefaultConsumerGroupName = "plumber-server"

	DefaultBatchSize = 1
)
const (
	DefaultLagInterval = 5 * time.Second
)
const (
	RetryReadInterval = 5 * time.Second
)
const (
	RetrySampledReadInterval = 10 * time.Second
)

Variables

var (
	ErrMissingTopic = errors.New("You must specify at least one topic")
)

Functions

func ConnectAllTopics added in v1.0.0

func ConnectAllTopics(dialer *skafka.Dialer, connArgs *args.KafkaConn, topics []string) (map[string]*skafka.Conn, error)

func NewReaderForRead added in v1.0.0

func NewReaderForRead(dialer *skafka.Dialer, connArgs *args.KafkaConn, readArgs *args.KafkaReadArgs) (*skafka.Reader, error)

func NewReaderForRelay added in v1.0.0

func NewReaderForRelay(dialer *skafka.Dialer, connArgs *args.KafkaConn, relayArgs *args.KafkaRelayArgs) (*skafka.Reader, error)

func NewWriter added in v0.1.0

func NewWriter(dialer *skafka.Dialer, connArgs *args.KafkaConn, topics ...string) (*skafka.Writer, error)

NewWriter creates a new instance of a writer that can write messages to a topic. NOTE: This continues to use the deprecated skafka.NewWriter() func to avoid dealing with TLS issues (*Writer does not have a Dialer, and Transport defines TLS separately from the dialer).

Types

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka holds all attributes required for interacting with Kafka. This struct should be instantiated via the kafka.New(..) func.

func New added in v1.0.0

func New(connOpts *opts.ConnectionOptions) (*Kafka, error)

func (*Kafka) Close added in v1.0.0

func (k *Kafka) Close(_ context.Context) error

Close is a no-op for Kafka because read, write, lag, etc. all handle connection setup and teardown on their own.

func (*Kafka) DisplayError added in v1.0.0

func (k *Kafka) DisplayError(msg *records.ErrorRecord) error

func (*Kafka) DisplayMessage added in v1.0.0

func (k *Kafka) DisplayMessage(cliOpts *opts.CLIOptions, msg *records.ReadRecord) error

func (*Kafka) Name added in v1.0.0

func (k *Kafka) Name() string

func (*Kafka) NewLag added in v1.0.0

func (k *Kafka) NewLag(readArgs *args.KafkaReadArgs) (*Lag, error)

NewLag creates a lag instance but does not perform any lag operations.

func (*Kafka) Read added in v0.1.0

func (k *Kafka) Read(
	ctx context.Context,
	readOpts *opts.ReadOptions,
	resultsChan chan *records.ReadRecord,
	errorChan chan *records.ErrorRecord,
) error

func (*Kafka) Relay added in v1.0.0

func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh chan interface{}, errorCh chan<- *records.ErrorRecord) error

Relay sets up a new Kafka relayer.

func (*Kafka) Test added in v1.0.0

func (k *Kafka) Test(_ context.Context) error

func (*Kafka) Tunnel added in v1.4.0

func (k *Kafka) Tunnel(ctx context.Context, opts *opts.TunnelOptions, tunnelSvc tunnel.ITunnel, errorCh chan<- *records.ErrorRecord) error

Tunnel starts up a new gRPC client connected to the dProxy service and receives a stream of outbound replay messages, which are then written to the message bus.

func (*Kafka) Write added in v0.1.0

func (k *Kafka) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error

Write is the entry point function for performing write operations in Kafka.

This is where we verify that the combination of passed args and flags makes sense, attempt to establish a connection, and parse protobuf before finally attempting the write.

type Lag added in v0.31.0

type Lag struct {
	// contains filtered or unexported fields
}

func (*Lag) GetAllPartitionsLastOffset added in v1.0.0

func (l *Lag) GetAllPartitionsLastOffset(topic string, partitions []skafka.Partition) (int64, error)

GetAllPartitionsLastOffset finds the maximum last offset across the given partitions of a topic.
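
The "maximum across partitions" step can be sketched on its own, without a broker. This is an illustration only: maxLastOffset is a hypothetical helper, and the map of partition ID to last offset is an assumed input shape rather than what the method receives.

```go
package main

import (
	"errors"
	"fmt"
)

// maxLastOffset returns the largest last offset among the given partitions.
// lastOffsets maps a partition ID to that partition's last offset.
func maxLastOffset(lastOffsets map[int]int64) (int64, error) {
	if len(lastOffsets) == 0 {
		return 0, errors.New("no partitions given")
	}

	var highest int64
	first := true
	for _, offset := range lastOffsets {
		if first || offset > highest {
			highest = offset
			first = false
		}
	}

	return highest, nil
}

func main() {
	highest, err := maxLastOffset(map[int]int64{0: 120, 1: 987, 2: 45})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(highest) // prints 987
}
```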

func (*Lag) GetPartitionLastOffset added in v1.0.0

func (l *Lag) GetPartitionLastOffset(topic string, part int) (int64, error)

GetPartitionLastOffset gets the last offset for a given partition. It is used by both Read() and Lag() to display offset stats and/or calculate lag.
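
For context, consumer lag on a partition is commonly defined as the partition's last offset minus the consumer group's committed offset. The sketch below shows that arithmetic under this common definition; partitionLag is a hypothetical helper, and the exact calculation this package performs is not shown in this documentation.

```go
package main

import "fmt"

// partitionLag returns how far a consumer is behind on one partition,
// using the common definition: last offset minus committed offset.
// The result is clamped at zero in case the committed offset is ahead
// of a stale last-offset reading.
func partitionLag(lastOffset, committedOffset int64) int64 {
	lag := lastOffset - committedOffset
	if lag < 0 {
		return 0
	}
	return lag
}

func main() {
	fmt.Println(partitionLag(1500, 1420)) // prints 80
	fmt.Println(partitionLag(1500, 1500)) // prints 0
}
```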

func (*Lag) Lag added in v1.0.0

func (l *Lag) Lag(
	ctx context.Context,
	resultsCh chan *records.ReadRecord,
	errorCh chan<- *records.ErrorRecord,
	interval time.Duration,
) error

Lag fetches topic stats on the given interval and sends them over resultsCh. This is a blocking call.

type Reader

type Reader struct {
	Conn   *skafka.Conn
	Reader *skafka.Reader
}

type Writer

type Writer struct {
	Writer *skafka.Writer
}
