primitive

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

  • Define the ctx key and value type.

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
// String constants naming the producer and consumer call methods, plus the
// set of ConsumeReturnType values.
const (

	// Method names in the producer, typed as CommunicationMode.
	SendSync   CommunicationMode = "SendSync"
	SendOneway CommunicationMode = "SendOneway"
	SendAsync  CommunicationMode = "SendAsync"
	// Method names in the consumer. These are untyped string constants,
	// unlike the producer names above.
	ConsumerPush = "ConsumerPush"
	ConsumerPull = "ConsumerPull"

	// PropCtxType is an untyped string constant.
	// NOTE(review): presumably the property key under which a
	// ConsumeReturnType is stored — confirm against the callers.
	PropCtxType                       = "ConsumeContextType"
	SuccessReturn   ConsumeReturnType = "SUCCESS"
	TimeoutReturn   ConsumeReturnType = "TIMEOUT"
	ExceptionReturn ConsumeReturnType = "EXCEPTION"
	NullReturn      ConsumeReturnType = "RETURNNULL"
	FailedReturn    ConsumeReturnType = "FAILED"
)
View Source
// Untyped string constants for message property names.
// NOTE(review): presumably these are the keys used in the map returned by
// Message.GetProperties — confirm against the implementation.
const (
	// PropertyKeySeparator is a single space character.
	PropertyKeySeparator                   = " "
	PropertyKeys                           = "KEYS"
	PropertyTags                           = "TAGS"
	PropertyWaitStoreMsgOk                 = "WAIT"
	PropertyDelayTimeLevel                 = "DELAY"
	PropertyRetryTopic                     = "RETRY_TOPIC"
	PropertyRealTopic                      = "REAL_TOPIC"
	PropertyRealQueueId                    = "REAL_QID"
	PropertyTransactionPrepared            = "TRAN_MSG"
	PropertyProducerGroup                  = "PGROUP"
	PropertyMinOffset                      = "MIN_OFFSET"
	PropertyMaxOffset                      = "MAX_OFFSET"
	PropertyBuyerId                        = "BUYER_ID"
	PropertyOriginMessageId                = "ORIGIN_MESSAGE_ID"
	PropertyTransferFlag                   = "TRANSFER_FLAG"
	PropertyCorrectionFlag                 = "CORRECTION_FLAG"
	PropertyMQ2Flag                        = "MQ2_FLAG"
	PropertyReconsumeTime                  = "RECONSUME_TIME"
	PropertyMsgRegion                      = "MSG_REGION"
	PropertyTraceSwitch                    = "TRACE_ON"
	PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
	PropertyMaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
	PropertyConsumeStartTime               = "CONSUME_START_TIME"
	// The next two identifiers spell "Transaction" as "Transcation". The
	// names are exported, so the spelling is part of the public API.
	PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
	PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
	PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
	PropertyShardingKey                    = "SHARDING_KEY"
	PropertyTransactionID                  = "__transactionId__"
	PropertyCorrelationID                  = "CORRELATION_ID"
	PropertyMessageReplyToClient           = "REPLY_TO_CLIENT"
	PropertyMessageTTL                     = "TTL"
	PropertyReplyMessageArriveTime         = "ARRIVE_TIME"
	PropertyMsgType                        = "MSG_TYPE"
	PropertyCluster                        = "CLUSTER"
)
View Source
// SendStatus values, followed by untyped flag bits and the message ID length.
const (
	// SendStatus values are numbered by iota: SendOK is 0 and
	// SendUnknownError is 4.
	SendOK SendStatus = iota
	SendFlushDiskTimeout
	SendFlushSlaveTimeout
	SendSlaveNotAvailable
	SendUnknownError

	// The constants below have explicit values and do not use iota.
	// FlagCompressed is bit 0 (1), FlagBornHostV6 is bit 4 (16) and
	// FlagStoreHostV6 is bit 5 (32).
	FlagCompressed  = 0x1
	FlagBornHostV6  = 0x1 << 4
	FlagStoreHostV6 = 0x1 << 5
	// MsgIdLength evaluates to 16.
	MsgIdLength     = 8 + 8
)
View Source
const (
	// DEFAULT_NAMESRV_ADDR is an HTTP URL.
	// NOTE(review): presumably the default endpoint queried for name server
	// addresses (e.g. by HttpResolver) — confirm against the implementation.
	DEFAULT_NAMESRV_ADDR = "http://jmenv.tbsite.net:8080/rocketmq/nsaddr"
)

Variables

View Source
// Flag values declared as package-level variables rather than constants, so
// they are assignable by any importer.
// NOTE(review): presumably the bit layout used by SetCompressedFlag,
// ClearCompressedFlag, GetTransactionValue and ResetTransactionValue — confirm.
var (
	// CompressedFlag is bit 0 (1).
	CompressedFlag = 0x1

	// MultiTagsFlag is bit 1 (2).
	MultiTagsFlag = 0x1 << 1

	// The transaction types occupy bits 2-3: 0, 4, 8 and 12 respectively.
	TransactionNotType = 0

	TransactionPreparedType = 0x1 << 2

	TransactionCommitType = 0x2 << 2

	TransactionRollbackType = 0x3 << 2
)
View Source
// PanicHandler is a package-level hook taking a single interface{} argument.
// Its zero value is nil.
// NOTE(review): presumably called by WithRecover with the recovered panic
// value — confirm against the implementation.
var PanicHandler func(interface{})

Functions

func BackBuffer

func BackBuffer(b *bytes.Buffer)

func BackHeader

func BackHeader(d *[]byte)

func ClearCompressedFlag

func ClearCompressedFlag(flag int) int

func CreateMessageId

func CreateMessageId(addr []byte, port int32, offset int64) string

func CreateUniqID

func CreateUniqID() string

func Diff

func Diff(origin, latest []string) bool

func GetBuffer

func GetBuffer() *bytes.Buffer

func GetHeader

func GetHeader() *[]byte

func GetTransactionValue

func GetTransactionValue(flag int) int

func IsRemotingErr

func IsRemotingErr(err error) bool

func NewMQClientErr

func NewMQClientErr(code int16, msg string) error

func NewPassthroughResolver

func NewPassthroughResolver(addr []string) *passthroughResolver

func NewRemotingErr

func NewRemotingErr(s string) error

func Pid

func Pid() int16

func ResetTransactionValue

func ResetTransactionValue(flag int, typeFlag int) int

func SetCompressedFlag

func SetCompressedFlag(flag int) int

func WithConsumerCtx

func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context

WithConsumerCtx sets the ConsumeMessageContext on the context in PushConsumer.

func WithMethod

func WithMethod(ctx context.Context, m CommunicationMode) context.Context

WithMethod sets the call method name on the context.

func WithProducerCtx

func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context

func WithRecover

func WithRecover(fn func())

Types

type AccessChannel

// AccessChannel is an integer enumeration of the kind of cluster to connect to.
type AccessChannel int
const (
	// Local (0) connects to a private IDC cluster.
	Local AccessChannel = iota
	// Cloud (1) connects to a cloud service.
	Cloud
)

type CommunicationMode

type CommunicationMode string

func GetMethod

func GetMethod(ctx context.Context) CommunicationMode

GetMethod gets the call method name from the context.

type ConsumeConcurrentlyContext

// ConsumeConcurrentlyContext holds a MessageQueue by value together with a
// delay level and an ack index.
// NOTE(review): the field semantics are not visible here; presumably
// DelayLevelWhenNextConsume controls the retry delay and AckIndex marks the
// last acknowledged message of a batch — confirm.
type ConsumeConcurrentlyContext struct {
	MQ                        MessageQueue
	DelayLevelWhenNextConsume int
	AckIndex                  int32
}

func GetConcurrentlyCtx

func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, bool)

func NewConsumeConcurrentlyContext

func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext

type ConsumeMessageContext

// ConsumeMessageContext describes one consume invocation: the consumer group,
// the messages, the queue, and the outcome fields Success and Status.
// It is the value stored by WithConsumerCtx and returned by GetConsumerCtx.
type ConsumeMessageContext struct {
	ConsumerGroup string
	Msgs          []*MessageExt
	MQ            *MessageQueue
	Success       bool
	Status        string
	// mqTraceContext
	// NOTE(review): presumably free-form trace properties — confirm.
	Properties map[string]string
}

func GetConsumerCtx

func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool)

GetConsumerCtx gets the ConsumeMessageContext. It is only legal in PushConsumer, so the bool return value indicates whether the context exists.

type ConsumeOrderlyContext

// ConsumeOrderlyContext holds a MessageQueue by value together with an
// auto-commit switch and a suspend time.
// NOTE(review): judging by its name, SuspendCurrentQueueTimeMillis is in
// milliseconds — confirm against the consumer implementation.
type ConsumeOrderlyContext struct {
	MQ                            MessageQueue
	AutoCommit                    bool
	SuspendCurrentQueueTimeMillis int
}

func GetOrderlyCtx

func GetOrderlyCtx(ctx context.Context) (*ConsumeOrderlyContext, bool)

func NewConsumeOrderlyContext

func NewConsumeOrderlyContext() *ConsumeOrderlyContext

type ConsumeReturnType

type ConsumeReturnType string

func (ConsumeReturnType) Ordinal

func (c ConsumeReturnType) Ordinal() int

type Credentials

// Credentials groups an access key, a secret key and a security token, all
// stored as plain strings. TraceConfig embeds it as its ACL configuration.
type Credentials struct {
	AccessKey     string
	SecretKey     string
	SecurityToken string
}

func (Credentials) IsEmpty

func (c Credentials) IsEmpty() bool

type CtxKey

type CtxKey int

type EnvResolver

type EnvResolver struct {
}

func NewEnvResolver

func NewEnvResolver() *EnvResolver

func (*EnvResolver) Description

func (e *EnvResolver) Description() string

func (*EnvResolver) Resolve

func (e *EnvResolver) Resolve() []string

type HttpResolver

type HttpResolver struct {
	// contains filtered or unexported fields
}

func NewHttpResolver

func NewHttpResolver(instance string, domain ...string) *HttpResolver

func (*HttpResolver) Description

func (h *HttpResolver) Description() string

func (*HttpResolver) DomainWithUnit

func (h *HttpResolver) DomainWithUnit(unitName string)

func (*HttpResolver) Resolve

func (h *HttpResolver) Resolve() []string

type Interceptor

type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error

Interceptor intercepts the invocation of a producer/consumer on messages. In a PushConsumer call, req is of type []*MessageExt and reply is a ConsumeResultHolder; use a type assertion to get the real type.

func ChainInterceptors

func ChainInterceptors(interceptors ...Interceptor) Interceptor

type Invoker

type Invoker func(ctx context.Context, req, reply interface{}) error

Invoker finishes a message invocation on a producer/consumer.

type LocalTransactionState

// LocalTransactionState is the result type returned by the methods of
// TransactionListener.
type LocalTransactionState int
const (
	// Values start at 1 (iota + 1), so the zero value of
	// LocalTransactionState matches none of the named states.
	CommitMessageState LocalTransactionState = iota + 1
	RollbackMessageState
	// UnknowState (sic) is 3; the spelling is part of the exported API.
	UnknowState
)

type MQBrokerErr

// MQBrokerErr is an error value carrying a numeric response code and an
// error message. It implements the error interface via a value-receiver
// Error method.
type MQBrokerErr struct {
	ResponseCode int16
	ErrorMessage string
}

func (MQBrokerErr) Error

func (e MQBrokerErr) Error() string

type MQClientErr

type MQClientErr struct {
	// contains filtered or unexported fields
}

func (MQClientErr) Error

func (e MQClientErr) Error() string

type Message

// Message is a message with a topic and a body, plus flag, transaction,
// batch and compression fields. Create one with NewMessage(topic, body).
// Properties are held in unexported fields and accessed through the
// GetProperty / WithProperty family of methods.
type Message struct {
	Topic          string
	Body           []byte
	CompressedBody []byte
	Flag           int32
	TransactionId  string
	Batch          bool
	Compress       bool
	// Queue is the queue the message will be sent to. Set it only to pick
	// the queue explicitly; otherwise leave it unset.
	Queue *MessageQueue
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(topic string, body []byte) *Message

func (*Message) GetKeys

func (m *Message) GetKeys() string

func (*Message) GetProperties

func (m *Message) GetProperties() map[string]string

func (*Message) GetProperty

func (m *Message) GetProperty(key string) string

func (*Message) GetShardingKey

func (m *Message) GetShardingKey() string

func (*Message) GetTags

func (m *Message) GetTags() string

func (*Message) Marshal

func (m *Message) Marshal() []byte

func (*Message) MarshallProperties

func (m *Message) MarshallProperties() string

func (*Message) RemoveProperty

func (m *Message) RemoveProperty(key string) string

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalProperties

func (m *Message) UnmarshalProperties(data []byte)

UnmarshalProperties parses data into property key-value pairs.

func (*Message) WithDelayTimeLevel

func (m *Message) WithDelayTimeLevel(level int) *Message

WithDelayTimeLevel sets the delay before the message can be consumed. Reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. Delay levels start from 1; for example, level=1 gives a delay of 1s.

func (*Message) WithKeys

func (m *Message) WithKeys(keys []string) *Message

func (*Message) WithProperties

func (m *Message) WithProperties(p map[string]string)

func (*Message) WithProperty

func (m *Message) WithProperty(key, value string)

func (*Message) WithShardingKey

func (m *Message) WithShardingKey(key string) *Message

func (*Message) WithTag

func (m *Message) WithTag(tags string) *Message

type MessageExt

// MessageExt embeds Message and adds identifiers, offsets, timestamps and
// host fields. DecodeMessage returns a slice of *MessageExt.
// NOTE(review): the units of BornTimestamp/StoreTimestamp and the format of
// BornHost/StoreHost are not visible here — confirm before relying on them.
type MessageExt struct {
	Message
	MsgId                     string
	OffsetMsgId               string
	StoreSize                 int32
	QueueOffset               int64
	SysFlag                   int32
	BornTimestamp             int64
	BornHost                  string
	StoreTimestamp            int64
	StoreHost                 string
	CommitLogOffset           int64
	BodyCRC                   int32
	ReconsumeTimes            int32
	PreparedTransactionOffset int64
}

func DecodeMessage

func DecodeMessage(data []byte) []*MessageExt

func (*MessageExt) GetRegionID

func (msgExt *MessageExt) GetRegionID() string

func (*MessageExt) GetTags

func (msgExt *MessageExt) GetTags() string

func (*MessageExt) IsTraceOn

func (msgExt *MessageExt) IsTraceOn() string

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageID

// MessageID is the decoded form of a message ID: an address, a port and an
// offset. UnmarshalMsgID returns a *MessageID.
type MessageID struct {
	Addr   string
	Port   int
	Offset int64
}

func UnmarshalMsgID

func UnmarshalMsgID(id []byte) (*MessageID, error)

type MessageQueue

// MessageQueue identifies a queue by topic, broker name and queue ID.
// The struct tags give the JSON field names "topic", "brokerName" and
// "queueId".
type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int    `json:"queueId"`
}

MessageQueue message queue

func (*MessageQueue) HashCode

func (mq *MessageQueue) HashCode() int

func (*MessageQueue) String

func (mq *MessageQueue) String() string

type MessageType

// MessageType is an integer enumeration of message kinds. ProducerCtx
// carries one in its MsgType field.
type MessageType int
const (
	// Values are numbered by iota: NormalMsg is 0 and DelayMsg is 3.
	NormalMsg MessageType = iota
	TransMsgHalf
	TransMsgCommit
	DelayMsg
)

type NamesrvAddr

type NamesrvAddr []string

func NewNamesrvAddr

func NewNamesrvAddr(s ...string) (NamesrvAddr, error)

func (NamesrvAddr) Check

func (addr NamesrvAddr) Check() error

type NsResolver

// NsResolver resolves name server addresses.
type NsResolver interface {
	// Resolve returns a list of strings.
	// NOTE(review): presumably name server addresses — confirm.
	Resolve() []string
	// Description returns a description string for the resolver.
	Description() string
}

NsResolver is a resolver for name servers: it monitors name server changes and notifies the client. Consul-based or domain-based resolution is common.

type ProducerCtx

// ProducerCtx describes one producer send: the group, the message and queue
// (both held by value), broker and host addresses, the communication mode,
// the send result, extra properties, the message type and the namespace.
// It is the value stored by WithProducerCtx and returned by GetProducerCtx.
type ProducerCtx struct {
	ProducerGroup     string
	Message           Message
	MQ                MessageQueue
	BrokerAddr        string
	BornHost          string
	CommunicationMode CommunicationMode
	SendResult        *SendResult
	Props             map[string]string
	MsgType           MessageType
	Namespace         string
}

func GetProducerCtx

func GetProducerCtx(ctx context.Context) (*ProducerCtx, bool)

type PullResult

// PullResult is the result of a pull: the next, minimum and maximum offsets,
// the pull status and a suggested broker ID. The body and the decoded
// messages are held in unexported fields, reached through GetBody/SetBody
// and GetMessageExts/SetMessageExts.
type PullResult struct {
	NextBeginOffset      int64
	MinOffset            int64
	MaxOffset            int64
	Status               PullStatus
	SuggestWhichBrokerId int64
	// contains filtered or unexported fields
}

PullResult is the result of a pull.

func (*PullResult) GetBody

func (result *PullResult) GetBody() []byte

func (*PullResult) GetMessageExts

func (result *PullResult) GetMessageExts() []*MessageExt

func (*PullResult) GetMessages

func (result *PullResult) GetMessages() []*Message

func (*PullResult) SetBody

func (result *PullResult) SetBody(data []byte)

func (*PullResult) SetMessageExts

func (result *PullResult) SetMessageExts(msgExts []*MessageExt)

func (*PullResult) String

func (result *PullResult) String() string

type PullStatus

type PullStatus int

PullStatus pull Status

// Predefined PullStatus values, numbered by iota: PullFound is 0 and
// PullBrokerTimeout is 4.
const (
	PullFound PullStatus = iota
	PullNoNewMsg
	PullNoMsgMatched
	PullOffsetIllegal
	PullBrokerTimeout
)

predefined pull Status

type RemotingErr

type RemotingErr struct {
	// contains filtered or unexported fields
}

func (*RemotingErr) Error

func (e *RemotingErr) Error() string

type SendResult

// SendResult is the result of sending a message: the send status, message
// IDs, target queue, queue offset, transaction ID, region ID and trace flag.
// Create one with NewSendResult.
type SendResult struct {
	Status        SendStatus
	MsgID         string
	MessageQueue  *MessageQueue
	QueueOffset   int64
	TransactionID string
	OffsetMsgID   string
	RegionID      string
	TraceOn       bool
}

SendResult RocketMQ send result

func NewSendResult

func NewSendResult() *SendResult

func (*SendResult) String

func (result *SendResult) String() string

String returns the send message result as a string (detailed result).

type SendStatus

type SendStatus int

SendStatus of message

type StaticResolver

type StaticResolver struct {
}

type TraceConfig

// TraceConfig is the configuration for message trace. It carries both a
// static address list (NamesrvAddrs) and an NsResolver.
// NOTE(review): which of the two takes precedence is not visible here —
// confirm against the trace implementation.
type TraceConfig struct {
	TraceTopic   string
	GroupName    string
	Access       AccessChannel
	NamesrvAddrs []string
	Resolver     NsResolver
	Credentials  // acl config for trace. omit if acl is closed on broker.
}

TraceConfig is the configuration for message trace.

type TransactionListener

// TransactionListener is the callback interface for transactional messages.
// Both methods return a LocalTransactionState.
type TransactionListener interface {
	// ExecuteLocalTransaction is invoked to execute the local transaction
	// after a transactional prepare (half) message has been sent successfully.
	ExecuteLocalTransaction(*Message) LocalTransactionState

	// CheckLocalTransaction is invoked to get the local transaction status.
	// When a prepare (half) message gets no response, the broker sends a
	// check message, which triggers this call.
	CheckLocalTransaction(*MessageExt) LocalTransactionState
}

type TransactionSendResult

// TransactionSendResult embeds *SendResult, so its fields and methods are
// promoted, and adds the local transaction state. The embedded pointer is
// nil in the zero value.
type TransactionSendResult struct {
	*SendResult
	State LocalTransactionState
}

TransactionSendResult is the RocketMQ send result for a transactional message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL