kafka

package
v1.7.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2024 License: MIT Imports: 32 Imported by: 40

Documentation

Index

Constants

View Source
const (
	DefaultTimeout               = 10 * time.Second
	DefaultBatchSize             = 100
	DefaultBatchTimeout          = 1 * time.Second
	DefaultID                    = "default"
	DefaultSASLOAUTHExpiryMargin = 10 * time.Second
)

Variables

View Source
var ErrNonPositivePartitions = errors.New("number of partitions must be positive")
View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

Functions

func SASLVersion added in v1.6.4

func SASLVersion(kafkaVersion kafka.KafkaVersion, saslVersion *int) (int16, error)

Types

type Closer added in v1.7.6

type Closer interface {
	Close()
}

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(c Config) *Cluster

func (*Cluster) Close

func (c *Cluster) Close()

func (*Cluster) Update

func (c *Cluster) Update(cfg Config) error

func (*Cluster) WriteMessage

func (c *Cluster) WriteMessage(diagnostic Diagnostic, target WriteTarget, key, msg []byte) error

type Config

type Config struct {
	Enabled bool `toml:"enabled" override:"enabled"`
	// ID is a unique identifier for this Kafka config
	ID string `toml:"id" override:"id"`
	// Brokers is a list of host:port addresses of Kafka brokers.
	Brokers []string `toml:"brokers" override:"brokers"`
	// Timeout on network operations with the brokers.
	// If 0 a default of 10s will be used.
	Timeout toml.Duration `toml:"timeout" override:"timeout"`
	// BatchSize is the number of messages that are batched before being sent to Kafka
	// If 0 a default of 100 will be used.
	BatchSize int `toml:"batch-size" override:"batch-size"`
	// BatchTimeout is the maximum amount of time to wait before flushing an incomplete batch.
	// If 0 a default of 1s will be used.
	BatchTimeout toml.Duration `toml:"batch-timeout" override:"batch-timeout"`
	// UseSSL enable ssl communication
	// Must be true for the other ssl options to take effect.
	UseSSL bool `toml:"use-ssl" override:"use-ssl"`
	// Path to CA file
	SSLCA string `toml:"ssl-ca" override:"ssl-ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl-cert" override:"ssl-cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl-key" override:"ssl-key"`
	// Use SSL but skip chain & host verification
	InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"`
	// Authentication using SASL
	SASLAuth
}

func NewConfig

func NewConfig() Config

func (*Config) ApplyConditionalDefaults

func (c *Config) ApplyConditionalDefaults()

func (Config) Validate

func (c Config) Validate() error

type Configs

type Configs []Config

func (Configs) Validate

func (cs Configs) Validate() error

type Diagnostic

type Diagnostic interface {
	WithContext(ctx ...keyvalue.T) Diagnostic
	InsecureSkipVerify()
	Error(msg string, err error)
}

type HandlerConfig

type HandlerConfig struct {
	Cluster              string `mapstructure:"cluster"`
	Topic                string `mapstructure:"topic"`
	Template             string `mapstructure:"template"`
	DisablePartitionById bool   `mapstructure:"disablePartitionById"`
	PartitionAlgorithm   string `mapstructure:"partitionAlgorithm"`
}

type RefreshingToken added in v1.7.6

type RefreshingToken struct {
	// contains filtered or unexported fields
}

func NewRefreshingToken added in v1.7.6

func NewRefreshingToken(source oauth2.TokenSource, cancel context.CancelFunc, extensions map[string]string) *RefreshingToken

func (*RefreshingToken) Close added in v1.7.6

func (k *RefreshingToken) Close()

func (*RefreshingToken) Token added in v1.7.6

func (k *RefreshingToken) Token() (*kafka.AccessToken, error)

type SASLAuth added in v1.6.4

type SASLAuth struct {
	SASLUsername   string            `toml:"sasl-username" override:"sasl-username"`
	SASLPassword   string            `toml:"sasl-password" override:"sasl-password"`
	SASLExtensions map[string]string `toml:"sasl_extensions" override:"sasl_extensions"`
	SASLMechanism  string            `toml:"sasl-mechanism" override:"sasl-mechanism"`
	SASLVersion    *int              `toml:"sasl-version" override:"sasl-version"`

	// GSSAPI config
	SASLGSSAPIServiceName        string `toml:"sasl-gssapi-service-name" override:"sasl-gssapi-service-name"`
	SASLGSSAPIAuthType           string `toml:"sasl-gssapi-auth-type" override:"sasl-gssapi-auth-type"`
	SASLGSSAPIDisablePAFXFAST    bool   `toml:"sasl-gssapi-disable-pafxfast" override:"sasl-gssapi-disable-pafxfast"`
	SASLGSSAPIKerberosConfigPath string `toml:"sasl-gssapi-kerberos-config-path" override:"sasl-gssapi-kerberos-config-path"`
	SASLGSSAPIKeyTabPath         string `toml:"sasl-gssapi-key-tab-path" override:"sasl-gssapi-key-tab-path"`
	SASLGSSAPIRealm              string `toml:"sasl-gssapi-realm" override:"sasl-gssapi-realm"`

	// OAUTHBEARER config
	// Service name for OAuth2 token endpoint: empty or custom, auth0, azuread
	SASLOAUTHService      string            `toml:"sasl-oauth-service" override:"sasl-oauth-service"`
	SASLOAUTHClientID     string            `toml:"sasl-oauth-client-id" override:"sasl-oauth-client-id"`
	SASLOAUTHClientSecret string            `toml:"sasl-oauth-client-secret" override:"sasl-oauth-client-secret"`
	SASLOAUTHTokenURL     string            `toml:"sasl-oauth-token-url" override:"sasl-oauth-token-url"`
	SASLOAUTHScopes       []string          `toml:"sasl-oauth-scopes" override:"sasl-oauth-scopes"`
	SASLOAUTHParams       map[string]string `toml:"sasl-oauth-parameters" override:"sasl-oauth-parameters"`
	SASLOAUTHExpiryMargin time.Duration     `toml:"sasl-oauth-token-expiry-margin" override:"sasl-oauth-token-expiry-margin"`
	// Static token, if set it will override the token source.
	SASLAccessToken string `toml:"sasl-access-token" override:"sasl-access-token"`
	// Tenant ID for AzureAD
	SASLOAUTHTenant string `toml:"sasl-oauth-tenant-id" override:"sasl-oauth-tenant-id"`
}

func (*SASLAuth) SetSASLConfig added in v1.6.4

func (k *SASLAuth) SetSASLConfig(config *kafka.Config) (Closer, error)

SetSASLConfig configures SASL for kafka (sarama) We mutate instead of returning the appropriate struct, because kafka.NewConfig() already populates certain defaults that we do not want to disrupt.

func (*SASLAuth) Validate added in v1.7.6

func (k *SASLAuth) Validate() error

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(cs Configs, d Diagnostic) *Service

func (*Service) Close

func (s *Service) Close() error

func (*Service) Cluster

func (s *Service) Cluster(id string) (*Cluster, bool)

func (*Service) Handler

func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, error)

func (*Service) Open

func (s *Service) Open() error

func (*Service) Test

func (s *Service) Test(options interface{}) error

func (*Service) TestOptions

func (s *Service) TestOptions() interface{}

func (*Service) Update

func (s *Service) Update(newConfigs []interface{}) error

type StaticToken added in v1.7.6

type StaticToken struct {
	// contains filtered or unexported fields
}

func NewStaticToken added in v1.7.6

func NewStaticToken(token string, extensions map[string]string) *StaticToken

func (*StaticToken) Token added in v1.7.6

func (k *StaticToken) Token() (*kafka.AccessToken, error)

type WriteTarget added in v1.6.0

type WriteTarget struct {
	Topic              string
	PartitionById      bool
	PartitionAlgorithm string
}

type WriterConfig added in v1.7.6

type WriterConfig struct {
	// additional resource to close
	Closer Closer
	Config *kafka.Config
}

type XDGSCRAMClient added in v1.6.4

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin added in v1.6.4

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done added in v1.6.4

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step added in v1.6.4

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL