Documentation ¶
Index ¶
- Constants
- type Config
- type Consumer
- type KafkaQueue
- func (this *KafkaQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.ConsumerAPI, error)
- func (this *KafkaQueue) AcquireProducer(cfg *queue.QueueConfig) (queue.ProducerAPI, error)
- func (this *KafkaQueue) Close(string) error
- func (this *KafkaQueue) CommitOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (bool, error)
- func (this *KafkaQueue) DeleteOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) error
- func (this *KafkaQueue) Depth(q string) int64
- func (this *KafkaQueue) Destroy(k string) error
- func (this *KafkaQueue) GetOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.Offset, error)
- func (this *KafkaQueue) GetQueues() []string
- func (this *KafkaQueue) GetStorageSize(k string) uint64
- func (this *KafkaQueue) Init(q string) error
- func (this *KafkaQueue) LatestOffset(k *queue.QueueConfig) queue.Offset
- func (this *KafkaQueue) Name() string
- func (this *KafkaQueue) Pop(q string, t time.Duration) (data []byte, timeout bool)
- func (this *KafkaQueue) Push(q string, data []byte) error
- func (this *KafkaQueue) ReleaseConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, instance queue.ConsumerAPI) error
- func (this *KafkaQueue) ReleaseProducer(k *queue.QueueConfig, producer queue.ProducerAPI) error
- func (this *KafkaQueue) Setup()
- func (this *KafkaQueue) Start() error
- func (this *KafkaQueue) Stop() error
- type Producer
Constants ¶
View Source
const SCRAM_SHA_256_Mechanism = "SCRAM-SHA-256"
const PLAIN_MECHANISM = "PLAIN"
View Source
const SCRAM_SHA_512_Mechanism = "SCRAM-SHA-512"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Enabled               bool     `config:"enabled"`
	Default               bool     `config:"default"`
	Compression           bool     `config:"compression"`
	NumOfPartition        int      `config:"num_of_partition"`
	NumOfReplica          int      `config:"num_of_replica"`
	Prefix                string   `config:"prefix"`
	ProducerBatchMaxBytes int32    `config:"producer_batch_max_bytes"`
	MaxBufferedRecords    int      `config:"max_buffered_records"`
	ManualFlushing        bool     `config:"manual_flushing"`
	Brokers               []string `config:"brokers"`
	Username              string   `config:"username"`
	Password              string   `config:"password"`
	TLS                   bool     `config:"tls"`
	Mechanism             string   `config:"mechanism"`
}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) FetchMessages ¶
func (*Consumer) ResetOffset ¶
type KafkaQueue ¶
type KafkaQueue struct {
// contains filtered or unexported fields
}
func (*KafkaQueue) AcquireConsumer ¶
func (this *KafkaQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.ConsumerAPI, error)
func (*KafkaQueue) AcquireProducer ¶
func (this *KafkaQueue) AcquireProducer(cfg *queue.QueueConfig) (queue.ProducerAPI, error)
func (*KafkaQueue) Close ¶
func (this *KafkaQueue) Close(string) error
func (*KafkaQueue) CommitOffset ¶
func (this *KafkaQueue) CommitOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (bool, error)
func (*KafkaQueue) DeleteOffset ¶
func (this *KafkaQueue) DeleteOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) error
func (*KafkaQueue) Depth ¶
func (this *KafkaQueue) Depth(q string) int64
func (*KafkaQueue) Destroy ¶
func (this *KafkaQueue) Destroy(k string) error
func (*KafkaQueue) GetOffset ¶
func (this *KafkaQueue) GetOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.Offset, error)
func (*KafkaQueue) GetQueues ¶
func (this *KafkaQueue) GetQueues() []string
func (*KafkaQueue) GetStorageSize ¶
func (this *KafkaQueue) GetStorageSize(k string) uint64
func (*KafkaQueue) Init ¶
func (this *KafkaQueue) Init(q string) error
func (*KafkaQueue) LatestOffset ¶
func (this *KafkaQueue) LatestOffset(k *queue.QueueConfig) queue.Offset
func (*KafkaQueue) Name ¶
func (this *KafkaQueue) Name() string
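func (*KafkaQueue) Pop ¶
func (this *KafkaQueue) Pop(q string, t time.Duration) (data []byte, timeout bool)
func (*KafkaQueue) Push ¶
func (this *KafkaQueue) Push(q string, data []byte) error
func (*KafkaQueue) ReleaseConsumer ¶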
func (this *KafkaQueue) ReleaseConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, instance queue.ConsumerAPI) error
func (*KafkaQueue) ReleaseProducer ¶
func (this *KafkaQueue) ReleaseProducer(k *queue.QueueConfig, producer queue.ProducerAPI) error
func (*KafkaQueue) Setup ¶
func (this *KafkaQueue) Setup()
func (*KafkaQueue) Start ¶
func (this *KafkaQueue) Start() error
func (*KafkaQueue) Stop ¶
func (this *KafkaQueue) Stop() error
Click to show internal directories.
Click to hide internal directories.