Documentation ¶
Index ¶
- type Config
- type ConsumerDefaultConfig
- type FlowInfo
- type Producer
- func (pc *Producer) Close() error
- func (pc *Producer) MustStart()
- func (pc *Producer) SendMsg(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error)
- func (pc *Producer) SendWithContext(ctx context.Context, msg []byte) error
- func (pc *Producer) SendWithMsg(ctx context.Context, msg *primitive.Message) error
- func (pc *Producer) SendWithResult(ctx context.Context, msg []byte, tag string) (*primitive.SendResult, error)
- func (pc *Producer) Start() error
- func (pc *Producer) WithInterceptor(fs ...primitive.Interceptor) *Producer
- type ProducerConfig
- type PullConsumer
- type PullConsumerConfig
- type PushConsumer
- func (cc *PushConsumer) Close()
- func (cc *PushConsumer) MustStart()
- func (cc *PushConsumer) RegisterBatchMessage(f func(context.Context, ...*primitive.MessageExt) error) *PushConsumer
- func (cc *PushConsumer) RegisterSingleMessage(f func(context.Context, *primitive.MessageExt) error) *PushConsumer
- func (cc *PushConsumer) Start() error
- func (cc *PushConsumer) WithInterceptor(fs ...primitive.Interceptor) *PushConsumer
- type PushConsumerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶ added in v0.2.5
type Config struct { Addresses []string `json:"addr" toml:"addr"` PushConsumer *PushConsumerConfig `json:"consumer" toml:"consumer" mapstructure:",squash"` PullConsumer *PullConsumerConfig `json:"pullConsumer" toml:"pullConsumer" mapstructure:",squash"` Producer *ProducerConfig `json:"producer" toml:"producer"` }
Config config...
type ConsumerDefaultConfig ¶ added in v0.10.0
type ConsumerDefaultConfig struct { Name string `json:"name" toml:"name"` Enable bool `json:"enable" toml:"enable"` Addr []string `json:"addr" toml:"addr"` Topic string `json:"topic" toml:"topic"` Group string `json:"group" toml:"group"` DialTimeout time.Duration `json:"dialTimeout" toml:"dialTimeout"` SubExpression string `json:"subExpression" toml:"subExpression"` // 最大重复消费次数 Reconsume int32 `json:"reconsume" toml:"reconsume"` AccessKey string `json:"accessKey" toml:"accessKey"` SecretKey string `json:"secretKey" toml:"secretKey"` MessageModel string `json:"messageModel" toml:"messageModel"` // 消费模式,默认clustering // client实例名,默认会基于Addr字段生成md5,支持多集群 InstanceName string `json:"instanceName" toml:"instanceName"` // 批量消费的最大消息数量,取值范围:[1, 1024],默认值为1 ConsumeMessageBatchMaxSize int `json:"consumeMessageBatchMaxSize" toml:"consumeMessageBatchMaxSize"` // 每批次从broker拉取消息的最大个数,取值范围:[1, 1024],默认值为32 PullBatchSize int32 `json:"pullBatchSize" toml:"pullBatchSize"` // 设置每次消息拉取的时间间隔,push模式最大为65535*time.Millisecond PullInterval time.Duration `json:"pullInterval" toml:"pullInterval"` // 是否开启trace EnableTrace bool `json:"enableTrace" toml:"enableTrace"` }
type Producer ¶ added in v0.2.5
type Producer struct { rocketmq.Producer ProducerConfig // contains filtered or unexported fields }
func StdNewProducer ¶ added in v0.2.5
func (*Producer) MustStart ¶ added in v0.11.0
func (pc *Producer) MustStart()
MustStart panics when error found.
func (*Producer) SendMsg ¶ added in v0.2.5
func (pc *Producer) SendMsg(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error)
SendMsg... 自定义消息格式
func (*Producer) SendWithContext ¶ added in v0.2.5
SendWithContext 发送消息
func (*Producer) SendWithMsg ¶ added in v0.4.3
SendWithMsg 发送消息,可以自定义选择tag
func (*Producer) SendWithResult ¶ added in v0.2.5
func (pc *Producer) SendWithResult(ctx context.Context, msg []byte, tag string) (*primitive.SendResult, error)
SendWithResult rocket mq 发送消息,可以自定义选择 tag 及返回结果
func (*Producer) WithInterceptor ¶ added in v0.2.5
func (pc *Producer) WithInterceptor(fs ...primitive.Interceptor) *Producer
type ProducerConfig ¶
type ProducerConfig struct { Name string `json:"name" toml:"name"` Addr []string `json:"addr" toml:"addr"` Topic string `json:"topic" toml:"topic"` Group string `json:"group" toml:"group"` Retry int `json:"retry" toml:"retry"` DialTimeout time.Duration `json:"dialTimeout" toml:"dialTimeout"` RwTimeout time.Duration `json:"rwTimeout" toml:"rwTimeout"` AccessKey string `json:"accessKey" toml:"accessKey"` SecretKey string `json:"secretKey" toml:"secretKey"` // client实例名,默认会基于Addr字段生成md5,支持多集群 InstanceName string `json:"instanceName" toml:"instanceName"` EnableTrace bool `json:"enableTrace" toml:"enableTrace"` }
ProducerConfig producer config
func RawProducerConfig ¶
func RawProducerConfig(name string) *ProducerConfig
RawProducerConfig 返回produce配置 nolint:dupl
func (*ProducerConfig) Build ¶
func (conf *ProducerConfig) Build() *Producer
type PullConsumer ¶ added in v0.10.0
type PullConsumer struct { rocketmq.PullConsumer PullConsumerConfig // contains filtered or unexported fields }
func (*PullConsumer) Close ¶ added in v0.10.0
func (cc *PullConsumer) Close()
func (*PullConsumer) MustStart ¶ added in v0.11.0
func (cc *PullConsumer) MustStart()
MustStart panics when error found.
func (*PullConsumer) Poll ¶ added in v0.10.0
func (cc *PullConsumer) Poll(ctx context.Context, f func(context.Context, []*primitive.MessageExt) error)
func (*PullConsumer) Start ¶ added in v0.10.0
func (cc *PullConsumer) Start() error
type PullConsumerConfig ¶ added in v0.10.0
type PullConsumerConfig struct { ConsumerDefaultConfig // 持久化offset间隔 RefreshPersistOffsetDuration time.Duration `json:"refreshPersistOffsetDuration" toml:"refreshPersistOffsetDuration"` PollTimeout time.Duration `json:"pollTimeout" toml:"pollTimeout"` }
PullConsumerConfig pull consumer config
func RawPullConsumerConfig ¶ added in v0.10.0
func RawPullConsumerConfig(name string) *PullConsumerConfig
RawPullConsumerConfig 返回pull consume配置 nolint:dupl
func StdPullConsumerConfig ¶ added in v0.10.0
func StdPullConsumerConfig(name string) *PullConsumerConfig
StdPullConsumerConfig ...
func (*PullConsumerConfig) Build ¶ added in v0.10.0
func (conf *PullConsumerConfig) Build() *PullConsumer
type PushConsumer ¶ added in v0.2.5
type PushConsumer struct { rocketmq.PushConsumer PushConsumerConfig // contains filtered or unexported fields }
func (*PushConsumer) Close ¶ added in v0.2.5
func (cc *PushConsumer) Close()
func (*PushConsumer) MustStart ¶ added in v0.11.0
func (cc *PushConsumer) MustStart()
MustStart panics when error found.
func (*PushConsumer) RegisterBatchMessage ¶ added in v0.4.4
func (cc *PushConsumer) RegisterBatchMessage(f func(context.Context, ...*primitive.MessageExt) error) *PushConsumer
func (*PushConsumer) RegisterSingleMessage ¶ added in v0.4.4
func (cc *PushConsumer) RegisterSingleMessage(f func(context.Context, *primitive.MessageExt) error) *PushConsumer
func (*PushConsumer) Start ¶ added in v0.2.5
func (cc *PushConsumer) Start() error
func (*PushConsumer) WithInterceptor ¶ added in v0.2.5
func (cc *PushConsumer) WithInterceptor(fs ...primitive.Interceptor) *PushConsumer
type PushConsumerConfig ¶ added in v0.10.0
type PushConsumerConfig struct { ConsumerDefaultConfig RwTimeout time.Duration `json:"rwTimeout" toml:"rwTimeout"` Rate float64 `json:"rate" toml:"rate"` Capacity int64 `json:"capacity" toml:"capacity"` WaitMaxDuration time.Duration `json:"waitMaxDuration" toml:"waitMaxDuration"` // 消费消息的协程数,默认为20 ConsumeGoroutineNums int `json:"consumeGoroutineNums" toml:"consumeGoroutineNums"` }
PushConsumerConfig push consumer config
func RawPushConsumerConfig ¶ added in v0.10.0
func RawPushConsumerConfig(name string) *PushConsumerConfig
RawPushConsumerConfig 返push consume回配置 nolint:dupl
func StdPushConsumerConfig ¶
func StdPushConsumerConfig(name string) *PushConsumerConfig
StdPushConsumerConfig ...
func (*PushConsumerConfig) Build ¶ added in v0.10.0
func (conf *PushConsumerConfig) Build() *PushConsumer
Click to show internal directories.
Click to hide internal directories.