connectivity

package
v0.0.0-...-735b8cc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrManagerClosed = errors.New("connectivity manager has been closed")
)
View Source
var (
	ErrUnsupportedManager = errors.New("unsupported manager")
)

Functions

func HandleMessages

func HandleMessages(
	msgCh <-chan *aranyagopb.Msg,
	onMsg MsgHandleFunc,
)

Types

type AMQPOpts

type AMQPOpts struct {
	TLSConfig *tls.Config
	Username  []byte
	Password  []byte
	Config    aranyaapi.AMQPSpec
}

AMQPOpts holds the runtime options for AMQP connectivity.

type AzureIoTHubOpts

type AzureIoTHubOpts struct {
	IoTHubConnectionString   string
	EventHubConnectionString string
	Config                   aranyaapi.AzureIoTHubSpec
}

AzureIoTHubOpts holds the runtime options for Azure IoT Hub connectivity.

type GCPIoTCoreOpts

type GCPIoTCoreOpts struct {
	PubSubCredentialsJSON   []byte
	CloudIoTCredentialsJSON []byte
	Config                  aranyaapi.GCPIoTCoreSpec
}

GCPIoTCoreOpts holds the runtime options for GCP IoT Core connectivity.

type GRPCManager

type GRPCManager struct {
	// contains filtered or unexported fields
}

func NewGRPCManager

func NewGRPCManager(parentCtx context.Context, name string, mgrConfig *Options) (*GRPCManager, error)

func (*GRPCManager) Close

func (m *GRPCManager) Close()

func (GRPCManager) Connected

func (m GRPCManager) Connected() <-chan struct{}

Connected returns a channel that signals when the agent has connected.

func (GRPCManager) Disconnected

func (m GRPCManager) Disconnected() <-chan struct{}

Disconnected returns a channel that signals when the agent has disconnected.

func (GRPCManager) GlobalMessages

func (m GRPCManager) GlobalMessages() <-chan *aranyagopb.Msg

func (GRPCManager) MaxPayloadSize

func (m GRPCManager) MaxPayloadSize() int

func (GRPCManager) OnConnected

func (m GRPCManager) OnConnected(initialize func() (id string))

func (GRPCManager) OnDisconnected

func (m GRPCManager) OnDisconnected(finalize func() (id string, all bool))

OnDisconnected deletes the jobs related to the device connection.

func (GRPCManager) PostCmd

func (m GRPCManager) PostCmd(
	sid uint64,
	kind aranyagopb.CmdType,
	payloadCmd proto.Marshaler,
) (
	msgCh <-chan *aranyagopb.Msg,
	realSID uint64,
	err error,
)

func (GRPCManager) PostData

func (m GRPCManager) PostData(
	sid uint64,
	kind aranyagopb.CmdType,
	seq uint64,
	completed bool,
	payload []byte,
) (
	msgCh <-chan *aranyagopb.Msg,
	realSid, lastSeq uint64,
	err error,
)

func (GRPCManager) PostEncodedData

func (m GRPCManager) PostEncodedData(data []byte, ensure bool) error

func (GRPCManager) PostStreamCmd

func (m GRPCManager) PostStreamCmd(
	kind aranyagopb.CmdType,
	payloadCmd proto.Marshaler,
	dataOut, errOut io.Writer,
	keepPacket bool,
	canWrite <-chan struct{},
) (
	msgCh <-chan *aranyagopb.Msg,
	streamReady <-chan struct{},
	realSID uint64,
	err error,
)

func (*GRPCManager) Reject

func (m *GRPCManager) Reject(reason aranyagopb.RejectionReason, message string)

func (GRPCManager) Rejected

func (m GRPCManager) Rejected() <-chan struct{}

Rejected returns a channel that signals when the agent has been rejected.

func (*GRPCManager) Start

func (m *GRPCManager) Start() error

func (*GRPCManager) Sync

func (m *GRPCManager) Sync(server rpcpb.EdgeDevice_SyncServer) error

type GRPCOpts

type GRPCOpts struct {
	Server   *grpc.Server
	Listener net.Listener
}

GRPCOpts holds the runtime options for gRPC connectivity.

type MQTTOpts

type MQTTOpts struct {
	TLSConfig *tls.Config
	Username  []byte
	Password  []byte
	Config    aranyaapi.MQTTSpec
}

MQTTOpts holds the runtime options for MQTT connectivity.

type Manager

type Manager interface {
	// Start manager and block until stopped
	Start() error

	// Close manager immediately
	Close()

	// Reject current device connection if any
	Reject(reason aranyagopb.RejectionReason, message string)

	// Connected signal
	Connected() <-chan struct{}

	// Disconnected signal
	Disconnected() <-chan struct{}

	// GlobalMessages returns a channel of messages that have no session attached
	GlobalMessages() <-chan *aranyagopb.Msg

	// PostData
	PostData(
		sid uint64,
		kind aranyagopb.CmdType,
		seq uint64,
		complete bool,
		payload []byte,
	) (
		msgCh <-chan *aranyagopb.Msg,
		realSID, lastSeq uint64,
		err error,
	)

	// PostEncodedData posts already encoded data regardless of its size; if ensure is true, it makes sure the
	// data is sent even when the connection is lost (blocks and sends after getting connected again)
	PostEncodedData(data []byte, ensure bool) error

	// PostCmd sends a command to the remote device with a timeout and
	// returns a channel on which the messages of the session are received
	PostCmd(
		sid uint64,
		kind aranyagopb.CmdType,
		payloadCmd proto.Marshaler,
	) (
		msgCh <-chan *aranyagopb.Msg,
		realSID uint64,
		err error,
	)

	// PostStreamCmd is like PostCmd, but sets the session to streaming mode; received stream
	// data is written to dataOut/errOut directly
	PostStreamCmd(
		kind aranyagopb.CmdType,
		payloadCmd proto.Marshaler,
		dataOut, errOut io.Writer,
		keepPacket bool,
		canWrite <-chan struct{},
	) (
		msgCh <-chan *aranyagopb.Msg,
		streamReady <-chan struct{},
		realSID uint64,
		err error,
	)

	// MaxPayloadSize of this kind of connectivity method, used to reduce message overhead
	// when handling data streams for port-forward and command execution
	MaxPayloadSize() int

	// OnConnected is called after the device has connected and finished
	//   - node sync initialization
	//   - network sync initialization
	//   - pod sync initialization
	OnConnected(initialize func() (id string))

	// OnDisconnected is called after loss of the device connection; the `finalize`
	// function is used to determine which device lost its connection by returning
	// its online id
	OnDisconnected(finalize func() (id string, all bool))
}

Manager is the connectivity manager interface; it is designed for message-queue-based managers such as MQTT.

func NewManager

func NewManager(parentCtx context.Context, name string, options *Options) (Manager, error)

type MessageQueueManager

type MessageQueueManager struct {
	// contains filtered or unexported fields
}

func NewMessageQueueManager

func NewMessageQueueManager(
	parentCtx context.Context,
	name string,
	opts *Options,
) (
	*MessageQueueManager, error,
)

func (*MessageQueueManager) Close

func (m *MessageQueueManager) Close()

func (MessageQueueManager) Connected

func (m MessageQueueManager) Connected() <-chan struct{}

Connected returns a channel that signals when the agent has connected.

func (MessageQueueManager) Disconnected

func (m MessageQueueManager) Disconnected() <-chan struct{}

Disconnected returns a channel that signals when the agent has disconnected.

func (MessageQueueManager) GlobalMessages

func (m MessageQueueManager) GlobalMessages() <-chan *aranyagopb.Msg

func (MessageQueueManager) MaxPayloadSize

func (m MessageQueueManager) MaxPayloadSize() int

func (MessageQueueManager) OnConnected

func (m MessageQueueManager) OnConnected(initialize func() (id string))

func (MessageQueueManager) OnDisconnected

func (m MessageQueueManager) OnDisconnected(finalize func() (id string, all bool))

OnDisconnected deletes the jobs related to the device connection.

func (MessageQueueManager) PostCmd

func (m MessageQueueManager) PostCmd(
	sid uint64,
	kind aranyagopb.CmdType,
	payloadCmd proto.Marshaler,
) (
	msgCh <-chan *aranyagopb.Msg,
	realSID uint64,
	err error,
)

func (MessageQueueManager) PostData

func (m MessageQueueManager) PostData(
	sid uint64,
	kind aranyagopb.CmdType,
	seq uint64,
	completed bool,
	payload []byte,
) (
	msgCh <-chan *aranyagopb.Msg,
	realSid, lastSeq uint64,
	err error,
)

func (MessageQueueManager) PostEncodedData

func (m MessageQueueManager) PostEncodedData(data []byte, ensure bool) error

func (MessageQueueManager) PostStreamCmd

func (m MessageQueueManager) PostStreamCmd(
	kind aranyagopb.CmdType,
	payloadCmd proto.Marshaler,
	dataOut, errOut io.Writer,
	keepPacket bool,
	canWrite <-chan struct{},
) (
	msgCh <-chan *aranyagopb.Msg,
	streamReady <-chan struct{},
	realSID uint64,
	err error,
)

func (*MessageQueueManager) Reject

func (m *MessageQueueManager) Reject(reason aranyagopb.RejectionReason, message string)

func (MessageQueueManager) Rejected

func (m MessageQueueManager) Rejected() <-chan struct{}

Rejected returns a channel that signals when the agent has been rejected.

func (*MessageQueueManager) Start

func (m *MessageQueueManager) Start() error

type MsgHandleFunc

type MsgHandleFunc func(msg *aranyagopb.Msg) (exit bool)

type Options

type Options struct {
	SessionManagerOptions SessionOptions

	GRPCOpts        *GRPCOpts
	MQTTOpts        *MQTTOpts
	AMQPOpts        *AMQPOpts
	AzureIoTHubOpts *AzureIoTHubOpts
	GCPIoTCoreOpts  *GCPIoTCoreOpts
}

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

func NewSessionManager

func NewSessionManager(opts *SessionOptions) *SessionManager

func (*SessionManager) Add

func (m *SessionManager) Add(
	sid uint64,
	stream bool,
	keepPacket bool,
) (
	realSid uint64,
	ch chan *aranyagopb.Msg,
)

Add adds a new session or reuses an existing one.

func (*SessionManager) Cleanup

func (m *SessionManager) Cleanup()

func (*SessionManager) Delete

func (m *SessionManager) Delete(sid uint64)

func (*SessionManager) Dispatch

func (m *SessionManager) Dispatch(msg *aranyagopb.Msg) bool

func (*SessionManager) Remains

func (m *SessionManager) Remains() []uint64

func (*SessionManager) SetStream

func (m *SessionManager) SetStream(
	sid uint64, dataOut, errOut io.Writer, canWrite <-chan struct{},
) (<-chan struct{}, bool)

func (*SessionManager) TimedRemains

func (m *SessionManager) TimedRemains() []uint64

type SessionOptions

type SessionOptions struct {
	Name                  string
	UnarySessionTimeout   time.Duration
	StreamCreationTimeout time.Duration

	SessionTimeoutCenter SessionTimeoutCenter
}

type SessionTimeoutCenter

type SessionTimeoutCenter interface {
	AddStreamCreation(name string, sid uint64, creationTimeout time.Duration, onCreationTimeout SessionTimeoutHandleFunc)
	AddUnarySession(name string, sid uint64, timeout time.Duration, onSessionTimeout SessionTimeoutHandleFunc)
	DeleteSession(name string, sid uint64)
	GetSessions(name string) []uint64
}

type SessionTimeoutHandleFunc

type SessionTimeoutHandleFunc func()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL