Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewProducer ¶
func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identity) (*kafkaProducer, error)
NewProducer produces a kafka producer that is bound to a particular topic.
func NewProducerEventsCounter ¶
func NewProducerEventsCounter(meter metric.Meter, histogramName string) (metric.Int64Counter, error)
NewProducerEventsCounter creates a meter for capturing event metrics
Types ¶
type CompletedConfig ¶
type CompletedConfig struct {
// contains filtered or unexported fields
}
type Config ¶
type Config struct { *Options // this can be set manually for testing KafkaConfig *kafka.ConfigMap }
func (*Config) Complete ¶
func (c *Config) Complete() (CompletedConfig, error)
type KafkaManager ¶
type KafkaManager struct { Config CompletedConfig Source string Protocol *confluent.Protocol Client cloudevents.Client Errors <-chan error Logger *log.Helper }
func New ¶
func New(config CompletedConfig, source string, logger *log.Helper) (*KafkaManager, error)
func (*KafkaManager) Errs ¶
func (m *KafkaManager) Errs() <-chan error
type Options ¶
type Options struct { DefaultTopic string `mapstructure:"default-topic"` BuiltInFeatures string `mapstructure:"builtin-features"` ClientId string `mapstructure:"client-id"` //MetadataBrokerList string `mapstructure:"metadata-broker-list"` BootstrapServers string `mapstructure:"bootstrap-servers"` MessageMaxBytes int `mapstructure:"message-max-bytes"` MessageCopyMaxBytes int `mapstructure:"message-copy-max-bytes"` ReceiveMessageMaxBytes int `mapstructure:"receive-message-max-bytes"` MaxInFlightRequestsPerConnection int `mapstructure:"max-in-flight-requests-per-connection"` MaxInFlight int `mapstructure:"max-in-flight"` TopicMetadataRefreshIntervalMs int `mapstructure:"topic-metadata-refresh-interval-ms"` MetadataMaxAgeMs int `mapstructure:"metadata-max-age-ms"` TopicMetadataRefreshFastIntervalMs int `mapstructure:"topic-metadata-refresh-fast-interval-ms"` TopicMetadataRefreshSparse bool `mapstructure:"topic-metadata-refresh-sparse"` TopicMetadataPropagationMaxMs int `mapstructure:"topic-metadata-propagation-max-ms"` TopicBlacklist string `mapstructure:"topic-blacklist"` Debug string `mapstructure:"debug"` SocketTimeoutMs int `mapstructure:"socket-timeout-ms"` SocketSendBufferBytes int `mapstructure:"socket-send-buffer-bytes"` SocketReceiveBufferBytes int `mapstructure:"socket-receive-buffer-bytes"` SocketKeepAliveEnable bool `mapstructure:"socket-keepalive-enable"` SocketNagleDisable bool `mapstructure:"socket-nagle-disable"` SocketMaxFails int `mapstructure:"socket-max-fails"` BrokerAddressTtl int `mapstructure:"broker-address-ttl"` BrokerAddressFamily string `mapstructure:"broker-address-family"` SocketConnectionSetupTimeoutMs int `mapstructure:"socket-connection-setup-timeout-ms"` ConnectionsMaxIdleMs int `mapstructure:"connections-max-idle-ms"` ReconnectBackoffMs int `mapstructure:"reconnect-backoff-ms"` ReconnectBackoffMaxMs int `mapstructure:"reconnect-backoff-max-ms"` StatisticsIntervalMs int `mapstructure:"statistics-interval-ms"` EnabledEvents int `mapstructure:"enabled-events"` LogLevel int `mapstructure:"log-level"` LogQueue bool `mapstructure:"log-queue"` LogThreadName bool `mapstructure:"log-thread-name"` EnableRandomSeed bool `mapstructure:"enabled-random-seed"` LogConnectionClose bool `mapstructure:"log-connection-close"` InternalTerminationSignal int `mapstructure:"internal-termination-signal"` ApiVersionRequest bool `mapstructure:"api-version-request"` ApiVersionRequestTimeoutMs int `mapstructure:"api-version-request-timeout-ms"` ApiVersionVersionFallbackMs int `mapstructure:"api-version-version-fallback-ms"` BrokerVersionFallback string `mapstructure:"broker-version-fallback"` AllowAutoCreateTopics bool `mapstructure:"allow-auto-create-topics"` SecurityProtocol string `mapstructure:"security-protocol"` SslCipherSuites string `mapstructure:"ssl-cipher-suites"` SslCurvesList string `mapstructure:"ssl-curves-list"` SslSigAlgsList string `mapstructure:"ssl-sigalgs-list"` SslKeyLocation string `mapstructure:"ssl-key-location"` SslKeyPassword string `mapstructure:"ssl-key-password"` SslKeyPem string `mapstructure:"ssl-key-pem"` SslCertificateLocation string `mapstructure:"ssl-certificate-location"` SslCertificatePem string `mapstructure:"ssl-certificate-pem"` SslCaLocation string `mapstructure:"ssl-ca-location"` SslCaPem string `mapstructure:"ssl-ca-pem"` SslCrlLocation string `mapstructure:"ssl-crl-location"` SslKeystoreLocation string `mapstructure:"ssl-keystore-location"` SslKeystorePassword string `mapstructure:"ssl-keystore-password"` SslProviders string `mapstructure:"ssl-providers"` SslEngineId string `mapstructure:"ssl-engine-id"` EnableSslCertificateVerification bool `mapstructure:"enable-ssl-certificate-verification"` SslEndpointIdentificationAlgorithm string `mapstructure:"ssl-endpoint-identification-algorithm"` // SaslMechanisms string `mapstructure:"sasl-mechanisms"` SaslMechanism string `mapstructure:"sasl-mechanism"` SaslKerberosServiceName string `mapstructure:"sasl-kerberos-service-name"` SaslKerberosPrincipal string `mapstructure:"sasl-kerberos-principal"` SaslKerberosKinitCmd string `mapstructure:"sasl-kerberos-kinit-cmd"` SaslKerberosKeytab string `mapstructure:"sasl-kerberos-keytab"` SaslKerberosMinTimeBeforeRelogin int `mapstructure:"sasl-kerberos-min-time-before-relogin"` SaslUsername string `mapstructure:"sasl-username"` SaslPassword string `mapstructure:"sasl-password"` SaslOauthBearerConfig string `mapstructure:"sasl-oauthbearer-config"` EnableSaslOauthBearerUnsecureJwt bool `mapstructure:"enable-sasl-oauthbearer-unsecure-jwt"` SaslOauthBearerMethod string `mapstructure:"sasl-oauthbearer-method"` SaslOauthBearerClientId string `mapstructure:"sasl-oauthbearer-client-id"` SaslOauthBearerClientSecret string `mapstructure:"sasl-oauthbearer-client-secret"` SaslOauthBearerScope string `mapstructure:"sasl-oauthbearer-scope"` SaslOauthBearerExtensions string `mapstructure:"sasl-oauthbearer-extensions"` SaslOauthBearerTokenEndpointUrl string `mapstructure:"sasl-oauthbearer-token-endpoint-url"` PluginLibraryPaths string `mapstructure:"plugin-library-paths"` ClientDnsLookup string `mapstructure:"client-dns-lookup"` EnableMetricsPush bool `mapstructure:"enable-metrics-push"` ClientRack string `mapstructure:"client-rack"` RetryBackoffMs int `mapstructure:"retry-backoff-ms"` RetryBackoffMaxMs int `mapstructure:"retry-backoff-max-ms"` }
func NewOptions ¶
func NewOptions() *Options
Click to show internal directories.
Click to hide internal directories.