kafka

package
v0.8.2-beta.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2024 License: AGPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMaxBulkSubCount is the default max bulk count for kafka pubsub component
	// if the MaxBulkCountKey is not set in the metadata.
	DefaultMaxBulkSubCount = 80
	// DefaultMaxBulkSubAwaitDurationMs is the default max bulk await duration for kafka pubsub component
	// if the MaxBulkAwaitDurationKey is not set in the metadata.
	DefaultMaxBulkSubAwaitDurationMs = 10000
)

Variables

Functions

func NewManager

func NewManager(properties engines.Properties) (engines.DBManager, error)

Types

type EventHandler

type EventHandler func(ctx context.Context, msg *NewEvent) error

EventHandler is the handler used to handle the subscribed event.

type Kafka

type Kafka struct {
	Producer sarama.SyncProducer

	// The default value should be true for kafka pubsub component and false for kafka binding component
	// This default value can be overridden by metadata consumeRetryEnabled
	DefaultConsumeRetryEnabled bool
	// contains filtered or unexported fields
}

Kafka allows reading/writing to a Kafka consumer group.

func NewKafka

func NewKafka(logger logr.Logger) *Kafka

func (*Kafka) AddTopicHandler

func (k *Kafka) AddTopicHandler(topic string, handlerConfig SubscriptionHandlerConfig)

AddTopicHandler adds a handler and configuration for a topic

func (*Kafka) BrokerClose

func (k *Kafka) BrokerClose()

func (*Kafka) BrokerCreateTopics

func (k *Kafka) BrokerCreateTopics(topic string) error

func (*Kafka) BrokerOpen

func (k *Kafka) BrokerOpen() error

func (*Kafka) Close

func (k *Kafka) Close() (err error)

func (*Kafka) GetTopicHandlerConfig

func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)

GetTopicHandlerConfig returns the handlerConfig for a topic

func (*Kafka) Init

func (k *Kafka) Init(_ context.Context, metadata map[string]string) error

Init does metadata parsing and connection establishment.

func (*Kafka) Publish

func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error

Publish message to Kafka cluster.

func (*Kafka) RemoveTopicHandler

func (k *Kafka) RemoveTopicHandler(topic string)

RemoveTopicHandler removes a topic handler

func (*Kafka) Subscribe

func (k *Kafka) Subscribe(ctx context.Context) error

Subscribe to topic in the Kafka cluster, in a background goroutine

type KafkaBulkMessage

type KafkaBulkMessage struct {
	Entries  []KafkaBulkMessageEntry `json:"entries"`
	Topic    string                  `json:"topic"`
	Metadata map[string]string       `json:"metadata"`
}

KafkaBulkMessage is a bulk event arriving from a message bus instance.

type KafkaBulkMessageEntry

type KafkaBulkMessageEntry struct {
	EntryID     string            `json:"entryId"` //nolint:stylecheck
	Event       []byte            `json:"event"`
	ContentType string            `json:"contentType,omitempty"`
	Metadata    map[string]string `json:"metadata"`
}

KafkaBulkMessageEntry is an item contained inside bulk event arriving from a message bus instance.

type Manager

type Manager struct {
	engines.DBManagerBase
	Kafka *Kafka
	// contains filtered or unexported fields
}

func (*Manager) IsCurrentMemberHealthy

func (mgr *Manager) IsCurrentMemberHealthy(ctx context.Context, cluster *dcs.Cluster) bool

type NewEvent

type NewEvent struct {
	Data        []byte            `json:"data"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

NewEvent is an event arriving from a message bus instance.

type OAuthTokenSource

type OAuthTokenSource struct {
	CachedToken   oauth2.Token
	Extensions    map[string]string
	TokenEndpoint oauth2.Endpoint
	ClientID      string
	ClientSecret  string
	Scopes        []string
	// contains filtered or unexported fields
}

func (*OAuthTokenSource) Token

func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error)

type SaramaLogBridge

type SaramaLogBridge struct {
	// contains filtered or unexported fields
}

func (SaramaLogBridge) Print

func (b SaramaLogBridge) Print(v ...interface{})

func (SaramaLogBridge) Printf

func (b SaramaLogBridge) Printf(format string, v ...interface{})

func (SaramaLogBridge) Println

func (b SaramaLogBridge) Println(v ...interface{})

type SubscriptionHandlerConfig

type SubscriptionHandlerConfig struct {
	IsBulkSubscribe bool
	Handler         EventHandler
}

SubscriptionHandlerConfig is the handler and configuration for subscription.

type TopicHandlerConfig

type TopicHandlerConfig map[string]SubscriptionHandlerConfig

TopicHandlerConfig is the map of topics and sruct containing handler and their config.

func (TopicHandlerConfig) TopicList

func (tbh TopicHandlerConfig) TopicList() []string

TopicList returns the list of topics

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL