Documentation ¶
Overview ¶
datachannel package implement data channel for interactive sessions.
Index ¶
- Variables
- type DataChannel
- func (dataChannel *DataChannel) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)
- func (dataChannel *DataChannel) AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)
- func (dataChannel *DataChannel) CalculateRetransmissionTimeout(log log.T, streamingMessage StreamingMessage)
- func (dataChannel *DataChannel) Close(log log.T) error
- func (dataChannel *DataChannel) DeregisterOutputStreamHandler(handler OutputStreamDataMessageHandler)
- func (dataChannel *DataChannel) FinalizeDataChannelHandshake(log log.T, tokenValue string) (err error)
- func (dataChannel *DataChannel) GetAgentVersion() string
- func (dataChannel *DataChannel) GetSessionProperties() interface{}
- func (dataChannel *DataChannel) GetSessionType() string
- func (dataChannel *DataChannel) GetStreamDataSequenceNumber() int64
- func (dataChannel *DataChannel) GetWsChannel() communicator.IWebSocketChannel
- func (dataChannel *DataChannel) HandleAcknowledgeMessage(log log.T, outputMessage message.ClientMessage) (err error)
- func (dataChannel DataChannel) HandleChannelClosedMessage(log log.T, stopHandler Stop, sessionId string, ...)
- func (dataChannel *DataChannel) HandleOutputMessage(log log.T, outputMessage message.ClientMessage, rawMessage []byte) (err error)
- func (dataChannel *DataChannel) Initialize(log log.T, clientId string, sessionId string, targetId string, ...)
- func (dataChannel *DataChannel) IsSessionTypeSet() chan bool
- func (dataChannel *DataChannel) IsStreamMessageResendTimeout() chan bool
- func (dataChannel *DataChannel) Open(log log.T) (err error)
- func (dataChannel *DataChannel) OutputMessageHandler(log log.T, stopHandler Stop, sessionID string, rawMessage []byte) error
- func (dataChannel *DataChannel) ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent message.AcknowledgeContent) error
- func (dataChannel *DataChannel) ProcessIncomingMessageBufferItems(log log.T, outputMessage message.ClientMessage) (err error)
- func (dataChannel *DataChannel) ProcessKMSEncryptionHandshakeAction(log log.T, actionParams json.RawMessage) (err error)
- func (dataChannel *DataChannel) ProcessSessionTypeHandshakeAction(actionParams json.RawMessage) (err error)
- func (dataChannel *DataChannel) Reconnect(log log.T) (err error)
- func (dataChannel *DataChannel) RegisterOutputStreamHandler(handler OutputStreamDataMessageHandler, isSessionSpecificHandler bool)
- func (dataChannel *DataChannel) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)
- func (dataChannel *DataChannel) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)
- func (dataChannel *DataChannel) ResendStreamDataMessageScheduler(log log.T) (err error)
- func (dataChannel *DataChannel) SendAcknowledgeMessage(log log.T, streamDataMessage message.ClientMessage) (err error)
- func (dataChannel *DataChannel) SendFlag(log log.T, flagType message.PayloadTypeFlag) (err error)
- func (dataChannel *DataChannel) SendInputDataMessage(log log.T, payloadType message.PayloadType, inputData []byte) (err error)
- func (dataChannel *DataChannel) SendMessage(log log.T, input []byte, inputType int) error
- func (dataChannel *DataChannel) SetAgentVersion(agentVersion string)
- func (dataChannel *DataChannel) SetSessionType(sessionType string)
- func (dataChannel *DataChannel) SetWebsocket(log log.T, channelUrl string, channelToken string)
- func (dataChannel *DataChannel) SetWsChannel(wsChannel communicator.IWebSocketChannel)
- type IDataChannel
- type ListMessageBuffer
- type MapMessageBuffer
- type OutputStreamDataMessageHandler
- type Stop
- type StreamingMessage
Constants ¶
This section is empty.
Variables ¶
var GetRoundTripTime = func(streamingMessage StreamingMessage) time.Duration { return time.Since(streamingMessage.LastSentTime) }
var ProcessAcknowledgedMessageCall = func(log log.T, dataChannel *DataChannel, acknowledgeMessage message.AcknowledgeContent) error {
return dataChannel.ProcessAcknowledgedMessage(log, acknowledgeMessage)
}
var SendAcknowledgeMessageCall = func(log log.T, dataChannel *DataChannel, streamDataMessage message.ClientMessage) error {
return dataChannel.SendAcknowledgeMessage(log, streamDataMessage)
}
var SendMessageCall = func(log log.T, dataChannel *DataChannel, input []byte, inputType int) error {
return dataChannel.SendMessage(log, input, inputType)
}
Functions ¶
This section is empty.
Types ¶
type DataChannel ¶
type DataChannel struct { Role string ClientId string SessionId string TargetId string IsAwsCliUpgradeNeeded bool //records sequence number of last acknowledged message received over data channel ExpectedSequenceNumber int64 //records sequence number of last stream data message sent over data channel StreamDataSequenceNumber int64 //buffer to store outgoing stream messages until acknowledged //using linked list for this buffer as access to oldest message is required and it support faster deletion from any position of list OutgoingMessageBuffer ListMessageBuffer //buffer to store incoming stream messages if received out of sequence //using map for this buffer as incoming messages can be out of order and retrieval would be faster by sequenceId IncomingMessageBuffer MapMessageBuffer //round trip time of latest acknowledged message RoundTripTime float64 //round trip time variation of latest acknowledged message RoundTripTimeVariation float64 //timeout used for resending unacknowledged message RetransmissionTimeout time.Duration // contains filtered or unexported fields }
DataChannel used for communication between the mgs and the cli.
func (*DataChannel) AddDataToIncomingMessageBuffer ¶
func (dataChannel *DataChannel) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)
AddDataToIncomingMessageBuffer adds given message to IncomingMessageBuffer if it has capacity
func (*DataChannel) AddDataToOutgoingMessageBuffer ¶
func (dataChannel *DataChannel) AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)
AddDataToOutgoingMessageBuffer removes first message from OutgoingMessageBuffer if capacity is full and adds given message at the end
func (*DataChannel) CalculateRetransmissionTimeout ¶
func (dataChannel *DataChannel) CalculateRetransmissionTimeout(log log.T, streamingMessage StreamingMessage)
CalculateRetransmissionTimeout calculates message retransmission timeout value based on round trip time on given message
func (*DataChannel) Close ¶
func (dataChannel *DataChannel) Close(log log.T) error
Close closes datachannel - its web socket connection
func (*DataChannel) DeregisterOutputStreamHandler ¶
func (dataChannel *DataChannel) DeregisterOutputStreamHandler(handler OutputStreamDataMessageHandler)
DeregisterOutputStreamHandler deregisters a handler previously registered using RegisterOutputStreamHandler
func (*DataChannel) FinalizeDataChannelHandshake ¶
func (dataChannel *DataChannel) FinalizeDataChannelHandshake(log log.T, tokenValue string) (err error)
FinalizeHandshake sends the token for service to acknowledge the connection.
func (*DataChannel) GetAgentVersion ¶
func (dataChannel *DataChannel) GetAgentVersion() string
GetAgentVersion returns agent version of the target instance
func (*DataChannel) GetSessionProperties ¶
func (dataChannel *DataChannel) GetSessionProperties() interface{}
GetSessionProperties returns SessionProperties of the dataChannel
func (*DataChannel) GetSessionType ¶
func (dataChannel *DataChannel) GetSessionType() string
GetSessionType returns SessionType of the dataChannel
func (*DataChannel) GetStreamDataSequenceNumber ¶
func (dataChannel *DataChannel) GetStreamDataSequenceNumber() int64
GetStreamDataSequenceNumber returns StreamDataSequenceNumber of the dataChannel
func (*DataChannel) GetWsChannel ¶
func (dataChannel *DataChannel) GetWsChannel() communicator.IWebSocketChannel
GetWsChannel returns WsChannel of the dataChannel
func (*DataChannel) HandleAcknowledgeMessage ¶
func (dataChannel *DataChannel) HandleAcknowledgeMessage( log log.T, outputMessage message.ClientMessage) (err error)
handleAcknowledgeMessage deserialize acknowledge content and process it
func (DataChannel) HandleChannelClosedMessage ¶
func (dataChannel DataChannel) HandleChannelClosedMessage(log log.T, stopHandler Stop, sessionId string, outputMessage message.ClientMessage)
handleChannelClosedMessage exits the shell
func (*DataChannel) HandleOutputMessage ¶
func (dataChannel *DataChannel) HandleOutputMessage( log log.T, outputMessage message.ClientMessage, rawMessage []byte) (err error)
handleOutputMessage handles incoming stream data message by processing the payload and updating expectedSequenceNumber
func (*DataChannel) Initialize ¶
func (dataChannel *DataChannel) Initialize(log log.T, clientId string, sessionId string, targetId string, isAwsCliUpgradeNeeded bool)
Initialize populates the data channel object with the correct values.
func (*DataChannel) IsSessionTypeSet ¶
func (dataChannel *DataChannel) IsSessionTypeSet() chan bool
IsSessionTypeSet check has data channel sessionType been set
func (*DataChannel) IsStreamMessageResendTimeout ¶
func (dataChannel *DataChannel) IsStreamMessageResendTimeout() chan bool
IsStreamMessageResendTimeout checks if resending a streaming message reaches timeout
func (*DataChannel) Open ¶
func (dataChannel *DataChannel) Open(log log.T) (err error)
Open opens websocket connects and does final handshake to acknowledge connection
func (*DataChannel) OutputMessageHandler ¶
func (dataChannel *DataChannel) OutputMessageHandler(log log.T, stopHandler Stop, sessionID string, rawMessage []byte) error
OutputMessageHandler gets output on the data channel
func (*DataChannel) ProcessAcknowledgedMessage ¶
func (dataChannel *DataChannel) ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent message.AcknowledgeContent) error
ProcessAcknowledgedMessage processes acknowledge messages by deleting them from OutgoingMessageBuffer
func (*DataChannel) ProcessIncomingMessageBufferItems ¶
func (dataChannel *DataChannel) ProcessIncomingMessageBufferItems(log log.T, outputMessage message.ClientMessage) (err error)
processIncomingMessageBufferItems check if new expected sequence stream data is present in IncomingMessageBuffer. If so process it and increment expected sequence number. Repeat until expected sequence stream data is not found in IncomingMessageBuffer.
func (*DataChannel) ProcessKMSEncryptionHandshakeAction ¶
func (dataChannel *DataChannel) ProcessKMSEncryptionHandshakeAction(log log.T, actionParams json.RawMessage) (err error)
ProcessKMSEncryptionHandshakeAction sets up the encrypter and calls KMS to generate a new data key. This is triggered when encryption is specified in HandshakeRequest
func (*DataChannel) ProcessSessionTypeHandshakeAction ¶
func (dataChannel *DataChannel) ProcessSessionTypeHandshakeAction(actionParams json.RawMessage) (err error)
ProcessSessionTypeHandshakeAction processes session type action in HandshakeRequest. This sets the session type in the datachannel.
func (*DataChannel) Reconnect ¶
func (dataChannel *DataChannel) Reconnect(log log.T) (err error)
Reconnect calls ResumeSession API to reconnect datachannel when connection is lost
func (*DataChannel) RegisterOutputStreamHandler ¶
func (dataChannel *DataChannel) RegisterOutputStreamHandler(handler OutputStreamDataMessageHandler, isSessionSpecificHandler bool)
RegisterOutputStreamHandler register a handler for messages of type OutputStream. This is usually called by the plugin.
func (*DataChannel) RemoveDataFromIncomingMessageBuffer ¶
func (dataChannel *DataChannel) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)
RemoveDataFromIncomingMessageBuffer removes given sequence number message from IncomingMessageBuffer
func (*DataChannel) RemoveDataFromOutgoingMessageBuffer ¶
func (dataChannel *DataChannel) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)
RemoveDataFromOutgoingMessageBuffer removes given element from OutgoingMessageBuffer
func (*DataChannel) ResendStreamDataMessageScheduler ¶
func (dataChannel *DataChannel) ResendStreamDataMessageScheduler(log log.T) (err error)
ResendStreamDataMessageScheduler spawns a separate go thread which keeps checking OutgoingMessageBuffer at fixed interval and resends first message if time elapsed since lastSentTime of the message is more than acknowledge wait time
func (*DataChannel) SendAcknowledgeMessage ¶
func (dataChannel *DataChannel) SendAcknowledgeMessage(log log.T, streamDataMessage message.ClientMessage) (err error)
SendAcknowledgeMessage sends acknowledge message for stream data over data channel
func (*DataChannel) SendFlag ¶
func (dataChannel *DataChannel) SendFlag( log log.T, flagType message.PayloadTypeFlag) (err error)
SendFlag sends a data message with PayloadType as given flag.
func (*DataChannel) SendInputDataMessage ¶
func (dataChannel *DataChannel) SendInputDataMessage( log log.T, payloadType message.PayloadType, inputData []byte) (err error)
SendInputDataMessage sends a data message in a form of ClientMessage.
func (*DataChannel) SendMessage ¶
SendMessage sends a message to the service through datachannel
func (*DataChannel) SetAgentVersion ¶
func (dataChannel *DataChannel) SetAgentVersion(agentVersion string)
SetAgentVersion set agent version of the target instance
func (*DataChannel) SetSessionType ¶
func (dataChannel *DataChannel) SetSessionType(sessionType string)
SetSessionType set session type
func (*DataChannel) SetWebsocket ¶
func (dataChannel *DataChannel) SetWebsocket(log log.T, channelUrl string, channelToken string)
SetWebsocket function populates websocket channel object
func (*DataChannel) SetWsChannel ¶
func (dataChannel *DataChannel) SetWsChannel(wsChannel communicator.IWebSocketChannel)
SetWsChannel set WsChannel of the dataChannel
type IDataChannel ¶
type IDataChannel interface { Initialize(log log.T, clientId string, sessionId string, targetId string, isAwsCliUpgradeNeeded bool) SetWebsocket(log log.T, streamUrl string, tokenValue string) Reconnect(log log.T) error SendFlag(log log.T, flagType message.PayloadTypeFlag) error Open(log log.T) error Close(log log.T) error FinalizeDataChannelHandshake(log log.T, tokenValue string) error SendInputDataMessage(log log.T, payloadType message.PayloadType, inputData []byte) error ResendStreamDataMessageScheduler(log log.T) error ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent message.AcknowledgeContent) error OutputMessageHandler(log log.T, stopHandler Stop, sessionID string, rawMessage []byte) error SendAcknowledgeMessage(log log.T, clientMessage message.ClientMessage) error AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64) CalculateRetransmissionTimeout(log log.T, streamingMessage StreamingMessage) SendMessage(log log.T, input []byte, inputType int) error RegisterOutputStreamHandler(handler OutputStreamDataMessageHandler, isSessionSpecificHandler bool) DeregisterOutputStreamHandler(handler OutputStreamDataMessageHandler) IsSessionTypeSet() chan bool IsStreamMessageResendTimeout() chan bool GetSessionType() string SetSessionType(sessionType string) GetSessionProperties() interface{} GetWsChannel() communicator.IWebSocketChannel SetWsChannel(wsChannel communicator.IWebSocketChannel) GetStreamDataSequenceNumber() int64 GetAgentVersion() string SetAgentVersion(agentVersion string) }
type ListMessageBuffer ¶
type MapMessageBuffer ¶
type MapMessageBuffer struct { Messages map[int64]StreamingMessage Capacity int Mutex *sync.Mutex }