primitive

package
v1.2.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2020 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package primitive defines the ctx key and value types.

Index

Constants

View Source
const (

	// method name in  producer
	SendSync   CommunicationMode = "SendSync"
	SendOneway CommunicationMode = "SendOneway"
	SendAsync  CommunicationMode = "SendAsync"
	// method name in consumer
	ConsumerPush = "ConsumerPush"
	ConsumerPull = "ConsumerPull"

	PropCtxType                       = "ConsumeContextType"
	SuccessReturn   ConsumeReturnType = "SUCCESS"
	TimeoutReturn   ConsumeReturnType = "TIMEOUT"
	ExceptionReturn ConsumeReturnType = "EXCEPTION"
	NullReturn      ConsumeReturnType = "RETURNNULL"
	FailedReturn    ConsumeReturnType = "FAILED"
)
View Source
const (
	PropertyKeySeparator                   = " "
	PropertyKeys                           = "KEYS"
	PropertyTags                           = "TAGS"
	PropertyWaitStoreMsgOk                 = "WAIT"
	PropertyDelayTimeLevel                 = "DELAY"
	PropertyRetryTopic                     = "RETRY_TOPIC"
	PropertyRealTopic                      = "REAL_TOPIC"
	PropertyRealQueueId                    = "REAL_QID"
	PropertyTransactionPrepared            = "TRAN_MSG"
	PropertyProducerGroup                  = "PGROUP"
	PropertyMinOffset                      = "MIN_OFFSET"
	PropertyMaxOffset                      = "MAX_OFFSET"
	PropertyBuyerId                        = "BUYER_ID"
	PropertyOriginMessageId                = "ORIGIN_MESSAGE_ID"
	PropertyTransferFlag                   = "TRANSFER_FLAG"
	PropertyCorrectionFlag                 = "CORRECTION_FLAG"
	PropertyMQ2Flag                        = "MQ2_FLAG"
	PropertyReconsumeTime                  = "RECONSUME_TIME"
	PropertyMsgRegion                      = "MSG_REGION"
	PropertyTraceSwitch                    = "TRACE_ON"
	PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
	PropertyMaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
	PropertyConsumeStartTime               = "CONSUME_START_TIME"
	PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
	PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
	PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
	PropertyShardingKey                    = "SHARDING_KEY"
)

Variables

View Source
var (
	ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
	ErrMultiIP      = errors.New("multiple IP addr does not support")
	ErrIllegalIP    = errors.New("IP addr error")
)
View Source
var (
	CompressedFlag = 0x1

	MultiTagsFlag = 0x1 << 1

	TransactionNotType = 0

	TransactionPreparedType = 0x1 << 2

	TransactionCommitType = 0x2 << 2

	TransactionRollbackType = 0x3 << 2
)
View Source
var PanicHandler func(interface{})

Functions

func BackHeader

func BackHeader(d []byte)

func ClearCompressedFlag

func ClearCompressedFlag(flag int) int

func CreateUniqID

func CreateUniqID() string

func GetHeader

func GetHeader() []byte

func GetTransactionValue

func GetTransactionValue(flag int) int

func Pid

func Pid() int16

func ResetTransactionValue

func ResetTransactionValue(flag int, typeFlag int) int

func WithConsumerCtx

func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context

WithConsumerCtx sets the ConsumeMessageContext in PushConsumer.

func WithMethod

func WithMethod(ctx context.Context, m CommunicationMode) context.Context

WithMethod sets the call method name.

func WithProducerCtx

func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context

func WithRecover

func WithRecover(fn func())

Types

type AccessChannel

type AccessChannel int
const (
	// connect to private IDC cluster.
	Local AccessChannel = iota
	// connect to Cloud service.
	Cloud
)

type CommunicationMode

type CommunicationMode string

func GetMethod

func GetMethod(ctx context.Context) CommunicationMode

GetMethod gets the call method name.

type ConsumeConcurrentlyContext

type ConsumeConcurrentlyContext struct {
	MQ                        MessageQueue
	DelayLevelWhenNextConsume int
	AckIndex                  int32
}

func GetConcurrentlyCtx

func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, bool)

func NewConsumeConcurrentlyContext

func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext

type ConsumeMessageContext

type ConsumeMessageContext struct {
	ConsumerGroup string
	Msgs          []*MessageExt
	MQ            *MessageQueue
	Success       bool
	Status        string
	// mqTractContext
	Properties map[string]string
}

func GetConsumerCtx

func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool)

GetConsumerCtx gets the ConsumeMessageContext. It is only legal in PushConsumer, so a bool return value is added to indicate whether the context exists.

type ConsumeOrderlyContext

type ConsumeOrderlyContext struct {
	MQ                            MessageQueue
	AutoCommit                    bool
	SuspendCurrentQueueTimeMillis int
}

func GetOrderlyCtx

func GetOrderlyCtx(ctx context.Context) (*ConsumeOrderlyContext, bool)

func NewConsumeOrderlyContext

func NewConsumeOrderlyContext() *ConsumeOrderlyContext

type ConsumeReturnType

type ConsumeReturnType string

func (ConsumeReturnType) Ordinal

func (c ConsumeReturnType) Ordinal() int

type Credentials

type Credentials struct {
	AccessKey     string
	SecretKey     string
	SecurityToken string
}

func (Credentials) IsEmpty

func (c Credentials) IsEmpty() bool

type CtxKey

type CtxKey int

type Interceptor

type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error

Interceptor intercepts the invocation of a producer/consumer on messages. In a PushConsumer call, req is of type []*MessageExt and reply is a ConsumeResultHolder; use a type assertion to get the real type.

func ChainInterceptors

func ChainInterceptors(interceptors ...Interceptor) Interceptor

type Invoker

type Invoker func(ctx context.Context, req, reply interface{}) error

Invoker finishes a message invocation on a producer/consumer.

type LocalTransactionState

type LocalTransactionState int
const (
	CommitMessageState LocalTransactionState = iota + 1
	RollbackMessageState
	UnknowState
)

type Message

type Message struct {
	Topic         string
	Body          []byte
	Flag          int32
	TransactionId string
	Batch         bool
	// Queue is the queue that messages will be sent to. the value must be set if want to custom the queue of message,
	// just ignore if not.
	Queue *MessageQueue
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(topic string, body []byte) *Message

func (*Message) GetKeys

func (m *Message) GetKeys() string

func (*Message) GetProperties

func (m *Message) GetProperties() map[string]string

func (*Message) GetProperty

func (m *Message) GetProperty(key string) string

func (*Message) GetShardingKey

func (m *Message) GetShardingKey() string

func (*Message) GetTags

func (m *Message) GetTags() string

func (*Message) Marshal

func (m *Message) Marshal() []byte

func (*Message) MarshallProperties

func (m *Message) MarshallProperties() string

func (*Message) RemoveProperty

func (m *Message) RemoveProperty(key string) string

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalProperties

func (m *Message) UnmarshalProperties(data []byte)

UnmarshalProperties parses data into property key-value pairs.

func (*Message) WithDelayTimeLevel

func (m *Message) WithDelayTimeLevel(level int) *Message

WithDelayTimeLevel sets the message's delay time before it is consumed. Reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. Delay levels start from 1; for example, if level=1, the delay time is 1s.

func (*Message) WithKeys

func (m *Message) WithKeys(keys []string) *Message

func (*Message) WithProperties

func (m *Message) WithProperties(p map[string]string)

func (*Message) WithProperty

func (m *Message) WithProperty(key, value string)

func (*Message) WithShardingKey

func (m *Message) WithShardingKey(key string) *Message

func (*Message) WithTag

func (m *Message) WithTag(tags string) *Message

type MessageExt

type MessageExt struct {
	Message
	MsgId                     string
	StoreSize                 int32
	QueueOffset               int64
	SysFlag                   int32
	BornTimestamp             int64
	BornHost                  string
	StoreTimestamp            int64
	StoreHost                 string
	CommitLogOffset           int64
	BodyCRC                   int32
	ReconsumeTimes            int32
	PreparedTransactionOffset int64
}

func DecodeMessage

func DecodeMessage(data []byte) []*MessageExt

func (*MessageExt) GetRegionID

func (msgExt *MessageExt) GetRegionID() string

func (*MessageExt) GetTags

func (msgExt *MessageExt) GetTags() string

func (*MessageExt) IsTraceOn

func (msgExt *MessageExt) IsTraceOn() string

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageID

type MessageID struct {
	Addr   string
	Port   int
	Offset int64
}

func UnmarshalMsgID

func UnmarshalMsgID(id []byte) (*MessageID, error)

type MessageQueue

type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int    `json:"queueId"`
}

MessageQueue message queue

func (*MessageQueue) HashCode

func (mq *MessageQueue) HashCode() int

func (*MessageQueue) String

func (mq *MessageQueue) String() string

type MessageType

type MessageType int
const (
	NormalMsg MessageType = iota
	TransMsgHalf
	TransMsgCommit
	DelayMsg
)

type NamesrvAddr

type NamesrvAddr []string

func NewNamesrvAddr

func NewNamesrvAddr(s ...string) (NamesrvAddr, error)

func (NamesrvAddr) Check

func (addr NamesrvAddr) Check() error

type ProducerCtx

type ProducerCtx struct {
	ProducerGroup     string
	Message           Message
	MQ                MessageQueue
	BrokerAddr        string
	BornHost          string
	CommunicationMode CommunicationMode
	SendResult        *SendResult
	Props             map[string]string
	MsgType           MessageType
	Namespace         string
}

func GetProducerCtx

func GetProducerCtx(ctx context.Context) *ProducerCtx

type PullResult

type PullResult struct {
	NextBeginOffset      int64
	MinOffset            int64
	MaxOffset            int64
	Status               PullStatus
	SuggestWhichBrokerId int64
	// contains filtered or unexported fields
}

PullResult the pull result

func (*PullResult) GetBody

func (result *PullResult) GetBody() []byte

func (*PullResult) GetMessageExts

func (result *PullResult) GetMessageExts() []*MessageExt

func (*PullResult) GetMessages

func (result *PullResult) GetMessages() []*Message

func (*PullResult) SetBody

func (result *PullResult) SetBody(data []byte)

func (*PullResult) SetMessageExts

func (result *PullResult) SetMessageExts(msgExts []*MessageExt)

func (*PullResult) String

func (result *PullResult) String() string

type PullStatus

type PullStatus int

PullStatus pull Status

const (
	PullFound PullStatus = iota
	PullNoNewMsg
	PullNoMsgMatched
	PullOffsetIllegal
	PullBrokerTimeout
)

predefined pull Status

type SendResult

type SendResult struct {
	Status        SendStatus
	MsgID         string
	MessageQueue  *MessageQueue
	QueueOffset   int64
	TransactionID string
	OffsetMsgID   string
	RegionID      string
	TraceOn       bool
}

SendResult RocketMQ send result

func (*SendResult) String

func (result *SendResult) String() string

String returns the send message result as a string (detailed result).

type SendStatus

type SendStatus int

SendStatus of message

const (
	SendOK SendStatus = iota
	SendFlushDiskTimeout
	SendFlushSlaveTimeout
	SendSlaveNotAvailable
	SendUnknownError

	FlagCompressed = 0x1
	MsgIdLength    = 8 + 8
)

type TraceConfig

type TraceConfig struct {
	TraceTopic   string
	Access       AccessChannel
	NamesrvAddrs []string
	Credentials  // acl config for trace. omit if acl is closed on broker.
}

TraceConfig is the config for message trace.

type TransactionListener

type TransactionListener interface {
	//  When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
	ExecuteLocalTransaction(*Message) LocalTransactionState

	// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
	// method will be invoked to get local transaction status.
	CheckLocalTransaction(*MessageExt) LocalTransactionState
}

type TransactionSendResult

type TransactionSendResult struct {
	*SendResult
	State LocalTransactionState
}

TransactionSendResult is the RocketMQ send result of a transactional message, together with the local transaction State.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL