kafka

package
v0.41.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: BSD-3-Clause Imports: 10 Imported by: 0

README

Kafka output

It sends event batches to Kafka brokers using the franz-go library.

Config params

brokers []string required

List of kafka brokers to write to.


default_topic string required

The default topic name, used if nothing is found in the event field or use_topic_field isn't set.


client_id string default=file-d

Kafka client ID.


use_topic_field bool default=false

If set, the plugin will use topic name from the event field.


topic_field string default=topic

Which event field to use as the topic name. It works only if use_topic_field is set.


workers_count cfg.Expression default=gomaxprocs*4

How many workers will be instantiated to send batches.


batch_size cfg.Expression default=capacity/4

A maximum quantity of the events to pack into one batch.


batch_size_bytes cfg.Expression default=0

A minimum size of events in a batch to send. If both batch_size and batch_size_bytes are set, they will work together.


batch_flush_timeout cfg.Duration default=200ms

After this timeout the batch will be sent even if batch isn't full.


max_message_bytes cfg.Expression default=1000000

The maximum permitted size of a message. Should be set equal to or smaller than the broker's message.max.bytes.


compression string default=none options=none|gzip|snappy|lz4|zstd

Compression codec


ack string default=leader options=no|leader|all-isr

Required acks for produced records


retry int default=10

Retries of insertion. If File.d cannot insert for this number of attempts, File.d will exit with a non-zero exit code or skip the message (see fatal_on_failed_insert).


fatal_on_failed_insert bool default=false

Whether to exit with a non-zero exit code after an insert error. Experimental feature.


retention cfg.Duration default=50ms

Retention (delay) between retries, given as a duration (e.g. 50ms).


retention_exponentially_multiplier int default=2

Multiplier for exponential increase of retention between retries


is_sasl_enabled bool default=false

If set, the plugin will use the SASL authentication mechanism.


sasl_mechanism string default=SCRAM-SHA-512 options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512

SASL mechanism to use.


sasl_username string default=user

SASL username.


sasl_password string default=password

SASL password.


is_ssl_enabled bool default=false

If set, the plugin will use SSL/TLS connections.


ssl_skip_verify bool default=false

If set, the plugin will skip SSL/TLS verification.


client_cert string

Path or content of a PEM-encoded client certificate file.


client_key string

Path or content of a PEM-encoded client key file.


ca_cert string

Path or content of a PEM-encoded CA file.



Generated using insane-doc

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

func NewClient added in v0.29.0

func NewClient(c *Config, l *zap.Logger) *kgo.Client

Types

type Config

// Config holds the settings of the Kafka output plugin. Each field maps to the
// config key given in its `json` tag; `default`, `required` and `options` tags
// carry the default value, the mandatory flag and the allowed values.
// NOTE(review): fields with a trailing underscore presumably hold the parsed
// form of the field declared right above them (see the `parse` tag) — confirm
// against the cfg package.
type Config struct {
	// > @3@4@5@6
	// >
	// > List of kafka brokers to write to.
	Brokers []string `json:"brokers" required:"true"` // *

	// > @3@4@5@6
	// >
	// > The default topic name, used if nothing is found in the event field or `use_topic_field` isn't set.
	DefaultTopic string `json:"default_topic" required:"true"` // *

	// > @3@4@5@6
	// >
	// > Kafka client ID.
	ClientID string `json:"client_id" default:"file-d"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will use topic name from the event field.
	UseTopicField bool `json:"use_topic_field" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Which event field to use as the topic name. It works only if `use_topic_field` is set.
	TopicField string `json:"topic_field" default:"topic"` // *

	// > @3@4@5@6
	// >
	// > How many workers will be instantiated to send batches.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > A maximum quantity of the events to pack into one batch.
	BatchSize  cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // *
	BatchSize_ int

	// > @3@4@5@6
	// >
	// > A minimum size of events in a batch to send.
	// > If both batch_size and batch_size_bytes are set, they will work together.
	BatchSizeBytes  cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` // *
	BatchSizeBytes_ int

	// > @3@4@5@6
	// >
	// > After this timeout the batch will be sent even if batch isn't full.
	BatchFlushTimeout  cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
	BatchFlushTimeout_ time.Duration

	// > @3@4@5@6
	// >
	// > The maximum permitted size of a message.
	// > Should be set equal to or smaller than the broker's `message.max.bytes`.
	MaxMessageBytes  cfg.Expression `json:"max_message_bytes" default:"1000000" parse:"expression"` // *
	MaxMessageBytes_ int

	// > @3@4@5@6
	// >
	// > Compression codec
	Compression string `json:"compression" default:"none" options:"none|gzip|snappy|lz4|zstd"` // *

	// > @3@4@5@6
	// >
	// > Required acks for produced records
	Ack string `json:"ack" default:"leader" options:"no|leader|all-isr"` // *

	// > @3@4@5@6
	// >
	// > Retries of insertion. If File.d cannot insert for this number of attempts,
	// > File.d will exit with a non-zero exit code or skip the message (see fatal_on_failed_insert).
	Retry int `json:"retry" default:"10"` // *

	// > @3@4@5@6
	// >
	// > Whether to exit with a non-zero exit code after an insert error.
	// > **Experimental feature**
	FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Retention (delay) between retries, given as a duration (e.g. 50ms).
	Retention  cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
	Retention_ time.Duration

	// > @3@4@5@6
	// >
	// > Multiplier for exponential increase of retention between retries
	RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will use the SASL authentication mechanism.
	SaslEnabled bool `json:"is_sasl_enabled" default:"false"` // *

	// > @3@4@5@6
	// >
	// > SASL mechanism to use.
	SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512"` // *

	// > @3@4@5@6
	// >
	// > SASL username.
	SaslUsername string `json:"sasl_username" default:"user"` // *

	// > @3@4@5@6
	// >
	// > SASL password.
	SaslPassword string `json:"sasl_password" default:"password"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will use SSL/TLS connections.
	SslEnabled bool `json:"is_ssl_enabled" default:"false"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will skip SSL/TLS verification.
	SslSkipVerify bool `json:"ssl_skip_verify" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Path or content of a PEM-encoded client certificate file.
	ClientCert string `json:"client_cert"` // *

	// > @3@4@5@6
	// >
	// > Path or content of a PEM-encoded client key file.
	ClientKey string `json:"client_key"` // *

	// > @3@4@5@6
	// >
	// > Path or content of a PEM-encoded CA file.
	CACert string `json:"ca_cert"` // *
}

! config-params ^ config-params

func (*Config) GetBrokers added in v0.29.0

func (c *Config) GetBrokers() []string

func (*Config) GetClientID added in v0.29.0

func (c *Config) GetClientID() string

func (*Config) GetSaslConfig added in v0.29.0

func (c *Config) GetSaslConfig() cfg.KafkaClientSaslConfig

func (*Config) GetSslConfig added in v0.29.0

func (c *Config) GetSslConfig() cfg.KafkaClientSslConfig

func (*Config) IsSaslEnabled added in v0.29.0

func (c *Config) IsSaslEnabled() bool

func (*Config) IsSslEnabled added in v0.29.0

func (c *Config) IsSslEnabled() bool

type KafkaClient added in v0.29.0

// KafkaClient is the client interface declared by this package: it requires a
// ProduceSync method and a Close method.
// NOTE(review): presumably satisfied by the *kgo.Client returned by NewClient,
// which would allow substituting another implementation — confirm.
type KafkaClient interface {
	// ProduceSync accepts a context and zero or more *kgo.Record values and
	// returns kgo.ProduceResults.
	ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults
	// Close takes no arguments and returns nothing.
	Close()
}

type Plugin

// Plugin is the plugin type of the Kafka output package. Its exported methods
// are Start, Out and Stop; its fields are unexported and not shown here.
type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Out

func (p *Plugin) Out(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL