kafka_sasl

package
v3.35.0
Warning

This package is not in the latest version of its module.

Published: Mar 30, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Main

type Main struct {
	idk.Main             `flag:"!embed"`
	idk.ConfluentCommand `flag:"!embed"`
	Group                string               `help:"Kafka group."`
	Topics               []string             `help:"Kafka topics to read from."`
	Timeout              time.Duration        `help:"Time to wait for more records from Kafka before flushing a batch. 0 to disable."`
	SkipOld              bool                 `short:"" help:"Skip to the most recent Kafka message rather than starting at the beginning."`
	Header               string               `help:"Path to the static schema, in JSON header format."`
	AllowMissingFields   bool                 `help:"Will proceed with ingest even if a field is missing from a record but specified in the JSON config file. Default false"`
	ConfigMap            *confluent.ConfigMap `flag:"-"`
}

func NewMain

func NewMain() (*Main, error)
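
Configuration is normally driven by the struct tags above, but a Main can also be set up programmatically. The following is a minimal sketch using only the exported fields shown in the type definition; the import path and the field values are assumptions, not confirmed by this page.

	package main

	import (
		"log"
		"time"

		// Import path assumed from the package name; adjust to your module.
		"github.com/featurebasedb/featurebase/v3/idk/kafka_sasl"
	)

	func main() {
		m, err := kafka_sasl.NewMain()
		if err != nil {
			log.Fatal(err)
		}
		m.Group = "ingest-group"      // Kafka consumer group (hypothetical name)
		m.Topics = []string{"events"} // Kafka topics to read from (hypothetical)
		m.Timeout = 2 * time.Second   // flush a batch after 2s without records
		m.SkipOld = true              // start at the most recent message
		// Running the ingester is handled by the embedded idk.Main; see its
		// documentation for the run entry point.
	}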

func (*Main) CopyIn

func (m *Main) CopyIn(config idk.ConfluentCommand)

type Record

type Record struct {
	// contains filtered or unexported fields
}

func (*Record) Commit

func (r *Record) Commit(ctx context.Context) error

func (*Record) Data

func (r *Record) Data() []interface{}

func (*Record) Schema added in v3.34.0

func (r *Record) Schema() interface{}

func (*Record) StreamOffset

func (r *Record) StreamOffset() (string, uint64)

type Source

type Source struct {
	idk.ConfluentCommand
	Topics             []string
	Group              string
	Log                logger.Logger
	Timeout            time.Duration
	SkipOld            bool
	Header             string
	AllowMissingFields bool

	ConfigMap *confluent.ConfigMap
	// contains filtered or unexported fields
}

Source implements the idk.Source interface using Kafka as a data source. It is not threadsafe! Because of how Kafka clients work, create multiple Sources to achieve concurrency.
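
A minimal sketch of the pattern this implies: one Source per goroutine rather than one Source shared across goroutines, using NewSource, Open, and Close as documented below. The import path, topic, and group names are assumptions.

	package main

	import (
		"log"
		"sync"

		// Import path assumed from the package name; adjust to your module.
		"github.com/featurebasedb/featurebase/v3/idk/kafka_sasl"
	)

	func main() {
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Each goroutine owns its own Source; Source is not threadsafe.
				src := kafka_sasl.NewSource()
				src.Topics = []string{"events"} // hypothetical topic
				src.Group = "ingest-group"      // hypothetical consumer group
				if err := src.Open(); err != nil {
					log.Println("open:", err)
					return
				}
				defer src.Close()
				// Consume with src.Record() here.
			}()
		}
		wg.Wait()
	}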

func NewSource

func NewSource() *Source

NewSource returns a new Source.

func (*Source) Close

func (s *Source) Close() error

Close closes the underlying Kafka consumer.

func (*Source) CommitMessages

func (s *Source) CommitMessages(recs []confluent.TopicPartition) ([]confluent.TopicPartition, error)

func (*Source) Open

func (s *Source) Open() error

Open initializes the Kafka source.

func (*Source) Record

func (s *Source) Record() (idk.Record, error)

Record returns the value of the next Kafka message. The same Record object may be reused by successive calls to Record, so it should not be retained.
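
A hedged sketch of a consume loop that respects this reuse rule: the slice returned by Data is copied before the next call to Record, and each record is committed after processing. Error handling is deliberately generic, since this page does not document the error values Record can return; the import path is an assumption.

	package example

	import (
		"context"
		"log"

		// Import path assumed from the package name; adjust to your module.
		"github.com/featurebasedb/featurebase/v3/idk/kafka_sasl"
	)

	// consume reads records from an already-opened Source, copying each
	// record's data out before requesting the next one.
	func consume(ctx context.Context, src *kafka_sasl.Source) error {
		for {
			rec, err := src.Record()
			if err != nil {
				return err
			}
			// Copy the data out: the same Record object may be reused by
			// the next call to src.Record().
			row := make([]interface{}, len(rec.Data()))
			copy(row, rec.Data())
			log.Println(row) // stand-in for real processing
			if err := rec.Commit(ctx); err != nil {
				return err
			}
		}
	}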

func (*Source) Schema

func (s *Source) Schema() []idk.Field
