consumerserver

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const PackageName = "component.ekafka.consumerserver"

PackageName is the name of this component.

Variables

This section is empty.

Functions

func DefaultConfig

func DefaultConfig() *config

DefaultConfig returns a default config.

Types

type Component

type Component struct {
	ServerCtx context.Context
	// contains filtered or unexported fields
}

Component starts an Ego server for message consuming.

func NewConsumerServerComponent

func NewConsumerServerComponent(name string, config *config, ekafkaComponent *ekafka.Component, logger *elog.Component) *Component

NewConsumerServerComponent creates a new server instance.

func (*Component) Consumer

func (cmp *Component) Consumer() *ekafka.Consumer

Consumer returns the default Consumer.

func (*Component) ConsumerGroup

func (cmp *Component) ConsumerGroup() *ekafka.ConsumerGroup

ConsumerGroup returns the default ConsumerGroup.

func (*Component) GracefulStop

func (cmp *Component) GracefulStop(ctx context.Context) error

GracefulStop stops the server.

func (*Component) Info

func (cmp *Component) Info() *server.ServiceInfo

Info returns server info, used by governor and consumer balancer.

func (*Component) Init

func (cmp *Component) Init() error

Init ...

func (*Component) Name

func (cmp *Component) Name() string

Name returns the name of this instance.

func (*Component) OnConsumeEachMessage added in v1.0.2

func (cmp *Component) OnConsumeEachMessage(handler OnConsumeEachMessageHandler) error

OnConsumeEachMessage register a handler for each message. When the handler returns an error, the message will be retried if the error is ErrRecoverableError else the message will not be committed.

func (*Component) OnConsumerGroupStart

func (cmp *Component) OnConsumerGroupStart(handler OnConsumerGroupStartHandler) error

OnConsumerGroupStart ...

func (*Component) OnEachMessage

func (cmp *Component) OnEachMessage(consumptionErrors chan<- error, handler OnEachMessageHandler) error

OnEachMessage ... Deprecated: use OnConsumeEachMessage instead.

func (*Component) OnStart

func (cmp *Component) OnStart(handler OnStartHandler) error

OnStart ...

func (*Component) PackageName

func (cmp *Component) PackageName() string

PackageName returns the package name.

func (*Component) Start

func (cmp *Component) Start() error

Start will start consuming.

func (*Component) Stop

func (cmp *Component) Stop() error

Stop stops the server.

type Container

type Container struct {
	// contains filtered or unexported fields
}

func DefaultContainer

func DefaultContainer() *Container

DefaultContainer 返回默认Container

func Load

func Load(key string) *Container

Load 载入配置,初始化Container

func (*Container) Build

func (c *Container) Build(options ...Option) *Component

Build 构建Container

type OnConsumeEachMessageHandler added in v1.0.3

type OnConsumeEachMessageHandler = func(ctx context.Context, message *ekafka.Message) error

OnConsumeEachMessageHandler ...

type OnConsumerGroupStartHandler

type OnConsumerGroupStartHandler = func(ctx context.Context, consumerGroup *ekafka.ConsumerGroup) error

OnConsumerGroupStartHandler ...

type OnEachMessageHandler

type OnEachMessageHandler = func(ctx context.Context, message kafka.Message) error

OnEachMessageHandler ...

type OnStartHandler

type OnStartHandler = func(ctx context.Context, consumer *ekafka.Consumer) error

OnStartHandler ...

type Option

type Option func(c *Container)

func WithDebug

func WithDebug(debug bool) Option

WithDebug enables debug mode.

func WithEkafka

func WithEkafka(ekafkaComponent *ekafka.Component) Option

WithEkafka ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL