Documentation
¶
Index ¶
- func Register(name string, queueHandler queueHandler) error
- func Use(name string) (queueHandler, error)
- type Consumer
- type Handler
- type Instance
- func (this *Instance) GetQueue(name string) (QueueHandler, error)
- func (this *Instance) HandlerName() string
- func (this *Instance) Initiate(ctx context.Context) (newCtx context.Context, err error)
- func (this *Instance) NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error)
- func (this *Instance) OnRequestShutdown(c *routing.Context) error
- func (this *Instance) OnRequestStartup(c *routing.Context) error
- func (this *Instance) OnShutdown(ctx context.Context) (context.Context, error)
- func (this *Instance) OnStartup(ctx context.Context) (context.Context, error)
- func (this *Instance) Use(ctx context.Context, handlerName string) error
- type KafkaHandler
- type KafkaQueue
- type Producer
- type QueueHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) MarkOffset ¶
func (c *Consumer) MarkOffset(message *sarama.ConsumerMessage)
func (*Consumer) Messages ¶
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage
type Handler ¶
type Handler interface { Initiate(ctx context.Context) error NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error) GetQueue(name string) (QueueHandler, error) }
func NewKafkaHandler ¶
func NewKafkaHandler() Handler
type Instance ¶
type Instance struct { Config *config.Instance Utility *utility.Instance // contains filtered or unexported fields }
func NewInstance ¶
func NewInstance() *Instance
func (*Instance) HandlerName ¶
func (*Instance) OnRequestShutdown ¶
func (*Instance) OnRequestStartup ¶
func (*Instance) OnShutdown ¶
type KafkaHandler ¶
type KafkaHandler struct {
// contains filtered or unexported fields
}
func (*KafkaHandler) GetQueue ¶
func (this *KafkaHandler) GetQueue(name string) (QueueHandler, error)
func (*KafkaHandler) NewQueue ¶
func (this *KafkaHandler) NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error)
type KafkaQueue ¶
type KafkaQueue struct {
// contains filtered or unexported fields
}
func (*KafkaQueue) GetConfig ¶
func (this *KafkaQueue) GetConfig() map[string]interface{}
func (*KafkaQueue) NewConsumer ¶
func (this *KafkaQueue) NewConsumer(ctx context.Context, groupName string, topic string) (*Consumer, error)
创建消费者
Click to show internal directories.
Click to hide internal directories.