kafka

package
v1.6.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2021 License: MIT Imports: 18 Imported by: 40

Documentation

Index

Constants

View Source
const (
	DefaultTimeout      = 10 * time.Second
	DefaultBatchSize    = 100
	DefaultBatchTimeout = 1 * time.Second
	DefaultID           = "default"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(c Config) *Cluster

func (*Cluster) Close

func (c *Cluster) Close()

func (*Cluster) Update

func (c *Cluster) Update(cfg Config) error

func (*Cluster) WriteMessage

func (c *Cluster) WriteMessage(diagnostic Diagnostic, target WriteTarget, key, msg []byte) error

type Config

type Config struct {
	Enabled bool `toml:"enabled" override:"enabled"`
	// ID is a unique identifier for this Kafka config
	ID string `toml:"id" override:"id"`
	// Brokers is a list of host:port addresses of Kafka brokers.
	Brokers []string `toml:"brokers" override:"brokers"`
	// Timeout on network operations with the brokers.
	// If 0 a default of 10s will be used.
	Timeout toml.Duration `toml:"timeout" override:"timeout"`
	// BatchSize is the number of messages that are batched before being sent to Kafka
	// If 0 a default of 100 will be used.
	BatchSize int `toml:"batch-size" override:"batch-size"`
	// BatchTimeout is the maximum amount of time to wait before flushing an incomplete batch.
	// If 0 a default of 1s will be used.
	BatchTimeout toml.Duration `toml:"batch-timeout" override:"batch-timeout"`
	// UseSSL enable ssl communication
	// Must be true for the other ssl options to take effect.
	UseSSL bool `toml:"use-ssl" override:"use-ssl"`
	// Path to CA file
	SSLCA string `toml:"ssl-ca" override:"ssl-ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl-cert" override:"ssl-cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl-key" override:"ssl-key"`
	// Use SSL but skip chain & host verification
	InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"`
}

func NewConfig

func NewConfig() Config

func (*Config) ApplyConditionalDefaults

func (c *Config) ApplyConditionalDefaults()

func (Config) Validate

func (c Config) Validate() error

func (Config) WriterConfig

func (c Config) WriterConfig(diagnostic Diagnostic, target WriteTarget) (kafka.WriterConfig, error)

type Configs

type Configs []Config

func (Configs) Validate

func (cs Configs) Validate() error

type Diagnostic

type Diagnostic interface {
	WithContext(ctx ...keyvalue.T) Diagnostic
	InsecureSkipVerify()
	Error(msg string, err error)
}

type HandlerConfig

type HandlerConfig struct {
	Cluster              string `mapstructure:"cluster"`
	Topic                string `mapstructure:"topic"`
	Template             string `mapstructure:"template"`
	DisablePartitionById bool   `mapstructure:"disablePartitionById"`
	PartitionAlgorithm   string `mapstructure:"partitionAlgorithm"`
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(cs Configs, d Diagnostic) *Service

func (*Service) Close

func (s *Service) Close() error

func (*Service) Cluster

func (s *Service) Cluster(id string) (*Cluster, bool)

func (*Service) Handler

func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, error)

func (*Service) Open

func (s *Service) Open() error

func (*Service) Test

func (s *Service) Test(options interface{}) error

func (*Service) TestOptions

func (s *Service) TestOptions() interface{}

func (*Service) Update

func (s *Service) Update(newConfigs []interface{}) error

type WriteTarget added in v1.6.0

type WriteTarget struct {
	Topic              string
	PartitionById      bool
	PartitionAlgorithm string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL