Documentation ¶
Index ¶
- Constants
- Variables
- type AdminClient
- type Closer
- type ConsumeArgs
- func (c *ConsumeArgs) SetEventHandler(handler EventHandler) *ConsumeArgs
- func (c *ConsumeArgs) SetHandler(handler MessageHandler) *ConsumeArgs
- func (c *ConsumeArgs) SetPolling(polling int) *ConsumeArgs
- func (c *ConsumeArgs) SetRebalanceCb(cb kafka.RebalanceCb) *ConsumeArgs
- func (c *ConsumeArgs) SetTopics(topics []string) *ConsumeArgs
- func (c *ConsumeArgs) SetWorkers(workers uint64) *ConsumeArgs
- type Consumer
- type Container
- func (c *Container) Close()
- func (c *Container) Consume(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
- func (c *Container) ConsumeBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
- func (c *Container) ConsumeEvent(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
- func (c *Container) ConsumeEventBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
- func (c *Container) NewAdminClient(config kafka.ConfigMap) (ac *AdminClient, err error)
- func (c *Container) NewConsumer(config kafka.ConfigMap) (cons *Consumer, err error)
- func (c *Container) NewProducer(config kafka.ConfigMap) (prod *Producer, err error)
- type EventHandler
- type MessageHandler
- type Producer
Constants ¶
const (
	GoEventsChannelEnable        = "go.events.channel.enable"
	GoApplicationRebalanceEnable = "go.application.rebalance.enable"
	ClientID                     = "client.id"
)
Configuration keys used for consumers.
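As a rough illustration of how these keys might be used, the sketch below builds a consumer configuration from them. The `configMap` type is a hypothetical stand-in for `kafka.ConfigMap` so that the example compiles without the Kafka client library, and the values are arbitrary assumptions rather than defaults of this package.

```go
package main

import "fmt"

// Keys copied from the constants documented above.
const (
	GoEventsChannelEnable        = "go.events.channel.enable"
	GoApplicationRebalanceEnable = "go.application.rebalance.enable"
	ClientID                     = "client.id"
)

// configMap is a hypothetical stand-in for kafka.ConfigMap, used only so
// this sketch compiles without the Kafka client library.
type configMap map[string]interface{}

// newConsumerConfig builds a consumer configuration from the named keys.
// The values chosen here are illustrative assumptions, not package defaults.
func newConsumerConfig(clientID string) configMap {
	return configMap{
		ClientID:                     clientID,
		GoEventsChannelEnable:        false,
		GoApplicationRebalanceEnable: true,
	}
}

func main() {
	cfg := newConsumerConfig("orders-service")
	fmt.Println(cfg[ClientID])
}
```

Using the named constants instead of raw strings keeps a mistyped key from slipping through unnoticed.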
const (
	TypeProducer    = "producer"
	TypeConsumer    = "consumer"
	TypeAdminClient = "admin-client"
)
List of resource types.
Variables ¶
var TimeoutFlush = 5 * time.Second
TimeoutFlush specifies the flush's timeout for the producer.
Functions ¶
This section is empty.
Types ¶
type AdminClient ¶
type AdminClient struct {
*kafka.AdminClient
}
AdminClient embeds the original admin client from the kafka package and promotes its methods.
func (*AdminClient) Close ¶
func (ac *AdminClient) Close() (err error)
Close closes the underlying resources.
func (*AdminClient) GetOrigin ¶
func (ac *AdminClient) GetOrigin() *kafka.AdminClient
GetOrigin returns the original admin client from the kafka package.
type Closer ¶
type Closer interface {
Close() (err error)
}
Closer specifies the contract for closing all of the underlying resources.
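The following sketch shows one way a set of `Closer` values could be shut down together. `fakeResource` and `closeAll` are hypothetical names introduced for illustration; how the package itself tracks and closes its resources is not shown in this reference.

```go
package main

import "fmt"

// Closer mirrors the interface documented above.
type Closer interface {
	Close() (err error)
}

// fakeResource is a hypothetical resource used only for this illustration.
type fakeResource struct {
	name   string
	fail   bool
	closed bool
}

// Close marks the resource as closed and optionally reports a failure.
func (r *fakeResource) Close() (err error) {
	r.closed = true
	if r.fail {
		return fmt.Errorf("close %s: failed", r.name)
	}
	return nil
}

// closeAll closes every resource, continuing after a failure,
// and returns the first error encountered (or nil).
func closeAll(resources ...Closer) error {
	var first error
	for _, r := range resources {
		if err := r.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	err := closeAll(
		&fakeResource{name: "producer"},
		&fakeResource{name: "consumer", fail: true},
	)
	fmt.Println(err)
}
```

Continuing past a failed `Close` is a design choice in this sketch: it avoids leaking the remaining resources when one of them fails to shut down.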
type ConsumeArgs ¶
type ConsumeArgs struct {
	Topics       []string
	RebalanceCb  kafka.RebalanceCb
	Polling      int
	Workers      uint64
	Handler      MessageHandler
	EventHandler EventHandler
}
ConsumeArgs specifies the arguments for Consumer.Consume and Consumer.ConsumeEvent.
func (*ConsumeArgs) SetEventHandler ¶
func (c *ConsumeArgs) SetEventHandler(handler EventHandler) *ConsumeArgs
SetEventHandler sets EventHandler and returns the *ConsumeArgs so that calls can be chained.
func (*ConsumeArgs) SetHandler ¶
func (c *ConsumeArgs) SetHandler(handler MessageHandler) *ConsumeArgs
SetHandler sets Handler and returns the *ConsumeArgs so that calls can be chained.
func (*ConsumeArgs) SetPolling ¶
func (c *ConsumeArgs) SetPolling(polling int) *ConsumeArgs
SetPolling sets Polling and returns the *ConsumeArgs so that calls can be chained.
func (*ConsumeArgs) SetRebalanceCb ¶
func (c *ConsumeArgs) SetRebalanceCb(cb kafka.RebalanceCb) *ConsumeArgs
SetRebalanceCb sets RebalanceCb and returns the *ConsumeArgs so that calls can be chained.
func (*ConsumeArgs) SetTopics ¶
func (c *ConsumeArgs) SetTopics(topics []string) *ConsumeArgs
SetTopics sets Topics and returns the *ConsumeArgs so that calls can be chained.
func (*ConsumeArgs) SetWorkers ¶
func (c *ConsumeArgs) SetWorkers(workers uint64) *ConsumeArgs
SetWorkers sets Workers and returns the *ConsumeArgs so that calls can be chained.
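Because every setter returns `*ConsumeArgs`, the arguments can presumably be built in one chained expression. The sketch below reproduces a trimmed stand-in of the struct to show the pattern. The `MessageHandler` signature is a guess (the real one is not shown in this reference), `RebalanceCb` and `EventHandler` are left out to avoid external imports, and the polling and worker values are arbitrary.

```go
package main

import "fmt"

// MessageHandler is a hypothetical signature; the real type is not shown
// in this reference.
type MessageHandler func(payload []byte) error

// ConsumeArgs is a trimmed stand-in for the documented struct.
type ConsumeArgs struct {
	Topics  []string
	Polling int
	Workers uint64
	Handler MessageHandler
}

// Each setter stores its value and returns the receiver to allow chaining.
func (c *ConsumeArgs) SetTopics(topics []string) *ConsumeArgs {
	c.Topics = topics
	return c
}

func (c *ConsumeArgs) SetPolling(polling int) *ConsumeArgs {
	c.Polling = polling
	return c
}

func (c *ConsumeArgs) SetWorkers(workers uint64) *ConsumeArgs {
	c.Workers = workers
	return c
}

func (c *ConsumeArgs) SetHandler(handler MessageHandler) *ConsumeArgs {
	c.Handler = handler
	return c
}

// buildArgs assembles the arguments in a single chained expression.
func buildArgs() *ConsumeArgs {
	return new(ConsumeArgs).
		SetTopics([]string{"orders"}).
		SetPolling(100).
		SetWorkers(4).
		SetHandler(func(payload []byte) error { return nil })
}

func main() {
	args := buildArgs()
	fmt.Println(args.Topics, args.Polling, args.Workers)
}
```

Note that `Container.Consume` takes `ConsumeArgs` by value, so a chained pointer would need to be dereferenced before being passed.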
type Container ¶
type Container struct {
// contains filtered or unexported fields
}
Container holds the variables and configuration needed to run Kafka.
func NewContainer ¶
NewContainer initializes a new container.
func (*Container) Close ¶
func (c *Container) Close()
Close closes all resources associated with the current instance.
func (*Container) Consume ¶ added in v0.0.4
func (c *Container) Consume(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
Consume creates consumers on a per-thread basis and consumes messages directly from the Kafka broker.
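One plausible reading of "per thread" is that each worker gets its own consumer and runs in its own goroutine. The sketch below illustrates that pattern only. `fakeConsumer` and `startWorkers` are hypothetical names, and the real method may create, run, and return its consumers differently.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeConsumer is a hypothetical stand-in for *Consumer.
type fakeConsumer struct{ id uint64 }

// startWorkers creates one consumer per worker and runs handle for each
// consumer in its own goroutine. It waits for all goroutines to finish so
// the demo terminates; a real consumer loop would usually keep running.
func startWorkers(workers uint64, handle func(c *fakeConsumer)) []*fakeConsumer {
	consList := make([]*fakeConsumer, 0, workers)
	var wg sync.WaitGroup
	for i := uint64(0); i < workers; i++ {
		c := &fakeConsumer{id: i} // declared per iteration, safe to capture
		consList = append(consList, c)
		wg.Add(1)
		go func() {
			defer wg.Done()
			handle(c)
		}()
	}
	wg.Wait()
	return consList
}

func main() {
	consList := startWorkers(3, func(c *fakeConsumer) {
		fmt.Println("worker started with consumer", c.id)
	})
	fmt.Println("consumers created:", len(consList))
}
```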
func (*Container) ConsumeBatch ¶ added in v0.0.5
func (c *Container) ConsumeBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
ConsumeBatch creates consumers on a per-thread basis and consumes messages directly from the Kafka broker. It is an improved version of Consume that polls in batches.
func (*Container) ConsumeEvent ¶ added in v0.0.4
func (c *Container) ConsumeEvent(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
ConsumeEvent creates consumers on a per-thread basis and consumes events directly from the Kafka broker.
func (*Container) ConsumeEventBatch ¶ added in v0.0.5
func (c *Container) ConsumeEventBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)
ConsumeEventBatch creates consumers on a per-thread basis and consumes events directly from the Kafka broker. It is an upgraded version of ConsumeEvent that processes events in batches.
func (*Container) NewAdminClient ¶
func (c *Container) NewAdminClient(config kafka.ConfigMap) (ac *AdminClient, err error)
NewAdminClient initializes a new admin client from this container.
func (*Container) NewConsumer ¶
NewConsumer initializes a new consumer from this container.
type EventHandler ¶
EventHandler is a handler for events. If the handler returns an error, it is automatically retried for the current event.
type MessageHandler ¶
MessageHandler is a handler for messages. If the handler returns an error, it is automatically retried for the current message.
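The retry behaviour described for both handler types can be sketched as a loop that calls the handler again for the same item until it returns nil. The `handler` signature, the `processWithRetry` helper, and the attempt cap are all assumptions of this sketch. The reference does not say whether the package limits retries or waits between them.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical handler signature used for illustration.
type handler func(msg string) error

// processWithRetry calls h for msg until it succeeds or maxAttempts is
// reached. It returns the number of attempts made and the last error.
// The attempt cap is an assumption of this sketch.
func processWithRetry(msg string, h handler, maxAttempts int) (attempts int, err error) {
	for attempts = 1; attempts <= maxAttempts; attempts++ {
		if err = h(msg); err == nil {
			return attempts, nil
		}
	}
	return maxAttempts, err
}

// flakyHandler returns a handler that fails for the first `failures` calls
// and succeeds afterwards.
func flakyHandler(failures int) handler {
	calls := 0
	return func(msg string) error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
}

func main() {
	attempts, err := processWithRetry("order-1", flakyHandler(2), 5)
	fmt.Println(attempts, err)
}
```

Since a handler may run more than once for the same message or event, it is safest to write handlers so that repeated execution is harmless.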