producer

package
v1.2.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2020 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

*

  • builtin interceptor

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTopicEmpty   = errors.New("topic is nil")
	ErrMessageEmpty = errors.New("message is nil")
	ErrNotRunning   = errors.New("producer not started")
)

Functions

func MarshalMessageBatch

func MarshalMessageBatch(msgs ...*primitive.Message) []byte

func NewDefaultProducer

func NewDefaultProducer(opts ...Option) (*defaultProducer, error)

func NewTransactionProducer

func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error)

TODO: checkLocalTransaction

Types

type Option

type Option func(*producerOptions)

func WithCredentials

func WithCredentials(c primitive.Credentials) Option

func WithGroupName

func WithGroupName(group string) Option

WithGroupName set group name address

func WithInstanceName

func WithInstanceName(name string) Option

func WithInterceptor

func WithInterceptor(f ...primitive.Interceptor) Option

func WithNameServer

func WithNameServer(nameServers primitive.NamesrvAddr) Option

WithNameServer set NameServer address, only support one NameServer cluster in alpha2

func WithNamespace

func WithNamespace(namespace string) Option

WithNamespace set the namespace of producer

func WithQueueSelector

func WithQueueSelector(s QueueSelector) Option

func WithRetry

func WithRetry(retries int) Option

WithRetry return a Option that specifies the retry times when send failed. TODO: use retry middleware instead

func WithSendMsgTimeout

func WithSendMsgTimeout(duration time.Duration) Option

func WithTrace

func WithTrace(traceCfg *primitive.TraceConfig) Option

WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.

func WithVIPChannel

func WithVIPChannel(enable bool) Option

type QueueSelector

type QueueSelector interface {
	Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue
}

func NewHashQueueSelector

func NewHashQueueSelector() QueueSelector

func NewManualQueueSelector

func NewManualQueueSelector() QueueSelector

func NewRandomQueueSelector

func NewRandomQueueSelector() QueueSelector

func NewRoundRobinQueueSelector

func NewRoundRobinQueueSelector() QueueSelector

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL