Documentation ¶
Overview ¶
Package kafka is the most complex backend because it has several different operating modes. For that reason, it should not be used as a "template" for other backends; most other backends are significantly simpler and follow an expected flow.
Index ¶
- Constants
- Variables
- func ConnectAllTopics(dialer *skafka.Dialer, connArgs *args.KafkaConn, topics []string) (map[string]*skafka.Conn, error)
- func NewReaderForRead(dialer *skafka.Dialer, connArgs *args.KafkaConn, readArgs *args.KafkaReadArgs) (*skafka.Reader, error)
- func NewReaderForRelay(dialer *skafka.Dialer, connArgs *args.KafkaConn, ...) (*skafka.Reader, error)
- func NewWriter(dialer *skafka.Dialer, connArgs *args.KafkaConn, topics ...string) (*skafka.Writer, error)
- type Kafka
- func (k *Kafka) Close(_ context.Context) error
- func (k *Kafka) DisplayError(msg *records.ErrorRecord) error
- func (k *Kafka) DisplayMessage(cliOpts *opts.CLIOptions, msg *records.ReadRecord) error
- func (k *Kafka) Dynamic(ctx context.Context, opts *opts.DynamicOptions, dynamicSvc dynamic.IDynamic) error
- func (k *Kafka) Name() string
- func (k *Kafka) NewLag(readArgs *args.KafkaReadArgs) (*Lag, error)
- func (k *Kafka) Read(ctx context.Context, readOpts *opts.ReadOptions, ...) error
- func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh chan interface{}, ...) error
- func (k *Kafka) Test(_ context.Context) error
- func (k *Kafka) Write(ctx context.Context, writeOpts *opts.WriteOptions, ...) error
- type Lag
- type Reader
- type Writer
Constants ¶
const (
BackendName = "kafka"
DefaultConsumerGroupName = "plumber-server"
DefaultBatchSize = 1
)
const (
DefaultLagInterval = 5 * time.Second
)
const (
RetryReadInterval = 5 * time.Second
)
const (
RetrySampledReadInterval = 10 * time.Second
)
Variables ¶
var (
ErrMissingTopic = errors.New("You must specify at least one topic")
)
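A minimal sketch of how a caller might produce and detect this sentinel error. The `validateTopics` helper is hypothetical and not part of the package; the error value is redeclared here only so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrMissingTopic mirrors the sentinel error documented above.
var ErrMissingTopic = errors.New("You must specify at least one topic")

// validateTopics is a hypothetical helper (an assumption for illustration):
// it rejects an empty topic list with the sentinel error.
func validateTopics(topics []string) error {
	if len(topics) == 0 {
		return ErrMissingTopic
	}
	return nil
}

func main() {
	// errors.Is matches the sentinel even if it has been wrapped.
	if err := validateTopics(nil); errors.Is(err, ErrMissingTopic) {
		fmt.Println("invalid arguments:", err)
	}
}
```

Comparing with `errors.Is` rather than `==` keeps the check working if the error is later wrapped with additional context.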
Functions ¶
func ConnectAllTopics ¶ added in v1.0.0
func NewReaderForRead ¶ added in v1.0.0
func NewReaderForRelay ¶ added in v1.0.0
func NewWriter ¶ added in v0.1.0
func NewWriter(dialer *skafka.Dialer, connArgs *args.KafkaConn, topics ...string) (*skafka.Writer, error)
NewWriter creates a new instance of a writer that can write messages to a topic. NOTE: This continues to use the deprecated NewWriter() func to avoid dealing with TLS issues (*Writer does not have a Dialer, and Transport has TLS defined separately from the dialer).
Types ¶
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka holds all attributes required for performing a write to Kafka. This struct should be instantiated via the kafka.Read(..) func.
func (*Kafka) Close ¶ added in v1.0.0
Close is a noop for kafka because read/write/lag/etc. all handle conn setup and teardown on their own.
func (*Kafka) DisplayError ¶ added in v1.0.0
func (k *Kafka) DisplayError(msg *records.ErrorRecord) error
func (*Kafka) DisplayMessage ¶ added in v1.0.0
func (k *Kafka) DisplayMessage(cliOpts *opts.CLIOptions, msg *records.ReadRecord) error
func (*Kafka) Dynamic ¶ added in v1.0.0
func (k *Kafka) Dynamic(ctx context.Context, opts *opts.DynamicOptions, dynamicSvc dynamic.IDynamic) error
Dynamic starts up a new gRPC client connected to the dProxy service and receives a stream of outbound replay messages, which are then written to the message bus.
func (*Kafka) NewLag ¶ added in v1.0.0
func (k *Kafka) NewLag(readArgs *args.KafkaReadArgs) (*Lag, error)
NewLag creates a lag instance but does not perform any lag operations.
func (*Kafka) Read ¶ added in v0.1.0
func (k *Kafka) Read(ctx context.Context, readOpts *opts.ReadOptions, resultsChan chan *records.ReadRecord, errorChan chan *records.ErrorRecord) error
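The signature suggests that Read delivers results and errors over two caller-supplied channels. The following is a sketch of how a caller might drain both, under stated assumptions: `ReadRecord`, `ErrorRecord`, `fakeRead`, and `drain` are simplified stand-ins invented for this example, and the real method's blocking and shutdown behavior is not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// ReadRecord and ErrorRecord are simplified stand-ins for
// records.ReadRecord and records.ErrorRecord (assumptions for illustration).
type ReadRecord struct{ Payload []byte }
type ErrorRecord struct{ Error string }

// fakeRead is a hypothetical producer with the same channel shape as Read:
// it sends two results and one error, then returns.
func fakeRead(ctx context.Context, resultsChan chan *ReadRecord, errorChan chan *ErrorRecord) error {
	for _, p := range []string{"a", "b"} {
		select {
		case resultsChan <- &ReadRecord{Payload: []byte(p)}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	select {
	case errorChan <- &ErrorRecord{Error: "example failure"}:
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}

// drain runs the reader in a goroutine and consumes both channels
// until the reader returns, counting what it received.
func drain() (results, errs int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resultsChan := make(chan *ReadRecord)
	errorChan := make(chan *ErrorRecord)
	done := make(chan error, 1)

	go func() { done <- fakeRead(ctx, resultsChan, errorChan) }()

	for {
		select {
		case <-resultsChan:
			results++
		case <-errorChan:
			errs++
		case <-done:
			return results, errs
		}
	}
}

func main() {
	results, errs := drain()
	fmt.Printf("results=%d errors=%d\n", results, errs)
}
```

Because the channels are unbuffered in this sketch, every send completes before the reader returns, so no record is lost when `done` fires.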
func (*Kafka) Relay ¶ added in v1.0.0
func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh chan interface{}, errorCh chan *records.ErrorRecord) error
Relay sets up a new Kafka relayer.
func (*Kafka) Write ¶ added in v0.1.0
func (k *Kafka) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan *records.ErrorRecord, messages ...*records.WriteRecord) error
Write is the entry point function for performing write operations in Kafka.
This is where we verify that the passed args and flags combo makes sense, attempt to establish a connection, parse protobuf, and finally attempt to perform the write.
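The order of operations described above can be sketched as follows. This is an illustration only: `writeRecord`, `fakeWriter`, and `write` are hypothetical stand-ins, the connection and encoding steps are stubbed out, and writing each message to every topic is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

var errMissingTopic = errors.New("You must specify at least one topic")

// writeRecord is a simplified stand-in for records.WriteRecord.
type writeRecord struct{ Input string }

// fakeWriter is a hypothetical connection that records what was written.
type fakeWriter struct{ written []string }

func (w *fakeWriter) write(topic string, payload []byte) {
	w.written = append(w.written, topic+":"+string(payload))
}

// write follows the documented order: validate, connect, encode, write.
func write(topics []string, messages ...*writeRecord) (*fakeWriter, error) {
	// 1. Verify that the arguments make sense.
	if len(topics) == 0 {
		return nil, errMissingTopic
	}

	// 2. Establish a connection (stubbed with an in-memory writer).
	w := &fakeWriter{}

	for _, m := range messages {
		// 3. Encode the payload; real code would handle protobuf here.
		payload := []byte(m.Input)

		// 4. Perform the write (to every topic, by assumption).
		for _, t := range topics {
			w.write(t, payload)
		}
	}
	return w, nil
}

func main() {
	w, err := write([]string{"orders"}, &writeRecord{Input: "hello"})
	fmt.Println(w.written, err)

	_, err = write(nil)
	fmt.Println(err)
}
```

Validating before connecting means a bad flag combination fails fast without opening a connection to the broker.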
type Lag ¶ added in v0.31.0
type Lag struct {
// contains filtered or unexported fields
}
func (*Lag) GetAllPartitionsLastOffset ¶ added in v1.0.0
func (l *Lag) GetAllPartitionsLastOffset(topic string, partitions []skafka.Partition) (int64, error)
GetAllPartitionsLastOffset finds the maximum lastOffset across the given partitions of a topic.
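A minimal sketch of the "maximum across partitions" step, assuming the per-partition last offsets have already been fetched. The `partitionOffset` type and `maxLastOffset` function are hypothetical; the real method takes `skafka.Partition` values and queries the broker.

```go
package main

import (
	"errors"
	"fmt"
)

// partitionOffset is a hypothetical pairing of a partition ID
// with its already-fetched last offset.
type partitionOffset struct {
	ID         int
	LastOffset int64
}

// maxLastOffset returns the largest LastOffset among the given partitions.
func maxLastOffset(partitions []partitionOffset) (int64, error) {
	if len(partitions) == 0 {
		return 0, errors.New("no partitions given")
	}
	highest := partitions[0].LastOffset
	for _, p := range partitions[1:] {
		if p.LastOffset > highest {
			highest = p.LastOffset
		}
	}
	return highest, nil
}

func main() {
	got, err := maxLastOffset([]partitionOffset{{0, 120}, {1, 345}, {2, 90}})
	fmt.Println(got, err) // prints "345 <nil>"
}
```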
func (*Lag) GetPartitionLastOffset ¶ added in v1.0.0
GetPartitionLastOffset gets the last offset for a given partition. It is used by both Read() and Lag() to display offset stats and/or calculate lag.
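For context, consumer lag is conventionally the distance between a partition's last offset and the offset the consumer group has committed. The sketch below shows that arithmetic; `partitionLag` is a hypothetical helper, and whether the package clamps negative values or computes lag exactly this way is an assumption.

```go
package main

import "fmt"

// partitionLag returns how many messages the consumer is behind,
// given the partition's last offset and the group's committed offset.
// Negative results (possible with stale readings) are clamped to zero.
func partitionLag(lastOffset, committedOffset int64) int64 {
	lag := lastOffset - committedOffset
	if lag < 0 {
		return 0
	}
	return lag
}

func main() {
	fmt.Println(partitionLag(345, 300)) // prints 45
}
```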