datachannel

package
v0.0.0-...-3bfc256 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

datachannel package implement data channel for interactive sessions.

Index

Constants

This section is empty.

Variables

View Source
var GetRoundTripTime = func(streamingMessage StreamingMessage) time.Duration {
	return time.Since(streamingMessage.LastSentTime)
}
View Source
var ProcessAcknowledgedMessageCall = func(log log.T, dataChannel *DataChannel, acknowledgeMessage message.AcknowledgeContent) error {
	return dataChannel.ProcessAcknowledgedMessage(log, acknowledgeMessage)
}
View Source
var SendAcknowledgeMessageCall = func(log log.T, dataChannel *DataChannel, streamDataMessage message.ClientMessage) error {
	return dataChannel.SendAcknowledgeMessage(log, streamDataMessage)
}
View Source
var SendMessageCall = func(log log.T, dataChannel *DataChannel, input []byte, inputType int) error {
	return dataChannel.SendMessage(log, input, inputType)
}

Functions

This section is empty.

Types

type DataChannel

type DataChannel struct {
	Role                  string
	ClientId              string
	SessionId             string
	TargetId              string
	IsAwsCliUpgradeNeeded bool
	//records sequence number of last acknowledged message received over data channel
	ExpectedSequenceNumber int64
	//records sequence number of last stream data message sent over data channel
	StreamDataSequenceNumber int64
	//buffer to store outgoing stream messages until acknowledged
	//using linked list for this buffer as access to oldest message is required and it support faster deletion from any position of list
	OutgoingMessageBuffer ListMessageBuffer
	//buffer to store incoming stream messages if received out of sequence
	//using map for this buffer as incoming messages can be out of order and retrieval would be faster by sequenceId
	IncomingMessageBuffer MapMessageBuffer
	//round trip time of latest acknowledged message
	RoundTripTime float64
	//round trip time variation of latest acknowledged message
	RoundTripTimeVariation float64
	//timeout used for resending unacknowledged message
	RetransmissionTimeout time.Duration
	// contains filtered or unexported fields
}

DataChannel used for communication between the mgs and the cli.

func (*DataChannel) AddDataToIncomingMessageBuffer

func (dataChannel *DataChannel) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)

AddDataToIncomingMessageBuffer adds given message to IncomingMessageBuffer if it has capacity

func (*DataChannel) AddDataToOutgoingMessageBuffer

func (dataChannel *DataChannel) AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)

AddDataToOutgoingMessageBuffer removes first message from OutgoingMessageBuffer if capacity is full and adds given message at the end

func (*DataChannel) CalculateRetransmissionTimeout

func (dataChannel *DataChannel) CalculateRetransmissionTimeout(log log.T, streamingMessage StreamingMessage)

CalculateRetransmissionTimeout calculates message retransmission timeout value based on round trip time on given message

func (*DataChannel) Close

func (dataChannel *DataChannel) Close(log log.T) error

Close closes datachannel - its web socket connection

func (*DataChannel) DeregisterOutputStreamHandler

func (dataChannel *DataChannel) DeregisterOutputStreamHandler(handler OutputStreamDataMessageHandler)

DeregisterOutputStreamHandler deregisters a handler previously registered using RegisterOutputStreamHandler

func (*DataChannel) FinalizeDataChannelHandshake

func (dataChannel *DataChannel) FinalizeDataChannelHandshake(log log.T, tokenValue string) (err error)

FinalizeHandshake sends the token for service to acknowledge the connection.

func (*DataChannel) GetAgentVersion

func (dataChannel *DataChannel) GetAgentVersion() string

GetAgentVersion returns agent version of the target instance

func (*DataChannel) GetSessionProperties

func (dataChannel *DataChannel) GetSessionProperties() interface{}

GetSessionProperties returns SessionProperties of the dataChannel

func (*DataChannel) GetSessionType

func (dataChannel *DataChannel) GetSessionType() string

GetSessionType returns SessionType of the dataChannel

func (*DataChannel) GetStreamDataSequenceNumber

func (dataChannel *DataChannel) GetStreamDataSequenceNumber() int64

GetStreamDataSequenceNumber returns StreamDataSequenceNumber of the dataChannel

func (*DataChannel) GetWsChannel

func (dataChannel *DataChannel) GetWsChannel() communicator.IWebSocketChannel

GetWsChannel returns WsChannel of the dataChannel

func (*DataChannel) HandleAcknowledgeMessage

func (dataChannel *DataChannel) HandleAcknowledgeMessage(
	log log.T,
	outputMessage message.ClientMessage) (err error)

handleAcknowledgeMessage deserialize acknowledge content and process it

func (DataChannel) HandleChannelClosedMessage

func (dataChannel DataChannel) HandleChannelClosedMessage(log log.T, stopHandler Stop, sessionId string, outputMessage message.ClientMessage)

handleChannelClosedMessage exits the shell

func (*DataChannel) HandleOutputMessage

func (dataChannel *DataChannel) HandleOutputMessage(
	log log.T,
	outputMessage message.ClientMessage,
	rawMessage []byte) (err error)

handleOutputMessage handles incoming stream data message by processing the payload and updating expectedSequenceNumber

func (*DataChannel) Initialize

func (dataChannel *DataChannel) Initialize(log log.T, clientId string, sessionId string, targetId string, isAwsCliUpgradeNeeded bool)

Initialize populates the data channel object with the correct values.

func (*DataChannel) IsSessionTypeSet

func (dataChannel *DataChannel) IsSessionTypeSet() chan bool

IsSessionTypeSet check has data channel sessionType been set

func (*DataChannel) IsStreamMessageResendTimeout

func (dataChannel *DataChannel) IsStreamMessageResendTimeout() chan bool

IsStreamMessageResendTimeout checks if resending a streaming message reaches timeout

func (*DataChannel) Open

func (dataChannel *DataChannel) Open(log log.T) (err error)

Open opens websocket connects and does final handshake to acknowledge connection

func (*DataChannel) OutputMessageHandler

func (dataChannel *DataChannel) OutputMessageHandler(log log.T, stopHandler Stop, sessionID string, rawMessage []byte) error

OutputMessageHandler gets output on the data channel

func (*DataChannel) ProcessAcknowledgedMessage

func (dataChannel *DataChannel) ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent message.AcknowledgeContent) error

ProcessAcknowledgedMessage processes acknowledge messages by deleting them from OutgoingMessageBuffer

func (*DataChannel) ProcessIncomingMessageBufferItems

func (dataChannel *DataChannel) ProcessIncomingMessageBufferItems(log log.T,
	outputMessage message.ClientMessage) (err error)

processIncomingMessageBufferItems check if new expected sequence stream data is present in IncomingMessageBuffer. If so process it and increment expected sequence number. Repeat until expected sequence stream data is not found in IncomingMessageBuffer.

func (*DataChannel) ProcessKMSEncryptionHandshakeAction

func (dataChannel *DataChannel) ProcessKMSEncryptionHandshakeAction(log log.T, actionParams json.RawMessage) (err error)

ProcessKMSEncryptionHandshakeAction sets up the encrypter and calls KMS to generate a new data key. This is triggered when encryption is specified in HandshakeRequest

func (*DataChannel) ProcessSessionTypeHandshakeAction

func (dataChannel *DataChannel) ProcessSessionTypeHandshakeAction(actionParams json.RawMessage) (err error)

ProcessSessionTypeHandshakeAction processes session type action in HandshakeRequest. This sets the session type in the datachannel.

func (*DataChannel) Reconnect

func (dataChannel *DataChannel) Reconnect(log log.T) (err error)

Reconnect calls ResumeSession API to reconnect datachannel when connection is lost

func (*DataChannel) RegisterOutputStreamHandler

func (dataChannel *DataChannel) RegisterOutputStreamHandler(handler OutputStreamDataMessageHandler, isSessionSpecificHandler bool)

RegisterOutputStreamHandler register a handler for messages of type OutputStream. This is usually called by the plugin.

func (*DataChannel) RemoveDataFromIncomingMessageBuffer

func (dataChannel *DataChannel) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)

RemoveDataFromIncomingMessageBuffer removes given sequence number message from IncomingMessageBuffer

func (*DataChannel) RemoveDataFromOutgoingMessageBuffer

func (dataChannel *DataChannel) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)

RemoveDataFromOutgoingMessageBuffer removes given element from OutgoingMessageBuffer

func (*DataChannel) ResendStreamDataMessageScheduler

func (dataChannel *DataChannel) ResendStreamDataMessageScheduler(log log.T) (err error)

ResendStreamDataMessageScheduler spawns a separate go thread which keeps checking OutgoingMessageBuffer at fixed interval and resends first message if time elapsed since lastSentTime of the message is more than acknowledge wait time

func (*DataChannel) SendAcknowledgeMessage

func (dataChannel *DataChannel) SendAcknowledgeMessage(log log.T, streamDataMessage message.ClientMessage) (err error)

SendAcknowledgeMessage sends acknowledge message for stream data over data channel

func (*DataChannel) SendFlag

func (dataChannel *DataChannel) SendFlag(
	log log.T,
	flagType message.PayloadTypeFlag) (err error)

SendFlag sends a data message with PayloadType as given flag.

func (*DataChannel) SendInputDataMessage

func (dataChannel *DataChannel) SendInputDataMessage(
	log log.T,
	payloadType message.PayloadType,
	inputData []byte) (err error)

SendInputDataMessage sends a data message in a form of ClientMessage.

func (*DataChannel) SendMessage

func (dataChannel *DataChannel) SendMessage(log log.T, input []byte, inputType int) error

SendMessage sends a message to the service through datachannel

func (*DataChannel) SetAgentVersion

func (dataChannel *DataChannel) SetAgentVersion(agentVersion string)

SetAgentVersion set agent version of the target instance

func (*DataChannel) SetSessionType

func (dataChannel *DataChannel) SetSessionType(sessionType string)

SetSessionType set session type

func (*DataChannel) SetWebsocket

func (dataChannel *DataChannel) SetWebsocket(log log.T, channelUrl string, channelToken string)

SetWebsocket function populates websocket channel object

func (*DataChannel) SetWsChannel

func (dataChannel *DataChannel) SetWsChannel(wsChannel communicator.IWebSocketChannel)

SetWsChannel set WsChannel of the dataChannel

type IDataChannel

type IDataChannel interface {
	Initialize(log log.T, clientId string, sessionId string, targetId string, isAwsCliUpgradeNeeded bool)
	SetWebsocket(log log.T, streamUrl string, tokenValue string)
	Reconnect(log log.T) error
	SendFlag(log log.T, flagType message.PayloadTypeFlag) error
	Open(log log.T) error
	Close(log log.T) error
	FinalizeDataChannelHandshake(log log.T, tokenValue string) error
	SendInputDataMessage(log log.T, payloadType message.PayloadType, inputData []byte) error
	ResendStreamDataMessageScheduler(log log.T) error
	ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent message.AcknowledgeContent) error
	OutputMessageHandler(log log.T, stopHandler Stop, sessionID string, rawMessage []byte) error
	SendAcknowledgeMessage(log log.T, clientMessage message.ClientMessage) error
	AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)
	RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)
	AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)
	RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)
	CalculateRetransmissionTimeout(log log.T, streamingMessage StreamingMessage)
	SendMessage(log log.T, input []byte, inputType int) error
	RegisterOutputStreamHandler(handler OutputStreamDataMessageHandler, isSessionSpecificHandler bool)
	DeregisterOutputStreamHandler(handler OutputStreamDataMessageHandler)
	IsSessionTypeSet() chan bool
	IsStreamMessageResendTimeout() chan bool
	GetSessionType() string
	SetSessionType(sessionType string)
	GetSessionProperties() interface{}
	GetWsChannel() communicator.IWebSocketChannel
	SetWsChannel(wsChannel communicator.IWebSocketChannel)
	GetStreamDataSequenceNumber() int64
	GetAgentVersion() string
	SetAgentVersion(agentVersion string)
}

type ListMessageBuffer

type ListMessageBuffer struct {
	Messages *list.List
	Capacity int
	Mutex    *sync.Mutex
}

type MapMessageBuffer

type MapMessageBuffer struct {
	Messages map[int64]StreamingMessage
	Capacity int
	Mutex    *sync.Mutex
}

type OutputStreamDataMessageHandler

type OutputStreamDataMessageHandler func(log log.T, streamDataMessage message.ClientMessage) (bool, error)

type Stop

type Stop func()

type StreamingMessage

type StreamingMessage struct {
	Content        []byte
	SequenceNumber int64
	LastSentTime   time.Time
	ResendAttempt  *int
}

Directories

Path Synopsis
Code generated by mockery 2.7.4.
Code generated by mockery 2.7.4.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL