kafka

package
v1.14.0-rc.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMaxBulkSubCount is the default max bulk count for kafka pubsub component
	// if the MaxBulkCountKey is not set in the metadata.
	DefaultMaxBulkSubCount = 80
	// DefaultMaxBulkSubAwaitDurationMs is the default max bulk await duration for kafka pubsub component
	// if the MaxBulkAwaitDurationKey is not set in the metadata.
	DefaultMaxBulkSubAwaitDurationMs = 10000
)

Variables

Functions

func GetEventMetadata

func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string

Types

type BulkEventHandler

type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)

BulkEventHandler is the handler used to handle the subscribed bulk event.

type EventHandler

type EventHandler func(ctx context.Context, msg *NewEvent) error

EventHandler is the handler used to handle the subscribed event.

type Kafka

type Kafka struct {

	// The default value should be true for kafka pubsub component and false for kafka binding component
	// This default value can be overridden by metadata consumeRetryEnabled
	DefaultConsumeRetryEnabled bool
	// contains filtered or unexported fields
}

Kafka allows reading/writing to a Kafka consumer group.

func NewKafka

func NewKafka(logger logger.Logger) *Kafka

func (*Kafka) BulkPublish

func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error)

func (*Kafka) Close

func (k *Kafka) Close() error

func (*Kafka) DeserializeValue

func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error)

func (*Kafka) GetTopicHandlerConfig

func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)

GetTopicHandlerConfig returns the handlerConfig for a topic.

func (*Kafka) Init

func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error

Init does metadata parsing and connection establishment.

func (*Kafka) Publish

func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error

Publish message to Kafka cluster.

func (*Kafka) SerializeValue

func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error)

func (*Kafka) Subscribe

func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandlerConfig, topics ...string)

Subscribe adds a handler and configuration for a topic, and subscribes. It unsubscribes from the topic when the context is canceled.

type KafkaBulkMessage

type KafkaBulkMessage struct {
	Entries  []KafkaBulkMessageEntry `json:"entries"`
	Topic    string                  `json:"topic"`
	Metadata map[string]string       `json:"metadata"`
}

KafkaBulkMessage is a bulk event arriving from a message bus instance.

type KafkaBulkMessageEntry

type KafkaBulkMessageEntry struct {
	EntryId     string            `json:"entryId"` //nolint:stylecheck
	Event       []byte            `json:"event"`
	ContentType string            `json:"contentType,omitempty"`
	Metadata    map[string]string `json:"metadata"`
}

KafkaBulkMessageEntry is an item contained inside a bulk event arriving from a message bus instance.

type KafkaMetadata

type KafkaMetadata struct {
	Brokers string `mapstructure:"brokers"`

	ConsumerGroup string `mapstructure:"consumerGroup"`
	ClientID      string `mapstructure:"clientId"`
	AuthType      string `mapstructure:"authType"`
	SaslUsername  string `mapstructure:"saslUsername"`
	SaslPassword  string `mapstructure:"saslPassword"`
	SaslMechanism string `mapstructure:"saslMechanism"`
	InitialOffset string `mapstructure:"initialOffset"`

	MaxMessageBytes   int    `mapstructure:"maxMessageBytes"`
	OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
	OidcClientID      string `mapstructure:"oidcClientID"`
	OidcClientSecret  string `mapstructure:"oidcClientSecret"`
	OidcScopes        string `mapstructure:"oidcScopes"`
	OidcExtensions    string `mapstructure:"oidcExtensions"`

	TLSDisable           bool          `mapstructure:"disableTls"`
	TLSSkipVerify        bool          `mapstructure:"skipVerify"`
	TLSCaCert            string        `mapstructure:"caCert"`
	TLSClientCert        string        `mapstructure:"clientCert"`
	TLSClientKey         string        `mapstructure:"clientKey"`
	ConsumeRetryEnabled  bool          `mapstructure:"consumeRetryEnabled"`
	ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
	HeartbeatInterval    time.Duration `mapstructure:"heartbeatInterval"`
	SessionTimeout       time.Duration `mapstructure:"sessionTimeout"`
	Version              string        `mapstructure:"version"`

	// configs for kafka client
	ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"`
	ClientConnectionKeepAliveInterval            time.Duration `mapstructure:"clientConnectionKeepAliveInterval"`

	// aws iam auth profile
	AWSAccessKey      string `mapstructure:"awsAccessKey"`
	AWSSecretKey      string `mapstructure:"awsSecretKey"`
	AWSSessionToken   string `mapstructure:"awsSessionToken"`
	AWSIamRoleArn     string `mapstructure:"awsIamRoleArn"`
	AWSStsSessionName string `mapstructure:"awsStsSessionName"`
	AWSRegion         string `mapstructure:"awsRegion"`

	// schema registry
	SchemaRegistryURL           string        `mapstructure:"schemaRegistryURL"`
	SchemaRegistryAPIKey        string        `mapstructure:"schemaRegistryAPIKey"`
	SchemaRegistryAPISecret     string        `mapstructure:"schemaRegistryAPISecret"`
	SchemaCachingEnabled        bool          `mapstructure:"schemaCachingEnabled"`
	SchemaLatestVersionCacheTTL time.Duration `mapstructure:"schemaLatestVersionCacheTTL"`
	// contains filtered or unexported fields
}

type NewEvent

type NewEvent struct {
	Data        []byte            `json:"data"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

NewEvent is an event arriving from a message bus instance.

type OAuthTokenSource

type OAuthTokenSource struct {
	CachedToken   oauth2.Token
	Extensions    map[string]string
	TokenEndpoint oauth2.Endpoint
	ClientID      string
	ClientSecret  string
	Scopes        []string
	// contains filtered or unexported fields
}

func (*OAuthTokenSource) Token

func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error)

type SaramaLogBridge

type SaramaLogBridge struct {
	// contains filtered or unexported fields
}

func (SaramaLogBridge) Print

func (b SaramaLogBridge) Print(v ...interface{})

func (SaramaLogBridge) Printf

func (b SaramaLogBridge) Printf(format string, v ...interface{})

func (SaramaLogBridge) Println

func (b SaramaLogBridge) Println(v ...interface{})

type SchemaCacheEntry

type SchemaCacheEntry struct {
	// contains filtered or unexported fields
}

type SchemaType

type SchemaType int
const (
	None SchemaType = iota
	Avro
)

func GetValueSchemaType

func GetValueSchemaType(metadata map[string]string) (SchemaType, error)

type SubscriptionHandlerConfig

type SubscriptionHandlerConfig struct {
	IsBulkSubscribe bool
	SubscribeConfig pubsub.BulkSubscribeConfig
	BulkHandler     BulkEventHandler
	Handler         EventHandler
	ValueSchemaType SchemaType
}

SubscriptionHandlerConfig is the handler and configuration for subscription.

type TopicHandlerConfig

type TopicHandlerConfig map[string]SubscriptionHandlerConfig

TopicHandlerConfig is a map from each topic to the struct containing its handler and config.

func (TopicHandlerConfig) TopicList

func (tbh TopicHandlerConfig) TopicList() []string

TopicList returns the list of topics.

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Directories

Path Synopsis
Package mock_srclient is a generated GoMock package.
Package mock_srclient is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL