kafkaex

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ERR_CONSUMER_HASRUN  = "consumer has run"
	ERR_CONSUMER_NOTRUN  = "consumer no run"
	ERR_CONSUMER_EXIST   = "consumer exist"
	ERR_CONSUMER_NOFOUND = "consumer no found"
	ERR_GROUP_NOFOUND    = "group no found"
)

Variables

View Source
var (
	ErrConsumerHasRun  = errors.New(ERR_CONSUMER_HASRUN)
	ErrConsumerNotRun  = errors.New(ERR_CONSUMER_NOTRUN)
	ErrConsumerExist   = errors.New(ERR_CONSUMER_EXIST)
	ErrConsumerNoFound = errors.New(ERR_CONSUMER_NOFOUND)
	ErrGroupNoFound    = errors.New(ERR_GROUP_NOFOUND)
)

Functions

func SetLogger

func SetLogger(l ILogger)

Types

type ConsumeHandler

type ConsumeHandler func(context.Context, *Message) (ReceiptStatus, error)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer consumer impl

func NewConsumer

func NewConsumer(group *ConsumerGroup, handler ConsumeHandler, topics ...string) *Consumer

NewConsumer new consumer

func (*Consumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) Close

func (c *Consumer) Close() error

Close close consume goroutine

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Consumer) Go

func (c *Consumer) Go() error

Go async consume message

func (*Consumer) ID

func (c *Consumer) ID() string

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(groupid string, client sarama.Client) (*ConsumerGroup, error)

func (*ConsumerGroup) Close

func (g *ConsumerGroup) Close()

func (*ConsumerGroup) ConsumerMap

func (g *ConsumerGroup) ConsumerMap() map[string]IConsumer

func (*ConsumerGroup) CreateConsumer

func (g *ConsumerGroup) CreateConsumer(h ConsumeHandler, topics ...string) error

func (*ConsumerGroup) GetConsumer

func (g *ConsumerGroup) GetConsumer(id string) IConsumer

func (*ConsumerGroup) ID

func (g *ConsumerGroup) ID() string

type IConsumer

type IConsumer interface {
	sarama.ConsumerGroupHandler
	ID() string
	Go() error
	Close() error
}

IConsumer consumer interface

type IConsumerGroup

type IConsumerGroup interface {
	ID() string
	CreateConsumer(h ConsumeHandler, topics ...string) error
	GetConsumer(id string) IConsumer
	ConsumerMap() map[string]IConsumer
}

type ILogger

type ILogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
	Debug(ctx context.Context, msg string, args ...interface{})
	Info(ctx context.Context, msg string, args ...interface{})
	Warn(ctx context.Context, msg string, args ...interface{})
	Error(ctx context.Context, msg string, args ...interface{})
}

type KafkaManager

type KafkaManager struct {
	// contains filtered or unexported fields
}

KafkaManager kafka mq manager

func GetKafkaManager

func GetKafkaManager() *KafkaManager

func InitManager

func InitManager(brokers ...string) *KafkaManager

func NewKafkaManager

func NewKafkaManager(brokers ...string) *KafkaManager

NewKafkaManager new a kafka mq manager

func (*KafkaManager) ConsumerClose

func (m *KafkaManager) ConsumerClose()

func (*KafkaManager) ConsumersGo

func (m *KafkaManager) ConsumersGo()

func (*KafkaManager) GetConsumer

func (m *KafkaManager) GetConsumer(groupid, id string) (IConsumer, error)

func (*KafkaManager) Init

func (m *KafkaManager) Init() error

init init

func (*KafkaManager) NewConsumer

func (m *KafkaManager) NewConsumer(groupid string, handler ConsumeHandler, topics ...string) error

func (*KafkaManager) NewProducer

func (m *KafkaManager) NewProducer() error

func (*KafkaManager) Publish

func (m *KafkaManager) Publish(ctx context.Context, topic, key string, msg []byte) error

type Message

type Message struct {
	Id         string `json:"id"`         // identify id
	Topic      string `json:"topic"`      // topic
	Partition  int32  `json:"partition"`  // partition
	ConsumerId string `json:"consumerId"` // consumerId
	Offset     int64  `json:"offset"`     // offset
	Headers    string `json:"headers"`    // headers
	Key        []byte `json:"key"`        // key
	Value      []byte `json:"value"`      // value
	Ts         int64  `json:"ts"`         // ts
	BlockTs    int64  `json:"blockts"`    // blockts
}

type ReceiptStatus

type ReceiptStatus int32
const (
	ReceiptSuccess   ReceiptStatus = iota // 处理成功
	ReceiptAlreadyDo                      // 已经处理,重复消息
	ReceiptErrUnKnow                      // 未知错误
	ReceiptErrParse                       // 解析失败
	ReceiptRetryMax                       // 重试达到上限
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL