Documentation ¶
Index ¶
- Constants
- Variables
- func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, options *Options, ...) error
- func DecodeTraceAndFormat(decode []byte) string
- func SetUnderlayRegistrySequence(sequence *sequence.Sequence)
- type BindHandler
- type Binding
- type Channel
- type CloseHandler
- type ConnectOptions
- type ConnectionHandler
- type Decoder
- type ErrorHandler
- type Hello
- type Identity
- type LatencyHandler
- type MemoryContext
- type Message
- type MessageHeader
- func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)
- func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)
- func (header *MessageHeader) GetStringHeader(key int32) (string, bool)
- func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)
- func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)
- func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)
- func (header *MessageHeader) IsReply() bool
- func (header *MessageHeader) IsReplyingTo(sequence int32) bool
- func (header *MessageHeader) PutBoolHeader(key int32, value bool)
- func (header *MessageHeader) PutByteHeader(key int32, value byte)
- func (header *MessageHeader) PutUint16Header(key int32, value uint16)
- func (header *MessageHeader) PutUint32Header(key int32, value uint32)
- func (header *MessageHeader) PutUint64Header(key int32, value uint64)
- func (header *MessageHeader) ReplyFor() int32
- func (header *MessageHeader) Sequence() int32
- type Options
- type PeekHandler
- type Priority
- type ReceiveHandler
- type Result
- type Sender
- type TraceHandler
- type TraceMessageDecode
- type TraceMessageDecoder
- type TransformHandler
- type TypedMessage
- type Underlay
- type UnderlayFactory
- func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayFactory
- func NewMemoryDialer(identity *identity.TokenId, headers map[int32][]byte, ctx *MemoryContext) UnderlayFactory
- func NewReconnectingDialer(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayFactory
- func NewReconnectingDialerWithHandler(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayFactory
- type UnderlayListener
- func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayListener
- func NewClassicListenerWithTransportConfiguration(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayListener
- func NewMemoryListener(identity *identity.TokenId, ctx *MemoryContext) UnderlayListener
- type UnsupportedVersionError
Constants ¶
const ( ConnectionIdHeader = 0 ReplyForHeader = 1 ResultSuccessHeader = 2 HelloRouterAdvertisementsHeader = 3 HelloVersionHeader = 4 // Headers in the range 128-255 inclusive will be reflected when creating replies ReflectedHeaderBitMask = 1 << 7 MaxReflectedHeader = (1 << 8) - 1 )
Message header notes:
- 0-127 reserved for channel
- 128-255 reserved for headers that need to be reflected back to sender on responses
  - 128 is used for a message UUID for tracing
- 1000-1099 reserved for edge messages
- 1100-1199 reserved for control plane messages
- 2000-2500 reserved for xgress messages
  - 2000-2255 reserved for xgress implementation headers
const ( ContentTypeHelloType = 0 ContentTypePingType = 1 ContentTypeResultType = 2 ContentTypeLatencyType = 3 ContentTypeLatencyResponseType = 4 )
const ( Highest = 0 High = 1024 Standard = 4096 Low = 10240 )
const AnyContentType = -1
const DECODER = "channel"
const DecoderFieldName = "__decoder__"
const HelloSequence = -1
const MessageFieldName = "__message__"
Variables ¶
var ListenerClosedError = listenerClosedError{}
var UnknownVersionError = errors.New("channel synchronization error, bad magic number")
Functions ¶
func AcceptNextChannel ¶
func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, options *Options, tcfg transport.Configuration) error
func DecodeTraceAndFormat ¶
Types ¶
type BindHandler ¶
type Binding ¶
type Binding interface { Bind(h BindHandler) error AddPeekHandler(h PeekHandler) AddTransformHandler(h TransformHandler) AddReceiveHandler(h ReceiveHandler) AddErrorHandler(h ErrorHandler) AddCloseHandler(h CloseHandler) SetUserData(data interface{}) GetUserData() interface{} }
Binding is used to add handlers to Channel.
NOTE: It is intended that the Add* methods are used at initial channel setup, and not invoked on an in-service Channel. This API may change in the future to enforce those semantics programmatically.
type Channel ¶
type Channel interface { Identity SetLogicalName(logicalName string) Binding Sender io.Closer IsClosed() bool Underlay() Underlay StartRx() GetTimeSinceLastRead() time.Duration }
Channel represents an asynchronous, message-passing endpoint, designed to sit on top of an underlay.
func NewChannel ¶
func NewChannel(logicalName string, underlayFactory UnderlayFactory, options *Options) (Channel, error)
func NewChannelWithTransportConfiguration ¶
func NewChannelWithTransportConfiguration(logicalName string, underlayFactory UnderlayFactory, options *Options, tcfg transport.Configuration) (Channel, error)
type CloseHandler ¶
type CloseHandler interface {
HandleClose(ch Channel)
}
type ConnectOptions ¶
type ConnectOptions struct { MaxQueuedConnects int MaxOutstandingConnects int ConnectTimeoutMs int }
func DefaultConnectOptions ¶
func DefaultConnectOptions() ConnectOptions
func (*ConnectOptions) ConnectTimeout ¶
func (co *ConnectOptions) ConnectTimeout() time.Duration
func (*ConnectOptions) Validate ¶
func (co *ConnectOptions) Validate() error
type ConnectionHandler ¶
type ConnectionHandler interface {
HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}
type ErrorHandler ¶
type Hello ¶
func UnmarshalHello ¶
type Identity ¶
type Identity interface { // The TokenId used to represent the identity of this channel to lower-level resources. // Id() *identity.TokenId // The LogicalName represents the purpose or usage of this channel (e.g. 'ctrl', 'mgmt', 'r/001', etc.) Usually used // by humans to understand the logical purpose of a channel. // LogicalName() string // The ConnectionId represents the identity of this Channel to internal API components ("instance identifier"). // Usually used by the Channel framework to differentiate Channel instances. // ConnectionId() string // Certificates contains the identity certificates provided by the peer. // Certificates() []*x509.Certificate // Label constructs a consistently-formatted string used for context logging purposes, from the components above. // Label() string }
type LatencyHandler ¶
type LatencyHandler struct {
// contains filtered or unexported fields
}
LatencyHandler responds to latency messages with Result messages.
func (*LatencyHandler) ContentType ¶
func (h *LatencyHandler) ContentType() int32
func (*LatencyHandler) HandleReceive ¶
func (h *LatencyHandler) HandleReceive(msg *Message, ch Channel)
type MemoryContext ¶
type MemoryContext struct {
// contains filtered or unexported fields
}
func NewMemoryContext ¶
func NewMemoryContext() *MemoryContext
type MessageHeader ¶
type MessageHeader struct { ContentType int32 Headers map[int32][]byte // contains filtered or unexported fields }
func (*MessageHeader) GetBoolHeader ¶
func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)
func (*MessageHeader) GetByteHeader ¶
func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)
func (*MessageHeader) GetStringHeader ¶
func (header *MessageHeader) GetStringHeader(key int32) (string, bool)
func (*MessageHeader) GetUint16Header ¶
func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)
func (*MessageHeader) GetUint32Header ¶
func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)
func (*MessageHeader) GetUint64Header ¶
func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)
func (*MessageHeader) IsReply ¶
func (header *MessageHeader) IsReply() bool
func (*MessageHeader) IsReplyingTo ¶
func (header *MessageHeader) IsReplyingTo(sequence int32) bool
func (*MessageHeader) PutBoolHeader ¶
func (header *MessageHeader) PutBoolHeader(key int32, value bool)
func (*MessageHeader) PutByteHeader ¶
func (header *MessageHeader) PutByteHeader(key int32, value byte)
func (*MessageHeader) PutUint16Header ¶
func (header *MessageHeader) PutUint16Header(key int32, value uint16)
func (*MessageHeader) PutUint32Header ¶
func (header *MessageHeader) PutUint32Header(key int32, value uint32)
func (*MessageHeader) PutUint64Header ¶
func (header *MessageHeader) PutUint64Header(key int32, value uint64)
func (*MessageHeader) ReplyFor ¶
func (header *MessageHeader) ReplyFor() int32
func (*MessageHeader) Sequence ¶
func (header *MessageHeader) Sequence() int32
type Options ¶
type Options struct { OutQueueSize int BindHandlers []BindHandler PeekHandlers []PeekHandler ConnectOptions DelayRxStart bool }
func DefaultOptions ¶
func DefaultOptions() *Options
func LoadOptions ¶
func LoadOptions(data map[interface{}]interface{}) *Options
type PeekHandler ¶
type ReceiveHandler ¶
type Result ¶
func UnmarshalResult ¶
type Sender ¶
type Sender interface { Send(m *Message) error SendWithPriority(m *Message, p Priority) error SendAndSync(m *Message) (chan error, error) SendAndSyncWithPriority(m *Message, p Priority) (chan error, error) SendWithTimeout(m *Message, timeout time.Duration) error SendPrioritizedWithTimeout(m *Message, p Priority, timeout time.Duration) error SendAndWaitWithTimeout(m *Message, timeout time.Duration) (*Message, error) SendPrioritizedAndWaitWithTimeout(m *Message, p Priority, timeout time.Duration) (*Message, error) SendAndWait(m *Message) (chan *Message, error) SendAndWaitWithPriority(m *Message, p Priority) (chan *Message, error) SendForReply(msg TypedMessage, timeout time.Duration) (*Message, error) SendForReplyAndDecode(msg TypedMessage, timeout time.Duration, result TypedMessage) error }
type TraceHandler ¶
type TraceHandler struct {
// contains filtered or unexported fields
}
func NewTraceHandler ¶
func NewTraceHandler(path string, id *identity.TokenId) (*TraceHandler, error)
func (*TraceHandler) AddDecoder ¶
func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)
func (TraceHandler) Close ¶
func (h TraceHandler) Close(ch Channel)
func (*TraceHandler) Connect ¶
func (h *TraceHandler) Connect(ch Channel, remoteAddress string)
func (TraceHandler) Rx ¶
func (h TraceHandler) Rx(msg *Message, ch Channel)
func (TraceHandler) Tx ¶
func (h TraceHandler) Tx(msg *Message, ch Channel)
type TraceMessageDecode ¶
type TraceMessageDecode map[string]interface{}
func NewTraceMessageDecode ¶
func NewTraceMessageDecode(decoder, message string) TraceMessageDecode
func (TraceMessageDecode) MarshalResult ¶
func (d TraceMessageDecode) MarshalResult() ([]byte, bool)
func (TraceMessageDecode) MarshalTraceMessageDecode ¶
func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)
type TraceMessageDecoder ¶
type TransformHandler ¶
type TypedMessage ¶
type Underlay ¶
type Underlay interface { Rx() (*Message, error) Tx(m *Message) error Identity io.Closer IsClosed() bool Headers() map[int32][]byte }
Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.
type UnderlayFactory ¶
type UnderlayFactory interface {
Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
}
UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implements UnderlayFactory, to provide instances to Channel.
func NewClassicDialer ¶
func NewMemoryDialer ¶
func NewMemoryDialer(identity *identity.TokenId, headers map[int32][]byte, ctx *MemoryContext) UnderlayFactory
func NewReconnectingDialer ¶
type UnderlayListener ¶
type UnderlayListener interface { Listen(handlers ...ConnectionHandler) error UnderlayFactory io.Closer }
UnderlayListener represents a component designed to listen for incoming peer connections on some abstracted underlay facility.
func NewClassicListener ¶
func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, headers map[int32][]byte) UnderlayListener
func NewClassicListenerWithTransportConfiguration ¶
func NewClassicListenerWithTransportConfiguration(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, tcfg transport.Configuration, headers map[int32][]byte) UnderlayListener
func NewMemoryListener ¶
func NewMemoryListener(identity *identity.TokenId, ctx *MemoryContext) UnderlayListener
type UnsupportedVersionError ¶
type UnsupportedVersionError struct {
// contains filtered or unexported fields
}
func (UnsupportedVersionError) Error ¶
func (u UnsupportedVersionError) Error() string