Documentation ¶
Index ¶
- Constants
- func GetBalancerString(balancer Balancer) string
- func NewBoolPtr(value bool) *bool
- func NewMetricMiddleware(cfg *ConsumerConfig, app *fiber.App, consumerMetric *ConsumerMetric, ...) (func(ctx *fiber.Ctx) error, error)
- type API
- type APIConfiguration
- type Balancer
- type BatchConfiguration
- type BatchConsumeFn
- type ConsumeFn
- type Consumer
- type ConsumerConfig
- type ConsumerMetric
- type DialConfig
- type Dialer
- type DistributedTracingConfiguration
- type Header
- type Headers
- type IncomingMessage
- type Layer
- type LogLevel
- type LoggerInterface
- type Mechanism
- type Message
- type MetricCollector
- type MetricConfiguration
- type OtelKafkaKonsumerWriter
- type PreBatchFn
- type Producer
- type ProducerConfig
- type ProducerInterceptor
- type ProducerInterceptorContext
- type Reader
- type ReaderConfig
- type RetryConfiguration
- type SASLConfig
- type SkipMessageByHeaderFn
- type TLSConfig
- type Transport
- type TransportConfig
- type Writer
- type WriterConfig
Constants ¶
View Source
const ( MechanismScram = "scram" MechanismPlain = "plain" )
View Source
const Name = "kafka_konsumer"
Variables ¶
This section is empty.
Functions ¶
func GetBalancerString ¶ added in v2.3.4
func NewBoolPtr ¶
func NewMetricMiddleware ¶
func NewMetricMiddleware(cfg *ConsumerConfig, app *fiber.App, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector, ) (func(ctx *fiber.Ctx) error, error)
Types ¶
type API ¶
type API interface { Start() Stop() }
func NewAPI ¶
func NewAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) API
type APIConfiguration ¶
type Balancer ¶ added in v2.3.1
type Balancer kafka.Balancer
func GetBalancerCRC32 ¶ added in v2.3.1
func GetBalancerCRC32() Balancer
func GetBalancerHash ¶ added in v2.3.1
func GetBalancerHash() Balancer
func GetBalancerLeastBytes ¶ added in v2.3.1
func GetBalancerLeastBytes() Balancer
func GetBalancerMurmur2Balancer ¶ added in v2.3.1
func GetBalancerMurmur2Balancer() Balancer
func GetBalancerReferenceHash ¶ added in v2.3.1
func GetBalancerReferenceHash() Balancer
func GetBalancerRoundRobin ¶ added in v2.3.1
func GetBalancerRoundRobin() Balancer
type BatchConfiguration ¶
type BatchConfiguration struct { BatchConsumeFn BatchConsumeFn PreBatchFn PreBatchFn MessageGroupLimit int MessageGroupByteSizeLimit any }
func (*BatchConfiguration) JSON ¶ added in v2.3.4
func (cfg *BatchConfiguration) JSON() string
type BatchConsumeFn ¶
type Consumer ¶
type Consumer interface { // Consume starts consuming Consume() // Pause pauses the consumer; it stops consuming new messages // It is idempotent under the hood // Calling it from multiple goroutines is safe Pause() // Resume resumes the consumer; it starts consuming again // It is idempotent under the hood // Calling it from multiple goroutines is safe Resume() // GetMetricCollectors makes the metric collectors available. // You can register these collectors on your own http server. // Please look at the examples/with-metric-collector directory. GetMetricCollectors() []prometheus.Collector // WithLogger injects a custom log implementation WithLogger(logger LoggerInterface) // Stop is for graceful shutdown. In order to avoid data loss, you have to call it! Stop() error }
func NewConsumer ¶
func NewConsumer(cfg *ConsumerConfig) (Consumer, error)
type ConsumerConfig ¶
type ConsumerConfig struct { DistributedTracingConfiguration DistributedTracingConfiguration Logger LoggerInterface APIConfiguration APIConfiguration MetricConfiguration MetricConfiguration SASL *SASLConfig TLS *TLSConfig Dial *DialConfig BatchConfiguration *BatchConfiguration ConsumeFn ConsumeFn SkipMessageByHeaderFn SkipMessageByHeaderFn TransactionalRetry *bool RetryConfiguration RetryConfiguration LogLevel LogLevel Rack string VerifyTopicOnStartup bool ClientID string Reader ReaderConfig CommitInterval time.Duration MessageGroupDuration time.Duration Concurrency int DistributedTracingEnabled bool RetryEnabled bool APIEnabled bool // MetricPrefix is used as the prometheus fq name prefix. // If not provided, the default metric prefix value is `kafka_konsumer`. // Currently, there are two exposed prometheus metrics: `processed_messages_total_current` and `unprocessed_messages_total_current`. // So, if the default metric prefix is used, the metric names are `kafka_konsumer_processed_messages_total_current` and // `kafka_konsumer_unprocessed_messages_total_current`. MetricPrefix string }
func (*ConsumerConfig) JSON ¶ added in v2.3.4
func (cfg *ConsumerConfig) JSON() string
func (*ConsumerConfig) JSONPretty ¶ added in v2.3.4
func (cfg *ConsumerConfig) JSONPretty() string
func (*ConsumerConfig) String ¶ added in v2.3.4
func (cfg *ConsumerConfig) String() string
type ConsumerMetric ¶
type DistributedTracingConfiguration ¶
type DistributedTracingConfiguration struct { TracerProvider trace.TracerProvider Propagator propagation.TextMapPropagator }
type IncomingMessage ¶ added in v2.1.8
type IncomingMessage struct {
// contains filtered or unexported fields
}
type LoggerInterface ¶
type LoggerInterface interface { // With returns a logger based off the root logger and decorates it with the given context and arguments. With(args ...interface{}) LoggerInterface // Debug uses fmt.Sprint to construct and log a message at DEBUG level Debug(args ...interface{}) // Info uses fmt.Sprint to construct and log a message at INFO level Info(args ...interface{}) // Warn uses fmt.Sprint to construct and log a message at WARN level Warn(args ...interface{}) // Error uses fmt.Sprint to construct and log a message at ERROR level Error(args ...interface{}) // Debugf uses fmt.Sprintf to construct and log a message at DEBUG level Debugf(format string, args ...interface{}) // Infof uses fmt.Sprintf to construct and log a message at INFO level Infof(format string, args ...interface{}) // Warnf uses fmt.Sprintf to construct and log a message at WARN level Warnf(format string, args ...interface{}) // Errorf uses fmt.Sprintf to construct and log a message at ERROR level Errorf(format string, args ...interface{}) Infow(msg string, keysAndValues ...interface{}) Errorw(msg string, keysAndValues ...interface{}) Warnw(msg string, keysAndValues ...interface{}) }
LoggerInterface is a logger that supports log levels, context and structured logging.
func NewZapLogger ¶
func NewZapLogger(level LogLevel) LoggerInterface
type Message ¶
type Message struct { Time time.Time WriterData interface{} // Context enables distributed tracing support Context context.Context Topic string Key []byte Value []byte Headers Headers Partition int Offset int64 HighWaterMark int64 // IsFailed is only used when transactional retry is disabled IsFailed bool // If available, kafka-konsumer writes this description into the failed message's // headers under the `x-error-message` key when producing to the retry topic ErrDescription string }
func (*Message) RemoveHeader ¶
type MetricCollector ¶ added in v2.3.0
type MetricCollector struct {
// contains filtered or unexported fields
}
func NewMetricCollector ¶ added in v2.3.0
func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *MetricCollector
func (*MetricCollector) Collect ¶ added in v2.3.0
func (s *MetricCollector) Collect(ch chan<- prometheus.Metric)
func (*MetricCollector) Describe ¶ added in v2.3.0
func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc)
type MetricConfiguration ¶
type MetricConfiguration struct { // Path default is /metrics Path *string }
type OtelKafkaKonsumerWriter ¶
type PreBatchFn ¶ added in v2.0.6
type Producer ¶
type Producer interface { Produce(ctx context.Context, message Message) error ProduceBatch(ctx context.Context, messages []Message) error Close() error }
func NewProducer ¶
func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error)
type ProducerConfig ¶
type ProducerConfig struct { DistributedTracingConfiguration DistributedTracingConfiguration Transport *TransportConfig SASL *SASLConfig TLS *TLSConfig ClientID string Writer WriterConfig DistributedTracingEnabled bool }
func (*ProducerConfig) JSON ¶ added in v2.3.4
func (cfg *ProducerConfig) JSON() string
func (*ProducerConfig) JSONPretty ¶ added in v2.3.4
func (cfg *ProducerConfig) JSONPretty() string
func (*ProducerConfig) String ¶ added in v2.3.4
func (cfg *ProducerConfig) String() string
type ProducerInterceptor ¶ added in v2.3.5
type ProducerInterceptor interface {
OnProduce(ctx ProducerInterceptorContext)
}
type ProducerInterceptorContext ¶ added in v2.3.5
type Reader ¶
type Reader interface { FetchMessage(ctx context.Context, msg *kafka.Message) error Close() error CommitMessages(messages []kafka.Message) error }
func NewOtelReaderWrapper ¶
func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error)
func NewReaderWrapper ¶
type ReaderConfig ¶
type ReaderConfig kafka.ReaderConfig
func (ReaderConfig) JSON ¶ added in v2.3.4
func (cfg ReaderConfig) JSON() string
type RetryConfiguration ¶
type RetryConfiguration struct { // MetricPrefix is used as the prometheus fq name prefix. // If not provided, the default metric prefix value is `kafka_cronsumer`. // Currently, there are two exposed prometheus metrics: `retried_messages_total_current` and `discarded_messages_total_current`. // So, if the default metric prefix is used, the metric names are `kafka_cronsumer_retried_messages_total_current` and // `kafka_cronsumer_discarded_messages_total_current`. MetricPrefix string SASL *SASLConfig TLS *TLSConfig ClientID string StartTimeCron string Topic string DeadLetterTopic string Rack string VerifyTopicOnStartup bool LogLevel LogLevel Brokers []string Balancer Balancer MaxRetry int WorkDuration time.Duration SkipMessageByHeaderFn SkipMessageByHeaderFn Concurrency int QueueCapacity int ProducerBatchSize int ProducerBatchTimeout time.Duration }
func (RetryConfiguration) JSON ¶ added in v2.3.4
func (cfg RetryConfiguration) JSON() string
type SASLConfig ¶
func (*SASLConfig) IsEmpty ¶
func (s *SASLConfig) IsEmpty() bool
func (*SASLConfig) JSON ¶ added in v2.3.4
func (s *SASLConfig) JSON() string
type SkipMessageByHeaderFn ¶ added in v2.2.9
type SkipMessageByHeaderFn func(header []kafka.Header) bool
type TransportConfig ¶
type Writer ¶
func NewOtelProducer ¶
func NewOtelProducer(cfg *ProducerConfig, writer *segmentio.Writer) (Writer, error)
type WriterConfig ¶
type WriterConfig struct { ErrorLogger kafka.Logger Logger kafka.Logger Balancer kafka.Balancer Completion func(messages []kafka.Message, err error) Topic string Brokers []string ReadTimeout time.Duration BatchTimeout time.Duration BatchBytes int64 WriteTimeout time.Duration RequiredAcks kafka.RequiredAcks BatchSize int WriteBackoffMax time.Duration WriteBackoffMin time.Duration MaxAttempts int Async bool Compression kafka.Compression AllowAutoTopicCreation bool }
func (WriterConfig) JSON ¶ added in v2.3.4
func (cfg WriterConfig) JSON() string
Source Files ¶
- api.go
- balancer.go
- batch_consumer.go
- collector.go
- consumer.go
- consumer_base.go
- consumer_config.go
- data_units.go
- layer.go
- layer_dialer.go
- layer_transport.go
- logger.go
- mechanism.go
- message.go
- metric.go
- otel_producer.go
- otel_reader_wrapper.go
- producer.go
- producer_config.go
- producer_interceptor.go
- reader_wrapper.go
- sub_process.go
- tls.go
- verify_topic.go
- zap.go
Click to show internal directories.
Click to hide internal directories.