Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct { // > @3@4@5@6 // > // > The name of kafka brokers to read from. Brokers []string `json:"brokers" required:"true"` // * // > @3@4@5@6 // > // > The list of kafka topics to read from. Topics []string `json:"topics" required:"true"` // * // > @3@4@5@6 // > // > The name of consumer group to use. ConsumerGroup string `json:"consumer_group" default:"file-d"` // * // > @3@4@5@6 // > // > Kafka client ID. ClientID string `json:"client_id" default:"file-d"` // * // > @3@4@5@6 // > // > The number of unprocessed messages in the buffer that are loaded in the background from kafka. (max.poll.records) ChannelBufferSize int `json:"channel_buffer_size" default:"256"` // * // > @3@4@5@6 // > MaxConcurrentConsumers sets the maximum number of consumers // > Optimal value: number of topics * number of partitions of topic // > MaxConcurrentConsumers int `json:"max_concurrent_consumers" default:"5"` // * // > @3@4@5@6 // > // > MaxConcurrentFetches sets the maximum number of fetch requests to allow in // > flight or buffered at once, overriding the unbounded (i.e. number of // > brokers) default. MaxConcurrentFetches int `json:"max_concurrent_fetches" default:"0"` // * // > @3@4@5@6 // > // > FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch FetchMaxBytes cfg.Expression `json:"fetch_max_bytes" default:"52428800" parse:"expression"` // * FetchMaxBytes_ int32 // > @3@4@5@6 // > // > FetchMinBytes (fetch.min.bytes) sets the minimum amount of bytes a broker will try to send during a fetch FetchMinBytes cfg.Expression `json:"fetch_min_bytes" default:"1" parse:"expression"` // * FetchMinBytes_ int32 // > @3@4@5@6 // > // > The newest and oldest values is used when a consumer starts but there is no committed offset for the assigned partition. // > * *`newest`* - set offset to the newest message // > * *`oldest`* - set offset to the oldest message Offset string `json:"offset" default:"newest" options:"newest|oldest"` // * Offset_ OffsetType // > @3@4@5@6 // > // > Algorithm used by Kafka to assign partitions to consumers in a group. // > * *`round-robin`* - M0: [t0p0, t0p2, t1p1], M1: [t0p1, t1p0, t1p2] // > * *`range`* - M0: [t0p0, t0p1, t1p0, t1p1], M1: [t0p2, t1p2] // > * *`sticky`* - ensures minimal partition movement on group changes while also ensuring optimal balancing // > * *`cooperative-sticky`* - performs the sticky balancing strategy, but additionally opts the consumer group into "cooperative" rebalancing Balancer string `json:"balancer" default:"round-robin" options:"round-robin|range|sticky|cooperative-sticky"` // * // > @3@4@5@6 // > // > The maximum amount of time the consumer expects a message takes to process for the user. (Not used anymore!) ConsumerMaxProcessingTime cfg.Duration `json:"consumer_max_processing_time" default:"200ms" parse:"duration"` // * ConsumerMaxProcessingTime_ time.Duration // > @3@4@5@6 // > // > The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it returns fewer than that anyways. (fetch.max.wait.ms) ConsumerMaxWaitTime cfg.Duration `json:"consumer_max_wait_time" default:"250ms" parse:"duration"` // * ConsumerMaxWaitTime_ time.Duration // > @3@4@5@6 // > // > AutoCommitInterval sets how long to go between autocommits AutoCommitInterval cfg.Duration `json:"auto_commit_interval" default:"1s" parse:"duration"` // * AutoCommitInterval_ time.Duration // > @3@4@5@6 // > // > SessionTimeout sets how long a member in the group can go between heartbeats SessionTimeout cfg.Duration `json:"session_timeout" default:"10s" parse:"duration"` // * SessionTimeout_ time.Duration // > @3@4@5@6 // > // > HeartbeatInterval sets how long a group member goes between heartbeats to Kafka HeartbeatInterval cfg.Duration `json:"heartbeat_interval" default:"3s" parse:"duration"` // * HeartbeatInterval_ time.Duration // > @3@4@5@6 // > // > If set, the plugin will use SASL authentications mechanism. SaslEnabled bool `json:"is_sasl_enabled" default:"false"` // * // > @3@4@5@6 // > // > SASL mechanism to use. SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|AWS_MSK_IAM"` // * // > @3@4@5@6 // > // > SASL username. SaslUsername string `json:"sasl_username" default:"user"` // * // > @3@4@5@6 // > // > SASL password. SaslPassword string `json:"sasl_password" default:"password"` // * // > @3@4@5@6 // > // > If set, the plugin will use SSL/TLS connections method. SslEnabled bool `json:"is_ssl_enabled" default:"false"` // * // > @3@4@5@6 // > // > If set, the plugin will skip SSL/TLS verification. SslSkipVerify bool `json:"ssl_skip_verify" default:"false"` // * // > @3@4@5@6 // > // > Path or content of a PEM-encoded client certificate file. ClientCert string `json:"client_cert"` // * // > @3@4@5@6 // > // > > Path or content of a PEM-encoded client key file. ClientKey string `json:"client_key"` // * // > @3@4@5@6 // > // > Path or content of a PEM-encoded CA file. CACert string `json:"ca_cert"` // * // > @3@4@5@6 // > // > Meta params // > // > Add meta information to an event (look at Meta params) // > Use [go-template](https://pkg.go.dev/text/template) syntax // > // > Example: “`topic: '{{ .topic }}'“` Meta cfg.MetaTemplates `json:"meta"` // * }
! config-params ^ config-params
func (*Config) GetBrokers ¶ added in v0.29.0
func (*Config) GetClientID ¶ added in v0.29.0
func (*Config) GetSaslConfig ¶ added in v0.29.0
func (c *Config) GetSaslConfig() cfg.KafkaClientSaslConfig
func (*Config) GetSslConfig ¶ added in v0.29.0
func (c *Config) GetSslConfig() cfg.KafkaClientSslConfig
func (*Config) IsSaslEnabled ¶ added in v0.29.0
func (*Config) IsSslEnabled ¶ added in v0.29.0
type OffsetType ¶ added in v0.19.2
type OffsetType byte
const ( OffsetTypeNewest OffsetType = iota OffsetTypeOldest )
Click to show internal directories.
Click to hide internal directories.