Documentation ¶
Index ¶
- type Kafka
- func (k *Kafka) AddRouter(router Router)
- func (k *Kafka) AddRouters()
- func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error
- func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (k *Kafka) Setup(sarama.ConsumerGroupSession) error
- func (k *Kafka) Start() func()
- type Router
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Kafka ¶
type Kafka struct { Brokers []string Routers map[string]Router //key是topic 对应的是处理函数 Topics []string //OffsetNewest int64 = -1 //OffsetOldest int64 = -2 StartOffset int64 `json:",optional"` Version string `json:",optional"` Group string `json:",optional"` ChannelBufferSize int `json:",default=20"` // contains filtered or unexported fields }
func NewKafka ¶
func NewKafka(service *svc.ServiceContext) *Kafka
func (*Kafka) AddRouters ¶
func (k *Kafka) AddRouters()
func (*Kafka) Cleanup ¶
func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Kafka) ConsumeClaim ¶
func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
type Router ¶
type Router struct { Topic string Handler func(ctx context.Context, svcCtx *svc.ServiceContext) logic.LogicHandle }
Click to show internal directories.
Click to hide internal directories.