Documentation ¶
Index ¶
- Constants
- Variables
- func AddIDType(c *container.Container, id uint32, msgType MsgType)
- func MakeMsg(c *container.Container, id uint32, msgType MsgType)
- func NewCounterOp(t OpTerminal, opts CounterOpts) (*CounterOp, *Error)
- func NewLocalBaseTerminal(ctx context.Context, id uint32, parentID string, remoteHub *hub.Hub, ...) (t *TerminalBase, initData *container.Container, err *Error)
- func NewLocalTestTerminal(ctx context.Context, id uint32, parentID string, remoteHub *hub.Hub, ...) (*TestTerminal, *container.Container, *Error)
- func NewRemoteBaseTerminal(ctx context.Context, id uint32, parentID string, identity *cabin.Identity, ...) (t *TerminalBase, initMsg *TerminalOpts, err *Error)
- func NewRemoteTestTerminal(ctx context.Context, id uint32, parentID string, identity *cabin.Identity, ...) (*TestTerminal, *TerminalOpts, *Error)
- func ParseTerminalOpts(c *container.Container) (*TerminalOpts, *Error)
- func RegisterOpType(params OpParams)
- type AuthorizingTerminal
- type CounterOp
- func (op *CounterOp) CounterWorker(ctx context.Context) error
- func (op *CounterOp) Deliver(data *container.Container) *Error
- func (op *CounterOp) End(err *Error)
- func (op *CounterOp) HasEnded(end bool) bool
- func (op *CounterOp) ID() uint32
- func (op *CounterOp) SendCounter() *Error
- func (op *CounterOp) SetID(id uint32)
- func (op *CounterOp) Type() string
- func (op *CounterOp) Wait()
- type CounterOpts
- type DuplexFlowQueue
- func (dfq *DuplexFlowQueue) Deliver(c *container.Container) *Error
- func (dfq *DuplexFlowQueue) FlowHandler(_ context.Context) error
- func (dfq *DuplexFlowQueue) FlowStats() string
- func (dfq *DuplexFlowQueue) Flush()
- func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{}
- func (dfq *DuplexFlowQueue) Receive() <-chan *container.Container
- func (dfq *DuplexFlowQueue) Send(c *container.Container) *Error
- func (dfq *DuplexFlowQueue) SendRaw(c *container.Container) *Error
- type Error
- func (e *Error) AsExternal() *Error
- func (e *Error) Error() string
- func (e *Error) ID() uint8
- func (e *Error) Is(target error) bool
- func (e *Error) IsError() bool
- func (e *Error) IsExternal() bool
- func (e *Error) IsOK() bool
- func (e *Error) Pack() []byte
- func (e *Error) Unwrap() error
- func (e *Error) With(format string, a ...interface{}) *Error
- func (e *Error) Wrap(format string, a ...interface{}) *Error
- type MsgType
- type OpBase
- type OpBaseRequest
- type OpParams
- type OpRunner
- type OpTerminal
- type Operation
- type Permission
- type TerminalBase
- func (t *TerminalBase) Abandon(err *Error)
- func (t *TerminalBase) Ctx() context.Context
- func (t *TerminalBase) DeleteActiveOp(opID uint32)
- func (t *TerminalBase) Deliver(c *container.Container) *Error
- func (t *TerminalBase) Flush()
- func (t *TerminalBase) FmtID() string
- func (t *TerminalBase) GetActiveOp(opID uint32) (op Operation, ok bool)
- func (t *TerminalBase) GetActiveOpCount() int
- func (t *TerminalBase) GrantPermission(grant Permission)
- func (t *TerminalBase) Handler(_ context.Context) error
- func (t *TerminalBase) HasPermission(required Permission) bool
- func (t *TerminalBase) ID() uint32
- func (t *TerminalBase) OpEnd(op Operation, err *Error)
- func (t *TerminalBase) OpInit(op Operation, data *container.Container) *Error
- func (t *TerminalBase) OpSend(op Operation, data *container.Container) *Error
- func (t *TerminalBase) OpSendWithTimeout(op Operation, data *container.Container, timeout time.Duration) *Error
- func (t *TerminalBase) Sender(_ context.Context) error
- func (t *TerminalBase) SetActiveOp(opID uint32, op Operation)
- func (t *TerminalBase) SetTerminalExtension(ext TerminalExtension)
- func (t *TerminalBase) SetTimeout(d time.Duration)
- func (t *TerminalBase) Shutdown(err *Error, sendError bool)
- func (t *TerminalBase) SubmitAsDataMsg(submitFunc func(*container.Container)) func(*container.Container)
- func (t *TerminalBase) WaitForFlush()
- type TerminalExtension
- type TerminalInterface
- type TerminalOpts
- type TestTerminal
Constants ¶
const ( DefaultQueueSize = 50000 MaxQueueSize = 1000000 )
const CounterOpType string = "debug/count"
const ( // DefaultOperationTimeout is the default time duration after which an idle // operation times out and is ended or regarded as failed. DefaultOperationTimeout = 10 * time.Second )
Variables ¶
var ( // ErrUnknownError is the default error. ErrUnknownError = registerError(0, errors.New("unknown error")) ErrStopping = registerError(2, errors.New("stopping")) ErrExplicitAck = registerError(3, errors.New("explicit ack")) ErrInternalError = registerError(8, errors.New("internal error")) ErrMalformedData = registerError(9, errors.New("malformed data")) ErrUnexpectedMsgType = registerError(10, errors.New("unexpected message type")) ErrUnknownOperationType = registerError(11, errors.New("unknown operation type")) ErrUnknownOperationID = registerError(12, errors.New("unknown operation id")) ErrPermissinDenied = registerError(13, errors.New("permission denied")) ErrIntegrity = registerError(14, errors.New("integrity violated")) ErrInvalidOptions = registerError(15, errors.New("invalid options")) ErrHubNotReady = registerError(16, errors.New("hub not ready")) ErrIncorrectUsage = registerError(22, errors.New("incorrect usage")) ErrTimeout = registerError(62, errors.New("timed out")) ErrUnsupportedVersion = registerError(93, errors.New("unsupported version")) ErrShipSunk = registerError(108, errors.New("ship sunk")) ErrTryAgainLater = registerError(114, errors.New("try again later")) ErrConnectionError = registerError(121, errors.New("connection error")) ErrQueueOverflow = registerError(122, errors.New("queue overflowed")) ErrCanceled = registerError(125, context.Canceled) )
Terminal Errors.
Functions ¶
func NewCounterOp ¶
func NewCounterOp(t OpTerminal, opts CounterOpts) (*CounterOp, *Error)
func NewLocalBaseTerminal ¶
func NewLocalTestTerminal ¶ added in v0.3.11
func NewRemoteBaseTerminal ¶
func NewRemoteTestTerminal ¶ added in v0.3.11
func ParseTerminalOpts ¶
func ParseTerminalOpts(c *container.Container) (*TerminalOpts, *Error)
func RegisterOpType ¶
func RegisterOpType(params OpParams)
RegisterOpType registers a new operation type and may only be called during Go's init and a module's prep phase.
Types ¶
type AuthorizingTerminal ¶
type AuthorizingTerminal interface { GrantPermission(grant Permission) HasPermission(required Permission) bool }
type CounterOp ¶
type CounterOp struct { ClientCounter uint64 ServerCounter uint64 Error error // contains filtered or unexported fields }
func (*CounterOp) SendCounter ¶
type CounterOpts ¶
type DuplexFlowQueue ¶
type DuplexFlowQueue struct {
// contains filtered or unexported fields
}
func NewDuplexFlowQueue ¶
func NewDuplexFlowQueue( ti TerminalInterface, queueSize uint32, submitUpstream func(*container.Container), ) *DuplexFlowQueue
func (*DuplexFlowQueue) Deliver ¶
func (dfq *DuplexFlowQueue) Deliver(c *container.Container) *Error
Deliver submits a container for receiving from upstream.
func (*DuplexFlowQueue) FlowHandler ¶
func (dfq *DuplexFlowQueue) FlowHandler(_ context.Context) error
FlowHandler handles all flow queue internals and must be started as a worker in the module where it is used.
func (*DuplexFlowQueue) FlowStats ¶
func (dfq *DuplexFlowQueue) FlowStats() string
FlowStats returns a k=v formatted string of internal stats.
func (*DuplexFlowQueue) Flush ¶
func (dfq *DuplexFlowQueue) Flush()
Flush waits for all waiting data to be sent.
func (*DuplexFlowQueue) ReadyToSend ¶
func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{}
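ReadyToSend returns a channel that presumably signals when the queue is ready for sending (the original description appears to be missing here). Send adds the given container to the send queue.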
func (*DuplexFlowQueue) Receive ¶
func (dfq *DuplexFlowQueue) Receive() <-chan *container.Container
Receive receives a container from the recv queue.
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error is a terminal error.
func NewExternalError ¶
NewExternalError creates an external error based on the given ID.
func ParseExternalError ¶
ParseExternalError creates an external error based on the given serialized ID.
func (*Error) AsExternal ¶
AsExternal creates and returns an external version of the error.
func (*Error) IsExternal ¶
IsExternal returns whether the error occurred externally.
func (*Error) Pack ¶
Pack returns the serialized internal error ID. The additional message is lost and is replaced with the default message upon parsing.
type MsgType ¶
type MsgType uint8
const ( // MsgTypeInit is used to establish a new terminal or run a new operation. MsgTypeInit MsgType = 1 // MsgTypeData is used to send data to a terminal or operation. MsgTypeData MsgType = 2 // MsgTypeStop is used to abandon a terminal or end an operation, with an optional error. MsgTypeStop MsgType = 3 )
type OpBaseRequest ¶
func (*OpBaseRequest) End ¶
func (op *OpBaseRequest) End(err *Error)
func (*OpBaseRequest) Init ¶
func (op *OpBaseRequest) Init(deliverQueueSize int)
type OpParams ¶
type OpParams struct { // Type is the type name of an operation. Type string // Requires defines the required permissions to run an operation. Requires Permission // RunOp is the function that starts a new operation. RunOp OpRunner }
type OpTerminal ¶
type OpTerminal interface { // OpInit initializes the operation with the given data. OpInit(op Operation, data *container.Container) *Error // OpSend sends data. OpSend(op Operation, data *container.Container) *Error // OpSendWithTimeout sends data, but fails after the given timeout has passed. OpSendWithTimeout(op Operation, data *container.Container, timeout time.Duration) *Error // OpEnd sends the end signal and calls End(ErrNil) on the Operation. // The Operation should cease operation after calling this function. OpEnd(op Operation, err *Error) // FmtID returns the formatted ID of the Operation's Terminal. FmtID() string // Flush writes all pending data waiting to be sent. Flush() }
OpTerminal provides Operations with the necessary interface to interact with the Terminal.
type Permission ¶
type Permission uint16
const ( NoPermission Permission = 0x0 MayExpand Permission = 0x1 MayConnect Permission = 0x2 IsHubOwner Permission = 0x100 IsHubAdvisor Permission = 0x200 IsCraneController Permission = 0x8000 )
func AddPermissions ¶
func AddPermissions(perms ...Permission) Permission
AddPermissions combines multiple permissions.
func (Permission) Has ¶
func (p Permission) Has(required Permission) bool
Has reports whether the permission includes the specified permission.
type TerminalBase ¶
type TerminalBase struct { // Abandoned indicates if the Terminal has been abandoned. Whoever abandoned // the terminal already took care of notifying everyone, so a silent fail is // normally the best response. Abandoned *abool.AtomicBool // contains filtered or unexported fields }
TerminalBase contains the basic functions of a terminal.
func (*TerminalBase) Abandon ¶
func (t *TerminalBase) Abandon(err *Error)
Abandon abandons the Terminal with the given error.
func (*TerminalBase) Ctx ¶
func (t *TerminalBase) Ctx() context.Context
Ctx returns the Terminal's context.
func (*TerminalBase) DeleteActiveOp ¶
func (t *TerminalBase) DeleteActiveOp(opID uint32)
DeleteActiveOp deletes an active operation from the Terminal state.
func (*TerminalBase) Deliver ¶
func (t *TerminalBase) Deliver(c *container.Container) *Error
Deliver on TerminalBase only exists to conform to the interface. It must be overridden by an actual implementation.
func (*TerminalBase) Flush ¶
func (t *TerminalBase) Flush()
Flush sends all data waiting to be sent.
func (*TerminalBase) FmtID ¶
func (t *TerminalBase) FmtID() string
func (*TerminalBase) GetActiveOp ¶
func (t *TerminalBase) GetActiveOp(opID uint32) (op Operation, ok bool)
GetActiveOp returns the active operation with the given ID from the Terminal state.
func (*TerminalBase) GetActiveOpCount ¶ added in v0.3.6
func (t *TerminalBase) GetActiveOpCount() int
GetActiveOpCount returns the number of active operations.
func (*TerminalBase) GrantPermission ¶
func (t *TerminalBase) GrantPermission(grant Permission)
GrantPermission grants the specified permissions to the Terminal.
func (*TerminalBase) Handler ¶
func (t *TerminalBase) Handler(_ context.Context) error
Handler receives and handles messages and must be started as a worker in the module where the Terminal is used.
func (*TerminalBase) HasPermission ¶
func (t *TerminalBase) HasPermission(required Permission) bool
HasPermission reports whether the Terminal has the specified permission.
func (*TerminalBase) OpEnd ¶
func (t *TerminalBase) OpEnd(op Operation, err *Error)
OpEnd sends the end signal with an optional error and then deletes the operation from the Terminal state and calls End(ErrNil) on the Operation. The Operation should cease operation after calling this function. Should only be called by an operation.
func (*TerminalBase) OpInit ¶
func (t *TerminalBase) OpInit(op Operation, data *container.Container) *Error
OpInit initializes the operation with the given data.
func (*TerminalBase) OpSend ¶
func (t *TerminalBase) OpSend(op Operation, data *container.Container) *Error
OpSend sends data.
func (*TerminalBase) OpSendWithTimeout ¶
func (t *TerminalBase) OpSendWithTimeout(op Operation, data *container.Container, timeout time.Duration) *Error
OpSendWithTimeout sends data, but fails after the given timeout has passed.
func (*TerminalBase) Sender ¶
func (t *TerminalBase) Sender(_ context.Context) error
Sender handles sending messages and must be started as a worker in the module where the Terminal is used.
func (*TerminalBase) SetActiveOp ¶
func (t *TerminalBase) SetActiveOp(opID uint32, op Operation)
SetActiveOp saves an active operation to the Terminal state.
func (*TerminalBase) SetTerminalExtension ¶
func (t *TerminalBase) SetTerminalExtension(ext TerminalExtension)
SetTerminalExtension sets the Terminal's extension. This function is not guarded and may only be used during initialization.
func (*TerminalBase) SetTimeout ¶
func (t *TerminalBase) SetTimeout(d time.Duration)
SetTimeout sets the Terminal's idle timeout duration. It is broken down into slots internally.
func (*TerminalBase) Shutdown ¶
func (t *TerminalBase) Shutdown(err *Error, sendError bool)
Shutdown sends a stop message with the given error (if it is external) and ends all operations with a nil error and finally cancels the terminal context. This function is usually not called directly, but at the end of an Abandon() implementation.
func (*TerminalBase) SubmitAsDataMsg ¶
func (t *TerminalBase) SubmitAsDataMsg(submitFunc func(*container.Container)) func(*container.Container)
SubmitAsDataMsg wraps the given submit function to call MakeMsg on the data before submitting.
func (*TerminalBase) WaitForFlush ¶
func (t *TerminalBase) WaitForFlush()
WaitForFlush makes the terminal pause all sending until the next call to Flush().
type TerminalExtension ¶
type TerminalInterface ¶
type TerminalOpts ¶
type TerminalOpts struct { Version uint8 `json:"-"` QueueSize uint32 `json:"qs,omitempty"` Padding uint16 `json:"p,omitempty"` Encrypt bool `json:"e,omitempty"` }
TerminalOpts holds configuration for the terminal.
type TestTerminal ¶ added in v0.3.11
type TestTerminal struct { *TerminalBase *DuplexFlowQueue }
func NewSimpleTestTerminalPair ¶ added in v0.3.11
func NewSimpleTestTerminalPair(delay time.Duration, opts *TerminalOpts) (a, b *TestTerminal, err error)
func (*TestTerminal) Abandon ¶ added in v0.3.11
func (t *TestTerminal) Abandon(err *Error)
func (*TestTerminal) Flush ¶ added in v0.3.11
func (t *TestTerminal) Flush()