Documentation ¶
Index ¶
- Variables
- func HandleMessages(msgCh <-chan *aranyagopb.Msg, onMsg MsgHandleFunc)
- type AMQPOpts
- type AzureIoTHubOpts
- type GCPIoTCoreOpts
- type GRPCManager
- func (m *GRPCManager) Close()
- func (m GRPCManager) Connected() <-chan struct{}
- func (m GRPCManager) Disconnected() <-chan struct{}
- func (m GRPCManager) GlobalMessages() <-chan *aranyagopb.Msg
- func (m GRPCManager) MaxPayloadSize() int
- func (m GRPCManager) OnConnected(initialize func() (id string))
- func (m GRPCManager) OnDisconnected(finalize func() (id string, all bool))
- func (m GRPCManager) PostCmd(sid uint64, kind aranyagopb.CmdType, payloadCmd proto.Marshaler) (msgCh <-chan *aranyagopb.Msg, realSID uint64, err error)
- func (m GRPCManager) PostData(sid uint64, kind aranyagopb.CmdType, seq uint64, completed bool, ...) (msgCh <-chan *aranyagopb.Msg, realSid, lastSeq uint64, err error)
- func (m GRPCManager) PostEncodedData(data []byte, ensure bool) error
- func (m GRPCManager) PostStreamCmd(kind aranyagopb.CmdType, payloadCmd proto.Marshaler, dataOut, errOut io.Writer, ...) (msgCh <-chan *aranyagopb.Msg, streamReady <-chan struct{}, realSID uint64, ...)
- func (m *GRPCManager) Reject(reason aranyagopb.RejectionReason, message string)
- func (m GRPCManager) Rejected() <-chan struct{}
- func (m *GRPCManager) Start() error
- func (m *GRPCManager) Sync(server rpcpb.EdgeDevice_SyncServer) error
- type GRPCOpts
- type MQTTOpts
- type Manager
- type MessageQueueManager
- func (m *MessageQueueManager) Close()
- func (m MessageQueueManager) Connected() <-chan struct{}
- func (m MessageQueueManager) Disconnected() <-chan struct{}
- func (m MessageQueueManager) GlobalMessages() <-chan *aranyagopb.Msg
- func (m MessageQueueManager) MaxPayloadSize() int
- func (m MessageQueueManager) OnConnected(initialize func() (id string))
- func (m MessageQueueManager) OnDisconnected(finalize func() (id string, all bool))
- func (m MessageQueueManager) PostCmd(sid uint64, kind aranyagopb.CmdType, payloadCmd proto.Marshaler) (msgCh <-chan *aranyagopb.Msg, realSID uint64, err error)
- func (m MessageQueueManager) PostData(sid uint64, kind aranyagopb.CmdType, seq uint64, completed bool, ...) (msgCh <-chan *aranyagopb.Msg, realSid, lastSeq uint64, err error)
- func (m MessageQueueManager) PostEncodedData(data []byte, ensure bool) error
- func (m MessageQueueManager) PostStreamCmd(kind aranyagopb.CmdType, payloadCmd proto.Marshaler, dataOut, errOut io.Writer, ...) (msgCh <-chan *aranyagopb.Msg, streamReady <-chan struct{}, realSID uint64, ...)
- func (m *MessageQueueManager) Reject(reason aranyagopb.RejectionReason, message string)
- func (m MessageQueueManager) Rejected() <-chan struct{}
- func (m *MessageQueueManager) Start() error
- type MsgHandleFunc
- type Options
- type SessionManager
- func (m *SessionManager) Add(sid uint64, stream bool, keepPacket bool) (realSid uint64, ch chan *aranyagopb.Msg)
- func (m *SessionManager) Cleanup()
- func (m *SessionManager) Delete(sid uint64)
- func (m *SessionManager) Dispatch(msg *aranyagopb.Msg) bool
- func (m *SessionManager) Remains() []uint64
- func (m *SessionManager) SetStream(sid uint64, dataOut, errOut io.Writer, canWrite <-chan struct{}) (<-chan struct{}, bool)
- func (m *SessionManager) TimedRemains() []uint64
- type SessionOptions
- type SessionTimeoutCenter
- type SessionTimeoutHandleFunc
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func HandleMessages ¶
func HandleMessages( msgCh <-chan *aranyagopb.Msg, onMsg MsgHandleFunc, )
Types ¶
type AMQPOpts ¶
type AMQPOpts struct { TLSConfig *tls.Config Username []byte Password []byte Config aranyaapi.AMQPSpec }
runtime options
type AzureIoTHubOpts ¶
type AzureIoTHubOpts struct { IoTHubConnectionString string EventHubConnectionString string Config aranyaapi.AzureIoTHubSpec }
runtime options
type GCPIoTCoreOpts ¶
type GCPIoTCoreOpts struct { PubSubCredentialsJSON []byte CloudIoTCredentialsJSON []byte Config aranyaapi.GCPIoTCoreSpec }
runtime options
type GRPCManager ¶
type GRPCManager struct {
// contains filtered or unexported fields
}
func NewGRPCManager ¶
func (*GRPCManager) Close ¶
func (m *GRPCManager) Close()
func (GRPCManager) Connected ¶
func (m GRPCManager) Connected() <-chan struct{}
Connected notify when agent connected
func (GRPCManager) Disconnected ¶
func (m GRPCManager) Disconnected() <-chan struct{}
Disconnected notify when agent disconnected
func (GRPCManager) GlobalMessages ¶
func (m GRPCManager) GlobalMessages() <-chan *aranyagopb.Msg
func (GRPCManager) OnDisconnected ¶
onDisconnected delete device connection related jobs
func (GRPCManager) PostCmd ¶
func (m GRPCManager) PostCmd( sid uint64, kind aranyagopb.CmdType, payloadCmd proto.Marshaler, ) ( msgCh <-chan *aranyagopb.Msg, realSID uint64, err error, )
func (GRPCManager) PostData ¶
func (m GRPCManager) PostData( sid uint64, kind aranyagopb.CmdType, seq uint64, completed bool, payload []byte, ) ( msgCh <-chan *aranyagopb.Msg, realSid, lastSeq uint64, err error, )
func (GRPCManager) PostEncodedData ¶
func (GRPCManager) PostStreamCmd ¶
func (*GRPCManager) Reject ¶
func (m *GRPCManager) Reject(reason aranyagopb.RejectionReason, message string)
func (GRPCManager) Rejected ¶
func (m GRPCManager) Rejected() <-chan struct{}
Rejected notify when agent get rejected
func (*GRPCManager) Start ¶
func (m *GRPCManager) Start() error
func (*GRPCManager) Sync ¶
func (m *GRPCManager) Sync(server rpcpb.EdgeDevice_SyncServer) error
type MQTTOpts ¶
type MQTTOpts struct { TLSConfig *tls.Config Username []byte Password []byte Config aranyaapi.MQTTSpec }
runtime options
type Manager ¶
type Manager interface { // Start manager and block until stopped Start() error // Close manager immediately Close() // Reject current device connection if any Reject(reason aranyagopb.RejectionReason, message string) // Connected signal Connected() <-chan struct{} // Disconnected signal Disconnected() <-chan struct{} // GlobalMessages message with no session attached GlobalMessages() <-chan *aranyagopb.Msg // PostData PostData( sid uint64, kind aranyagopb.CmdType, seq uint64, complete bool, payload []byte, ) ( msgCh <-chan *aranyagopb.Msg, realSID, lastSeq uint64, err error, ) // PostEncodedData regardless of the size of the data, if ensure is true, will make sure data sent even when // connection lost (block and set after get connected again) PostEncodedData(data []byte, ensure bool) error // PostCmd send a command to remote device with timeout // return a channel for messages to be received in the session PostCmd( sid uint64, kind aranyagopb.CmdType, payloadCmd proto.Marshaler, ) ( msgCh <-chan *aranyagopb.Msg, realSID uint64, err error, ) // PostStreamCmd is like PostCmd, but will set session in streaming mode, received stream // data will be written to dataOut/errOut directly PostStreamCmd( kind aranyagopb.CmdType, payloadCmd proto.Marshaler, dataOut, errOut io.Writer, keepPacket bool, canWrite <-chan struct{}, ) ( msgCh <-chan *aranyagopb.Msg, streamReady <-chan struct{}, realSID uint64, err error, ) // MaxPayloadSize of this kind connectivity method, used to reduce message overhead // when handling date streams for port-forward and command execution MaxPayloadSize() int // OnConnected called after device connected and finished // - node sync initialization // - network sync initialization // - pod sync initialization OnConnected(initialize func() (id string)) // OnDisconnected called after lost of device connection, `finalize` // function is used to determine which device lost connection by returning // its online id OnDisconnected(finalize func() (id string, all bool)) }
Manager is the connectivity manager interface, and is designed for message queue based managers such as MQTT
type MessageQueueManager ¶
type MessageQueueManager struct {
// contains filtered or unexported fields
}
func NewMessageQueueManager ¶
func (*MessageQueueManager) Close ¶
func (m *MessageQueueManager) Close()
func (MessageQueueManager) Connected ¶
func (m MessageQueueManager) Connected() <-chan struct{}
Connected notify when agent connected
func (MessageQueueManager) Disconnected ¶
func (m MessageQueueManager) Disconnected() <-chan struct{}
Disconnected notify when agent disconnected
func (MessageQueueManager) GlobalMessages ¶
func (m MessageQueueManager) GlobalMessages() <-chan *aranyagopb.Msg
func (MessageQueueManager) OnConnected ¶
func (m MessageQueueManager) OnConnected(initialize func() (id string))
func (MessageQueueManager) OnDisconnected ¶
onDisconnected delete device connection related jobs
func (MessageQueueManager) PostCmd ¶
func (m MessageQueueManager) PostCmd( sid uint64, kind aranyagopb.CmdType, payloadCmd proto.Marshaler, ) ( msgCh <-chan *aranyagopb.Msg, realSID uint64, err error, )
func (MessageQueueManager) PostData ¶
func (m MessageQueueManager) PostData( sid uint64, kind aranyagopb.CmdType, seq uint64, completed bool, payload []byte, ) ( msgCh <-chan *aranyagopb.Msg, realSid, lastSeq uint64, err error, )
func (MessageQueueManager) PostEncodedData ¶
func (MessageQueueManager) PostStreamCmd ¶
func (*MessageQueueManager) Reject ¶
func (m *MessageQueueManager) Reject(reason aranyagopb.RejectionReason, message string)
func (MessageQueueManager) Rejected ¶
func (m MessageQueueManager) Rejected() <-chan struct{}
Rejected notify when agent get rejected
func (*MessageQueueManager) Start ¶
func (m *MessageQueueManager) Start() error
type MsgHandleFunc ¶
type MsgHandleFunc func(msg *aranyagopb.Msg) (exit bool)
type Options ¶
type Options struct { SessionManagerOptions SessionOptions GRPCOpts *GRPCOpts MQTTOpts *MQTTOpts AMQPOpts *AMQPOpts AzureIoTHubOpts *AzureIoTHubOpts GCPIoTCoreOpts *GCPIoTCoreOpts }
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
func NewSessionManager ¶
func NewSessionManager(opts *SessionOptions) *SessionManager
func (*SessionManager) Add ¶
func (m *SessionManager) Add( sid uint64, stream bool, keepPacket bool, ) ( realSid uint64, ch chan *aranyagopb.Msg, )
Add or reuse a session
func (*SessionManager) Cleanup ¶
func (m *SessionManager) Cleanup()
func (*SessionManager) Delete ¶
func (m *SessionManager) Delete(sid uint64)
func (*SessionManager) Dispatch ¶
func (m *SessionManager) Dispatch(msg *aranyagopb.Msg) bool
func (*SessionManager) Remains ¶
func (m *SessionManager) Remains() []uint64
func (*SessionManager) SetStream ¶
func (m *SessionManager) SetStream( sid uint64, dataOut, errOut io.Writer, canWrite <-chan struct{}, ) (<-chan struct{}, bool)
func (*SessionManager) TimedRemains ¶
func (m *SessionManager) TimedRemains() []uint64
type SessionOptions ¶
type SessionTimeoutCenter ¶
type SessionTimeoutCenter interface { AddStreamCreation(name string, sid uint64, creationTimeout time.Duration, onCreationTimeout SessionTimeoutHandleFunc) AddUnarySession(name string, sid uint64, timeout time.Duration, onSessionTimeout SessionTimeoutHandleFunc) DeleteSession(name string, sid uint64) GetSessions(name string) []uint64 }
type SessionTimeoutHandleFunc ¶
type SessionTimeoutHandleFunc func()
Click to show internal directories.
Click to hide internal directories.