Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CommitFilter ¶
type CommitFilter struct {
	*CommitFilterCfg
	// contains filtered or unexported fields
}
func NewCommitFilter ¶
func NewCommitFilter(cfg *CommitFilterCfg) *CommitFilter
func (*CommitFilter) GetAfterChan ¶
func (f *CommitFilter) GetAfterChan() chan *KafkaMsg
func (*CommitFilter) GetBeforeChan ¶
func (f *CommitFilter) GetBeforeChan() chan *KafkaMsg
type CommitFilterCfg ¶
type KafkaCli ¶
type KafkaCli struct {
	*KafkaCliCfg
	// contains filtered or unexported fields
}
Example ¶
package main import ( "fmt" "sync" "time" "github.com/Laisky/go-utils/kafka" "github.com/pkg/errors" ) func main() { var ( kmsgPool = &sync.Pool{ New: func() interface{} { return &kafka.KafkaMsg{} }, } ) cli, err := kafka.NewKafkaCliWithGroupId(&kafka.KafkaCliCfg{ Brokers: []string{"brokers url here"}, Topics: []string{"topics name here"}, Groupid: "group id", KMsgPool: kmsgPool, IntervalNum: 100, IntervalDuration: 5 * time.Second, }) if err != nil { panic(errors.Wrap(err, "try to connect to kafka got error")) } for kmsg := range cli.Messages() { // do something with kafka message fmt.Println(string(kmsg.Message)) cli.CommitWithMsg(kmsg) // async commit } }
Output:
func NewKafkaCliWithGroupId ¶
func NewKafkaCliWithGroupId(cfg *KafkaCliCfg) (*KafkaCli, error)
func (*KafkaCli) CommitWithMsg ¶
func (*KafkaCli) ListenNotifications ¶
func (k *KafkaCli) ListenNotifications()
type KafkaCliCfg ¶
Click to show internal directories.
Click to hide internal directories.