rocketmq

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2021 License: MIT Imports: 13 Imported by: 5

README

rocketmq

Documentation

Index

Constants

View Source
// Access protocol identifiers.
// NOTE(review): presumably the accepted values of Metadata.AccessProto —
// confirm against the code that reads that field.
const (
	// DefaultHttpAccessProto is the "http" protocol identifier.
	DefaultHttpAccessProto = "http"
	// TCPCGOProto is the "tcp-cgo" protocol identifier.
	TCPCGOProto            = "tcp-cgo"
	// TCPNativeProto is the "tcp" protocol identifier.
	TCPNativeProto         = "tcp"
)

Variables

View Source
// Package-level maps of Producer and PushConsumer values keyed by string.
// NOTE(review): what the keys denote and which code populates these maps is
// not visible here — confirm before relying on them.
var (
	// Producers maps a string key to a Producer; it is created empty.
	Producers = make(map[string]Producer)
	// Consumers maps a string key to a PushConsumer; it is created empty.
	Consumers = make(map[string]PushConsumer)
)

Functions

This section is empty.

Types

type MQHttpConsumer

type MQHttpConsumer struct {
	// contains filtered or unexported fields
}

func NewMQHttpConsumer

func NewMQHttpConsumer(md *Metadata) (*MQHttpConsumer, error)

NewMQHttpConsumer

func (*MQHttpConsumer) Init

func (mq *MQHttpConsumer) Init(md *Metadata) error

func (*MQHttpConsumer) Shutdown

func (mq *MQHttpConsumer) Shutdown() error

Shutdown the PullConsumer

func (*MQHttpConsumer) Start

func (mq *MQHttpConsumer) Start() error

Start the PullConsumer for consuming messages

func (*MQHttpConsumer) Subscribe

func (mq *MQHttpConsumer) Subscribe(topic string, selector mqc.MessageSelector, f func(context.Context, ...*primitive.MessageExt) (mqc.ConsumeResult, error)) error

Subscribe a topic for consuming

func (*MQHttpConsumer) Unsubscribe

func (mq *MQHttpConsumer) Unsubscribe(topic string) error

Unsubscribe a topic

type MQHttpProducer

type MQHttpProducer struct {
	// contains filtered or unexported fields
}

func NewMQHttpProducer

func NewMQHttpProducer(md *Metadata) (*MQHttpProducer, error)

NewMQHttpProducer

func (*MQHttpProducer) Init

func (mq *MQHttpProducer) Init(md *Metadata) error

func (*MQHttpProducer) SendAsync

func (mq *MQHttpProducer) SendAsync(ctx context.Context, f func(ctx context.Context, result *primitive.SendResult, err error), msg ...*primitive.Message) error

Send async message, unimplemented

func (*MQHttpProducer) SendOneWay

func (mq *MQHttpProducer) SendOneWay(ctx context.Context, msg ...*primitive.Message) error

Send oneway message, unimplemented

func (*MQHttpProducer) SendSync

func (mq *MQHttpProducer) SendSync(ctx context.Context, msgs ...*primitive.Message) (*primitive.SendResult, error)

Send a message synchronously; only a single message is supported for now

func (*MQHttpProducer) Shutdown

func (mq *MQHttpProducer) Shutdown() error

Shutdown the Producer

func (*MQHttpProducer) Start

func (mq *MQHttpProducer) Start() error

Start the Producer

type Metadata

// Metadata holds the settings passed to the constructors and Init methods of
// the producers and consumers in this package. Every field is omitted from
// JSON when empty; the int fields carry the ",string" tag option, so they are
// encoded as quoted strings in JSON.
type Metadata struct {
	// AccessProto is the SDK access protocol (http or tcp); the documented
	// default is tcp.
	AccessProto string `json:"accessProto,omitempty"`

	// AccessKey is the access key of the RocketMQ credentials.
	AccessKey string `json:"accessKey,omitempty"`

	// SecretKey is the secret key of the RocketMQ credentials.
	SecretKey string `json:"secretKey,omitempty"`

	// Endpoint is the RocketMQ endpoint; optional, only used by the http proto.
	Endpoint string `json:"endpoint,omitempty"`

	// NameServer is the RocketMQ name server; optional.
	NameServer string `json:"nameServer,omitempty"`

	// InstanceId is the RocketMQ namespace; optional.
	InstanceId string `json:"instanceId,omitempty"`

	// ConsumerGroup is the consumer group for RocketMQ subscribers; providing
	// it is suggested.
	ConsumerGroup string `json:"consumerGroup,omitempty"`

	// ConsumerBatchSize is only used by the http proto.
	// NOTE(review): the original comment repeated the ConsumerGroup
	// description; presumably this is the consume batch size — confirm.
	ConsumerBatchSize int `json:"consumerBatchSize,string,omitempty"`

	// ConsumerThreadNums is only used by the cgo proto.
	// NOTE(review): the original comment repeated the ConsumerGroup
	// description; presumably this is the number of consumer threads — confirm.
	ConsumerThreadNums int `json:"consumerThreadNums,string,omitempty"`

	// NameServerDomain is the RocketMQ name server domain; optional.
	NameServerDomain string `json:"nameServerDomain,omitempty"`

	// Retries is the number of retries when connecting to the RocketMQ
	// broker; optional.
	Retries int `json:"retries,string,omitempty"`
}

type NativeRocketMQConsumer

type NativeRocketMQConsumer struct {
	// contains filtered or unexported fields
}

*

func (*NativeRocketMQConsumer) Init

func (mq *NativeRocketMQConsumer) Init(md *Metadata) error

func (*NativeRocketMQConsumer) Shutdown

func (mq *NativeRocketMQConsumer) Shutdown() error

Shutdown the PullConsumer

func (*NativeRocketMQConsumer) Start

func (mq *NativeRocketMQConsumer) Start() error

Start the PullConsumer for consuming messages

func (*NativeRocketMQConsumer) Subscribe

Subscribe a topic for consuming

func (*NativeRocketMQConsumer) Unsubscribe

func (mq *NativeRocketMQConsumer) Unsubscribe(topic string) error

Unsubscribe a topic

type NativeRocketMQProducer

type NativeRocketMQProducer struct {
	// contains filtered or unexported fields
}

*

func (*NativeRocketMQProducer) Init

func (mq *NativeRocketMQProducer) Init(md *Metadata) error

Init initializes the NativeRocketMQProducer from the given Metadata

func (*NativeRocketMQProducer) SendAsync

func (mq *NativeRocketMQProducer) SendAsync(ctx context.Context, f func(ctx context.Context, result *primitive.SendResult, err error), msg ...*primitive.Message) error

Send async message, unimplemented

func (*NativeRocketMQProducer) SendOneWay

func (mq *NativeRocketMQProducer) SendOneWay(ctx context.Context, msg ...*primitive.Message) error

Send oneway message, unimplemented

func (*NativeRocketMQProducer) SendSync

Send a message synchronously; only a single message is supported for now

func (*NativeRocketMQProducer) Shutdown

func (mq *NativeRocketMQProducer) Shutdown() error

Shutdown the Producer

func (*NativeRocketMQProducer) Start

func (mq *NativeRocketMQProducer) Start() error

Start the Producer

type Producer

// Producer is the message-producer interface of this package. It is the value
// type of the Producers map, and MQHttpProducer and NativeRocketMQProducer
// declare methods with these names.
type Producer interface {
	// Init receives the Metadata settings for the producer.
	// NOTE(review): presumably must be called before Start — confirm.
	Init(md *Metadata) error
	// Start starts the producer.
	Start() error
	// Shutdown shuts the producer down.
	Shutdown() error
	// SendSync sends messages synchronously and returns the send result.
	// The implementations in this package document that only a single
	// message is supported for now.
	SendSync(ctx context.Context, mq ...*primitive.Message) (*primitive.SendResult, error)
	// SendAsync sends messages asynchronously; the function argument is a
	// callback taking the send result and error. The implementations in this
	// package document it as unimplemented.
	SendAsync(ctx context.Context, mq func(ctx context.Context, result *primitive.SendResult, err error),
		msg ...*primitive.Message) error
	// SendOneWay sends messages one-way (no send result is returned). The
	// implementations in this package document it as unimplemented.
	SendOneWay(ctx context.Context, mq ...*primitive.Message) error
}

type PushConsumer

// PushConsumer is the message-consumer interface of this package. It is the
// value type of the Consumers map.
type PushConsumer interface {
	// Init receives the Metadata settings for the consumer.
	// NOTE(review): presumably must be called before Start — confirm.
	Init(md *Metadata) error

	// Start starts the consumer for consuming messages.
	Start() error

	// Shutdown shuts the consumer down; all offsets of the MessageQueue will
	// be synced to the broker before the process exits.
	Shutdown() error

	// Subscribe subscribes to a topic for consuming. selector filters the
	// messages and f is the callback invoked with the received messages; f
	// returns the consume result.
	Subscribe(topic string, selector consumer.MessageSelector, f func(context.Context, ...*primitive.MessageExt) (mqc.ConsumeResult, error)) error

	// Unsubscribe unsubscribes from a topic.
	Unsubscribe(topic string) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL