Documentation ¶
Index ¶
Constants ¶
View Source
const (
	KafkaPartitionKey     = attribute.Key("messaging.kafka.partition")
	KafkaConsumerGroupKey = attribute.Key("messaging.kafka.consumer_group")
)
Variables ¶
This section is empty.
Functions ¶
func NewKafkaListener ¶
Types ¶
type KafkaConsumeMessage ¶
type KafkaConsumeMessage struct {
	*KafkaMessageCarrier
	// contains filtered or unexported fields
}
func (*KafkaConsumeMessage) Commit ¶
func (k *KafkaConsumeMessage) Commit(ctx context.Context) error
func (*KafkaConsumeMessage) GetEventConsumeMessage ¶
func (k *KafkaConsumeMessage) GetEventConsumeMessage(ctx context.Context) (*event.EventConsumeMessage, error)
type KafkaIterator ¶
type KafkaIterator struct {
// contains filtered or unexported fields
}
func (*KafkaIterator) Close ¶
func (k *KafkaIterator) Close() error
func (*KafkaIterator) Next ¶
func (k *KafkaIterator) Next(ctx context.Context) (event.ConsumeMessage, error)
type KafkaListener ¶
type KafkaListener struct {
	Brokers       []string `json:"brokers" mapstructure:"brokers"`
	KeyFile       string   `json:"key_file" mapstructure:"key_file"`
	CertFile      string   `json:"cert_file" mapstructure:"cert_file"`
	CACertificate string   `json:"ca_cert" mapstructure:"ca_cert"`
	AuthType      string   `json:"auth_type" mapstructure:"auth_type"`
	Username      string   `json:"username" mapstructure:"username"`
	Password      string   `json:"password" mapstructure:"password"`

	// PrintLogLevel sets the default log level for all log messages other than error messages.
	// Available levels are: panic, fatal, error, warning, info, debug, discard.
	// The discard level discards every log message received.
	// Default: debug
	PrintLogLevel string `json:"print_log_level" mapstructure:"print_log_level"`

	// ErrorLogLevel sets the default log level for error log messages.
	// Available levels are: panic, fatal, error, warning, info, debug, discard.
	// The discard level discards every log message received.
	// Default: error
	ErrorLogLevel string `json:"error_log_level" mapstructure:"error_log_level"`

	QueueCapacity          int    `json:"queue_capacity" mapstructure:"queue_capacity"`
	MinBytes               int    `json:"min_bytes" mapstructure:"min_bytes"`
	MaxBytes               int    `json:"max_bytes" mapstructure:"max_bytes"`
	MaxWait                string `json:"max_wait" mapstructure:"max_wait"`
	MaxAttempts            int    `json:"max_attempts" mapstructure:"max_attempts"`
	ReadlagInterval        string `json:"read_lag_interval" mapstructure:"read_lag_interval"`
	HeartbeatInterval      string `json:"heartbeat_interval" mapstructure:"heartbeat_interval"`
	CommitInterval         string `json:"commit_interval" mapstructure:"commit_interval"`
	PartitionWatchInterval string `json:"partition_watch_interval" mapstructure:"partition_watch_interval"`
	WatchPartitionChanges  bool   `json:"watch_partition_changes" mapstructure:"watch_partition_changes"`
	SessionTimeout         string `json:"session_timeout" mapstructure:"session_timeout"`
	RebalanceTimeout       string `json:"rebalance_timeout" mapstructure:"rebalance_timeout"`
	JoinGroupBackoff       string `json:"join_group_backoff" mapstructure:"join_group_backoff"`
	RetentionTime          string `json:"retention_time" mapstructure:"retention_time"`
	StartOffset            int64  `json:"start_offset" mapstructure:"start_offset"`
	ReadBackoffMin         string `json:"read_backoff_min" mapstructure:"read_backoff_min"`
	ReadBackoffMax         string `json:"read_backoff_max" mapstructure:"read_backoff_max"`
}
type KafkaMessageCarrier ¶
type KafkaMessageCarrier struct {
*kafka.Message
}
func (KafkaMessageCarrier) Get ¶
func (k KafkaMessageCarrier) Get(key string) string
Get retrieves a single value for a given key.
func (KafkaMessageCarrier) Keys ¶
func (k KafkaMessageCarrier) Keys() []string
Keys returns a slice of all key identifiers in the carrier.
func (KafkaMessageCarrier) Set ¶
func (k KafkaMessageCarrier) Set(key, val string)
Set sets a header.
type KafkaSender ¶
type KafkaSender struct {
	Brokers        []string               `json:"brokers" mapstructure:"brokers"`
	BatchSize      int                    `json:"batch_size" mapstructure:"batch_size"`
	BatchTimeout   string                 `json:"batch_timeout" mapstructure:"batch_timeout"`
	KeyFile        string                 `json:"key_file" mapstructure:"key_file"`
	CertFile       string                 `json:"cert_file" mapstructure:"cert_file"`
	CACertificate  string                 `json:"ca_cert" mapstructure:"ca_cert"`
	AuthType       string                 `json:"auth_type" mapstructure:"auth_type"`
	Username       string                 `json:"username" mapstructure:"username"`
	Password       string                 `json:"password" mapstructure:"password"`
	MaxAttempts    int                    `json:"max_attempts" mapstructure:"max_attempts"`
	Balancer       string                 `json:"balancer" mapstructure:"balancer"`
	BalancerConfig map[string]interface{} `json:"balancer_config" mapstructure:"balancer_config"`

	// PrintLogLevel sets the default log level for all log messages other than error messages.
	// Available levels are: panic, fatal, error, warning, info, debug, discard.
	// The discard level discards every log message received.
	// Default: debug
	PrintLogLevel string `json:"print_log_level" mapstructure:"print_log_level"`

	// ErrorLogLevel sets the default log level for error log messages.
	// Available levels are: panic, fatal, error, warning, info, debug, discard.
	// The discard level discards every log message received.
	// Default: error
	ErrorLogLevel string `json:"error_log_level" mapstructure:"error_log_level"`
	// contains filtered or unexported fields
}
func (*KafkaSender) Send ¶
func (k *KafkaSender) Send(ctx context.Context, message *event.EventMessage) error
Click to show internal directories.
Click to hide internal directories.