Documentation ¶
Overview ¶
- Define the ctx key and value type.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func BackBuffer(b *bytes.Buffer)
- func BackHeader(d *[]byte)
- func ClearCompressedFlag(flag int) int
- func CreateMessageId(addr []byte, port int32, offset int64) string
- func CreateUniqID() string
- func Diff(origin, latest []string) bool
- func GetBuffer() *bytes.Buffer
- func GetHeader() *[]byte
- func GetTransactionValue(flag int) int
- func IsRemotingErr(err error) bool
- func NewMQClientErr(code int16, msg string) error
- func NewPassthroughResolver(addr []string) *passthroughResolver
- func NewRemotingErr(s string) error
- func Pid() int16
- func ResetTransactionValue(flag int, typeFlag int) int
- func SetCompressedFlag(flag int) int
- func WithConcurrentlyCtx(ctx context.Context, c *ConsumeConcurrentlyContext) context.Context
- func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context
- func WithMethod(ctx context.Context, m CommunicationMode) context.Context
- func WithOrderlyCtx(ctx context.Context, c *ConsumeOrderlyContext) context.Context
- func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context
- func WithRecover(fn func())
- type AccessChannel
- type CommunicationMode
- type ConsumeConcurrentlyContext
- type ConsumeMessageContext
- type ConsumeOrderlyContext
- type ConsumeReturnType
- type Credentials
- type CtxKey
- type EnvResolver
- type HttpResolver
- type Interceptor
- type Invoker
- type LocalTransactionState
- type MQBrokerErr
- type MQClientErr
- type Message
- func (m *Message) GetKeys() string
- func (m *Message) GetProperties() map[string]string
- func (m *Message) GetProperty(key string) string
- func (m *Message) GetShardingKey() string
- func (m *Message) GetTags() string
- func (m *Message) Marshal() []byte
- func (m *Message) MarshallProperties() string
- func (m *Message) RemoveProperty(key string) string
- func (m *Message) String() string
- func (m *Message) UnmarshalProperties(data []byte)
- func (m *Message) WithDelayTimeLevel(level int) *Message
- func (m *Message) WithKeys(keys []string) *Message
- func (m *Message) WithProperties(p map[string]string)
- func (m *Message) WithProperty(key, value string)
- func (m *Message) WithShardingKey(key string) *Message
- func (m *Message) WithTag(tags string) *Message
- type MessageExt
- type MessageID
- type MessageQueue
- type MessageType
- type NamesrvAddr
- type NsResolver
- type ProducerCtx
- type PullResult
- func (result *PullResult) GetBody() []byte
- func (result *PullResult) GetMessageExts() []*MessageExt
- func (result *PullResult) GetMessages() []*Message
- func (result *PullResult) SetBody(data []byte)
- func (result *PullResult) SetMessageExts(msgExts []*MessageExt)
- func (result *PullResult) String() string
- type PullStatus
- type RemotingErr
- type SendResult
- type SendStatus
- type StaticResolver
- type TraceConfig
- type TransactionListener
- type TransactionSendResult
Constants ¶
const ( // method name in producer SendSync CommunicationMode = "SendSync" SendOneway CommunicationMode = "SendOneway" SendAsync CommunicationMode = "SendAsync" // method name in consumer ConsumerPush = "ConsumerPush" ConsumerPull = "ConsumerPull" PropCtxType = "ConsumeContextType" SuccessReturn ConsumeReturnType = "SUCCESS" TimeoutReturn ConsumeReturnType = "TIMEOUT" ExceptionReturn ConsumeReturnType = "EXCEPTION" NullReturn ConsumeReturnType = "RETURNNULL" FailedReturn ConsumeReturnType = "FAILED" )
const ( PropertyKeySeparator = " " PropertyKeys = "KEYS" PropertyTags = "TAGS" PropertyWaitStoreMsgOk = "WAIT" PropertyDelayTimeLevel = "DELAY" PropertyRetryTopic = "RETRY_TOPIC" PropertyRealTopic = "REAL_TOPIC" PropertyRealQueueId = "REAL_QID" PropertyTransactionPrepared = "TRAN_MSG" PropertyProducerGroup = "PGROUP" PropertyMinOffset = "MIN_OFFSET" PropertyMaxOffset = "MAX_OFFSET" PropertyBuyerId = "BUYER_ID" PropertyOriginMessageId = "ORIGIN_MESSAGE_ID" PropertyTransferFlag = "TRANSFER_FLAG" PropertyCorrectionFlag = "CORRECTION_FLAG" PropertyMQ2Flag = "MQ2_FLAG" PropertyReconsumeTime = "RECONSUME_TIME" PropertyMsgRegion = "MSG_REGION" PropertyTraceSwitch = "TRACE_ON" PropertyUniqueClientMessageIdKeyIndex = "UNIQ_KEY" PropertyMaxReconsumeTimes = "MAX_RECONSUME_TIMES" PropertyConsumeStartTime = "CONSUME_START_TIME" PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET" PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES" PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS" PropertyShardingKey = "SHARDING_KEY" PropertyTransactionID = "__transactionId__" PropertyCorrelationID = "CORRELATION_ID" PropertyMessageReplyToClient = "REPLY_TO_CLIENT" PropertyMessageTTL = "TTL" PropertyReplyMessageArriveTime = "ARRIVE_TIME" PropertyMsgType = "MSG_TYPE" PropertyCluster = "CLUSTER" )
const ( SendOK SendStatus = iota SendFlushDiskTimeout SendFlushSlaveTimeout SendSlaveNotAvailable SendUnknownError FlagCompressed = 0x1 FlagBornHostV6 = 0x1 << 4 FlagStoreHostV6 = 0x1 << 5 MsgIdLength = 8 + 8 )
const (
DEFAULT_NAMESRV_ADDR = "http://jmenv.tbsite.net:8080/rocketmq/nsaddr"
)
Variables ¶
var ( CompressedFlag = 0x1 MultiTagsFlag = 0x1 << 1 TransactionNotType = 0 TransactionPreparedType = 0x1 << 2 TransactionCommitType = 0x2 << 2 TransactionRollbackType = 0x3 << 2 )
var PanicHandler func(interface{})
Functions ¶
func BackBuffer ¶
func BackHeader ¶
func BackHeader(d *[]byte)
func ClearCompressedFlag ¶
func CreateUniqID ¶
func CreateUniqID() string
func GetTransactionValue ¶
func IsRemotingErr ¶
func NewMQClientErr ¶
func NewPassthroughResolver ¶
func NewPassthroughResolver(addr []string) *passthroughResolver
func NewRemotingErr ¶
func ResetTransactionValue ¶
func SetCompressedFlag ¶
func WithConcurrentlyCtx ¶
func WithConcurrentlyCtx(ctx context.Context, c *ConsumeConcurrentlyContext) context.Context
func WithConsumerCtx ¶
func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context
WithConsumerCtx set ConsumeMessageContext in PushConsumer
func WithMethod ¶
func WithMethod(ctx context.Context, m CommunicationMode) context.Context
WithMethod set call method name
func WithOrderlyCtx ¶
func WithOrderlyCtx(ctx context.Context, c *ConsumeOrderlyContext) context.Context
func WithProducerCtx ¶
func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context
func WithRecover ¶
func WithRecover(fn func())
Types ¶
type AccessChannel ¶
type AccessChannel int
const ( // connect to private IDC cluster. Local AccessChannel = iota // connect to Cloud service. Cloud )
type CommunicationMode ¶
type CommunicationMode string
func GetMethod ¶
func GetMethod(ctx context.Context) CommunicationMode
GetMethod get call method name
type ConsumeConcurrentlyContext ¶
type ConsumeConcurrentlyContext struct { MQ MessageQueue DelayLevelWhenNextConsume int AckIndex int32 }
func GetConcurrentlyCtx ¶
func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, bool)
func NewConsumeConcurrentlyContext ¶
func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext
type ConsumeMessageContext ¶
type ConsumeMessageContext struct { ConsumerGroup string Msgs []*MessageExt MQ *MessageQueue Success bool Status string // mqTractContext Properties map[string]string }
func GetConsumerCtx ¶
func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool)
GetConsumerCtx get ConsumeMessageContext, only legal in PushConsumer. so should add bool return param indicate whether exist.
type ConsumeOrderlyContext ¶
type ConsumeOrderlyContext struct { MQ MessageQueue AutoCommit bool SuspendCurrentQueueTimeMillis int }
func GetOrderlyCtx ¶
func GetOrderlyCtx(ctx context.Context) (*ConsumeOrderlyContext, bool)
func NewConsumeOrderlyContext ¶
func NewConsumeOrderlyContext() *ConsumeOrderlyContext
type ConsumeReturnType ¶
type ConsumeReturnType string
func (ConsumeReturnType) Ordinal ¶
func (c ConsumeReturnType) Ordinal() int
type Credentials ¶
func (Credentials) IsEmpty ¶
func (c Credentials) IsEmpty() bool
type EnvResolver ¶
type EnvResolver struct { }
func NewEnvResolver ¶
func NewEnvResolver() *EnvResolver
func (*EnvResolver) Description ¶
func (e *EnvResolver) Description() string
func (*EnvResolver) Resolve ¶
func (e *EnvResolver) Resolve() []string
type HttpResolver ¶
type HttpResolver struct {
// contains filtered or unexported fields
}
func NewHttpResolver ¶
func NewHttpResolver(instance string, domain ...string) *HttpResolver
func (*HttpResolver) Description ¶
func (h *HttpResolver) Description() string
func (*HttpResolver) Resolve ¶
func (h *HttpResolver) Resolve() []string
type Interceptor ¶
Interceptor intercepts the invoke of a producer/consumer on messages. In PushConsumer call, the req is []*MessageExt type and the reply is ConsumeResultHolder, use type assert to get real type.
func ChainInterceptors ¶
func ChainInterceptors(interceptors ...Interceptor) Interceptor
type LocalTransactionState ¶
type LocalTransactionState int
const ( CommitMessageState LocalTransactionState = iota + 1 RollbackMessageState UnknowState )
type MQBrokerErr ¶
func (MQBrokerErr) Error ¶
func (e MQBrokerErr) Error() string
type MQClientErr ¶
type MQClientErr struct {
// contains filtered or unexported fields
}
func (MQClientErr) Error ¶
func (e MQClientErr) Error() string
type Message ¶
type Message struct { Topic string Body []byte CompressedBody []byte Flag int32 TransactionId string Batch bool Compress bool // Queue is the queue that messages will be sent to. the value must be set if want to custom the queue of message, // just ignore if not. Queue *MessageQueue // contains filtered or unexported fields }
func NewMessage ¶
func (*Message) GetProperties ¶
func (*Message) GetProperty ¶
func (*Message) GetShardingKey ¶
func (*Message) MarshallProperties ¶
func (*Message) RemoveProperty ¶
func (*Message) UnmarshalProperties ¶
unmarshalProperties parse data into property kv pairs.
func (*Message) WithDelayTimeLevel ¶
WithDelayTimeLevel set message delay time to consume. reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
func (*Message) WithProperties ¶
func (*Message) WithProperty ¶
func (*Message) WithShardingKey ¶
type MessageExt ¶
type MessageExt struct { Message MsgId string OffsetMsgId string StoreSize int32 QueueOffset int64 SysFlag int32 BornTimestamp int64 BornHost string StoreTimestamp int64 StoreHost string CommitLogOffset int64 BodyCRC int32 ReconsumeTimes int32 PreparedTransactionOffset int64 }
func DecodeMessage ¶
func DecodeMessage(data []byte) []*MessageExt
func (*MessageExt) GetRegionID ¶
func (msgExt *MessageExt) GetRegionID() string
func (*MessageExt) GetTags ¶
func (msgExt *MessageExt) GetTags() string
func (*MessageExt) IsTraceOn ¶
func (msgExt *MessageExt) IsTraceOn() string
func (*MessageExt) String ¶
func (msgExt *MessageExt) String() string
type MessageID ¶
func UnmarshalMsgID ¶
type MessageQueue ¶
type MessageQueue struct { Topic string `json:"topic"` BrokerName string `json:"brokerName"` QueueId int `json:"queueId"` }
MessageQueue message queue
func (*MessageQueue) HashCode ¶
func (mq *MessageQueue) HashCode() int
func (*MessageQueue) String ¶
func (mq *MessageQueue) String() string
type MessageType ¶
type MessageType int
const ( NormalMsg MessageType = iota TransMsgHalf TransMsgCommit DelayMsg )
type NamesrvAddr ¶
type NamesrvAddr []string
func NewNamesrvAddr ¶
func NewNamesrvAddr(s ...string) (NamesrvAddr, error)
func (NamesrvAddr) Check ¶
func (addr NamesrvAddr) Check() error
type NsResolver ¶
resolver for nameserver, monitor change of nameserver and notify client consul or domain is common
type ProducerCtx ¶
type ProducerCtx struct { ProducerGroup string Message Message MQ MessageQueue BrokerAddr string BornHost string CommunicationMode CommunicationMode SendResult *SendResult Props map[string]string MsgType MessageType Namespace string }
func GetProducerCtx ¶
func GetProducerCtx(ctx context.Context) (*ProducerCtx, bool)
type PullResult ¶
type PullResult struct { NextBeginOffset int64 MinOffset int64 MaxOffset int64 Status PullStatus SuggestWhichBrokerId int64 // contains filtered or unexported fields }
PullResult the pull result
func (*PullResult) GetBody ¶
func (result *PullResult) GetBody() []byte
func (*PullResult) GetMessageExts ¶
func (result *PullResult) GetMessageExts() []*MessageExt
func (*PullResult) GetMessages ¶
func (result *PullResult) GetMessages() []*Message
func (*PullResult) SetBody ¶
func (result *PullResult) SetBody(data []byte)
func (*PullResult) SetMessageExts ¶
func (result *PullResult) SetMessageExts(msgExts []*MessageExt)
func (*PullResult) String ¶
func (result *PullResult) String() string
type PullStatus ¶
type PullStatus int
PullStatus pull Status
const ( PullFound PullStatus = iota PullNoNewMsg PullNoMsgMatched PullOffsetIllegal PullBrokerTimeout )
predefined pull Status
type RemotingErr ¶
type RemotingErr struct {
// contains filtered or unexported fields
}
func (*RemotingErr) Error ¶
func (e *RemotingErr) Error() string
type SendResult ¶
type SendResult struct { Status SendStatus MsgID string MessageQueue *MessageQueue QueueOffset int64 TransactionID string OffsetMsgID string RegionID string TraceOn bool }
SendResult RocketMQ send result
func NewSendResult ¶
func NewSendResult() *SendResult
func (*SendResult) String ¶
func (result *SendResult) String() string
SendResult send message result to string(detail result)
type StaticResolver ¶
type StaticResolver struct { }
type TraceConfig ¶
type TraceConfig struct { TraceTopic string GroupName string Access AccessChannel NamesrvAddrs []string Resolver NsResolver Credentials // acl config for trace. omit if acl is closed on broker. }
config for message trace.
type TransactionListener ¶
type TransactionListener interface { // When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. ExecuteLocalTransaction(*Message) LocalTransactionState // When no response to prepare(half) message. broker will send check message to check the transaction status, and this // method will be invoked to get local transaction status. CheckLocalTransaction(*MessageExt) LocalTransactionState }
type TransactionSendResult ¶
type TransactionSendResult struct { *SendResult State LocalTransactionState }
SendResult RocketMQ send result