Documentation ¶
Index ¶
- Constants
- Variables
- func InitLogger()
- func InitSelfLogger(w io.Writer)
- func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error)
- func ResetLogger()
- type Client
- type ClientConn
- type ClientConnFunc
- type ClientManager
- type ClientMeterProvider
- type ClientOption
- type ClientSettings
- type Config
- type ConnOption
- func WithContext(ctx context.Context) ConnOption
- func WithDialOptions(dialOptions ...grpc.DialOption) ConnOption
- func WithDialTimeout(dur time.Duration) ConnOption
- func WithMaxCallRecvMsgSize(size int) ConnOption
- func WithMaxCallSendMsgSize(size int) ConnOption
- func WithTLSConfig(tc *tls.Config) ConnOption
- func WithZapLogger(logger *zap.Logger) ConnOption
- type Consumer
- type ErrRpcStatus
- type FilterExpression
- type FilterExpressionType
- type InvocationStatus
- type Message
- func (msg *Message) AddProperty(key, value string)
- func (msg *Message) GetDeliveryTimestamp() *time.Time
- func (msg *Message) GetKeys() []string
- func (msg *Message) GetMessageCommon() *MessageCommon
- func (msg *Message) GetMessageGroup() *string
- func (msg *Message) GetProperties() map[string]string
- func (msg *Message) GetTag() *string
- func (msg *Message) SetDelayTimestamp(deliveryTimestamp time.Time)
- func (msg *Message) SetKeys(keys ...string)
- func (msg *Message) SetMessageGroup(messageGroup string)
- func (msg *Message) SetTag(tag string)
- type MessageCommon
- type MessageHookPoints
- type MessageHookPointsStatus
- type MessageId
- type MessageIdCodec
- type MessageInterceptor
- type MessageMeterInterceptor
- type MessageView
- func (msg *MessageView) GetBody() []byte
- func (msg *MessageView) GetBornHost() *string
- func (msg *MessageView) GetBornTimestamp() *time.Time
- func (msg *MessageView) GetDeliveryAttempt() int32
- func (msg *MessageView) GetDeliveryTimestamp() *time.Time
- func (msg *MessageView) GetKeys() []string
- func (msg *MessageView) GetMessageCommon() *MessageCommon
- func (msg *MessageView) GetMessageGroup() *string
- func (msg *MessageView) GetMessageId() string
- func (msg *MessageView) GetOffset() int64
- func (msg *MessageView) GetProperties() map[string]string
- func (msg *MessageView) GetReceiptHandle() string
- func (msg *MessageView) GetTag() *string
- func (msg *MessageView) GetTopic() string
- func (msg *MessageView) GetTraceContext() *string
- func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time)
- func (msg *MessageView) SetKeys(keys ...string)
- func (msg *MessageView) SetMessageGroup(messageGroup string)
- func (msg *MessageView) SetTag(tag string)
- type MockClient
- type MockClientManager
- func (m *MockClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, ...) (*v2.AckMessageResponse, error)
- func (m *MockClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.ChangeInvisibleDurationResponse, error)
- func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder
- func (m *MockClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.EndTransactionResponse, error)
- func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, ...) (*v2.HeartbeatResponse, error)
- func (m *MockClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.NotifyClientTerminationResponse, error)
- func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, ...) (*v2.QueryRouteResponse, error)
- func (m *MockClientManager) ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, ...) (v2.MessagingService_ReceiveMessageClient, error)
- func (m *MockClientManager) RegisterClient(client Client)
- func (m *MockClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, ...) (*v2.SendMessageResponse, error)
- func (m *MockClientManager) Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
- func (m *MockClientManager) UnRegisterClient(client Client)
- type MockClientManagerMockRecorder
- func (mr *MockClientManagerMockRecorder) AckMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) ChangeInvisibleDuration(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) HeartBeat(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) QueryRoute(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) ReceiveMessage(ctx, endpoints, request interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) RegisterClient(client interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) SendMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) Telemetry(ctx, endpoints, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) UnRegisterClient(client interface{}) *gomock.Call
- type MockClientMockRecorder
- type MockRpcClient
- func (m *MockRpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
- func (m *MockRpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)
- func (m *MockRpcClient) EXPECT() *MockRpcClientMockRecorder
- func (m *MockRpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
- func (m *MockRpcClient) GetTarget() string
- func (m *MockRpcClient) GracefulStop() error
- func (m *MockRpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
- func (m *MockRpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
- func (m *MockRpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
- func (m *MockRpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
- func (m *MockRpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
- func (m *MockRpcClient) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
- type MockRpcClientMockRecorder
- func (mr *MockRpcClientMockRecorder) AckMessage(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) ChangeInvisibleDuration(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) EndTransaction(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) GetTarget() *gomock.Call
- func (mr *MockRpcClientMockRecorder) GracefulStop() *gomock.Call
- func (mr *MockRpcClientMockRecorder) HeartBeat(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) NotifyClientTermination(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) QueryRoute(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) ReceiveMessage(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) SendMessage(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call
- type MockisClient
- type MockisClientMockRecorder
- type NewClientFunc
- type Producer
- type ProducerOption
- type PublishingLoadBalancer
- type PublishingMessage
- type RpcClient
- type RpcClientOption
- func WithHealthCheckDuration(d time.Duration) RpcClientOption
- func WithHeartbeatDuration(d time.Duration) RpcClientOption
- func WithRpcClientClientConnFunc(f ClientConnFunc) RpcClientOption
- func WithRpcClientConnOption(opts ...ConnOption) RpcClientOption
- func WithRpcClientTimeout(d time.Duration) RpcClientOption
- type SendReceipt
- type SimpleConsumer
- type SimpleConsumerOption
- type SubscriptionLoadBalancer
- type Transaction
- type TransactionChecker
- type TransactionResolution
- type UnifiedMessage
Constants ¶
// Keys of the client logging settings. The two commented-out entries are
// kept as they appear in the original declaration.
const (
	CLIENT_LOG_ROOT     = "rocketmq.client.logRoot"
	CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"
	CLIENT_LOG_FILESIZE = "rocketmq.client.logFileMaxSize"
	CLIENT_LOG_LEVEL    = "rocketmq.client.logLevel"
	// CLIENT_LOG_ADDITIVE = "rocketmq.client.log.additive"
	CLIENT_LOG_FILENAME = "rocketmq.client.logFileName"
	// CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
	ENABLE_CONSOLE_APPENDER = "mq.consoleAppender.enabled"
)
// Message-id layout constants: the string length of a V1-or-later id and the
// two-character version prefixes for V0 and V1.
const (
	MESSAGE_ID_LENGTH_FOR_V1_OR_LATER        = 34
	MESSAGE_ID_VERSION_V0             string = "00"
	MESSAGE_ID_VERSION_V1             string = "01"
)
// Span attribute keys, span attribute values and span annotation names used
// for message tracing.
const (
	// RocketMQ-specific span attribute keys.
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION           = "messaging.rocketmq.operation"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE           = "messaging.rocketmq.namespace"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG                 = "messaging.rocketmq.message_tag"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS                = "messaging.rocketmq.message_keys"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID           = "messaging.rocketmq.client_id"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE        = "messaging.rocketmq.message_type"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP        = "messaging.rocketmq.client_group"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT             = "messaging.rocketmq.attempt"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE          = "messaging.rocketmq.batch_size"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP  = "messaging.rocketmq.delivery_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP = "messaging.rocketmq.available_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY          = "messaging.rocketmq.access_key"

	// Span attribute values.
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM  = "rocketmq"
	SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND           = "topic"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL         = "RMQ-gRPC"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION = "v1"

	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE      = "normal"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE        = "fifo"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE       = "delay"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE = "transaction"

	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION     = "send"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_RECEIVE_OPERATION  = "receive"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PULL_OPERATION     = "pull"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION    = "await"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION  = "process"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ACK_OPERATION      = "ack"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NACK_OPERATION     = "nack"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION   = "commit"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION = "rollback"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DLQ_OPERATION      = "dlq"

	// Messaging span attribute name list
	SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM              = "messaging.system"
	SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION         = "messaging.destination"
	SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND    = "messaging.destination_kind"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL            = "messaging.protocol"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION    = "messaging.protocol_version"
	SPAN_ATTRIBUTE_KEY_MESSAGING_URL                 = "messaging.url"
	SPAN_ATTRIBUTE_KEY_MESSAGING_ID                  = "messaging.message_id"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES  = "messaging.message_payload_size_bytes"
	SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION           = "messaging.operation"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION    = "send"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION = "receive"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION = "process"

	SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction"

	// Span annotation
	SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption"
	SPAN_ANNOTATION_MESSAGE_KEYS      = "__message_keys"
	SPAN_ANNOTATION_ATTR_START_TIME   = "__start_time"
)
RocketMQ span attribute name list
// MAX_MESSAGE_NUM is fixed at 1.
// NOTE(review): presumably a per-request message cap — confirm against callers.
const MAX_MESSAGE_NUM = 1
Variables ¶
var ( MLatencyMs = stats.Int64("publish_latency", "Publish latency in milliseconds", "ms") PublishLatencyView = view.View{ Name: "rocketmq_send_cost_time", Description: "Publish latency", Measure: MLatencyMs, Aggregation: view.Distribution(1, 5, 10, 20, 50, 200, 500), TagKeys: []tag.Key{topicTag, clientIdTag, invocationStatusTag}, } )
// ErrNoAvailableBrokers is the sentinel error "rocketmq: no available brokers".
var ErrNoAvailableBrokers = errors.New("rocketmq: no available brokers")
// ErrNoAvailableEndpoints is the sentinel error
// "rocketmq: no available endpoints".
var ErrNoAvailableEndpoints = errors.New("rocketmq: no available endpoints")
var NewClient = func(config *Config, opts ...ClientOption) (Client, error) { endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } cli := &defaultClient{ config: config, opts: defaultNSOptions, clientID: utils.GenClientID(), accessPoint: endpoints, messageInterceptors: make([]MessageInterceptor, 0), endpointsTelemetryClientTable: make(map[string]*defaultClientSession), on: *atomic.NewBool(true), } cli.log = sugarBaseLogger.With("client_id", cli.clientID) for _, opt := range opts { opt.apply(&cli.opts) } cli.done = make(chan struct{}, 1) cli.clientMeterProvider = NewDefaultClientMeterProvider(cli) return cli, nil }
var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) { endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } cli := &defaultClient{ config: config, opts: defaultNSOptions, clientID: utils.GenClientID(), accessPoint: endpoints, messageInterceptors: make([]MessageInterceptor, 0), endpointsTelemetryClientTable: make(map[string]*defaultClientSession), on: *atomic.NewBool(true), clientManager: &MockClientManager{}, } cli.log = sugarBaseLogger.With("client_id", cli.clientID) for _, opt := range opts { opt.apply(&cli.opts) } cli.done = make(chan struct{}, 1) cli.clientMeterProvider = NewDefaultClientMeterProvider(cli) return cli, nil }
var NewClientConn = func(endpoint string, opts ...ConnOption) (ClientConn, error) { client := &clientConn{ opts: defaultConnOptions, validate: validator.New(), } if len(endpoint) == 0 { return nil, ErrNoAvailableEndpoints } for _, opt := range opts { opt.apply(&client.opts) } baseCtx := context.TODO() if client.opts.Context != nil { baseCtx = client.opts.Context } ctx, cancel := context.WithCancel(baseCtx) client.ctx = ctx client.cancel = cancel client.creds = credentials.NewTLS(client.opts.TLS) if client.opts.MaxCallSendMsgSize > 0 || client.opts.MaxCallRecvMsgSize > 0 { if client.opts.MaxCallRecvMsgSize > 0 && client.opts.MaxCallSendMsgSize > client.opts.MaxCallRecvMsgSize { return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", client.opts.MaxCallRecvMsgSize, client.opts.MaxCallSendMsgSize) } if client.opts.MaxCallSendMsgSize > 0 { client.callOpts = append(client.callOpts, grpc.MaxCallSendMsgSize(client.opts.MaxCallSendMsgSize)) } if client.opts.MaxCallRecvMsgSize > 0 { client.callOpts = append(client.callOpts, grpc.MaxCallRecvMsgSize(client.opts.MaxCallRecvMsgSize)) } } conn, err := client.dial(endpoint) if err != nil { client.cancel() return nil, err } client.conn = conn return client, nil }
// NewDefaultClientManager returns a defaultClientManager with an empty
// RpcClient table, an unbuffered done channel and defaultClientManagerOptions.
var NewDefaultClientManager = func() *defaultClientManager {
	mgr := new(defaultClientManager)
	mgr.rpcClientTable = make(map[string]RpcClient)
	mgr.done = make(chan struct{})
	mgr.opts = defaultClientManagerOptions
	return mgr
}
// NewDefaultClientMeter returns a defaultClientMeter that stores the exporter
// and endpoints and whose enabled flag starts at on.
// The clientID argument is accepted but not used by this constructor.
var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter {
	meter := &defaultClientMeter{
		endpoints:   endpoints,
		ocaExporter: exporter,
	}
	meter.enabled = *atomic.NewBool(on)
	return meter
}
var NewDefaultClientMeterProvider = func(client *defaultClient) ClientMeterProvider { cmp := &defaultClientMeterProvider{ client: client, clientMeter: NewDefaultClientMeter(nil, false, nil, "nil"), } client.registerMessageInterceptor(NewDefaultMessageMeterInterceptor(cmp)) return cmp }
// NewDefaultMessageMeterInterceptor returns an interceptor that holds the
// given ClientMeterProvider.
var NewDefaultMessageMeterInterceptor = func(clientMeterProvider ClientMeterProvider) *defaultMessageMeterInterceptor {
	interceptor := new(defaultMessageMeterInterceptor)
	interceptor.clientMeterProvider = clientMeterProvider
	return interceptor
}
// NewFilterExpression returns a FilterExpression for expression with the
// expression type set to TAG.
var NewFilterExpression = func(expression string) *FilterExpression {
	fe := new(FilterExpression)
	fe.expression = expression
	fe.expressionType = TAG
	return fe
}
// NewFilterExpressionWithType returns a FilterExpression for expression with
// the caller-chosen expressionType.
var NewFilterExpressionWithType = func(expression string, expressionType FilterExpressionType) *FilterExpression {
	fe := new(FilterExpression)
	fe.expression = expression
	fe.expressionType = expressionType
	return fe
}
var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error) { copyOpt := defaultProducerOptions po := ©Opt for _, opt := range opts { opt.apply(po) } cli, err := po.clientFunc(config) if err != nil { return nil, err } p := &defaultProducer{ po: *po, cli: cli.(*defaultClient), checker: po.checker, } p.cli.initTopics = po.topics endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } p.pSetting = &producerSettings{ clientId: p.cli.GetClientID(), endpoints: endpoints, clientType: v2.ClientType_PRODUCER, retryPolicy: &v2.RetryPolicy{ MaxAttempts: po.maxAttempts, Strategy: &v2.RetryPolicy_ExponentialBackoff{ ExponentialBackoff: &v2.ExponentialBackoff{ Max: durationpb.New(time.Duration(0)), Initial: durationpb.New(time.Duration(0)), Multiplier: 1, }, }, }, requestTimeout: p.cli.opts.timeout, validateMessageType: *atomic.NewBool(true), maxBodySizeBytes: *atomic.NewInt32(4 * 1024 * 1024), } for _, topic := range po.topics { topicResource := &v2.Resource{ Name: topic, } p.pSetting.topics.Store(topic, topicResource) } p.cli.settings = p.pSetting p.cli.clientImpl = p return p, nil }
var NewPublishingLoadBalancer = func(messageQueues []*v2.MessageQueue) (PublishingLoadBalancer, error) { plb := &publishingLoadBalancer{ messageQueues: messageQueues, } return plb, nil }
var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) { if msg == nil { return nil, fmt.Errorf("message is nil") } pMsg := &PublishingMessage{ msg: msg, } maxBodySizeBytes := int(settings.maxBodySizeBytes.Load()) length := len(msg.Body) if length > maxBodySizeBytes { return nil, fmt.Errorf("message body size exceeds the threshold, max size=%d bytes", maxBodySizeBytes) } pMsg.encoding = v2.Encoding_IDENTITY pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String() if msg.GetMessageGroup() == nil && msg.GetDeliveryTimestamp() == nil && !txEnabled { pMsg.messageType = v2.MessageType_NORMAL return pMsg, nil } if msg.GetMessageGroup() != nil && !txEnabled { pMsg.messageType = v2.MessageType_FIFO return pMsg, nil } if msg.GetDeliveryTimestamp() != nil && !txEnabled { pMsg.messageType = v2.MessageType_DELAY return pMsg, nil } if msg.GetMessageGroup() == nil && msg.GetDeliveryTimestamp() == nil && txEnabled { pMsg.messageType = v2.MessageType_TRANSACTION return pMsg, nil } return nil, fmt.Errorf("transactional message should not set messageGroup or deliveryTimestamp") }
var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) { rc := &rpcClient{ target: target, opts: defaultRpcClientOptions, } for _, opt := range opts { opt.apply(&rc.opts) } conn, err := rc.opts.clientConnFunc(target, rc.opts.connOptions...) if err != nil { return nil, fmt.Errorf("create grpc conn failed, err=%w", err) } rc.conn = conn rc.msc = v2.NewMessagingServiceClient(conn.Conn()) rc.activityNanoTime = time.Now() sugarBaseLogger.Infof("create rpc client success, target=%v", target) return rc, nil }
var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (SimpleConsumer, error) { copyOpt := defaultSimpleConsumerOptions scOpts := ©Opt for _, opt := range opts { opt.apply(scOpts) } if len(config.ConsumerGroup) == 0 { return nil, fmt.Errorf("consumerGroup could not be nil") } cli, err := scOpts.clientFunc(config) if err != nil { return nil, err } sc := &defaultSimpleConsumer{ scOpts: *scOpts, cli: cli.(*defaultClient), groupName: config.ConsumerGroup, awaitDuration: scOpts.awaitDuration, subscriptionExpressions: scOpts.subscriptionExpressions, } if sc.subscriptionExpressions == nil { sc.subscriptionExpressions = make(map[string]*FilterExpression) } sc.cli.initTopics = make([]string, 0) for topic := range scOpts.subscriptionExpressions { sc.cli.initTopics = append(sc.cli.initTopics, topic) } endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } sc.scSettings = &simpleConsumerSettings{ clientId: sc.cli.GetClientID(), endpoints: endpoints, clientType: v2.ClientType_SIMPLE_CONSUMER, requestTimeout: sc.cli.opts.timeout, groupName: &v2.Resource{ Name: sc.groupName, }, longPollingTimeout: scOpts.awaitDuration, subscriptionExpressions: scOpts.subscriptionExpressions, } sc.cli.settings = sc.scSettings sc.cli.clientImpl = sc return sc, nil }
var NewSubscriptionLoadBalancer = func(messageQueues []*v2.MessageQueue) (SubscriptionLoadBalancer, error) { slb := &subscriptionLoadBalancer{ messageQueues: messageQueues, } return slb, nil }
// NewTransactionImpl returns a transactionImpl bound to producerImpl with an
// empty message table keyed by string.
var NewTransactionImpl = func(producerImpl Producer) *transactionImpl {
	tx := new(transactionImpl)
	tx.producerImpl = producerImpl
	tx.messages = make(map[string]*PublishingMessage)
	return tx
}
// SUB_ALL is the FilterExpression that NewFilterExpression builds from the
// expression "*".
var SUB_ALL = NewFilterExpression("*")
Functions ¶
func InitLogger ¶
func InitLogger()
func InitSelfLogger ¶
func NewDefaultClientSession ¶
func ResetLogger ¶
func ResetLogger()
Types ¶
type ClientConn ¶
// ClientConn gives access to a gRPC client connection and lets it be closed.
type ClientConn interface {
	// Conn returns the *grpc.ClientConn held by this connection.
	Conn() *grpc.ClientConn
	// Close closes the connection and reports any error.
	Close() error
}
type ClientConnFunc ¶
// ClientConnFunc creates a ClientConn from a string target and ConnOptions.
// NewClientConn has this signature.
type ClientConnFunc func(string, ...ConnOption) (ClientConn, error)
type ClientManager ¶
// ClientManager registers clients and issues the messaging RPCs against a
// given set of endpoints.
// NOTE(review): the duration parameters are presumably per-call timeouts — confirm.
type ClientManager interface {
	// RegisterClient and UnRegisterClient add a client to, or remove it from, the manager.
	RegisterClient(client Client)
	UnRegisterClient(client Client)

	// Unary request/response calls.
	QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
	HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
	SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
	// Telemetry returns a telemetry stream client.
	Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
	EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)
	NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)
	// ReceiveMessage returns a receive stream client; it takes no duration.
	ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
	AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
	ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)
}
type ClientMeterProvider ¶
type ClientOption ¶
type ClientOption interface {
// contains filtered or unexported methods
}
A ClientOption sets options such as timeout, etc.
func WithClientConnFunc ¶
func WithClientConnFunc(f ClientConnFunc) ClientOption
WithClientConnFunc returns a ClientOption that sets the ClientConnFunc for nameserver. The default is NewClientConn.
func WithConnOptions ¶
func WithConnOptions(opts ...ConnOption) ClientOption
WithConnOptions returns a ClientOption that sets the ConnOptions for the grpc ClientConn.
func WithQueryRouteTimeout ¶
func WithQueryRouteTimeout(d time.Duration) ClientOption
WithQueryRouteTimeout returns a ClientOption that sets the timeout duration for nameserver. The default is 3s.
func WithRpcClientOptions ¶
func WithRpcClientOptions(opts ...RpcClientOption) ClientOption
WithRpcClientOptions returns a ClientOption that sets the RpcClientOptions for the RpcClient.
type ClientSettings ¶
type ClientSettings interface { GetClientID() string GetClientType() v2.ClientType GetAccessPoint() *v2.Endpoints GetRetryPolicy() *v2.RetryPolicy GetRequestTimeout() time.Duration // contains filtered or unexported methods }
type Config ¶
// Config carries the settings used to build clients, producers and consumers.
type Config struct {
	// Endpoint is the server address; NewClient parses it with utils.ParseTarget.
	Endpoint string `validate:"required"`
	// NameSpace is the namespace name.
	NameSpace string
	// ConsumerGroup is the consumer group name; NewSimpleConsumer rejects an empty value.
	ConsumerGroup string
	// Credentials holds the session credentials.
	Credentials *credentials.SessionCredentials `validate:"required"`
}
type ConnOption ¶
type ConnOption interface {
// contains filtered or unexported methods
}
A ConnOption sets options such as tls.Config, etc.
func WithContext ¶
func WithContext(ctx context.Context) ConnOption
WithContext returns a ConnOption that sets the default client context; it can be used to cancel the grpc dial and other operations that do not have an explicit context.
func WithDialOptions ¶
func WithDialOptions(dialOptions ...grpc.DialOption) ConnOption
WithDialOptions returns a ConnOption that sets grpc.DialOption for grpc.DialContext.
func WithDialTimeout ¶
func WithDialTimeout(dur time.Duration) ConnOption
WithDialTimeout returns a ConnOption that sets DialTimeout for grpc.DialContext. The default is 5 seconds.
func WithMaxCallRecvMsgSize ¶
func WithMaxCallRecvMsgSize(size int) ConnOption
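WithMaxCallRecvMsgSize returns a ConnOption that sets the client-side response receive limit in bytes (applied as grpc.MaxCallRecvMsgSize). If 0, no explicit limit is passed to gRPC. Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.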
func WithMaxCallSendMsgSize ¶
func WithMaxCallSendMsgSize(size int) ConnOption
WithMaxCallSendMsgSize returns a ConnOption that sets the client-side request send limit in bytes (applied as grpc.MaxCallSendMsgSize). If 0, no explicit limit is passed to gRPC. NewClientConn returns an error when this limit is larger than a non-zero "MaxCallRecvMsgSize", so make sure that "MaxCallRecvMsgSize" >= this value.
func WithTLSConfig ¶
func WithTLSConfig(tc *tls.Config) ConnOption
WithTLSConfig returns a ConnOption that sets tls.Config for grpc.DialContext. The default is an x509 insecure tls.Config.
func WithZapLogger ¶
func WithZapLogger(logger *zap.Logger) ConnOption
type Consumer ¶
type Consumer interface { GetGroupName() string // contains filtered or unexported methods }
type ErrRpcStatus ¶
func AsErrRpcStatus ¶
func AsErrRpcStatus(err error) (*ErrRpcStatus, bool)
func (*ErrRpcStatus) Error ¶
func (err *ErrRpcStatus) Error() string
func (*ErrRpcStatus) GetCode ¶
func (err *ErrRpcStatus) GetCode() int32
func (*ErrRpcStatus) GetMessage ¶
func (err *ErrRpcStatus) GetMessage() string
type FilterExpression ¶
type FilterExpression struct {
// contains filtered or unexported fields
}
type FilterExpressionType ¶
// FilterExpressionType identifies the kind of a FilterExpression.
type FilterExpressionType int32

// Filter expression kinds, numbered from 0 in declaration order.
const (
	SQL92 FilterExpressionType = iota
	TAG
	UNSPECIFIED
)
type InvocationStatus ¶
// InvocationStatus is the string label for the outcome of an invocation.
type InvocationStatus string

// Invocation outcomes.
const (
	InvocationStatus_SUCCESS InvocationStatus = "success"
	InvocationStatus_FAILURE InvocationStatus = "failure"
)
type Message ¶
type Message struct { Topic string Body []byte Tag *string // contains filtered or unexported fields }
func (*Message) AddProperty ¶
func (*Message) GetDeliveryTimestamp ¶
func (*Message) GetMessageCommon ¶
func (msg *Message) GetMessageCommon() *MessageCommon
func (*Message) GetMessageGroup ¶
func (*Message) GetProperties ¶
func (*Message) SetDelayTimestamp ¶
func (*Message) SetMessageGroup ¶
type MessageCommon ¶
type MessageCommon struct {
// contains filtered or unexported fields
}
type MessageHookPoints ¶
// MessageHookPoints identifies a point in the message lifecycle.
type MessageHookPoints int32

// Hook points, numbered from 0 in declaration order.
const (
	MessageHookPoints_SEND MessageHookPoints = iota
	MessageHookPoints_RECEIVE
	MessageHookPoints_CONSUME
	MessageHookPoints_ACK
	MessageHookPoints_CHANGE_INVISIBLE_DURATION
	MessageHookPoints_COMMIT_TRANSACTION
	MessageHookPoints_ROLLBACK_TRANSACTION
	MessageHookPoints_FORWARD_TO_DLQ
)
type MessageHookPointsStatus ¶
// MessageHookPointsStatus is the status reported at a message hook point.
type MessageHookPointsStatus int32

// Hook point statuses, numbered from 0 in declaration order.
const (
	MessageHookPointsStatus_UNSET MessageHookPointsStatus = iota
	MessageHookPointsStatus_OK
	MessageHookPointsStatus_ERROR
)
type MessageId ¶
// MessageId is an abstract message id.
type MessageId interface {
	// GetVersion returns the version of the message id.
	GetVersion() string
	// String returns the message id in string form.
	String() string
}
MessageId is an abstract message id.
func NewMessageId ¶
type MessageIdCodec ¶
MessageIdCodec is the codec for the message-id.
Codec here provides the following two functions:
1. Provide decoding function of message-id of all versions above v0.
2. Provide a generator of message-id of v1 version.
The message-id of version V1 or later consists of 17 bytes in total, written as 34 hexadecimal characters. The first byte (the first two hexadecimal characters) represents the version number. For V1, this byte is 0x01.
V1 message id example
┌──┬────────────┬────┬────────┬────────┐
│01│56F7E71C361B│21BC│024CCDBE│00000000│
└──┴────────────┴────┴────────┴────────┘
V1 version message id generation rules
                 process id(lower 2bytes)
                            ▲
mac address(lower 6bytes)   │   sequence number(big endian)
                   ▲        │          ▲ (4bytes)
                   │        │          │
             ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
      0x01+  │     6     │ │2│ │ 4 │ │ 4 │
             └───────────┘ └─┘ └─┬─┘ └───┘
                                 │
                                 ▼
             seconds since 2021-01-01 00:00:00(UTC+0)
                          (lower 4bytes)
func GetMessageIdCodecInstance ¶
func GetMessageIdCodecInstance() MessageIdCodec
type MessageInterceptor ¶
type MessageInterceptor interface {
// contains filtered or unexported methods
}
type MessageMeterInterceptor ¶
// MessageMeterInterceptor is a MessageInterceptor; it embeds that interface
// and adds no methods of its own.
type MessageMeterInterceptor interface {
	MessageInterceptor
}
type MessageView ¶
type MessageView struct { ReceiptHandle string // contains filtered or unexported fields }
func (*MessageView) GetBody ¶
func (msg *MessageView) GetBody() []byte
func (*MessageView) GetBornHost ¶
func (msg *MessageView) GetBornHost() *string
func (*MessageView) GetBornTimestamp ¶
func (msg *MessageView) GetBornTimestamp() *time.Time
func (*MessageView) GetDeliveryAttempt ¶
func (msg *MessageView) GetDeliveryAttempt() int32
func (*MessageView) GetDeliveryTimestamp ¶
func (msg *MessageView) GetDeliveryTimestamp() *time.Time
func (*MessageView) GetKeys ¶
func (msg *MessageView) GetKeys() []string
func (*MessageView) GetMessageCommon ¶
func (msg *MessageView) GetMessageCommon() *MessageCommon
func (*MessageView) GetMessageGroup ¶
func (msg *MessageView) GetMessageGroup() *string
func (*MessageView) GetMessageId ¶
func (msg *MessageView) GetMessageId() string
func (*MessageView) GetOffset ¶
func (msg *MessageView) GetOffset() int64
func (*MessageView) GetProperties ¶
func (msg *MessageView) GetProperties() map[string]string
func (*MessageView) GetReceiptHandle ¶
func (msg *MessageView) GetReceiptHandle() string
func (*MessageView) GetTag ¶
func (msg *MessageView) GetTag() *string
func (*MessageView) GetTopic ¶
func (msg *MessageView) GetTopic() string
func (*MessageView) GetTraceContext ¶
func (msg *MessageView) GetTraceContext() *string
func (*MessageView) SetDelayTimeLevel ¶
func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time)
func (*MessageView) SetKeys ¶
func (msg *MessageView) SetKeys(keys ...string)
func (*MessageView) SetMessageGroup ¶
func (msg *MessageView) SetMessageGroup(messageGroup string)
func (*MessageView) SetTag ¶
func (msg *MessageView) SetTag(tag string)
type MockClient ¶
type MockClient struct {
// contains filtered or unexported fields
}
MockClient is a mock of Client interface.
func NewMockClient ¶
func NewMockClient(ctrl *gomock.Controller) *MockClient
NewMockClient creates a new mock instance.
func (*MockClient) EXPECT ¶
func (m *MockClient) EXPECT() *MockClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockClient) GetClientID ¶
func (m *MockClient) GetClientID() string
GetClientID mocks base method.
func (*MockClient) GracefulStop ¶
func (m *MockClient) GracefulStop() error
GracefulStop mocks base method.
type MockClientManager ¶
type MockClientManager struct {
// contains filtered or unexported fields
}
MockClientManager is a mock of ClientManager interface.
func NewMockClientManager ¶
func NewMockClientManager(ctrl *gomock.Controller) *MockClientManager
NewMockClientManager creates a new mock instance.
func (*MockClientManager) AckMessage ¶
func (m *MockClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
AckMessage mocks base method.
func (*MockClientManager) ChangeInvisibleDuration ¶
func (m *MockClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)
ChangeInvisibleDuration mocks base method.
func (*MockClientManager) EXPECT ¶
func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockClientManager) EndTransaction ¶
func (m *MockClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)
EndTransaction mocks base method.
func (*MockClientManager) HeartBeat ¶
func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
HeartBeat mocks base method.
func (*MockClientManager) NotifyClientTermination ¶
func (m *MockClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)
NotifyClientTermination mocks base method.
func (*MockClientManager) QueryRoute ¶
func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
QueryRoute mocks base method.
func (*MockClientManager) ReceiveMessage ¶
func (m *MockClientManager) ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
ReceiveMessage mocks base method.
func (*MockClientManager) RegisterClient ¶
func (m *MockClientManager) RegisterClient(client Client)
RegisterClient mocks base method.
func (*MockClientManager) SendMessage ¶
func (m *MockClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
SendMessage mocks base method.
func (*MockClientManager) Telemetry ¶
func (m *MockClientManager) Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
Telemetry mocks base method.
func (*MockClientManager) UnRegisterClient ¶
func (m *MockClientManager) UnRegisterClient(client Client)
UnRegisterClient mocks base method.
type MockClientManagerMockRecorder ¶
type MockClientManagerMockRecorder struct {
// contains filtered or unexported fields
}
MockClientManagerMockRecorder is the mock recorder for MockClientManager.
func (*MockClientManagerMockRecorder) AckMessage ¶
func (mr *MockClientManagerMockRecorder) AckMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
AckMessage indicates an expected call of AckMessage.
func (*MockClientManagerMockRecorder) ChangeInvisibleDuration ¶
func (mr *MockClientManagerMockRecorder) ChangeInvisibleDuration(ctx, endpoints, request, duration interface{}) *gomock.Call
ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.
func (*MockClientManagerMockRecorder) EndTransaction ¶
func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request, duration interface{}) *gomock.Call
EndTransaction indicates an expected call of EndTransaction.
func (*MockClientManagerMockRecorder) HeartBeat ¶
func (mr *MockClientManagerMockRecorder) HeartBeat(ctx, endpoints, request, duration interface{}) *gomock.Call
HeartBeat indicates an expected call of HeartBeat.
func (*MockClientManagerMockRecorder) NotifyClientTermination ¶
func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints, request, duration interface{}) *gomock.Call
NotifyClientTermination indicates an expected call of NotifyClientTermination.
func (*MockClientManagerMockRecorder) QueryRoute ¶
func (mr *MockClientManagerMockRecorder) QueryRoute(ctx, endpoints, request, duration interface{}) *gomock.Call
QueryRoute indicates an expected call of QueryRoute.
func (*MockClientManagerMockRecorder) ReceiveMessage ¶
func (mr *MockClientManagerMockRecorder) ReceiveMessage(ctx, endpoints, request interface{}) *gomock.Call
ReceiveMessage indicates an expected call of ReceiveMessage.
func (*MockClientManagerMockRecorder) RegisterClient ¶
func (mr *MockClientManagerMockRecorder) RegisterClient(client interface{}) *gomock.Call
RegisterClient indicates an expected call of RegisterClient.
func (*MockClientManagerMockRecorder) SendMessage ¶
func (mr *MockClientManagerMockRecorder) SendMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
SendMessage indicates an expected call of SendMessage.
func (*MockClientManagerMockRecorder) Telemetry ¶
func (mr *MockClientManagerMockRecorder) Telemetry(ctx, endpoints, duration interface{}) *gomock.Call
Telemetry indicates an expected call of Telemetry.
func (*MockClientManagerMockRecorder) UnRegisterClient ¶
func (mr *MockClientManagerMockRecorder) UnRegisterClient(client interface{}) *gomock.Call
UnRegisterClient indicates an expected call of UnRegisterClient.
type MockClientMockRecorder ¶
type MockClientMockRecorder struct {
// contains filtered or unexported fields
}
MockClientMockRecorder is the mock recorder for MockClient.
func (*MockClientMockRecorder) GetClientID ¶
func (mr *MockClientMockRecorder) GetClientID() *gomock.Call
GetClientID indicates an expected call of GetClientID.
func (*MockClientMockRecorder) GracefulStop ¶
func (mr *MockClientMockRecorder) GracefulStop() *gomock.Call
GracefulStop indicates an expected call of GracefulStop.
func (*MockClientMockRecorder) Sign ¶
func (mr *MockClientMockRecorder) Sign(ctx interface{}) *gomock.Call
Sign indicates an expected call of Sign.
type MockRpcClient ¶
type MockRpcClient struct {
// contains filtered or unexported fields
}
MockRpcClient is a mock of RpcClient interface.
func NewMockRpcClient ¶
func NewMockRpcClient(ctrl *gomock.Controller) *MockRpcClient
NewMockRpcClient creates a new mock instance.
func (*MockRpcClient) AckMessage ¶
func (m *MockRpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
AckMessage mocks base method.
func (*MockRpcClient) ChangeInvisibleDuration ¶
func (m *MockRpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)
ChangeInvisibleDuration mocks base method.
func (*MockRpcClient) EXPECT ¶
func (m *MockRpcClient) EXPECT() *MockRpcClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockRpcClient) EndTransaction ¶
func (m *MockRpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
EndTransaction mocks base method.
func (*MockRpcClient) GetTarget ¶
func (m *MockRpcClient) GetTarget() string
GetTarget mocks base method.
func (*MockRpcClient) GracefulStop ¶
func (m *MockRpcClient) GracefulStop() error
GracefulStop mocks base method.
func (*MockRpcClient) HeartBeat ¶
func (m *MockRpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
HeartBeat mocks base method.
func (*MockRpcClient) NotifyClientTermination ¶
func (m *MockRpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
NotifyClientTermination mocks base method.
func (*MockRpcClient) QueryRoute ¶
func (m *MockRpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
QueryRoute mocks base method.
func (*MockRpcClient) ReceiveMessage ¶
func (m *MockRpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
ReceiveMessage mocks base method.
func (*MockRpcClient) SendMessage ¶
func (m *MockRpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
SendMessage mocks base method.
func (*MockRpcClient) Telemetry ¶
func (m *MockRpcClient) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
Telemetry mocks base method.
type MockRpcClientMockRecorder ¶
type MockRpcClientMockRecorder struct {
// contains filtered or unexported fields
}
MockRpcClientMockRecorder is the mock recorder for MockRpcClient.
func (*MockRpcClientMockRecorder) AckMessage ¶
func (mr *MockRpcClientMockRecorder) AckMessage(ctx, request interface{}) *gomock.Call
AckMessage indicates an expected call of AckMessage.
func (*MockRpcClientMockRecorder) ChangeInvisibleDuration ¶
func (mr *MockRpcClientMockRecorder) ChangeInvisibleDuration(ctx, request interface{}) *gomock.Call
ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.
func (*MockRpcClientMockRecorder) EndTransaction ¶
func (mr *MockRpcClientMockRecorder) EndTransaction(ctx, request interface{}) *gomock.Call
EndTransaction indicates an expected call of EndTransaction.
func (*MockRpcClientMockRecorder) GetTarget ¶
func (mr *MockRpcClientMockRecorder) GetTarget() *gomock.Call
GetTarget indicates an expected call of GetTarget.
func (*MockRpcClientMockRecorder) GracefulStop ¶
func (mr *MockRpcClientMockRecorder) GracefulStop() *gomock.Call
GracefulStop indicates an expected call of GracefulStop.
func (*MockRpcClientMockRecorder) HeartBeat ¶
func (mr *MockRpcClientMockRecorder) HeartBeat(ctx, request interface{}) *gomock.Call
HeartBeat indicates an expected call of HeartBeat.
func (*MockRpcClientMockRecorder) NotifyClientTermination ¶
func (mr *MockRpcClientMockRecorder) NotifyClientTermination(ctx, request interface{}) *gomock.Call
NotifyClientTermination indicates an expected call of NotifyClientTermination.
func (*MockRpcClientMockRecorder) QueryRoute ¶
func (mr *MockRpcClientMockRecorder) QueryRoute(ctx, request interface{}) *gomock.Call
QueryRoute indicates an expected call of QueryRoute.
func (*MockRpcClientMockRecorder) ReceiveMessage ¶
func (mr *MockRpcClientMockRecorder) ReceiveMessage(ctx, request interface{}) *gomock.Call
ReceiveMessage indicates an expected call of ReceiveMessage.
func (*MockRpcClientMockRecorder) SendMessage ¶
func (mr *MockRpcClientMockRecorder) SendMessage(ctx, request interface{}) *gomock.Call
SendMessage indicates an expected call of SendMessage.
func (*MockRpcClientMockRecorder) Telemetry ¶
func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call
Telemetry indicates an expected call of Telemetry.
type MockisClient ¶
type MockisClient struct {
// contains filtered or unexported fields
}
MockisClient is a mock of isClient interface.
func NewMockisClient ¶
func NewMockisClient(ctrl *gomock.Controller) *MockisClient
NewMockisClient creates a new mock instance.
func (*MockisClient) EXPECT ¶
func (m *MockisClient) EXPECT() *MockisClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockisClientMockRecorder ¶
type MockisClientMockRecorder struct {
// contains filtered or unexported fields
}
MockisClientMockRecorder is the mock recorder for MockisClient.
type NewClientFunc ¶
type NewClientFunc func(*Config, ...ClientOption) (Client, error)
type Producer ¶
type Producer interface {
Send(context.Context, *Message) ([]*SendReceipt, error)
SendWithTransaction(context.Context, *Message, Transaction) ([]*SendReceipt, error)
SendAsync(context.Context, *Message, func(context.Context, []*SendReceipt, error))
BeginTransaction() Transaction
Start() error
GracefulStop() error
// contains filtered or unexported methods
}
type ProducerOption ¶
type ProducerOption interface {
// contains filtered or unexported methods
}
A ProducerOption sets options such as tls.Config, etc.
func WithClientFunc ¶
func WithClientFunc(f NewClientFunc) ProducerOption
WithClientFunc returns a ProducerOption that sets ClientFunc for producer. Default is nameserver.New.
func WithMaxAttempts ¶
func WithMaxAttempts(m int32) ProducerOption
func WithTopics ¶
func WithTopics(t ...string) ProducerOption
func WithTransactionChecker ¶
func WithTransactionChecker(checker *TransactionChecker) ProducerOption
type PublishingLoadBalancer ¶
type PublishingLoadBalancer interface {
TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
}
type PublishingMessage ¶
type PublishingMessage struct {
// contains filtered or unexported fields
}
type RpcClient ¶
type RpcClient interface {
GracefulStop() error
HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)
GetTarget() string
// contains filtered or unexported methods
}
type RpcClientOption ¶
type RpcClientOption interface {
// contains filtered or unexported methods
}
A RpcClientOption sets options such as tls.Config, etc.
func WithHealthCheckDuration ¶
func WithHealthCheckDuration(d time.Duration) RpcClientOption
WithHealthCheckDuration returns a RpcClientOption that sets healthCheckDuration for RpcClient. Default is 15s.
func WithHeartbeatDuration ¶
func WithHeartbeatDuration(d time.Duration) RpcClientOption
WithHeartbeatDuration returns a RpcClientOption that sets heartbeatDuration for RpcClient. Default is 10s.
func WithRpcClientClientConnFunc ¶
func WithRpcClientClientConnFunc(f ClientConnFunc) RpcClientOption
WithRpcClientClientConnFunc returns a RpcClientOption that sets ClientConnFunc for RpcClient. Default is NewClientConn.
func WithRpcClientConnOption ¶
func WithRpcClientConnOption(opts ...ConnOption) RpcClientOption
WithRpcClientConnOption returns a RpcClientOption that sets ConnOption for RpcClient.
func WithRpcClientTimeout ¶
func WithRpcClientTimeout(d time.Duration) RpcClientOption
WithRpcClientTimeout returns a RpcClientOption that sets the timeout RpcClient uses for heartbeat and health check requests. Default is 5s.
type SendReceipt ¶
type SimpleConsumer ¶
type SimpleConsumer interface {
Consumer
Start() error
GracefulStop() error
Subscribe(topic string, filterExpression *FilterExpression) error
Unsubscribe(topic string) error
Ack(ctx context.Context, messageView *MessageView) error
Receive(ctx context.Context, maxMessageNum int32, invisibleDuration time.Duration) ([]*MessageView, error)
ChangeInvisibleDuration(messageView *MessageView, invisibleDuration time.Duration) error
ChangeInvisibleDurationAsync(messageView *MessageView, invisibleDuration time.Duration)
}
type SimpleConsumerOption ¶
type SimpleConsumerOption interface {
// contains filtered or unexported methods
}
A SimpleConsumerOption sets options such as tag, etc.
func WithAwaitDuration ¶
func WithAwaitDuration(awaitDuration time.Duration) SimpleConsumerOption
func WithSubscriptionExpressions ¶
func WithSubscriptionExpressions(subscriptionExpressions map[string]*FilterExpression) SimpleConsumerOption
WithSubscriptionExpressions returns a SimpleConsumerOption that sets the subscription expressions for the consumer. Note: by default it uses *.
type SubscriptionLoadBalancer ¶
type SubscriptionLoadBalancer interface {
TakeMessageQueue() (*v2.MessageQueue, error)
}
type Transaction ¶
type TransactionChecker ¶
type TransactionChecker struct {
Check func(msg *MessageView) TransactionResolution
}
type TransactionResolution ¶
type TransactionResolution int32
const (
UNKNOWN TransactionResolution = iota // start of the enumeration; the default value is 0
COMMIT
ROLLBACK
)
type UnifiedMessage ¶
type UnifiedMessage struct {
// contains filtered or unexported fields
}
func (*UnifiedMessage) GetMessage ¶
func (uMsg *UnifiedMessage) GetMessage() *Message
Source Files ¶
- client.go
- client_manager.go
- client_manager_mock.go
- client_mock.go
- client_options.go
- config.go
- conn.go
- conn_options.go
- consumer.go
- error.go
- loadBalancer.go
- log.go
- message.go
- message_id.go
- message_id_codec.go
- metric.go
- producer.go
- producer_options.go
- publishing_message.go
- rpc_client.go
- rpc_client_mock.go
- rpc_client_options.go
- simple_consumer.go
- simple_consumer_options.go
- trace.go
- transaction.go
- user_agent.go