Documentation ¶
Overview ¶
Example ¶
package main import ( "fmt" "github.com/openziti/channel" "github.com/openziti/foundation/identity/identity" "github.com/openziti/transport/tcp" "time" ) func main() { addr, err := tcp.AddressParser{}.Parse("tcp:localhost:6565") if err != nil { panic(err) } dialId := &identity.TokenId{Token: "echo-client"} underlayFactory := channel.NewClassicDialer(dialId, addr, nil) ch, err := channel.NewChannel("echo-test", underlayFactory, nil, nil) if err != nil { panic(err) } helloMessageType := int32(256) msg := channel.NewMessage(helloMessageType, []byte("hello!")) // Can send the message on the channel. The call will return once the message is queued if err := ch.Send(msg); err != nil { panic(err) } // Can also have the message send itself on the channel if err := msg.Send(ch); err != nil { panic(err) } // Can set a priority on the message before sending. This will only affect the order in the send queue if err := msg.WithPriority(channel.High).Send(ch); err != nil { panic(err) } // Can set a timeout before sending. If the message can't be queued before the timeout, an error will be returned // If the timeout expires before the message can be sent, the message won't be sent if err := msg.WithTimeout(time.Second).Send(ch); err != nil { panic(err) } // Can set a timeout before sending and wait for the message to be written to the wire. If the timeout expires // before the message is sent, the message won't be sent and a TimeoutError will be returned if err := msg.WithTimeout(time.Second).SendAndWaitForWire(ch); err != nil { panic(err) } // Can set a timeout before sending and waiting for a reply message. If the timeout expires before the message is // sent, the message won't be sent and a TimeoutError will be returned. If the timeout expires before the reply // arrives a TimeoutError will be returned. reply, err := msg.WithTimeout(time.Second).SendForReply(ch) if err != nil { panic(err) } fmt.Println(string(reply.Body)) }
Output:
Index ¶
- Constants
- Variables
- func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, ...) error
- func ConfigureHeartbeat(binding Binding, heartbeatInterval time.Duration, checkInterval time.Duration, ...)
- func GetRetryVersion(err error) (uint32, bool)
- func IsTimeout(err error) bool
- func MarshalV2(m *Message) ([]byte, error)
- func NewErrorContext(err error) context.Context
- func NewWSListener(peer transport.Connection) *wsListener
- func NextConnectionId() (string, error)
- type AsyncFunctionReceiveAdapter
- type BaseSendListener
- type BaseSendable
- type BindHandler
- type BindHandlerF
- type Binding
- type Channel
- type CloseHandler
- type CloseHandlerF
- type ConnectOptions
- type ConnectionHandler
- type Decoder
- type Envelope
- type ErrorHandler
- type ErrorHandlerF
- type Headers
- func (self Headers) GetBoolHeader(key int32) (bool, bool)
- func (self Headers) GetByteHeader(key int32) (byte, bool)
- func (self Headers) GetStringHeader(key int32) (string, bool)
- func (self Headers) GetUint16Header(key int32) (uint16, bool)
- func (self Headers) GetUint32Header(key int32) (uint32, bool)
- func (self Headers) GetUint64Header(key int32) (uint64, bool)
- func (self Headers) PutBoolHeader(key int32, value bool)
- func (self Headers) PutByteHeader(key int32, value byte)
- func (self Headers) PutStringHeader(key int32, value string)
- func (self Headers) PutUint16Header(key int32, value uint16)
- func (self Headers) PutUint32Header(key int32, value uint32)
- func (self Headers) PutUint64Header(key int32, value uint64)
- type HeartbeatCallback
- type Hello
- type Identity
- type Message
- func (m *Message) Context() context.Context
- func (m *Message) Msg() *Message
- func (m *Message) Priority() Priority
- func (m *Message) ReplyReceiver() ReplyReceiver
- func (m *Message) ReplyTo(o *Message) Envelope
- func (m *Message) Send(ch Channel) error
- func (m *Message) SendListener() SendListener
- func (m *Message) SetSequence(seq int32)
- func (m *Message) String() string
- func (m *Message) ToSendable() Sendable
- func (m *Message) WithPriority(p Priority) Envelope
- func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope
- type MessageHeader
- func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)
- func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)
- func (header *MessageHeader) GetStringHeader(key int32) (string, bool)
- func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)
- func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)
- func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)
- func (header *MessageHeader) IsReply() bool
- func (header *MessageHeader) IsReplyingTo(sequence int32) bool
- func (header *MessageHeader) PutBoolHeader(key int32, value bool)
- func (header *MessageHeader) PutByteHeader(key int32, value byte)
- func (header *MessageHeader) PutStringHeader(key int32, value string)
- func (header *MessageHeader) PutUint16Header(key int32, value uint16)
- func (header *MessageHeader) PutUint32Header(key int32, value uint32)
- func (header *MessageHeader) PutUint64Header(key int32, value uint64)
- func (header *MessageHeader) ReplyFor() int32
- func (header *MessageHeader) Sequence() int32
- type Options
- type PeekHandler
- type Priority
- type ReceiveHandler
- type ReceiveHandlerF
- type ReplyReceiver
- type Result
- type SendListener
- type Sendable
- type Sender
- type TimeoutEnvelope
- type TimeoutError
- type TraceHandler
- type TraceMessageDecode
- type TraceMessageDecoder
- type TransformHandler
- type TypedReceiveHandler
- type Underlay
- type UnderlayFactory
- func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayFactory
- func NewClassicDialerWithBindAddress(identity *identity.TokenId, endpoint transport.Address, localBinding string, ...) UnderlayFactory
- func NewExistingConnDialer(id *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory
- func NewExistingConnListener(identity *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory
- func NewReconnectingDialer(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayFactory
- func NewReconnectingDialerWithHandler(identity *identity.TokenId, endpoint transport.Address, ...) UnderlayFactory
- func NewReconnectingDialerWithHandlerAndLocalBinding(identity *identity.TokenId, endpoint transport.Address, localBinding string, ...) UnderlayFactory
- type UnderlayListener
- type UnsupportedVersionError
Examples ¶
Constants ¶
const ( DefaultOutstandingConnects = 16 DefaultQueuedConnects = 1 DefaultConnectTimeout = 1000 * time.Millisecond MinQueuedConnects = 1 MinOutstandingConnects = 1 MinConnectTimeout = 30 * time.Millisecond MaxQueuedConnects = 5000 MaxOutstandingConnects = 1000 MaxConnectTimeout = 60000 * time.Millisecond DefaultOutQueueSize = 4 )
const ( ConnectionIdHeader = 0 ReplyForHeader = 1 ResultSuccessHeader = 2 HelloRouterAdvertisementsHeader = 3 HelloVersionHeader = 4 HeartbeatHeader = 5 HeartbeatResponseHeader = 6 // Headers in the range 128-255 inclusive will be reflected when creating replies ReflectedHeaderBitMask = 1 << 7 MaxReflectedHeader = (1 << 8) - 1 )
*
- Message headers notes
- 0-127 reserved for channel
- 128-255 reserved for headers that need to be reflected back to sender on responses
- 128 is used for a message UUID for tracing
- 1000-1099 reserved for edge messages
- 1100-1199 is reserved for control plane messages
- 2000-2500 is reserved for xgress messages
- 2000-2255 is reserved for xgress implementation headers
const ( ContentTypeHelloType = 0 ContentTypePingType = 1 ContentTypeResultType = 2 ContentTypeLatencyType = 3 ContentTypeLatencyResponseType = 4 ContentTypeHeartbeat = 5 )
const ( Highest = 0 High = 1024 Standard = 4096 Low = 10240 )
const AnyContentType = -1
const DECODER = "channel"
const DecoderFieldName = "__decoder__"
const HelloSequence = -1
const MessageFieldName = "__message__"
Variables ¶
var ListenerClosedError = listenerClosedError{}
var UnknownVersionError = errors.New("channel synchronization error, bad magic number")
Functions ¶
func AcceptNextChannel ¶
func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options, tcfg transport.Configuration) error
func ConfigureHeartbeat ¶
func ConfigureHeartbeat(binding Binding, heartbeatInterval time.Duration, checkInterval time.Duration, cb HeartbeatCallback)
ConfigureHeartbeat setups up heartbeats on the given channel. It assumes that an equivalent setup happens on the other side of the channel.
When possible, heartbeats will be sent on existing traffic. When a heartbeat is due to be sent, the next message sent will include a heartbeat header. If no message is sent by the time the checker runs on checkInterval, a standalone heartbeat message will be sent.
Similarly, when a message with a heartbeat header is received, the next sent message will have a header set with the heartbeat response. If no message is sent within a few milliseconds, a standalone heartbeat response will be sent
func GetRetryVersion ¶ added in v0.18.31
func MarshalV2 ¶ added in v0.18.28
MarshalV2 converts a *Message into a block of V2 wire format data.
func NewErrorContext ¶
func NewWSListener ¶
func NewWSListener(peer transport.Connection) *wsListener
func NextConnectionId ¶
Types ¶
type AsyncFunctionReceiveAdapter ¶
type AsyncFunctionReceiveAdapter struct { Type int32 Handler ReceiveHandlerF }
func (*AsyncFunctionReceiveAdapter) ContentType ¶
func (adapter *AsyncFunctionReceiveAdapter) ContentType() int32
func (*AsyncFunctionReceiveAdapter) HandleReceive ¶
func (adapter *AsyncFunctionReceiveAdapter) HandleReceive(m *Message, ch Channel)
type BaseSendListener ¶
type BaseSendListener struct{}
BaseSendListener is a type that may be used to provide default methods for SendListener implementation
func (BaseSendListener) NotifyAfterWrite ¶
func (BaseSendListener) NotifyAfterWrite()
func (BaseSendListener) NotifyBeforeWrite ¶
func (BaseSendListener) NotifyBeforeWrite()
func (BaseSendListener) NotifyErr ¶
func (BaseSendListener) NotifyErr(error)
func (BaseSendListener) NotifyQueued ¶
func (BaseSendListener) NotifyQueued()
type BaseSendable ¶
type BaseSendable struct{}
BaseSendable is a type that may be used to provide default methods for Sendable implementation
func (BaseSendable) Context ¶
func (BaseSendable) Context() context.Context
func (BaseSendable) Msg ¶
func (BaseSendable) Msg() *Message
func (BaseSendable) Priority ¶
func (BaseSendable) Priority() Priority
func (BaseSendable) ReplyReceiver ¶
func (BaseSendable) ReplyReceiver() ReplyReceiver
func (BaseSendable) SendListener ¶
func (BaseSendable) SendListener() SendListener
type BindHandler ¶
type BindHandlerF ¶
func (BindHandlerF) BindChannel ¶
func (f BindHandlerF) BindChannel(binding Binding) error
type Binding ¶
type Binding interface { Bind(h BindHandler) error AddPeekHandler(h PeekHandler) AddTransformHandler(h TransformHandler) AddReceiveHandler(contentType int32, h ReceiveHandler) AddReceiveHandlerF(contentType int32, h ReceiveHandlerF) AddTypedReceiveHandler(h TypedReceiveHandler) AddErrorHandler(h ErrorHandler) AddCloseHandler(h CloseHandler) SetUserData(data interface{}) GetUserData() interface{} GetChannel() Channel }
Binding is used to add handlers to Channel.
NOTE: It is intended that the Add* methods are used at initial channel setup, and not invoked on an in-service Channel. This API may change in the future to enforce those semantics programmatically.
type Channel ¶
type Channel interface { Identity SetLogicalName(logicalName string) Sender io.Closer IsClosed() bool Underlay() Underlay StartRx() GetTimeSinceLastRead() time.Duration }
Channel represents an asynchronous, message-passing framework, designed to sit on top of an underlay.
func NewChannel ¶
func NewChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options) (Channel, error)
func NewChannelWithTransportConfiguration ¶
func NewChannelWithTransportConfiguration(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options, tcfg transport.Configuration) (Channel, error)
type CloseHandler ¶
type CloseHandler interface {
HandleClose(ch Channel)
}
type CloseHandlerF ¶
type CloseHandlerF func(ch Channel)
func (CloseHandlerF) HandleClose ¶
func (self CloseHandlerF) HandleClose(ch Channel)
type ConnectOptions ¶
type ConnectOptions struct { MaxQueuedConnects int MaxOutstandingConnects int ConnectTimeout time.Duration }
func DefaultConnectOptions ¶
func DefaultConnectOptions() ConnectOptions
func (*ConnectOptions) Validate ¶
func (co *ConnectOptions) Validate() error
type ConnectionHandler ¶
type ConnectionHandler interface {
HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}
type Envelope ¶
type Envelope interface { // WithPriority returns an Envelope with the given priority WithPriority(p Priority) Envelope // WithTimeout returns a TimeoutEnvelope with a context using the given timeout WithTimeout(duration time.Duration) TimeoutEnvelope // Send sends the envelope on the given Channel Send(ch Channel) error // ReplyTo allows setting the reply header in a fluent style ReplyTo(msg *Message) Envelope // ToSendable converts the Envelope into a Sendable, which can be submitted to a Channel for sending ToSendable() Sendable }
Envelope allows setting message priority and context. Message is an Envelope (as well as a Sendable)
func NewErrorEnvelope ¶
type ErrorHandler ¶
type ErrorHandlerF ¶
func (ErrorHandlerF) HandleError ¶
func (self ErrorHandlerF) HandleError(err error, ch Channel)
type Headers ¶
func (Headers) PutBoolHeader ¶
func (Headers) PutByteHeader ¶
func (Headers) PutStringHeader ¶
func (Headers) PutUint16Header ¶
func (Headers) PutUint32Header ¶
func (Headers) PutUint64Header ¶
type HeartbeatCallback ¶
type HeartbeatCallback interface { HeartbeatTx(ts int64) HeartbeatRx(ts int64) HeartbeatRespTx(ts int64) HeartbeatRespRx(ts int64) CheckHeartBeat() }
HeartbeatCallback provide an interface that is notified when various heartbeat events take place
type Hello ¶
func UnmarshalHello ¶
type Identity ¶
type Identity interface { // The TokenId used to represent the identity of this channel to lower-level resources. // Id() *identity.TokenId // The LogicalName represents the purpose or usage of this channel (i.e. 'ctrl', 'mgmt' 'r/001', etc.) Usually used // by humans in understand the logical purpose of a channel. // LogicalName() string // The ConnectionId represents the identity of this Channel to internal API components ("instance identifier"). // Usually used by the Channel framework to differentiate Channel instances. // ConnectionId() string // Certificates contains the identity certificates provided by the peer. // Certificates() []*x509.Certificate // Label constructs a consistently-formatted string used for context logging purposes, from the components above. // Label() string }
type Message ¶
type Message struct { MessageHeader Body []byte }
func NewMessage ¶
func ReadV2 ¶ added in v0.18.28
ReadV2 reads a V2 message from the given reader and returns the unmarshalled message
func (*Message) ReplyReceiver ¶
func (m *Message) ReplyReceiver() ReplyReceiver
func (*Message) SendListener ¶
func (m *Message) SendListener() SendListener
func (*Message) SetSequence ¶
func (*Message) ToSendable ¶
func (*Message) WithPriority ¶
func (*Message) WithTimeout ¶
func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope
type MessageHeader ¶
type MessageHeader struct { ContentType int32 Headers Headers // contains filtered or unexported fields }
func (*MessageHeader) GetBoolHeader ¶
func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)
func (*MessageHeader) GetByteHeader ¶
func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)
func (*MessageHeader) GetStringHeader ¶
func (header *MessageHeader) GetStringHeader(key int32) (string, bool)
func (*MessageHeader) GetUint16Header ¶
func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)
func (*MessageHeader) GetUint32Header ¶
func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)
func (*MessageHeader) GetUint64Header ¶
func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)
func (*MessageHeader) IsReply ¶
func (header *MessageHeader) IsReply() bool
func (*MessageHeader) IsReplyingTo ¶
func (header *MessageHeader) IsReplyingTo(sequence int32) bool
func (*MessageHeader) PutBoolHeader ¶
func (header *MessageHeader) PutBoolHeader(key int32, value bool)
func (*MessageHeader) PutByteHeader ¶
func (header *MessageHeader) PutByteHeader(key int32, value byte)
func (*MessageHeader) PutStringHeader ¶
func (header *MessageHeader) PutStringHeader(key int32, value string)
func (*MessageHeader) PutUint16Header ¶
func (header *MessageHeader) PutUint16Header(key int32, value uint16)
func (*MessageHeader) PutUint32Header ¶
func (header *MessageHeader) PutUint32Header(key int32, value uint32)
func (*MessageHeader) PutUint64Header ¶
func (header *MessageHeader) PutUint64Header(key int32, value uint64)
func (*MessageHeader) ReplyFor ¶
func (header *MessageHeader) ReplyFor() int32
func (*MessageHeader) Sequence ¶
func (header *MessageHeader) Sequence() int32
type Options ¶
type Options struct { OutQueueSize int ConnectOptions DelayRxStart bool WriteTimeout time.Duration }
func DefaultOptions ¶
func DefaultOptions() *Options
func LoadOptions ¶
type PeekHandler ¶
type ReceiveHandler ¶
type ReceiveHandlerF ¶
func (ReceiveHandlerF) HandleReceive ¶
func (self ReceiveHandlerF) HandleReceive(m *Message, ch Channel)
type ReplyReceiver ¶
type ReplyReceiver interface {
AcceptReply(*Message)
}
ReplyReceiver is used to get notified when a Message reply arrives
type Result ¶
func UnmarshalResult ¶
type SendListener ¶
type SendListener interface { // Notify Queued is called when the message has been queued for send NotifyQueued() // NotifyBeforeWrite is called before send is called NotifyBeforeWrite() // NotifyAfterWrite is called after the message has been written to the Underlay NotifyAfterWrite() // NotifyErr is called if the Sendable context errors before send or if writing to the Underlay fails NotifyErr(error) }
SendListener is notified at the various stages of a message send
type Sendable ¶
type Sendable interface { // Msg return the Message to send Msg() *Message // SetSequence sets a sequence number indicating in which order the message was received SetSequence(seq int32) // Sequence returns the sequence number Sequence() int32 // Priority returns the Priority of the Message Priority() Priority // Context returns the Context used for timeouts/cancelling message sends, etc Context() context.Context // SendListener returns the SendListener to invoke at each stage of the send operation SendListener() SendListener // ReplyReceiver returns the ReplyReceiver to be invoked when a reply for the message or received, or nil if // no ReplyReceiver should be invoked if or when a reply is received ReplyReceiver() ReplyReceiver }
Sendable encapsulates all the data and callbacks that a Channel requires when sending a Message.
type TimeoutEnvelope ¶
type TimeoutEnvelope interface { Envelope // SendAndWaitForWire will wait until the configured timeout or until the message is sent, whichever comes first // If the timeout happens first, the context error will be returned, wrapped by a TimeoutError SendAndWaitForWire(ch Channel) error // SendForReply will wait until the configured timeout or until a reply is received, whichever comes first // If the timeout happens first, the context error will be returned, wrapped by a TimeoutError SendForReply(ch Channel) (*Message, error) }
TimeoutEnvelope has timeout related convenience methods, such as waiting for a Message to be written to the wire or waiting for a Message reply
type TimeoutError ¶
type TimeoutError struct {
// contains filtered or unexported fields
}
TimeoutError is used to indicate a timeout happened
func (TimeoutError) Unwrap ¶
func (self TimeoutError) Unwrap() error
type TraceHandler ¶
type TraceHandler struct {
// contains filtered or unexported fields
}
func NewTraceHandler ¶
func NewTraceHandler(path string, id string) (*TraceHandler, error)
func (*TraceHandler) AddDecoder ¶
func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)
func (TraceHandler) Close ¶
func (h TraceHandler) Close(ch Channel)
func (*TraceHandler) Connect ¶
func (h *TraceHandler) Connect(ch Channel, remoteAddress string)
func (TraceHandler) Rx ¶
func (h TraceHandler) Rx(msg *Message, ch Channel)
func (TraceHandler) Tx ¶
func (h TraceHandler) Tx(msg *Message, ch Channel)
type TraceMessageDecode ¶
type TraceMessageDecode map[string]interface{}
func NewTraceMessageDecode ¶
func NewTraceMessageDecode(decoder, message string) TraceMessageDecode
func (TraceMessageDecode) MarshalResult ¶
func (d TraceMessageDecode) MarshalResult() ([]byte, bool)
func (TraceMessageDecode) MarshalTraceMessageDecode ¶
func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)
type TraceMessageDecoder ¶
type TransformHandler ¶
type TypedReceiveHandler ¶
type TypedReceiveHandler interface { ContentType() int32 ReceiveHandler }
type Underlay ¶
type Underlay interface { Rx() (*Message, error) Tx(m *Message) error Identity io.Closer IsClosed() bool Headers() map[int32][]byte SetWriteTimeout(duration time.Duration) error }
Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.
type UnderlayFactory ¶
type UnderlayFactory interface {
Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
}
UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implement UnderlayFactory, to provide instances to Channel.
func NewClassicDialer ¶
func NewClassicDialerWithBindAddress ¶ added in v0.18.27
func NewExistingConnDialer ¶ added in v0.18.19
func NewExistingConnListener ¶ added in v0.18.19
func NewReconnectingDialer ¶
func NewReconnectingDialerWithHandlerAndLocalBinding ¶ added in v0.18.27
type UnderlayListener ¶
type UnderlayListener interface { Listen(handlers ...ConnectionHandler) error UnderlayFactory io.Closer }
UnderlayListener represents a component designed to listen for incoming peer connections.
func NewClassicListener ¶
func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, headers map[int32][]byte) UnderlayListener
func NewClassicListenerWithTransportConfiguration ¶
func NewClassicListenerWithTransportConfiguration(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, tcfg transport.Configuration, headers map[int32][]byte) UnderlayListener
type UnsupportedVersionError ¶
type UnsupportedVersionError struct {
// contains filtered or unexported fields
}
func (UnsupportedVersionError) Error ¶
func (u UnsupportedVersionError) Error() string
Source Files ¶
- channel.go
- classic_dialer.go
- classic_impl.go
- classic_listener.go
- constants.go
- decoder.go
- envelope.go
- existing_conn_dialer.go
- existing_conn_impl.go
- existing_conn_listener.go
- handler.go
- heartbeater.go
- impl.go
- message.go
- messages.go
- options.go
- ping.go
- priority.go
- reconnecting_dialer.go
- reconnecting_impl.go
- trace.go
- trace_decode.go
- ws_impl.go
- ws_listener.go