Documentation ¶
Index ¶
- Constants
- Variables
- func NewManager(properties engines.Properties) (engines.DBManager, error)
- type EventHandler
- type Kafka
- func (k *Kafka) AddTopicHandler(topic string, handlerConfig SubscriptionHandlerConfig)
- func (k *Kafka) BrokerClose()
- func (k *Kafka) BrokerCreateTopics(topic string) error
- func (k *Kafka) BrokerOpen() error
- func (k *Kafka) Close() (err error)
- func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)
- func (k *Kafka) Init(_ context.Context, metadata map[string]string) error
- func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error
- func (k *Kafka) RemoveTopicHandler(topic string)
- func (k *Kafka) Subscribe(ctx context.Context) error
- type KafkaBulkMessage
- type KafkaBulkMessageEntry
- type Manager
- type NewEvent
- type OAuthTokenSource
- type SaramaLogBridge
- type SubscriptionHandlerConfig
- type TopicHandlerConfig
- type XDGSCRAMClient
Constants ¶
const ( // DefaultMaxBulkSubCount is the default max bulk count for kafka pubsub component // if the MaxBulkCountKey is not set in the metadata. DefaultMaxBulkSubCount = 80 // DefaultMaxBulkSubAwaitDurationMs is the default max bulk await duration for kafka pubsub component // if the MaxBulkAwaitDurationKey is not set in the metadata. DefaultMaxBulkSubAwaitDurationMs = 10000 )
Variables ¶
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
func NewManager ¶
func NewManager(properties engines.Properties) (engines.DBManager, error)
Types ¶
type EventHandler ¶
EventHandler is the handler used to handle the subscribed event.
type Kafka ¶
type Kafka struct { Producer sarama.SyncProducer // The default value should be true for kafka pubsub component and false for kafka binding component // This default value can be overridden by metadata consumeRetryEnabled DefaultConsumeRetryEnabled bool // contains filtered or unexported fields }
Kafka allows reading/writing to a Kafka consumer group.
func (*Kafka) AddTopicHandler ¶
func (k *Kafka) AddTopicHandler(topic string, handlerConfig SubscriptionHandlerConfig)
AddTopicHandler adds a handler and configuration for a topic
func (*Kafka) BrokerClose ¶
func (k *Kafka) BrokerClose()
func (*Kafka) BrokerCreateTopics ¶
func (*Kafka) BrokerOpen ¶
func (*Kafka) GetTopicHandlerConfig ¶
func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)
GetTopicHandlerConfig returns the handlerConfig for a topic
func (*Kafka) Publish ¶
func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error
Publish message to Kafka cluster.
func (*Kafka) RemoveTopicHandler ¶
RemoveTopicHandler removes a topic handler
type KafkaBulkMessage ¶
type KafkaBulkMessage struct { Entries []KafkaBulkMessageEntry `json:"entries"` Topic string `json:"topic"` Metadata map[string]string `json:"metadata"` }
KafkaBulkMessage is a bulk event arriving from a message bus instance.
type KafkaBulkMessageEntry ¶
type KafkaBulkMessageEntry struct { EntryID string `json:"entryId"` //nolint:stylecheck Event []byte `json:"event"` ContentType string `json:"contentType,omitempty"` Metadata map[string]string `json:"metadata"` }
KafkaBulkMessageEntry is an item contained inside bulk event arriving from a message bus instance.
type Manager ¶
type Manager struct { engines.DBManagerBase Kafka *Kafka // contains filtered or unexported fields }
type NewEvent ¶
type NewEvent struct { Data []byte `json:"data"` Topic string `json:"topic"` Metadata map[string]string `json:"metadata"` ContentType *string `json:"contentType,omitempty"` }
NewEvent is an event arriving from a message bus instance.
type OAuthTokenSource ¶
type OAuthTokenSource struct { CachedToken oauth2.Token Extensions map[string]string TokenEndpoint oauth2.Endpoint ClientID string ClientSecret string Scopes []string // contains filtered or unexported fields }
func (*OAuthTokenSource) Token ¶
func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error)
type SaramaLogBridge ¶
type SaramaLogBridge struct {
// contains filtered or unexported fields
}
func (SaramaLogBridge) Print ¶
func (b SaramaLogBridge) Print(v ...interface{})
func (SaramaLogBridge) Printf ¶
func (b SaramaLogBridge) Printf(format string, v ...interface{})
func (SaramaLogBridge) Println ¶
func (b SaramaLogBridge) Println(v ...interface{})
type SubscriptionHandlerConfig ¶
type SubscriptionHandlerConfig struct { IsBulkSubscribe bool Handler EventHandler }
SubscriptionHandlerConfig is the handler and configuration for subscription.
type TopicHandlerConfig ¶
type TopicHandlerConfig map[string]SubscriptionHandlerConfig
TopicHandlerConfig is the map of topics and sruct containing handler and their config.
func (TopicHandlerConfig) TopicList ¶
func (tbh TopicHandlerConfig) TopicList() []string
TopicList returns the list of topics
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool