golang

package module
v5.1.1-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2024 License: Apache-2.0 Imports: 51 Imported by: 31

README

The Golang Implementation of Apache RocketMQ Client

Codecov-golang

Here is the golang implementation of the client for Apache RocketMQ.

Architecture

We implement the protocols described in rocketmq-apis on top of gRPC-go, using Protocol Buffers to serialize and deserialize data in transit.

Quick Start

Installation

With Go modules (Go 1.11+), simply add the following import to your code, and then go [build|run|test] will automatically fetch the necessary dependencies.

import "github.com/apache/rocketmq-clients/golang/v5"

Otherwise, to install the golang package, run the following command:

go get -u github.com/apache/rocketmq-clients/golang/v5

Documentation

Index

Constants

View Source
// Keys naming the client logging settings. The string values are the setting
// names; where they are looked up is not visible in this block (presumably
// environment variables read by the logger setup — TODO confirm).
const (
	// CLIENT_LOG_ROOT is the key of the log root setting.
	CLIENT_LOG_ROOT     = "rocketmq.client.logRoot"
	// CLIENT_LOG_MAXINDEX is the key of the maximum log file index setting.
	CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"
	// CLIENT_LOG_FILESIZE is the key of the maximum log file size setting.
	CLIENT_LOG_FILESIZE = "rocketmq.client.logFileMaxSize"
	// CLIENT_LOG_LEVEL is the key of the log level setting.
	CLIENT_LOG_LEVEL    = "rocketmq.client.logLevel"
	// CLIENT_LOG_ADDITIVE        = "rocketmq.client.log.additive"
	// CLIENT_LOG_FILENAME is the key of the log file name setting.
	CLIENT_LOG_FILENAME = "rocketmq.client.logFileName"
	// CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
	// ENABLE_CONSOLE_APPENDER is the key of the console appender switch.
	ENABLE_CONSOLE_APPENDER = "mq.consoleAppender.enabled"
)
View Source
// Constants describing the string form of message IDs.
const (
	// MESSAGE_ID_LENGTH_FOR_V1_OR_LATER is the length, in characters, of a
	// message ID of version V1 or later (34, matching the 34-character V1
	// example shown in the MessageIdCodec documentation).
	MESSAGE_ID_LENGTH_FOR_V1_OR_LATER        = 34
	// MESSAGE_ID_VERSION_V0 is the version string "00".
	MESSAGE_ID_VERSION_V0             string = "00"
	// MESSAGE_ID_VERSION_V1 is the version string "01".
	MESSAGE_ID_VERSION_V1             string = "01"
)
View Source
// String constants for span attribute keys, span attribute values and span
// annotations. All of them are untyped string constants.
const (
	// RocketMQ-specific span attribute keys (all under "messaging.rocketmq.").
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION           = "messaging.rocketmq.operation"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE           = "messaging.rocketmq.namespace"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG                 = "messaging.rocketmq.message_tag"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS                = "messaging.rocketmq.message_keys"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID           = "messaging.rocketmq.client_id"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE        = "messaging.rocketmq.message_type"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP        = "messaging.rocketmq.client_group"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT             = "messaging.rocketmq.attempt"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE          = "messaging.rocketmq.batch_size"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP  = "messaging.rocketmq.delivery_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP = "messaging.rocketmq.available_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY          = "messaging.rocketmq.access_key"

	// Fixed values for the messaging system, destination kind and protocol.
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM  = "rocketmq"
	SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND           = "topic"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL         = "RMQ-gRPC"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION = "v1"

	// Values naming the RocketMQ message types.
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE      = "normal"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE        = "fifo"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE       = "delay"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE = "transaction"

	// Values naming the RocketMQ operations.
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION     = "send"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_RECEIVE_OPERATION  = "receive"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PULL_OPERATION     = "pull"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION    = "await"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION  = "process"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ACK_OPERATION      = "ack"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NACK_OPERATION     = "nack"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION   = "commit"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION = "rollback"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DLQ_OPERATION      = "dlq"

	// Messaging span attribute name list
	SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM             = "messaging.system"
	SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION        = "messaging.destination"
	SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND   = "messaging.destination_kind"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL           = "messaging.protocol"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION   = "messaging.protocol_version"
	SPAN_ATTRIBUTE_KEY_MESSAGING_URL                = "messaging.url"
	SPAN_ATTRIBUTE_KEY_MESSAGING_ID                 = "messaging.message_id"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes"
	SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION          = "messaging.operation"

	// Values for the generic "messaging.operation" attribute.
	SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION    = "send"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION = "receive"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION = "process"

	// Key of the transaction resolution attribute.
	SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction"

	// Span annotation
	SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption"
	SPAN_ANNOTATION_MESSAGE_KEYS      = "__message_keys"
	SPAN_ANNOTATION_ATTR_START_TIME   = "__start_time"
)

RocketMQ span attribute name list

View Source
const (
	// MAX_MESSAGE_NUM is a message-count limit of 1. What it bounds is not
	// visible here (presumably the number of messages per send request —
	// TODO confirm against the producer code).
	MAX_MESSAGE_NUM = 1
)

Variables

View Source
// Publish latency metric and the view that aggregates it.
var (
	// MLatencyMs is the int64 measure named "publish_latency", recorded in
	// milliseconds ("ms").
	MLatencyMs = stats.Int64("publish_latency", "Publish latency in milliseconds", "ms")

	// PublishLatencyView exposes MLatencyMs under the view name
	// "rocketmq_send_cost_time" as a distribution with bucket boundaries
	// 1, 5, 10, 20, 50, 200 and 500, tagged by topic, client ID and
	// invocation status.
	PublishLatencyView = view.View{
		Name:        "rocketmq_send_cost_time",
		Description: "Publish latency",
		Measure:     MLatencyMs,
		Aggregation: view.Distribution(1, 5, 10, 20, 50, 200, 500),
		TagKeys:     []tag.Key{topicTag, clientIdTag, invocationStatusTag},
	}
)
View Source
var (
	// ErrNoAvailableBrokers is the sentinel error with the message
	// "rocketmq: no available brokers". Compare against it with errors.Is.
	ErrNoAvailableBrokers = errors.New("rocketmq: no available brokers")
)
View Source
var (
	// ErrNoAvailableEndpoints is the sentinel error with the message
	// "rocketmq: no available endpoints". NewClientConn returns it when it is
	// given an empty endpoint string.
	ErrNoAvailableEndpoints = errors.New("rocketmq: no available endpoints")
)
View Source
// NewClient builds a client from config.
//
// It parses config.Endpoint into the access point, generates a fresh client
// ID, starts from defaultNSOptions and applies opts on top of them, and
// finally attaches a default client meter provider. The error from
// utils.ParseTarget is returned unchanged. config must not be nil.
//
// NewClient is a package-level variable holding the constructor function.
var NewClient = func(config *Config, opts ...ClientOption) (Client, error) {
	accessPoint, parseErr := utils.ParseTarget(config.Endpoint)
	if parseErr != nil {
		return nil, parseErr
	}

	id := utils.GenClientID()
	c := &defaultClient{
		config:                        config,
		opts:                          defaultNSOptions,
		clientID:                      id,
		accessPoint:                   accessPoint,
		messageInterceptors:           make([]MessageInterceptor, 0),
		endpointsTelemetryClientTable: make(map[string]*defaultClientSession),
		on:                            *atomic.NewBool(true),
		log:                           sugarBaseLogger.With("client_id", id),
		done:                          make(chan struct{}, 1),
	}
	for i := range opts {
		opts[i].apply(&c.opts)
	}

	// The meter provider registers its interceptor on the client, so it is
	// created last, once the client is otherwise fully populated.
	c.clientMeterProvider = NewDefaultClientMeterProvider(c)
	return c, nil
}
View Source
// NewClientConcrete builds a client exactly like NewClient but returns the
// concrete *defaultClient and pre-populates its clientManager with an empty
// MockClientManager (presumably for use in tests — TODO confirm).
//
// The error from utils.ParseTarget is returned unchanged. config must not be
// nil.
var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) {
	target, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	}

	concrete := &defaultClient{
		config:        config,
		opts:          defaultNSOptions,
		clientID:      utils.GenClientID(),
		accessPoint:   target,
		on:            *atomic.NewBool(true),
		clientManager: &MockClientManager{},
		done:          make(chan struct{}, 1),
	}
	concrete.messageInterceptors = make([]MessageInterceptor, 0)
	concrete.endpointsTelemetryClientTable = make(map[string]*defaultClientSession)
	concrete.log = sugarBaseLogger.With("client_id", concrete.clientID)

	for idx := range opts {
		opts[idx].apply(&concrete.opts)
	}

	// Created last: the provider registers an interceptor on the client.
	concrete.clientMeterProvider = NewDefaultClientMeterProvider(concrete)
	return concrete, nil
}
View Source
// NewClientConn dials endpoint and returns a ClientConn wrapping the
// resulting gRPC connection.
//
// Options start from defaultConnOptions and opts are applied on top. When a
// Context option is set it becomes the parent of the connection's context;
// otherwise context.TODO() is used. Positive MaxCallSendMsgSize and
// MaxCallRecvMsgSize values are turned into gRPC call options.
//
// Errors:
//   - ErrNoAvailableEndpoints when endpoint is empty;
//   - a formatted error when a positive receive limit is smaller than the
//     send limit;
//   - the error returned by dial, in which case the connection's context is
//     cancelled before returning.
var NewClientConn = func(endpoint string, opts ...ConnOption) (ClientConn, error) {
	if len(endpoint) == 0 {
		return nil, ErrNoAvailableEndpoints
	}

	client := &clientConn{
		opts:     defaultConnOptions,
		validate: validator.New(),
	}
	for _, opt := range opts {
		opt.apply(&client.opts)
	}

	// Validate the size limits before the cancellable context exists, so a
	// rejected configuration cannot leave an un-cancelled child context
	// registered on the caller's parent context.
	sendLimit, recvLimit := client.opts.MaxCallSendMsgSize, client.opts.MaxCallRecvMsgSize
	if recvLimit > 0 && sendLimit > recvLimit {
		return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", recvLimit, sendLimit)
	}
	if sendLimit > 0 {
		client.callOpts = append(client.callOpts, grpc.MaxCallSendMsgSize(sendLimit))
	}
	if recvLimit > 0 {
		client.callOpts = append(client.callOpts, grpc.MaxCallRecvMsgSize(recvLimit))
	}

	baseCtx := context.TODO()
	if client.opts.Context != nil {
		baseCtx = client.opts.Context
	}
	client.ctx, client.cancel = context.WithCancel(baseCtx)
	client.creds = credentials.NewTLS(client.opts.TLS)

	conn, err := client.dial(endpoint)
	if err != nil {
		// Release the context created above; nobody else holds cancel.
		client.cancel()
		return nil, err
	}
	client.conn = conn

	return client, nil
}
View Source
// NewDefaultClientManager returns a defaultClientManager with an empty RPC
// client table, a fresh unbuffered done channel and
// defaultClientManagerOptions as its options.
var NewDefaultClientManager = func() *defaultClientManager {
	manager := new(defaultClientManager)
	manager.rpcClientTable = make(map[string]RpcClient)
	manager.done = make(chan struct{})
	manager.opts = defaultClientManagerOptions
	return manager
}
View Source
// NewDefaultClientMeter returns a defaultClientMeter whose enabled flag is
// initialised to on and which stores endpoints and exporter as given.
//
// The clientID parameter is accepted but not used by this constructor.
var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter {
	meter := &defaultClientMeter{
		ocaExporter: exporter,
		endpoints:   endpoints,
		enabled:     *atomic.NewBool(on),
	}
	return meter
}
View Source
// NewDefaultClientMeterProvider returns a meter provider bound to client.
//
// The provider starts with a disabled client meter (no exporter, no
// endpoints). As a side effect, a message meter interceptor backed by the new
// provider is registered on client.
var NewDefaultClientMeterProvider = func(client *defaultClient) ClientMeterProvider {
	provider := &defaultClientMeterProvider{client: client}
	provider.clientMeter = NewDefaultClientMeter(nil, false, nil, "nil")

	interceptor := NewDefaultMessageMeterInterceptor(provider)
	client.registerMessageInterceptor(interceptor)
	return provider
}
View Source
// NewDefaultMessageMeterInterceptor returns a defaultMessageMeterInterceptor
// that holds clientMeterProvider.
var NewDefaultMessageMeterInterceptor = func(clientMeterProvider ClientMeterProvider) *defaultMessageMeterInterceptor {
	interceptor := new(defaultMessageMeterInterceptor)
	interceptor.clientMeterProvider = clientMeterProvider
	return interceptor
}
View Source
// NewFilterExpression returns a FilterExpression of type TAG for the given
// expression string.
var NewFilterExpression = func(expression string) *FilterExpression {
	filter := new(FilterExpression)
	filter.expressionType = TAG
	filter.expression = expression
	return filter
}
View Source
// NewFilterExpressionWithType returns a FilterExpression holding the given
// expression string and expression type.
var NewFilterExpressionWithType = func(expression string, expressionType FilterExpressionType) *FilterExpression {
	filter := new(FilterExpression)
	filter.expressionType = expressionType
	filter.expression = expression
	return filter
}
View Source
// NewProducer builds a Producer from config.
//
// Options start from a copy of defaultProducerOptions with opts applied on
// top. The underlying client is created through the options' clientFunc and
// must be a *defaultClient. The producer settings carry the parsed endpoints,
// an exponential-backoff retry policy with the configured maxAttempts, the
// client's request timeout and a 4 MiB maximum body size. Every configured
// topic is stored in the settings as a resource in config.NameSpace.
//
// Errors:
//   - a nil config;
//   - the error returned by clientFunc;
//   - a client returned by clientFunc that is not a *defaultClient;
//   - the error from utils.ParseTarget for config.Endpoint.
var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error) {
	if config == nil {
		return nil, fmt.Errorf("config could not be nil")
	}
	copyOpt := defaultProducerOptions
	po := &copyOpt
	for _, opt := range opts {
		opt.apply(po)
	}
	cli, err := po.clientFunc(config)
	if err != nil {
		return nil, err
	}
	// clientFunc comes from the options, so it may return a Client that is
	// not a *defaultClient; report that as an error instead of panicking.
	dc, ok := cli.(*defaultClient)
	if !ok {
		return nil, fmt.Errorf("unexpected client type %T, want *defaultClient", cli)
	}
	p := &defaultProducer{
		po:      *po,
		cli:     dc,
		checker: po.checker,
	}
	p.cli.initTopics = po.topics
	endpoints, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	}
	p.pSetting = &producerSettings{
		clientId:   p.cli.GetClientID(),
		endpoints:  endpoints,
		clientType: v2.ClientType_PRODUCER,
		retryPolicy: &v2.RetryPolicy{
			MaxAttempts: po.maxAttempts,
			Strategy: &v2.RetryPolicy_ExponentialBackoff{
				ExponentialBackoff: &v2.ExponentialBackoff{
					Max:        durationpb.New(time.Duration(0)),
					Initial:    durationpb.New(time.Duration(0)),
					Multiplier: 1,
				},
			},
		},
		requestTimeout:      p.cli.opts.timeout,
		validateMessageType: *atomic.NewBool(true),
		maxBodySizeBytes:    *atomic.NewInt32(4 * 1024 * 1024),
	}
	for _, topic := range po.topics {
		topicResource := &v2.Resource{
			Name:              topic,
			ResourceNamespace: config.NameSpace,
		}
		p.pSetting.topics.Store(topic, topicResource)
	}
	// Wire the client back to the producer and its settings.
	p.cli.settings = p.pSetting
	p.cli.clientImpl = p
	return p, nil
}
View Source
// NewPublishingLoadBalancer returns a PublishingLoadBalancer over
// messageQueues. The returned error is always nil.
var NewPublishingLoadBalancer = func(messageQueues []*v2.MessageQueue) (PublishingLoadBalancer, error) {
	balancer := new(publishingLoadBalancer)
	balancer.messageQueues = messageQueues
	return balancer, nil
}
View Source
// NewPublishingMessage wraps msg into a PublishingMessage ready to be sent.
//
// The body length is checked against settings.maxBodySizeBytes, the encoding
// is set to IDENTITY, namespace is recorded and a new message ID is taken
// from the message-id codec. The message type is derived as follows:
//
//   - txEnabled, no message group, no delivery timestamp: TRANSACTION
//   - not txEnabled, message group set:                   FIFO
//   - not txEnabled, only delivery timestamp set:         DELAY
//   - not txEnabled, neither set:                         NORMAL
//
// Errors: msg is nil; the body exceeds the size limit; txEnabled is combined
// with a message group or a delivery timestamp. settings must not be nil.
var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
	if msg == nil {
		return nil, fmt.Errorf("message is nil")
	}

	limit := int(settings.maxBodySizeBytes.Load())
	if len(msg.Body) > limit {
		return nil, fmt.Errorf("message body size exceeds the threshold, max size=%d bytes", limit)
	}

	// The ID is generated before the type is resolved, so an ID is consumed
	// even when the type check below rejects the message.
	out := &PublishingMessage{
		msg:       msg,
		encoding:  v2.Encoding_IDENTITY,
		namespace: namespace,
		messageId: GetMessageIdCodecInstance().NextMessageId().String(),
	}

	grouped := msg.GetMessageGroup() != nil
	delayed := msg.GetDeliveryTimestamp() != nil
	switch {
	case txEnabled && (grouped || delayed):
		return nil, fmt.Errorf("transactional message should not set messageGroup or deliveryTimestamp")
	case txEnabled:
		out.messageType = v2.MessageType_TRANSACTION
	case grouped:
		// A message group takes precedence over a delivery timestamp.
		out.messageType = v2.MessageType_FIFO
	case delayed:
		out.messageType = v2.MessageType_DELAY
	default:
		out.messageType = v2.MessageType_NORMAL
	}
	return out, nil
}
View Source
// NewRpcClient creates an RpcClient connected to target.
//
// Options start from defaultRpcClientOptions with opts applied on top. The
// connection is opened through the options' clientConnFunc; a failure is
// wrapped and returned. On success the messaging service client is created on
// the connection, the activity time is set to now and an info line is logged.
var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) {
	client := &rpcClient{target: target, opts: defaultRpcClientOptions}
	for i := range opts {
		opts[i].apply(&client.opts)
	}

	grpcConn, dialErr := client.opts.clientConnFunc(target, client.opts.connOptions...)
	if dialErr != nil {
		return nil, fmt.Errorf("create grpc conn failed, err=%w", dialErr)
	}

	client.conn = grpcConn
	client.msc = v2.NewMessagingServiceClient(grpcConn.Conn())
	client.activityNanoTime = time.Now()

	sugarBaseLogger.Infof("create rpc client success, target=%v", target)
	return client, nil
}
View Source
// NewSimpleConsumer builds a SimpleConsumer from config.
//
// Options start from a copy of defaultSimpleConsumerOptions with opts applied
// on top. The underlying client is created through the options' clientFunc
// and must be a *defaultClient. The topics of the configured subscription
// expressions become the client's initial topics. The consumer settings carry
// the parsed endpoints, the consumer group as a resource in config.NameSpace,
// the await duration as long-polling timeout and the subscription
// expressions.
//
// Errors:
//   - a nil config or an empty config.ConsumerGroup;
//   - the error returned by clientFunc;
//   - a client returned by clientFunc that is not a *defaultClient;
//   - the error from utils.ParseTarget for config.Endpoint.
var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (SimpleConsumer, error) {
	if config == nil {
		return nil, fmt.Errorf("config could not be nil")
	}
	copyOpt := defaultSimpleConsumerOptions
	scOpts := &copyOpt
	for _, opt := range opts {
		opt.apply(scOpts)
	}
	if len(config.ConsumerGroup) == 0 {
		return nil, fmt.Errorf("consumerGroup could not be nil")
	}
	cli, err := scOpts.clientFunc(config)
	if err != nil {
		return nil, err
	}
	// clientFunc comes from the options, so it may return a Client that is
	// not a *defaultClient; report that as an error instead of panicking.
	dc, ok := cli.(*defaultClient)
	if !ok {
		return nil, fmt.Errorf("unexpected client type %T, want *defaultClient", cli)
	}
	sc := &defaultSimpleConsumer{
		scOpts:    *scOpts,
		cli:       dc,
		groupName: config.ConsumerGroup,

		awaitDuration:           scOpts.awaitDuration,
		subscriptionExpressions: scOpts.subscriptionExpressions,
	}
	if sc.subscriptionExpressions == nil {
		sc.subscriptionExpressions = make(map[string]*FilterExpression)
	}
	sc.cli.initTopics = make([]string, 0, len(scOpts.subscriptionExpressions))
	for topic := range scOpts.subscriptionExpressions {
		sc.cli.initTopics = append(sc.cli.initTopics, topic)
	}
	endpoints, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	}
	sc.scSettings = &simpleConsumerSettings{
		clientId:       sc.cli.GetClientID(),
		endpoints:      endpoints,
		clientType:     v2.ClientType_SIMPLE_CONSUMER,
		requestTimeout: sc.cli.opts.timeout,

		groupName: &v2.Resource{
			Name:              sc.groupName,
			ResourceNamespace: config.NameSpace,
		},
		longPollingTimeout: scOpts.awaitDuration,
		// NOTE(review): when no subscription expressions were configured the
		// settings keep a nil map while the consumer holds a fresh empty map,
		// so the two do not share state in that case — confirm this is
		// intended.
		subscriptionExpressions: scOpts.subscriptionExpressions,
	}
	// Wire the client back to the consumer and its settings.
	sc.cli.settings = sc.scSettings
	sc.cli.clientImpl = sc
	return sc, nil
}
View Source
// NewSubscriptionLoadBalancer returns a SubscriptionLoadBalancer over
// messageQueues. The returned error is always nil.
var NewSubscriptionLoadBalancer = func(messageQueues []*v2.MessageQueue) (SubscriptionLoadBalancer, error) {
	balancer := new(subscriptionLoadBalancer)
	balancer.messageQueues = messageQueues
	return balancer, nil
}
View Source
// NewTransactionImpl returns a transactionImpl bound to producerImpl with an
// empty table of publishing messages keyed by string.
var NewTransactionImpl = func(producerImpl Producer) *transactionImpl {
	tx := new(transactionImpl)
	tx.messages = make(map[string]*PublishingMessage)
	tx.producerImpl = producerImpl
	return tx
}
View Source
// SUB_ALL is the TAG filter expression "*" (presumably matching every tag —
// TODO confirm against the broker's filter semantics).
var SUB_ALL = NewFilterExpression("*")

Functions

func InitLogger

func InitLogger()

func NewDefaultClientSession

func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error)

func ResetLogger

func ResetLogger()

Types

type Client

// Client is the interface returned by NewClient.
type Client interface {
	// GetClientID returns the client's ID.
	GetClientID() string
	// Sign returns a context derived from ctx (presumably carrying the
	// request signature metadata — TODO confirm).
	Sign(ctx context.Context) context.Context
	// GracefulStop stops the client and reports any failure as an error.
	GracefulStop() error
}

type ClientConn

// ClientConn is the interface returned by NewClientConn; it wraps a gRPC
// client connection.
type ClientConn interface {
	// Conn returns the underlying gRPC client connection.
	Conn() *grpc.ClientConn
	// Close closes the connection and reports any failure as an error.
	Close() error
}

type ClientConnFunc

// ClientConnFunc creates a ClientConn for an endpoint string using the given
// connection options. NewClientConn has this signature.
type ClientConnFunc func(string, ...ConnOption) (ClientConn, error)

type ClientManager

// ClientManager registers clients and issues the messaging service calls on
// their behalf.
//
// Every call method takes the target endpoints and, except Telemetry, a
// request message. All but ReceiveMessage also take a duration (presumably
// the per-call timeout — TODO confirm).
type ClientManager interface {
	// RegisterClient adds client to the manager.
	RegisterClient(client Client)
	// UnRegisterClient removes client from the manager.
	UnRegisterClient(client Client)
	// Unary calls: each sends request to endpoints and returns the response.
	QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
	HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
	SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
	// Telemetry returns a telemetry stream client for endpoints.
	Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
	EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)
	NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)
	// ReceiveMessage returns a receive-message stream client; unlike the
	// other calls it takes no duration.
	ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
	AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
	ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)
}

type ClientMeterProvider

type ClientMeterProvider interface {
	Reset(metric *v2.Metric)
	// contains filtered or unexported methods
}

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

A ClientOption sets options such as timeout, etc.

func WithClientConnFunc

func WithClientConnFunc(f ClientConnFunc) ClientOption

WithClientConnFunc returns a ClientOption that sets ClientConnFunc for nameserver. Default is NewClientConn.

func WithConnOptions

func WithConnOptions(opts ...ConnOption) ClientOption

WithConnOptions returns a ClientOption that sets ConnOption for grpc ClientConn.

func WithQueryRouteTimeout

func WithQueryRouteTimeout(d time.Duration) ClientOption

WithQueryRouteTimeout returns a ClientOption that sets timeout duration for nameserver. Default is 3s.

func WithRpcClientOptions

func WithRpcClientOptions(opts ...RpcClientOption) ClientOption

WithRpcClientOptions returns a ClientOption that sets RpcClientOption for grpc ClientConn.

type ClientSettings

type ClientSettings interface {
	GetClientID() string
	GetClientType() v2.ClientType
	GetAccessPoint() *v2.Endpoints
	GetRetryPolicy() *v2.RetryPolicy
	GetRequestTimeout() time.Duration
	// contains filtered or unexported methods
}

type Config

// Config holds the settings used to create clients, producers and consumers.
type Config struct {
	// Endpoint is the target address; it is parsed with utils.ParseTarget.
	// Tagged as required for validation.
	Endpoint      string `validate:"required"`
	// NameSpace is used as the resource namespace of topics and groups.
	NameSpace     string
	// ConsumerGroup is the consumer group name; NewSimpleConsumer rejects an
	// empty value.
	ConsumerGroup string
	// Credentials are the session credentials. Tagged as required for
	// validation.
	Credentials   *credentials.SessionCredentials `validate:"required"`
}

type ConnOption

type ConnOption interface {
	// contains filtered or unexported methods
}

A ConnOption sets options such as tls.Config, etc.

func WithContext

func WithContext(ctx context.Context) ConnOption

WithContext returns a ConnOption that sets the default client context; it can be used to cancel grpc dial out and other operations that do not have an explicit context.

func WithDialOptions

func WithDialOptions(dialOptions ...grpc.DialOption) ConnOption

WithDialOptions returns a ConnOption that sets grpc.DialOption for grpc.DialContext.

func WithDialTimeout

func WithDialTimeout(dur time.Duration) ConnOption

WithDialTimeout returns a ConnOption that sets DialTimeout for grpc.DialContext. The default is 5 seconds.

func WithMaxCallRecvMsgSize

func WithMaxCallRecvMsgSize(size int) ConnOption

WithMaxCallRecvMsgSize returns a ConnOption that sets the client-side response receive limit in bytes for grpc.DialContext. If 0, it defaults to "math.MaxInt32", because range response can easily exceed request send limits. Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.

func WithMaxCallSendMsgSize

func WithMaxCallSendMsgSize(size int) ConnOption

WithMaxCallSendMsgSize returns a ConnOption that sets the client-side request send limit in bytes for grpc.DialContext.

func WithTLSConfig

func WithTLSConfig(tc *tls.Config) ConnOption

WithTLSConfig returns a ConnOption that sets tls.Config for grpc.DialContext. The default is an x509 insecure tls.Config.

func WithZapLogger

func WithZapLogger(logger *zap.Logger) ConnOption

type Consumer

type Consumer interface {
	GetGroupName() string
	// contains filtered or unexported methods
}

type ErrRpcStatus

// ErrRpcStatus is an error type carrying a numeric status code and a message.
// Use AsErrRpcStatus to extract it from an error value.
type ErrRpcStatus struct {
	// Code is the status code.
	Code    int32
	// Message is the status message.
	Message string
}

func AsErrRpcStatus

func AsErrRpcStatus(err error) (*ErrRpcStatus, bool)

func (*ErrRpcStatus) Error

func (err *ErrRpcStatus) Error() string

func (*ErrRpcStatus) GetCode

func (err *ErrRpcStatus) GetCode() int32

func (*ErrRpcStatus) GetMessage

func (err *ErrRpcStatus) GetMessage() string

type FilterExpression

type FilterExpression struct {
	// contains filtered or unexported fields
}

type FilterExpressionType

// FilterExpressionType identifies the kind of a filter expression.
type FilterExpressionType int32
const (
	// SQL92 has the value 0.
	SQL92 FilterExpressionType = iota
	// TAG has the value 1; it is the type used by NewFilterExpression.
	TAG
	// UNSPECIFIED has the value 2.
	UNSPECIFIED
)

type InvocationStatus

// InvocationStatus is the string label for the outcome of an invocation.
type InvocationStatus string
const (
	// InvocationStatus_SUCCESS is the label "success".
	InvocationStatus_SUCCESS InvocationStatus = "success"
	// InvocationStatus_FAILURE is the label "failure".
	InvocationStatus_FAILURE InvocationStatus = "failure"
)

type Message

type Message struct {
	Topic string
	Body  []byte
	Tag   *string
	// contains filtered or unexported fields
}

func (*Message) AddProperty

func (msg *Message) AddProperty(key, value string)

func (*Message) GetDeliveryTimestamp

func (msg *Message) GetDeliveryTimestamp() *time.Time

func (*Message) GetKeys

func (msg *Message) GetKeys() []string

func (*Message) GetMessageCommon

func (msg *Message) GetMessageCommon() *MessageCommon

func (*Message) GetMessageGroup

func (msg *Message) GetMessageGroup() *string

func (*Message) GetProperties

func (msg *Message) GetProperties() map[string]string

func (*Message) GetTag

func (msg *Message) GetTag() *string

func (*Message) SetDelayTimestamp

func (msg *Message) SetDelayTimestamp(deliveryTimestamp time.Time)

func (*Message) SetKeys

func (msg *Message) SetKeys(keys ...string)

func (*Message) SetMessageGroup

func (msg *Message) SetMessageGroup(messageGroup string)

func (*Message) SetTag

func (msg *Message) SetTag(tag string)

type MessageCommon

type MessageCommon struct {
	// contains filtered or unexported fields
}

type MessageHookPoints

// MessageHookPoints enumerates the points in a message's lifecycle at which
// hooks are identified. Values are assigned by iota, starting at 0 for
// MessageHookPoints_SEND.
type MessageHookPoints int32
const (
	MessageHookPoints_SEND MessageHookPoints = iota
	MessageHookPoints_RECEIVE
	MessageHookPoints_CONSUME
	MessageHookPoints_ACK
	MessageHookPoints_CHANGE_INVISIBLE_DURATION
	MessageHookPoints_COMMIT_TRANSACTION
	MessageHookPoints_ROLLBACK_TRANSACTION
	MessageHookPoints_FORWARD_TO_DLQ
)

type MessageHookPointsStatus

// MessageHookPointsStatus enumerates the status reported at a hook point.
// Values are assigned by iota: UNSET is 0, OK is 1 and ERROR is 2.
type MessageHookPointsStatus int32
const (
	MessageHookPointsStatus_UNSET MessageHookPointsStatus = iota
	MessageHookPointsStatus_OK
	MessageHookPointsStatus_ERROR
)

type MessageId

// MessageId is an abstract message ID.
type MessageId interface {
	// GetVersion returns the version of the message ID.
	GetVersion() string
	// String returns the message ID in string form.
	String() string
}

MessageId is an abstract message ID.

func NewMessageId

func NewMessageId(version, suffix string) MessageId

type MessageIdCodec

// MessageIdCodec generates and decodes message IDs.
type MessageIdCodec interface {
	// NextMessageId returns a newly generated message ID.
	NextMessageId() MessageId
	// Decode converts the string form of a message ID into a MessageId.
	Decode(messageId string) MessageId
}

The codec for the message-id.

Codec here provides the following two functions:

1. Provide decoding function of message-id of all versions above v0.

2. Provide a generator of message-id of v1 version.

The message-id of version V1 or later consists of 17 bytes in total, written as 34 hexadecimal characters. The first byte (two hexadecimal characters) represents the version number. For V1, this byte is 0x01.

V1 message id example

┌──┬────────────┬────┬────────┬────────┐
│01│56F7E71C361B│21BC│024CCDBE│00000000│
└──┴────────────┴────┴────────┴────────┘

V1 version message id generation rules

                 process id(lower 2bytes)
                             ▲
mac address(lower 6bytes)    │   sequence number(big endian)
                    ▲        │          ▲ (4bytes)
                    │        │          │
              ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
       0x01+  │     6     │ │2│ │ 4 │ │ 4 │
              └───────────┘ └─┘ └─┬─┘ └───┘
                                  │
                                  ▼
              seconds since 2021-01-01 00:00:00(UTC+0)
                           (lower 4bytes)

func GetMessageIdCodecInstance

func GetMessageIdCodecInstance() MessageIdCodec

type MessageInterceptor

type MessageInterceptor interface {
	// contains filtered or unexported methods
}

type MessageMeterInterceptor

// MessageMeterInterceptor is a MessageInterceptor; it embeds that interface
// and adds no methods of its own.
type MessageMeterInterceptor interface {
	MessageInterceptor
}

type MessageView

type MessageView struct {
	ReceiptHandle string
	// contains filtered or unexported fields
}

func (*MessageView) GetBody

func (msg *MessageView) GetBody() []byte

func (*MessageView) GetBornHost

func (msg *MessageView) GetBornHost() *string

func (*MessageView) GetBornTimestamp

func (msg *MessageView) GetBornTimestamp() *time.Time

func (*MessageView) GetDeliveryAttempt

func (msg *MessageView) GetDeliveryAttempt() int32

func (*MessageView) GetDeliveryTimestamp

func (msg *MessageView) GetDeliveryTimestamp() *time.Time

func (*MessageView) GetKeys

func (msg *MessageView) GetKeys() []string

func (*MessageView) GetMessageCommon

func (msg *MessageView) GetMessageCommon() *MessageCommon

func (*MessageView) GetMessageGroup

func (msg *MessageView) GetMessageGroup() *string

func (*MessageView) GetMessageId

func (msg *MessageView) GetMessageId() string

func (*MessageView) GetOffset

func (msg *MessageView) GetOffset() int64

func (*MessageView) GetProperties

func (msg *MessageView) GetProperties() map[string]string

func (*MessageView) GetReceiptHandle

func (msg *MessageView) GetReceiptHandle() string

func (*MessageView) GetTag

func (msg *MessageView) GetTag() *string

func (*MessageView) GetTopic

func (msg *MessageView) GetTopic() string

func (*MessageView) GetTraceContext

func (msg *MessageView) GetTraceContext() *string

func (*MessageView) SetDelayTimeLevel

func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time)

func (*MessageView) SetKeys

func (msg *MessageView) SetKeys(keys ...string)

func (*MessageView) SetMessageGroup

func (msg *MessageView) SetMessageGroup(messageGroup string)

func (*MessageView) SetTag

func (msg *MessageView) SetTag(tag string)

type MockClient

type MockClient struct {
	// contains filtered or unexported fields
}

MockClient is a mock of Client interface.

func NewMockClient

func NewMockClient(ctrl *gomock.Controller) *MockClient

NewMockClient creates a new mock instance.

func (*MockClient) EXPECT

func (m *MockClient) EXPECT() *MockClientMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockClient) GetClientID

func (m *MockClient) GetClientID() string

GetClientID mocks base method.

func (*MockClient) GracefulStop

func (m *MockClient) GracefulStop() error

GracefulStop mocks base method.

func (*MockClient) Sign

func (m *MockClient) Sign(ctx context.Context) context.Context

Sign mocks base method.

type MockClientManager

type MockClientManager struct {
	// contains filtered or unexported fields
}

MockClientManager is a mock of ClientManager interface.

func NewMockClientManager

func NewMockClientManager(ctrl *gomock.Controller) *MockClientManager

NewMockClientManager creates a new mock instance.

func (*MockClientManager) AckMessage

func (m *MockClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)

AckMessage mocks base method.

func (*MockClientManager) ChangeInvisibleDuration

func (m *MockClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)

ChangeInvisibleDuration mocks base method.

func (*MockClientManager) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockClientManager) EndTransaction

func (m *MockClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)

EndTransaction mocks base method.

func (*MockClientManager) HeartBeat

func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)

HeartBeat mocks base method.

func (*MockClientManager) NotifyClientTermination

func (m *MockClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)

NotifyClientTermination mocks base method.

func (*MockClientManager) QueryRoute

func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)

QueryRoute mocks base method.

func (*MockClientManager) ReceiveMessage

ReceiveMessage mocks base method.

func (*MockClientManager) RegisterClient

func (m *MockClientManager) RegisterClient(client Client)

RegisterClient mocks base method.

func (*MockClientManager) SendMessage

func (m *MockClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)

SendMessage mocks base method.

func (*MockClientManager) Telemetry

Telemetry mocks base method.

func (*MockClientManager) UnRegisterClient

func (m *MockClientManager) UnRegisterClient(client Client)

UnRegisterClient mocks base method.

type MockClientManagerMockRecorder

type MockClientManagerMockRecorder struct {
	// contains filtered or unexported fields
}

MockClientManagerMockRecorder is the mock recorder for MockClientManager.

func (*MockClientManagerMockRecorder) AckMessage

func (mr *MockClientManagerMockRecorder) AckMessage(ctx, endpoints, request, duration interface{}) *gomock.Call

AckMessage indicates an expected call of AckMessage.

func (*MockClientManagerMockRecorder) ChangeInvisibleDuration

func (mr *MockClientManagerMockRecorder) ChangeInvisibleDuration(ctx, endpoints, request, duration interface{}) *gomock.Call

ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.

func (*MockClientManagerMockRecorder) EndTransaction

func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request, duration interface{}) *gomock.Call

EndTransaction indicates an expected call of EndTransaction.

func (*MockClientManagerMockRecorder) HeartBeat

func (mr *MockClientManagerMockRecorder) HeartBeat(ctx, endpoints, request, duration interface{}) *gomock.Call

HeartBeat indicates an expected call of HeartBeat.

func (*MockClientManagerMockRecorder) NotifyClientTermination

func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints, request, duration interface{}) *gomock.Call

NotifyClientTermination indicates an expected call of NotifyClientTermination.

func (*MockClientManagerMockRecorder) QueryRoute

func (mr *MockClientManagerMockRecorder) QueryRoute(ctx, endpoints, request, duration interface{}) *gomock.Call

QueryRoute indicates an expected call of QueryRoute.

func (*MockClientManagerMockRecorder) ReceiveMessage

func (mr *MockClientManagerMockRecorder) ReceiveMessage(ctx, endpoints, request interface{}) *gomock.Call

ReceiveMessage indicates an expected call of ReceiveMessage.

func (*MockClientManagerMockRecorder) RegisterClient

func (mr *MockClientManagerMockRecorder) RegisterClient(client interface{}) *gomock.Call

RegisterClient indicates an expected call of RegisterClient.

func (*MockClientManagerMockRecorder) SendMessage

func (mr *MockClientManagerMockRecorder) SendMessage(ctx, endpoints, request, duration interface{}) *gomock.Call

SendMessage indicates an expected call of SendMessage.

func (*MockClientManagerMockRecorder) Telemetry

func (mr *MockClientManagerMockRecorder) Telemetry(ctx, endpoints, duration interface{}) *gomock.Call

Telemetry indicates an expected call of Telemetry.

func (*MockClientManagerMockRecorder) UnRegisterClient

func (mr *MockClientManagerMockRecorder) UnRegisterClient(client interface{}) *gomock.Call

UnRegisterClient indicates an expected call of UnRegisterClient.

type MockClientMockRecorder

type MockClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockClientMockRecorder is the mock recorder for MockClient.

func (*MockClientMockRecorder) GetClientID

func (mr *MockClientMockRecorder) GetClientID() *gomock.Call

GetClientID indicates an expected call of GetClientID.

func (*MockClientMockRecorder) GracefulStop

func (mr *MockClientMockRecorder) GracefulStop() *gomock.Call

GracefulStop indicates an expected call of GracefulStop.

func (*MockClientMockRecorder) Sign

func (mr *MockClientMockRecorder) Sign(ctx interface{}) *gomock.Call

Sign indicates an expected call of Sign.

type MockRpcClient

type MockRpcClient struct {
	// contains filtered or unexported fields
}

MockRpcClient is a mock of RpcClient interface.

func NewMockRpcClient

func NewMockRpcClient(ctrl *gomock.Controller) *MockRpcClient

NewMockRpcClient creates a new mock instance.

func (*MockRpcClient) AckMessage

func (m *MockRpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)

AckMessage mocks base method.

func (*MockRpcClient) ChangeInvisibleDuration

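func (m *MockRpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)

ChangeInvisibleDuration mocks base method.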

func (*MockRpcClient) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRpcClient) EndTransaction

func (m *MockRpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)

EndTransaction mocks base method.

func (*MockRpcClient) GetTarget

func (m *MockRpcClient) GetTarget() string

GetTarget mocks base method.

func (*MockRpcClient) GracefulStop

func (m *MockRpcClient) GracefulStop() error

GracefulStop mocks base method.

func (*MockRpcClient) HeartBeat

func (m *MockRpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)

HeartBeat mocks base method.

func (*MockRpcClient) NotifyClientTermination

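func (m *MockRpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)

NotifyClientTermination mocks base method.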

func (*MockRpcClient) QueryRoute

func (m *MockRpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)

QueryRoute mocks base method.

func (*MockRpcClient) ReceiveMessage

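func (m *MockRpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)

ReceiveMessage mocks base method.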

func (*MockRpcClient) SendMessage

func (m *MockRpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)

SendMessage mocks base method.

func (*MockRpcClient) Telemetry

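func (m *MockRpcClient) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)

Telemetry mocks base method.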

type MockRpcClientMockRecorder

type MockRpcClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockRpcClientMockRecorder is the mock recorder for MockRpcClient.

func (*MockRpcClientMockRecorder) AckMessage

func (mr *MockRpcClientMockRecorder) AckMessage(ctx, request interface{}) *gomock.Call

AckMessage indicates an expected call of AckMessage.

func (*MockRpcClientMockRecorder) ChangeInvisibleDuration

func (mr *MockRpcClientMockRecorder) ChangeInvisibleDuration(ctx, request interface{}) *gomock.Call

ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.

func (*MockRpcClientMockRecorder) EndTransaction

func (mr *MockRpcClientMockRecorder) EndTransaction(ctx, request interface{}) *gomock.Call

EndTransaction indicates an expected call of EndTransaction.

func (*MockRpcClientMockRecorder) GetTarget

func (mr *MockRpcClientMockRecorder) GetTarget() *gomock.Call

GetTarget indicates an expected call of GetTarget.

func (*MockRpcClientMockRecorder) GracefulStop

func (mr *MockRpcClientMockRecorder) GracefulStop() *gomock.Call

GracefulStop indicates an expected call of GracefulStop.

func (*MockRpcClientMockRecorder) HeartBeat

func (mr *MockRpcClientMockRecorder) HeartBeat(ctx, request interface{}) *gomock.Call

HeartBeat indicates an expected call of HeartBeat.

func (*MockRpcClientMockRecorder) NotifyClientTermination

func (mr *MockRpcClientMockRecorder) NotifyClientTermination(ctx, request interface{}) *gomock.Call

NotifyClientTermination indicates an expected call of NotifyClientTermination.

func (*MockRpcClientMockRecorder) QueryRoute

func (mr *MockRpcClientMockRecorder) QueryRoute(ctx, request interface{}) *gomock.Call

QueryRoute indicates an expected call of QueryRoute.

func (*MockRpcClientMockRecorder) ReceiveMessage

func (mr *MockRpcClientMockRecorder) ReceiveMessage(ctx, request interface{}) *gomock.Call

ReceiveMessage indicates an expected call of ReceiveMessage.

func (*MockRpcClientMockRecorder) SendMessage

func (mr *MockRpcClientMockRecorder) SendMessage(ctx, request interface{}) *gomock.Call

SendMessage indicates an expected call of SendMessage.

func (*MockRpcClientMockRecorder) Telemetry

func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call

Telemetry indicates an expected call of Telemetry.

type MockisClient

type MockisClient struct {
	// contains filtered or unexported fields
}

MockisClient is a mock of isClient interface.

func NewMockisClient

func NewMockisClient(ctrl *gomock.Controller) *MockisClient

NewMockisClient creates a new mock instance.

func (*MockisClient) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

type MockisClientMockRecorder

type MockisClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockisClientMockRecorder is the mock recorder for MockisClient.

type NewClientFunc

type NewClientFunc func(*Config, ...ClientOption) (Client, error)

type Producer

type Producer interface {
	Send(context.Context, *Message) ([]*SendReceipt, error)
	SendWithTransaction(context.Context, *Message, Transaction) ([]*SendReceipt, error)
	SendAsync(context.Context, *Message, func(context.Context, []*SendReceipt, error))
	BeginTransaction() Transaction
	Start() error
	GracefulStop() error
	// contains filtered or unexported methods
}

type ProducerOption

type ProducerOption interface {
	// contains filtered or unexported methods
}

A ProducerOption sets options such as tls.Config, etc.

func WithClientFunc

func WithClientFunc(f NewClientFunc) ProducerOption

WithClientFunc returns a ProducerOption that sets ClientFunc for producer. Default is nameserver.New.

func WithMaxAttempts

func WithMaxAttempts(m int32) ProducerOption

func WithTopics

func WithTopics(t ...string) ProducerOption

func WithTransactionChecker

func WithTransactionChecker(checker *TransactionChecker) ProducerOption

type PublishingLoadBalancer

type PublishingLoadBalancer interface {
	TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
	TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
	CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
}

type PublishingMessage

type PublishingMessage struct {
	// contains filtered or unexported fields
}

type RpcClient

type RpcClient interface {
	GracefulStop() error
	HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
	QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
	SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
	Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
	EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
	NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
	ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
	AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
	ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)

	GetTarget() string
	// contains filtered or unexported methods
}

type RpcClientOption

type RpcClientOption interface {
	// contains filtered or unexported methods
}

A RpcClientOption sets options such as tls.Config, etc.

func WithHealthCheckDuration

func WithHealthCheckDuration(d time.Duration) RpcClientOption

WithHealthCheckDuration returns a RpcClientOption that sets healthCheckDuration for RpcClient. Default is 15s.

func WithHeartbeatDuration

func WithHeartbeatDuration(d time.Duration) RpcClientOption

WithHeartbeatDuration returns a RpcClientOption that sets heartbeatDuration for RpcClient. Default is 10s.

func WithRpcClientClientConnFunc

func WithRpcClientClientConnFunc(f ClientConnFunc) RpcClientOption

WithRpcClientClientConnFunc returns a RpcClientOption that sets ClientConnFunc for RpcClient. Default is NewClientConn.

func WithRpcClientConnOption

func WithRpcClientConnOption(opts ...ConnOption) RpcClientOption

WithRpcClientConnOption returns a RpcClientOption that sets ConnOption for RpcClient.

func WithRpcClientTimeout

func WithRpcClientTimeout(d time.Duration) RpcClientOption

WithRpcClientTimeout returns an RpcClientOption that sets the timeout RpcClient uses for heartbeat and health-check calls. Default is 5s.

type SendReceipt

type SendReceipt struct {
	MessageID     string
	TransactionId string
	Offset        int64
	Endpoints     *v2.Endpoints
}

type SimpleConsumer

type SimpleConsumer interface {
	Consumer

	Start() error
	GracefulStop() error

	Subscribe(topic string, filterExpression *FilterExpression) error
	Unsubscribe(topic string) error
	Ack(ctx context.Context, messageView *MessageView) error
	Receive(ctx context.Context, maxMessageNum int32, invisibleDuration time.Duration) ([]*MessageView, error)
	ChangeInvisibleDuration(messageView *MessageView, invisibleDuration time.Duration) error
	ChangeInvisibleDurationAsync(messageView *MessageView, invisibleDuration time.Duration)
}

type SimpleConsumerOption

type SimpleConsumerOption interface {
	// contains filtered or unexported methods
}

A SimpleConsumerOption sets options such as tag, etc.

func WithAwaitDuration

func WithAwaitDuration(awaitDuration time.Duration) SimpleConsumerOption

func WithSubscriptionExpressions

func WithSubscriptionExpressions(subscriptionExpressions map[string]*FilterExpression) SimpleConsumerOption

WithSubscriptionExpressions returns a SimpleConsumerOption that sets the subscription expressions for the consumer. Note: by default the tag is *.

type SubscriptionLoadBalancer

type SubscriptionLoadBalancer interface {
	TakeMessageQueue() (*v2.MessageQueue, error)
	CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
}

type Transaction

type Transaction interface {
	Commit() error
	RollBack() error
}

type TransactionChecker

type TransactionChecker struct {
	Check func(msg *MessageView) TransactionResolution
}

type TransactionResolution

type TransactionResolution int32
const (
	UNKNOWN TransactionResolution = iota // start of the enum values; defaults to 0
	COMMIT
	ROLLBACK
)

type UnifiedMessage

type UnifiedMessage struct {
	// contains filtered or unexported fields
}

func (*UnifiedMessage) GetMessage

func (uMsg *UnifiedMessage) GetMessage() *Message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL