kafka

package
v3.35.0
Warning

This package is not in the latest version of its module.
Published: Mar 30, 2023 | License: Apache-2.0 | Imports: 11 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigToFields

func ConfigToFields(c Config) ([]*dax.Field, error)

ConfigToFields returns a list of *dax.Field based on the IDField and Fields in the Config.

func ValidateConfig

func ValidateConfig(c Config) error

ValidateConfig checks that the config is usable.

Types

type Config

type Config struct {
	Hosts  []string `mapstructure:"hosts" help:"Kafka hosts."`
	Group  string   `mapstructure:"group" help:"Kafka group."`
	Topics []string `mapstructure:"topics" help:"Kafka topics to read from."`

	BatchSize         int           `mapstructure:"batch-size" help:"Batch size."`
	BatchMaxStaleness time.Duration `` /* 210-byte string literal not displayed */
	Timeout           time.Duration `mapstructure:"timeout" help:"Time to wait for more records from Kafka before flushing a batch. 0 to disable."`

	Table  string  `mapstructure:"table" help:"Destination table name."`
	Fields []Field `mapstructure:"fields"`
}

Config is the user-facing configuration for Kafka support in the CLI. It is unmarshalled from the TOML config file supplied by the user.
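
As a rough illustration of the shape of a populated Config, the sketch below mirrors the documented struct locally (without the struct tags) so that it compiles on its own. The helper exampleConfig and every value in it, including the source-type strings "id" and "string", are made up for illustration and are not taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// Field and Config are local mirrors of the documented types, declared here
// only so the sketch is self-contained. Struct tags are omitted.
type Field struct {
	Name       string
	SourceType string
	SourcePath []string
	PrimaryKey bool
}

type Config struct {
	Hosts             []string
	Group             string
	Topics            []string
	BatchSize         int
	BatchMaxStaleness time.Duration
	Timeout           time.Duration
	Table             string
	Fields            []Field
}

// exampleConfig is a hypothetical helper that returns the kind of value a
// user's config file might decode into. All values are illustrative.
func exampleConfig() Config {
	return Config{
		Hosts:     []string{"localhost:9092"},
		Group:     "example-group",
		Topics:    []string{"events"},
		BatchSize: 1000,
		Timeout:   time.Second, // per the help text, 0 would disable the timeout
		Table:     "events",
		Fields: []Field{
			{Name: "id", SourceType: "id", SourcePath: []string{"id"}, PrimaryKey: true},
			{Name: "color", SourceType: "string", SourcePath: []string{"attrs", "color"}},
		},
	}
}

func main() {
	cfg := exampleConfig()
	fmt.Printf("%d field(s) from topics %v into table %q\n", len(cfg.Fields), cfg.Topics, cfg.Table)
}
```

In real use the value would be passed to ValidateConfig and ConvertConfig from this package rather than built by hand.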

type ConfigForIDK

type ConfigForIDK struct {
	Hosts  []string
	Group  string
	Topics []string

	BatchSize         int
	BatchMaxStaleness time.Duration
	Timeout           time.Duration

	Table   string
	IDField string
	Fields  []idk.RawField
}

ConfigForIDK represents Config converted to values suitable for IDK. In particular, the idk.RawField is used in parsing the schema in IDK.

func ConvertConfig

func ConvertConfig(c Config) (ConfigForIDK, error)

ConvertConfig converts a Config to one that is suitable for IDK.
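
The conversion can be pictured with the following minimal sketch. It is not the package's implementation: it assumes that the field marked PrimaryKey supplies IDField and is left out of Fields, and that exactly one such field is required. The types rawField and configForIDK and the function convertConfig are hypothetical local stand-ins for idk.RawField, ConfigForIDK and ConvertConfig.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local mirrors of the documented types, so the sketch compiles on its own.
type Field struct {
	Name       string
	SourceType string
	SourcePath []string
	PrimaryKey bool
}

type Config struct {
	Hosts, Topics                       []string
	Group, Table                        string
	BatchSize                           int
	BatchMaxStaleness, Timeout          time.Duration
	Fields                              []Field
}

// rawField is a hypothetical stand-in for idk.RawField; the real type's
// fields are not shown in this documentation.
type rawField struct {
	Name string
	Type string
}

// configForIDK is a local stand-in for ConfigForIDK.
type configForIDK struct {
	Hosts, Topics              []string
	Group, Table, IDField      string
	BatchSize                  int
	BatchMaxStaleness, Timeout time.Duration
	Fields                     []rawField
}

// convertConfig copies the connection and batching settings unchanged and
// splits the fields into one ID field plus the remaining raw fields.
// The primary-key handling is an assumption made for this sketch.
func convertConfig(c Config) (configForIDK, error) {
	out := configForIDK{
		Hosts: c.Hosts, Topics: c.Topics, Group: c.Group, Table: c.Table,
		BatchSize: c.BatchSize, BatchMaxStaleness: c.BatchMaxStaleness, Timeout: c.Timeout,
	}
	for _, f := range c.Fields {
		if f.PrimaryKey {
			if out.IDField != "" {
				return configForIDK{}, errors.New("more than one primary-key field")
			}
			out.IDField = f.Name
			continue
		}
		out.Fields = append(out.Fields, rawField{Name: f.Name, Type: f.SourceType})
	}
	if out.IDField == "" {
		return configForIDK{}, errors.New("no primary-key field")
	}
	return out, nil
}

func main() {
	cfg := Config{
		Table: "events",
		Fields: []Field{
			{Name: "id", SourceType: "id", PrimaryKey: true},
			{Name: "color", SourceType: "string"},
		},
	}
	out, err := convertConfig(cfg)
	fmt.Println(out.IDField, len(out.Fields), err)
}
```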

type Field

type Field struct {
	Name       string   `mapstructure:"name"`
	SourceType string   `mapstructure:"source-type"`
	SourcePath []string `mapstructure:"source-path"`
	PrimaryKey bool     `mapstructure:"primary-key"`
}

Field is a user-facing configuration field.

func CheckFieldCompatibility

func CheckFieldCompatibility(cflds []Field, scr *featurebase.ShowColumnsResponse) ([]Field, error)

CheckFieldCompatibility ensures that the fields provided in the kafka config are compatible with the fields in the existing table. It returns a copy of the kafka config fields with empty values defaulted to the table field configuration.
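
The defaulting behaviour described above might look roughly like the sketch below. It is only an illustration: tableColumns is a hypothetical stand-in for *featurebase.ShowColumnsResponse (a plain column-name to column-type map), checkFieldCompatibility is a local function rather than the package's, and the rule that a non-empty SourceType must equal the column type is an assumption.

```go
package main

import "fmt"

// Field is a local mirror of the documented type.
type Field struct {
	Name       string
	SourceType string
	SourcePath []string
	PrimaryKey bool
}

// tableColumns is a hypothetical stand-in for the table description:
// column name -> column type.
type tableColumns map[string]string

// checkFieldCompatibility returns a copy of cflds in which every empty
// SourceType is filled in from the table. It fails if a field is missing
// from the table or declares a type that differs from the column's type.
func checkFieldCompatibility(cflds []Field, cols tableColumns) ([]Field, error) {
	// Shallow copy: the caller's slice elements are not modified.
	out := make([]Field, len(cflds))
	copy(out, cflds)

	for i := range out {
		colType, ok := cols[out[i].Name]
		if !ok {
			return nil, fmt.Errorf("field %q does not exist in the table", out[i].Name)
		}
		switch {
		case out[i].SourceType == "":
			out[i].SourceType = colType // default from the table
		case out[i].SourceType != colType:
			return nil, fmt.Errorf("field %q: config type %q does not match table type %q",
				out[i].Name, out[i].SourceType, colType)
		}
	}
	return out, nil
}

func main() {
	cols := tableColumns{"id": "id", "color": "string"}
	fields := []Field{{Name: "id", PrimaryKey: true}, {Name: "color", SourceType: "string"}}
	out, err := checkFieldCompatibility(fields, cols)
	fmt.Println(out, err)
}
```

Returning a copy rather than mutating the input keeps the user's original config available, for example to report what was supplied versus what was defaulted.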

func FieldsToConfig

func FieldsToConfig(flds []*dax.Field) []Field

FieldsToConfig returns a []Field, suitable for Config.Fields, based on a list of *dax.Field.

type Runner

type Runner struct {
	idk.Main   `flag:"!embed"`
	KafkaHosts []string       `help:"Comma separated list of host:port pairs for Kafka."`
	Group      string         `help:"Kafka group."`
	Topics     []string       `help:"Kafka topics to read from."`
	Timeout    time.Duration  `help:"Time to wait for more records from Kafka before flushing a batch. 0 to disable."`
	Header     []idk.RawField `help:"Header configuration."`
}

Runner is a CLI-specific kafka consumer. It's similar to idk.kafka_static.Main in that it embeds idk.Main and contains additional functionality specific to its use case.

func NewRunner

func NewRunner(cfg ConfigForIDK, batcher fbbatch.Batcher, logWriter io.Writer) *Runner
