rocketmq

package
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2022 License: Apache-2.0 Imports: 31 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config added in v0.2.5

// Config bundles a list of addresses with pointers to the consumer and
// producer settings. Every field carries both a json and a toml struct tag.
type Config struct {
	// Addresses is read from / written to the "addr" key (not "addresses").
	Addresses []string        `json:"addr" toml:"addr"`
	// Consumer points to the consumer settings; it is a pointer, so it may be nil.
	Consumer  *ConsumerConfig `json:"consumer" toml:"consumer"`
	// Producer points to the producer settings; it is a pointer, so it may be nil.
	Producer  *ProducerConfig `json:"producer" toml:"producer"`
}

Config config...

func DefaultConfig added in v0.2.5

func DefaultConfig() Config

DefaultConfig ...

func RawConfig added in v0.2.5

func RawConfig(key string) Config

RawConfig returns the configuration.

type ConsumerConfig

// ConsumerConfig holds the settings for a push consumer. Every field carries
// matching json and toml struct tags, so the same key names apply to both
// formats.
//
// NOTE(review): Rate, Capacity and WaitMaxDuration look like rate-limiter
// settings, and AccessKey/SecretKey like broker credentials — confirm
// against the code that consumes this struct.
type ConsumerConfig struct {
	Name            string        `json:"name" toml:"name"`
	Enable          bool          `json:"enable" toml:"enable"`
	Addr            []string      `json:"addr" toml:"addr"`
	Topic           string        `json:"topic" toml:"topic"`
	Group           string        `json:"group" toml:"group"`
	DialTimeout     time.Duration `json:"dialTimeout" toml:"dialTimeout"`
	RwTimeout       time.Duration `json:"rwTimeout" toml:"rwTimeout"`
	SubExpression   string        `json:"subExpression" toml:"subExpression"`
	Rate            float64       `json:"rate" toml:"rate"`
	Capacity        int64         `json:"capacity" toml:"capacity"`
	WaitMaxDuration time.Duration `json:"waitMaxDuration" toml:"waitMaxDuration"`
	Shadow          Shadow        `json:"shadow" toml:"shadow"`
	Reconsume       int32         `json:"reconsume" toml:"reconsume"`
	AccessKey       string        `json:"accessKey" toml:"accessKey"`
	SecretKey       string        `json:"secretKey" toml:"secretKey"`
	MessageModel    string        `json:"messageModel" toml:"messageModel"` // Consumption mode; defaults to clustering.
	// Client instance name. By default an md5 is generated from the Addr
	// field; this supports multiple clusters.
	InstanceName string `json:"instanceName" toml:"instanceName"`
	EnableTrace  bool   `json:"enableTrace" toml:"enableTrace"`
	// Maximum number of messages consumed per batch. Valid range: [1, 1024];
	// the default is 1.
	ConsumeMessageBatchMaxSize int `json:"consumeMessageBatchMaxSize" toml:"consumeMessageBatchMaxSize"`
	// Maximum number of messages pulled from the broker per batch. Valid
	// range: [1, 1024]; the default is 32.
	PullBatchSize int32 `json:"pullBatchSize" toml:"pullBatchSize"`
}

ConsumerConfig consumer config

func DefaultConsumerConfig

func DefaultConsumerConfig() ConsumerConfig

DefaultConsumerConfig ...

func RawConsumerConfig

func RawConsumerConfig(key string) ConsumerConfig

RawConsumerConfig returns the configuration.

func StdPushConsumerConfig

func StdPushConsumerConfig(name string) ConsumerConfig

StdPushConsumerConfig ...

func (ConsumerConfig) Build

func (conf ConsumerConfig) Build() *PushConsumer

type FlowInfo added in v0.2.5

// FlowInfo describes one named consumer or producer (name, addresses, topic,
// group and group type) and embeds istats.FlowInfoBase. Only json tags are
// declared on its fields.
type FlowInfo struct {
	Name      string   `json:"name"`
	Addr      []string `json:"addr"`
	Topic     string   `json:"topic"`
	Group     string   `json:"group"`
	GroupType string   `json:"groupType"` // Kind of group: "consumer" or "producer".
	istats.FlowInfoBase
}

type Producer added in v0.2.5

// Producer embeds a rocketmq.Producer together with a ProducerConfig value.
// It also has fields that are not shown in this listing.
type Producer struct {
	rocketmq.Producer

	ProducerConfig
	// contains filtered or unexported fields
}

func GetProducer added in v0.2.5

func GetProducer(name string) *Producer

func InvokerProducer added in v0.2.5

func InvokerProducer(name string) *Producer

InvokerProducer returns a *Producer for the given name.

func StdNewProducer added in v0.2.5

func StdNewProducer(name string) *Producer

func (*Producer) Close added in v0.2.5

func (pc *Producer) Close() error

func (*Producer) Send added in v0.2.5

func (pc *Producer) Send(msg []byte) error

Send sends a message via RocketMQ.

func (*Producer) SendMsg added in v0.2.5

func (pc *Producer) SendMsg(msg *primitive.Message) (*primitive.SendResult, error)

SendMsg sends a message in a custom message format.

func (*Producer) SendWithContext added in v0.2.5

func (pc *Producer) SendWithContext(ctx context.Context, msg []byte) error

SendWithContext sends a message.

func (*Producer) SendWithMsg added in v0.4.3

func (pc *Producer) SendWithMsg(ctx context.Context, msg *primitive.Message) error

SendWithMsg sends a message in a custom message format.

func (*Producer) SendWithResult added in v0.2.5

func (pc *Producer) SendWithResult(msg []byte, tag string) (*primitive.SendResult, error)

SendWithResult sends a message via RocketMQ; the caller can choose the tag, and the send result is returned.

func (*Producer) SendWithTag added in v0.2.5

func (pc *Producer) SendWithTag(msg []byte, tag string) error

SendWithTag sends a message via RocketMQ; the caller can choose the tag.

func (*Producer) Start added in v0.2.5

func (pc *Producer) Start() error

func (*Producer) WithInterceptor added in v0.2.5

func (pc *Producer) WithInterceptor(fs ...primitive.Interceptor) *Producer

type ProducerConfig

// ProducerConfig holds the settings for a producer. Every field carries
// matching json and toml struct tags, so the same key names apply to both
// formats.
//
// NOTE(review): AccessKey/SecretKey look like broker credentials and Retry
// like a send-retry count — confirm against the code that consumes this
// struct.
type ProducerConfig struct {
	Name        string        `json:"name" toml:"name"`
	Addr        []string      `json:"addr" toml:"addr"`
	Topic       string        `json:"topic" toml:"topic"`
	Group       string        `json:"group" toml:"group"`
	Retry       int           `json:"retry" toml:"retry"`
	DialTimeout time.Duration `json:"dialTimeout" toml:"dialTimeout"`
	RwTimeout   time.Duration `json:"rwTimeout" toml:"rwTimeout"`
	Shadow      Shadow        `json:"shadow" toml:"shadow"`
	AccessKey   string        `json:"accessKey" toml:"accessKey"`
	SecretKey   string        `json:"secretKey" toml:"secretKey"`
	// Client instance name. By default an md5 is generated from the Addr
	// field; this supports multiple clusters.
	InstanceName string `json:"instanceName" toml:"instanceName"`
	EnableTrace  bool   `json:"enableTrace" toml:"enableTrace"`
}

ProducerConfig producer config

func DefaultProducerConfig

func DefaultProducerConfig() ProducerConfig

DefaultProducerConfig ...

func RawProducerConfig

func RawProducerConfig(key string) ProducerConfig

RawProducerConfig ...

func StdProducerConfig

func StdProducerConfig(name string) *ProducerConfig

StdProducerConfig ...

func (*ProducerConfig) Build

func (conf *ProducerConfig) Build() *Producer

type PushConsumer added in v0.2.5

// PushConsumer embeds a rocketmq.PushConsumer together with a ConsumerConfig
// value. It also has fields that are not shown in this listing.
type PushConsumer struct {
	rocketmq.PushConsumer

	ConsumerConfig
	// contains filtered or unexported fields
}

func GetConsumer added in v0.2.5

func GetConsumer(name string) *PushConsumer

GetConsumer returns a *PushConsumer for the given name.

func (*PushConsumer) Close added in v0.2.5

func (cc *PushConsumer) Close()

func (*PushConsumer) RegisterBatchMessage added in v0.4.4

func (cc *PushConsumer) RegisterBatchMessage(f func(context.Context, ...*primitive.MessageExt) error) *PushConsumer

func (*PushConsumer) RegisterSingleMessage added in v0.4.4

func (cc *PushConsumer) RegisterSingleMessage(f func(context.Context, *primitive.MessageExt) error) *PushConsumer

func (*PushConsumer) Start added in v0.2.5

func (cc *PushConsumer) Start() error

func (*PushConsumer) Subscribe deprecated added in v0.2.5

func (cc *PushConsumer) Subscribe(topic string, f func(context.Context, *primitive.MessageExt) error) *PushConsumer

Deprecated: use RegisterSingleMessage or RegisterBatchMessage instead

func (*PushConsumer) WithInterceptor added in v0.2.5

func (cc *PushConsumer) WithInterceptor(fs ...primitive.Interceptor) *PushConsumer

type Shadow added in v0.2.5

// Shadow holds the shadow settings embedded in both ConsumerConfig and
// ProducerConfig: a mode string and a topic whitelist.
type Shadow struct {
	Mode string `json:"mode" toml:"mode"`
	// When mode is enabled, topics in this whitelist are not discarded.
	// NOTE(review): "Withe" looks like a misspelling of "White"; the field
	// name and the "witheTopics" keys are part of the config surface, so they
	// are left unchanged here.
	WitheTopics []string `json:"witheTopics" toml:"witheTopics"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL