kafkaconsumer

package
v5.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaConsumerServer

type KafkaConsumerServer struct {
	*baseapp.BaseApp
	// contains filtered or unexported fields
}

func New

func New(appConfig KafkaConsumerServerConfig, logger log.Log, t Tracer, errorNotifier errors.ErrorNotifier) *KafkaConsumerServer

func (*KafkaConsumerServer) AddHandler

func (k *KafkaConsumerServer) AddHandler(ctx context.Context, topicName string, handler KafkaEventProcessor)

func (*KafkaConsumerServer) Commit

func (k *KafkaConsumerServer) Commit(ctx context.Context) error

func (*KafkaConsumerServer) GetCorrelationParams

func (k *KafkaConsumerServer) GetCorrelationParams(headers map[string]string) *log.CorrelationParam

func (*KafkaConsumerServer) GetCustomerID

func (k *KafkaConsumerServer) GetCustomerID(headers map[string]string) *log.CustomerIdentifier

func (*KafkaConsumerServer) GetMessageContext

func (k *KafkaConsumerServer) GetMessageContext(msg *kafka.Message) context.Context

func (*KafkaConsumerServer) GetSpanFromContext

func (k *KafkaConsumerServer) GetSpanFromContext(ctx context.Context) (span.Span, bool)

func (*KafkaConsumerServer) HealthCheck

func (k *KafkaConsumerServer) HealthCheck(ctx context.Context) error

func (*KafkaConsumerServer) HealthCheckMonitor

func (k *KafkaConsumerServer) HealthCheckMonitor(ctx context.Context)

func (*KafkaConsumerServer) Name

func (*KafkaConsumerServer) ProcessEvent

func (k *KafkaConsumerServer) ProcessEvent(ctx context.Context, msg *kafka.Message, handler KafkaEventProcessor)

func (*KafkaConsumerServer) Shutdown

func (k *KafkaConsumerServer) Shutdown(ctx context.Context) error

func (*KafkaConsumerServer) StartConsumer

func (k *KafkaConsumerServer) StartConsumer(ctx context.Context)

func (*KafkaConsumerServer) StoreMessage

func (k *KafkaConsumerServer) StoreMessage(ctx context.Context, msg *kafka.Message) error

func (*KafkaConsumerServer) Subscribe

func (k *KafkaConsumerServer) Subscribe(ctx context.Context)

type KafkaConsumerServerConfig

type KafkaConsumerServerConfig struct {
	baseapp.ServerConfig
	kafka.KafkaConsumerConfig
	HealthCheckInSec int
	HealthFilePath   string
}

type KafkaEventProcessor

type KafkaEventProcessor func(context.Context, *kafka.Message) error

type Tracer

type Tracer interface {
	StartKafkaSpanFromMessage(ctx context.Context, msg *ckafka.Message) (context.Context, span.Span)
	span.SpanOp
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL