Documentation ¶
Index ¶
- Constants
- Variables
- func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string
- type BulkEventHandler
- type EventHandler
- type Kafka
- func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, ...) (pubsub.BulkPublishResponse, error)
- func (k *Kafka) Close() error
- func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error)
- func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)
- func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error
- func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error
- func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error)
- func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandlerConfig, topics ...string)
- type KafkaBulkMessage
- type KafkaBulkMessageEntry
- type KafkaMetadata
- type NewEvent
- type OAuthTokenSource
- type SaramaLogBridge
- type SchemaCacheEntry
- type SchemaType
- type SubscriptionHandlerConfig
- type TopicHandlerConfig
- type XDGSCRAMClient
Constants ¶
const (
	// DefaultMaxBulkSubCount is the default max bulk count for kafka pubsub component
	// if the MaxBulkCountKey is not set in the metadata.
	DefaultMaxBulkSubCount = 80
	// DefaultMaxBulkSubAwaitDurationMs is the default max bulk await duration for kafka pubsub component
	// if the MaxBulkAwaitDurationKey is not set in the metadata.
	DefaultMaxBulkSubAwaitDurationMs = 10000
)
Variables ¶
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
func GetEventMetadata ¶
func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string
Types ¶
type BulkEventHandler ¶
type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)
BulkEventHandler is the handler used to handle the subscribed bulk event.
type EventHandler ¶
EventHandler is the handler used to handle the subscribed event.
type Kafka ¶
type Kafka struct {
	// The default value should be true for kafka pubsub component and false for kafka binding component
	// This default value can be overridden by metadata consumeRetryEnabled
	DefaultConsumeRetryEnabled bool
	// contains filtered or unexported fields
}
Kafka allows reading/writing to a Kafka consumer group.
func (*Kafka) BulkPublish ¶
func (*Kafka) DeserializeValue ¶
func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error)
func (*Kafka) GetTopicHandlerConfig ¶
func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)
GetTopicHandlerConfig returns the handlerConfig for a topic.
func (*Kafka) Publish ¶
func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error
Publish message to Kafka cluster.
func (*Kafka) SerializeValue ¶
type KafkaBulkMessage ¶
type KafkaBulkMessage struct { Entries []KafkaBulkMessageEntry `json:"entries"` Topic string `json:"topic"` Metadata map[string]string `json:"metadata"` }
KafkaBulkMessage is a bulk event arriving from a message bus instance.
type KafkaBulkMessageEntry ¶
type KafkaBulkMessageEntry struct {
	EntryId string `json:"entryId"` //nolint:stylecheck
	Event []byte `json:"event"`
	ContentType string `json:"contentType,omitempty"`
	Metadata map[string]string `json:"metadata"`
}
KafkaBulkMessageEntry is an item contained inside bulk event arriving from a message bus instance.
type KafkaMetadata ¶
type KafkaMetadata struct {
	Brokers string `mapstructure:"brokers"`
	ConsumerGroup string `mapstructure:"consumerGroup"`
	ClientID string `mapstructure:"clientId"`
	AuthType string `mapstructure:"authType"`
	SaslUsername string `mapstructure:"saslUsername"`
	SaslPassword string `mapstructure:"saslPassword"`
	SaslMechanism string `mapstructure:"saslMechanism"`
	InitialOffset string `mapstructure:"initialOffset"`
	MaxMessageBytes int `mapstructure:"maxMessageBytes"`
	OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
	OidcClientID string `mapstructure:"oidcClientID"`
	OidcClientSecret string `mapstructure:"oidcClientSecret"`
	OidcScopes string `mapstructure:"oidcScopes"`
	OidcExtensions string `mapstructure:"oidcExtensions"`
	TLSDisable bool `mapstructure:"disableTls"`
	TLSSkipVerify bool `mapstructure:"skipVerify"`
	TLSCaCert string `mapstructure:"caCert"`
	TLSClientCert string `mapstructure:"clientCert"`
	TLSClientKey string `mapstructure:"clientKey"`
	ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
	ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
	HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
	SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
	Version string `mapstructure:"version"`
	// configs for kafka client
	ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"`
	ClientConnectionKeepAliveInterval time.Duration `mapstructure:"clientConnectionKeepAliveInterval"`
	// aws iam auth profile
	AWSAccessKey string `mapstructure:"awsAccessKey"`
	AWSSecretKey string `mapstructure:"awsSecretKey"`
	AWSSessionToken string `mapstructure:"awsSessionToken"`
	AWSIamRoleArn string `mapstructure:"awsIamRoleArn"`
	AWSStsSessionName string `mapstructure:"awsStsSessionName"`
	AWSRegion string `mapstructure:"awsRegion"`
	// schema registry
	SchemaRegistryURL string `mapstructure:"schemaRegistryURL"`
	SchemaRegistryAPIKey string `mapstructure:"schemaRegistryAPIKey"`
	SchemaRegistryAPISecret string `mapstructure:"schemaRegistryAPISecret"`
	SchemaCachingEnabled bool `mapstructure:"schemaCachingEnabled"`
	SchemaLatestVersionCacheTTL time.Duration `mapstructure:"schemaLatestVersionCacheTTL"`
	// contains filtered or unexported fields
}
type NewEvent ¶
type NewEvent struct { Data []byte `json:"data"` Topic string `json:"topic"` Metadata map[string]string `json:"metadata"` ContentType *string `json:"contentType,omitempty"` }
NewEvent is an event arriving from a message bus instance.
type OAuthTokenSource ¶
type OAuthTokenSource struct { CachedToken oauth2.Token Extensions map[string]string TokenEndpoint oauth2.Endpoint ClientID string ClientSecret string Scopes []string // contains filtered or unexported fields }
func (*OAuthTokenSource) Token ¶
func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error)
type SaramaLogBridge ¶
type SaramaLogBridge struct {
// contains filtered or unexported fields
}
func (SaramaLogBridge) Print ¶
func (b SaramaLogBridge) Print(v ...interface{})
func (SaramaLogBridge) Printf ¶
func (b SaramaLogBridge) Printf(format string, v ...interface{})
func (SaramaLogBridge) Println ¶
func (b SaramaLogBridge) Println(v ...interface{})
type SchemaCacheEntry ¶
type SchemaCacheEntry struct {
// contains filtered or unexported fields
}
type SchemaType ¶
type SchemaType int
const ( None SchemaType = iota Avro )
func GetValueSchemaType ¶
func GetValueSchemaType(metadata map[string]string) (SchemaType, error)
type SubscriptionHandlerConfig ¶
type SubscriptionHandlerConfig struct { IsBulkSubscribe bool SubscribeConfig pubsub.BulkSubscribeConfig BulkHandler BulkEventHandler Handler EventHandler ValueSchemaType SchemaType }
SubscriptionHandlerConfig is the handler and configuration for subscription.
type TopicHandlerConfig ¶
type TopicHandlerConfig map[string]SubscriptionHandlerConfig
TopicHandlerConfig is a map from each topic to the struct containing its handler and configuration.
func (TopicHandlerConfig) TopicList ¶
func (tbh TopicHandlerConfig) TopicList() []string
TopicList returns the list of topics.
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool