xgress

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2024 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MinHeaderKey and MaxHeaderKey delimit a block of key values running from
	// 2000 through 2255 (MinHeaderKey plus the largest uint8 value).
	// NOTE(review): presumably the range that uint8-keyed payload headers are
	// mapped into — confirm against the marshalling code.
	MinHeaderKey = 2000
	MaxHeaderKey = MinHeaderKey + int32(math.MaxUint8)

	// Header keys for individual message fields. Their values start at 2256,
	// directly above MaxHeaderKey.
	HeaderKeyCircuitId      = 2256
	HeaderKeySequence       = 2257
	HeaderKeyFlags          = 2258
	HeaderKeyRecvBufferSize = 2259
	HeaderKeyRTT            = 2260
	HeaderPayloadRaw        = 2261

	// Content type identifiers. ContentTypeValue maps their names to these values.
	ContentTypePayloadType         = 1100
	ContentTypeAcknowledgementType = 1101
	ContentTypeControlType         = 1102
)
View Source
const (
	// Sequential identifiers (20 through 25) for control-related fields.
	// NOTE(review): presumably keys into Control.Headers used by trace route
	// handling — confirm against usage.
	ControlHopCount  = 20
	ControlHopType   = 21
	ControlHopId     = 22
	ControlTimestamp = 23
	ControlUserVal   = 24
	ControlError     = 25
)
View Source
const (
	// Single-byte bit masks. VersionMask covers bits 1-2; each remaining flag
	// mask selects exactly one bit, from bit 3 (TerminatorFlagMask) up to
	// bit 7 (HeartbeatFlagMask). Bit 0 is not covered by any mask in this group.
	VersionMask        byte = 0b00000110
	TerminatorFlagMask byte = 0b00001000
	RttFlagMask        byte = 0b00010000
	ChunkFlagMask      byte = 0b00100000
	HeadersFlagMask    byte = 0b01000000
	HeartbeatFlagMask  byte = 0b10000000

	// CircuitIdSizeMask selects the low four bits of a byte.
	// NOTE(review): PayloadProtocolOffset equals the bit position at which
	// VersionMask begins, so PayloadProtocolV1 is presumably shifted left by it
	// before being stored — confirm against the encoder.
	CircuitIdSizeMask     byte = 0b00001111
	PayloadProtocolV1     byte = 1
	PayloadProtocolOffset byte = 1
)
View Source
// DECODER is the string constant "data".
// NOTE(review): presumably the name associated with the Decoder type — confirm.
const DECODER = "data"
View Source
const (
	// HeaderKeyUUID is header key 0.
	// NOTE(review): presumably a key in the uint8-keyed Payload.Headers map
	// rather than a channel message header — confirm.
	HeaderKeyUUID = 0
)

Variables

View Source
// ContentTypeValue maps the name of each xgress content type to its numeric
// ContentType* constant.
var ContentTypeValue = map[string]int32{
	"PayloadType":         ContentTypePayloadType,
	"AcknowledgementType": ContentTypeAcknowledgementType,
	"ControlType":         ContentTypeControlType,
}

Functions

func DecodePayload

func DecodePayload(payload *Payload) ([]byte, bool)

func GlobalRegistry

func GlobalRegistry() *registry

func InitAcker

func InitAcker(forwarder PayloadBufferForwarder, metrics metrics.Registry, closeNotify <-chan struct{})

func InitMetrics

func InitMetrics(registry metrics.Registry)

func InitPayloadIngester

func InitPayloadIngester(closeNotify <-chan struct{})

func InitRetransmitter

func InitRetransmitter(forwarder PayloadBufferForwarder, faultReporter RetransmitterFaultReporter, metrics metrics.Registry, closeNotify <-chan struct{})

func NewRegistry

func NewRegistry() *registry

func ReadUntil

func ReadUntil(peer transport.Conn, stop byte) ([]byte, error)

func ReadUntilNewline

func ReadUntilNewline(peer transport.Conn) ([]byte, error)

func RemoveTerminator

func RemoveTerminator(ctrls networkControllers, terminatorId string) error

func RemoveTerminators

func RemoveTerminators(ctrls networkControllers, terminatorIds []string)

func RespondToTraceRequest

func RespondToTraceRequest(headers channel.Headers, hopType, hopId string, response ControlReceiver)

func SendRequest

func SendRequest(request *Request, peer io.Writer) error

func SendResponse

func SendResponse(response *Response, peer io.Writer) error

func SetOriginatorFlag

func SetOriginatorFlag(flags uint32, originator Originator) uint32

func UnmarshallPacketPayload added in v1.1.13

func UnmarshallPacketPayload(buf []byte) (*channel.Message, error)

Types

type Acker

type Acker struct {
	// contains filtered or unexported fields
}

Note: if altering this struct, be sure to account for 64-bit alignment on 32-bit ARM architectures. See https://pkg.go.dev/sync/atomic#pkg-note-BUG and https://github.com/golang/go/issues/36606.

func NewAcker

func NewAcker(forwarder PayloadBufferForwarder, metrics metrics.Registry, closeNotify <-chan struct{}) *Acker

type Acknowledgement

type Acknowledgement struct {
	CircuitId      string
	// NOTE(review): Flags presumably carries the originator bit handled by
	// SetOriginatorFlag and read back by GetOriginator — confirm.
	Flags          uint32
	RecvBufferSize uint32
	RTT            uint16
	// Sequence is a slice, so one acknowledgement can carry several sequence values.
	Sequence       []int32
}

func NewAcknowledgement

func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement

func UnmarshallAcknowledgement

func UnmarshallAcknowledgement(msg *channel.Message) (*Acknowledgement, error)

func (*Acknowledgement) GetCircuitId added in v1.1.13

func (ack *Acknowledgement) GetCircuitId() string

func (*Acknowledgement) GetFlags added in v1.1.13

func (ack *Acknowledgement) GetFlags() uint32

func (*Acknowledgement) GetLoggerFields

func (ack *Acknowledgement) GetLoggerFields() logrus.Fields

func (*Acknowledgement) GetOriginator added in v1.1.13

func (ack *Acknowledgement) GetOriginator() Originator

func (*Acknowledgement) GetSequence

func (ack *Acknowledgement) GetSequence() []int32

func (*Acknowledgement) Marshall

func (ack *Acknowledgement) Marshall() *channel.Message

type Address

// Address is a string-based type. In this package it appears as the address
// argument of NewXgress, the Address field of CircuitInfo, and the srcAddr
// argument of the PayloadBufferForwarder methods.
type Address string

type BindHandler

type BindHandler interface {
	// HandleXgressBind receives the Xgress instance x.
	// NOTE(review): presumably called once per newly created xgress so that
	// receive/close/peek handlers can be attached — confirm.
	HandleXgressBind(x *Xgress)
}

A BindHandler is invoked to install the appropriate handlers.

type CircuitInfo

// CircuitInfo is the result type returned by GetCircuit.
type CircuitInfo struct {
	CtrlId      string
	CircuitId   *identity.TokenId
	Address     Address
	// NOTE(review): ResponseMsg is presumably the raw controller response the
	// other fields were taken from — confirm.
	ResponseMsg *channel.Message
}

func GetCircuit

func GetCircuit(ctrl networkControllers, ingressId string, service string, timeout time.Duration, peerData map[uint32][]byte) (*CircuitInfo, error)

type CloseHandler

type CloseHandler interface {
	// HandleXgressClose is invoked when the connected peer terminates the communication.
	// NOTE(review): x is presumably the xgress that is closing — confirm.
	HandleXgressClose(x *Xgress)
}

CloseHandler is invoked by an xgress when the connected peer terminates the communication.

type CloseHandlerF

// CloseHandlerF is a function type with a HandleXgressClose method, which lets
// a plain func(x *Xgress) be used where a CloseHandler is expected.
type CloseHandlerF func(x *Xgress)

CloseHandlerF is the function version of CloseHandler

func (CloseHandlerF) HandleXgressClose

func (self CloseHandlerF) HandleXgressClose(x *Xgress)

type CloseHelper

type CloseHelper struct {
	// contains filtered or unexported fields
}

func (*CloseHelper) Close

func (self *CloseHelper) Close() error

func (*CloseHelper) Init

func (self *CloseHelper) Init(closer io.Closer)

type Connection

// Connection is the peer-side interface an Xgress is constructed with (see the
// peer argument of NewXgress).
type Connection interface {
	io.Closer
	LogContext() string
	// ReadPayload returns a byte slice, a uint8-keyed header map and an error.
	// The header map has the same type as Payload.Headers.
	ReadPayload() ([]byte, map[uint8][]byte, error)
	// WritePayload takes a byte slice and a uint8-keyed header map.
	// NOTE(review): the int result is presumably the number of bytes written — confirm.
	WritePayload([]byte, map[uint8][]byte) (int, error)
	// HandleControlMsg receives a control type and its headers together with a
	// ControlReceiver named responder.
	HandleControlMsg(controlType ControlType, headers channel.Headers, responder ControlReceiver) error
}

type Control

// Control is a control message. It is converted to and from a channel.Message
// by Marshall and UnmarshallControl.
type Control struct {
	// Type holds one of the ControlType values.
	Type      ControlType
	CircuitId string
	Headers   channel.Headers
}

func UnmarshallControl

func UnmarshallControl(msg *channel.Message) (*Control, error)

func (*Control) CreateTraceResponse

func (self *Control) CreateTraceResponse(hopType, hopId string) *Control

func (*Control) DecrementAndGetHop

func (self *Control) DecrementAndGetHop() uint32

func (*Control) GetLoggerFields

func (self *Control) GetLoggerFields() logrus.Fields

func (*Control) IsTypeTraceRoute

func (self *Control) IsTypeTraceRoute() bool

func (*Control) IsTypeTraceRouteResponse

func (self *Control) IsTypeTraceRouteResponse() bool

func (*Control) Marshall

func (self *Control) Marshall() *channel.Message

type ControlReceiver

// ControlReceiver accepts a control type and its headers. Xgress has a method
// with this signature, and Connection.HandleControlMsg takes a ControlReceiver
// as its responder argument.
type ControlReceiver interface {
	HandleControlReceive(controlType ControlType, headers channel.Headers)
}

type ControlType

// ControlType is a one-byte discriminator stored in Control.Type.
type ControlType byte
const (
	// The two defined control types: trace route (1) and trace route response (2).
	ControlTypeTraceRoute         ControlType = 1
	ControlTypeTraceRouteResponse ControlType = 2
)

func (ControlType) String

func (self ControlType) String() string

type Decoder

// Decoder is an empty struct; its only listed method is Decode, which takes a
// *channel.Message and returns a byte slice and a bool.
type Decoder struct{}

func (Decoder) Decode

func (d Decoder) Decode(msg *channel.Message) ([]byte, bool)

type DialParams

// DialParams is the read-only parameter bundle passed to Dialer.Dial.
type DialParams interface {
	GetCtrlId() string
	GetDestination() string
	GetCircuitId() *identity.TokenId
	GetAddress() Address
	// GetBindHandler returns the BindHandler supplied with these parameters.
	GetBindHandler() BindHandler
	GetLogContext() logcontext.Context
	// GetDeadline returns a point in time.
	// NOTE(review): presumably the time by which the dial must finish — confirm.
	GetDeadline() time.Time
	GetCircuitTags() map[string]string
}

type Dialer

// Dialer is created by Factory.CreateDialer.
type Dialer interface {
	// Dial takes the dial parameters and returns peer data or an error.
	Dial(params DialParams) (xt.PeerData, error)
	// IsTerminatorValid returns a bool for the given terminator id and destination.
	// NOTE(review): presumably true means the terminator is still usable — confirm.
	IsTerminatorValid(id string, destination string) bool
}

type Factory

// Factory builds a Listener or a Dialer from an OptionsData map; either
// call may fail with an error.
type Factory interface {
	CreateListener(optionsData OptionsData) (Listener, error)
	CreateDialer(optionsData OptionsData) (Dialer, error)
}

type Flag added in v1.1.13

// Flag is a uint32-based flag type.
type Flag uint32
const (
	// Each value is a distinct power of two, so the flags can be combined
	// with bitwise OR.
	// NOTE(review): presumably stored in the uint32 Payload.Flags field — confirm.
	PayloadFlagCircuitEnd   Flag = 1
	PayloadFlagOriginator   Flag = 2
	PayloadFlagCircuitStart Flag = 4
	PayloadFlagChunk        Flag = 8
)

type Inspectable added in v0.33.0

type Inspectable interface {
	// Inspect takes a key and a timeout and returns a value of any type.
	// NOTE(review): the set of supported keys and the shape of the result are
	// not visible here — check the implementations.
	Inspect(key string, timeout time.Duration) any
}

type InspectableDialer added in v0.31.1

// InspectableDialer is a Dialer that additionally offers InspectTerminator.
type InspectableDialer interface {
	Dialer
	// InspectTerminator returns a bool and a string for the given terminator.
	// NOTE(review): the meaning of the two results and of fixInvalid is not
	// visible here — check the implementations.
	InspectTerminator(id string, destination string, fixInvalid bool) (bool, string)
}

type InvalidTerminatorError

// InvalidTerminatorError is an error type wrapping another error; it provides
// both Error and Unwrap methods.
type InvalidTerminatorError struct {
	// NOTE(review): InnerError is presumably what Unwrap returns — confirm.
	InnerError error
}

func (InvalidTerminatorError) Error

func (e InvalidTerminatorError) Error() string

func (InvalidTerminatorError) Unwrap

func (e InvalidTerminatorError) Unwrap() error

type LinkReceiveBuffer

type LinkReceiveBuffer struct {
	// contains filtered or unexported fields
}

func NewLinkReceiveBuffer

func NewLinkReceiveBuffer() *LinkReceiveBuffer

func (*LinkReceiveBuffer) Inspect

func (*LinkReceiveBuffer) PeekHead

func (buffer *LinkReceiveBuffer) PeekHead() *Payload

func (*LinkReceiveBuffer) ReceiveUnordered

func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint32) bool

func (*LinkReceiveBuffer) Remove

func (buffer *LinkReceiveBuffer) Remove(payload *Payload)

func (*LinkReceiveBuffer) Size

func (buffer *LinkReceiveBuffer) Size() uint32

type LinkSendBuffer

type LinkSendBuffer struct {
	// contains filtered or unexported fields
}

Note: if altering this struct, be sure to account for 64-bit alignment on 32-bit ARM architectures. See https://pkg.go.dev/sync/atomic#pkg-note-BUG and https://github.com/golang/go/issues/36606.

func NewLinkSendBuffer

func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer

func (*LinkSendBuffer) BufferPayload

func (buffer *LinkSendBuffer) BufferPayload(payload *Payload) (func(), error)

func (*LinkSendBuffer) Close

func (buffer *LinkSendBuffer) Close()

func (*LinkSendBuffer) CloseWhenEmpty

func (buffer *LinkSendBuffer) CloseWhenEmpty() bool

func (*LinkSendBuffer) Inspect

func (buffer *LinkSendBuffer) Inspect() *inspect.XgressSendBufferDetail

func (*LinkSendBuffer) ReceiveAcknowledgement

func (buffer *LinkSendBuffer) ReceiveAcknowledgement(ack *Acknowledgement)

type Listener

// Listener is created by Factory.CreateListener.
type Listener interface {
	// Listen takes the address to listen on and a BindHandler.
	// NOTE(review): bindHandler is presumably applied to each xgress created
	// for an accepted connection — confirm.
	Listen(address string, bindHandler BindHandler) error
	Close() error
}

type MisconfiguredTerminatorError

// MisconfiguredTerminatorError is an error type wrapping another error; Error
// and Unwrap methods are listed for it.
type MisconfiguredTerminatorError struct {
	// NOTE(review): InnerError is presumably what Unwrap returns — confirm.
	InnerError error
}

func (MisconfiguredTerminatorError) Error

func (MisconfiguredTerminatorError) Unwrap

type Options

type Options struct {
	// General settings.
	// NOTE(review): RandomDrops/Drop1InN look like a fault-injection switch
	// and its rate — confirm before relying on that reading.
	Mtu         int32
	RandomDrops bool
	Drop1InN    int32
	TxQueueSize int32

	// Tx portal settings: start/max/min sizes, plus a threshold and scale
	// pair for each of the increase, retransmit and duplicate-ack cases.
	TxPortalStartSize      uint32
	TxPortalMaxSize        uint32
	TxPortalMinSize        uint32
	TxPortalIncreaseThresh uint32
	TxPortalIncreaseScale  float64
	TxPortalRetxThresh     uint32
	TxPortalRetxScale      float64
	TxPortalDupAckThresh   uint32
	TxPortalDupAckScale    float64

	// Receive buffer size and retransmission settings.
	// NOTE(review): the Ms suffix suggests milliseconds — confirm.
	RxBufferSize uint32
	RetxStartMs  uint32
	RetxScale    float64
	RetxAddMs    uint32

	// Timeouts and wait limits, expressed as time.Duration values.
	MaxCloseWait        time.Duration
	GetCircuitTimeout   time.Duration
	CircuitStartTimeout time.Duration
	ConnectTimeout      time.Duration
}

Options contains common Xgress configuration options

func DefaultOptions

func DefaultOptions() *Options

func LoadOptions

func LoadOptions(data OptionsData) (*Options, error)

func (Options) String

func (options Options) String() string

type OptionsData

// OptionsData is an untyped key/value map. It is the input to LoadOptions and
// to the Factory creation methods.
type OptionsData map[interface{}]interface{}

type Originator

// Originator is an int32-based enumeration with two values.
type Originator int32
const (
	// NOTE(review): presumably identifies which end of the circuit produced
	// a message — confirm.
	Initiator  Originator = 0
	Terminator Originator = 1
)

func (Originator) String

func (o Originator) String() string

type Payload

// Payload is a data message. It is converted to and from a channel.Message by
// Marshall and UnmarshallPayload.
type Payload struct {
	CircuitId string
	// NOTE(review): Flags presumably holds a combination of the PayloadFlag*
	// values (see IsCircuitStartFlagSet / IsCircuitEndFlagSet) — confirm.
	Flags     uint32
	RTT       uint16
	Sequence  int32
	// Headers is keyed by uint8, the same header map type used by
	// Connection.ReadPayload and Connection.WritePayload.
	Headers   map[uint8][]byte
	Data      []byte
	// contains filtered or unexported fields
}

func UnmarshallPayload

func UnmarshallPayload(msg *channel.Message) (*Payload, error)

func (*Payload) GetCircuitId added in v1.1.13

func (payload *Payload) GetCircuitId() string

func (*Payload) GetFlags added in v1.1.13

func (payload *Payload) GetFlags() uint32

func (*Payload) GetLoggerFields

func (payload *Payload) GetLoggerFields() logrus.Fields

func (*Payload) GetOriginator added in v1.1.13

func (payload *Payload) GetOriginator() Originator

func (*Payload) GetSequence

func (payload *Payload) GetSequence() int32

func (*Payload) IsCircuitEndFlagSet

func (payload *Payload) IsCircuitEndFlagSet() bool

func (*Payload) IsCircuitStartFlagSet

func (payload *Payload) IsCircuitStartFlagSet() bool

func (*Payload) Marshall

func (payload *Payload) Marshall() *channel.Message

type PayloadBufferForwarder

// PayloadBufferForwarder is the forwarding dependency handed to NewAcker,
// InitAcker, NewRetransmitter and InitRetransmitter.
type PayloadBufferForwarder interface {
	// RetransmitPayload takes a source address and a payload; it may return an error.
	RetransmitPayload(srcAddr Address, payload *Payload) error
	// ForwardAcknowledgement takes a source address and an acknowledgement; it may return an error.
	ForwardAcknowledgement(srcAddr Address, acknowledgement *Acknowledgement) error
}

type PayloadIngester

type PayloadIngester struct {
	// contains filtered or unexported fields
}

func NewPayloadIngester

func NewPayloadIngester(closeNotify <-chan struct{}) *PayloadIngester

type PayloadTransformer added in v1.1.13

// PayloadTransformer is a field-less struct exposing Rx and Tx methods that
// each take a *channel.Message and a channel.Channel.
type PayloadTransformer struct {
}

func (PayloadTransformer) Rx added in v1.1.13

func (self PayloadTransformer) Rx(*channel.Message, channel.Channel)

func (PayloadTransformer) Tx added in v1.1.13

func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel)

type PayloadType added in v1.1.13

// PayloadType is a one-byte enumeration. Xgress.SendPayload accepts one as its
// third parameter but leaves that parameter unnamed.
type PayloadType byte
const (
	// NOTE(review): the suffixes presumably stand for xgress-originated,
	// retransmitted and forwarded payloads — confirm.
	PayloadTypeXg  PayloadType = 1
	PayloadTypeRtx PayloadType = 2
	PayloadTypeFwd PayloadType = 3
)

type PeekHandler

// Registered on an Xgress via AddPeekHandler.
type PeekHandler interface {
	// Rx and Tx each receive the xgress and a payload.
	// NOTE(review): presumably Rx is for payloads received by x and Tx for
	// payloads sent by x — confirm the direction convention.
	Rx(x *Xgress, payload *Payload)
	Tx(x *Xgress, payload *Payload)
	// Close receives the xgress only.
	Close(x *Xgress)
}

PeekHandler allows registering a watcher to react to data flowing through an xgress instance.

type ReceiveHandler

// Installed on an Xgress via SetReceiveHandler.
type ReceiveHandler interface {
	// HandleXgressReceive is invoked when data is received from the connected xgress peer.
	HandleXgressReceive(payload *Payload, x *Xgress)
	// HandleControlReceive takes a *Control together with the xgress, in
	// contrast to HandleXgressReceive, which takes a *Payload.
	HandleControlReceive(control *Control, x *Xgress)
}

ReceiveHandler is invoked by an xgress whenever data is received from the connected peer. Generally a ReceiveHandler is implemented to connect the xgress to a data plane data transmission system.

type Request

// Request is serialized as JSON (see ToJSON and RequestFromJSON) using the
// field names "id" and "svcId".
type Request struct {
	Id        string `json:"id"`
	ServiceId string `json:"svcId"`
}

func ReceiveRequest

func ReceiveRequest(peer transport.Conn) (*Request, error)

func RequestFromJSON

func RequestFromJSON(payload []byte) (*Request, error)

func (*Request) ToJSON

func (r *Request) ToJSON() ([]byte, error)

type Response

// Response is serialized as JSON (see ToJSON and ResponseFromJSON). Note the
// abbreviated wire names: Success is encoded as "scc" and Message as "msg".
type Response struct {
	Success   bool   `json:"scc"`
	Message   string `json:"msg"`
	CircuitId string `json:"circuitId"`
}

func CreateCircuit

func CreateCircuit(ctrl networkControllers, peer Connection, request *Request, bindHandler BindHandler, options *Options) *Response

func ReceiveResponse

func ReceiveResponse(peer transport.Conn) (*Response, error)

func ResponseFromJSON

func ResponseFromJSON(payload []byte) (*Response, error)

func (*Response) ToJSON

func (r *Response) ToJSON() ([]byte, error)

type Retransmitter

type Retransmitter struct {
	// contains filtered or unexported fields
}

func NewRetransmitter

func NewRetransmitter(forwarder PayloadBufferForwarder, faultReporter RetransmitterFaultReporter, metrics metrics.Registry, closeNotify <-chan struct{}) *Retransmitter

type RetransmitterFaultReporter

// RetransmitterFaultReporter is the fault-reporting dependency handed to
// NewRetransmitter and InitRetransmitter.
type RetransmitterFaultReporter interface {
	// ReportForwardingFault takes a circuit id and a controller id.
	ReportForwardingFault(circuitId string, ctrlId string)
}

type Xgress

// Xgress instances are constructed with NewXgress.
type Xgress struct {
	// Options is the only exported field.
	// NOTE(review): presumably the *Options value passed to NewXgress — confirm.
	Options *Options
	// contains filtered or unexported fields
}

func NewXgress

func NewXgress(circuitId string, ctrlId string, address Address, peer Connection, originator Originator, options *Options, tags map[string]string) *Xgress

func (*Xgress) AddCloseHandler

func (self *Xgress) AddCloseHandler(closeHandler CloseHandler)

func (*Xgress) AddPeekHandler

func (self *Xgress) AddPeekHandler(peekHandler PeekHandler)

func (*Xgress) Address

func (self *Xgress) Address() Address

func (*Xgress) CircuitId

func (self *Xgress) CircuitId() string

func (*Xgress) Close

func (self *Xgress) Close()

Things which can trigger close:

1. Read fails
2. Write fails
3. End of Circuit received
4. Unroute received

func (*Xgress) CloseTimeout

func (self *Xgress) CloseTimeout(duration time.Duration)

func (*Xgress) Closed

func (self *Xgress) Closed() bool

func (*Xgress) CtrlId

func (self *Xgress) CtrlId() string

func (*Xgress) ForwardEndOfCircuit

func (self *Xgress) ForwardEndOfCircuit(sendF func(payload *Payload) bool)

func (*Xgress) GetEndCircuit

func (self *Xgress) GetEndCircuit() *Payload

func (*Xgress) GetIntervalId

func (self *Xgress) GetIntervalId() string

func (*Xgress) GetSequence

func (self *Xgress) GetSequence() uint64

func (*Xgress) GetStartCircuit

func (self *Xgress) GetStartCircuit() *Payload

func (*Xgress) GetTags

func (self *Xgress) GetTags() map[string]string

func (*Xgress) GetTimeOfLastRxFromLink

func (self *Xgress) GetTimeOfLastRxFromLink() int64

func (*Xgress) HandleControlReceive

func (self *Xgress) HandleControlReceive(controlType ControlType, headers channel.Headers)

func (*Xgress) InspectCircuit

func (self *Xgress) InspectCircuit(detail *inspect.CircuitInspectDetail)

func (*Xgress) IsCircuitStarted

func (self *Xgress) IsCircuitStarted() bool

func (*Xgress) IsEndOfCircuitReceived

func (self *Xgress) IsEndOfCircuitReceived() bool

func (*Xgress) IsEndOfCircuitSent

func (self *Xgress) IsEndOfCircuitSent() bool

func (*Xgress) IsTerminator

func (self *Xgress) IsTerminator() bool

func (*Xgress) Label

func (self *Xgress) Label() string

func (*Xgress) Originator

func (self *Xgress) Originator() Originator

func (*Xgress) PayloadReceived

func (self *Xgress) PayloadReceived(payload *Payload)

func (*Xgress) SendAcknowledgement

func (self *Xgress) SendAcknowledgement(acknowledgement *Acknowledgement) error

func (*Xgress) SendControl

func (self *Xgress) SendControl(control *Control) error

func (*Xgress) SendEmptyAck

func (self *Xgress) SendEmptyAck()

func (*Xgress) SendPayload

func (self *Xgress) SendPayload(payload *Payload, _ time.Duration, _ PayloadType) error

func (*Xgress) SetReceiveHandler

func (self *Xgress) SetReceiveHandler(receiveHandler ReceiveHandler)

func (*Xgress) Start

func (self *Xgress) Start()

func (*Xgress) Unrouted

func (self *Xgress) Unrouted()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL