Documentation ¶
Overview ¶
Package datachannel implements the data channel, which is used to run commands interactively.
Index ¶
- type DataChannel
- func (dataChannel *DataChannel) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)
- func (dataChannel *DataChannel) AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)
- func (dataChannel *DataChannel) Close(log log.T) error
- func (dataChannel *DataChannel) GetClientVersion() string
- func (dataChannel *DataChannel) GetInstanceId() string
- func (dataChannel *DataChannel) GetRegion() string
- func (dataChannel *DataChannel) GetSeparateOutputPayload() bool
- func (dataChannel *DataChannel) Initialize(context context.T, mgsService service.Service, sessionId string, ...)
- func (dataChannel *DataChannel) IsActive() bool
- func (dataChannel *DataChannel) Open(log log.T) error
- func (dataChannel *DataChannel) PerformHandshake(log log.T, kmsKeyId string, encryptionEnabled bool, ...) (err error)
- func (dataChannel *DataChannel) PrepareToCloseChannel(log log.T)
- func (dataChannel *DataChannel) ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent mgsContracts.AcknowledgeContent)
- func (dataChannel *DataChannel) Reconnect(log log.T) error
- func (dataChannel *DataChannel) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)
- func (dataChannel *DataChannel) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)
- func (dataChannel *DataChannel) ResendStreamDataMessageScheduler(log log.T) error
- func (dataChannel *DataChannel) SendAcknowledgeMessage(log log.T, streamDataMessage mgsContracts.AgentMessage) error
- func (dataChannel *DataChannel) SendAgentSessionStateMessage(log log.T, sessionStatus mgsContracts.SessionStatus) error
- func (dataChannel *DataChannel) SendMessage(log log.T, input []byte, inputType int) error
- func (dataChannel *DataChannel) SendStreamDataMessage(log log.T, payloadType mgsContracts.PayloadType, inputData []byte) (err error)
- func (dataChannel *DataChannel) SetSeparateOutputPayload(separateOutputPayload bool)
- func (dataChannel *DataChannel) SetWebSocket(context context.T, mgsService service.Service, sessionId string, ...) error
- func (dataChannel *DataChannel) SkipHandshake(log log.T)
- type Handshake
- type IDataChannel
- type InputStreamMessageHandler
- type ListMessageBuffer
- type MapMessageBuffer
- type StreamingMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataChannel ¶
type DataChannel struct { Service service.Service ChannelId string ClientId string InstanceId string Role string Pause bool //records sequence number of last acknowledged message received over data channel ExpectedSequenceNumber int64 //records sequence number of last stream data message sent over data channel StreamDataSequenceNumber int64 //ensures only one goroutine sends a message with the current StreamDataSequenceNumber in data channel //check carefully for deadlocks when not reusing SendStreamDataMessage StreamDataSequenceNumberMutex *sync.Mutex //buffer to store outgoing stream messages until acknowledged //using linked list for this buffer as access to oldest message is required and it supports faster deletion from any position of list OutgoingMessageBuffer ListMessageBuffer //buffer to store incoming stream messages if received out of sequence //using map for this buffer as incoming messages can be out of order and retrieval would be faster by sequenceId IncomingMessageBuffer MapMessageBuffer //round trip time of latest acknowledged message RoundTripTime float64 //round trip time variation of latest acknowledged message RoundTripTimeVariation float64 //timeout used for resending unacknowledged message RetransmissionTimeout time.Duration // contains filtered or unexported fields }
DataChannel is used for session communication between the message gateway service and the agent.
func NewDataChannel ¶
func NewDataChannel(context context.T, channelId string, clientId string, inputStreamMessageHandler InputStreamMessageHandler, cancelFlag task.CancelFlag) (*DataChannel, error)
NewDataChannel constructs datachannel objects.
func (*DataChannel) AddDataToIncomingMessageBuffer ¶
func (dataChannel *DataChannel) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)
AddDataToIncomingMessageBuffer adds given message to IncomingMessageBuffer if it has capacity.
func (*DataChannel) AddDataToOutgoingMessageBuffer ¶
func (dataChannel *DataChannel) AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)
AddDataToOutgoingMessageBuffer adds the given message to the end of OutgoingMessageBuffer if it has capacity.
func (*DataChannel) Close ¶
func (dataChannel *DataChannel) Close(log log.T) error
Close closes the datachannel, i.e. its web socket connection.
func (*DataChannel) GetClientVersion ¶
func (dataChannel *DataChannel) GetClientVersion() string
GetClientVersion returns version of the client
func (*DataChannel) GetInstanceId ¶
func (dataChannel *DataChannel) GetInstanceId() string
GetInstanceId returns id of the target
func (*DataChannel) GetRegion ¶
func (dataChannel *DataChannel) GetRegion() string
GetRegion returns aws region of the target
func (*DataChannel) GetSeparateOutputPayload ¶
func (dataChannel *DataChannel) GetSeparateOutputPayload() bool
GetSeparateOutputPayload returns a boolean value indicating whether stdout and stderr output are sent separately for a non-interactive session.
func (*DataChannel) Initialize ¶
func (dataChannel *DataChannel) Initialize(context context.T, mgsService service.Service, sessionId string, clientId string, instanceId string, role string, cancelFlag task.CancelFlag, inputStreamMessageHandler InputStreamMessageHandler)
Initialize populates datachannel object.
func (*DataChannel) IsActive ¶
func (dataChannel *DataChannel) IsActive() bool
IsActive returns a boolean value indicating whether the datachannel is actively listening and communicating with the service.
func (*DataChannel) Open ¶
func (dataChannel *DataChannel) Open(log log.T) error
Open opens the websocket connection and sends the token for service to acknowledge the connection.
func (*DataChannel) PerformHandshake ¶
func (dataChannel *DataChannel) PerformHandshake(log log.T, kmsKeyId string, encryptionEnabled bool, sessionTypeRequest mgsContracts.SessionTypeRequest) (err error)
PerformHandshake performs handshake to share version string and encryption information with clients like cli/console
func (*DataChannel) PrepareToCloseChannel ¶
func (dataChannel *DataChannel) PrepareToCloseChannel(log log.T)
PrepareToCloseChannel waits for all messages to be sent to MGS
func (*DataChannel) ProcessAcknowledgedMessage ¶
func (dataChannel *DataChannel) ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent mgsContracts.AcknowledgeContent)
ProcessAcknowledgedMessage processes acknowledge messages by deleting the acknowledged stream messages from OutgoingMessageBuffer.
func (*DataChannel) Reconnect ¶
func (dataChannel *DataChannel) Reconnect(log log.T) error
Reconnect reconnects datachannel to service endpoint.
func (*DataChannel) RemoveDataFromIncomingMessageBuffer ¶
func (dataChannel *DataChannel) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)
RemoveDataFromIncomingMessageBuffer removes given sequence number message from IncomingMessageBuffer.
func (*DataChannel) RemoveDataFromOutgoingMessageBuffer ¶
func (dataChannel *DataChannel) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)
RemoveDataFromOutgoingMessageBuffer removes given element from OutgoingMessageBuffer.
func (*DataChannel) ResendStreamDataMessageScheduler ¶
func (dataChannel *DataChannel) ResendStreamDataMessageScheduler(log log.T) error
ResendStreamDataMessageScheduler spawns a separate goroutine that checks OutgoingMessageBuffer at a fixed interval and resends the first message if the time elapsed since that message's lastSentTime exceeds the acknowledge wait time.
func (*DataChannel) SendAcknowledgeMessage ¶
func (dataChannel *DataChannel) SendAcknowledgeMessage(log log.T, streamDataMessage mgsContracts.AgentMessage) error
SendAcknowledgeMessage sends acknowledge message for stream data over data channel
func (*DataChannel) SendAgentSessionStateMessage ¶
func (dataChannel *DataChannel) SendAgentSessionStateMessage(log log.T, sessionStatus mgsContracts.SessionStatus) error
SendAgentSessionStateMessage sends agent session state to MGS
func (*DataChannel) SendMessage ¶
func (dataChannel *DataChannel) SendMessage(log log.T, input []byte, inputType int) error
SendMessage sends a message to the service through datachannel.
func (*DataChannel) SendStreamDataMessage ¶
func (dataChannel *DataChannel) SendStreamDataMessage(log log.T, payloadType mgsContracts.PayloadType, inputData []byte) (err error)
SendStreamDataMessage sends a data message in a form of AgentMessage for streaming.
func (*DataChannel) SetSeparateOutputPayload ¶
func (dataChannel *DataChannel) SetSeparateOutputPayload(separateOutputPayload bool)
SetSeparateOutputPayload sets the separateOutputPayload value.
func (*DataChannel) SetWebSocket ¶
func (dataChannel *DataChannel) SetWebSocket(context context.T, mgsService service.Service, sessionId string, clientId string, onMessageHandler func(input []byte)) error
SetWebSocket populates webchannel object.
func (*DataChannel) SkipHandshake ¶
func (dataChannel *DataChannel) SkipHandshake(log log.T)
SkipHandshake is used to skip handshake if the plugin decides it is not necessary
type IDataChannel ¶
type IDataChannel interface { Initialize(context context.T, mgsService service.Service, sessionId string, clientId string, instanceId string, role string, cancelFlag task.CancelFlag, inputStreamMessageHandler InputStreamMessageHandler) SetWebSocket(context context.T, mgsService service.Service, sessionId string, clientId string, onMessageHandler func(input []byte)) error Open(log log.T) error Close(log log.T) error Reconnect(log log.T) error SendMessage(log log.T, input []byte, inputType int) error SendStreamDataMessage(log log.T, dataType mgsContracts.PayloadType, inputData []byte) error ResendStreamDataMessageScheduler(log log.T) error ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent mgsContracts.AcknowledgeContent) SendAcknowledgeMessage(log log.T, agentMessage mgsContracts.AgentMessage) error SendAgentSessionStateMessage(log log.T, sessionStatus mgsContracts.SessionStatus) error AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64) SkipHandshake(log log.T) PerformHandshake(log log.T, kmsKeyId string, encryptionEnabled bool, sessionTypeRequest mgsContracts.SessionTypeRequest) (err error) GetClientVersion() string GetInstanceId() string GetRegion() string IsActive() bool PrepareToCloseChannel(log log.T) GetSeparateOutputPayload() bool SetSeparateOutputPayload(separateOutputPayload bool) }
type InputStreamMessageHandler ¶
type InputStreamMessageHandler func(log log.T, streamDataMessage mgsContracts.AgentMessage) error
type ListMessageBuffer ¶
type MapMessageBuffer ¶
type MapMessageBuffer struct { Messages map[int64]StreamingMessage Capacity int Mutex *sync.Mutex }