kafka

package
v0.0.0-...-feb181f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewProducer

func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identity) (*kafkaProducer, error)

NewProducer produces a kafka producer that is bound to a particular topic.

func NewProducerEventsCounter

func NewProducerEventsCounter(meter metric.Meter, histogramName string) (metric.Int64Counter, error)

NewProducerEventsCounter creates a meter for capturing event metrics

Types

type CompletedConfig

type CompletedConfig struct {
	// contains filtered or unexported fields
}

type Config

type Config struct {
	*Options

	// this can be set manually for testing
	KafkaConfig *kafka.ConfigMap
}

func NewConfig

func NewConfig(o *Options) *Config

func (*Config) Complete

func (c *Config) Complete() (CompletedConfig, error)

type KafkaManager

type KafkaManager struct {
	Config   CompletedConfig
	Source   string
	Protocol *confluent.Protocol
	Client   cloudevents.Client
	Errors   <-chan error

	Logger *log.Helper
}

func New

func New(config CompletedConfig, source string, logger *log.Helper) (*KafkaManager, error)

func (*KafkaManager) Errs

func (m *KafkaManager) Errs() <-chan error

func (*KafkaManager) Lookup

func (m *KafkaManager) Lookup(identity *authnapi.Identity, resource_type string, resource_id uuid.UUID) (api.Producer, error)

Lookup figures out which topic should be used for the given identity and resource.

func (*KafkaManager) Shutdown

func (m *KafkaManager) Shutdown(ctx context.Context) error

type Options

type Options struct {
	DefaultTopic    string `mapstructure:"default-topic"`
	BuiltInFeatures string `mapstructure:"builtin-features"`
	ClientId        string `mapstructure:"client-id"`
	//MetadataBrokerList                 string `mapstructure:"metadata-broker-list"`
	BootstrapServers                   string `mapstructure:"bootstrap-servers"`
	MessageMaxBytes                    int    `mapstructure:"message-max-bytes"`
	MessageCopyMaxBytes                int    `mapstructure:"message-copy-max-bytes"`
	ReceiveMessageMaxBytes             int    `mapstructure:"receive-message-max-bytes"`
	MaxInFlightRequestsPerConnection   int    `mapstructure:"max-in-flight-requests-per-connection"`
	MaxInFlight                        int    `mapstructure:"max-in-flight"`
	TopicMetadataRefreshIntervalMs     int    `mapstructure:"topic-metadata-refresh-interval-ms"`
	MetadataMaxAgeMs                   int    `mapstructure:"metadata-max-age-ms"`
	TopicMetadataRefreshFastIntervalMs int    `mapstructure:"topic-metadata-refresh-fast-interval-ms"`
	TopicMetadataRefreshSparse         bool   `mapstructure:"topic-metadata-refresh-sparse"`
	TopicMetadataPropagationMaxMs      int    `mapstructure:"topic-metadata-propagation-max-ms"`
	TopicBlacklist                     string `mapstructure:"topic-blacklist"`
	Debug                              string `mapstructure:"debug"`
	SocketTimeoutMs                    int    `mapstructure:"socket-timeout-ms"`
	SocketSendBufferBytes              int    `mapstructure:"socket-send-buffer-bytes"`
	SocketReceiveBufferBytes           int    `mapstructure:"socket-receive-buffer-bytes"`
	SocketKeepAliveEnable              bool   `mapstructure:"socket-keepalive-enable"`
	SocketNagleDisable                 bool   `mapstructure:"socket-nagle-disable"`
	SocketMaxFails                     int    `mapstructure:"socket-max-fails"`
	BrokerAddressTtl                   int    `mapstructure:"broker-address-ttl"`
	BrokerAddressFamily                string `mapstructure:"broker-address-family"`
	SocketConnectionSetupTimeoutMs     int    `mapstructure:"socket-connection-setup-timeout-ms"`
	ConnectionsMaxIdleMs               int    `mapstructure:"connections-max-idle-ms"`
	ReconnectBackoffMs                 int    `mapstructure:"reconnect-backoff-ms"`
	ReconnectBackoffMaxMs              int    `mapstructure:"reconnect-backoff-max-ms"`
	StatisticsIntervalMs               int    `mapstructure:"statistics-interval-ms"`
	EnabledEvents                      int    `mapstructure:"enabled-events"`
	LogLevel                           int    `mapstructure:"log-level"`
	LogQueue                           bool   `mapstructure:"log-queue"`
	LogThreadName                      bool   `mapstructure:"log-thread-name"`
	EnableRandomSeed                   bool   `mapstructure:"enabled-random-seed"`
	LogConnectionClose                 bool   `mapstructure:"log-connection-close"`
	InternalTerminationSignal          int    `mapstructure:"internal-termination-signal"`
	ApiVersionRequest                  bool   `mapstructure:"api-version-request"`
	ApiVersionRequestTimeoutMs         int    `mapstructure:"api-version-request-timeout-ms"`
	ApiVersionVersionFallbackMs        int    `mapstructure:"api-version-version-fallback-ms"`
	BrokerVersionFallback              string `mapstructure:"broker-version-fallback"`
	AllowAutoCreateTopics              bool   `mapstructure:"allow-auto-create-topics"`
	SecurityProtocol                   string `mapstructure:"security-protocol"`
	SslCipherSuites                    string `mapstructure:"ssl-cipher-suites"`
	SslCurvesList                      string `mapstructure:"ssl-curves-list"`
	SslSigAlgsList                     string `mapstructure:"ssl-sigalgs-list"`
	SslKeyLocation                     string `mapstructure:"ssl-key-location"`
	SslKeyPassword                     string `mapstructure:"ssl-key-password"`
	SslKeyPem                          string `mapstructure:"ssl-key-pem"`
	SslCertificateLocation             string `mapstructure:"ssl-certificate-location"`
	SslCertificatePem                  string `mapstructure:"ssl-certificate-pem"`
	SslCaLocation                      string `mapstructure:"ssl-ca-location"`
	SslCaPem                           string `mapstructure:"ssl-ca-pem"`
	SslCrlLocation                     string `mapstructure:"ssl-crl-location"`
	SslKeystoreLocation                string `mapstructure:"ssl-keystore-location"`
	SslKeystorePassword                string `mapstructure:"ssl-keystore-password"`
	SslProviders                       string `mapstructure:"ssl-providers"`
	SslEngineId                        string `mapstructure:"ssl-engine-id"`
	EnableSslCertificateVerification   bool   `mapstructure:"enable-ssl-certificate-verification"`
	SslEndpointIdentificationAlgorithm string `mapstructure:"ssl-endpoint-identification-algorithm"`
	// SaslMechanisms                     string `mapstructure:"sasl-mechanisms"`
	SaslMechanism                    string `mapstructure:"sasl-mechanism"`
	SaslKerberosServiceName          string `mapstructure:"sasl-kerberos-service-name"`
	SaslKerberosPrincipal            string `mapstructure:"sasl-kerberos-principal"`
	SaslKerberosKinitCmd             string `mapstructure:"sasl-kerberos-kinit-cmd"`
	SaslKerberosKeytab               string `mapstructure:"sasl-kerberos-keytab"`
	SaslKerberosMinTimeBeforeRelogin int    `mapstructure:"sasl-kerberos-min-time-before-relogin"`
	SaslUsername                     string `mapstructure:"sasl-username"`
	SaslPassword                     string `mapstructure:"sasl-password"`
	SaslOauthBearerConfig            string `mapstructure:"sasl-oauthbearer-config"`
	EnableSaslOauthBearerUnsecureJwt bool   `mapstructure:"enable-sasl-oauthbearer-unsecure-jwt"`
	SaslOauthBearerMethod            string `mapstructure:"sasl-oauthbearer-method"`
	SaslOauthBearerClientId          string `mapstructure:"sasl-oauthbearer-client-id"`
	SaslOauthBearerClientSecret      string `mapstructure:"sasl-oauthbearer-client-secret"`
	SaslOauthBearerScope             string `mapstructure:"sasl-oauthbearer-scope"`
	SaslOauthBearerExtensions        string `mapstructure:"sasl-oauthbearer-extensions"`
	SaslOauthBearerTokenEndpointUrl  string `mapstructure:"sasl-oauthbearer-token-endpoint-url"`
	PluginLibraryPaths               string `mapstructure:"plugin-library-paths"`
	ClientDnsLookup                  string `mapstructure:"client-dns-lookup"`
	EnableMetricsPush                bool   `mapstructure:"enable-metrics-push"`
	ClientRack                       string `mapstructure:"client-rack"`
	RetryBackoffMs                   int    `mapstructure:"retry-backoff-ms"`
	RetryBackoffMaxMs                int    `mapstructure:"retry-backoff-max-ms"`
}

func NewOptions

func NewOptions() *Options

func (*Options) AddFlags

func (o *Options) AddFlags(fs *pflag.FlagSet, prefix string)

func (*Options) Complete

func (o *Options) Complete() []error

func (*Options) Validate

func (o *Options) Validate() []error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL