pipe

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2021 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultSlotSize is the default slot size.
	// The value is a prime number.
	DefaultSlotSize = 509
	// DefaultQInSlotSize is the default queue size of each slot. With the default slot size, the total number of requests that can be queued at once is 509*1024*8 = 4169728.
	DefaultQInSlotSize = 1024 * 8
)

Variables

View Source
var (
	// ErrClosed indicates that the queue has been closed.
	ErrClosed = errors.New("pipe.q.closed")

	// ErrCtrlQFull indicates that the control queue is full.
	ErrCtrlQFull = errors.New("pipe.q.ctrl.full")
	// ErrReqQFull indicates that the request queue is full.
	ErrReqQFull = errors.New("pipe.q.req.full")

	// ErrSync marks a state that is never expected to happen.
	ErrSync = errors.New("never.gonna.happen.crazy")
)
View Source
var (
	// ErrNoMsgHandler is a status error (codes.Unimplemented) meaning there is no message handler.
	ErrNoMsgHandler = status.Error(codes.Unimplemented, "no.msg.handler")
	// ErrMsgQueueFull is a status error (codes.ResourceExhausted) meaning the message queue is full.
	ErrMsgQueueFull = status.Error(codes.ResourceExhausted, "msg.queue.full")
	// ErrMsgQueueClosed is a status error (codes.Unavailable) meaning the message queue is closed.
	ErrMsgQueueClosed = status.Error(codes.Unavailable, "msg.queue.closed")
	// ErrInvalidParam is a status error (codes.InvalidArgument) meaning the input message is invalid.
	ErrInvalidParam = status.Error(codes.InvalidArgument, "invalid.input.msg")
	// ErrInvalidResult is a status error (codes.Internal) meaning the output message is invalid.
	ErrInvalidResult = status.Error(codes.Internal, "invalid.output.msg")
)

Functions

This section is empty.

Types

type MsgHandler

type MsgHandler struct {
	// contains filtered or unexported fields
}

MsgHandler : msg handler

func NewMsgHandler

func NewMsgHandler() *MsgHandler

func (*MsgHandler) GetFn

func (h *MsgHandler) GetFn(fnIndex int) ProcMsgFn

GetFn -- get function by fn index

func (*MsgHandler) GetFunctionName

func (h *MsgHandler) GetFunctionName(fnIndex int) string

GetFunctionName -- get function name by fn index

func (*MsgHandler) RegisterFn

func (h *MsgHandler) RegisterFn(name string, fnIndex int, fn ProcMsgFn)

RegisterFn -- register function

type MsgProc

type MsgProc struct {
	// contains filtered or unexported fields
}

func NewMsgProc

func NewMsgProc(ctx context.Context, slotIndex int, fnIndex int, inputMsg proto.Message) *MsgProc

func (*MsgProc) GetOutput

func (m *MsgProc) GetOutput() (proto.Message, error)

func (*MsgProc) SetOutput

func (m *MsgProc) SetOutput(outputMsg proto.Message, err error)

type OutPutMsg

type OutPutMsg struct {
	// contains filtered or unexported fields
}

type ProcMsgFn

type ProcMsgFn func(ctx context.Context, index int, inputMsg proto.Message) (outputMsg proto.Message, err error)

ProcMsgFn function

type ProcMsgInfo

type ProcMsgInfo struct {
	// contains filtered or unexported fields
}

ProcMsgInfo -- name and function

type Q

type Q struct {
	// contains filtered or unexported fields
}

Q actor queue structure define

func NewQ

func NewQ(options ...QOption) *Q

NewQ new queue

func (*Q) AddCtrl

func (a *Q) AddCtrl(cmd interface{}) error

AddCtrl add control request to the control queue end place.

func (*Q) AddCtrlAnyway

func (a *Q) AddCtrlAnyway(cmd interface{}, ts time.Duration) error

AddCtrlAnyway adds a control request to the end of the control queue anyway: if the queue is full, it sleeps and then tries again.

func (*Q) AddPriorCtrl

func (a *Q) AddPriorCtrl(cmd interface{}) error

AddPriorCtrl add control request to the control queue first place.

func (*Q) AddPriorReq

func (a *Q) AddPriorReq(req interface{}) error

AddPriorReq add normal request to the normal queue first place.

func (*Q) AddReq

func (a *Q) AddReq(req interface{}) error

AddReq add normal request to the normal queue end place.

func (*Q) AddReqAnyway

func (a *Q) AddReqAnyway(req interface{}, ts time.Duration) error

AddReqAnyway adds a normal request to the end of the normal queue anyway: if the queue is full, it sleeps and then tries again.

func (*Q) Close

func (a *Q) Close()

Close : close the queue

func (*Q) IsCleared

func (a *Q) IsCleared() bool

IsCleared is cleared or not

func (*Q) IsClosed

func (a *Q) IsClosed() bool

IsClosed is closed or not

func (*Q) Pop

func (a *Q) Pop() (interface{}, error)

Pop consumes an item; if the list is empty, the call blocks.

func (*Q) PopAnyway

func (a *Q) PopAnyway() (interface{}, error)

PopAnyway consumes an item like Pop, but it can consume even if the queue is closed.

func (*Q) TryClear

func (a *Q) TryClear() bool

TryClear tries to clear a queue. The function should be called by the consumer. If the queue is set so that items can still be popped after it is closed, the consumer can call this function once it has handled the last popped item. After the function has been called, the queue is totally clear: there are no producers and no consumers anymore.

func (*Q) TryClose

func (a *Q) TryClose() bool

TryClose tries to close a queue if it is empty; otherwise, the queue cannot be closed.

func (*Q) WaitClear

func (a *Q) WaitClear(ctx context.Context) error

WaitClear waits for the queue to be cleared; it must be called in another goroutine. Clear must be called after the queue is closed. Caution: if no one calls TryClear to clear the queue, the clear will never happen.

func (*Q) WaitClose

func (a *Q) WaitClose(ctx context.Context) error

WaitClose wait close, must call in another go routine

type QOption

type QOption func(o *_QOption)

QOption option function

func WithQCtrlSize

func WithQCtrlSize(num int) QOption

WithQCtrlSize sets the maximum size of the control queue. If the maximum is 0, there is no limit.

func WithQReqSize

func WithQReqSize(num int) QOption

WithQReqSize sets the maximum size of the request queue. If the maximum is 0, there is no limit.

type ShOption

type ShOption func(o *_ShOption)

ShOption shunt option function

func WithQSizeInSlot

func WithQSizeInSlot(qSizeInSlot int) ShOption

WithQSizeInSlot setup queue size in each slot

func WithSlotSize

func WithSlotSize(slotSize int) ShOption

WithSlotSize setup slot size

type Shunt

type Shunt struct {
	// contains filtered or unexported fields
}

Shunt multi-queue: like shunt

func NewShunt

func NewShunt(opts ...ShOption) *Shunt

NewShunt : new shunt

func NewShuntWithSize

func NewShuntWithSize(slotSize int, qSizeInSlot int) *Shunt

NewShuntWithSize : new shunt with size

func (*Shunt) AddMsg

func (s *Shunt) AddMsg(ctx context.Context, slotIndex int, fnIndex int, inputMsg proto.Message) (*MsgProc, error)

AddMsg : add msg

func (*Shunt) AddPriorMsg

func (s *Shunt) AddPriorMsg(ctx context.Context, slotIndex int, fnIndex int, inputMsg proto.Message) (*MsgProc, error)

AddPriorMsg : add prior msg

func (*Shunt) RegisterMsgHandler

func (s *Shunt) RegisterMsgHandler(name string, fnIndex int, fn ProcMsgFn)

RegisterMsgHandler : register msg handler

func (*Shunt) Run

func (s *Shunt) Run()

Run : run all queue msg handler

func (*Shunt) SizeOfQInSlot

func (s *Shunt) SizeOfQInSlot() int

SizeOfQInSlot : get queue size in each slot

func (*Shunt) SizeOfSlot

func (s *Shunt) SizeOfSlot() int

SizeOfSlot : get slot size

func (*Shunt) Stop

func (s *Shunt) Stop()

Stop : stop

func (*Shunt) WaitStop

func (s *Shunt) WaitStop(ctx context.Context) error

WaitStop : wait stop

Directories

Path Synopsis
grpcexample
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL