Documentation ¶
Index ¶
- type KafkaConsumerServer
- func (k *KafkaConsumerServer) AddHandler(ctx context.Context, topicName string, handler KafkaEventProcessor)
- func (k *KafkaConsumerServer) Commit(ctx context.Context) error
- func (k *KafkaConsumerServer) GetCorrelationParams(headers map[string]string) *log.CorrelationParam
- func (k *KafkaConsumerServer) GetCustomerID(headers map[string]string) *log.CustomerIdentifier
- func (k *KafkaConsumerServer) GetMessageContext(msg *kafka.Message) context.Context
- func (k *KafkaConsumerServer) GetSpanFromContext(ctx context.Context) (span.Span, bool)
- func (k *KafkaConsumerServer) HealthCheck(ctx context.Context) error
- func (k *KafkaConsumerServer) HealthCheckMonitor(ctx context.Context)
- func (k *KafkaConsumerServer) Name(ctx context.Context) string
- func (k *KafkaConsumerServer) ProcessEvent(ctx context.Context, msg *kafka.Message, handler KafkaEventProcessor)
- func (k *KafkaConsumerServer) Shutdown(ctx context.Context) error
- func (k *KafkaConsumerServer) StartConsumer(ctx context.Context)
- func (k *KafkaConsumerServer) StoreMessage(ctx context.Context, msg *kafka.Message) error
- func (k *KafkaConsumerServer) Subscribe(ctx context.Context)
- type KafkaConsumerServerConfig
- type KafkaEventProcessor
- type Tracer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaConsumerServer ¶
func New ¶
func New(appConfig KafkaConsumerServerConfig, logger log.Log, t Tracer, errorNotifier errors.ErrorNotifier) *KafkaConsumerServer
func (*KafkaConsumerServer) AddHandler ¶
func (k *KafkaConsumerServer) AddHandler(ctx context.Context, topicName string, handler KafkaEventProcessor)
func (*KafkaConsumerServer) Commit ¶
func (k *KafkaConsumerServer) Commit(ctx context.Context) error
func (*KafkaConsumerServer) GetCorrelationParams ¶
func (k *KafkaConsumerServer) GetCorrelationParams(headers map[string]string) *log.CorrelationParam
func (*KafkaConsumerServer) GetCustomerID ¶
func (k *KafkaConsumerServer) GetCustomerID(headers map[string]string) *log.CustomerIdentifier
func (*KafkaConsumerServer) GetMessageContext ¶
func (k *KafkaConsumerServer) GetMessageContext(msg *kafka.Message) context.Context
func (*KafkaConsumerServer) GetSpanFromContext ¶
func (*KafkaConsumerServer) HealthCheck ¶
func (k *KafkaConsumerServer) HealthCheck(ctx context.Context) error
func (*KafkaConsumerServer) HealthCheckMonitor ¶
func (k *KafkaConsumerServer) HealthCheckMonitor(ctx context.Context)
func (*KafkaConsumerServer) ProcessEvent ¶
func (k *KafkaConsumerServer) ProcessEvent(ctx context.Context, msg *kafka.Message, handler KafkaEventProcessor)
func (*KafkaConsumerServer) Shutdown ¶
func (k *KafkaConsumerServer) Shutdown(ctx context.Context) error
func (*KafkaConsumerServer) StartConsumer ¶
func (k *KafkaConsumerServer) StartConsumer(ctx context.Context)
func (*KafkaConsumerServer) StoreMessage ¶
func (*KafkaConsumerServer) Subscribe ¶
func (k *KafkaConsumerServer) Subscribe(ctx context.Context)
type KafkaConsumerServerConfig ¶
type KafkaConsumerServerConfig struct { baseapp.ServerConfig kafka.KafkaConsumerConfig HealthCheckInSec int HealthFilePath string }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.