kafka

package
v1.8.0-rc.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventHandler

type EventHandler func(ctx context.Context, msg *NewEvent) error

EventHandler is the handler used to handle the subscribed event.

type Kafka

type Kafka struct {

	// The default value should be true for kafka pubsub component and false for kafka binding component
	// This default value can be overridden by metadata consumeRetryEnabled
	DefaultConsumeRetryEnabled bool
	// contains filtered or unexported fields
}

Kafka allows reading/writing to a Kafka consumer group.

func NewKafka

func NewKafka(logger logger.Logger) *Kafka

func (*Kafka) AddTopicHandler

func (k *Kafka) AddTopicHandler(topic string, handler EventHandler)

AddTopicHandler adds a topic handler

func (*Kafka) Close

func (k *Kafka) Close() (err error)

func (*Kafka) GetTopicHandler

func (k *Kafka) GetTopicHandler(topic string) (EventHandler, error)

GetTopicHandler returns the handler for a topic

func (*Kafka) Init

func (k *Kafka) Init(metadata map[string]string) error

Init does metadata parsing and connection establishment.

func (*Kafka) Publish

func (k *Kafka) Publish(topic string, data []byte, metadata map[string]string) error

Publish message to Kafka cluster.

func (*Kafka) RemoveTopicHandler

func (k *Kafka) RemoveTopicHandler(topic string)

RemoveTopicHandler removes a topic handler

func (*Kafka) Subscribe

func (k *Kafka) Subscribe(ctx context.Context) error

Subscribe to topic in the Kafka cluster, in a background goroutine

type NewEvent

type NewEvent struct {
	Data        []byte            `json:"data"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

NewEvent is an event arriving from a message bus instance.

type OAuthTokenSource

type OAuthTokenSource struct {
	CachedToken   oauth2.Token
	Extensions    map[string]string
	TokenEndpoint oauth2.Endpoint
	ClientID      string
	ClientSecret  string
	Scopes        []string
	// contains filtered or unexported fields
}

func (*OAuthTokenSource) Token

func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error)

type SaramaLogBridge

type SaramaLogBridge struct {
	// contains filtered or unexported fields
}

func (SaramaLogBridge) Print

func (b SaramaLogBridge) Print(v ...interface{})

func (SaramaLogBridge) Printf

func (b SaramaLogBridge) Printf(format string, v ...interface{})

func (SaramaLogBridge) Println

func (b SaramaLogBridge) Println(v ...interface{})

type TopicHandlers

type TopicHandlers map[string]EventHandler

Map of topics and their handlers

func (TopicHandlers) TopicList

func (th TopicHandlers) TopicList() []string

TopicList returns the list of topics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL