Documentation ¶
Overview ¶
Defines the ctx key and value types.
Index ¶
- Constants
- Variables
- func BackBuffer(b *bytes.Buffer)
- func BackHeader(d []byte)
- func ClearCompressedFlag(flag int) int
- func CreateMessageId(addr []byte, port int32, offset int64) string
- func CreateUniqID() string
- func GetBuffer() *bytes.Buffer
- func GetHeader() []byte
- func GetTransactionValue(flag int) int
- func IsRemotingErr(err error) bool
- func NewMQClientErr(code int16, msg string) error
- func NewRemotingErr(s string) error
- func Pid() int16
- func ResetTransactionValue(flag int, typeFlag int) int
- func WithConcurrentlyCtx(ctx context.Context, c *ConsumeConcurrentlyContext) context.Context
- func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context
- func WithMethod(ctx context.Context, m CommunicationMode) context.Context
- func WithOrderlyCtx(ctx context.Context, c *ConsumeOrderlyContext) context.Context
- func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context
- func WithRecover(fn func())
- type AccessChannel
- type CommunicationMode
- type ConsumeConcurrentlyContext
- type ConsumeMessageContext
- type ConsumeOrderlyContext
- type ConsumeReturnType
- type Credentials
- type CtxKey
- type Interceptor
- type Invoker
- type LocalTransactionState
- type MQBrokerErr
- type MQClientErr
- type Message
- func (m *Message) GetKeys() string
- func (m *Message) GetProperties() map[string]string
- func (m *Message) GetProperty(key string) string
- func (m *Message) GetShardingKey() string
- func (m *Message) GetTags() string
- func (m *Message) Marshal() []byte
- func (m *Message) MarshallProperties() string
- func (m *Message) RemoveProperty(key string) string
- func (m *Message) String() string
- func (m *Message) UnmarshalProperties(data []byte)
- func (m *Message) WithDelayTimeLevel(level int) *Message
- func (m *Message) WithKeys(keys []string) *Message
- func (m *Message) WithProperties(p map[string]string)
- func (m *Message) WithProperty(key, value string)
- func (m *Message) WithShardingKey(key string) *Message
- func (m *Message) WithTag(tags string) *Message
- type MessageExt
- type MessageID
- type MessageQueue
- type MessageType
- type NamesrvAddr
- type ProducerCtx
- type PullResult
- func (result *PullResult) GetBody() []byte
- func (result *PullResult) GetMessageExts() []*MessageExt
- func (result *PullResult) GetMessages() []*Message
- func (result *PullResult) SetBody(data []byte)
- func (result *PullResult) SetMessageExts(msgExts []*MessageExt)
- func (result *PullResult) String() string
- type PullStatus
- type RemotingErr
- type SendResult
- type SendStatus
- type TraceConfig
- type TransactionListener
- type TransactionSendResult
Constants ¶
const ( // method name in producer SendSync CommunicationMode = "SendSync" SendOneway CommunicationMode = "SendOneway" SendAsync CommunicationMode = "SendAsync" // method name in consumer ConsumerPush = "ConsumerPush" ConsumerPull = "ConsumerPull" PropCtxType = "ConsumeContextType" SuccessReturn ConsumeReturnType = "SUCCESS" TimeoutReturn ConsumeReturnType = "TIMEOUT" ExceptionReturn ConsumeReturnType = "EXCEPTION" NullReturn ConsumeReturnType = "RETURNNULL" FailedReturn ConsumeReturnType = "FAILED" )
const ( PropertyKeySeparator = " " PropertyKeys = "KEYS" PropertyTags = "TAGS" PropertyWaitStoreMsgOk = "WAIT" PropertyDelayTimeLevel = "DELAY" PropertyRetryTopic = "RETRY_TOPIC" PropertyRealTopic = "REAL_TOPIC" PropertyRealQueueId = "REAL_QID" PropertyTransactionPrepared = "TRAN_MSG" PropertyProducerGroup = "PGROUP" PropertyMinOffset = "MIN_OFFSET" PropertyMaxOffset = "MAX_OFFSET" PropertyBuyerId = "BUYER_ID" PropertyOriginMessageId = "ORIGIN_MESSAGE_ID" PropertyTransferFlag = "TRANSFER_FLAG" PropertyCorrectionFlag = "CORRECTION_FLAG" PropertyMQ2Flag = "MQ2_FLAG" PropertyReconsumeTime = "RECONSUME_TIME" PropertyMsgRegion = "MSG_REGION" PropertyTraceSwitch = "TRACE_ON" PropertyUniqueClientMessageIdKeyIndex = "UNIQ_KEY" PropertyMaxReconsumeTimes = "MAX_RECONSUME_TIMES" PropertyConsumeStartTime = "CONSUME_START_TIME" PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET" PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES" PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS" PropertyShardingKey = "SHARDING_KEY" )
Variables ¶
var ( ErrNoNameserver = errors.New("nameServerAddrs can't be empty.") ErrMultiIP = errors.New("multiple IP addr does not support") ErrIllegalIP = errors.New("IP addr error") )
var ( CompressedFlag = 0x1 MultiTagsFlag = 0x1 << 1 TransactionNotType = 0 TransactionPreparedType = 0x1 << 2 TransactionCommitType = 0x2 << 2 TransactionRollbackType = 0x3 << 2 )
var PanicHandler func(interface{})
Functions ¶
func BackBuffer ¶
func BackHeader ¶
func BackHeader(d []byte)
func ClearCompressedFlag ¶
func CreateUniqID ¶
func CreateUniqID() string
func GetTransactionValue ¶
func IsRemotingErr ¶
func NewMQClientErr ¶
func NewRemotingErr ¶
func ResetTransactionValue ¶
func WithConcurrentlyCtx ¶
func WithConcurrentlyCtx(ctx context.Context, c *ConsumeConcurrentlyContext) context.Context
func WithConsumerCtx ¶
func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context
WithConsumerCtx sets the ConsumeMessageContext in PushConsumer.
func WithMethod ¶
func WithMethod(ctx context.Context, m CommunicationMode) context.Context
WithMethod sets the call method name.
func WithOrderlyCtx ¶
func WithOrderlyCtx(ctx context.Context, c *ConsumeOrderlyContext) context.Context
func WithProducerCtx ¶
func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context
func WithRecover ¶
func WithRecover(fn func())
Types ¶
type AccessChannel ¶
type AccessChannel int
const ( // connect to private IDC cluster. Local AccessChannel = iota // connect to Cloud service. Cloud )
type CommunicationMode ¶
type CommunicationMode string
func GetMethod ¶
func GetMethod(ctx context.Context) CommunicationMode
GetMethod gets the call method name.
type ConsumeConcurrentlyContext ¶
type ConsumeConcurrentlyContext struct { MQ MessageQueue DelayLevelWhenNextConsume int AckIndex int32 }
func GetConcurrentlyCtx ¶
func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, bool)
func NewConsumeConcurrentlyContext ¶
func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext
type ConsumeMessageContext ¶
type ConsumeMessageContext struct { ConsumerGroup string Msgs []*MessageExt MQ *MessageQueue Success bool Status string // mqTractContext Properties map[string]string }
func GetConsumerCtx ¶
func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool)
GetConsumerCtx gets the ConsumeMessageContext, which is only legal in PushConsumer; the bool return value indicates whether it exists.
type ConsumeOrderlyContext ¶
type ConsumeOrderlyContext struct { MQ MessageQueue AutoCommit bool SuspendCurrentQueueTimeMillis int }
func GetOrderlyCtx ¶
func GetOrderlyCtx(ctx context.Context) (*ConsumeOrderlyContext, bool)
func NewConsumeOrderlyContext ¶
func NewConsumeOrderlyContext() *ConsumeOrderlyContext
type ConsumeReturnType ¶
type ConsumeReturnType string
func (ConsumeReturnType) Ordinal ¶
func (c ConsumeReturnType) Ordinal() int
type Credentials ¶
func (Credentials) IsEmpty ¶
func (c Credentials) IsEmpty() bool
type Interceptor ¶
Interceptor intercepts the invocation of a producer/consumer on messages. In a PushConsumer call, the req is of type []*MessageExt and the reply is a ConsumeResultHolder; use a type assertion to get the real type.
func ChainInterceptors ¶
func ChainInterceptors(interceptors ...Interceptor) Interceptor
type LocalTransactionState ¶
type LocalTransactionState int
const ( CommitMessageState LocalTransactionState = iota + 1 RollbackMessageState UnknowState )
type MQBrokerErr ¶
func (MQBrokerErr) Error ¶
func (e MQBrokerErr) Error() string
type MQClientErr ¶
type MQClientErr struct {
// contains filtered or unexported fields
}
func (MQClientErr) Error ¶
func (e MQClientErr) Error() string
type Message ¶
type Message struct { Topic string Body []byte Flag int32 TransactionId string Batch bool // Queue is the queue that messages will be sent to. the value must be set if want to custom the queue of message, // just ignore if not. Queue *MessageQueue // contains filtered or unexported fields }
func NewMessage ¶
func (*Message) GetProperties ¶
func (*Message) GetProperty ¶
func (*Message) GetShardingKey ¶
func (*Message) MarshallProperties ¶
func (*Message) RemoveProperty ¶
func (*Message) UnmarshalProperties ¶
UnmarshalProperties parses data into property key-value pairs.
func (*Message) WithDelayTimeLevel ¶
WithDelayTimeLevel sets the message delay time before it can be consumed. Reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. Delay levels start from 1; for example, if we set param level=1, then the delay time is 1s.
func (*Message) WithProperties ¶
func (*Message) WithProperty ¶
func (*Message) WithShardingKey ¶
type MessageExt ¶
type MessageExt struct { Message MsgId string StoreSize int32 QueueOffset int64 SysFlag int32 BornTimestamp int64 BornHost string StoreTimestamp int64 StoreHost string CommitLogOffset int64 BodyCRC int32 ReconsumeTimes int32 PreparedTransactionOffset int64 }
func DecodeMessage ¶
func DecodeMessage(data []byte) []*MessageExt
func (*MessageExt) GetRegionID ¶
func (msgExt *MessageExt) GetRegionID() string
func (*MessageExt) GetTags ¶
func (msgExt *MessageExt) GetTags() string
func (*MessageExt) IsTraceOn ¶
func (msgExt *MessageExt) IsTraceOn() string
func (*MessageExt) String ¶
func (msgExt *MessageExt) String() string
type MessageID ¶
func UnmarshalMsgID ¶
type MessageQueue ¶
type MessageQueue struct { Topic string `json:"topic"` BrokerName string `json:"brokerName"` QueueId int `json:"queueId"` }
MessageQueue message queue
func (*MessageQueue) HashCode ¶
func (mq *MessageQueue) HashCode() int
func (*MessageQueue) String ¶
func (mq *MessageQueue) String() string
type MessageType ¶
type MessageType int
const ( NormalMsg MessageType = iota TransMsgHalf TransMsgCommit DelayMsg )
type NamesrvAddr ¶
type NamesrvAddr []string
func NewNamesrvAddr ¶
func NewNamesrvAddr(s ...string) (NamesrvAddr, error)
func (NamesrvAddr) Check ¶
func (addr NamesrvAddr) Check() error
type ProducerCtx ¶
type ProducerCtx struct { ProducerGroup string Message Message MQ MessageQueue BrokerAddr string BornHost string CommunicationMode CommunicationMode SendResult *SendResult Props map[string]string MsgType MessageType Namespace string }
func GetProducerCtx ¶
func GetProducerCtx(ctx context.Context) *ProducerCtx
type PullResult ¶
type PullResult struct { NextBeginOffset int64 MinOffset int64 MaxOffset int64 Status PullStatus SuggestWhichBrokerId int64 // contains filtered or unexported fields }
PullResult is the pull result.
func (*PullResult) GetBody ¶
func (result *PullResult) GetBody() []byte
func (*PullResult) GetMessageExts ¶
func (result *PullResult) GetMessageExts() []*MessageExt
func (*PullResult) GetMessages ¶
func (result *PullResult) GetMessages() []*Message
func (*PullResult) SetBody ¶
func (result *PullResult) SetBody(data []byte)
func (*PullResult) SetMessageExts ¶
func (result *PullResult) SetMessageExts(msgExts []*MessageExt)
func (*PullResult) String ¶
func (result *PullResult) String() string
type PullStatus ¶
type PullStatus int
PullStatus is the pull status.
const ( PullFound PullStatus = iota PullNoNewMsg PullNoMsgMatched PullOffsetIllegal PullBrokerTimeout )
Predefined pull statuses.
type RemotingErr ¶
type RemotingErr struct {
// contains filtered or unexported fields
}
func (*RemotingErr) Error ¶
func (e *RemotingErr) Error() string
type SendResult ¶
type SendResult struct { Status SendStatus MsgID string MessageQueue *MessageQueue QueueOffset int64 TransactionID string OffsetMsgID string RegionID string TraceOn bool }
SendResult is the RocketMQ send result.
func (*SendResult) String ¶
func (result *SendResult) String() string
String returns the send message result as a string (detailed result).
type SendStatus ¶
type SendStatus int
SendStatus of message
const ( SendOK SendStatus = iota SendFlushDiskTimeout SendFlushSlaveTimeout SendSlaveNotAvailable SendUnknownError FlagCompressed = 0x1 MsgIdLength = 8 + 8 )
type TraceConfig ¶
type TraceConfig struct { TraceTopic string Access AccessChannel NamesrvAddrs []string Credentials // acl config for trace. omit if acl is closed on broker. }
TraceConfig is the config for message trace.
type TransactionListener ¶
type TransactionListener interface { // When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. ExecuteLocalTransaction(*Message) LocalTransactionState // When no response to prepare(half) message. broker will send check message to check the transaction status, and this // method will be invoked to get local transaction status. CheckLocalTransaction(*MessageExt) LocalTransactionState }
type TransactionSendResult ¶
type TransactionSendResult struct { *SendResult State LocalTransactionState }
TransactionSendResult is the RocketMQ send result for a transactional message, together with its local transaction state.