kafka

package
v0.7.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2024 | License: MIT | Imports: 12 | Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultKeyKafkaTopicsConfig = "KAFKA_TOPICS_CONFIG"
)

Variables

This section is empty.

Functions

func CreateAzureEventHubConfig added in v0.7.1

func CreateAzureEventHubConfig() *sarama.Config

func NewSha256ScramClient

func NewSha256ScramClient() sarama.SCRAMClient

func NewSha512ScramClient

func NewSha512ScramClient() sarama.SCRAMClient

func ParseTopicConfigs added in v0.7.0

func ParseTopicConfigs(jsonString string) (map[string]TopicConfig, error)

Types

type Config added in v0.7.0

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig added in v0.7.0

func NewConfig() *Config

func (*Config) ConfigItems added in v0.7.0

func (c *Config) ConfigItems() []auconfigapi.ConfigItem

func (*Config) Obtain added in v0.7.0

func (c *Config) Obtain(getter func(key string) string)

func (*Config) TopicConfigs added in v0.7.0

func (c *Config) TopicConfigs() map[string]TopicConfig

type Consumer

type Consumer[E any] struct {
	// contains filtered or unexported fields
}

func CreateConsumer added in v0.7.0

func CreateConsumer[E any](
	ctx context.Context,
	topicConfig TopicConfig,
	receiveCallback func(context.Context, *string, *E, time.Time) error,
	configPreset *sarama.Config,
) (*Consumer[E], error)

func (*Consumer[E]) Close

func (c *Consumer[E]) Close(ctx context.Context)

type DefaultConfigImpl

type DefaultConfigImpl struct {
	// contains filtered or unexported fields
}

func NewDefaultConfig added in v0.7.0

func NewDefaultConfig() *DefaultConfigImpl

func (*DefaultConfigImpl) ConfigItems added in v0.7.0

func (c *DefaultConfigImpl) ConfigItems() []auconfigapi.ConfigItem

func (*DefaultConfigImpl) ObtainValues added in v0.7.0

func (c *DefaultConfigImpl) ObtainValues(getter func(string) string) error

func (*DefaultConfigImpl) TopicConfigs

func (c *DefaultConfigImpl) TopicConfigs(key string) (TopicConfig, bool)

type SyncProducer

type SyncProducer[V any] struct {
	// contains filtered or unexported fields
}

func CreateSyncProducer added in v0.7.0

func CreateSyncProducer[V any](
	_ context.Context,
	topicConfig TopicConfig,
	configPreset *sarama.Config,
) (*SyncProducer[V], error)

func (*SyncProducer[E]) Close

func (p *SyncProducer[E]) Close(ctx context.Context)

func (*SyncProducer[V]) Produce

func (p *SyncProducer[V]) Produce(
	_ context.Context,
	key *string,
	value *V,
) error

type TopicConfig

type TopicConfig struct {
	Topic         string
	Brokers       []string
	Username      string
	Password      string
	ConsumerGroup *string
	AuthType      sarama.SASLMechanism
}

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL