Documentation ¶
Index ¶
- Constants
- func HeadersFromContext(ctx context.Context) ([]kafka.Header, bool)
- func NewHeadersContext(ctx context.Context, headers ...kafka.Header) context.Context
- func WithSyncCall() queue.CallOptions
- type Balancer
- type BalancerFunc
- type CRC32Balancer
- type Conf
- type Hash
- type Headers
- type LeastBytes
- type Murmur2Balancer
- type PushOption
- func WithAuth(username, password string) PushOption
- func WithBalancer(balancer Balancer) PushOption
- func WithBatchBytes(batchBytes int64) PushOption
- func WithBatchSize(batchSize int) PushOption
- func WithChunkSize(chunkSize int) PushOption
- func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption
- func WithDisableAutoTopicCreation() PushOption
- func WithFlushInterval(flushInterval time.Duration) PushOption
- func WithRequiredAcks(requiredAcks RequiredAcks) PushOption
- type Pusher
- type QueueOption
- type Queues
- type ReferenceHash
- type RequiredAcks
- type RoundRobin
- type XXHashBalancer
Constants ¶
View Source
const ( RequireNone = kafka.RequireNone RequireOne = kafka.RequireOne RequireAll = kafka.RequireAll )
Variables ¶
This section is empty.
Functions ¶
func HeadersFromContext ¶
func NewHeadersContext ¶
func WithSyncCall ¶ added in v0.2.0
func WithSyncCall() queue.CallOptions
Types ¶
type BalancerFunc ¶ added in v0.1.3
type BalancerFunc = kafka.BalancerFunc
type CRC32Balancer ¶ added in v0.1.3
type CRC32Balancer = kafka.CRC32Balancer
type Conf ¶
type Conf struct { Brokers []string Group string Topic string Offset string `json:",options=first|last,default=last"` Conns int `json:",default=1"` Consumers int `json:",default=8"` Processors int `json:",default=8"` MinBytes int `json:",default=10240"` // 10K MaxBytes int `json:",default=10485760"` // 10M Username string `json:",optional"` Password string `json:",optional"` }
type LeastBytes ¶ added in v0.1.3
type LeastBytes = kafka.LeastBytes
type Murmur2Balancer ¶ added in v0.1.3
type Murmur2Balancer = kafka.Murmur2Balancer
type PushOption ¶
type PushOption func(options *pushOptions)
func WithAuth ¶ added in v0.5.0
func WithAuth(username, password string) PushOption
func WithBalancer ¶ added in v0.1.3
func WithBalancer(balancer Balancer) PushOption
func WithBatchBytes ¶ added in v0.1.6
func WithBatchBytes(batchBytes int64) PushOption
func WithBatchSize ¶ added in v0.1.6
func WithBatchSize(batchSize int) PushOption
func WithChunkSize ¶
func WithChunkSize(chunkSize int) PushOption
func WithCompletion ¶ added in v0.0.12
func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption
func WithDisableAutoTopicCreation ¶ added in v0.1.5
func WithDisableAutoTopicCreation() PushOption
func WithFlushInterval ¶
func WithFlushInterval(flushInterval time.Duration) PushOption
func WithRequiredAcks ¶ added in v0.1.6
func WithRequiredAcks(requiredAcks RequiredAcks) PushOption
type Pusher ¶
type Pusher struct {
// contains filtered or unexported fields
}
type QueueOption ¶
type QueueOption func(*queueOptions)
func WithCommitInterval ¶
func WithCommitInterval(interval time.Duration) QueueOption
func WithMaxWait ¶
func WithMaxWait(wait time.Duration) QueueOption
func WithMetrics ¶
func WithMetrics(metrics *stat.Metrics) QueueOption
func WithQueueCapacity ¶
func WithQueueCapacity(queueCapacity int) QueueOption
type Queues ¶
type Queues struct {
// contains filtered or unexported fields
}
func MustNewQueue ¶
func MustNewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) *Queues
type ReferenceHash ¶ added in v0.1.3
type ReferenceHash = kafka.ReferenceHash
type RequiredAcks ¶ added in v0.1.6
type RequiredAcks = kafka.RequiredAcks
type RoundRobin ¶ added in v0.1.3
type RoundRobin = kafka.RoundRobin
type XXHashBalancer ¶ added in v0.1.4
type XXHashBalancer struct {
// contains filtered or unexported fields
}
func (*XXHashBalancer) Balance ¶ added in v0.1.4
func (x *XXHashBalancer) Balance(msg kafka.Message, partitions ...int) (partition int)
Click to show internal directories.
Click to hide internal directories.