Documentation ¶
Overview ¶
Package pqueue is a generated protocol buffer package.
It is generated from these files:
pqueue/pqmsg.proto
It has these top-level messages:
PQueueMsgData
Index ¶
- Constants
- Variables
- func DefaultPQConfig() *conf.PQConfig
- func LoadPQueue(svcs apis.IServices, desc *queue_info.ServiceDescription) (apis.ISvc, error)
- func ParsePQConfig(params []string) (*conf.PQConfig, apis.IResponse)
- type MsgHeap
- func (s *MsgHeap) ContainsSn(sn uint64) bool
- func (s *MsgHeap) Empty() bool
- func (s *MsgHeap) GetMsg(sn uint64) *PQMsgMetaData
- func (s *MsgHeap) Init()
- func (s *MsgHeap) Len() int
- func (s *MsgHeap) MinMsg() *PQMsgMetaData
- func (s *MsgHeap) NotEmpty() bool
- func (s *MsgHeap) Pop() *PQMsgMetaData
- func (s *MsgHeap) Push(msg *PQMsgMetaData)
- func (s *MsgHeap) Remove(sn uint64) *PQMsgMetaData
- type MsgResponseItem
- type PQContext
- func (ctx *PQContext) Call(cmd string, params []string) apis.IResponse
- func (ctx *PQContext) CheckTimeouts(params []string) apis.IResponse
- func (ctx *PQContext) DeleteById(params []string) apis.IResponse
- func (ctx *PQContext) DeleteByReceipt(params []string) apis.IResponse
- func (ctx *PQContext) DeleteLockedById(params []string) apis.IResponse
- func (ctx *PQContext) Finish()
- func (ctx *PQContext) GetCurrentStatus(params []string) apis.IResponse
- func (ctx *PQContext) GetMessageInfo(params []string) apis.IResponse
- func (ctx *PQContext) Pop(params []string) apis.IResponse
- func (ctx *PQContext) PopLock(params []string) apis.IResponse
- func (ctx *PQContext) Push(params []string) apis.IResponse
- func (ctx *PQContext) SetParamValue(params []string) apis.IResponse
- func (ctx *PQContext) UnlockByReceipt(params []string) apis.IResponse
- func (ctx *PQContext) UnlockMessageById(params []string) apis.IResponse
- func (ctx *PQContext) UpdateLockById(params []string) apis.IResponse
- func (ctx *PQContext) UpdateLockByRcpt(params []string) apis.IResponse
- type PQMsgMetaData
- type PQueue
- func (pq *PQueue) AvailableMessages() int64
- func (pq *PQueue) CheckTimeouts(ts int64) apis.IResponse
- func (pq *PQueue) Clear()
- func (pq *PQueue) Close()
- func (pq *PQueue) Config() *conf.PQConfig
- func (pq *PQueue) DelayedCount() int64
- func (pq *PQueue) DeleteById(msgId string) apis.IResponse
- func (pq *PQueue) DeleteByReceipt(rcpt string) apis.IResponse
- func (pq *PQueue) DeleteLockedById(msgId string) apis.IResponse
- func (pq *PQueue) Description() *queue_info.ServiceDescription
- func (pq *PQueue) GetCurrentStatus() apis.IResponse
- func (pq *PQueue) GetMessageInfo(msgId string) apis.IResponse
- func (pq *PQueue) GetStatus() map[string]interface{}
- func (pq *PQueue) Info() apis.ServiceInfo
- func (pq *PQueue) IsClosed() bool
- func (pq *PQueue) LockedCount() int64
- func (pq *PQueue) NewContext(rw apis.ResponseWriter) apis.ServiceContext
- func (pq *PQueue) Pop(lockTimeout, popWaitTimeout, limit int64, lock bool) apis.IResponse
- func (pq *PQueue) Push(msgId string, payload string, msgTtl, delay, priority int64) apis.IResponse
- func (pq *PQueue) ReleaseInFlight(cutOffTs int64) apis.IResponse
- func (pq *PQueue) SetParams(params *PQueueParams) apis.IResponse
- func (pq *PQueue) StartUpdate()
- func (pq *PQueue) TimeoutItems(cutOffTs int64) apis.IResponse
- func (pq *PQueue) TotalMessages() int64
- func (pq *PQueue) UnlockByReceipt(rcpt string) apis.IResponse
- func (pq *PQueue) UnlockMessageById(msgId string) apis.IResponse
- func (pq *PQueue) UpdateLockById(msgId string, lockTimeout int64) apis.IResponse
- func (pq *PQueue) UpdateLockByRcpt(rcpt string, lockTimeout int64) apis.IResponse
- type PQueueMsgData
- func (*PQueueMsgData) Descriptor() ([]byte, []int)
- func (this *PQueueMsgData) Equal(that interface{}) bool
- func (this *PQueueMsgData) GoString() string
- func (m *PQueueMsgData) Marshal() (data []byte, err error)
- func (m *PQueueMsgData) MarshalTo(data []byte) (int, error)
- func (*PQueueMsgData) ProtoMessage()
- func (m *PQueueMsgData) Reset()
- func (m *PQueueMsgData) Size() (n int)
- func (this *PQueueMsgData) String() string
- func (m *PQueueMsgData) Unmarshal(data []byte) error
- type PQueueParams
Constants ¶
const ( PQ_CMD_DELETE_LOCKED_BY_ID = "DELLCK" PQ_CMD_DELETE_BY_ID = "DEL" PQ_CMD_DELETE_BY_RCPT = "RDEL" PQ_CMD_UNLOCK_BY_RCPT = "RUNLCK" PQ_CMD_UNLOCK_BY_ID = "UNLCK" PQ_CMD_UPD_LOCK_BY_ID = "UPDLCK" PQ_CMD_UPD_LOCK_BY_RCPT = "RUPDLCK" PQ_CMD_PUSH = "PUSH" PQ_CMD_POP = "POP" PQ_CMD_POPLOCK = "POPLCK" PQ_CMD_MSG_INFO = "MSGINFO" PQ_CMD_STATUS = "STATUS" PQ_CMD_CHECK_TIMEOUTS = "CHKTS" PQ_CMD_SET_CFG = "SETCFG" PQ_CMD_PURGE = "PURGE" )
const ( PRM_ID = "ID" PRM_RECEIPT = "RCPT" PRM_POP_WAIT = "WAIT" PRM_LOCK_TIMEOUT = "TIMEOUT" PRM_PRIORITY = "PRIORITY" PRM_LIMIT = "LIMIT" PRM_PAYLOAD = "PL" PRM_DELAY = "DELAY" PRM_TIMESTAMP = "TS" PRM_ASYNC = "ASYNC" PRM_SYNC_WAIT = "SYNCWAIT" PRM_MSG_TTL = "TTL" )
const ( CPRM_MSG_TTL = "MSGTTL" CPRM_MAX_MSG_SIZE = "MSGSIZE" CPRM_MAX_MSGS_IN_QUEUE = "MAXMSGS" CPRM_DELIVERY_DELAY = "DELAY" CPRM_POP_LIMIT = "POPLIMIT" CPRM_LOCK_TIMEOUT = "TIMEOUT" CPRM_FAIL_QUEUE = "FAILQ" CPRM_POP_WAIT = "WAIT" )
const ( PQ_STATUS_MAX_QUEUE_SIZE = "MaxMsgsInQueue" PQ_STATUS_POP_WAIT_TIMEOUT = "PopWaitTimeout" PQ_STATUS_MSG_TTL = "MsgTtl" PQ_STATUS_DELIVERY_DELAY = "DeliveryDelay" PQ_STATUS_POP_LOCK_TIMEOUT = "PopLockTimeout" PQ_STATUS_POP_COUNT_LIMIT = "PopCountLimit" PQ_STATUS_CREATE_TS = "CreateTs" PQ_STATUS_LAST_PUSH_TS = "LastPushTs" PQ_STATUS_LAST_POP_TS = "LastPopTs" PQ_STATUS_TOTAL_MSGS = "TotalMessages" PQ_STATUS_IN_FLIGHT_MSG = "InFlightMessages" PQ_STATUS_AVAILABLE_MSGS = "AvailableMessages" PQ_STATUS_DELAYED = "DelayedMessages" PQ_STATUS_FAIL_QUEUE = "FailQueue" PQ_STATUS_MAX_MSG_SIZE = "MaxMsgSize" )
const ( MSG_INFO_ID = "Id" MSG_INFO_LOCKED = "Locked" MSG_INFO_UNLOCK_TS = "UnlockTs" MSG_INFO_POP_COUNT = "PopCount" MSG_INFO_PRIORITY = "Priority" MSG_INFO_EXPIRE_TS = "ExpireTs" )
const PAYLOAD_LIMIT = 512 * 1024
Variables ¶
var ( ErrInvalidLengthPqmsg = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowPqmsg = fmt.Errorf("proto: integer overflow") )
Functions ¶
func DefaultPQConfig ¶
func LoadPQueue ¶
func LoadPQueue(svcs apis.IServices, desc *queue_info.ServiceDescription) (apis.ISvc, error)
Types ¶
type MsgHeap ¶
type MsgHeap struct {
// contains filtered or unexported fields
}
func NewMsgHeap ¶
func NewMsgHeap() *MsgHeap
func (*MsgHeap) ContainsSn ¶
func (*MsgHeap) GetMsg ¶
func (s *MsgHeap) GetMsg(sn uint64) *PQMsgMetaData
func (*MsgHeap) MinMsg ¶
func (s *MsgHeap) MinMsg() *PQMsgMetaData
func (*MsgHeap) Pop ¶
func (s *MsgHeap) Pop() *PQMsgMetaData
func (*MsgHeap) Push ¶
func (s *MsgHeap) Push(msg *PQMsgMetaData)
func (*MsgHeap) Remove ¶
func (s *MsgHeap) Remove(sn uint64) *PQMsgMetaData
type MsgResponseItem ¶
type MsgResponseItem struct {
// contains filtered or unexported fields
}
func NewMsgResponseItem ¶
func NewMsgResponseItem(msg *PQMsgMetaData, payload []byte) *MsgResponseItem
func (*MsgResponseItem) GetMeta ¶
func (p *MsgResponseItem) GetMeta() *PQMsgMetaData
func (*MsgResponseItem) ID ¶
func (p *MsgResponseItem) ID() string
func (*MsgResponseItem) Payload ¶
func (p *MsgResponseItem) Payload() []byte
func (*MsgResponseItem) Receipt ¶
func (p *MsgResponseItem) Receipt() string
func (*MsgResponseItem) WriteResponse ¶
func (p *MsgResponseItem) WriteResponse(buf *bufio.Writer) error
type PQContext ¶
type PQContext struct {
// contains filtered or unexported fields
}
func NewPQContext ¶
func NewPQContext(pq *PQueue, r apis.ResponseWriter) *PQContext
func (*PQContext) CheckTimeouts ¶
func (*PQContext) DeleteByReceipt ¶
DeleteByReceipt deletes a locked message using the provided receipt. This is the preferable method to unlock messages. It helps to avoid a race condition when the message lock has timed out and the message was picked up by another consumer.
func (*PQContext) DeleteLockedById ¶
func (*PQContext) GetCurrentStatus ¶
func (*PQContext) GetMessageInfo ¶
func (*PQContext) Push ¶
Push pushes a message to the queue. Pushing a message automatically enables auto expiration.
func (*PQContext) SetParamValue ¶
func (*PQContext) UnlockByReceipt ¶
UnlockByReceipt unlocks a locked message using the provided receipt. Unlocking by receipt makes sure the message was not relocked by something else.
func (*PQContext) UnlockMessageById ¶
func (*PQContext) UpdateLockById ¶
UpdateLockById sets a user-defined message lock timeout. It works only for locked messages.
type PQMsgMetaData ¶
type PQMsgMetaData struct { SerialNumber uint64 PQueueMsgData }
func NewPQMsgMetaData ¶
func NewPQMsgMetaData(id string, priority int64, expireTs int64, sn uint64) *PQMsgMetaData
func UnmarshalPQMsgMetaData ¶
func UnmarshalPQMsgMetaData(sn uint64, buf []byte) *PQMsgMetaData
func (*PQMsgMetaData) ByteMarshal ¶
func (self *PQMsgMetaData) ByteMarshal() []byte
func (*PQMsgMetaData) Sn2Bin ¶
func (pqm *PQMsgMetaData) Sn2Bin() string
type PQueue ¶
type PQueue struct { // A wrapper on top of common database operations. db.DBService // contains filtered or unexported fields }
func InitPQueue ¶
func InitPQueue(svcs apis.IServices, desc *queue_info.ServiceDescription, config *conf.PQConfig) *PQueue
func (*PQueue) AvailableMessages ¶
AvailableMessages returns the number of available messages.
func (*PQueue) Clear ¶
func (pq *PQueue) Clear()
Clear drops all locked and unlocked messages in the queue.
func (*PQueue) Config ¶
Config returns the service config as a *conf.PQConfig. Use the service type getter to find out the expected config type.
func (*PQueue) DelayedCount ¶
DelayedCount is the number of messages which are delayed for delivery.
func (*PQueue) Description ¶
func (pq *PQueue) Description() *queue_info.ServiceDescription
Description returns the queue description.
func (*PQueue) GetCurrentStatus ¶
func (*PQueue) Info ¶
func (pq *PQueue) Info() apis.ServiceInfo
func (*PQueue) LockedCount ¶
LockedCount is the number of messages which are locked at the moment.
func (*PQueue) NewContext ¶
func (pq *PQueue) NewContext(rw apis.ResponseWriter) apis.ServiceContext
func (*PQueue) Pop ¶
Pop pops 'limit' messages within the 'timeout' (milliseconds) time interval.
func (*PQueue) StartUpdate ¶
func (pq *PQueue) StartUpdate()
StartUpdate runs a loop of periodic data updates.
func (*PQueue) TotalMessages ¶
TotalMessages returns the total number of messages currently in the queue.
func (*PQueue) UpdateLockById ¶
UpdateLockById sets a user-defined message lock timeout. It works only for locked messages.
type PQueueMsgData ¶
type PQueueMsgData struct { Priority int64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"` ExpireTs int64 `protobuf:"varint,2,opt,name=expire_ts,json=expireTs,proto3" json:"expire_ts,omitempty"` PopCount int64 `protobuf:"varint,3,opt,name=pop_count,json=popCount,proto3" json:"pop_count,omitempty"` UnlockTs int64 `protobuf:"varint,4,opt,name=unlock_ts,json=unlockTs,proto3" json:"unlock_ts,omitempty"` StrId string `protobuf:"bytes,5,opt,name=str_id,json=strId,proto3" json:"str_id,omitempty"` }
func (*PQueueMsgData) Descriptor ¶
func (*PQueueMsgData) Descriptor() ([]byte, []int)
func (*PQueueMsgData) Equal ¶
func (this *PQueueMsgData) Equal(that interface{}) bool
func (*PQueueMsgData) GoString ¶
func (this *PQueueMsgData) GoString() string
func (*PQueueMsgData) Marshal ¶
func (m *PQueueMsgData) Marshal() (data []byte, err error)
func (*PQueueMsgData) ProtoMessage ¶
func (*PQueueMsgData) ProtoMessage()
func (*PQueueMsgData) Reset ¶
func (m *PQueueMsgData) Reset()
func (*PQueueMsgData) Size ¶
func (m *PQueueMsgData) Size() (n int)
func (*PQueueMsgData) String ¶
func (this *PQueueMsgData) String() string
func (*PQueueMsgData) Unmarshal ¶
func (m *PQueueMsgData) Unmarshal(data []byte) error