primitive

package
v2.0.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2020 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package primitive defines the ctx key and value types.

Index

Constants

View Source
const (

	// method name in  producer
	SendSync   CommunicationMode = "SendSync"
	SendOneway CommunicationMode = "SendOneway"
	SendAsync  CommunicationMode = "SendAsync"
	// method name in consumer
	ConsumerPush = "ConsumerPush"
	ConsumerPull = "ConsumerPull"

	PropCtxType                       = "ConsumeContextType"
	SuccessReturn   ConsumeReturnType = "SUCCESS"
	TimeoutReturn   ConsumeReturnType = "TIMEOUT"
	ExceptionReturn ConsumeReturnType = "EXCEPTION"
	NullReturn      ConsumeReturnType = "RETURNNULL"
	FailedReturn    ConsumeReturnType = "FAILED"
)
View Source
const (
	PropertyKeySeparator                   = " "
	PropertyKeys                           = "KEYS"
	PropertyTags                           = "TAGS"
	PropertyWaitStoreMsgOk                 = "WAIT"
	PropertyDelayTimeLevel                 = "DELAY"
	PropertyRetryTopic                     = "RETRY_TOPIC"
	PropertyRealTopic                      = "REAL_TOPIC"
	PropertyRealQueueId                    = "REAL_QID"
	PropertyTransactionPrepared            = "TRAN_MSG"
	PropertyProducerGroup                  = "PGROUP"
	PropertyMinOffset                      = "MIN_OFFSET"
	PropertyMaxOffset                      = "MAX_OFFSET"
	PropertyBuyerId                        = "BUYER_ID"
	PropertyOriginMessageId                = "ORIGIN_MESSAGE_ID"
	PropertyTransferFlag                   = "TRANSFER_FLAG"
	PropertyCorrectionFlag                 = "CORRECTION_FLAG"
	PropertyMQ2Flag                        = "MQ2_FLAG"
	PropertyReconsumeTime                  = "RECONSUME_TIME"
	PropertyMsgRegion                      = "MSG_REGION"
	PropertyTraceSwitch                    = "TRACE_ON"
	PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
	PropertyMaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
	PropertyConsumeStartTime               = "CONSUME_START_TIME"
	PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
	PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
	PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
	PropertyShardingKey                    = "SHARDING_KEY"
)

Variables

View Source
var (
	ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
	ErrMultiIP      = errors.New("multiple IP addr does not support")
	ErrIllegalIP    = errors.New("IP addr error")
)
View Source
var (
	CompressedFlag = 0x1

	MultiTagsFlag = 0x1 << 1

	TransactionNotType = 0

	TransactionPreparedType = 0x1 << 2

	TransactionCommitType = 0x2 << 2

	TransactionRollbackType = 0x3 << 2
)

Functions

func ClearCompressedFlag added in v1.0.1

func ClearCompressedFlag(flag int) int

func CreateUniqID added in v1.0.1

func CreateUniqID() string

func GetTransactionValue added in v1.0.1

func GetTransactionValue(flag int) int

func Pid added in v1.0.1

func Pid() int16

func ResetTransactionValue added in v1.0.1

func ResetTransactionValue(flag int, typeFlag int) int

func WithConsumerCtx

func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context

WithConsumerCtx sets the ConsumeMessageContext in PushConsumer.

func WithMethod

func WithMethod(ctx context.Context, m CommunicationMode) context.Context

WithMethod sets the call method name.

func WithProducerCtx added in v1.0.1

func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context

Types

type AccessChannel added in v1.0.1

type AccessChannel int
const (
	// connect to private IDC cluster.
	Local AccessChannel = iota
	// connect to Cloud service.
	Cloud
)

type CommunicationMode added in v1.0.1

type CommunicationMode string

func GetMethod

func GetMethod(ctx context.Context) CommunicationMode

GetMethod gets the call method name.

type ConsumeConcurrentlyContext

type ConsumeConcurrentlyContext struct {
	MQ                        MessageQueue
	DelayLevelWhenNextConsume int
	AckIndex                  int32
}

func GetConcurrentlyCtx

func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, bool)

func NewConsumeConcurrentlyContext

func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext

type ConsumeMessageContext

type ConsumeMessageContext struct {
	ConsumerGroup string
	Msgs          []*MessageExt
	MQ            *MessageQueue
	Success       bool
	Status        string
	// mqTractContext
	Properties map[string]string
}

func GetConsumerCtx

func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool)

GetConsumerCtx gets the ConsumeMessageContext. It is only legal in PushConsumer, so the bool return value indicates whether the context exists.

type ConsumeOrderlyContext

type ConsumeOrderlyContext struct {
	MQ                            MessageQueue
	AutoCommit                    bool
	SuspendCurrentQueueTimeMillis int
}

func GetOrderlyCtx

func GetOrderlyCtx(ctx context.Context) (*ConsumeOrderlyContext, bool)

func NewConsumeOrderlyContext

func NewConsumeOrderlyContext() *ConsumeOrderlyContext

type ConsumeReturnType added in v1.0.1

type ConsumeReturnType string

func (ConsumeReturnType) Ordinal added in v1.0.1

func (c ConsumeReturnType) Ordinal() int

type Credentials

type Credentials struct {
	AccessKey     string
	SecretKey     string
	SecurityToken string
}

func (Credentials) IsEmpty

func (c Credentials) IsEmpty() bool

type CtxKey

type CtxKey int

type Interceptor

type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error

Interceptor intercepts the invocation of a producer/consumer on messages. In a PushConsumer call, req is of type []*MessageExt and reply is a ConsumeResultHolder; use a type assertion to get the real type.

func ChainInterceptors added in v1.0.1

func ChainInterceptors(interceptors ...Interceptor) Interceptor

type Invoker

type Invoker func(ctx context.Context, req, reply interface{}) error

Invoker finishes a message invocation on a producer/consumer.

type LocalTransactionState added in v1.0.1

type LocalTransactionState int
const (
	CommitMessageState LocalTransactionState = iota + 1
	RollbackMessageState
	UnknowState
)

type Message

type Message struct {
	Topic         string
	Body          []byte
	Flag          int32
	TransactionId string
	Batch         bool
	// Queue is the queue that the message will be sent to. Set it only if you want to choose the queue
	// for the message yourself; otherwise leave it unset.
	Queue *MessageQueue
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(topic string, body []byte) *Message

func (*Message) GetKeys added in v1.0.1

func (m *Message) GetKeys() string

func (*Message) GetProperty added in v1.0.1

func (m *Message) GetProperty(key string) string

func (*Message) GetShardingKey added in v1.0.1

func (m *Message) GetShardingKey() string

func (*Message) GetTags added in v1.0.1

func (m *Message) GetTags() string

func (*Message) MarshallProperties added in v1.0.1

func (m *Message) MarshallProperties() string

func (*Message) RemoveProperty

func (m *Message) RemoveProperty(key string) string

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalProperties added in v1.0.1

func (m *Message) UnmarshalProperties(data []byte)

UnmarshalProperties parses data into property key-value pairs.

func (*Message) WithDelayTimeLevel added in v1.0.1

func (m *Message) WithDelayTimeLevel(level int) *Message

WithDelayTimeLevel sets the delay before the message can be consumed. Reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. Delay levels start from 1; for example, if level=1, the delay time is 1s.

func (*Message) WithKeys added in v1.0.1

func (m *Message) WithKeys(keys []string) *Message

func (*Message) WithProperty added in v1.0.1

func (m *Message) WithProperty(key, value string)

func (*Message) WithShardingKey added in v1.0.1

func (m *Message) WithShardingKey(key string) *Message

func (*Message) WithTag added in v1.0.1

func (m *Message) WithTag(tags string) *Message

type MessageExt

type MessageExt struct {
	Message
	MsgId                     string
	QueueId                   int32
	StoreSize                 int32
	QueueOffset               int64
	SysFlag                   int32
	BornTimestamp             int64
	BornHost                  string
	StoreTimestamp            int64
	StoreHost                 string
	CommitLogOffset           int64
	BodyCRC                   int32
	ReconsumeTimes            int32
	PreparedTransactionOffset int64
}

func DecodeMessage

func DecodeMessage(data []byte) []*MessageExt

func (*MessageExt) GetRegionID added in v1.0.1

func (msgExt *MessageExt) GetRegionID() string

func (*MessageExt) GetTags

func (msgExt *MessageExt) GetTags() string

func (*MessageExt) IsTraceOn added in v1.0.1

func (msgExt *MessageExt) IsTraceOn() string

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageID added in v1.0.1

type MessageID struct {
	Addr   string
	Port   int
	Offset int64
}

func UnmarshalMsgID added in v1.0.1

func UnmarshalMsgID(id []byte) (*MessageID, error)

type MessageQueue

type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int    `json:"queueId"`
}

MessageQueue represents a message queue.

func (MessageQueue) Equals

func (mq MessageQueue) Equals(queue *MessageQueue) bool

func (*MessageQueue) HashCode

func (mq *MessageQueue) HashCode() int

func (*MessageQueue) String

func (mq *MessageQueue) String() string

type MessageType added in v1.0.1

type MessageType int
const (
	NormalMsg MessageType = iota
	TransMsgHalf
	TransMsgCommit
	DelayMsg
)

type NamesrvAddr

type NamesrvAddr []string

func NewNamesrvAddr

func NewNamesrvAddr(s ...string) (NamesrvAddr, error)

func (NamesrvAddr) Check

func (addr NamesrvAddr) Check() error

type ProducerCtx added in v1.0.1

type ProducerCtx struct {
	ProducerGroup     string
	Message           Message
	MQ                MessageQueue
	BrokerAddr        string
	BornHost          string
	CommunicationMode CommunicationMode
	SendResult        *SendResult
	Props             map[string]string
	MsgType           MessageType
	Namespace         string
}

func GetProducerCtx added in v1.0.1

func GetProducerCtx(ctx context.Context) *ProducerCtx

type PullResult

type PullResult struct {
	NextBeginOffset      int64
	MinOffset            int64
	MaxOffset            int64
	Status               PullStatus
	SuggestWhichBrokerId int64
	// contains filtered or unexported fields
}

PullResult is the result of a pull.

func (*PullResult) GetBody

func (result *PullResult) GetBody() []byte

func (*PullResult) GetMessageExts

func (result *PullResult) GetMessageExts() []*MessageExt

func (*PullResult) GetMessages

func (result *PullResult) GetMessages() []*Message

func (*PullResult) SetBody

func (result *PullResult) SetBody(data []byte)

func (*PullResult) SetMessageExts

func (result *PullResult) SetMessageExts(msgExts []*MessageExt)

func (*PullResult) String

func (result *PullResult) String() string

type PullStatus

type PullStatus int

PullStatus is the status of a pull.

const (
	PullFound PullStatus = iota
	PullNoNewMsg
	PullNoMsgMatched
	PullOffsetIllegal
	PullBrokerTimeout
)

Predefined pull statuses.

type SendResult

type SendResult struct {
	Status        SendStatus
	MsgID         string
	MessageQueue  *MessageQueue
	QueueOffset   int64
	TransactionID string
	OffsetMsgID   string
	RegionID      string
	TraceOn       bool
}

SendResult is the RocketMQ send result.

func (*SendResult) String

func (result *SendResult) String() string

String returns the send result as a string (detailed result).

type SendStatus

type SendStatus int

SendStatus is the send status of a message.

const (
	SendOK SendStatus = iota
	SendFlushDiskTimeout
	SendFlushSlaveTimeout
	SendSlaveNotAvailable
	SendUnknownError

	FlagCompressed = 0x1
	MsgIdLength    = 8 + 8
)

type TraceConfig

type TraceConfig struct {
	TraceTopic   string
	Access       AccessChannel
	NamesrvAddrs []string
	Credentials  // acl config for trace. omit if acl is closed on broker.
}

TraceConfig is the configuration for message trace.

type TransactionListener added in v1.0.1

type TransactionListener interface {
	// When a transactional prepare (half) message is sent successfully, this method is invoked to execute the local transaction.
	ExecuteLocalTransaction(Message) LocalTransactionState

	// When there is no response to a prepare (half) message, the broker sends a check message to check the transaction
	// status, and this method is invoked to get the local transaction status.
	CheckLocalTransaction(MessageExt) LocalTransactionState
}

type TransactionSendResult added in v1.0.1

type TransactionSendResult struct {
	*SendResult
	State LocalTransactionState
}

TransactionSendResult is the RocketMQ send result of a transactional message, together with the local transaction state.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL