client

package
v0.24.2
Published: Jul 15, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitOffsets added in v0.24.0

func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string, consumerGroup string) error

InitOffsets makes sure that all consumer group offsets are set before the source is marked as ready. This avoids "losing" events when the consumer group session is closed before at least one message has been consumed from every partition. Without InitOffsets, an event sent to a partition with an uninitialized offset is not forwarded when the session is closed (or while a rebalance is in progress).
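
The idea can be illustrated without a broker. The following is a minimal sketch, not this package's implementation: it models offsets as plain maps, assumes that -1 stands for "no offset committed", and assumes that an uninitialized partition should start at its newest offset. The `initOffsets` helper and the `noOffset` constant are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// noOffset marks a partition without a committed offset (assumption for this sketch).
const noOffset int64 = -1

// initOffsets returns the offsets that still have to be committed so that
// every partition known to the topic has an initial offset.
// committed maps partition -> committed offset (noOffset or absent if none).
// newest maps partition -> offset the next produced message will get.
func initOffsets(committed, newest map[int32]int64) map[int32]int64 {
	toCommit := make(map[int32]int64)
	for partition, next := range newest {
		if off, ok := committed[partition]; !ok || off == noOffset {
			toCommit[partition] = next
		}
	}
	return toCommit
}

func main() {
	committed := map[int32]int64{0: 42, 1: noOffset}
	newest := map[int32]int64{0: 50, 1: 7, 2: 0}
	toCommit := initOffsets(committed, newest)

	// Sort the partitions so the report is deterministic.
	partitions := make([]int, 0, len(toCommit))
	for p := range toCommit {
		partitions = append(partitions, int(p))
	}
	sort.Ints(partitions)
	for _, p := range partitions {
		fmt.Printf("partition %d: commit offset %d\n", p, toCommit[int32(p)])
	}
}
```

Partitions that already have a committed offset are left alone, so running the initialization a second time is harmless.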

func MakeAdminClient added in v0.22.4

func MakeAdminClient(ctx context.Context, env *KafkaEnvConfig) (sarama.ClusterAdmin, error)

MakeAdminClient is a helper method for constructing an admin client.

func NewConfigFromEnv

func NewConfigFromEnv(ctx context.Context) ([]string, *sarama.Config, error)

NewConfigFromEnv extracts the Kafka configuration from the environment.

func NewConfigFromSpec added in v0.24.0

func NewConfigFromSpec(ctx context.Context, kc kubernetes.Interface, obj *sourcesv1beta1.KafkaSource) ([]string, *sarama.Config, error)

NewConfigFromSpec extracts the Kafka configuration from a KafkaSource spec.

func NewConfigWithEnv

func NewConfigWithEnv(ctx context.Context, env *KafkaEnvConfig) ([]string, *sarama.Config, error)

NewConfigWithEnv extracts the Kafka configuration from the given KafkaEnvConfig.

func NewProducer

func NewProducer(ctx context.Context) (sarama.Client, error)

NewProducer is a helper method for constructing a client for producing Kafka messages.

Types

type AdapterNet

type AdapterNet struct {
	SASL AdapterSASL
	TLS  AdapterTLS
}

type AdapterSASL

type AdapterSASL struct {
	Enable   bool   `envconfig:"KAFKA_NET_SASL_ENABLE" required:"false"`
	User     string `envconfig:"KAFKA_NET_SASL_USER" required:"false"`
	Password string `envconfig:"KAFKA_NET_SASL_PASSWORD" required:"false"`
	Type     string `envconfig:"KAFKA_NET_SASL_TYPE" required:"false"`
}

type AdapterTLS

type AdapterTLS struct {
	Enable bool   `envconfig:"KAFKA_NET_TLS_ENABLE" required:"false"`
	Cert   string `envconfig:"KAFKA_NET_TLS_CERT" required:"false"`
	Key    string `envconfig:"KAFKA_NET_TLS_KEY" required:"false"`
	CACert string `envconfig:"KAFKA_NET_TLS_CA_CERT" required:"false"`
}
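
As a rough illustration of how such settings might be turned into a TLS configuration, here is a sketch using only the Go standard library. It is not this package's code: `buildTLSConfig` and the local `adapterTLS` type are hypothetical, and the sketch assumes that `Cert`, `Key` and `CACert` hold PEM-encoded data rather than file paths.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// adapterTLS mirrors the fields of AdapterTLS for this sketch.
type adapterTLS struct {
	Enable bool
	Cert   string
	Key    string
	CACert string
}

// buildTLSConfig is a hypothetical helper. It returns a nil config when TLS
// is disabled and assumes the string fields contain PEM data.
func buildTLSConfig(t adapterTLS) (*tls.Config, error) {
	if !t.Enable {
		return nil, nil
	}
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}

	// A client certificate is only configured when both halves are present.
	if t.Cert != "" && t.Key != "" {
		pair, err := tls.X509KeyPair([]byte(t.Cert), []byte(t.Key))
		if err != nil {
			return nil, fmt.Errorf("loading client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{pair}
	}

	// A custom CA replaces the system roots for verifying the broker.
	if t.CACert != "" {
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM([]byte(t.CACert)) {
			return nil, errors.New("CA certificate does not contain valid PEM data")
		}
		cfg.RootCAs = pool
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig(adapterTLS{Enable: false})
	fmt.Println("TLS disabled, config is nil:", cfg == nil, "error:", err)

	_, err = buildTLSConfig(adapterTLS{Enable: true, CACert: "not a certificate"})
	fmt.Println("invalid CA:", err)
}
```

Failing early on malformed PEM input keeps a misconfigured secret from surfacing later as an opaque handshake error.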

type KafkaConfig added in v0.24.0

type KafkaConfig struct {
	SaramaYamlString string
}

type KafkaEnvConfig

type KafkaEnvConfig struct {
	// KafkaConfigJson is the environment variable that's passed to adapter by the controller.
	// It contains configuration from the Kafka configmap.
	KafkaConfigJson  string   `envconfig:"K_KAFKA_CONFIG"`
	BootstrapServers []string `envconfig:"KAFKA_BOOTSTRAP_SERVERS" required:"true"`
	Net              AdapterNet
}
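
The struct tags above name the environment variables the adapter reads. The sketch below hand-rolls that mapping with the standard library to show the intended behavior. It does not use an envconfig library, and it assumes that a list value is comma-separated; `loadEnvConfig` and the local `envConfig` type are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// envConfig mirrors two fields of KafkaEnvConfig for this sketch.
type envConfig struct {
	KafkaConfigJson  string
	BootstrapServers []string
}

// loadEnvConfig reads the variables named in the struct tags. The lookup
// function is injectable so the logic can be exercised without touching
// the real process environment.
func loadEnvConfig(lookup func(string) (string, bool)) (envConfig, error) {
	var cfg envConfig

	// K_KAFKA_CONFIG is optional, so a missing value is not an error.
	cfg.KafkaConfigJson, _ = lookup("K_KAFKA_CONFIG")

	// KAFKA_BOOTSTRAP_SERVERS is tagged required:"true".
	raw, ok := lookup("KAFKA_BOOTSTRAP_SERVERS")
	if !ok {
		return cfg, errors.New("required variable KAFKA_BOOTSTRAP_SERVERS is not set")
	}
	for _, s := range strings.Split(raw, ",") {
		if s = strings.TrimSpace(s); s != "" {
			cfg.BootstrapServers = append(cfg.BootstrapServers, s)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := loadEnvConfig(os.LookupEnv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("bootstrap servers:", cfg.BootstrapServers)
}
```

Passing `os.LookupEnv` in `main` reads the real environment, while a map-backed function can stand in for it in tests.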

func NewEnvConfigFromSpec added in v0.24.0

func NewEnvConfigFromSpec(ctx context.Context, kc kubernetes.Interface, obj *sourcesv1beta1.KafkaSource) (KafkaEnvConfig, error)

NewEnvConfigFromSpec validates and creates a KafkaEnvConfig from a KafkaSource.
