Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CommitFilter ¶
type CommitFilter struct {
// contains filtered or unexported fields
}
CommitFilter buffer to lazy commit kafka message
func NewCommitFilter ¶
func NewCommitFilter(ctx context.Context, kMsgPool *sync.Pool, opts ...CommitFilterOptFunc) (f *CommitFilter, err error)
NewCommitFilter create new CommitFilter
func (*CommitFilter) GetAfterChan ¶
func (f *CommitFilter) GetAfterChan() chan *KafkaMsg
GetAfterChan get channel out of GetAfterChan
func (*CommitFilter) GetBeforeChan ¶
func (f *CommitFilter) GetBeforeChan() chan *KafkaMsg
GetBeforeChan get channel send message in CommitFilter
type CommitFilterOptFunc ¶
type CommitFilterOptFunc func(*commitCheckOption) error
CommitFilterOptFunc option for CommitFilter
func WithCommitFilterCheckChanSize ¶
func WithCommitFilterCheckChanSize(size int) CommitFilterOptFunc
WithCommitFilterCheckChanSize set commit check channel's size
func WithCommitFilterCheckInterval ¶
func WithCommitFilterCheckInterval(interval time.Duration) CommitFilterOptFunc
WithCommitFilterCheckInterval set commit check interval
func WithCommitFilterCheckNum ¶
func WithCommitFilterCheckNum(num int) CommitFilterOptFunc
WithCommitFilterCheckNum set commit check num
type KafkaCli ¶
type KafkaCli struct { *KafkaCliCfg // contains filtered or unexported fields }
KafkaCli kafka consumer client
Example ¶
var ( kmsgPool = &sync.Pool{ New: func() interface{} { return &KafkaMsg{} }, } ) cli, err := NewKafkaCliWithGroupID( context.Background(), &KafkaCliCfg{ Brokers: []string{"brokers url here"}, Topics: []string{"topics name here"}, Groupid: "group id", KMsgPool: kmsgPool, }, WithCommitFilterCheckInterval(5*time.Second), WithCommitFilterCheckNum(100), ) if err != nil { panic(errors.Wrap(err, "try to connect to kafka got error")) } for kmsg := range cli.Messages(context.Background()) { // do something with kafka message fmt.Println(string(kmsg.Message)) cli.CommitWithMsg(kmsg) // async commit }
Output:
func NewKafkaCliWithGroupID ¶
func NewKafkaCliWithGroupID(ctx context.Context, cfg *KafkaCliCfg, opts ...CommitFilterOptFunc) (k *KafkaCli, err error)
NewKafkaCliWithGroupID create new kafka consumer
func (*KafkaCli) CommitWithMsg ¶
CommitWithMsg commit kafka message
func (*KafkaCli) ListenNotifications ¶
ListenNotifications log kafka broker notify
type KafkaCliCfg ¶
KafkaCliCfg configuration for kafka message
Click to show internal directories.
Click to hide internal directories.