Documentation ¶
Index ¶
Constants ¶
View Source
const ( // General PropBootstrapServers = "bootstrap.servers" PropSecurityProtocol = "security.protocol" PropSASLMechanism = "sasl.mechanism" PropSASLUsername = "sasl.username" PropSASLPassword = "sasl.password" // Consumer PropGroupID = "group.id" PropQueuedMaxMessagesKb = "queued.max.messages.kbytes" PropMaxPollInterval = "max.poll.interval.ms" // Producer PropIdempotence = "enable.idempotence" PropCompressionType = "compression.type" )
Kafka properties commonly used and for setting up default config. See https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html for all properties that can be used for connector configuration.
View Source
const ( // Interval increase from 5 min default to 10 min DefaultMaxPollInterval = 600000 // Maximum number of kilobytes per topic+partition in the local consumer queue. // To not go OOM if big backlog, set this low. Default is 1 048 576 KB = 1GB // per partition. A few MBs seems to give good enough throughput while keeping // memory requirements low. DefaultQueuedMaxMessagesKb = 2048 )
Default values if omitted in external config
View Source
const ( // If "group.id" property is assigned with this value on the format // "@UniqueWithPrefix.my-groupid-prefix" a unique group.id value will be generated // on the format "my-groupid-prefix.<unique extractor id>.<ISO UTC timestamp micros>" UniqueGroupIDWithPrefix = "@UniqueWithPrefix" )
Special prop constructs
Variables ¶
View Source
var ( // ErrMissingGroupID is returned from NewExtractor() if a "group.id" value is not found ErrMissingGroupID = errors.New("group.id is missing in config") )
Errors
Functions ¶
func NewExtractorFactory ¶
func NewExtractorFactory(config *Config, cf ikafka.ConsumerFactory, pf ikafka.ProducerFactory) entity.ExtractorFactory
NewExtractorFactory creates a Geist source connector factory. cf and pf can normally be set to nil to use the default internal ones, unless special setups are needed.
func NewLoaderFactory ¶
func NewLoaderFactory(config *Config) entity.LoaderFactory
Types ¶
type Config ¶
type Config struct { // KafkaProps is used to provide standard Kafka Properties to producer/consumer // entities. It should be filled in with required props such as "bootstrap.servers" // but can also be filled in with default properties common for intended streams. // All these properties can be overridden in each Stream Spec by using its // config.properties JSON object. KafkaProps map[string]any // PollTimeoutMs is the default value to use as the consumer poll timeout for // each Extractor's Kafka consumer. It can be overridden per stream in stream // specs. If not set the default value gki.DefaultPollTimeoutMs will be used. PollTimeoutMs int // Env is only required to be filled in if stream specs for this use of Geist // are using different topic specs for different environments, typically "dev", // "stage", and "prod". Any string is allowed as long as it matches the ones // used in the stream specs. Env string // CreateTopics specifies if the Extractor is allowed to create DLQ topics // (if configured in stream specs), and Loader is allowed to create specified // topics if they do not exists. CreateTopics bool // SendToSource specifies if the Extractor's SendToSource() method should be // available for use in certain stream scenarios. For example, if we have a // stream spec with Kafka set as source, having SendToSource set to true will // make it possible to send events to this stream with geist.Publish(). // Having it disabled reduces memory footprint. SendToSource bool }
Config is the external config provided by the Geist client to the factory when starting up, which is to be used during stream creations.
Click to show internal directories.
Click to hide internal directories.