kafka_queue

package
v0.0.0-...-38575d5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: AGPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const SCRAM_SHA_256_Mechanism = "SCRAM-SHA-256"

const PLAIN_MECHANISM = "PLAIN"//

View Source
const SCRAM_SHA_512_Mechanism = "SCRAM-SHA-512"

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Enabled               bool   `config:"enabled"`
	Default               bool   `config:"default"`
	Compression           bool   `config:"compression"`
	NumOfPartition        int    `config:"num_of_partition"`
	NumOfReplica          int    `config:"num_of_replica"`
	Prefix                string `config:"prefix"`
	ProducerBatchMaxBytes int32  `config:"producer_batch_max_bytes"`
	MaxBufferedRecords    int    `config:"max_buffered_records"`
	ManualFlushing        bool   `config:"manual_flushing"`

	Brokers  []string `config:"brokers"`
	Username string   `config:"username"`
	Password string   `config:"password"`
	TLS      bool     `config:"tls"`

	Mechanism string `config:"mechanism"`
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (this *Consumer) Close() error

func (*Consumer) CommitOffset

func (this *Consumer) CommitOffset(off queue.Offset) error

func (*Consumer) FetchMessages

func (this *Consumer) FetchMessages(ctx *queue.Context, numOfMessages int) (messages []queue.Message, isTimeout bool, err error)

func (*Consumer) ResetOffset

func (this *Consumer) ResetOffset(part, readPos int64) (err error)

type KafkaQueue

type KafkaQueue struct {
	// contains filtered or unexported fields
}

func (*KafkaQueue) AcquireConsumer

func (this *KafkaQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.ConsumerAPI, error)

func (*KafkaQueue) AcquireProducer

func (this *KafkaQueue) AcquireProducer(cfg *queue.QueueConfig) (queue.ProducerAPI, error)

func (*KafkaQueue) Close

func (this *KafkaQueue) Close(string) error

func (*KafkaQueue) CommitOffset

func (this *KafkaQueue) CommitOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (bool, error)

func (*KafkaQueue) DeleteOffset

func (this *KafkaQueue) DeleteOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) error

func (*KafkaQueue) Depth

func (this *KafkaQueue) Depth(q string) int64

func (*KafkaQueue) Destroy

func (this *KafkaQueue) Destroy(k string) error

func (*KafkaQueue) GetOffset

func (this *KafkaQueue) GetOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.Offset, error)

func (*KafkaQueue) GetQueues

func (this *KafkaQueue) GetQueues() []string

func (*KafkaQueue) GetStorageSize

func (this *KafkaQueue) GetStorageSize(k string) uint64

func (*KafkaQueue) Init

func (this *KafkaQueue) Init(q string) error

func (*KafkaQueue) LatestOffset

func (this *KafkaQueue) LatestOffset(k *queue.QueueConfig) queue.Offset

func (*KafkaQueue) Name

func (this *KafkaQueue) Name() string

func (*KafkaQueue) Pop

func (this *KafkaQueue) Pop(q string, t time.Duration) (data []byte, timeout bool)

func (*KafkaQueue) Push

func (this *KafkaQueue) Push(q string, data []byte) error

func (*KafkaQueue) ReleaseConsumer

func (this *KafkaQueue) ReleaseConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, instance queue.ConsumerAPI) error

func (*KafkaQueue) ReleaseProducer

func (this *KafkaQueue) ReleaseProducer(k *queue.QueueConfig, producer queue.ProducerAPI) error

func (*KafkaQueue) Setup

func (this *KafkaQueue) Setup()

func (*KafkaQueue) Start

func (this *KafkaQueue) Start() error

func (*KafkaQueue) Stop

func (this *KafkaQueue) Stop() error

type Producer

type Producer struct {
	ID string
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Produce

func (p *Producer) Produce(reqs *[]queue.ProduceRequest) (*[]queue.ProduceResponse, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL