alirmq

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2020 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OnsDomainFormatUrl        = "ons.%s.aliyuncs.com"
	NormalMessageType         = 0
	PartitionOrderMessageType = 1
	GlobalOrderMessageType    = 2
	TransactionMessageType    = 4
	DelayMessageType          = 5
)

Variables

View Source
var Logger = logrus.New()

Functions

func AddLogHook added in v0.1.5

func AddLogHook(f LogHook)

func CreateMessage added in v0.1.5

func CreateMessage(topic string, body []byte, setter ...MessageSetter) *primitive.Message

func DefaultErrorCallback

func DefaultErrorCallback(err error, msg *primitive.MessageExt)

func StringSet added in v0.1.5

func StringSet(ss []string) []string

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cred *RocketMQCredentials, setter ...ConsumerSetter) (*Consumer, error)

func (*Consumer) CheckSubRelation added in v0.1.5

func (c *Consumer) CheckSubRelation() (bool, error)

func (*Consumer) SetErrorCallback

func (c *Consumer) SetErrorCallback(fn func(err error, msg *primitive.MessageExt))

func (*Consumer) Start

func (c *Consumer) Start() error

func (*Consumer) Stop

func (c *Consumer) Stop() error

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topic, tag string, handler Handler) error

type ConsumerOption

type ConsumerOption struct {
	InstanceName      string
	Broadcasting      bool  // 是否开启广播
	ConsumeOrderly    bool  // 是否顺序消费
	MaxReconsumeTimes int32 //  失败消息重试次数
	MaxTopicCount     int   // 一个consumer 消费多少topic
	ConsumeTimeout    int   // 消息消费耗时
	// contains filtered or unexported fields
}

type ConsumerSetter

type ConsumerSetter func(option *ConsumerOption)

func WithBroadcasting

func WithBroadcasting(broadcast bool) ConsumerSetter

func WithConsumeOrderly

func WithConsumeOrderly(orderly bool) ConsumerSetter

func WithConsumeTimeout

func WithConsumeTimeout(n int) ConsumerSetter

func WithConsumerLokHook added in v0.1.5

func WithConsumerLokHook(f LogHook) ConsumerSetter

func WithInstanceName

func WithInstanceName(name string) ConsumerSetter

func WithMaxTopicCount

func WithMaxTopicCount(count int) ConsumerSetter

func WithReconsumerTime

func WithReconsumerTime(n int32) ConsumerSetter

type CreateTopicError added in v0.1.5

type CreateTopicError struct {
	Msg string
	Err error
}

type ErrorCallback

type ErrorCallback func(err error, msg *primitive.MessageExt)

type GroupInfo added in v0.1.5

type GroupInfo struct {
	GroupName string
	Remark    string
}

type Handler

type Handler func(*M) error

type LogHook added in v0.1.5

type LogHook func(entry *logrus.Entry)

LogHook is the callback template for log hooks: a function that receives each log entry.

type M

type M struct {
	MsgId           string
	Topic           string
	Tag             string
	Key             string
	Body            []byte
	ReConsumerTimes int32
}

type Message

type Message struct {
	Topic      string
	Tag        string
	Body       []byte
	Key        string
	Properties map[string]string
}

type MessageSetter

type MessageSetter func(message *Message)

func WithKey

func WithKey(key string) MessageSetter

func WithProperty

func WithProperty(key, value string) MessageSetter

func WithTag

func WithTag(tag string) MessageSetter

type Monitor added in v0.1.5

type Monitor struct {
	Callback LogHook
}

Monitor 信息监控

func NewMonitor added in v0.1.5

func NewMonitor(l LogHook) *Monitor

NewMonitor 返回一个实例

func (*Monitor) Fire added in v0.1.5

func (m *Monitor) Fire(entry *logrus.Entry) error

Fire 实际执行了回调

func (*Monitor) Levels added in v0.1.5

func (m *Monitor) Levels() []logrus.Level

Levels 这些级别的日志会被回调

type MultiSubscription added in v0.1.5

type MultiSubscription struct {
	Topic string
	Tag   string
	Fn    []Handler
}

type OnsClient added in v0.1.5

type OnsClient struct {
	Region     string
	AccessKey  string
	SecretKey  string
	InstanceId string
	Domain     string
	// contains filtered or unexported fields
}

func NewOnsClient added in v0.1.5

func NewOnsClient(region, accessKey, secretKey, instanceId string) (*OnsClient, error)

func (*OnsClient) ClearConsumerAllTopicMessage added in v0.1.5

func (c *OnsClient) ClearConsumerAllTopicMessage(group string) error

清空消费者组堆积的所有消息, 耗时可能比较久

func (*OnsClient) ClearConsumerTopicMessage added in v0.1.5

func (c *OnsClient) ClearConsumerTopicMessage(group, topic string) error

清空消费者组某个topic的消息

func (*OnsClient) ConsumerResetOffset added in v0.1.5

func (c *OnsClient) ConsumerResetOffset(group, topic string, resetTimeStamp int64) error

回溯消息

func (*OnsClient) CreateGroup added in v0.1.5

func (c *OnsClient) CreateGroup(group, remark string) error

创建组

func (*OnsClient) CreateOnsConsumerStatusRequest added in v0.1.5

func (c *OnsClient) CreateOnsConsumerStatusRequest(group string) (response *ons.OnsConsumerStatusResponse, err error)

CreateOnsConsumerStatusRequest queries the consumer status of the given consumer group (查询消费者组的消费状态).

func (*OnsClient) CreateTopic added in v0.1.5

func (c *OnsClient) CreateTopic(topic, remark string, messageType int) error

func (*OnsClient) DeleteGroup added in v0.1.5

func (c *OnsClient) DeleteGroup(group string) error

删除组

func (*OnsClient) DeleteTopic added in v0.1.5

func (c *OnsClient) DeleteTopic(topic string) error

func (*OnsClient) ListConsumerGroupSub added in v0.1.5

func (c *OnsClient) ListConsumerGroupSub(group string) ([]*Sub, error)

获取消费者组订阅的topic

func (*OnsClient) ListGroup added in v0.1.5

func (c *OnsClient) ListGroup() ([]*GroupInfo, error)

func (*OnsClient) ListTopic added in v0.1.5

func (c *OnsClient) ListTopic() ([]*TopicInfo, error)

type Producer added in v0.1.5

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(cred *RocketMQCredentials, setter ...ProducerSetter) (*Producer, error)

func (*Producer) CreateTopic added in v0.1.5

func (p *Producer) CreateTopic(topics ...string) []*CreateTopicError

func (*Producer) RawProducer added in v0.1.5

func (p *Producer) RawProducer() *rocketmq.Producer

func (*Producer) Send added in v0.1.5

func (p *Producer) Send(topic string, body []byte, setter ...MessageSetter) SendResult

func (*Producer) SendAsync added in v0.1.5

func (p *Producer) SendAsync(topic string, body []byte, callback func(ctx context.Context, result *SendResult), setter ...MessageSetter) error

func (*Producer) Start added in v0.1.5

func (p *Producer) Start() error

func (*Producer) Stop added in v0.1.5

func (p *Producer) Stop() error

type ProducerOption

type ProducerOption struct {
	GroupName          string
	SendMessageTimeout int
	InstanceName       string
	UnitName           string
	Region             string
	Retry              int
	// contains filtered or unexported fields
}

type ProducerSetter

type ProducerSetter func(option *ProducerOption)

func WithProducerGroupName

func WithProducerGroupName(name string) ProducerSetter

func WithProducerInstanceName

func WithProducerInstanceName(name string) ProducerSetter

func WithProducerLokHook added in v0.1.5

func WithProducerLokHook(f LogHook) ProducerSetter

func WithRetry added in v0.1.5

func WithRetry(retry int) ProducerSetter

func WithSendMessageTimeout

func WithSendMessageTimeout(timeout int) ProducerSetter

type RocketMQCredentials

type RocketMQCredentials struct {
	NameServer       string
	AccessKey        string
	SecretKey        string
	NameSpace        string
	GroupName        string
	Region           string
	RetryTime        int
	SubCallback      []Subscription
	MultiSubCallback []MultiSubscription
}

RocketMQCredentials holds the RocketMQ connection configuration.

type SendResult

type SendResult struct {
	Result *primitive.SendResult
	Err    error
}

func (*SendResult) MessageId

func (r *SendResult) MessageId() string

func (*SendResult) Success

func (r *SendResult) Success() bool

type Sub added in v0.1.5

type Sub struct {
	Topic string
	Tag   string
}

type Subscription added in v0.1.5

type Subscription struct {
	Topic string
	Tag   string
	Fn    Handler
}

type TopicInfo added in v0.1.5

type TopicInfo struct {
	Topic       string
	MessageType int
	Remark      string
	CreateTime  int64
}

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL