channel

package module
v0.18.31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2022 License: Apache-2.0 Imports: 26 Imported by: 5

README

Channel

channel is a binary message framework. It includes the following features:

  • Supports registering message type handlers
  • Supports request/response pattern as well as unidirectional message streams
  • Allows plugging observers to enable metrics and other use cases
  • Async or sync patterns
  • Protobufs supported but not required

Documentation

Overview

Example
package main

import (
	"fmt"
	"github.com/openziti/channel"
	"github.com/openziti/foundation/identity/identity"
	"github.com/openziti/transport/tcp"
	"time"
)

func main() {
	addr, err := tcp.AddressParser{}.Parse("tcp:localhost:6565")
	if err != nil {
		panic(err)
	}
	dialId := &identity.TokenId{Token: "echo-client"}
	underlayFactory := channel.NewClassicDialer(dialId, addr, nil)

	ch, err := channel.NewChannel("echo-test", underlayFactory, nil, nil)
	if err != nil {
		panic(err)
	}

	helloMessageType := int32(256)
	msg := channel.NewMessage(helloMessageType, []byte("hello!"))

	// Can send the message on the channel. The call will return once the message is queued
	if err := ch.Send(msg); err != nil {
		panic(err)
	}

	// Can also have the message send itself on the channel
	if err := msg.Send(ch); err != nil {
		panic(err)
	}

	// Can set a priority on the message before sending. This will only affect the order in the send queue
	if err := msg.WithPriority(channel.High).Send(ch); err != nil {
		panic(err)
	}

	// Can set a timeout before sending. If the message can't be queued before the timeout, an error will be returned
	// If the timeout expires before the message can be sent, the message won't be sent
	if err := msg.WithTimeout(time.Second).Send(ch); err != nil {
		panic(err)
	}

	// Can set a timeout before sending and wait for the message to be written to the wire. If the timeout expires
	// before the message is sent, the message won't be sent and a TimeoutError will be returned
	if err := msg.WithTimeout(time.Second).SendAndWaitForWire(ch); err != nil {
		panic(err)
	}

	// Can set a timeout before sending and waiting for a reply message. If the timeout expires before the message is
	// sent, the message won't be sent and a TimeoutError will be returned. If the timeout expires before the reply
	// arrives a TimeoutError will be returned.
	reply, err := msg.WithTimeout(time.Second).SendForReply(ch)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(reply.Body))
}
Output:

Index

Examples

Constants

View Source
const (
	DefaultOutstandingConnects = 16
	DefaultQueuedConnects      = 1
	DefaultConnectTimeout      = 1000 * time.Millisecond

	MinQueuedConnects      = 1
	MinOutstandingConnects = 1
	MinConnectTimeout      = 30 * time.Millisecond

	MaxQueuedConnects      = 5000
	MaxOutstandingConnects = 1000
	MaxConnectTimeout      = 60000 * time.Millisecond

	DefaultOutQueueSize = 4
)
View Source
const (
	ConnectionIdHeader              = 0
	ReplyForHeader                  = 1
	ResultSuccessHeader             = 2
	HelloRouterAdvertisementsHeader = 3
	HelloVersionHeader              = 4
	HeartbeatHeader                 = 5
	HeartbeatResponseHeader         = 6

	// Headers in the range 128-255 inclusive will be reflected when creating replies
	ReflectedHeaderBitMask = 1 << 7
	MaxReflectedHeader     = (1 << 8) - 1
)

*

  • Message headers notes
  • 0-127 reserved for channel
  • 128-255 reserved for headers that need to be reflected back to sender on responses
  • 128 is used for a message UUID for tracing
  • 1000-1099 reserved for edge messages
  • 1100-1199 is reserved for control plane messages
  • 2000-2500 is reserved for xgress messages
  • 2000-2255 is reserved for xgress implementation headers
View Source
const (
	ContentTypeHelloType           = 0
	ContentTypePingType            = 1
	ContentTypeResultType          = 2
	ContentTypeLatencyType         = 3
	ContentTypeLatencyResponseType = 4
	ContentTypeHeartbeat           = 5
)
View Source
const (
	Highest  = 0
	High     = 1024
	Standard = 4096
	Low      = 10240
)
View Source
const AnyContentType = -1
View Source
const DECODER = "channel"
View Source
const DecoderFieldName = "__decoder__"
View Source
const HelloSequence = -1
View Source
const MessageFieldName = "__message__"

Variables

View Source
var ListenerClosedError = listenerClosedError{}
View Source
var UnknownVersionError = errors.New("channel synchronization error, bad magic number")

Functions

func AcceptNextChannel

func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options, tcfg transport.Configuration) error

func ConfigureHeartbeat

func ConfigureHeartbeat(binding Binding, heartbeatInterval time.Duration, checkInterval time.Duration, cb HeartbeatCallback)

ConfigureHeartbeat setups up heartbeats on the given channel. It assumes that an equivalent setup happens on the other side of the channel.

When possible, heartbeats will be sent on existing traffic. When a heartbeat is due to be sent, the next message sent will include a heartbeat header. If no message is sent by the time the checker runs on checkInterval, a standalone heartbeat message will be sent.

Similarly, when a message with a heartbeat header is received, the next sent message will have a header set with the heartbeat response. If no message is sent within a few milliseconds, a standalone heartbeat response will be sent

func GetRetryVersion added in v0.18.31

func GetRetryVersion(err error) (uint32, bool)

func IsTimeout

func IsTimeout(err error) bool

func MarshalV2 added in v0.18.28

func MarshalV2(m *Message) ([]byte, error)

MarshalV2 converts a *Message into a block of V2 wire format data.

func NewErrorContext

func NewErrorContext(err error) context.Context

func NewWSListener

func NewWSListener(peer transport.Connection) *wsListener

func NextConnectionId

func NextConnectionId() (string, error)

Types

type AsyncFunctionReceiveAdapter

type AsyncFunctionReceiveAdapter struct {
	Type    int32
	Handler ReceiveHandlerF
}

func (*AsyncFunctionReceiveAdapter) ContentType

func (adapter *AsyncFunctionReceiveAdapter) ContentType() int32

func (*AsyncFunctionReceiveAdapter) HandleReceive

func (adapter *AsyncFunctionReceiveAdapter) HandleReceive(m *Message, ch Channel)

type BaseSendListener

type BaseSendListener struct{}

BaseSendListener is a type that may be used to provide default methods for SendListener implementation

func (BaseSendListener) NotifyAfterWrite

func (BaseSendListener) NotifyAfterWrite()

func (BaseSendListener) NotifyBeforeWrite

func (BaseSendListener) NotifyBeforeWrite()

func (BaseSendListener) NotifyErr

func (BaseSendListener) NotifyErr(error)

func (BaseSendListener) NotifyQueued

func (BaseSendListener) NotifyQueued()

type BaseSendable

type BaseSendable struct{}

BaseSendable is a type that may be used to provide default methods for Sendable implementation

func (BaseSendable) Context

func (BaseSendable) Context() context.Context

func (BaseSendable) Msg

func (BaseSendable) Msg() *Message

func (BaseSendable) Priority

func (BaseSendable) Priority() Priority

func (BaseSendable) ReplyReceiver

func (BaseSendable) ReplyReceiver() ReplyReceiver

func (BaseSendable) SendListener

func (BaseSendable) SendListener() SendListener

type BindHandler

type BindHandler interface {
	BindChannel(binding Binding) error
}

type BindHandlerF

type BindHandlerF func(binding Binding) error

func (BindHandlerF) BindChannel

func (f BindHandlerF) BindChannel(binding Binding) error

type Binding

type Binding interface {
	Bind(h BindHandler) error
	AddPeekHandler(h PeekHandler)
	AddTransformHandler(h TransformHandler)
	AddReceiveHandler(contentType int32, h ReceiveHandler)
	AddReceiveHandlerF(contentType int32, h ReceiveHandlerF)
	AddTypedReceiveHandler(h TypedReceiveHandler)
	AddErrorHandler(h ErrorHandler)
	AddCloseHandler(h CloseHandler)
	SetUserData(data interface{})
	GetUserData() interface{}
	GetChannel() Channel
}

Binding is used to add handlers to Channel.

NOTE: It is intended that the Add* methods are used at initial channel setup, and not invoked on an in-service Channel. This API may change in the future to enforce those semantics programmatically.

type Channel

type Channel interface {
	Identity
	SetLogicalName(logicalName string)
	Sender
	io.Closer
	IsClosed() bool
	Underlay() Underlay
	StartRx()
	GetTimeSinceLastRead() time.Duration
}

Channel represents an asynchronous, message-passing framework, designed to sit on top of an underlay.

func NewChannel

func NewChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options) (Channel, error)

func NewChannelWithTransportConfiguration

func NewChannelWithTransportConfiguration(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options, tcfg transport.Configuration) (Channel, error)

type CloseHandler

type CloseHandler interface {
	HandleClose(ch Channel)
}

type CloseHandlerF

type CloseHandlerF func(ch Channel)

func (CloseHandlerF) HandleClose

func (self CloseHandlerF) HandleClose(ch Channel)

type ConnectOptions

type ConnectOptions struct {
	MaxQueuedConnects      int
	MaxOutstandingConnects int
	ConnectTimeout         time.Duration
}

func DefaultConnectOptions

func DefaultConnectOptions() ConnectOptions

func (*ConnectOptions) Validate

func (co *ConnectOptions) Validate() error

type ConnectionHandler

type ConnectionHandler interface {
	HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}

type Decoder

type Decoder struct{}

func (Decoder) Decode

func (d Decoder) Decode(msg *Message) ([]byte, bool)

type Envelope

type Envelope interface {
	// WithPriority returns an Envelope with the given priority
	WithPriority(p Priority) Envelope

	// WithTimeout returns a TimeoutEnvelope with a context using the given timeout
	WithTimeout(duration time.Duration) TimeoutEnvelope

	// Send sends the envelope on the given Channel
	Send(ch Channel) error

	// ReplyTo allows setting the reply header in a fluent style
	ReplyTo(msg *Message) Envelope

	// ToSendable converts the Envelope into a Sendable, which can be submitted to a Channel for sending
	ToSendable() Sendable
}

Envelope allows setting message priority and context. Message is an Envelope (as well as a Sendable)

func NewErrorEnvelope

func NewErrorEnvelope(err error) Envelope

type ErrorHandler

type ErrorHandler interface {
	HandleError(err error, ch Channel)
}

type ErrorHandlerF

type ErrorHandlerF func(err error, ch Channel)

func (ErrorHandlerF) HandleError

func (self ErrorHandlerF) HandleError(err error, ch Channel)

type Headers

type Headers map[int32][]byte

func (Headers) GetBoolHeader

func (self Headers) GetBoolHeader(key int32) (bool, bool)

func (Headers) GetByteHeader

func (self Headers) GetByteHeader(key int32) (byte, bool)

func (Headers) GetStringHeader

func (self Headers) GetStringHeader(key int32) (string, bool)

func (Headers) GetUint16Header

func (self Headers) GetUint16Header(key int32) (uint16, bool)

func (Headers) GetUint32Header

func (self Headers) GetUint32Header(key int32) (uint32, bool)

func (Headers) GetUint64Header

func (self Headers) GetUint64Header(key int32) (uint64, bool)

func (Headers) PutBoolHeader

func (self Headers) PutBoolHeader(key int32, value bool)

func (Headers) PutByteHeader

func (self Headers) PutByteHeader(key int32, value byte)

func (Headers) PutStringHeader

func (self Headers) PutStringHeader(key int32, value string)

func (Headers) PutUint16Header

func (self Headers) PutUint16Header(key int32, value uint16)

func (Headers) PutUint32Header

func (self Headers) PutUint32Header(key int32, value uint32)

func (Headers) PutUint64Header

func (self Headers) PutUint64Header(key int32, value uint64)

type HeartbeatCallback

type HeartbeatCallback interface {
	HeartbeatTx(ts int64)
	HeartbeatRx(ts int64)
	HeartbeatRespTx(ts int64)
	HeartbeatRespRx(ts int64)
	CheckHeartBeat()
}

HeartbeatCallback provide an interface that is notified when various heartbeat events take place

type Hello

type Hello struct {
	IdToken string
	Headers map[int32][]byte
}

func UnmarshalHello

func UnmarshalHello(message *Message) *Hello

type Identity

type Identity interface {
	// The TokenId used to represent the identity of this channel to lower-level resources.
	//
	Id() *identity.TokenId

	// The LogicalName represents the purpose or usage of this channel (i.e. 'ctrl', 'mgmt' 'r/001', etc.) Usually used
	// by humans in understand the logical purpose of a channel.
	//
	LogicalName() string

	// The ConnectionId represents the identity of this Channel to internal API components ("instance identifier").
	// Usually used by the Channel framework to differentiate Channel instances.
	//
	ConnectionId() string

	// Certificates contains the identity certificates provided by the peer.
	//
	Certificates() []*x509.Certificate

	// Label constructs a consistently-formatted string used for context logging purposes, from the components above.
	//
	Label() string
}

type Message

type Message struct {
	MessageHeader
	Body []byte
}

func NewHello

func NewHello(idToken string, attributes map[int32][]byte) *Message

func NewMessage

func NewMessage(contentType int32, body []byte) *Message

func NewResult

func NewResult(success bool, message string) *Message

func ReadV2 added in v0.18.28

func ReadV2(peer io.Reader) (*Message, error)

ReadV2 reads a V2 message from the given reader and returns the unmarshalled message

func (*Message) Context

func (m *Message) Context() context.Context

func (*Message) Msg

func (m *Message) Msg() *Message

func (*Message) Priority

func (m *Message) Priority() Priority

func (*Message) ReplyReceiver

func (m *Message) ReplyReceiver() ReplyReceiver

func (*Message) ReplyTo

func (m *Message) ReplyTo(o *Message) Envelope

func (*Message) Send

func (m *Message) Send(ch Channel) error

func (*Message) SendListener

func (m *Message) SendListener() SendListener

func (*Message) SetSequence

func (m *Message) SetSequence(seq int32)

func (*Message) String

func (m *Message) String() string

func (*Message) ToSendable

func (m *Message) ToSendable() Sendable

func (*Message) WithPriority

func (m *Message) WithPriority(p Priority) Envelope

func (*Message) WithTimeout

func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope

type MessageHeader

type MessageHeader struct {
	ContentType int32

	Headers Headers
	// contains filtered or unexported fields
}

func (*MessageHeader) GetBoolHeader

func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)

func (*MessageHeader) GetByteHeader

func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)

func (*MessageHeader) GetStringHeader

func (header *MessageHeader) GetStringHeader(key int32) (string, bool)

func (*MessageHeader) GetUint16Header

func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)

func (*MessageHeader) GetUint32Header

func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)

func (*MessageHeader) GetUint64Header

func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)

func (*MessageHeader) IsReply

func (header *MessageHeader) IsReply() bool

func (*MessageHeader) IsReplyingTo

func (header *MessageHeader) IsReplyingTo(sequence int32) bool

func (*MessageHeader) PutBoolHeader

func (header *MessageHeader) PutBoolHeader(key int32, value bool)

func (*MessageHeader) PutByteHeader

func (header *MessageHeader) PutByteHeader(key int32, value byte)

func (*MessageHeader) PutStringHeader

func (header *MessageHeader) PutStringHeader(key int32, value string)

func (*MessageHeader) PutUint16Header

func (header *MessageHeader) PutUint16Header(key int32, value uint16)

func (*MessageHeader) PutUint32Header

func (header *MessageHeader) PutUint32Header(key int32, value uint32)

func (*MessageHeader) PutUint64Header

func (header *MessageHeader) PutUint64Header(key int32, value uint64)

func (*MessageHeader) ReplyFor

func (header *MessageHeader) ReplyFor() int32

func (*MessageHeader) Sequence

func (header *MessageHeader) Sequence() int32

type Options

type Options struct {
	OutQueueSize int
	ConnectOptions
	DelayRxStart bool
	WriteTimeout time.Duration
}

func DefaultOptions

func DefaultOptions() *Options

func LoadOptions

func LoadOptions(data map[interface{}]interface{}) (*Options, error)

func (Options) String

func (o Options) String() string

type PeekHandler

type PeekHandler interface {
	Connect(ch Channel, remoteAddress string)
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
	Close(ch Channel)
}

type Priority

type Priority uint32

type ReceiveHandler

type ReceiveHandler interface {
	HandleReceive(m *Message, ch Channel)
}

type ReceiveHandlerF

type ReceiveHandlerF func(m *Message, ch Channel)

func (ReceiveHandlerF) HandleReceive

func (self ReceiveHandlerF) HandleReceive(m *Message, ch Channel)

type ReplyReceiver

type ReplyReceiver interface {
	AcceptReply(*Message)
}

ReplyReceiver is used to get notified when a Message reply arrives

type Result

type Result struct {
	Success bool
	Message string
}

func UnmarshalResult

func UnmarshalResult(message *Message) *Result

type SendListener

type SendListener interface {
	// Notify Queued is called when the message has been queued for send
	NotifyQueued()
	// NotifyBeforeWrite is called before send is called
	NotifyBeforeWrite()
	// NotifyAfterWrite is called after the message has been written to the Underlay
	NotifyAfterWrite()
	// NotifyErr is called if the Sendable context errors before send or if writing to the Underlay fails
	NotifyErr(error)
}

SendListener is notified at the various stages of a message send

type Sendable

type Sendable interface {
	// Msg return the Message to send
	Msg() *Message

	// SetSequence sets a sequence number indicating in which order the message was received
	SetSequence(seq int32)

	// Sequence returns the sequence number
	Sequence() int32

	// Priority returns the Priority of the Message
	Priority() Priority

	// Context returns the Context used for timeouts/cancelling message sends, etc
	Context() context.Context

	// SendListener returns the SendListener to invoke at each stage of the send operation
	SendListener() SendListener

	// ReplyReceiver returns the ReplyReceiver to be invoked when a reply for the message or received, or nil if
	// no ReplyReceiver should be invoked if or when a reply is received
	ReplyReceiver() ReplyReceiver
}

Sendable encapsulates all the data and callbacks that a Channel requires when sending a Message.

type Sender

type Sender interface {
	Send(s Sendable) error
}

type TimeoutEnvelope

type TimeoutEnvelope interface {
	Envelope

	// SendAndWaitForWire will wait until the configured timeout or until the message is sent, whichever comes first
	// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
	SendAndWaitForWire(ch Channel) error

	// SendForReply will wait until the configured timeout or until a reply is received, whichever comes first
	// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
	SendForReply(ch Channel) (*Message, error)
}

TimeoutEnvelope has timeout related convenience methods, such as waiting for a Message to be written to the wire or waiting for a Message reply

type TimeoutError

type TimeoutError struct {
	// contains filtered or unexported fields
}

TimeoutError is used to indicate a timeout happened

func (TimeoutError) Unwrap

func (self TimeoutError) Unwrap() error

type TraceHandler

type TraceHandler struct {
	// contains filtered or unexported fields
}

func NewTraceHandler

func NewTraceHandler(path string, id string) (*TraceHandler, error)

func (*TraceHandler) AddDecoder

func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)

func (TraceHandler) Close

func (h TraceHandler) Close(ch Channel)

func (*TraceHandler) Connect

func (h *TraceHandler) Connect(ch Channel, remoteAddress string)

func (TraceHandler) Rx

func (h TraceHandler) Rx(msg *Message, ch Channel)

func (TraceHandler) Tx

func (h TraceHandler) Tx(msg *Message, ch Channel)

type TraceMessageDecode

type TraceMessageDecode map[string]interface{}

func NewTraceMessageDecode

func NewTraceMessageDecode(decoder, message string) TraceMessageDecode

func (TraceMessageDecode) MarshalResult

func (d TraceMessageDecode) MarshalResult() ([]byte, bool)

func (TraceMessageDecode) MarshalTraceMessageDecode

func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)

type TraceMessageDecoder

type TraceMessageDecoder interface {
	Decode(msg *Message) ([]byte, bool)
}

type TransformHandler

type TransformHandler interface {
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
}

type TypedReceiveHandler

type TypedReceiveHandler interface {
	ContentType() int32
	ReceiveHandler
}

type Underlay

type Underlay interface {
	Rx() (*Message, error)
	Tx(m *Message) error
	Identity
	io.Closer
	IsClosed() bool
	Headers() map[int32][]byte
	SetWriteTimeout(duration time.Duration) error
}

Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.

type UnderlayFactory

type UnderlayFactory interface {
	Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
}

UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implement UnderlayFactory, to provide instances to Channel.

func NewClassicDialer

func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewClassicDialerWithBindAddress added in v0.18.27

func NewClassicDialerWithBindAddress(identity *identity.TokenId, endpoint transport.Address, localBinding string, headers map[int32][]byte) UnderlayFactory

func NewExistingConnDialer added in v0.18.19

func NewExistingConnDialer(id *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory

func NewExistingConnListener added in v0.18.19

func NewExistingConnListener(identity *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory

func NewReconnectingDialer

func NewReconnectingDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewReconnectingDialerWithHandler

func NewReconnectingDialerWithHandler(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte, reconnectHandler func()) UnderlayFactory

func NewReconnectingDialerWithHandlerAndLocalBinding added in v0.18.27

func NewReconnectingDialerWithHandlerAndLocalBinding(identity *identity.TokenId, endpoint transport.Address, localBinding string, headers map[int32][]byte, reconnectHandler func()) UnderlayFactory

type UnderlayListener

type UnderlayListener interface {
	Listen(handlers ...ConnectionHandler) error
	UnderlayFactory
	io.Closer
}

UnderlayListener represents a component designed to listen for incoming peer connections.

func NewClassicListener

func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, headers map[int32][]byte) UnderlayListener

func NewClassicListenerWithTransportConfiguration

func NewClassicListenerWithTransportConfiguration(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, tcfg transport.Configuration, headers map[int32][]byte) UnderlayListener

type UnsupportedVersionError

type UnsupportedVersionError struct {
	// contains filtered or unexported fields
}

func (UnsupportedVersionError) Error

func (u UnsupportedVersionError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL