Documentation ¶
Index ¶
- Constants
- func GetVersion() (version string)
- func Version() (version string)
- type ClientConfig
- type ConsumeStatus
- type ConsumerModel
- type LogConfig
- type LogLevel
- type Message
- type MessageExt
- type MessageModel
- type MessageQueue
- type MessageQueueSelector
- type Producer
- type ProducerConfig
- type ProducerModel
- type PullConsumer
- type PullConsumerConfig
- type PullResult
- type PullStatus
- type PushConsumer
- type PushConsumerConfig
- type SendResult
- type SendStatus
- type SessionCredentials
- type TransactionLocalListener
- type TransactionProducer
- type TransactionStatus
Constants ¶
const ( CommonProducer = ProducerModel(1) OrderlyProducer = ProducerModel(2) TransProducer = ProducerModel(3) )
Different models
const ( BroadCasting = MessageModel(1) Clustering = MessageModel(2) )
MessageModel
const ( CoCurrently = ConsumerModel(1) Orderly = ConsumerModel(2) )
ConsumerModel
const ( NIL = rmqError(C.OK) ErrNullPoint = rmqError(C.NULL_POINTER) ErrMallocFailed = rmqError(C.MALLOC_FAILED) ErrProducerStartFailed = rmqError(C.PRODUCER_START_FAILED) ErrSendSyncFailed = rmqError(C.PRODUCER_SEND_SYNC_FAILED) ErrSendOnewayFailed = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED) ErrSendOrderlyFailed = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED) ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START) ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED) ErrFetchMQFailed = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED) ErrFetchMessageFailed = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED) )
This is error messages
const ( LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL) LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR) LogLevelWarn = LogLevel(C.E_LOG_LEVEL_WARN) LogLevelInfo = LogLevel(C.E_LOG_LEVEL_INFO) LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG) LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE) LogLevelNum = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM) )
predefined log level
const ( //SendOK OK SendOK = SendStatus(C.E_SEND_OK) //SendFlushDiskTimeout Failed because broker flush error SendFlushDiskTimeout = SendStatus(C.E_SEND_FLUSH_DISK_TIMEOUT) //SendFlushSlaveTimeout Failed because slave broker timeout SendFlushSlaveTimeout = SendStatus(C.E_SEND_FLUSH_SLAVE_TIMEOUT) //SendSlaveNotAvailable Failed because slave broker error SendSlaveNotAvailable = SendStatus(C.E_SEND_SLAVE_NOT_AVAILABLE) )
const ( PullFound = PullStatus(C.E_FOUND) PullNoNewMsg = PullStatus(C.E_NO_NEW_MSG) PullNoMatchedMsg = PullStatus(C.E_NO_MATCHED_MSG) PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL) PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT) )
predefined pull status
const ( //ConsumeSuccess commit offset to broker ConsumeSuccess = ConsumeStatus(C.E_CONSUME_SUCCESS) //ReConsumeLater it will be send back to broker ReConsumeLater = ConsumeStatus(C.E_RECONSUME_LATER) )
const ( CommitTransaction = TransactionStatus(C.E_COMMIT_TRANSACTION) RollbackTransaction = TransactionStatus(C.E_ROLLBACK_TRANSACTION) UnknownTransaction = TransactionStatus(C.E_UNKNOWN_TRANSACTION) )
const GoClientVersion = "Go Client V1.2.4, Support CPP Core:V1.2.X"
GoClientVersion const strings for version
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ClientConfig ¶
type ClientConfig struct { GroupID string NameServer string NameServerDomain string InstanceName string Credentials *SessionCredentials LogC *LogConfig }
ClientConfig save client config
func (*ClientConfig) String ¶
func (config *ClientConfig) String() string
type ConsumeStatus ¶
type ConsumeStatus int
ConsumeStatus the retern value for consumer
func (ConsumeStatus) String ¶
func (status ConsumeStatus) String() string
type ConsumerModel ¶
type ConsumerModel int
ConsumerModel CoCurrently or Orderly
func (ConsumerModel) String ¶
func (mode ConsumerModel) String() string
type Message ¶
type Message struct { Topic string Tags string Keys string Body string DelayTimeLevel int Property map[string]string // contains filtered or unexported fields }
Message used for send
func (*Message) GetProperty ¶ added in v1.2.4
type MessageExt ¶
type MessageExt struct { Message MessageID string QueueId int ReconsumeTimes int StoreSize int BornTimestamp int64 StoreTimestamp int64 QueueOffset int64 CommitLogOffset int64 PreparedTransactionOffset int64 // contains filtered or unexported fields }
MessageExt used for consume
func (*MessageExt) GetProperty ¶
func (msgExt *MessageExt) GetProperty(key string) string
GetProperty get the message property by key from message ext
func (*MessageExt) String ¶
func (msgExt *MessageExt) String() string
type MessageModel ¶
type MessageModel int
MessageModel Clustering or BroadCasting
func (MessageModel) String ¶
func (mode MessageModel) String() string
type MessageQueue ¶
MessageQueue the queue of the message
func (*MessageQueue) String ¶
func (q *MessageQueue) String() string
type MessageQueueSelector ¶
MessageQueueSelector select one message queue
type Producer ¶
type Producer interface { // SendMessageSync send a message with sync SendMessageSync(msg *Message) (*SendResult, error) // SendMessageOrderly send the message orderly SendMessageOrderly( msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) (*SendResult, error) // SendMessageOneway send a message with oneway SendMessageOneway(msg *Message) error SendMessageOrderlyByShardingKey(msg *Message, shardingkey string) (*SendResult, error) // contains filtered or unexported methods }
Producer define interface
func NewProducer ¶
func NewProducer(config *ProducerConfig) (Producer, error)
NewProducer create a new producer with config
type ProducerConfig ¶
type ProducerConfig struct { ClientConfig SendMsgTimeout int CompressLevel int MaxMessageSize int ProducerModel ProducerModel }
ProducerConfig define a producer
func (*ProducerConfig) String ¶
func (config *ProducerConfig) String() string
type ProducerModel ¶
type ProducerModel int
ProducerModel Common or orderly
func (ProducerModel) String ¶
func (mode ProducerModel) String() string
type PullConsumer ¶
type PullConsumer interface { // Pull returns the messages from the consume queue by specify the offset and the max number Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult // FetchSubscriptionMessageQueues returns the consume queue of the topic FetchSubscriptionMessageQueues(topic string) []MessageQueue // contains filtered or unexported methods }
PullConsumer consumer pulling the message
func NewPullConsumer ¶
func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error)
NewPullConsumer creates a pull consumer
type PullConsumerConfig ¶
type PullConsumerConfig struct {
ClientConfig
}
PullConsumerConfig the configuration for the pull consumer
func (*PullConsumerConfig) String ¶
func (config *PullConsumerConfig) String() string
type PullResult ¶
type PullResult struct { NextBeginOffset int64 MinOffset int64 MaxOffset int64 Status PullStatus Messages []*MessageExt }
PullResult the pull result
func (*PullResult) String ¶
func (pr *PullResult) String() string
type PullStatus ¶
type PullStatus int
PullStatus pull status
func (PullStatus) String ¶
func (ps PullStatus) String() string
type PushConsumer ¶
type PushConsumer interface { // Subscribe a new topic with specify filter expression and consume function. Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error // contains filtered or unexported methods }
PushConsumer apis for PushConsumer
func NewPushConsumer ¶
func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error)
NewPushConsumer create a new consumer with config.
type PushConsumerConfig ¶
type PushConsumerConfig struct { ClientConfig ThreadCount int MessageBatchMaxSize int Model MessageModel ConsumerModel ConsumerModel MaxCacheMessageSize int MaxCacheMessageSizeInMB int }
PushConsumerConfig define a new consumer.
func (*PushConsumerConfig) String ¶
func (config *PushConsumerConfig) String() string
type SendResult ¶
type SendResult struct { Status SendStatus MsgId string Offset int64 }
SendResult status for send
func (*SendResult) String ¶
func (result *SendResult) String() string
type SendStatus ¶
type SendStatus int
SendStatus The Status for send result from C apis.
func (SendStatus) String ¶
func (status SendStatus) String() string
type SessionCredentials ¶
SessionCredentials access config for client
func (*SessionCredentials) String ¶
func (session *SessionCredentials) String() string
type TransactionLocalListener ¶ added in v1.2.4
type TransactionLocalListener interface { Execute(m *Message, arg interface{}) TransactionStatus Check(m *MessageExt, arg interface{}) TransactionStatus }
TransactionExecutor local executor for transaction message
type TransactionProducer ¶ added in v1.2.4
type TransactionProducer interface { // send a transaction message with sync SendMessageTransaction(msg *Message, arg interface{}) (*SendResult, error) // contains filtered or unexported methods }
func NewTransactionProducer ¶ added in v1.2.4
func NewTransactionProducer(config *ProducerConfig, listener TransactionLocalListener, arg interface{}) (TransactionProducer, error)
NewTransactionProducer create a new trasaction producer with config
type TransactionStatus ¶ added in v1.2.4
type TransactionStatus int
func (TransactionStatus) String ¶ added in v1.2.4
func (status TransactionStatus) String() string