kafka

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2019 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CommitFilter

type CommitFilter struct {
	*CommitFilterCfg
	// contains filtered or unexported fields
}

func NewCommitFilter

func NewCommitFilter(cfg *CommitFilterCfg) *CommitFilter

func (*CommitFilter) GetAfterChan

func (f *CommitFilter) GetAfterChan() chan *KafkaMsg

func (*CommitFilter) GetBeforeChan

func (f *CommitFilter) GetBeforeChan() chan *KafkaMsg

type CommitFilterCfg

type CommitFilterCfg struct {
	KMsgPool         *sync.Pool
	IntervalNum      int
	IntervalDuration time.Duration
}

type KafkaCli

type KafkaCli struct {
	*KafkaCliCfg
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/Laisky/go-utils/kafka"
	"github.com/pkg/errors"
)

func main() {
	var (
		kmsgPool = &sync.Pool{
			New: func() interface{} {
				return &kafka.KafkaMsg{}
			},
		}
	)
	cli, err := kafka.NewKafkaCliWithGroupId(&kafka.KafkaCliCfg{
		Brokers:          []string{"brokers url here"},
		Topics:           []string{"topics name here"},
		Groupid:          "group id",
		KMsgPool:         kmsgPool,
		IntervalNum:      100,
		IntervalDuration: 5 * time.Second,
	})
	if err != nil {
		panic(errors.Wrap(err, "try to connect to kafka got error"))
	}

	for kmsg := range cli.Messages() {
		// do something with kafka message
		fmt.Println(string(kmsg.Message))
		cli.CommitWithMsg(kmsg) // async commit
	}
}
Output:

func NewKafkaCliWithGroupId

func NewKafkaCliWithGroupId(cfg *KafkaCliCfg) (*KafkaCli, error)

func (*KafkaCli) Close

func (k *KafkaCli) Close()

func (*KafkaCli) CommitWithMsg

func (k *KafkaCli) CommitWithMsg(kmsg *KafkaMsg)

func (*KafkaCli) ListenNotifications

func (k *KafkaCli) ListenNotifications()

func (*KafkaCli) Messages

func (k *KafkaCli) Messages() <-chan *KafkaMsg

type KafkaCliCfg

type KafkaCliCfg struct {
	Brokers, Topics  []string
	Groupid          string
	KMsgPool         *sync.Pool
	IntervalNum      int
	IntervalDuration time.Duration
}

type KafkaMsg

type KafkaMsg struct {
	Topic     string
	Message   []byte
	Offset    int64
	Partition int32
	Timestamp time.Time
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL