gkafka

package

Version: v0.4.1

Warning: This package is not in the latest version of its module.

Published: Jan 17, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

const (
	// General
	PropBootstrapServers = "bootstrap.servers"
	PropSecurityProtocol = "security.protocol"
	PropSASLMechanism    = "sasl.mechanism"
	PropSASLUsername     = "sasl.username"
	PropSASLPassword     = "sasl.password"

	// Consumer
	PropGroupID             = "group.id"
	PropQueuedMaxMessagesKb = "queued.max.messages.kbytes"
	PropMaxPollInterval     = "max.poll.interval.ms"

	// Producer
	PropIdempotence     = "enable.idempotence"
	PropCompressionType = "compression.type"
)

Commonly used Kafka properties, which are also used when setting up the default config. See https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html for all properties that can be used for connector configuration.
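
As an illustration of how these keys can be used, the following minimal sketch builds a property map of the kind that Config.KafkaProps expects. The constants are re-declared locally only so that the snippet compiles on its own; baseProps, the broker address, and the compression codec are hypothetical placeholders, not values prescribed by the package.

```go
package main

import "fmt"

// Local copies of a few gkafka property keys, so the sketch is self-contained.
const (
	PropBootstrapServers = "bootstrap.servers"
	PropGroupID          = "group.id"
	PropIdempotence      = "enable.idempotence"
	PropCompressionType  = "compression.type"
)

// baseProps is a hypothetical helper returning properties shared by all streams.
// The idempotence and compression settings are example values only.
func baseProps(brokers string) map[string]any {
	return map[string]any{
		PropBootstrapServers: brokers,
		PropIdempotence:      true,
		PropCompressionType:  "lz4",
	}
}

func main() {
	props := baseProps("localhost:9092")
	// Consumer-specific properties can be added on top of the shared ones.
	props[PropGroupID] = "my-group"
	fmt.Println(props[PropBootstrapServers], props[PropGroupID])
}
```

Using the constants instead of raw strings keeps typos in property names from going unnoticed until runtime.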

const (
	// Increases the max poll interval from the 5 min default to 10 min (value in ms)
	DefaultMaxPollInterval = 600000

	// Maximum number of kilobytes per topic+partition in the local consumer queue.
	// Set this low to avoid running out of memory when there is a big backlog.
	// The default is 1 048 576 KB = 1 GB per partition. A few MB seems to give
	// good enough throughput while keeping memory requirements low.
	DefaultQueuedMaxMessagesKb = 2048
)

Default values if omitted in external config
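
The fallback behavior described above can be pictured with the sketch below. It is not the package's actual implementation: withDefaults is a hypothetical helper, and the constants are local copies so the snippet stands alone. It assumes that a default is applied only when the corresponding key is absent from the supplied properties.

```go
package main

import "fmt"

// Local copies of the relevant gkafka constants.
const (
	PropQueuedMaxMessagesKb = "queued.max.messages.kbytes"
	PropMaxPollInterval     = "max.poll.interval.ms"

	DefaultMaxPollInterval     = 600000
	DefaultQueuedMaxMessagesKb = 2048
)

// withDefaults (hypothetical) returns a copy of props in which missing
// consumer properties are filled in with the default values.
func withDefaults(props map[string]any) map[string]any {
	out := make(map[string]any, len(props)+2)
	for k, v := range props {
		out[k] = v
	}
	if _, ok := out[PropMaxPollInterval]; !ok {
		out[PropMaxPollInterval] = DefaultMaxPollInterval
	}
	if _, ok := out[PropQueuedMaxMessagesKb]; !ok {
		out[PropQueuedMaxMessagesKb] = DefaultQueuedMaxMessagesKb
	}
	return out
}

func main() {
	// The explicitly set poll interval is kept; the queue size falls back to the default.
	props := withDefaults(map[string]any{PropMaxPollInterval: 300000})
	fmt.Println(props[PropMaxPollInterval], props[PropQueuedMaxMessagesKb])
}
```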

const (
	// If the "group.id" property is assigned this value in the format
	// "@UniqueWithPrefix.my-groupid-prefix", a unique group.id value will be generated
	// in the format "my-groupid-prefix.<unique extractor id>.<ISO UTC timestamp micros>"
	UniqueGroupIDWithPrefix = "@UniqueWithPrefix"
)

Special prop constructs

Variables

var (
	// ErrMissingGroupID is returned from NewExtractor() if a "group.id" value is not found
	ErrMissingGroupID = errors.New("group.id is missing in config")
)

Errors

Functions

func NewExtractorFactory

func NewExtractorFactory(config *Config, cf ikafka.ConsumerFactory, pf ikafka.ProducerFactory) entity.ExtractorFactory

NewExtractorFactory creates a Geist source connector factory. cf and pf can normally be set to nil to use the default internal ones, unless special setups are needed.

func NewLoaderFactory

func NewLoaderFactory(config *Config) entity.LoaderFactory

Types

type Config

type Config struct {

	// KafkaProps is used to provide standard Kafka Properties to producer/consumer
	// entities. It should be filled in with required props such as "bootstrap.servers"
	// but can also be filled in with default properties common for intended streams.
	// All these properties can be overridden in each Stream Spec by using its
	// config.properties JSON object.
	KafkaProps map[string]any

	// PollTimeoutMs is the default value to use as the consumer poll timeout for
	// each Extractor's Kafka consumer. It can be overridden per stream in stream
	// specs. If not set, the default value gki.DefaultPollTimeoutMs will be used.
	PollTimeoutMs int

	// Env is only required to be filled in if stream specs for this use of Geist
	// are using different topic specs for different environments, typically "dev",
	// "stage", and "prod". Any string is allowed as long as it matches the ones
	// used in the stream specs.
	Env string

	// CreateTopics specifies whether the Extractor is allowed to create DLQ topics
	// (if configured in stream specs), and whether the Loader is allowed to create
	// specified topics if they do not exist.
	CreateTopics bool

	// SendToSource specifies if the Extractor's SendToSource() method should be
	// available for use in certain stream scenarios. For example, if we have a
	// stream spec with Kafka set as source, having SendToSource set to true will
	// make it possible to send events to this stream with geist.Publish().
	// Having it disabled reduces memory footprint.
	SendToSource bool
}

Config is the external config that the Geist client provides to the factory at startup, to be used when streams are created.
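
For orientation, a filled-in Config might look roughly like the sketch below. The struct is re-declared locally (without its doc comments) purely so the snippet compiles on its own; newConfig, the broker address, and the chosen field values are hypothetical examples rather than recommended settings.

```go
package main

import "fmt"

// Config mirrors the fields of gkafka.Config for illustration only.
type Config struct {
	KafkaProps    map[string]any
	PollTimeoutMs int
	Env           string
	CreateTopics  bool
	SendToSource  bool
}

// newConfig (hypothetical) builds a config with the required
// "bootstrap.servers" property and a few example settings.
func newConfig(brokers, env string) *Config {
	return &Config{
		KafkaProps: map[string]any{
			"bootstrap.servers": brokers,
		},
		// PollTimeoutMs is left at zero here, i.e. not set.
		Env:          env,
		CreateTopics: true,
		// SendToSource stays false, which the docs say reduces memory footprint.
	}
}

func main() {
	cfg := newConfig("localhost:9092", "dev")
	fmt.Println(cfg.KafkaProps["bootstrap.servers"], cfg.Env, cfg.CreateTopics)
}
```

With the real package, a value like this would be passed to NewExtractorFactory or NewLoaderFactory.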

Directories

internal/gki
