Versions in this module Expand all Collapse all v5 v5.0.7 Mar 21, 2023 v5.0.5 Mar 21, 2023 Changes in this version + const CLIENT_LOG_FILENAME + const CLIENT_LOG_FILESIZE + const CLIENT_LOG_LEVEL + const CLIENT_LOG_MAXINDEX + const CLIENT_LOG_ROOT + const ENABLE_CONSOLE_APPENDER + const MAX_MESSAGE_NUM + const MESSAGE_ID_LENGTH_FOR_V1_OR_LATER + const MESSAGE_ID_VERSION_V0 + const MESSAGE_ID_VERSION_V1 + const SPAN_ANNOTATION_ATTR_START_TIME + const SPAN_ANNOTATION_AWAIT_CONSUMPTION + const SPAN_ANNOTATION_MESSAGE_KEYS + const SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION + const SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND + const SPAN_ATTRIBUTE_KEY_MESSAGING_ID + const SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION + const SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES + const SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL + const SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION + const SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM + const SPAN_ATTRIBUTE_KEY_MESSAGING_URL + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION + const SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG + const SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION + const SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND + const SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION + const SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL + const SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION + const SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION + const SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ACK_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DLQ_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NACK_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PULL_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_RECEIVE_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION + const SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE + var ErrNoAvailableBrokers = errors.New("rocketmq: no available brokers") + var ErrNoAvailableEndpoints = errors.New("rocketmq: no available endpoints") + var MLatencyMs = stats.Int64("publish_latency", "Publish latency in milliseconds", "ms") + var NewClient = func(config *Config, opts ...ClientOption) (Client, error) + var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) + var NewClientConn = func(endpoint string, opts ...ConnOption) (ClientConn, error) + var NewDefaultClientManager = func() *defaultClientManager + var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter + var NewDefaultClientMeterProvider = func(client *defaultClient) ClientMeterProvider + var NewDefaultMessageMeterInterceptor = func(clientMeterProvider ClientMeterProvider) *defaultMessageMeterInterceptor + var NewFilterExpression = func(expression string) *FilterExpression + var NewFilterExpressionWithType = func(expression string, expressionType FilterExpressionType) *FilterExpression + var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error) + var NewPublishingLoadBalancer = func(messageQueues []*v2.MessageQueue) (PublishingLoadBalancer, error) + var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) + var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) + var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (SimpleConsumer, error) + var NewSubscriptionLoadBalancer = func(messageQueues []*v2.MessageQueue) (SubscriptionLoadBalancer, error) + var NewTransactionImpl = func(producerImpl Producer) *transactionImpl + var PublishLatencyView = view.View + var SUB_ALL = NewFilterExpression("*") + func InitLogger() + func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error) + func ResetLogger() + type Client interface + GetClientID func() string + GracefulStop func() error + Sign func(ctx context.Context) context.Context + type ClientConn interface + Close func() error + Conn func() *grpc.ClientConn + type ClientConnFunc func(string, ...ConnOption) (ClientConn, error) + type ClientManager interface + AckMessage func(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, ...) (*v2.AckMessageResponse, error) + ChangeInvisibleDuration func(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.ChangeInvisibleDurationResponse, error) + EndTransaction func(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.EndTransactionResponse, error) + HeartBeat func(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, ...) (*v2.HeartbeatResponse, error) + NotifyClientTermination func(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.NotifyClientTerminationResponse, error) + QueryRoute func(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, ...) (*v2.QueryRouteResponse, error) + ReceiveMessage func(ctx context.Context, endpoints *v2.Endpoints, ...) (v2.MessagingService_ReceiveMessageClient, error) + RegisterClient func(client Client) + SendMessage func(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, ...) (*v2.SendMessageResponse, error) + Telemetry func(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error) + UnRegisterClient func(client Client) + type ClientMeterProvider interface + Reset func(metric *v2.Metric) + type ClientOption interface + func WithClientConnFunc(f ClientConnFunc) ClientOption + func WithConnOptions(opts ...ConnOption) ClientOption + func WithQueryRouteTimeout(d time.Duration) ClientOption + func WithRpcClientOptions(opts ...RpcClientOption) ClientOption + type ClientSettings interface + GetAccessPoint func() *v2.Endpoints + GetClientID func() string + GetClientType func() v2.ClientType + GetRequestTimeout func() time.Duration + GetRetryPolicy func() *v2.RetryPolicy + type Config struct + ConsumerGroup string + Credentials *credentials.SessionCredentials + Endpoint string + NameSpace string + type ConnOption interface + func WithContext(ctx context.Context) ConnOption + func WithDialKeepAliveTime(d time.Duration) ConnOption + func WithDialKeepAliveTimeout(d time.Duration) ConnOption + func WithDialOptions(dialOptions ...grpc.DialOption) ConnOption + func WithDialTimeout(dur time.Duration) ConnOption + func WithMaxCallRecvMsgSize(size int) ConnOption + func WithMaxCallSendMsgSize(size int) ConnOption + func WithPermitWithoutStream(permit bool) ConnOption + func WithTLSConfig(tc *tls.Config) ConnOption + func WithZapLogger(logger *zap.Logger) ConnOption + type Consumer interface + GetGroupName func() string + type ErrRpcStatus struct + Code int32 + Message string + func AsErrRpcStatus(err error) (*ErrRpcStatus, bool) + func (err *ErrRpcStatus) Error() string + func (err *ErrRpcStatus) GetCode() int32 + func (err *ErrRpcStatus) GetMessage() string + type FilterExpression struct + type FilterExpressionType int32 + const SQL92 + const TAG + const UNSPECIFIED + type InvocationStatus string + const InvocationStatus_FAILURE + const InvocationStatus_SUCCESS + type Message struct + Body []byte + Tag *string + Topic string + func (msg *Message) GetDeliveryTimestamp() *time.Time + func (msg *Message) GetKeys() []string + func (msg *Message) GetMessageCommon() *MessageCommon + func (msg *Message) GetMessageGroup() *string + func (msg *Message) GetProperties() map[string]string + func (msg *Message) GetTag() *string + func (msg *Message) SetDelayTimestamp(deliveryTimestamp time.Time) + func (msg *Message) SetKeys(keys ...string) + func (msg *Message) SetMessageGroup(messageGroup string) + func (msg *Message) SetTag(tag string) + type MessageCommon struct + type MessageHookPoints int32 + const MessageHookPoints_ACK + const MessageHookPoints_CHANGE_INVISIBLE_DURATION + const MessageHookPoints_COMMIT_TRANSACTION + const MessageHookPoints_CONSUME + const MessageHookPoints_FORWARD_TO_DLQ + const MessageHookPoints_RECEIVE + const MessageHookPoints_ROLLBACK_TRANSACTION + const MessageHookPoints_SEND + type MessageHookPointsStatus int32 + const MessageHookPointsStatus_ERROR + const MessageHookPointsStatus_OK + const MessageHookPointsStatus_UNSET + type MessageId interface + GetVersion func() string + String func() string + func NewMessageId(version, suffix string) MessageId + type MessageIdCodec interface + Decode func(messageId string) MessageId + NextMessageId func() MessageId + func GetMessageIdCodecInstance() MessageIdCodec + type MessageInterceptor interface + type MessageMeterInterceptor interface + type MessageView struct + ReceiptHandle string + func (msg *MessageView) GetBody() []byte + func (msg *MessageView) GetBornHost() *string + func (msg *MessageView) GetBornTimestamp() *time.Time + func (msg *MessageView) GetDeliveryAttempt() int32 + func (msg *MessageView) GetDeliveryTimestamp() *time.Time + func (msg *MessageView) GetKeys() []string + func (msg *MessageView) GetMessageCommon() *MessageCommon + func (msg *MessageView) GetMessageGroup() *string + func (msg *MessageView) GetMessageId() string + func (msg *MessageView) GetOffset() int64 + func (msg *MessageView) GetProperties() map[string]string + func (msg *MessageView) GetReceiptHandle() string + func (msg *MessageView) GetTag() *string + func (msg *MessageView) GetTopic() string + func (msg *MessageView) GetTraceContext() *string + func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time) + func (msg *MessageView) SetKeys(keys ...string) + func (msg *MessageView) SetMessageGroup(messageGroup string) + func (msg *MessageView) SetTag(tag string) + type MockClient struct + func NewMockClient(ctrl *gomock.Controller) *MockClient + func (m *MockClient) EXPECT() *MockClientMockRecorder + func (m *MockClient) GetClientID() string + func (m *MockClient) GracefulStop() error + func (m *MockClient) Sign(ctx context.Context) context.Context + type MockClientManager struct + func NewMockClientManager(ctrl *gomock.Controller) *MockClientManager + func (m *MockClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, ...) (*v2.AckMessageResponse, error) + func (m *MockClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.ChangeInvisibleDurationResponse, error) + func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder + func (m *MockClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.EndTransactionResponse, error) + func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, ...) (*v2.HeartbeatResponse, error) + func (m *MockClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.NotifyClientTerminationResponse, error) + func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, ...) (*v2.QueryRouteResponse, error) + func (m *MockClientManager) ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, ...) (v2.MessagingService_ReceiveMessageClient, error) + func (m *MockClientManager) RegisterClient(client Client) + func (m *MockClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, ...) (*v2.SendMessageResponse, error) + func (m *MockClientManager) Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error) + func (m *MockClientManager) UnRegisterClient(client Client) + type MockClientManagerMockRecorder struct + func (mr *MockClientManagerMockRecorder) AckMessage(ctx, endpoints, request, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) ChangeInvisibleDuration(ctx, endpoints, request, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) HeartBeat(ctx, endpoints, request, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints, request, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) QueryRoute(ctx, endpoints, request, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) ReceiveMessage(ctx, endpoints, request interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) RegisterClient(client interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) SendMessage(ctx, endpoints, request, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) Telemetry(ctx, endpoints, duration interface{}) *gomock.Call + func (mr *MockClientManagerMockRecorder) UnRegisterClient(client interface{}) *gomock.Call + type MockClientMockRecorder struct + func (mr *MockClientMockRecorder) GetClientID() *gomock.Call + func (mr *MockClientMockRecorder) GracefulStop() *gomock.Call + func (mr *MockClientMockRecorder) Sign(ctx interface{}) *gomock.Call + type MockRpcClient struct + func NewMockRpcClient(ctrl *gomock.Controller) *MockRpcClient + func (m *MockRpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error) + func (m *MockRpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error) + func (m *MockRpcClient) EXPECT() *MockRpcClientMockRecorder + func (m *MockRpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error) + func (m *MockRpcClient) GetTarget() string + func (m *MockRpcClient) GracefulStop() error + func (m *MockRpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error) + func (m *MockRpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error) + func (m *MockRpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error) + func (m *MockRpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) + func (m *MockRpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error) + func (m *MockRpcClient) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error) + type MockRpcClientMockRecorder struct + func (mr *MockRpcClientMockRecorder) AckMessage(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) ChangeInvisibleDuration(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) EndTransaction(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) GetTarget() *gomock.Call + func (mr *MockRpcClientMockRecorder) GracefulStop() *gomock.Call + func (mr *MockRpcClientMockRecorder) HeartBeat(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) NotifyClientTermination(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) QueryRoute(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) ReceiveMessage(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) SendMessage(ctx, request interface{}) *gomock.Call + func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call + type MockisClient struct + func NewMockisClient(ctrl *gomock.Controller) *MockisClient + func (m *MockisClient) EXPECT() *MockisClientMockRecorder + type MockisClientMockRecorder struct + type NewClientFunc func(*Config, ...ClientOption) (Client, error) + type Producer interface + BeginTransaction func() Transaction + GracefulStop func() error + Send func(context.Context, *Message) ([]*SendReceipt, error) + SendAsync func(context.Context, *Message, func(context.Context, []*SendReceipt, error)) + SendWithTransaction func(context.Context, *Message, Transaction) ([]*SendReceipt, error) + Start func() error + type ProducerOption interface + func WithClientFunc(f NewClientFunc) ProducerOption + func WithMaxAttempts(m int32) ProducerOption + func WithTopics(t ...string) ProducerOption + func WithTransactionChecker(checker *TransactionChecker) ProducerOption + type PublishingLoadBalancer interface + TakeMessageQueueByMessageGroup func(messageGroup *string) ([]*v2.MessageQueue, error) + TakeMessageQueues func(excluded sync.Map, count int) ([]*v2.MessageQueue, error) + type PublishingMessage struct + type RpcClient interface + AckMessage func(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error) + ChangeInvisibleDuration func(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error) + EndTransaction func(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error) + GetTarget func() string + GracefulStop func() error + HeartBeat func(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error) + NotifyClientTermination func(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error) + QueryRoute func(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error) + ReceiveMessage func(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) + SendMessage func(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error) + Telemetry func(ctx context.Context) (v2.MessagingService_TelemetryClient, error) + type RpcClientOption interface + func WithHealthCheckDuration(d time.Duration) RpcClientOption + func WithHeartbeatDuration(d time.Duration) RpcClientOption + func WithRpcClientClientConnFunc(f ClientConnFunc) RpcClientOption + func WithRpcClientConnOption(opts ...ConnOption) RpcClientOption + func WithRpcClientTimeout(d time.Duration) RpcClientOption + type SendReceipt struct + Endpoints *v2.Endpoints + MessageID string + Offset int64 + TransactionId string + type SimpleConsumer interface + Ack func(ctx context.Context, messageView *MessageView) error + ChangeInvisibleDuration func(messageView *MessageView, invisibleDuration time.Duration) error + ChangeInvisibleDurationAsync func(messageView *MessageView, invisibleDuration time.Duration) + GracefulStop func() error + Receive func(ctx context.Context, maxMessageNum int32, invisibleDuration time.Duration) ([]*MessageView, error) + Start func() error + Subscribe func(topic string, filterExpression *FilterExpression) error + Unsubscribe func(topic string) error + type SimpleConsumerOption interface + func WithAwaitDuration(awaitDuration time.Duration) SimpleConsumerOption + func WithSubscriptionExpressions(subscriptionExpressions map[string]*FilterExpression) SimpleConsumerOption + type SubscriptionLoadBalancer interface + TakeMessageQueue func() (*v2.MessageQueue, error) + type Transaction interface + Commit func() error + RollBack func() error + type TransactionChecker struct + Check func(msg *MessageView) TransactionResolution + type TransactionResolution int32 + const COMMIT + const ROLLBACK + const UNKNOW + type UnifiedMessage struct + func (uMsg *UnifiedMessage) GetMessage() *Message