terminal

package
v0.3.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2022 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultQueueSize = 50000
	MaxQueueSize     = 1000000
)
View Source
const CounterOpType string = "debug/count"
View Source
const (
	// DefaultOperationTimeout is the default time duration after which an idle
	// operation times out and is ended or regarded as failed.
	DefaultOperationTimeout = 10 * time.Second
)

Variables

View Source
var (
	// ErrUnknownError is the default error.
	ErrUnknownError = registerError(0, errors.New("unknown error"))

	ErrStopping    = registerError(2, errors.New("stopping"))
	ErrExplicitAck = registerError(3, errors.New("explicit ack"))

	ErrInternalError          = registerError(8, errors.New("internal error"))
	ErrMalformedData          = registerError(9, errors.New("malformed data"))
	ErrUnexpectedMsgType      = registerError(10, errors.New("unexpected message type"))
	ErrUnknownOperationType   = registerError(11, errors.New("unknown operation type"))
	ErrUnknownOperationID     = registerError(12, errors.New("unknown operation id"))
	ErrPermissinDenied        = registerError(13, errors.New("permission denied"))
	ErrIntegrity              = registerError(14, errors.New("integrity violated"))
	ErrInvalidOptions         = registerError(15, errors.New("invalid options"))
	ErrHubNotReady            = registerError(16, errors.New("hub not ready"))
	ErrIncorrectUsage         = registerError(22, errors.New("incorrect usage"))
	ErrTimeout                = registerError(62, errors.New("timed out"))
	ErrUnsupportedVersion     = registerError(93, errors.New("unsupported version"))
	ErrHubUnavailable         = registerError(101, errors.New("hub unavailable"))
	ErrShipSunk               = registerError(108, errors.New("ship sunk"))
	ErrDestinationUnavailable = registerError(113, errors.New("destination unavailable"))
	ErrTryAgainLater          = registerError(114, errors.New("try again later"))
	ErrConnectionError        = registerError(121, errors.New("connection error"))
	ErrQueueOverflow          = registerError(122, errors.New("queue overflowed"))
	ErrCanceled               = registerError(125, context.Canceled)
)

Terminal Errors.

Functions

func AddIDType

func AddIDType(c *container.Container, id uint32, msgType MsgType)

AddIDType prepends the ID and Type header to the message.

func MakeMsg

func MakeMsg(c *container.Container, id uint32, msgType MsgType)

MakeMsg prepends the ID and Type header and the length of the message.

func NewCounterOp

func NewCounterOp(t OpTerminal, opts CounterOpts) (*CounterOp, *Error)

func NewLocalBaseTerminal

func NewLocalBaseTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	remoteHub *hub.Hub,
	initMsg *TerminalOpts,
) (
	t *TerminalBase,
	initData *container.Container,
	err *Error,
)

func NewLocalTestTerminal added in v0.3.11

func NewLocalTestTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	remoteHub *hub.Hub,
	initMsg *TerminalOpts,
	submitUpstream func(*container.Container),
) (*TestTerminal, *container.Container, *Error)

func NewRemoteBaseTerminal

func NewRemoteBaseTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	identity *cabin.Identity,
	initData *container.Container,
) (
	t *TerminalBase,
	initMsg *TerminalOpts,
	err *Error,
)

func NewRemoteTestTerminal added in v0.3.11

func NewRemoteTestTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	identity *cabin.Identity,
	initData *container.Container,
	submitUpstream func(*container.Container),
) (*TestTerminal, *TerminalOpts, *Error)

func ParseTerminalOpts

func ParseTerminalOpts(c *container.Container) (*TerminalOpts, *Error)

func RegisterOpType

func RegisterOpType(params OpParams)

RegisterOpType registers a new operation type and may only be called during Go's init and a module's prep phase.

Types

type AuthorizingTerminal

type AuthorizingTerminal interface {
	GrantPermission(grant Permission)
	HasPermission(required Permission) bool
}

type CounterOp

type CounterOp struct {
	ClientCounter uint64
	ServerCounter uint64

	Error error
	// contains filtered or unexported fields
}

func (*CounterOp) CounterWorker

func (op *CounterOp) CounterWorker(ctx context.Context) error

func (*CounterOp) Deliver

func (op *CounterOp) Deliver(data *container.Container) *Error

func (*CounterOp) End

func (op *CounterOp) End(err *Error)

func (*CounterOp) HasEnded

func (op *CounterOp) HasEnded(end bool) bool

func (*CounterOp) ID

func (op *CounterOp) ID() uint32

func (*CounterOp) SendCounter

func (op *CounterOp) SendCounter() *Error

func (*CounterOp) SetID

func (op *CounterOp) SetID(id uint32)

func (*CounterOp) Type

func (op *CounterOp) Type() string

func (*CounterOp) Wait

func (op *CounterOp) Wait()

type CounterOpts

type CounterOpts struct {
	ClientCountTo uint64
	ServerCountTo uint64
	Flush         bool
	Wait          time.Duration
	// contains filtered or unexported fields
}

type DuplexFlowQueue

type DuplexFlowQueue struct {
	// contains filtered or unexported fields
}

func NewDuplexFlowQueue

func NewDuplexFlowQueue(
	ti TerminalInterface,
	queueSize uint32,
	submitUpstream func(*container.Container),
) *DuplexFlowQueue

func (*DuplexFlowQueue) Deliver

func (dfq *DuplexFlowQueue) Deliver(c *container.Container) *Error

Deliver submits a container for receiving from upstream.

func (*DuplexFlowQueue) FlowHandler

func (dfq *DuplexFlowQueue) FlowHandler(_ context.Context) error

FlowHandler handles all flow queue internals and must be started as a worker in the module where it is used.

func (*DuplexFlowQueue) FlowStats

func (dfq *DuplexFlowQueue) FlowStats() string

FlowStats returns a k=v formatted string of internal stats.

func (*DuplexFlowQueue) Flush

func (dfq *DuplexFlowQueue) Flush()

Flush waits for all waiting data to be sent.

func (*DuplexFlowQueue) ReadyToSend

func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{}

ReadyToSend returns a channel that signals when the queue is ready to send.

func (*DuplexFlowQueue) Receive

func (dfq *DuplexFlowQueue) Receive() <-chan *container.Container

Receive receives a container from the recv queue.

func (*DuplexFlowQueue) Send

func (dfq *DuplexFlowQueue) Send(c *container.Container) *Error

Send adds the given container to the send queue.

func (*DuplexFlowQueue) SendRaw

func (dfq *DuplexFlowQueue) SendRaw(c *container.Container) *Error

SendRaw sends the given raw data without any further processing.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is a terminal error.

func NewExternalError

func NewExternalError(id uint8) *Error

NewExternalError creates an external error based on the given ID.

func ParseExternalError

func ParseExternalError(id []byte) (*Error, error)

ParseExternalError creates an external error based on the given serialized ID.

func (*Error) AsExternal

func (e *Error) AsExternal() *Error

AsExternal creates and returns an external version of the error.

func (*Error) Error

func (e *Error) Error() string

Error returns the human readable format of the error.

func (*Error) ID

func (e *Error) ID() uint8

ID returns the internal ID of the error.

func (*Error) Is

func (e *Error) Is(target error) bool

Is returns whether the given error is of the same type.

func (*Error) IsError added in v0.3.11

func (e *Error) IsError() bool

func (*Error) IsExternal

func (e *Error) IsExternal() bool

IsExternal returns whether the error occurred externally.

func (*Error) IsOK added in v0.3.11

func (e *Error) IsOK() bool

func (*Error) Pack

func (e *Error) Pack() []byte

Pack returns the serialized internal error ID. The additional message is lost and is replaced with the default message upon parsing.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the wrapped error.

func (*Error) With

func (e *Error) With(format string, a ...interface{}) *Error

With adds context and details where the error occurred. The provided message is appended to the error. A new error with the same ID is returned and must be compared with errors.Is().

func (*Error) Wrap

func (e *Error) Wrap(format string, a ...interface{}) *Error

Wrap adds context higher up in the call chain. The provided message is prepended to the error. A new error with the same ID is returned and must be compared with errors.Is().

type MsgType

type MsgType uint8
const (
	// MsgTypeInit is used to establish a new terminal or run a new operation.
	MsgTypeInit MsgType = 1

	// MsgTypeData is used to send data to a terminal or operation.
	MsgTypeData MsgType = 2

	// MsgTypeStop is used to abandon a terminal or end an operation, with an optional error.
	MsgTypeStop MsgType = 3
)

func ParseIDType

func ParseIDType(c *container.Container) (id uint32, msgType MsgType, err error)

type OpBase

type OpBase struct {
	// contains filtered or unexported fields
}

func (*OpBase) HasEnded

func (op *OpBase) HasEnded(end bool) bool

func (*OpBase) ID

func (op *OpBase) ID() uint32

func (*OpBase) Init

func (op *OpBase) Init()

func (*OpBase) SetID

func (op *OpBase) SetID(id uint32)

type OpBaseRequest

type OpBaseRequest struct {
	OpBase

	Delivered chan *container.Container
	Ended     chan *Error
}

func (*OpBaseRequest) Deliver

func (op *OpBaseRequest) Deliver(data *container.Container) *Error

func (*OpBaseRequest) End

func (op *OpBaseRequest) End(err *Error)

func (*OpBaseRequest) Init

func (op *OpBaseRequest) Init(deliverQueueSize int)

type OpParams

type OpParams struct {
	// Type is the type name of an operation.
	Type string
	// Requires defines the required permissions to run an operation.
	Requires Permission
	// RunOp is the function that starts a new operation.
	RunOp OpRunner
}

type OpRunner

type OpRunner func(t OpTerminal, opID uint32, initData *container.Container) (Operation, *Error)

type OpTerminal

type OpTerminal interface {
	// OpInit initializes the operation with the given data.
	OpInit(op Operation, data *container.Container) *Error

	// OpSend sends data.
	OpSend(op Operation, data *container.Container) *Error

	// OpSendWithTimeout sends data, but fails after the given timeout passed.
	OpSendWithTimeout(op Operation, data *container.Container, timeout time.Duration) *Error

	// OpEnd sends the end signal and calls End(ErrNil) on the Operation.
	// The Operation should cease operation after calling this function.
	OpEnd(op Operation, err *Error)

	// FmtID returns the formatted ID of the Operation's Terminal.
	FmtID() string

	// Flush writes all pending data waiting to be sent.
	Flush()
}

OpTerminal provides Operations with the necessary interface to interact with the Terminal.

type Operation

type Operation interface {
	ID() uint32
	SetID(id uint32)
	Type() string
	Deliver(data *container.Container) *Error
	HasEnded(end bool) bool
	End(err *Error)
}

type Permission

type Permission uint16
const (
	NoPermission      Permission = 0x0
	MayExpand         Permission = 0x1
	MayConnect        Permission = 0x2
	IsHubOwner        Permission = 0x100
	IsHubAdvisor      Permission = 0x200
	IsCraneController Permission = 0x8000
)

func AddPermissions

func AddPermissions(perms ...Permission) Permission

AddPermissions combines multiple permissions.

func (Permission) Has

func (p Permission) Has(required Permission) bool

Has returns if the permission includes the specified permission.

type TerminalBase

type TerminalBase struct {

	// Abandoned indicates if the Terminal has been abandoned. Whoever abandoned
	// the terminal already took care of notifying everyone, so a silent fail is
	// normally the best response.
	Abandoned *abool.AtomicBool
	// contains filtered or unexported fields
}

TerminalBase contains the basic functions of a terminal.

func (*TerminalBase) Abandon

func (t *TerminalBase) Abandon(err *Error)

Abandon abandons the Terminal with the given error.

func (*TerminalBase) Ctx

func (t *TerminalBase) Ctx() context.Context

Ctx returns the Terminal's context.

func (*TerminalBase) DeleteActiveOp

func (t *TerminalBase) DeleteActiveOp(opID uint32)

DeleteActiveOp deletes an active operation from the Terminal state.

func (*TerminalBase) Deliver

func (t *TerminalBase) Deliver(c *container.Container) *Error

Deliver on TerminalBase only exists to conform to the interface. It must be overridden by an actual implementation.

func (*TerminalBase) Flush

func (t *TerminalBase) Flush()

Flush sends all data waiting to be sent.

func (*TerminalBase) FmtID

func (t *TerminalBase) FmtID() string

func (*TerminalBase) GetActiveOp

func (t *TerminalBase) GetActiveOp(opID uint32) (op Operation, ok bool)

GetActiveOp returns the active operation with the given ID from the Terminal state.

func (*TerminalBase) GetActiveOpCount added in v0.3.6

func (t *TerminalBase) GetActiveOpCount() int

GetActiveOpCount returns the amount of active operations.

func (*TerminalBase) GrantPermission

func (t *TerminalBase) GrantPermission(grant Permission)

GrantPermission grants the specified permissions to the Terminal.

func (*TerminalBase) Handler

func (t *TerminalBase) Handler(_ context.Context) error

Handler receives and handles messages and must be started as a worker in the module where the Terminal is used.

func (*TerminalBase) HasPermission

func (t *TerminalBase) HasPermission(required Permission) bool

HasPermission returns if the Terminal has the specified permission.

func (*TerminalBase) ID

func (t *TerminalBase) ID() uint32

ID returns the Terminal's ID.

func (*TerminalBase) OpEnd

func (t *TerminalBase) OpEnd(op Operation, err *Error)

OpEnd sends the end signal with an optional error and then deletes the operation from the Terminal state and calls End(ErrNil) on the Operation. The Operation should cease operation after calling this function. Should only be called by an operation.

func (*TerminalBase) OpInit

func (t *TerminalBase) OpInit(op Operation, data *container.Container) *Error

OpInit initializes the operation with the given data.

func (*TerminalBase) OpSend

func (t *TerminalBase) OpSend(op Operation, data *container.Container) *Error

OpSend sends data.

func (*TerminalBase) OpSendWithTimeout

func (t *TerminalBase) OpSendWithTimeout(op Operation, data *container.Container, timeout time.Duration) *Error

OpSendWithTimeout sends data, but fails after the given timeout passed.

func (*TerminalBase) Sender

func (t *TerminalBase) Sender(_ context.Context) error

Sender handles sending messages and must be started as a worker in the module where the Terminal is used.

func (*TerminalBase) SetActiveOp

func (t *TerminalBase) SetActiveOp(opID uint32, op Operation)

SetActiveOp saves an active operation to the Terminal state.

func (*TerminalBase) SetTerminalExtension

func (t *TerminalBase) SetTerminalExtension(ext TerminalExtension)

SetTerminalExtension sets the Terminal's extension. This function is not guarded and may only be used during initialization.

func (*TerminalBase) SetTimeout

func (t *TerminalBase) SetTimeout(d time.Duration)

SetTimeout sets the Terminal's idle timeout duration. It is broken down into slots internally.

func (*TerminalBase) Shutdown

func (t *TerminalBase) Shutdown(err *Error, sendError bool)

Shutdown sends a stop message with the given error (if it is external) and ends all operations with a nil error and finally cancels the terminal context. This function is usually not called directly, but at the end of an Abandon() implementation.

func (*TerminalBase) SubmitAsDataMsg

func (t *TerminalBase) SubmitAsDataMsg(submitFunc func(*container.Container)) func(*container.Container)

SubmitAsDataMsg wraps the given submit function to call MakeMsg on the data before submitting.

func (*TerminalBase) WaitForFlush

func (t *TerminalBase) WaitForFlush()

WaitForFlush makes the terminal pause all sending until the next call to Flush().

type TerminalExtension

type TerminalExtension interface {
	OpTerminal

	ReadyToSend() <-chan struct{}
	Send(c *container.Container) *Error
	SendRaw(c *container.Container) *Error
	Receive() <-chan *container.Container
	Abandon(err *Error)
}

type TerminalInterface

type TerminalInterface interface {
	ID() uint32
	Ctx() context.Context
	Deliver(c *container.Container) *Error
	Abandon(err *Error)
	FmtID() string
	Flush()
}

type TerminalOpts

type TerminalOpts struct {
	Version   uint8  `json:"-"`
	QueueSize uint32 `json:"qs,omitempty"`
	Padding   uint16 `json:"p,omitempty"`
	Encrypt   bool   `json:"e,omitempty"`
}

TerminalOpts holds configuration for the terminal.

func (*TerminalOpts) Pack

func (opts *TerminalOpts) Pack() (*container.Container, *Error)

type TestTerminal added in v0.3.11

type TestTerminal struct {
	*TerminalBase
	*DuplexFlowQueue
}

func NewSimpleTestTerminalPair added in v0.3.11

func NewSimpleTestTerminalPair(delay time.Duration, opts *TerminalOpts) (a, b *TestTerminal, err error)

func (*TestTerminal) Abandon added in v0.3.11

func (t *TestTerminal) Abandon(err *Error)

func (*TestTerminal) Flush added in v0.3.11

func (t *TestTerminal) Flush()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL