kafkaclient

package module
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2022 License: GPL-3.0 Imports: 4 Imported by: 0

README

بِسْمِ اللّٰهِ الرَّحْمٰنِ الرَّحِيْمِ


السَّلاَمُ عَلَيْكُمْ وَرَحْمَةُ اللهِ وَبَرَكَاتُهُ


ٱلْحَمْدُ لِلَّهِ رَبِّ ٱلْعَٰلَمِينَ

ٱلْحَمْدُ لِلَّهِ رَبِّ ٱلْعَٰلَمِينَ

ٱلْحَمْدُ لِلَّهِ رَبِّ ٱلْعَٰلَمِينَ


اللَّهُمَّ صَلِّ عَلَى مُحَمَّدٍ ، وَعَلَى آلِ مُحَمَّدٍ ، كَمَا صَلَّيْتَ عَلَى إِبْرَاهِيمَ وَعَلَى آلِ إِبْرَاهِيمَ ، إِنَّكَ حَمِيدٌ مَجِيدٌ ، اللَّهُمَّ بَارِكْ عَلَى مُحَمَّدٍ ، وَعَلَى آلِ مُحَمَّدٍ ، كَمَا بَارَكْتَ عَلَى إِبْرَاهِيمَ ، وَعَلَى آلِ إِبْرَاهِيمَ ، إِنَّكَ حَمِيدٌ مَجِيدٌ

kafkaclient

This is a kafka client. It is designed to wrap https://github.com/confluentinc/confluent-kafka-go.

Documentation

Index

Constants

View Source
const (
	GoEventsChannelEnable        = "go.events.channel.enable"
	GoApplicationRebalanceEnable = "go.application.rebalance.enable"
	ClientID                     = "client.id"
)

List of const for consumers

View Source
const (
	TypeProducer    = "producer"
	TypeConsumer    = "consumer"
	TypeAdminClient = "admin-client"
)

List of resources type

Variables

View Source
var TimeoutFlush = 5 * time.Second

TimeoutFlush specifies the flush's timeout for the producer.

Functions

This section is empty.

Types

type AdminClient

type AdminClient struct {
	*kafka.AdminClient
}

AdminClient promotes the origin admin client of kafka client.

func (*AdminClient) Close

func (ac *AdminClient) Close() (err error)

Close close the underlying resources.

func (*AdminClient) GetOrigin

func (ac *AdminClient) GetOrigin() *kafka.AdminClient

GetOrigin returns the origin admin client from kafka.

type Closer

type Closer interface {
	Close() (err error)
}

Closer specifies the contract to close all of the underlying resoources.

type ConsumeArgs

type ConsumeArgs struct {
	Topics       []string
	RebalanceCb  kafka.RebalanceCb
	Polling      int
	Workers      uint64
	Handler      MessageHandler
	EventHandler EventHandler
}

ConsumeArgs specifies the arguments for Consumer.Consume and Consumer.ConsumeEvent.

func (*ConsumeArgs) SetEventHandler

func (c *ConsumeArgs) SetEventHandler(handler EventHandler) *ConsumeArgs

SetEventHandler is a setter.

func (*ConsumeArgs) SetHandler

func (c *ConsumeArgs) SetHandler(handler MessageHandler) *ConsumeArgs

SetHandler is a setter.

func (*ConsumeArgs) SetPolling

func (c *ConsumeArgs) SetPolling(polling int) *ConsumeArgs

SetPolling is a setter.

func (*ConsumeArgs) SetRebalanceCb

func (c *ConsumeArgs) SetRebalanceCb(cb kafka.RebalanceCb) *ConsumeArgs

SetRebalanceCb is a setter.

func (*ConsumeArgs) SetTopics

func (c *ConsumeArgs) SetTopics(topics []string) *ConsumeArgs

SetTopics is a setter.

func (*ConsumeArgs) SetWorkers

func (c *ConsumeArgs) SetWorkers(workers uint64) *ConsumeArgs

SetWorkers is a setter.

type Consumer

type Consumer struct {
	*kafka.Consumer
}

Consumer promotes the origin consumer of kafka client.

func (*Consumer) GetOrigin

func (c *Consumer) GetOrigin() *kafka.Consumer

GetOrigin returns the origin consumer of kafka.

type Container

type Container struct {
	// contains filtered or unexported fields
}

Container contains all variables and configs to run kafka.

func NewContainer

func NewContainer(config kafka.ConfigMap) *Container

NewContainer initialize a new container.

func (*Container) Close

func (c *Container) Close()

Close all resources regarding the current instance.

func (*Container) Consume added in v0.0.4

func (c *Container) Consume(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)

Consume create consumers based on per thread and directly consume messages from the Kafka broker.

func (*Container) ConsumeBatch added in v0.0.5

func (c *Container) ConsumeBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)

ConsumeBatch create consumers based on per thread and directly consume messages from the Kafka broker. ConsumeBatch is an improved version of Consume but polling in a batch manner.

func (*Container) ConsumeEvent added in v0.0.4

func (c *Container) ConsumeEvent(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)

ConsumeEvent create consumers based on per thread and directly consume events from the Kafka broker.

func (*Container) ConsumeEventBatch added in v0.0.5

func (c *Container) ConsumeEventBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error)

ConsumeEventBatch create consumers based on per thread and directly consume events from the Kafka broker. ConsumeEventBatch is an upgraded version of ConsumeEvent with the batch processing.

func (*Container) NewAdminClient

func (c *Container) NewAdminClient(config kafka.ConfigMap) (ac *AdminClient, err error)

NewAdminClient initialize a new admin client from this container.

func (*Container) NewConsumer

func (c *Container) NewConsumer(config kafka.ConfigMap) (cons *Consumer, err error)

NewConsumer initialize a new consumer from this container.

func (*Container) NewProducer

func (c *Container) NewProducer(config kafka.ConfigMap) (prod *Producer, err error)

NewProducer initialize a new producer from this container.

type EventHandler

type EventHandler func(cons *Consumer, event kafka.Event) (err error)

EventHandler is a handler for handling event. It will automatically retry the handler for the current message if it returns error.

type MessageHandler

type MessageHandler func(cons *Consumer, msg *kafka.Message) (err error)

MessageHandler is a handler for handling message. It will automatically retry the handler for the current event if it returns error.

type Producer

type Producer struct {
	*kafka.Producer
}

Producer promotes the producer of kafka client.

func (*Producer) Close

func (p *Producer) Close() (err error)

Close close the underlying resources.

func (*Producer) GetOrigin

func (p *Producer) GetOrigin() *kafka.Producer

GetOrigin returns the producer origin from kafka.

func (*Producer) Publish

func (p *Producer) Publish(msg *kafka.Message) (event kafka.Event, err error)

Publish publishes message synchronously to the kafka's brokers.

func (*Producer) PublishAsync

func (p *Producer) PublishAsync(msg *kafka.Message, eventChan chan kafka.Event) (err error)

PublishAsync publishes message asynchronously to the kafka's brokers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL