Documentation ¶
Index ¶
- Constants
- Variables
- func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error)
- type Cluster
- type Config
- type Configs
- type Diagnostic
- type HandlerConfig
- type SASLAuth
- type Service
- func (s *Service) Close() error
- func (s *Service) Cluster(id string) (*Cluster, bool)
- func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, error)
- func (s *Service) Open() error
- func (s *Service) Test(options interface{}) error
- func (s *Service) TestOptions() interface{}
- func (s *Service) Update(newConfigs []interface{}) error
- type WriteTarget
- type XDGSCRAMClient
Constants ¶
View Source
const ( DefaultTimeout = 10 * time.Second DefaultBatchSize = 100 DefaultBatchTimeout = 1 * time.Second DefaultID = "default" )
Variables ¶
View Source
var ErrNonPositivePartitions = errors.New("number of partitions must be positive")
View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
func SASLVersion ¶ added in v1.6.4
func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error)
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) WriteMessage ¶
func (c *Cluster) WriteMessage(diagnostic Diagnostic, target WriteTarget, key, msg []byte) error
type Config ¶
type Config struct { Enabled bool `toml:"enabled" override:"enabled"` // ID is a unique identifier for this Kafka config ID string `toml:"id" override:"id"` // Brokers is a list of host:port addresses of Kafka brokers. Brokers []string `toml:"brokers" override:"brokers"` // Timeout on network operations with the brokers. // If 0 a default of 10s will be used. Timeout toml.Duration `toml:"timeout" override:"timeout"` // BatchSize is the number of messages that are batched before being sent to Kafka // If 0 a default of 100 will be used. BatchSize int `toml:"batch-size" override:"batch-size"` // BatchTimeout is the maximum amount of time to wait before flushing an incomplete batch. // If 0 a default of 1s will be used. BatchTimeout toml.Duration `toml:"batch-timeout" override:"batch-timeout"` // UseSSL enable ssl communication // Must be true for the other ssl options to take effect. UseSSL bool `toml:"use-ssl" override:"use-ssl"` // Path to CA file SSLCA string `toml:"ssl-ca" override:"ssl-ca"` // Path to host cert file SSLCert string `toml:"ssl-cert" override:"ssl-cert"` // Path to cert key file SSLKey string `toml:"ssl-key" override:"ssl-key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` // Authentication using SASL SASLAuth }
func (*Config) ApplyConditionalDefaults ¶
func (c *Config) ApplyConditionalDefaults()
type Diagnostic ¶
type Diagnostic interface { WithContext(ctx ...keyvalue.T) Diagnostic InsecureSkipVerify() Error(msg string, err error) }
type HandlerConfig ¶
type SASLAuth ¶ added in v1.6.4
type SASLAuth struct { SASLUsername string `toml:"sasl-username" override:"sasl-username"` SASLPassword string `toml:"sasl-password" override:"sasl-password"` SASLMechanism string `toml:"sasl-mechanism" override:"sasl-mechanism"` SASLVersion *int `toml:"sasl-version" override:"sasl-version"` // GSSAPI config SASLGSSAPIServiceName string `toml:"sasl-gssapi-service-name" override:"sasl-gssapi-service-name"` SASLGSSAPIAuthType string `toml:"sasl-gssapi-auth-type" override:"sasl-gssapi-auth-type"` SASLGSSAPIDisablePAFXFAST bool `toml:"sasl-gssapi-disable-pafxfast" override:"sasl-gssapi-disable-pafxfast"` SASLGSSAPIKerberosConfigPath string `toml:"sasl-gssapi-kerberos-config-path" override:"sasl-gssapi-kerberos-config-path"` SASLGSSAPIKeyTabPath string `toml:"sasl-gssapi-key-tab-path" override:"sasl-gssapi-key-tab-path"` SASLGSSAPIRealm string `toml:"sasl-gssapi-realm" override:"sasl-gssapi-realm"` // OAUTHBEARER config. experimental. undoubtedly this is not good enough. SASLAccessToken string `toml:"sasl-access-token" override:"sasl-access-token"` }
func (*SASLAuth) SetSASLConfig ¶ added in v1.6.4
SetSASLConfig configures SASL for kafka (sarama) We mutate instead of returning the appropriate struct, because sarama.NewConfig() already populates certain defaults that we do not want to disrupt.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService(cs Configs, d Diagnostic) *Service
func (*Service) TestOptions ¶
func (s *Service) TestOptions() interface{}
type WriteTarget ¶ added in v1.6.0
type XDGSCRAMClient ¶ added in v1.6.4
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶ added in v1.6.4
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶ added in v1.6.4
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.