Documentation ¶
Index ¶
- Constants
- func DefaultConfig() *config
- type BatchHandler
- type BatchListener
- type Component
- func (cmp *Component) Consumer() *ekafka.Consumer
- func (cmp *Component) ConsumerGroup() *ekafka.ConsumerGroup
- func (cmp *Component) GracefulStop(ctx context.Context) error
- func (cmp *Component) Info() *server.ServiceInfo
- func (cmp *Component) Init() error
- func (cmp *Component) Name() string
- func (cmp *Component) OnConsumeEachMessage(handler OnConsumeEachMessageHandler) error
- func (cmp *Component) OnConsumerGroupStart(handler OnConsumerGroupStartHandler) error
- func (cmp *Component) OnEachMessage(consumptionErrors chan<- error, handler OnEachMessageHandler) error
- func (cmp *Component) OnStart(handler OnStartHandler) error
- func (cmp *Component) PackageName() string
- func (cmp *Component) Start() error
- func (cmp *Component) Stop() error
- func (cmp *Component) Subscribe(listener Listener)
- func (cmp *Component) SubscribeBatchHandler(handler BatchHandler, batchSize int, timeout time.Duration)
- func (cmp *Component) SubscribeSingleHandler(handler Handler)
- type Container
- type Handler
- type Listener
- type OnConsumeEachMessageHandler
- type OnConsumerGroupStartHandler
- type OnEachMessageHandler
- type OnStartHandler
- type Option
- type SyncListener
Constants ¶
const PackageName = "component.ekafka.consumerserver"
PackageName is the name of this component.
Functions ¶
Types ¶
type BatchHandler ¶ added in v1.0.4
type BatchHandler func(lastCtx context.Context, messages []*ekafka.CtxMessage) error
type BatchListener ¶ added in v1.0.4
type BatchListener struct {
	Batch           []*ekafka.CtxMessage
	BatchUpdateSize int
	Timeout         time.Duration
	Handler         BatchHandler
	// contains filtered or unexported fields
}
type Component ¶
Component starts an Ego server that consumes messages.
func NewConsumerServerComponent ¶
func NewConsumerServerComponent(name string, config *config, ekafkaComponent *ekafka.Component, logger *elog.Component) *Component
NewConsumerServerComponent creates a new server instance.
func (*Component) ConsumerGroup ¶
func (cmp *Component) ConsumerGroup() *ekafka.ConsumerGroup
ConsumerGroup returns the default ConsumerGroup.
func (*Component) GracefulStop ¶
GracefulStop stops the server.
func (*Component) Info ¶
func (cmp *Component) Info() *server.ServiceInfo
Info returns the server info used by the governor and the consumer balancer.
func (*Component) OnConsumeEachMessage ¶ added in v1.0.2
func (cmp *Component) OnConsumeEachMessage(handler OnConsumeEachMessageHandler) error
OnConsumeEachMessage registers a handler that is called for each message. If the handler returns ErrRecoverableError, the message is retried; if it returns any other error, the message is not committed.
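The error rule described above can be illustrated with a minimal, self-contained sketch. It does not call this package: errRecoverable is a local stand-in for ErrRecoverableError, and consumeOne, maxAttempts, outcome, and errBadPayload are hypothetical names introduced only for illustration. In particular, the attempt limit is an assumption; the documentation above does not say whether retries are bounded.

```go
package main

import (
	"errors"
	"fmt"
)

// errRecoverable is a local stand-in for the package's ErrRecoverableError.
var errRecoverable = errors.New("recoverable error")

// errBadPayload is a hypothetical non-recoverable error.
var errBadPayload = errors.New("bad payload")

// outcome records whether the message offset would be committed.
type outcome string

const (
	committed    outcome = "committed"
	notCommitted outcome = "not committed"
)

// consumeOne calls handler for one message value and reports the outcome
// together with the number of attempts made. maxAttempts is an assumption
// of this sketch, not a documented setting.
func consumeOne(value string, maxAttempts int, handler func(string) error) (outcome, int) {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err := handler(value)
		switch {
		case err == nil:
			// Success: the message can be committed.
			return committed, attempt
		case errors.Is(err, errRecoverable):
			// Recoverable: retry the same message.
			continue
		default:
			// Any other error: stop and leave the message uncommitted.
			return notCommitted, attempt
		}
	}
	// Retries exhausted without success.
	return notCommitted, maxAttempts
}

func main() {
	calls := 0
	flaky := func(string) error {
		calls++
		if calls < 3 {
			// Wrapped errors are still matched by errors.Is.
			return fmt.Errorf("attempt %d: %w", calls, errRecoverable)
		}
		return nil
	}
	res, attempts := consumeOne("hello", 5, flaky)
	fmt.Println(res, attempts)

	res, attempts = consumeOne("hello", 5, func(string) error { return errBadPayload })
	fmt.Println(res, attempts)
}
```

In this sketch the flaky handler succeeds on its third call, so the message is committed after three attempts, while the handler that returns errBadPayload is called once and the message is left uncommitted.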
func (*Component) OnConsumerGroupStart ¶
func (cmp *Component) OnConsumerGroupStart(handler OnConsumerGroupStartHandler) error
OnConsumerGroupStart ...
func (*Component) OnEachMessage ¶
func (cmp *Component) OnEachMessage(consumptionErrors chan<- error, handler OnEachMessageHandler) error
OnEachMessage ... Deprecated: use OnConsumeEachMessage instead.
func (*Component) PackageName ¶
PackageName returns the package name.
func (*Component) SubscribeBatchHandler ¶ added in v1.0.4
func (cmp *Component) SubscribeBatchHandler(handler BatchHandler, batchSize int, timeout time.Duration)
SubscribeBatchHandler appends a batch listener that uses this handler. Messages are collected into a batch, and the batch is handled when it reaches batchSize messages or when timeout elapses.
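The size-or-timeout rule can be sketched with plain channels. This is not the package's implementation: message is a hypothetical stand-in for *ekafka.CtxMessage, batchHandler is a simplified handler shape without the context argument, and runBatcher and collectBatchSizes are illustrative names. How the real listener measures the timeout is not stated above; the sketch assumes a ticker that is reset after each size-triggered flush.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for *ekafka.CtxMessage.
type message struct {
	Value string
}

// batchHandler is a simplified handler shape (no context argument).
type batchHandler func(messages []message) error

// runBatcher reads messages from in and calls handler when batchSize
// messages have accumulated or when the ticker fires. It returns after
// in is closed, once any remaining messages have been flushed.
func runBatcher(in <-chan message, batchSize int, timeout time.Duration, handler batchHandler) error {
	batch := make([]message, 0, batchSize)
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()

	// flush hands the current batch to handler and starts a fresh slice,
	// so the handler may keep the slice it received.
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		err := handler(batch)
		batch = make([]message, 0, batchSize)
		return err
	}

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				return flush() // input closed: flush the remainder
			}
			batch = append(batch, msg)
			if len(batch) >= batchSize {
				if err := flush(); err != nil {
					return err
				}
				ticker.Reset(timeout) // restart the timeout window
			}
		case <-ticker.C:
			// Timeout reached: flush whatever has accumulated.
			if err := flush(); err != nil {
				return err
			}
		}
	}
}

// collectBatchSizes feeds n messages through runBatcher and returns the
// size of each batch the handler received. The long timeout means only
// the size rule and the final flush trigger here.
func collectBatchSizes(n, batchSize int) []int {
	in := make(chan message, n)
	for i := 0; i < n; i++ {
		in <- message{Value: fmt.Sprintf("m%d", i)}
	}
	close(in)

	var sizes []int
	_ = runBatcher(in, batchSize, time.Hour, func(ms []message) error {
		sizes = append(sizes, len(ms))
		return nil
	})
	return sizes
}

func main() {
	fmt.Println(collectBatchSizes(5, 2))
}
```

With five messages and a batch size of two, the handler in this sketch receives batches of two, two, and one message; the last batch comes from the final flush when the input channel closes.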
func (*Component) SubscribeSingleHandler ¶ added in v1.0.4
SubscribeSingleHandler appends a single listener that calls this handler for each message.
type OnConsumeEachMessageHandler ¶ added in v1.0.3
OnConsumeEachMessageHandler ...
type OnConsumerGroupStartHandler ¶
type OnConsumerGroupStartHandler = func(ctx context.Context, consumerGroup *ekafka.ConsumerGroup) error
OnConsumerGroupStartHandler ...
type OnEachMessageHandler ¶
OnEachMessageHandler ...
type OnStartHandler ¶
OnStartHandler ...