Documentation ¶
Index ¶
- Constants
- Variables
- func AddIDType(c *container.Container, id uint32, msgType MsgType)
- func MakeDirectDeliveryDeliverFunc(ctx context.Context, deliver chan *Msg) func(c *Msg) *Error
- func MakeDirectDeliveryRecvFunc(deliver chan *Msg) func() <-chan *Msg
- func MakeMsg(c *container.Container, id uint32, msgType MsgType)
- func NewCounterOp(t Terminal, opts CounterOpts) (*CounterOp, *Error)
- func NewLocalBaseTerminal(ctx context.Context, id uint32, parentID string, remoteHub *hub.Hub, ...) (t *TerminalBase, initData *container.Container, err *Error)
- func NewLocalTestTerminal(ctx context.Context, id uint32, parentID string, remoteHub *hub.Hub, ...) (*TestTerminal, *container.Container, *Error)
- func NewRemoteBaseTerminal(ctx context.Context, id uint32, parentID string, identity *cabin.Identity, ...) (t *TerminalBase, initMsg *TerminalOpts, err *Error)
- func NewRemoteTestTerminal(ctx context.Context, id uint32, parentID string, identity *cabin.Identity, ...) (*TestTerminal, *TerminalOpts, *Error)
- func ParseTerminalOpts(c *container.Container) (*TerminalOpts, *Error)
- func RegisterOpType(factory OperationFactory)
- func StopScheduler()
- func TimedOut(timeout time.Duration) <-chan time.Time
- type AuthorizingTerminal
- type BareTerminal
- func (t *BareTerminal) Abandon(err *Error)
- func (t *BareTerminal) Ctx() context.Context
- func (t *BareTerminal) Deliver(msg *Msg) *Error
- func (t *BareTerminal) Flush(timeout time.Duration)
- func (t *BareTerminal) FmtID() string
- func (t *BareTerminal) HandleAbandon(err *Error) (errorToSend *Error)
- func (t *BareTerminal) HandleDestruction(err *Error)
- func (t *BareTerminal) ID() uint32
- func (t *BareTerminal) Send(msg *Msg, timeout time.Duration) *Error
- func (t *BareTerminal) StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error
- func (t *BareTerminal) StopOperation(op Operation, err *Error)
- type CounterOp
- type CounterOpts
- type CustomTerminalIDFormatting
- type DuplexFlowQueue
- func (dfq *DuplexFlowQueue) Deliver(msg *Msg) *Error
- func (dfq *DuplexFlowQueue) FlowHandler(_ context.Context) error
- func (dfq *DuplexFlowQueue) FlowStats() string
- func (dfq *DuplexFlowQueue) Flush(timeout time.Duration)
- func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{}
- func (dfq *DuplexFlowQueue) Receive() <-chan *Msg
- func (dfq *DuplexFlowQueue) RecvQueueLen() int
- func (dfq *DuplexFlowQueue) Send(msg *Msg, timeout time.Duration) *Error
- func (dfq *DuplexFlowQueue) SendQueueLen() int
- func (dfq *DuplexFlowQueue) StartWorkers(m *modules.Module, terminalName string)
- type Error
- func (e *Error) AsExternal() *Error
- func (e *Error) Error() string
- func (e *Error) ID() uint8
- func (e *Error) Is(target error) bool
- func (e *Error) IsError() bool
- func (e *Error) IsExternal() bool
- func (e *Error) IsOK() bool
- func (e *Error) Pack() []byte
- func (e *Error) Unwrap() error
- func (e *Error) With(format string, a ...interface{}) *Error
- func (e *Error) Wrap(format string, a ...interface{}) *Error
- type FlowControl
- type FlowControlType
- type MessageStreamOperationBase
- type Msg
- type MsgType
- type OneOffOperationBase
- type Operation
- type OperationBase
- func (op *OperationBase) Deliver(_ *Msg) *Error
- func (op *OperationBase) Flush(timeout time.Duration)
- func (op *OperationBase) HandleStop(err *Error) (errorToSend *Error)
- func (op *OperationBase) ID() uint32
- func (op *OperationBase) InitOperationBase(t Terminal, opID uint32)
- func (op *OperationBase) NewEmptyMsg() *Msg
- func (op *OperationBase) NewMsg(data []byte) *Msg
- func (op *OperationBase) Send(msg *Msg, timeout time.Duration) *Error
- func (op *OperationBase) Stop(self Operation, err *Error)
- func (op *OperationBase) Stopped() bool
- func (op *OperationBase) Terminal() Terminal
- func (op *OperationBase) Type() string
- type OperationFactory
- type OperationStarter
- type Permission
- type RateLimiter
- type Terminal
- type TerminalBase
- func (t *TerminalBase) Abandon(err *Error)
- func (t *TerminalBase) Ctx() context.Context
- func (t *TerminalBase) DeleteActiveOp(opID uint32)
- func (t *TerminalBase) Deliver(msg *Msg) *Error
- func (t *TerminalBase) Flush(timeout time.Duration)
- func (t *TerminalBase) FmtID() string
- func (t *TerminalBase) GetActiveOp(opID uint32) (op Operation, ok bool)
- func (t *TerminalBase) GetActiveOpCount() int
- func (t *TerminalBase) GrantPermission(grant Permission)
- func (t *TerminalBase) HandleAbandon(err *Error) (errorToSend *Error)
- func (t *TerminalBase) HandleDestruction(err *Error)
- func (t *TerminalBase) Handler(_ context.Context) error
- func (t *TerminalBase) HasPermission(required Permission) bool
- func (t *TerminalBase) ID() uint32
- func (t *TerminalBase) Send(msg *Msg, timeout time.Duration) *Error
- func (t *TerminalBase) Sender(_ context.Context) error
- func (t *TerminalBase) SetActiveOp(opID uint32, op Operation)
- func (t *TerminalBase) SetTerminalExtension(ext Terminal)
- func (t *TerminalBase) SetTimeout(d time.Duration)
- func (t *TerminalBase) StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error
- func (t *TerminalBase) StartWorkers(m *modules.Module, terminalName string)
- func (t *TerminalBase) StopOperation(op Operation, err *Error)
- func (t *TerminalBase) WaitForFlush()
- type TerminalOpts
- type TestTerminal
- type Upstream
- type UpstreamSendFunc
Constants ¶
const ( DefaultQueueSize = 50000 MaxQueueSize = 1000000 )
Flow Queue Configuration.
const CounterOpType string = "debug/count"
CounterOpType is the type ID for the Counter Operation.
const ( // UsePriorityDataMsgs defines whether priority data messages should be used. UsePriorityDataMsgs = true )
Variables ¶
var ( // ErrUnknownError is the default error. ErrUnknownError = registerError(0, errors.New("unknown error")) ErrStopping = registerError(2, errors.New("stopping")) ErrExplicitAck = registerError(3, errors.New("explicit ack")) ErrNoActivity = registerError(4, errors.New("no activity")) ErrInternalError = registerError(8, errors.New("internal error")) ErrMalformedData = registerError(9, errors.New("malformed data")) ErrUnexpectedMsgType = registerError(10, errors.New("unexpected message type")) ErrUnknownOperationType = registerError(11, errors.New("unknown operation type")) ErrUnknownOperationID = registerError(12, errors.New("unknown operation id")) ErrPermissionDenied = registerError(13, errors.New("permission denied")) ErrIntegrity = registerError(14, errors.New("integrity violated")) ErrInvalidOptions = registerError(15, errors.New("invalid options")) ErrHubNotReady = registerError(16, errors.New("hub not ready")) ErrIncorrectUsage = registerError(22, errors.New("incorrect usage")) ErrTimeout = registerError(62, errors.New("timed out")) ErrUnsupportedVersion = registerError(93, errors.New("unsupported version")) ErrAbandonedTerminal = registerError(102, errors.New("terminal is being abandoned")) ErrShipSunk = registerError(108, errors.New("ship sunk")) ErrTryAgainLater = registerError(114, errors.New("try again later")) ErrConnectionError = registerError(121, errors.New("connection error")) ErrQueueOverflow = registerError(122, errors.New("queue overflowed")) ErrCanceled = registerError(125, context.Canceled) )
Terminal Errors.
Functions ¶
func MakeDirectDeliveryDeliverFunc ¶ added in v0.4.8
MakeDirectDeliveryDeliverFunc creates a submit upstream function with the given delivery channel.
func MakeDirectDeliveryRecvFunc ¶ added in v0.4.8
MakeDirectDeliveryRecvFunc makes a delivery receive function with the given delivery channel.
func NewCounterOp ¶
func NewCounterOp(t Terminal, opts CounterOpts) (*CounterOp, *Error)
NewCounterOp returns a new CounterOp.
func NewLocalBaseTerminal ¶
func NewLocalBaseTerminal( ctx context.Context, id uint32, parentID string, remoteHub *hub.Hub, initMsg *TerminalOpts, upstream Upstream, ) ( t *TerminalBase, initData *container.Container, err *Error, )
NewLocalBaseTerminal creates a new local terminal base for use with inheriting terminals.
func NewLocalTestTerminal ¶ added in v0.3.11
func NewLocalTestTerminal( ctx context.Context, id uint32, parentID string, remoteHub *hub.Hub, initMsg *TerminalOpts, upstream Upstream, ) (*TestTerminal, *container.Container, *Error)
NewLocalTestTerminal returns a new local test terminal.
func NewRemoteBaseTerminal ¶
func NewRemoteBaseTerminal( ctx context.Context, id uint32, parentID string, identity *cabin.Identity, initData *container.Container, upstream Upstream, ) ( t *TerminalBase, initMsg *TerminalOpts, err *Error, )
NewRemoteBaseTerminal creates a new remote terminal base for use with inheriting terminals.
func NewRemoteTestTerminal ¶ added in v0.3.11
func NewRemoteTestTerminal( ctx context.Context, id uint32, parentID string, identity *cabin.Identity, initData *container.Container, upstream Upstream, ) (*TestTerminal, *TerminalOpts, *Error)
NewRemoteTestTerminal returns a new remote test terminal.
func ParseTerminalOpts ¶
func ParseTerminalOpts(c *container.Container) (*TerminalOpts, *Error)
ParseTerminalOpts parses terminal options from the container and checks if they are valid.
func RegisterOpType ¶
func RegisterOpType(factory OperationFactory)
RegisterOpType registers a new operation type and may only be called during Go's init and a module's prep phase.
Types ¶
type AuthorizingTerminal ¶
type AuthorizingTerminal interface { GrantPermission(grant Permission) HasPermission(required Permission) bool }
AuthorizingTerminal is an interface for terminals that support authorization.
type BareTerminal ¶ added in v0.6.0
type BareTerminal struct{}
BareTerminal is a bare terminal that just returns errors for testing.
func (*BareTerminal) Abandon ¶ added in v0.6.0
func (t *BareTerminal) Abandon(err *Error)
Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon(). Should not be overridden by implementations.
func (*BareTerminal) Ctx ¶ added in v0.6.0
func (t *BareTerminal) Ctx() context.Context
Ctx returns the terminal context.
func (*BareTerminal) Deliver ¶ added in v0.6.0
func (t *BareTerminal) Deliver(msg *Msg) *Error
Deliver delivers a message to the terminal. Should not be overridden by implementations.
func (*BareTerminal) Flush ¶ added in v0.6.0
func (t *BareTerminal) Flush(timeout time.Duration)
Flush sends all messages waiting in the terminal. Should not be overridden by implementations.
func (*BareTerminal) FmtID ¶ added in v0.6.0
func (t *BareTerminal) FmtID() string
FmtID formats the terminal ID (including parent IDs). May be overridden by implementations.
func (*BareTerminal) HandleAbandon ¶ added in v0.6.0
func (t *BareTerminal) HandleAbandon(err *Error) (errorToSend *Error)
HandleAbandon gives the terminal the ability to cleanly shut down. The terminal is still fully functional at this point. The returned error is the error to send to the other side. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.
func (*BareTerminal) HandleDestruction ¶ added in v0.6.0
func (t *BareTerminal) HandleDestruction(err *Error)
HandleDestruction gives the terminal the ability to clean up. The terminal has already fully shut down at this point. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.
func (*BareTerminal) ID ¶ added in v0.6.0
func (t *BareTerminal) ID() uint32
ID returns the terminal ID.
func (*BareTerminal) Send ¶ added in v0.6.0
func (t *BareTerminal) Send(msg *Msg, timeout time.Duration) *Error
Send is used by others to send a message through the terminal. Should not be overridden by implementations.
func (*BareTerminal) StartOperation ¶ added in v0.6.0
func (t *BareTerminal) StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error
StartOperation starts the given operation by assigning it an ID and sending the given operation initialization data. Should not be overridden by implementations.
func (*BareTerminal) StopOperation ¶ added in v0.6.0
func (t *BareTerminal) StopOperation(op Operation, err *Error)
StopOperation stops the given operation. Should not be overridden by implementations.
type CounterOp ¶
type CounterOp struct { OperationBase ClientCounter uint64 ServerCounter uint64 Error error // contains filtered or unexported fields }
CounterOp sends increasing numbers on both sides.
func (*CounterOp) CounterWorker ¶
CounterWorker is a worker that sends counters.
func (*CounterOp) HandleStop ¶ added in v0.6.0
HandleStop handles stopping the operation.
func (*CounterOp) SendCounter ¶
SendCounter sends the next counter.
type CounterOpts ¶
type CounterOpts struct { ClientCountTo uint64 ServerCountTo uint64 Wait time.Duration Flush bool // contains filtered or unexported fields }
CounterOpts holds the options for CounterOp.
type CustomTerminalIDFormatting ¶ added in v0.6.0
type CustomTerminalIDFormatting interface {
CustomIDFormat() string
}
CustomTerminalIDFormatting defines an interface for terminals to define their custom ID format.
type DuplexFlowQueue ¶
type DuplexFlowQueue struct {
// contains filtered or unexported fields
}
DuplexFlowQueue is a duplex flow control mechanism using queues.
func NewDuplexFlowQueue ¶
func NewDuplexFlowQueue( ctx context.Context, queueSize uint32, submitUpstream func(msg *Msg, timeout time.Duration), ) *DuplexFlowQueue
NewDuplexFlowQueue returns a new duplex flow queue.
func (*DuplexFlowQueue) Deliver ¶
func (dfq *DuplexFlowQueue) Deliver(msg *Msg) *Error
Deliver submits a container for receiving from upstream.
func (*DuplexFlowQueue) FlowHandler ¶
func (dfq *DuplexFlowQueue) FlowHandler(_ context.Context) error
FlowHandler handles all flow queue internals and must be started as a worker in the module where it is used.
func (*DuplexFlowQueue) FlowStats ¶
func (dfq *DuplexFlowQueue) FlowStats() string
FlowStats returns a k=v formatted string of internal stats.
func (*DuplexFlowQueue) Flush ¶
func (dfq *DuplexFlowQueue) Flush(timeout time.Duration)
Flush waits for all waiting data to be sent.
func (*DuplexFlowQueue) ReadyToSend ¶
func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{}
ReadyToSend returns a channel that can be read when data can be sent.
func (*DuplexFlowQueue) Receive ¶
func (dfq *DuplexFlowQueue) Receive() <-chan *Msg
Receive receives a container from the recv queue.
func (*DuplexFlowQueue) RecvQueueLen ¶ added in v0.4.14
func (dfq *DuplexFlowQueue) RecvQueueLen() int
RecvQueueLen returns the current length of the receive queue.
func (*DuplexFlowQueue) Send ¶
func (dfq *DuplexFlowQueue) Send(msg *Msg, timeout time.Duration) *Error
Send adds the given container to the send queue.
func (*DuplexFlowQueue) SendQueueLen ¶ added in v0.4.14
func (dfq *DuplexFlowQueue) SendQueueLen() int
SendQueueLen returns the current length of the send queue.
func (*DuplexFlowQueue) StartWorkers ¶ added in v0.4.8
func (dfq *DuplexFlowQueue) StartWorkers(m *modules.Module, terminalName string)
StartWorkers starts the necessary workers to operate the flow queue.
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error is a terminal error.
func NewExternalError ¶
NewExternalError creates an external error based on the given ID.
func ParseExternalError ¶
ParseExternalError parses an external error.
func (*Error) AsExternal ¶
AsExternal creates and returns an external version of the error.
func (*Error) IsError ¶ added in v0.3.11
IsError returns if the error represents an erroneous condition.
func (*Error) IsExternal ¶
IsExternal returns whether the error occurred externally.
func (*Error) IsOK ¶ added in v0.3.11
IsOK returns if the error represents an "OK" or success status.
func (*Error) Pack ¶
Pack returns the serialized internal error ID. The additional message is lost and is replaced with the default message upon parsing.
type FlowControl ¶ added in v0.4.8
type FlowControl interface { Deliver(msg *Msg) *Error Receive() <-chan *Msg Send(msg *Msg, timeout time.Duration) *Error ReadyToSend() <-chan struct{} Flush(timeout time.Duration) StartWorkers(m *modules.Module, terminalName string) RecvQueueLen() int SendQueueLen() int }
FlowControl defines the flow control interface.
type FlowControlType ¶ added in v0.4.8
type FlowControlType uint8
FlowControlType represents a flow control type.
const ( FlowControlDefault FlowControlType = 0 FlowControlDFQ FlowControlType = 1 FlowControlNone FlowControlType = 2 )
Flow Control Types.
func (FlowControlType) DefaultSize ¶ added in v0.4.8
func (fct FlowControlType) DefaultSize() uint32
DefaultSize returns the default flow control size.
type MessageStreamOperationBase ¶ added in v0.6.0
type MessageStreamOperationBase struct { OperationBase Delivered chan *Msg Ended chan *Error }
MessageStreamOperationBase is an operation base for receiving a message stream. Every received message must be finished by the implementing operation.
func (*MessageStreamOperationBase) Deliver ¶ added in v0.6.0
func (op *MessageStreamOperationBase) Deliver(msg *Msg) *Error
Deliver delivers data to the operation.
func (*MessageStreamOperationBase) HandleStop ¶ added in v0.6.0
func (op *MessageStreamOperationBase) HandleStop(err *Error) (errorToSend *Error)
HandleStop gives the operation the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Stop() instead.
func (*MessageStreamOperationBase) Init ¶ added in v0.6.0
func (op *MessageStreamOperationBase) Init(deliverQueueSize int)
Init initializes the operation base.
type Msg ¶ added in v0.6.0
type Msg struct { FlowID uint32 Type MsgType Data *container.Container // Unit scheduling. // Note: With just 100B per packet, a uint64 (the Unit ID) is enough for // over 1800 Exabyte. No need for overflow support. Unit *unit.Unit }
Msg is a message within the SPN network stack. It includes metadata and unit scheduling.
func NewEmptyMsg ¶ added in v0.6.0
func NewEmptyMsg() *Msg
NewEmptyMsg returns a new empty msg with an initialized Unit. The FlowID is unset. The Type is Data. The Data is unset.
func (*Msg) Consume ¶ added in v0.6.0
Consume adds another Message to itself. The given Msg is packed before adding it to the data. The data is moved - not copied! High priority mark is inherited.
func (*Msg) Debug ¶ added in v0.6.0
func (msg *Msg) Debug()
Debug registers the unit for debug output with the given source. Additional calls on the same unit update the unit source. StartDebugLog() must be called before calling DebugUnit().
type MsgType ¶
type MsgType uint8
MsgType is the message type for both terminals and operations.
const ( // MsgTypeInit is used to establish a new terminal or run a new operation. MsgTypeInit MsgType = 1 // MsgTypeData is used to send data to a terminal or operation. MsgTypeData MsgType = 2 // MsgTypePriorityData is used to send prioritized data to a terminal or operation. MsgTypePriorityData MsgType = 0 // MsgTypeStop is used to abandon a terminal or end an operation, with an optional error. MsgTypeStop MsgType = 3 )
type OneOffOperationBase ¶ added in v0.6.0
type OneOffOperationBase struct { OperationBase Result chan *Error }
OneOffOperationBase is an operation base for operations that just have one message and an error return.
func (*OneOffOperationBase) HandleStop ¶ added in v0.6.0
func (op *OneOffOperationBase) HandleStop(err *Error) (errorToSend *Error)
HandleStop gives the operation the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Stop() instead.
func (*OneOffOperationBase) Init ¶ added in v0.6.0
func (op *OneOffOperationBase) Init()
Init initializes the single operation base.
type Operation ¶
type Operation interface { // InitOperationBase initialize the operation with the ID and attached terminal. // Should not be overridden by implementations. InitOperationBase(t Terminal, opID uint32) // ID returns the ID of the operation. // Should not be overridden by implementations. ID() uint32 // Type returns the operation's type ID. // Should be overridden by implementations to return correct type ID. Type() string // Deliver delivers a message to the operation. // Meant to be overridden by implementations. Deliver(msg *Msg) *Error // NewMsg creates a new message from this operation. // Should not be overridden by implementations. NewMsg(data []byte) *Msg // Send sends a message to the other side. // Should not be overridden by implementations. Send(msg *Msg, timeout time.Duration) *Error // Flush sends all messages waiting in the terminal. // Should not be overridden by implementations. Flush(timeout time.Duration) // Stopped returns whether the operation has stopped. // Should not be overridden by implementations. Stopped() bool // Stop stops the operation by unregistering it from the terminal and calling HandleStop(). // Should not be overridden by implementations. Stop(self Operation, err *Error) // HandleStop gives the operation the ability to cleanly shut down. // The returned error is the error to send to the other side. // Should never be called directly. Call Stop() instead. // Meant to be overridden by implementations. HandleStop(err *Error) (errorToSend *Error) // Terminal returns the terminal the operation is linked to. // Should not be overridden by implementations. Terminal() Terminal // contains filtered or unexported methods }
Operation is an interface for all operations.
type OperationBase ¶ added in v0.6.0
type OperationBase struct {
// contains filtered or unexported fields
}
OperationBase provides the basic operation functionality.
func (*OperationBase) Deliver ¶ added in v0.6.0
func (op *OperationBase) Deliver(_ *Msg) *Error
Deliver delivers a message to the operation. Meant to be overridden by implementations.
func (*OperationBase) Flush ¶ added in v0.6.0
func (op *OperationBase) Flush(timeout time.Duration)
Flush sends all messages waiting in the terminal. Should not be overridden by implementations.
func (*OperationBase) HandleStop ¶ added in v0.6.0
func (op *OperationBase) HandleStop(err *Error) (errorToSend *Error)
HandleStop gives the operation the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Stop() instead. Meant to be overridden by implementations.
func (*OperationBase) ID ¶ added in v0.6.0
func (op *OperationBase) ID() uint32
ID returns the ID of the operation. Should not be overridden by implementations.
func (*OperationBase) InitOperationBase ¶ added in v0.6.0
func (op *OperationBase) InitOperationBase(t Terminal, opID uint32)
InitOperationBase initializes the operation with the ID and attached terminal. Should not be overridden by implementations.
func (*OperationBase) NewEmptyMsg ¶ added in v0.6.0
func (op *OperationBase) NewEmptyMsg() *Msg
NewEmptyMsg creates a new empty message from this operation. Should not be overridden by implementations.
func (*OperationBase) NewMsg ¶ added in v0.6.0
func (op *OperationBase) NewMsg(data []byte) *Msg
NewMsg creates a new message from this operation. Should not be overridden by implementations.
func (*OperationBase) Send ¶ added in v0.6.0
func (op *OperationBase) Send(msg *Msg, timeout time.Duration) *Error
Send sends a message to the other side. Should not be overridden by implementations.
func (*OperationBase) Stop ¶ added in v0.6.0
func (op *OperationBase) Stop(self Operation, err *Error)
Stop stops the operation by unregistering it from the terminal and calling HandleStop(). Should not be overridden by implementations.
func (*OperationBase) Stopped ¶ added in v0.6.0
func (op *OperationBase) Stopped() bool
Stopped returns whether the operation has stopped. Should not be overridden by implementations.
func (*OperationBase) Terminal ¶ added in v0.6.0
func (op *OperationBase) Terminal() Terminal
Terminal returns the terminal the operation is linked to. Should not be overridden by implementations.
func (*OperationBase) Type ¶ added in v0.6.0
func (op *OperationBase) Type() string
Type returns the operation's type ID. Should be overridden by implementations to return correct type ID.
type OperationFactory ¶ added in v0.6.0
type OperationFactory struct { // Type is the type id of an operation. Type string // Requires defines the required permissions to run an operation. Requires Permission // Start is the function that starts a new operation. Start OperationStarter }
OperationFactory defines an operation factory.
type OperationStarter ¶ added in v0.6.0
type OperationStarter func(attachedTerminal Terminal, opID uint32, initData *container.Container) (Operation, *Error)
OperationStarter is used to initialize operations remotely.
type Permission ¶
type Permission uint16
Permission is a bit-map of granted permissions.
const ( NoPermission Permission = 0x0 MayExpand Permission = 0x1 MayConnect Permission = 0x2 IsHubOwner Permission = 0x100 IsHubAdvisor Permission = 0x200 IsCraneController Permission = 0x8000 )
Permissions.
func AddPermissions ¶
func AddPermissions(perms ...Permission) Permission
AddPermissions combines multiple permissions.
func (Permission) Has ¶
func (p Permission) Has(required Permission) bool
Has returns if the permission includes the specified permission.
type RateLimiter ¶ added in v0.6.0
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is a data flow rate limiter.
func NewRateLimiter ¶ added in v0.6.0
func NewRateLimiter(mbits uint64) *RateLimiter
NewRateLimiter returns a new rate limiter. The given MBit/s are transformed to bytes, so giving a multiple of 8 is advised for accurate results.
func (*RateLimiter) Limit ¶ added in v0.6.0
func (rl *RateLimiter) Limit(xferBytes uint64)
Limit is given the current transferred bytes and blocks until they may be sent.
type Terminal ¶ added in v0.6.0
type Terminal interface { // ID returns the terminal ID. ID() uint32 // Ctx returns the terminal context. Ctx() context.Context // Deliver delivers a message to the terminal. // Should not be overridden by implementations. Deliver(msg *Msg) *Error // Send is used by others to send a message through the terminal. // Should not be overridden by implementations. Send(msg *Msg, timeout time.Duration) *Error // Flush sends all messages waiting in the terminal. // Should not be overridden by implementations. Flush(timeout time.Duration) // StartOperation starts the given operation by assigning it an ID and sending the given operation initialization data. // Should not be overridden by implementations. StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error // StopOperation stops the given operation. // Should not be overridden by implementations. StopOperation(op Operation, err *Error) // Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon(). // Should not be overridden by implementations. Abandon(err *Error) // HandleAbandon gives the terminal the ability to cleanly shut down. // The terminal is still fully functional at this point. // The returned error is the error to send to the other side. // Should never be called directly. Call Abandon() instead. // Meant to be overridden by implementations. HandleAbandon(err *Error) (errorToSend *Error) // HandleDestruction gives the terminal the ability to clean up. // The terminal has already fully shut down at this point. // Should never be called directly. Call Abandon() instead. // Meant to be overridden by implementations. HandleDestruction(err *Error) // FmtID formats the terminal ID (including parent IDs). // May be overridden by implementations. FmtID() string }
Terminal represents a terminal.
type TerminalBase ¶
type TerminalBase struct { // TODO: Fix maligned. Terminal // Interface check. // Abandoning indicates if the Terminal is being abandoned. The main handlers // will keep running until the context has been canceled by the abandon // procedure. // No new operations should be started. // Whoever initiates the abandoning must also start the abandon procedure. Abandoning *abool.AtomicBool // contains filtered or unexported fields }
TerminalBase contains the basic functions of a terminal.
func (*TerminalBase) Abandon ¶
func (t *TerminalBase) Abandon(err *Error)
Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon(). Should not be overridden by implementations.
func (*TerminalBase) Ctx ¶
func (t *TerminalBase) Ctx() context.Context
Ctx returns the Terminal's context.
func (*TerminalBase) DeleteActiveOp ¶
func (t *TerminalBase) DeleteActiveOp(opID uint32)
DeleteActiveOp deletes an active operation from the Terminal state.
func (*TerminalBase) Deliver ¶
func (t *TerminalBase) Deliver(msg *Msg) *Error
Deliver on TerminalBase only exists to conform to the interface. It must be overridden by an actual implementation.
func (*TerminalBase) Flush ¶
func (t *TerminalBase) Flush(timeout time.Duration)
Flush sends all data waiting to be sent.
func (*TerminalBase) FmtID ¶
func (t *TerminalBase) FmtID() string
FmtID formats the terminal ID together with the parent's ID.
func (*TerminalBase) GetActiveOp ¶
func (t *TerminalBase) GetActiveOp(opID uint32) (op Operation, ok bool)
GetActiveOp returns the active operation with the given ID from the Terminal state.
func (*TerminalBase) GetActiveOpCount ¶ added in v0.3.6
func (t *TerminalBase) GetActiveOpCount() int
GetActiveOpCount returns the amount of active operations.
func (*TerminalBase) GrantPermission ¶
func (t *TerminalBase) GrantPermission(grant Permission)
GrantPermission grants the specified permissions to the Terminal.
func (*TerminalBase) HandleAbandon ¶ added in v0.6.0
func (t *TerminalBase) HandleAbandon(err *Error) (errorToSend *Error)
HandleAbandon gives the terminal the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.
func (*TerminalBase) HandleDestruction ¶ added in v0.6.0
func (t *TerminalBase) HandleDestruction(err *Error)
HandleDestruction gives the terminal the ability to clean up. The terminal has already fully shut down at this point. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.
func (*TerminalBase) Handler ¶
func (t *TerminalBase) Handler(_ context.Context) error
Handler receives and handles messages and must be started as a worker in the module where the Terminal is used.
func (*TerminalBase) HasPermission ¶
func (t *TerminalBase) HasPermission(required Permission) bool
HasPermission returns if the Terminal has the specified permission.
func (*TerminalBase) Send ¶ added in v0.6.0
func (t *TerminalBase) Send(msg *Msg, timeout time.Duration) *Error
Send sends data via this terminal. If a timeout is set, sending will fail after the given timeout has passed.
func (*TerminalBase) Sender ¶
func (t *TerminalBase) Sender(_ context.Context) error
Sender handles sending messages and must be started as a worker in the module where the Terminal is used.
func (*TerminalBase) SetActiveOp ¶
func (t *TerminalBase) SetActiveOp(opID uint32, op Operation)
SetActiveOp saves an active operation to the Terminal state.
func (*TerminalBase) SetTerminalExtension ¶
func (t *TerminalBase) SetTerminalExtension(ext Terminal)
SetTerminalExtension sets the Terminal's extension. This function is not guarded and may only be used during initialization.
func (*TerminalBase) SetTimeout ¶
func (t *TerminalBase) SetTimeout(d time.Duration)
SetTimeout sets the Terminal's idle timeout duration. It is broken down into slots internally.
func (*TerminalBase) StartOperation ¶ added in v0.6.0
func (t *TerminalBase) StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error
StartOperation starts the given operation by assigning it an ID and sending the given operation initialization data.
func (*TerminalBase) StartWorkers ¶ added in v0.4.8
func (t *TerminalBase) StartWorkers(m *modules.Module, terminalName string)
StartWorkers starts the necessary workers to operate the Terminal.
func (*TerminalBase) StopOperation ¶ added in v0.6.0
func (t *TerminalBase) StopOperation(op Operation, err *Error)
StopOperation sends the end signal with an optional error and then deletes the operation from the Terminal state and calls HandleStop() on the Operation.
func (*TerminalBase) WaitForFlush ¶
func (t *TerminalBase) WaitForFlush()
WaitForFlush makes the terminal pause all sending until the next call to Flush().
type TerminalOpts ¶
type TerminalOpts struct { Version uint8 `json:"-"` Encrypt bool `json:"e,omitempty"` Padding uint16 `json:"p,omitempty"` FlowControl FlowControlType `json:"fc,omitempty"` FlowControlSize uint32 `json:"qs,omitempty"` // Previously was "QueueSize". UsePriorityDataMsgs bool `json:"pr,omitempty"` }
TerminalOpts holds configuration for the terminal.
func DefaultCraneControllerOpts ¶ added in v0.4.9
func DefaultCraneControllerOpts() *TerminalOpts
DefaultCraneControllerOpts returns the default terminal options for a crane controller terminal.
func DefaultExpansionTerminalOpts ¶ added in v0.4.9
func DefaultExpansionTerminalOpts() *TerminalOpts
DefaultExpansionTerminalOpts returns the default terminal options for an expansion terminal.
func DefaultHomeHubTerminalOpts ¶ added in v0.4.9
func DefaultHomeHubTerminalOpts() *TerminalOpts
DefaultHomeHubTerminalOpts returns the default terminal options for a crane terminal used for the home hub.
func (*TerminalOpts) Check ¶ added in v0.4.8
func (opts *TerminalOpts) Check(useDefaultsForRequired bool) *Error
Check checks if terminal options are valid.
type TestTerminal ¶ added in v0.3.11
type TestTerminal struct {
*TerminalBase
}
TestTerminal is a terminal for running tests.
func NewSimpleTestTerminalPair ¶ added in v0.3.11
func NewSimpleTestTerminalPair(delay time.Duration, delayQueueSize int, opts *TerminalOpts) (a, b *TestTerminal, err error)
NewSimpleTestTerminalPair provides a simple connected terminal pair for tests.
func (*TestTerminal) HandleAbandon ¶ added in v0.6.0
func (t *TestTerminal) HandleAbandon(err *Error) (errorToSend *Error)
HandleAbandon gives the terminal the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Abandon() instead.
type UpstreamSendFunc ¶ added in v0.6.0
UpstreamSendFunc is a helper to be able to satisfy the Upstream interface.