kafka

package
v1.7.0-rc6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2023 License: MIT Imports: 25 Imported by: 40

Documentation

Index

Constants

View Source
// Default values for a Kafka configuration. The 10s, 100 and 1s values match
// the fallbacks that the Config field comments describe for a zero Timeout,
// BatchSize and BatchTimeout respectively.
// NOTE(review): DefaultID is presumably the ID used for a config that does
// not set one — confirm against NewConfig/ApplyConditionalDefaults.
const (
	DefaultTimeout      = 10 * time.Second
	DefaultBatchSize    = 100
	DefaultBatchTimeout = 1 * time.Second
	DefaultID           = "default"
)

Variables

View Source
// ErrNonPositivePartitions reports that a number of partitions was not positive.
var ErrNonPositivePartitions = errors.New("number of partitions must be positive")
View Source
// SHA256 is a scram.HashGeneratorFcn that returns a new SHA-256 hash.Hash on each call.
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
// SHA512 is a scram.HashGeneratorFcn that returns a new SHA-512 hash.Hash on each call.
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

Functions

func SASLVersion added in v1.6.4

func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error)

Types

type Cluster

// Cluster is constructed from a Config with NewCluster and exposes the
// Close, Update and WriteMessage methods.
type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(c Config) *Cluster

func (*Cluster) Close

func (c *Cluster) Close()

func (*Cluster) Update

func (c *Cluster) Update(cfg Config) error

func (*Cluster) WriteMessage

func (c *Cluster) WriteMessage(diagnostic Diagnostic, target WriteTarget, key, msg []byte) error

type Config

// Config holds the settings for one Kafka configuration: its identity,
// broker addresses, timeout and batching options, SSL options, and the
// embedded SASL authentication settings.
type Config struct {
	Enabled bool `toml:"enabled" override:"enabled"`
	// ID is a unique identifier for this Kafka config.
	ID string `toml:"id" override:"id"`
	// Brokers is a list of host:port addresses of Kafka brokers.
	Brokers []string `toml:"brokers" override:"brokers"`
	// Timeout is the timeout on network operations with the brokers.
	// If 0, a default of 10s will be used.
	Timeout toml.Duration `toml:"timeout" override:"timeout"`
	// BatchSize is the number of messages that are batched before being sent to Kafka.
	// If 0, a default of 100 will be used.
	BatchSize int `toml:"batch-size" override:"batch-size"`
	// BatchTimeout is the maximum amount of time to wait before flushing an incomplete batch.
	// If 0, a default of 1s will be used.
	BatchTimeout toml.Duration `toml:"batch-timeout" override:"batch-timeout"`
	// UseSSL enables SSL communication.
	// It must be true for the other SSL options to take effect.
	UseSSL bool `toml:"use-ssl" override:"use-ssl"`
	// SSLCA is the path to the CA file.
	SSLCA string `toml:"ssl-ca" override:"ssl-ca"`
	// SSLCert is the path to the host cert file.
	SSLCert string `toml:"ssl-cert" override:"ssl-cert"`
	// SSLKey is the path to the cert key file.
	SSLKey string `toml:"ssl-key" override:"ssl-key"`
	// InsecureSkipVerify uses SSL but skips chain and host verification.
	InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"`
	// SASLAuth holds the settings for authentication using SASL.
	SASLAuth
}

func NewConfig

func NewConfig() Config

func (*Config) ApplyConditionalDefaults

func (c *Config) ApplyConditionalDefaults()

func (Config) Validate

func (c Config) Validate() error

type Configs

// Configs is a list of Config values.
type Configs []Config

func (Configs) Validate

func (cs Configs) Validate() error

type Diagnostic

// Diagnostic is the diagnostics interface accepted by NewService and
// Cluster.WriteMessage.
type Diagnostic interface {
	// WithContext takes key/value pairs and returns a Diagnostic.
	// NOTE(review): presumably the result carries the given context — confirm.
	WithContext(ctx ...keyvalue.T) Diagnostic
	// InsecureSkipVerify takes no arguments and returns nothing.
	// NOTE(review): presumably reports that verification is being skipped — confirm.
	InsecureSkipVerify()
	// Error takes a message and the associated error.
	// NOTE(review): presumably reports them to the diagnostic sink — confirm.
	Error(msg string, err error)
}

type HandlerConfig

// HandlerConfig is the configuration passed to Service.Handler. Each field
// carries a mapstructure tag naming its key.
// NOTE(review): Cluster is presumably the ID of the Kafka config to use, and
// DisablePartitionById/PartitionAlgorithm presumably control partition
// selection — confirm against the handler implementation.
type HandlerConfig struct {
	Cluster              string `mapstructure:"cluster"`
	Topic                string `mapstructure:"topic"`
	Template             string `mapstructure:"template"`
	DisablePartitionById bool   `mapstructure:"disablePartitionById"`
	PartitionAlgorithm   string `mapstructure:"partitionAlgorithm"`
}

type SASLAuth added in v1.6.4

// SASLAuth groups the SASL authentication settings. It is embedded in Config
// and applied to a sarama.Config by SetSASLConfig.
type SASLAuth struct {
	SASLUsername  string `toml:"sasl-username" override:"sasl-username"`
	SASLPassword  string `toml:"sasl-password" override:"sasl-password"`
	SASLMechanism string `toml:"sasl-mechanism" override:"sasl-mechanism"`
	SASLVersion   *int   `toml:"sasl-version" override:"sasl-version"`

	// GSSAPI settings.
	SASLGSSAPIServiceName        string `toml:"sasl-gssapi-service-name" override:"sasl-gssapi-service-name"`
	SASLGSSAPIAuthType           string `toml:"sasl-gssapi-auth-type" override:"sasl-gssapi-auth-type"`
	SASLGSSAPIDisablePAFXFAST    bool   `toml:"sasl-gssapi-disable-pafxfast" override:"sasl-gssapi-disable-pafxfast"`
	SASLGSSAPIKerberosConfigPath string `toml:"sasl-gssapi-kerberos-config-path" override:"sasl-gssapi-kerberos-config-path"`
	SASLGSSAPIKeyTabPath         string `toml:"sasl-gssapi-key-tab-path" override:"sasl-gssapi-key-tab-path"`
	SASLGSSAPIRealm              string `toml:"sasl-gssapi-realm" override:"sasl-gssapi-realm"`

	// OAUTHBEARER settings. Experimental, and acknowledged as likely not good enough.
	SASLAccessToken string `toml:"sasl-access-token" override:"sasl-access-token"`
}

func (*SASLAuth) SetSASLConfig added in v1.6.4

func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error

SetSASLConfig configures SASL for Kafka (sarama). It mutates the given config instead of returning a new struct, because sarama.NewConfig() already populates certain defaults that we do not want to disrupt.

func (*SASLAuth) Token added in v1.6.4

func (k *SASLAuth) Token() (*sarama.AccessToken, error)

Token does nothing smart; it just grabs a hard-coded token from config.

type Service

// Service is constructed with NewService from a set of Configs and a
// Diagnostic. Its Cluster method looks up a *Cluster by id.
type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(cs Configs, d Diagnostic) *Service

func (*Service) Close

func (s *Service) Close() error

func (*Service) Cluster

func (s *Service) Cluster(id string) (*Cluster, bool)

func (*Service) Handler

func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, error)

func (*Service) Open

func (s *Service) Open() error

func (*Service) Test

func (s *Service) Test(options interface{}) error

func (*Service) TestOptions

func (s *Service) TestOptions() interface{}

func (*Service) Update

func (s *Service) Update(newConfigs []interface{}) error

type WriteTarget added in v1.6.0

// WriteTarget is the target argument of Cluster.WriteMessage: a topic name
// plus partitioning options.
// NOTE(review): PartitionById and PartitionAlgorithm presumably select how a
// message's partition is chosen — confirm against WriteMessage.
type WriteTarget struct {
	Topic              string
	PartitionById      bool
	PartitionAlgorithm string
}

type XDGSCRAMClient added in v1.6.4

// XDGSCRAMClient embeds a *scram.Client, a *scram.ClientConversation and a
// scram.HashGeneratorFcn, and provides Begin, Step and Done methods.
// NOTE(review): presumably this is the SCRAM client handed to sarama for
// SCRAM-SHA-256/512 authentication — confirm against SetSASLConfig.
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin added in v1.6.4

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done added in v1.6.4

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step added in v1.6.4

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL