Documentation ¶
Overview ¶
Package dms implements a Kafka consumer built on sarama. Users can consume messages asynchronously or synchronously with dms, and the package ensures that messages are not lost.
Index ¶
- type BizHandler
- type Consumer
- type DmsHandler
- func (h *DmsHandler) AddTopicToMethod(method MethodInfo)
- func (h *DmsHandler) Cleanup(sess sarama.ConsumerGroupSession) error
- func (h *DmsHandler) Close() error
- func (h *DmsHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (h *DmsHandler) OnConsume(msg *sarama.ConsumerMessage)
- func (h *DmsHandler) Setup(sess sarama.ConsumerGroupSession) error
- func (h *DmsHandler) Start(wg *sync.WaitGroup)
- type MethodInfo
- type OffsetManager
- type OffsetNode
- type OffsetPersist
- type Properties
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BizHandler ¶
type BizHandler func(msg *sarama.ConsumerMessage) error
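As a minimal sketch of a function with the BizHandler shape: the example below defines a local stand-in for sarama.ConsumerMessage so that it compiles without the sarama dependency. The stand-in type, the handleOrder function, and the topic name are all hypothetical. The source does not say what dms does when a handler returns an error; the BizRetryTimes field in Properties suggests retries, but that is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumerMessage is a local stand-in for sarama.ConsumerMessage so the
// sketch compiles on its own. Only a few of the real fields are mirrored.
type ConsumerMessage struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// BizHandler mirrors the shape of dms.BizHandler, using the stand-in type.
type BizHandler func(msg *ConsumerMessage) error

// handleOrder is a hypothetical business handler: it rejects empty payloads
// and reports success for everything else.
func handleOrder(msg *ConsumerMessage) error {
	if len(msg.Value) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	var h BizHandler = handleOrder
	msg := &ConsumerMessage{Topic: "orders", Partition: 0, Offset: 42, Value: []byte("{}")}
	fmt.Println("handled, err =", h(msg))
}
```

With the real package, a function like this would be assigned to the Method field of MethodInfo, with the parameter type changed to *sarama.ConsumerMessage.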
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(ctx context.Context, methods []MethodInfo, propertiesMap map[string]*Properties, offsetPersist OffsetPersist) (*Consumer, error)
type DmsHandler ¶
type DmsHandler struct {
// contains filtered or unexported fields
}
func NewDmsHandler ¶
func NewDmsHandler(ctx context.Context, methodInfo MethodInfo, pool *ants.Pool, limiter *rate.Limiter, properties *Properties, offsetPersist OffsetPersist) (*DmsHandler, error)
func (*DmsHandler) AddTopicToMethod ¶
func (h *DmsHandler) AddTopicToMethod(method MethodInfo)
func (*DmsHandler) Cleanup ¶
func (h *DmsHandler) Cleanup(sess sarama.ConsumerGroupSession) error
func (*DmsHandler) Close ¶
func (h *DmsHandler) Close() error
func (*DmsHandler) ConsumeClaim ¶
func (h *DmsHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*DmsHandler) OnConsume ¶
func (h *DmsHandler) OnConsume(msg *sarama.ConsumerMessage)
func (*DmsHandler) Setup ¶
func (h *DmsHandler) Setup(sess sarama.ConsumerGroupSession) error
Setup implements the sarama.ConsumerGroupHandler interface. When auto commit is disabled, Setup obtains a valid offset for each groupId-topic-partition from three sources: the Kafka broker, the db, and the broker's beginning offset.
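The description of Setup names three offset sources but not how they are combined. The sketch below shows one plausible rule, purely as an assumption: take the larger of the broker-committed and db offsets, then clamp it to the broker's beginning offset in case older messages have been deleted by retention. The helper resolveStartOffset and its parameters are hypothetical and not part of dms.

```go
package main

import "fmt"

// resolveStartOffset is a hypothetical helper, not part of dms.
//   committed: offset committed to the broker for the group (negative if none)
//   persisted: offset stored in the db (negative if none)
//   beginning: oldest offset still available on the broker
func resolveStartOffset(committed, persisted, beginning int64) int64 {
	start := committed
	if persisted > start {
		start = persisted
	}
	// Never start before the oldest offset the broker still holds.
	if start < beginning {
		start = beginning
	}
	return start
}

func main() {
	fmt.Println(resolveStartOffset(120, 150, 100)) // 150
	fmt.Println(resolveStartOffset(-1, -1, 100))   // 100
	fmt.Println(resolveStartOffset(50, 60, 100))   // 100
}
```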
func (*DmsHandler) Start ¶
func (h *DmsHandler) Start(wg *sync.WaitGroup)
type MethodInfo ¶
type MethodInfo struct {
	GroupId  string
	Topics   []string
	BizGroup string
	Method   BizHandler
}
func (*MethodInfo) GetUniqueKey ¶
func (m *MethodInfo) GetUniqueKey() string
type OffsetManager ¶
type OffsetManager struct {
// contains filtered or unexported fields
}
func NewOffsetManager ¶
func NewOffsetManager(startOffset int64, blockCapacity, partition int, groupId, topic string, version byte) *OffsetManager
type OffsetNode ¶
type OffsetNode struct {
// contains filtered or unexported fields
}
func NewOffsetNode ¶
func NewOffsetNode(capacity int) *OffsetNode
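The internals of OffsetManager and OffsetNode are not documented here. As general background on how asynchronous consumption can avoid losing messages, the sketch below tracks finished offsets and only advances the commit position across a contiguous run of completed messages. The tracker type and its methods are hypothetical; this is a simplified illustration of the idea, not the dms implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical, simplified offset tracker. It is not the dms
// OffsetManager; it only illustrates committing up to the highest
// contiguous finished offset.
type tracker struct {
	mu   sync.Mutex
	next int64          // lowest offset not yet known to be finished
	done map[int64]bool // finished offsets at or above next
}

func newTracker(startOffset int64) *tracker {
	return &tracker{next: startOffset, done: make(map[int64]bool)}
}

// markDone records a finished offset and returns the offset that is safe to
// commit, following Kafka's convention that the committed offset is the next
// offset to read.
func (t *tracker) markDone(offset int64) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	if offset >= t.next {
		t.done[offset] = true
	}
	// Advance past every finished offset that directly follows the last one.
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
	}
	return t.next
}

func main() {
	tr := newTracker(10)
	fmt.Println(tr.markDone(11)) // 10: offset 10 is still in flight
	fmt.Println(tr.markDone(12)) // 10
	fmt.Println(tr.markDone(10)) // 13: offsets 10, 11 and 12 are all done
}
```

Because the commit position never moves past an unfinished message, a crash causes at most reprocessing of in-flight messages rather than skipping them.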
type OffsetPersist ¶
type Properties ¶
type Properties struct {
	Addrs           []string
	Async           bool
	OffsetBlockSize int
	BizRetryTimes   int
	LimitPerSecond  int
	// goroutine pool size
	PoolSize       int
	PoolTaskSize   int
	SaramaConfig   *sarama.Config
	InitialOffset  int64
	CommitSize     int // default partitionCount*OffsetBlockSize
	AutoCommit     bool
	CommitInterval time.Duration
}
func (*Properties) Clone ¶
func (p *Properties) Clone() *Properties
Source Files ¶