Documentation
¶
Index ¶
- Constants
- Variables
- func SASLVersion(kafkaVersion kafka.KafkaVersion, saslVersion *int) (int16, error)
- type Closer
- type Cluster
- type Config
- type Configs
- type Diagnostic
- type HandlerConfig
- type RefreshingToken
- type SASLAuth
- type Service
- func (s *Service) Close() error
- func (s *Service) Cluster(id string) (*Cluster, bool)
- func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, error)
- func (s *Service) Open() error
- func (s *Service) Test(options interface{}) error
- func (s *Service) TestOptions() interface{}
- func (s *Service) Update(newConfigs []interface{}) error
- type StaticToken
- type WriteTarget
- type WriterConfig
- type XDGSCRAMClient
Constants ¶
Variables ¶
View Source
var ErrNonPositivePartitions = errors.New("number of partitions must be positive")
View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
func SASLVersion ¶ added in v1.6.4
func SASLVersion(kafkaVersion kafka.KafkaVersion, saslVersion *int) (int16, error)
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) WriteMessage ¶
func (c *Cluster) WriteMessage(diagnostic Diagnostic, target WriteTarget, key, msg []byte) error
type Config ¶
type Config struct {
	Enabled bool `toml:"enabled" override:"enabled"`
	// ID is a unique identifier for this Kafka config.
	ID string `toml:"id" override:"id"`
	// Brokers is a list of host:port addresses of Kafka brokers.
	Brokers []string `toml:"brokers" override:"brokers"`
	// Timeout on network operations with the brokers.
	// If 0 a default of 10s will be used.
	Timeout toml.Duration `toml:"timeout" override:"timeout"`
	// BatchSize is the number of messages that are batched before being sent to Kafka.
	// If 0 a default of 100 will be used.
	BatchSize int `toml:"batch-size" override:"batch-size"`
	// BatchTimeout is the maximum amount of time to wait before flushing an incomplete batch.
	// If 0 a default of 1s will be used.
	BatchTimeout toml.Duration `toml:"batch-timeout" override:"batch-timeout"`
	// UseSSL enables SSL communication.
	// Must be true for the other SSL options to take effect.
	UseSSL bool `toml:"use-ssl" override:"use-ssl"`
	// Path to CA file
	SSLCA string `toml:"ssl-ca" override:"ssl-ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl-cert" override:"ssl-cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl-key" override:"ssl-key"`
	// Use SSL but skip chain & host verification
	InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"`
	// Authentication using SASL
	SASLAuth
}
func (*Config) ApplyConditionalDefaults ¶
func (c *Config) ApplyConditionalDefaults()
type Diagnostic ¶
type Diagnostic interface {
	WithContext(ctx ...keyvalue.T) Diagnostic
	InsecureSkipVerify()
	Error(msg string, err error)
}
type HandlerConfig ¶
type RefreshingToken ¶ added in v1.7.6
type RefreshingToken struct {
// contains filtered or unexported fields
}
func NewRefreshingToken ¶ added in v1.7.6
func NewRefreshingToken(source oauth2.TokenSource, cancel context.CancelFunc, extensions map[string]string) *RefreshingToken
func (*RefreshingToken) Close ¶ added in v1.7.6
func (k *RefreshingToken) Close()
func (*RefreshingToken) Token ¶ added in v1.7.6
func (k *RefreshingToken) Token() (*kafka.AccessToken, error)
type SASLAuth ¶ added in v1.6.4
type SASLAuth struct {
	SASLUsername string `toml:"sasl-username" override:"sasl-username"`
	SASLPassword string `toml:"sasl-password" override:"sasl-password"`
	SASLExtensions map[string]string `toml:"sasl_extensions" override:"sasl_extensions"`
	SASLMechanism string `toml:"sasl-mechanism" override:"sasl-mechanism"`
	SASLVersion *int `toml:"sasl-version" override:"sasl-version"`
	// GSSAPI config
	SASLGSSAPIServiceName string `toml:"sasl-gssapi-service-name" override:"sasl-gssapi-service-name"`
	SASLGSSAPIAuthType string `toml:"sasl-gssapi-auth-type" override:"sasl-gssapi-auth-type"`
	SASLGSSAPIDisablePAFXFAST bool `toml:"sasl-gssapi-disable-pafxfast" override:"sasl-gssapi-disable-pafxfast"`
	SASLGSSAPIKerberosConfigPath string `toml:"sasl-gssapi-kerberos-config-path" override:"sasl-gssapi-kerberos-config-path"`
	SASLGSSAPIKeyTabPath string `toml:"sasl-gssapi-key-tab-path" override:"sasl-gssapi-key-tab-path"`
	SASLGSSAPIRealm string `toml:"sasl-gssapi-realm" override:"sasl-gssapi-realm"`
	// OAUTHBEARER config
	// Service name for OAuth2 token endpoint: empty or custom, auth0, azuread
	SASLOAUTHService string `toml:"sasl-oauth-service" override:"sasl-oauth-service"`
	SASLOAUTHClientID string `toml:"sasl-oauth-client-id" override:"sasl-oauth-client-id"`
	SASLOAUTHClientSecret string `toml:"sasl-oauth-client-secret" override:"sasl-oauth-client-secret"`
	SASLOAUTHTokenURL string `toml:"sasl-oauth-token-url" override:"sasl-oauth-token-url"`
	SASLOAUTHScopes []string `toml:"sasl-oauth-scopes" override:"sasl-oauth-scopes"`
	SASLOAUTHParams map[string]string `toml:"sasl-oauth-parameters" override:"sasl-oauth-parameters"`
	SASLOAUTHExpiryMargin time.Duration `toml:"sasl-oauth-token-expiry-margin" override:"sasl-oauth-token-expiry-margin"`
	// Static token, if set it will override the token source.
	SASLAccessToken string `toml:"sasl-access-token" override:"sasl-access-token"`
	// Tenant ID for AzureAD
	SASLOAUTHTenant string `toml:"sasl-oauth-tenant-id" override:"sasl-oauth-tenant-id"`
}
func (*SASLAuth) SetSASLConfig ¶ added in v1.6.4
SetSASLConfig configures SASL for kafka (sarama). We mutate the config instead of returning the appropriate struct, because kafka.NewConfig() already populates certain defaults that we do not want to disrupt.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService(cs Configs, d Diagnostic) *Service
func (*Service) TestOptions ¶
func (s *Service) TestOptions() interface{}
type StaticToken ¶ added in v1.7.6
type StaticToken struct {
// contains filtered or unexported fields
}
func NewStaticToken ¶ added in v1.7.6
func NewStaticToken(token string, extensions map[string]string) *StaticToken
func (*StaticToken) Token ¶ added in v1.7.6
func (k *StaticToken) Token() (*kafka.AccessToken, error)
type WriteTarget ¶ added in v1.6.0
type WriterConfig ¶ added in v1.7.6
type XDGSCRAMClient ¶ added in v1.6.4
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
func (*XDGSCRAMClient) Begin ¶ added in v1.6.4
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶ added in v1.6.4
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.