channel

package
v0.16.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2022 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FlagClosed    = 0
	FlagRxStarted = 1
)
View Source
const (
	ConnectionIdHeader              = 0
	ReplyForHeader                  = 1
	ResultSuccessHeader             = 2
	HelloRouterAdvertisementsHeader = 3
	HelloVersionHeader              = 4

	// Headers in the range 128-255 inclusive will be reflected when creating replies
	ReflectedHeaderBitMask = 1 << 7
	MaxReflectedHeader     = (1 << 8) - 1
)

Message header notes:

  • 0-127 is reserved for channel
  • 128-255 is reserved for headers that need to be reflected back to the sender on responses
  • 128 is used for a message UUID for tracing
  • 1000-1099 is reserved for edge messages
  • 1100-1199 is reserved for control plane messages
  • 2000-2500 is reserved for xgress messages
  • 2000-2255 (within the xgress range) is reserved for xgress implementation headers
View Source
const (
	ContentTypeHelloType           = 0
	ContentTypePingType            = 1
	ContentTypeResultType          = 2
	ContentTypeLatencyType         = 3
	ContentTypeLatencyResponseType = 4
)
View Source
const (
	Highest  = 0
	High     = 1024
	Standard = 4096
	Low      = 10240
)
View Source
const AnyContentType = -1
View Source
const DECODER = "channel"
View Source
const DecoderFieldName = "__decoder__"
View Source
const HelloSequence = -1
View Source
const MessageFieldName = "__message__"

Variables

View Source
var ListenerClosedError = listenerClosedError{}
View Source
var UnknownVersionError = errors.New("channel synchronization error, bad magic number")

Functions

func AcceptNextChannel

func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, options *Options, tcfg transport.Configuration) error

func DecodeTraceAndFormat

func DecodeTraceAndFormat(decode []byte) string

func NewErrorContext

func NewErrorContext(err error) context.Context

func NewReceiveHandlerCopyOnWriteMap

func NewReceiveHandlerCopyOnWriteMap() *receiveHandlerCopyOnWriteMap

func NewWSListener

func NewWSListener(peer transport.Connection) *wsListener

Types

type AsyncFunctionReceiveAdapter

type AsyncFunctionReceiveAdapter struct {
	Type    int32
	Handler func(*Message, Channel)
}

func (*AsyncFunctionReceiveAdapter) ContentType

func (adapter *AsyncFunctionReceiveAdapter) ContentType() int32

func (*AsyncFunctionReceiveAdapter) HandleReceive

func (adapter *AsyncFunctionReceiveAdapter) HandleReceive(m *Message, ch Channel)

type BindHandler

type BindHandler interface {
	BindChannel(ch Channel) error
}

type Binding

type Binding interface {
	Bind(h BindHandler) error
	AddPeekHandler(h PeekHandler)
	AddTransformHandler(h TransformHandler)
	AddReceiveHandler(h ReceiveHandler)
	AddErrorHandler(h ErrorHandler)
	AddCloseHandler(h CloseHandler)
	SetUserData(data interface{})
	GetUserData() interface{}
}

Binding is used to add handlers to Channel.

NOTE: It is intended that the Add* methods are used at initial channel setup, and not invoked on an in-service Channel. This API may change in the future to enforce those semantics programmatically.

type Channel

type Channel interface {
	Identity
	SetLogicalName(logicalName string)
	Binding
	Sender
	io.Closer
	IsClosed() bool
	Underlay() Underlay
	StartRx()
	GetTimeSinceLastRead() time.Duration
}

Channel represents an asynchronous, message-passing framework, designed to sit on top of an underlay.

func NewChannel

func NewChannel(logicalName string, underlayFactory UnderlayFactory, options *Options) (Channel, error)

func NewChannelWithTransportConfiguration

func NewChannelWithTransportConfiguration(logicalName string, underlayFactory UnderlayFactory, options *Options, tcfg transport.Configuration) (Channel, error)

type CloseHandler

type CloseHandler interface {
	HandleClose(ch Channel)
}

type ConnectOptions

type ConnectOptions struct {
	MaxQueuedConnects      int
	MaxOutstandingConnects int
	ConnectTimeoutMs       int
}

func DefaultConnectOptions

func DefaultConnectOptions() ConnectOptions

func (*ConnectOptions) ConnectTimeout

func (co *ConnectOptions) ConnectTimeout() time.Duration

func (*ConnectOptions) Validate

func (co *ConnectOptions) Validate() error

type ConnectionHandler

type ConnectionHandler interface {
	HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}

type Decoder

type Decoder struct{}

func (Decoder) Decode

func (d Decoder) Decode(msg *Message) ([]byte, bool)

type ErrorContext

type ErrorContext struct {
	// contains filtered or unexported fields
}

func (*ErrorContext) Deadline

func (self *ErrorContext) Deadline() (deadline time.Time, ok bool)

func (*ErrorContext) Done

func (self *ErrorContext) Done() <-chan struct{}

func (*ErrorContext) Err

func (self *ErrorContext) Err() error

func (*ErrorContext) Value

func (self *ErrorContext) Value(key interface{}) interface{}

type ErrorHandler

type ErrorHandler interface {
	HandleError(err error, ch Channel)
}

type ErrorSendContext

type ErrorSendContext struct {
	// contains filtered or unexported fields
}

func (*ErrorSendContext) Context

func (self *ErrorSendContext) Context() context.Context

func (*ErrorSendContext) Msg

func (self *ErrorSendContext) Msg() *Message

func (*ErrorSendContext) NotifyAfterWrite

func (self *ErrorSendContext) NotifyAfterWrite()

func (*ErrorSendContext) NotifyBeforeWrite

func (self *ErrorSendContext) NotifyBeforeWrite()

func (*ErrorSendContext) NotifyErr

func (self *ErrorSendContext) NotifyErr(error)

func (*ErrorSendContext) Priority

func (self *ErrorSendContext) Priority() Priority

func (*ErrorSendContext) ReplyChan

func (self *ErrorSendContext) ReplyChan() chan<- *Message

func (*ErrorSendContext) WithPriority

func (self *ErrorSendContext) WithPriority(Priority) SendContext

func (*ErrorSendContext) WithTimeout

func (self *ErrorSendContext) WithTimeout(duration time.Duration) TimeoutSendContext

type FunctionReceiveAdapter

type FunctionReceiveAdapter struct {
	Type    int32
	Handler func(*Message, Channel)
}

func (*FunctionReceiveAdapter) ContentType

func (adapter *FunctionReceiveAdapter) ContentType() int32

func (*FunctionReceiveAdapter) HandleReceive

func (adapter *FunctionReceiveAdapter) HandleReceive(m *Message, ch Channel)

type Headers

type Headers map[int32][]byte

func (Headers) GetBoolHeader

func (self Headers) GetBoolHeader(key int32) (bool, bool)

func (Headers) GetByteHeader

func (self Headers) GetByteHeader(key int32) (byte, bool)

func (Headers) GetStringHeader

func (self Headers) GetStringHeader(key int32) (string, bool)

func (Headers) GetUint16Header

func (self Headers) GetUint16Header(key int32) (uint16, bool)

func (Headers) GetUint32Header

func (self Headers) GetUint32Header(key int32) (uint32, bool)

func (Headers) GetUint64Header

func (self Headers) GetUint64Header(key int32) (uint64, bool)

func (Headers) PutBoolHeader

func (self Headers) PutBoolHeader(key int32, value bool)

func (Headers) PutByteHeader

func (self Headers) PutByteHeader(key int32, value byte)

func (Headers) PutStringHeader

func (self Headers) PutStringHeader(key int32, value string)

func (Headers) PutUint16Header

func (self Headers) PutUint16Header(key int32, value uint16)

func (Headers) PutUint32Header

func (self Headers) PutUint32Header(key int32, value uint32)

func (Headers) PutUint64Header

func (self Headers) PutUint64Header(key int32, value uint64)

type Hello

type Hello struct {
	IdToken string
	Headers map[int32][]byte
}

func UnmarshalHello

func UnmarshalHello(message *Message) *Hello

type Identity

type Identity interface {
	// The TokenId used to represent the identity of this channel to lower-level resources.
	//
	Id() *identity.TokenId

	// The LogicalName represents the purpose or usage of this channel (i.e. 'ctrl', 'mgmt', 'r/001', etc.). Usually used
	// by humans to understand the logical purpose of a channel.
	//
	LogicalName() string

	// The ConnectionId represents the identity of this Channel to internal API components ("instance identifier").
	// Usually used by the Channel framework to differentiate Channel instances.
	//
	ConnectionId() string

	// Certificates contains the identity certificates provided by the peer.
	//
	Certificates() []*x509.Certificate

	// Label constructs a consistently-formatted string used for context logging purposes, from the components above.
	//
	Label() string
}

type LatencyHandler

type LatencyHandler struct {
	// contains filtered or unexported fields
}

LatencyHandler responds to latency messages with Result messages.

func (*LatencyHandler) ContentType

func (h *LatencyHandler) ContentType() int32

func (*LatencyHandler) HandleReceive

func (h *LatencyHandler) HandleReceive(msg *Message, ch Channel)

type MemoryContext

type MemoryContext struct {
	// contains filtered or unexported fields
}

func NewMemoryContext

func NewMemoryContext() *MemoryContext

type Message

type Message struct {
	MessageHeader
	Body []byte
}

func NewHello

func NewHello(idToken string, attributes map[int32][]byte) *Message

func NewMessage

func NewMessage(contentType int32, body []byte) *Message

func NewResult

func NewResult(success bool, message string) *Message

func ReadWSMessage

func ReadWSMessage(peer io.Reader) (*Message, error)

func (*Message) Context

func (m *Message) Context() context.Context

func (*Message) Msg

func (m *Message) Msg() *Message

func (*Message) NotifyAfterWrite

func (m *Message) NotifyAfterWrite()

func (*Message) NotifyBeforeWrite

func (m *Message) NotifyBeforeWrite()

func (*Message) NotifyErr

func (m *Message) NotifyErr(error)

func (*Message) Priority

func (m *Message) Priority() Priority

func (*Message) ReplyChan

func (m *Message) ReplyChan() chan<- *Message

func (*Message) ReplyTo

func (m *Message) ReplyTo(o *Message)

func (*Message) String

func (m *Message) String() string

func (*Message) WithPriority

func (m *Message) WithPriority(p Priority) SendContext

func (*Message) WithTimeout

func (m *Message) WithTimeout(duration time.Duration) TimeoutSendContext

type MessageHeader

type MessageHeader struct {
	ContentType int32

	Headers Headers
	// contains filtered or unexported fields
}

func (*MessageHeader) GetBoolHeader

func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)

func (*MessageHeader) GetByteHeader

func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)

func (*MessageHeader) GetStringHeader

func (header *MessageHeader) GetStringHeader(key int32) (string, bool)

func (*MessageHeader) GetUint16Header

func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)

func (*MessageHeader) GetUint32Header

func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)

func (*MessageHeader) GetUint64Header

func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)

func (*MessageHeader) IsReply

func (header *MessageHeader) IsReply() bool

func (*MessageHeader) IsReplyingTo

func (header *MessageHeader) IsReplyingTo(sequence int32) bool

func (*MessageHeader) PutBoolHeader

func (header *MessageHeader) PutBoolHeader(key int32, value bool)

func (*MessageHeader) PutByteHeader

func (header *MessageHeader) PutByteHeader(key int32, value byte)

func (*MessageHeader) PutStringHeader

func (header *MessageHeader) PutStringHeader(key int32, value string)

func (*MessageHeader) PutUint16Header

func (header *MessageHeader) PutUint16Header(key int32, value uint16)

func (*MessageHeader) PutUint32Header

func (header *MessageHeader) PutUint32Header(key int32, value uint32)

func (*MessageHeader) PutUint64Header

func (header *MessageHeader) PutUint64Header(key int32, value uint64)

func (*MessageHeader) ReplyFor

func (header *MessageHeader) ReplyFor() int32

func (*MessageHeader) Sequence

func (header *MessageHeader) Sequence() int32

type MutableSendContext

type MutableSendContext struct {
	*Message
	// contains filtered or unexported fields
}

func (*MutableSendContext) Context

func (self *MutableSendContext) Context() context.Context

func (*MutableSendContext) Priority

func (self *MutableSendContext) Priority() Priority

func (*MutableSendContext) SendAndWaitForWire

func (self *MutableSendContext) SendAndWaitForWire(ch Channel) error

func (*MutableSendContext) SendForReply

func (self *MutableSendContext) SendForReply(ch Channel) (*Message, error)

func (*MutableSendContext) SendForTypedReply

func (self *MutableSendContext) SendForTypedReply(ch Channel, result TypedMessage) error

func (*MutableSendContext) WithPriority

func (self *MutableSendContext) WithPriority(p Priority) SendContext

func (*MutableSendContext) WithTimeout

func (self *MutableSendContext) WithTimeout(duration time.Duration) TimeoutSendContext

type NoopTestChannel

type NoopTestChannel struct {
}

func (*NoopTestChannel) AddCloseHandler

func (ch *NoopTestChannel) AddCloseHandler(CloseHandler)

func (*NoopTestChannel) AddErrorHandler

func (ch *NoopTestChannel) AddErrorHandler(ErrorHandler)

func (*NoopTestChannel) AddPeekHandler

func (ch *NoopTestChannel) AddPeekHandler(PeekHandler)

func (*NoopTestChannel) AddReceiveHandler

func (ch *NoopTestChannel) AddReceiveHandler(ReceiveHandler)

func (*NoopTestChannel) AddTransformHandler

func (ch *NoopTestChannel) AddTransformHandler(TransformHandler)

func (*NoopTestChannel) Bind

func (ch *NoopTestChannel) Bind(BindHandler) error

func (*NoopTestChannel) Certificates

func (ch *NoopTestChannel) Certificates() []*x509.Certificate

func (*NoopTestChannel) Close

func (ch *NoopTestChannel) Close() error

func (*NoopTestChannel) ConnectionId

func (ch *NoopTestChannel) ConnectionId() string

func (*NoopTestChannel) GetTimeSinceLastRead

func (ch *NoopTestChannel) GetTimeSinceLastRead() time.Duration

func (*NoopTestChannel) GetUserData

func (ch *NoopTestChannel) GetUserData() interface{}

func (*NoopTestChannel) Id

func (ch *NoopTestChannel) Id() *identity.TokenId

func (*NoopTestChannel) IsClosed

func (ch *NoopTestChannel) IsClosed() bool

func (*NoopTestChannel) Label

func (ch *NoopTestChannel) Label() string

func (*NoopTestChannel) LogicalName

func (ch *NoopTestChannel) LogicalName() string

func (*NoopTestChannel) Send

func (ch *NoopTestChannel) Send(*Message) error

func (*NoopTestChannel) SendAndSync

func (ch *NoopTestChannel) SendAndSync(m *Message) (chan error, error)

func (*NoopTestChannel) SendAndSyncWithPriority

func (ch *NoopTestChannel) SendAndSyncWithPriority(*Message, Priority) (chan error, error)

func (*NoopTestChannel) SendAndWait

func (ch *NoopTestChannel) SendAndWait(*Message) (chan *Message, error)

func (*NoopTestChannel) SendAndWaitWithPriority

func (ch *NoopTestChannel) SendAndWaitWithPriority(*Message, Priority) (chan *Message, error)

func (*NoopTestChannel) SendAndWaitWithTimeout

func (ch *NoopTestChannel) SendAndWaitWithTimeout(*Message, time.Duration) (*Message, error)

func (*NoopTestChannel) SendForReply

func (ch *NoopTestChannel) SendForReply(TypedMessage, time.Duration) (*Message, error)

func (*NoopTestChannel) SendForReplyAndDecode

func (ch *NoopTestChannel) SendForReplyAndDecode(TypedMessage, time.Duration, TypedMessage) error

func (*NoopTestChannel) SendPrioritizedAndWaitWithTimeout

func (ch *NoopTestChannel) SendPrioritizedAndWaitWithTimeout(*Message, Priority, time.Duration) (*Message, error)

func (*NoopTestChannel) SendPrioritizedWithTimeout

func (ch *NoopTestChannel) SendPrioritizedWithTimeout(*Message, Priority, time.Duration) error

func (*NoopTestChannel) SendWithPriority

func (ch *NoopTestChannel) SendWithPriority(*Message, Priority) error

func (*NoopTestChannel) SendWithTimeout

func (ch *NoopTestChannel) SendWithTimeout(*Message, time.Duration) error

func (*NoopTestChannel) SetLogicalName

func (ch *NoopTestChannel) SetLogicalName(string)

func (*NoopTestChannel) SetUserData

func (ch *NoopTestChannel) SetUserData(interface{})

func (*NoopTestChannel) StartRx

func (ch *NoopTestChannel) StartRx()

func (*NoopTestChannel) Underlay

func (ch *NoopTestChannel) Underlay() Underlay

type Options

type Options struct {
	OutQueueSize int
	BindHandlers []BindHandler
	PeekHandlers []PeekHandler
	ConnectOptions
	DelayRxStart bool
	WriteTimeout time.Duration
}

func DefaultOptions

func DefaultOptions() *Options

func LoadOptions

func LoadOptions(data map[interface{}]interface{}) (*Options, error)

func (Options) String

func (o Options) String() string

type PeekHandler

type PeekHandler interface {
	Connect(ch Channel, remoteAddress string)
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
	Close(ch Channel)
}

type Priority

type Priority uint32

type ReceiveHandler

type ReceiveHandler interface {
	ContentType() int32
	HandleReceive(m *Message, ch Channel)
}

type ReplySendContext

type ReplySendContext struct {
	*MutableSendContext
	// contains filtered or unexported fields
}

func (*ReplySendContext) NotifyErr

func (self *ReplySendContext) NotifyErr(err error)

func (*ReplySendContext) ReplyChan

func (self *ReplySendContext) ReplyChan() chan<- *Message

func (*ReplySendContext) WaitForReply

func (self *ReplySendContext) WaitForReply(ch Channel) (*Message, error)

func (*ReplySendContext) WaitForTypedReply

func (self *ReplySendContext) WaitForTypedReply(ch Channel, result TypedMessage) error

type Result

type Result struct {
	Success bool
	Message string
}

func UnmarshalResult

func UnmarshalResult(message *Message) *Result

type SendContext

type SendContext interface {
	Msg() *Message
	Priority() Priority
	WithPriority(p Priority) SendContext
	Context() context.Context
	WithTimeout(duration time.Duration) TimeoutSendContext
	NotifyBeforeWrite()
	NotifyAfterWrite()
	NotifyErr(err error)
	ReplyChan() chan<- *Message
}

func MarshalProto

func MarshalProto(contentType int32, msg proto.Message) SendContext

func MarshalTyped

func MarshalTyped(msg TypedMessage) SendContext

type Sender

type Sender interface {
	Send(sendCtx SendContext) error
}

type TimeoutSendContext

type TimeoutSendContext interface {
	SendContext
	SendAndWaitForWire(ch Channel) error
	SendForReply(ch Channel) (*Message, error)
	SendForTypedReply(ch Channel, result TypedMessage) error
}

type TraceHandler

type TraceHandler struct {
	// contains filtered or unexported fields
}

func NewTraceHandler

func NewTraceHandler(path string, id string) (*TraceHandler, error)

func (*TraceHandler) AddDecoder

func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)

func (TraceHandler) Close

func (h TraceHandler) Close(ch Channel)

func (*TraceHandler) Connect

func (h *TraceHandler) Connect(ch Channel, remoteAddress string)

func (TraceHandler) Rx

func (h TraceHandler) Rx(msg *Message, ch Channel)

func (TraceHandler) Tx

func (h TraceHandler) Tx(msg *Message, ch Channel)

type TraceMessageDecode

type TraceMessageDecode map[string]interface{}

func NewTraceMessageDecode

func NewTraceMessageDecode(decoder, message string) TraceMessageDecode

func (TraceMessageDecode) MarshalResult

func (d TraceMessageDecode) MarshalResult() ([]byte, bool)

func (TraceMessageDecode) MarshalTraceMessageDecode

func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)

type TraceMessageDecoder

type TraceMessageDecoder interface {
	Decode(msg *Message) ([]byte, bool)
}

type TransformHandler

type TransformHandler interface {
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
}

type TypedMessage

type TypedMessage interface {
	proto.Message
	GetContentType() int32
}

type Underlay

type Underlay interface {
	Rx() (*Message, error)
	Tx(m *Message) error
	Identity
	io.Closer
	IsClosed() bool
	Headers() map[int32][]byte
	SetWriteTimeout(duration time.Duration) error
}

Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.

type UnderlayFactory

type UnderlayFactory interface {
	Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
}

UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implements UnderlayFactory to provide instances to Channel.

func NewClassicDialer

func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewMemoryDialer

func NewMemoryDialer(identity *identity.TokenId, headers map[int32][]byte, ctx *MemoryContext) UnderlayFactory

func NewReconnectingDialer

func NewReconnectingDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewReconnectingDialerWithHandler

func NewReconnectingDialerWithHandler(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte, reconnectHandler func()) UnderlayFactory

type UnderlayListener

type UnderlayListener interface {
	Listen(handlers ...ConnectionHandler) error
	UnderlayFactory
	io.Closer
}

UnderlayListener represents a component designed to listen for incoming peer connections.

func NewClassicListener

func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, headers map[int32][]byte) UnderlayListener

func NewClassicListenerWithTransportConfiguration

func NewClassicListenerWithTransportConfiguration(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, tcfg transport.Configuration, headers map[int32][]byte) UnderlayListener

func NewMemoryListener

func NewMemoryListener(identity *identity.TokenId, ctx *MemoryContext) UnderlayListener

type UnsupportedVersionError

type UnsupportedVersionError struct {
	// contains filtered or unexported fields
}

func (UnsupportedVersionError) Error

func (u UnsupportedVersionError) Error() string

type WaitSendContext

type WaitSendContext struct {
	*MutableSendContext
	// contains filtered or unexported fields
}

func (*WaitSendContext) NotifyAfterWrite

func (self *WaitSendContext) NotifyAfterWrite()

func (*WaitSendContext) NotifyErr

func (self *WaitSendContext) NotifyErr(err error)

func (*WaitSendContext) WaitForWire

func (self *WaitSendContext) WaitForWire(ch Channel) error

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL