Documentation
Index ¶
- func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string, ...) error
- func MakeAdminClient(ctx context.Context, env *KafkaEnvConfig) (sarama.ClusterAdmin, error)
- func NewConfigFromEnv(ctx context.Context) ([]string, *sarama.Config, error)
- func NewConfigFromSpec(ctx context.Context, kc kubernetes.Interface, obj *sourcesv1beta1.KafkaSource) ([]string, *sarama.Config, error)
- func NewConfigWithEnv(ctx context.Context, env *KafkaEnvConfig) ([]string, *sarama.Config, error)
- func NewProducer(ctx context.Context) (sarama.Client, error)
- type AdapterNet
- type AdapterSASL
- type AdapterTLS
- type KafkaConfig
- type KafkaEnvConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitOffsets ¶ added in v0.24.0
func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string, consumerGroup string) error
InitOffsets ensures that all consumer group offsets are set before the source is marked as ready. This avoids "losing" events when the consumer group session is closed before at least one message has been consumed from every partition. Without InitOffsets, an event sent to a partition with an uninitialized offset is not forwarded if the session is closed (or a rebalance is in progress).
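The idea can be illustrated with a small in-memory model. This is only a sketch, not the package's implementation: the `groupOffsets` type, the `uninitialized` sentinel, and the `initOffsets` helper are hypothetical names, and it assumes that "initializing" a partition means committing its current newest offset so that later events land after a known position.

```go
package main

import "fmt"

// uninitialized marks a partition with no committed consumer group offset.
// The constant and the types below are hypothetical; they only model the idea.
const uninitialized int64 = -1

// groupOffsets maps a partition ID to the committed offset of one consumer group.
type groupOffsets map[int32]int64

// initOffsets commits the current newest offset for every partition that has
// no committed offset yet and returns how many partitions it initialized.
// Partitions that already have an offset are left untouched.
func initOffsets(committed groupOffsets, newest map[int32]int64) int {
	initialized := 0
	for partition, end := range newest {
		if off, ok := committed[partition]; !ok || off == uninitialized {
			committed[partition] = end
			initialized++
		}
	}
	return initialized
}

func main() {
	// Partition 0 was consumed up to offset 5; partitions 1 and 2 were never read.
	committed := groupOffsets{0: 5, 1: uninitialized}
	newest := map[int32]int64{0: 9, 1: 4, 2: 0}

	n := initOffsets(committed, newest)
	fmt.Println(n, committed[0], committed[1], committed[2]) // prints "2 5 4 0"
}
```

Calling the helper a second time changes nothing, which is the property that makes it safe to run on every reconciliation before reporting readiness.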
func MakeAdminClient ¶ added in v0.22.4
func MakeAdminClient(ctx context.Context, env *KafkaEnvConfig) (sarama.ClusterAdmin, error)
MakeAdminClient is a helper method for constructing an admin client.
func NewConfigFromEnv ¶
func NewConfigFromEnv(ctx context.Context) ([]string, *sarama.Config, error)
NewConfigFromEnv extracts the Kafka configuration from the environment.
func NewConfigFromSpec ¶ added in v0.24.0
func NewConfigFromSpec(ctx context.Context, kc kubernetes.Interface, obj *sourcesv1beta1.KafkaSource) ([]string, *sarama.Config, error)
NewConfigFromSpec extracts the Kafka configuration from a KafkaSource spec.
func NewConfigWithEnv ¶
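func NewConfigWithEnv(ctx context.Context, env *KafkaEnvConfig) ([]string, *sarama.Config, error)
NewConfigWithEnv extracts the Kafka configuration from the given KafkaEnvConfig.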
Types ¶
type AdapterNet ¶
type AdapterNet struct {
    SASL AdapterSASL
    TLS  AdapterTLS
}
type AdapterSASL ¶
type AdapterTLS ¶
type KafkaConfig ¶ added in v0.24.0
type KafkaConfig struct {
SaramaYamlString string
}
type KafkaEnvConfig ¶
type KafkaEnvConfig struct {
    // KafkaConfigJson is the environment variable that's passed to adapter by the controller.
    // It contains configuration from the Kafka configmap.
    KafkaConfigJson  string   `envconfig:"K_KAFKA_CONFIG"`
    BootstrapServers []string `envconfig:"KAFKA_BOOTSTRAP_SERVERS" required:"true"`
    Net              AdapterNet
}
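The struct tags on KafkaEnvConfig name the environment variables that populate it. The following standard-library sketch approximates that mapping for the two tagged fields; it is not how the package loads its configuration. The `envConfig` type and `loadEnvConfig` helper are hypothetical, the Net field is left out, and the comma-separated format of KAFKA_BOOTSTRAP_SERVERS is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// envConfig is a hypothetical stand-in for KafkaEnvConfig without the Net field.
type envConfig struct {
	KafkaConfigJson  string
	BootstrapServers []string
}

// loadEnvConfig reads the two variables named in the struct tags.
// lookup has the same shape as os.LookupEnv, so callers can inject a fake environment.
func loadEnvConfig(lookup func(string) (string, bool)) (envConfig, error) {
	var cfg envConfig

	// K_KAFKA_CONFIG is optional: a missing value leaves the field empty.
	cfg.KafkaConfigJson, _ = lookup("K_KAFKA_CONFIG")

	// KAFKA_BOOTSTRAP_SERVERS is tagged required:"true", so an unset variable is an error.
	raw, ok := lookup("KAFKA_BOOTSTRAP_SERVERS")
	if !ok {
		return cfg, errors.New("required environment variable KAFKA_BOOTSTRAP_SERVERS is not set")
	}

	// Assumption: the broker list is comma-separated; blank entries are skipped.
	for _, s := range strings.Split(raw, ",") {
		if s = strings.TrimSpace(s); s != "" {
			cfg.BootstrapServers = append(cfg.BootstrapServers, s)
		}
	}
	return cfg, nil
}

func main() {
	// A fake environment keeps the example deterministic; a real adapter
	// would pass os.LookupEnv instead.
	env := map[string]string{"KAFKA_BOOTSTRAP_SERVERS": "broker-0:9092, broker-1:9092"}
	lookup := func(k string) (string, bool) { v, ok := env[k]; return v, ok }

	cfg, err := loadEnvConfig(lookup)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(cfg.BootstrapServers), cfg.BootstrapServers[0], cfg.BootstrapServers[1])
}
```

Passing the lookup function as a parameter keeps the parsing logic testable without modifying the process environment.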
func NewEnvConfigFromSpec ¶ added in v0.24.0
func NewEnvConfigFromSpec(ctx context.Context, kc kubernetes.Interface, obj *sourcesv1beta1.KafkaSource) (KafkaEnvConfig, error)
NewEnvConfigFromSpec validates and creates a KafkaEnvConfig from a KafkaSource.