Documentation ¶
Index ¶
- Constants
- Variables
- type MsgHandler
- type MsgProc
- type OutPutMsg
- type ProcMsgFn
- type ProcMsgInfo
- type Q
- func (a *Q) AddCtrl(cmd interface{}) error
- func (a *Q) AddCtrlAnyway(cmd interface{}, ts time.Duration) error
- func (a *Q) AddPriorCtrl(cmd interface{}) error
- func (a *Q) AddPriorReq(req interface{}) error
- func (a *Q) AddReq(req interface{}) error
- func (a *Q) AddReqAnyway(req interface{}, ts time.Duration) error
- func (a *Q) Close()
- func (a *Q) IsCleared() bool
- func (a *Q) IsClosed() bool
- func (a *Q) Pop() (interface{}, error)
- func (a *Q) PopAnyway() (interface{}, error)
- func (a *Q) TryClear() bool
- func (a *Q) TryClose() bool
- func (a *Q) WaitClear(ctx context.Context) error
- func (a *Q) WaitClose(ctx context.Context) error
- type QOption
- type ShOption
- type Shunt
- func (s *Shunt) AddMsg(ctx context.Context, slotIndex int, fnIndex int, inputMsg proto.Message) (*MsgProc, error)
- func (s *Shunt) AddPriorMsg(ctx context.Context, slotIndex int, fnIndex int, inputMsg proto.Message) (*MsgProc, error)
- func (s *Shunt) RegisterMsgHandler(name string, fnIndex int, fn ProcMsgFn)
- func (s *Shunt) Run()
- func (s *Shunt) SizeOfQInSlot() int
- func (s *Shunt) SizeOfSlot() int
- func (s *Shunt) Stop()
- func (s *Shunt) WaitStop(ctx context.Context) error
Constants ¶
const ( //DefaultSlotSize slot size //素数 DefaultSlotSize = 509 //DefaultQInSlotSize default pipe size, total current request can be pipe is 509*1024*8 = 4169728 DefaultQInSlotSize = 1024 * 8 )
Variables ¶
var ( //ErrClosed close ErrClosed = errors.New("pipe.q.closed") //ErrCtrlQFull ctrl q full ErrCtrlQFull = errors.New("pipe.q.ctrl.full") //ErrReqQFull req q full ErrReqQFull = errors.New("pipe.q.req.full") //ErrSync never gonna happen ErrSync = errors.New("never.gonna.happen.crazy") )
var ( //ErrNoMsgHandler -- no message handler ErrNoMsgHandler = status.Error(codes.Unimplemented, "no.msg.handler") //ErrMsgQueueFull -- msg queue is full ErrMsgQueueFull = status.Error(codes.ResourceExhausted, "msg.queue.full") //ErrMsgQueueClosed -- msg queue closed ErrMsgQueueClosed = status.Error(codes.Unavailable, "msg.queue.closed") //ErrInvalidParam -- invalid msg param ErrInvalidParam = status.Error(codes.InvalidArgument, "invalid.input.msg") //ErrInvalidResult -- invalid result ErrInvalidResult = status.Error(codes.Internal, "invalid.output.msg") )
Functions ¶
This section is empty.
Types ¶
type MsgHandler ¶
type MsgHandler struct {
// contains filtered or unexported fields
}
MsgHandler : msg handler
func NewMsgHandler ¶
func NewMsgHandler() *MsgHandler
func (*MsgHandler) GetFn ¶
func (h *MsgHandler) GetFn(fnIndex int) ProcMsgFn
GetFn -- get function by fn index
func (*MsgHandler) GetFunctionName ¶
func (h *MsgHandler) GetFunctionName(fnIndex int) string
GetFunctionName -- get function name by fn index
func (*MsgHandler) RegisterFn ¶
func (h *MsgHandler) RegisterFn(name string, fnIndex int, fn ProcMsgFn)
RegisterFn -- register function
type ProcMsgFn ¶
type ProcMsgFn func(ctx context.Context, index int, inputMsg proto.Message) (outputMsg proto.Message, err error)
ProcMsgFn function
type ProcMsgInfo ¶
type ProcMsgInfo struct {
// contains filtered or unexported fields
}
ProcMsgInfo -- name and function
type Q ¶
type Q struct {
// contains filtered or unexported fields
}
Q actor queue structure define
func (*Q) AddCtrlAnyway ¶
AddCtrlAnyway dd control request to the control queue end place anyway if queue full, sleep then try
func (*Q) AddPriorCtrl ¶
AddPriorCtrl add control request to the control queue first place.
func (*Q) AddPriorReq ¶
AddPriorReq add normal request to the normal queue first place.
func (*Q) AddReqAnyway ¶
AddReqAnyway add normal request to the normal queue end place anyway if queue full, sleep then try
func (*Q) PopAnyway ¶
PopAnyway consume an item like Pop, but it can consume even the queue is closed.
func (*Q) TryClear ¶
TryClear try to clear a queue. the function should be called by consumer. if the queue be set can pop even after closed. the consumer handle the last pop item, it can call the function. after the function be called, which means the queue is totally clear. no producer/no consumer anymore
func (*Q) TryClose ¶
TryClose try to close a queue in case it's empty. otherwise, the queue can not be closed.
type QOption ¶
type QOption func(o *_QOption)
QOption option function
func WithQCtrlSize ¶
WithQCtrlSize setup max queue number of control queue if max is 0, which means no limit
func WithQReqSize ¶
WithQReqSize setup max queue number of request queue if max is 0, which means no limit
type ShOption ¶
type ShOption func(o *_ShOption)
ShOption shunt option function
func WithQSizeInSlot ¶
WithQSizeInSlot setup queue size in each slot
type Shunt ¶
type Shunt struct {
// contains filtered or unexported fields
}
Shunt multi-queue: like shunt
func NewShuntWithSize ¶
NewShuntWithSize : new shunt with size
func (*Shunt) AddMsg ¶
func (s *Shunt) AddMsg(ctx context.Context, slotIndex int, fnIndex int, inputMsg proto.Message) (*MsgProc, error)
AddMsg : add msg
func (*Shunt) AddPriorMsg ¶
func (s *Shunt) AddPriorMsg(ctx context.Context, slotIndex int, fnIndex int, inputMsg proto.Message) (*MsgProc, error)
AddPriorMsg : add prior msg
func (*Shunt) RegisterMsgHandler ¶
RegisterMsgHandler : register msg handler
func (*Shunt) SizeOfQInSlot ¶
SizeOfQInSlot : get queue size in each slot