Documentation ¶
Index ¶
- Constants
- func BrokerRuntimeInfo(client remote.Client, addr string, timeout time.Duration) (map[string]string, error)
- func CreateOrUpdateTopic(client remote.Client, addr string, header *CreateOrUpdateTopicHeader, ...) (err error)
- func DeleteTopicInBroker(client remote.Client, addr, topic string, to time.Duration) (err error)
- func DeleteTopicInNamesrv(client remote.Client, addr, topic string, to time.Duration) (err error)
- func GetBrokerClusterInfo(client remote.Client, addr string, to time.Duration) (*route.ClusterInfo, error)
- func GetConsumerIDs(client remote.Client, addr, group string, to time.Duration) (ids []string, err error)
- func LockMessageQueues(client remote.Client, addr string, group, clientID string, ...) ([]message.Queue, error)
- func PullMessageAsync(client remote.Client, addr string, header *PullHeader, to time.Duration, ...) error
- func RegisterFilter(client remote.Client, addr, group, clientID string, subData *SubscribeData, ...) error
- func ResetConsumeOffset(client remote.Client, addr, topic, group string, timestamp time.Time, ...) ([]byte, error)
- func SendBack(client remote.Client, addr string, h *SendBackHeader, to time.Duration) (err error)
- func SendHeartbeat(client remote.Client, addr string, heartbeat *HeartbeatRequest, ...) (version int16, err error)
- func UnlockMessageQueuesOneway(client remote.Client, addr, group, clientID string, queues []message.Queue) error
- func UnregisterClient(client remote.Client, addr, clientID, pGroup, cGroup string, to time.Duration) (err error)
- func UpdateConsumerOffset(client remote.Client, addr, topic, group string, queueID int, offset int64, ...) error
- func UpdateConsumerOffsetOneway(client remote.Client, addr, topic, group string, queueID int, offset int64) error
- func ViewMessageByOffset(client remote.Client, addr string, offset int64, to time.Duration) (*message.Ext, error)
- type ConsumeDirectlyResult
- type ConsumeMessageDirectlyResult
- type Consumer
- type CreateOrUpdateTopicHeader
- type Error
- func GetTopicRouteInfo(client remote.Client, addr string, topic string, to time.Duration) (router *route.TopicRouter, err *Error)
- func MaxOffset(client remote.Client, addr, topic string, queueID uint8, to time.Duration) (int64, *Error)
- func QueryConsumerOffset(client remote.Client, addr, topic, group string, queueID int, to time.Duration) (int64, *Error)
- func SearchOffsetByTimestamp(client remote.Client, addr, topic string, queueID uint8, timestamp time.Time, ...) (int64, *Error)
- type HeartbeatRequest
- type Producer
- type PullHeader
- type PullResponse
- type SendBackHeader
- type SendHeader
- type SendResponse
- type SubscribeData
Constants ¶
const ( CheckTransactionState = 39 NotifyConsumerIdsChanged = 40 ResetConsumerClientOffset = 220 GetConsumerStatusFromClient = 221 GetConsumerRunningInfo = 307 ConsumeMessageDirectlyCode = 309 )
request code
const ( UnknowError remote.Code = -1 Success = 0 SystemError = 1 SystemBusy = 2 RequestCodeNotSupported = 3 TransactionFailed = 4 FlushDiskTimeout = 10 SlaveNotAvailable = 11 FlushSlaveTimeout = 12 MessageIllegal = 13 ServiceNotAvailable = 14 VersionNotSupported = 15 NoPermission = 16 TopicNotExist = 17 TopicExistAlready = 18 PullNotFound = 19 PullRetryImmediately = 20 PullOffsetMoved = 21 QueryNotFound = 22 SubscriptionParseFailed = 23 SubscriptionNotExist = 24 SubscriptionNotLatest = 25 SubscriptionGroupNotExist = 26 FilterDataNotExist = 27 FilterDataNotLatest = 28 TransactionShouldCommit = 200 TransactionShouldRollback = 201 TransactionStateUnknow = 202 TransactionStateGroupWrong = 203 NoBuyerID = 204 NotInCurrentUnit = 205 ConsumerNotOnline = 206 ConsumeMsgTimeout = 207 NoMessage = 208 ConnectBrokerException = 10001 AccessBrokerException = 10002 BrokerNotExistException = 10003 NoNameServerException = 10004 NotFoundTopicException = 10005 )
response code
Variables ¶
This section is empty.
Functions ¶
func BrokerRuntimeInfo ¶
func BrokerRuntimeInfo(client remote.Client, addr string, timeout time.Duration) ( map[string]string, error, )
BrokerRuntimeInfo returns the broker runtime information.
func CreateOrUpdateTopic ¶
func CreateOrUpdateTopic(client remote.Client, addr string, header *CreateOrUpdateTopicHeader, to time.Duration) ( err error, )
CreateOrUpdateTopic create topic from broker
func DeleteTopicInBroker ¶
DeleteTopicInBroker delete topic in the broker
func DeleteTopicInNamesrv ¶
DeleteTopicInNamesrv delete topic in the broker
func GetBrokerClusterInfo ¶
func GetBrokerClusterInfo(client remote.Client, addr string, to time.Duration) (*route.ClusterInfo, error)
GetBrokerClusterInfo get the cluster info from the namesrv
func GetConsumerIDs ¶
func GetConsumerIDs(client remote.Client, addr, group string, to time.Duration) ( ids []string, err error, )
GetConsumerIDs get the client id from the broker
func LockMessageQueues ¶
func LockMessageQueues( client remote.Client, addr string, group, clientID string, queues []message.Queue, to time.Duration, ) ([]message.Queue, error)
LockMessageQueues send lock message queue request to the broker
func PullMessageAsync ¶
func PullMessageAsync( client remote.Client, addr string, header *PullHeader, to time.Duration, callback func(*PullResponse, error), ) error
PullMessageAsync pull the message async
func RegisterFilter ¶
func RegisterFilter( client remote.Client, addr, group, clientID string, subData *SubscribeData, to time.Duration, ) error
RegisterFilter register the filter to the broker
func ResetConsumeOffset ¶
func ResetConsumeOffset( client remote.Client, addr, topic, group string, timestamp time.Time, isForce bool, timeout time.Duration, ) ( []byte, error, )
ResetConsumeOffset requests broker to reset the offsets of the specified topic, the offsets' owner is specified by the group
func SendHeartbeat ¶
func SendHeartbeat(client remote.Client, addr string, heartbeat *HeartbeatRequest, to time.Duration) ( version int16, err error, )
SendHeartbeat send the heartbeat to the broker
func UnlockMessageQueuesOneway ¶
func UnlockMessageQueuesOneway( client remote.Client, addr, group, clientID string, queues []message.Queue, ) error
UnlockMessageQueuesOneway send unlock message queue request to the broker
func UnregisterClient ¶
func UnregisterClient( client remote.Client, addr, clientID, pGroup, cGroup string, to time.Duration, ) ( err error, )
UnregisterClient unregister the producer/consumer group from broker
func UpdateConsumerOffset ¶
func UpdateConsumerOffset( client remote.Client, addr, topic, group string, queueID int, offset int64, to time.Duration, ) error
UpdateConsumerOffset updates the offset of the consumer queue
Types ¶
type ConsumeDirectlyResult ¶
type ConsumeDirectlyResult int
ConsumeDirectlyResult the flag of the consuming directly
type ConsumeMessageDirectlyResult ¶
type ConsumeMessageDirectlyResult struct { Order bool AutoCommit bool Remark string TimeCost time.Duration Result ConsumeDirectlyResult }
ConsumeMessageDirectlyResult consume the message directly, sending by the broker
func ConsumeMessageDirectly ¶
func ConsumeMessageDirectly( client remote.Client, addr, group, clientID, offsetID string, timeout time.Duration, ) ( ret ConsumeMessageDirectlyResult, err error, )
ConsumeMessageDirectly send unlock message queue request to the broker
type Consumer ¶
type Consumer struct { Group string `json:"groupName"` Type string `json:"consumeType"` Model string `json:"messageModel"` FromWhere string `json:"consumeFromWhere"` Subscription []*SubscribeData `json:"subscriptionDataSet"` UnitMode bool `json:"unitMode"` }
Consumer consumer's data in the heartbeat
type CreateOrUpdateTopicHeader ¶
type CreateOrUpdateTopicHeader struct { Topic string DefaultTopic string ReadQueueNums int32 WriteQueueNums int32 Perm int32 TopicFilterType string TopicSysFlag int32 Order bool }
CreateOrUpdateTopicHeader create topic params
func (*CreateOrUpdateTopicHeader) ToMap ¶
func (h *CreateOrUpdateTopicHeader) ToMap() map[string]string
ToMap serializes to the map
type Error ¶
Error rpc error wraper
func GetTopicRouteInfo ¶
func GetTopicRouteInfo(client remote.Client, addr string, topic string, to time.Duration) ( router *route.TopicRouter, err *Error, )
GetTopicRouteInfo returns the topic information.
func MaxOffset ¶
func MaxOffset(client remote.Client, addr, topic string, queueID uint8, to time.Duration) ( int64, *Error, )
MaxOffset returns the max offset in the consume queue
func QueryConsumerOffset ¶
func QueryConsumerOffset( client remote.Client, addr, topic, group string, queueID int, to time.Duration, ) ( int64, *Error, )
QueryConsumerOffset returns the offset of the specified topic and group
type HeartbeatRequest ¶
type HeartbeatRequest struct { ClientID string `json:"clientID"` Producers []Producer `json:"producerDataSet"` Consumers []*Consumer `json:"consumerDataSet"` }
HeartbeatRequest heartbeat request data
func (*HeartbeatRequest) String ¶
func (h *HeartbeatRequest) String() string
type Producer ¶
type Producer struct {
Group string `json:"groupName"`
}
Producer producer's data in the heartbeat
type PullHeader ¶
type PullHeader struct { ConsumerGroup string Topic string QueueOffset int64 MaxCount int32 SysFlag int32 CommitOffset int64 SuspendTimeoutMillis int64 Subscription string SubVersion int64 ExpressionType string QueueID uint8 }
PullHeader pull message header
func (*PullHeader) ToMap ¶
func (p *PullHeader) ToMap() map[string]string
ToMap converts pull header to map
type PullResponse ¶
type PullResponse struct { Code remote.Code Message string Version int16 NextBeginOffset int64 MinOffset int64 MaxOffset int64 Messages []*message.Ext SuggestBrokerID int64 }
PullResponse pull message response
func PullMessageSync ¶
func PullMessageSync( client remote.Client, addr string, header *PullHeader, to time.Duration, ) ( *PullResponse, error, )
PullMessageSync pull message sync
type SendBackHeader ¶
type SendBackHeader struct { CommitOffset int64 Group string DelayLevel int32 MessageID string Topic string IsUnitMode bool MaxReconsumeTimes int }
SendBackHeader send message back params
func (*SendBackHeader) ToMap ¶
func (h *SendBackHeader) ToMap() map[string]string
ToMap converts send back header to map
type SendHeader ¶
type SendHeader struct { Group string Topic string DefaultTopic string DefaultTopicQueueNums int32 QueueID uint8 SysFlag int32 BornTimestamp int64 Flag int32 Properties string ReconsumeTimes int32 UnitMode bool Batch bool MaxReconsumeTimes int32 }
SendHeader send request header
func (*SendHeader) ToMap ¶
func (h *SendHeader) ToMap() map[string]string
ToMap serialzes to the map
type SendResponse ¶
type SendResponse struct { Code remote.Code Message string Version int16 MsgID string QueueOffset int64 QueueID int32 RegionID string TraceOn bool TransactionID string }
SendResponse send response
func SendMessageSync ¶
func SendMessageSync( client remote.Client, addr string, body []byte, h *SendHeader, to time.Duration, ) ( *SendResponse, error, )
SendMessageSync send message to broker sync
type SubscribeData ¶
type SubscribeData struct { Topic string `json:"topic"` Expr string `json:"subString"` Type string `json:"expressionType"` Tags []string `json:"tagsSet"` Codes []uint32 `json:"codeSet"` Version int64 `json:"subVersion"` IsClassFilterMode bool `json:"classFilterMode"` FilterClassSource string `json:"-"` }
SubscribeData subscription information
func (*SubscribeData) String ¶
func (sd *SubscribeData) String() string