Documentation ¶
Overview ¶
Package worker contains logic around managing worker threads
Index ¶
- Constants
- func MockHTTPClient(mockedHTTPClient snapdapi.HTTPClient)
- func MockSnapdClient(mockedSnapd snapdapi.SnapdClient)
- func NewTaskSet() *taskSet
- type Handler
- func (h *Handler) Health()
- func (h *Handler) IsConnected() bool
- func (h *Handler) Metrics()
- func (h *Handler) Process(ctx context.Context, task *TaskInput) (*TaskOutput, error)
- func (h *Handler) Run()
- func (h *Handler) Send(ctx context.Context, output *TaskOutput) error
- func (h *Handler) SetMQTTConnection(m *mqtt.Connection)
- func (h *Handler) ShouldSkipTask(task *TaskInput) (bool, error)
- func (h *Handler) SubscribeToActions(mqttClient MQTT.Client) error
- type HandlerIFace
- type MockClient
- func (cli *MockClient) AddRoute(topic string, callback MQTT.MessageHandler)
- func (cli *MockClient) Connect() MQTT.Token
- func (cli *MockClient) Disconnect(quiesce uint)
- func (cli *MockClient) IsConnected() bool
- func (cli *MockClient) IsConnectionOpen() bool
- func (cli *MockClient) OptionsReader() MQTT.ClientOptionsReader
- func (cli *MockClient) Publish(topic string, qos byte, retained bool, payload interface{}) MQTT.Token
- func (cli *MockClient) Subscribe(topic string, qos byte, callback MQTT.MessageHandler) MQTT.Token
- func (cli *MockClient) SubscribeMultiple(filters map[string]byte, callback MQTT.MessageHandler) MQTT.Token
- func (cli *MockClient) Unsubscribe(topics ...string) MQTT.Token
- type MockHTTP
- type MockMessage
- type MockToken
- type Queue
- type QueueActions
- type QueueOptions
- type SubscribeAction
- func (act *SubscribeAction) Device(orgId, deviceId string) messages.PublishDevice
- func (act *SubscribeAction) RetrieveLogs() messages.PublishResponse
- func (act *SubscribeAction) SSHUserCreate(data *messages.DeviceUser) messages.PublishResponse
- func (act *SubscribeAction) SSHUserRemove(data *messages.DeviceUser) messages.PublishResponse
- func (act *SubscribeAction) ShouldRunOnce() (bool, error)
- func (act *SubscribeAction) SnapAck() messages.PublishResponse
- func (act *SubscribeAction) SnapConf() messages.PublishResponse
- func (act *SubscribeAction) SnapDisable() messages.PublishSnapTask
- func (act *SubscribeAction) SnapEnable() messages.PublishSnapTask
- func (act *SubscribeAction) SnapInfo() messages.PublishSnap
- func (act *SubscribeAction) SnapInstall() messages.PublishSnapTask
- func (act *SubscribeAction) SnapList(deviceId string) messages.PublishSnapsV2
- func (act *SubscribeAction) SnapRefresh() messages.PublishSnapTask
- func (act *SubscribeAction) SnapRemove() messages.PublishSnapTask
- func (act *SubscribeAction) SnapRestart() messages.PublishSnapTask
- func (act *SubscribeAction) SnapRevert() messages.PublishSnapTask
- func (act *SubscribeAction) SnapServerVersion(deviceId string) messages.PublishDeviceVersion
- func (act *SubscribeAction) SnapSetConf() messages.PublishSnapTask
- func (act *SubscribeAction) SnapSnapshot() messages.PublishResponse
- func (act *SubscribeAction) SnapStart() messages.PublishSnapTask
- func (act *SubscribeAction) SnapStop() messages.PublishSnapTask
- func (act *SubscribeAction) SnapSwitch() messages.PublishSnapTask
- func (act *SubscribeAction) Unregister(orgId, deviceId string) messages.PublishDevice
- func (act *SubscribeAction) User() messages.PublishResponse
- type Task
- type TaskId
- type TaskInput
- type TaskOutput
- type TaskResult
Constants ¶
const UserCreateAction = "create"
UserCreateAction represents the action of creating a user. This constant is used with the snapd api.
const UserRemoveAction = "remove"
UserRemoveAction represents the action of removing a user. This constant is used with the snapd api.
Variables ¶
This section is empty.
Functions ¶
func MockHTTPClient ¶
func MockHTTPClient(mockedHTTPClient snapdapi.HTTPClient)
func MockSnapdClient ¶
func MockSnapdClient(mockedSnapd snapdapi.SnapdClient)
func NewTaskSet ¶
func NewTaskSet() *taskSet
Types ¶
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
func New ¶
func New(mqttConn *mqtt.Connection, enrollment *identity.Enrollment) (*Handler, error)
func (*Handler) Health ¶
func (h *Handler) Health()
Health publishes a health message to indicate that the device is still active
func (*Handler) IsConnected ¶
func (*Handler) Metrics ¶
func (h *Handler) Metrics()
Metrics publishes a metrics messages to indicate so the device can be monitored
func (*Handler) SetMQTTConnection ¶
func (h *Handler) SetMQTTConnection(m *mqtt.Connection)
type HandlerIFace ¶
type MockClient ¶
type MockClient struct {
// contains filtered or unexported fields
}
MockClient mocks the MQTT client
func (*MockClient) AddRoute ¶
func (cli *MockClient) AddRoute(topic string, callback MQTT.MessageHandler)
AddRoute mocks routing
func (*MockClient) Connect ¶
func (cli *MockClient) Connect() MQTT.Token
Connect mocks connecting to broker
func (*MockClient) Disconnect ¶
func (cli *MockClient) Disconnect(quiesce uint)
Disconnect mocks client close
func (*MockClient) IsConnected ¶
func (cli *MockClient) IsConnected() bool
IsConnected mocks the connect status
func (*MockClient) IsConnectionOpen ¶
func (cli *MockClient) IsConnectionOpen() bool
IsConnectionOpen mocks the connect status
func (*MockClient) OptionsReader ¶
func (cli *MockClient) OptionsReader() MQTT.ClientOptionsReader
OptionsReader mocks the options reader (badly)
func (*MockClient) Publish ¶
func (cli *MockClient) Publish(topic string, qos byte, retained bool, payload interface{}) MQTT.Token
Publish mocks a publish message
func (*MockClient) Subscribe ¶
func (cli *MockClient) Subscribe(topic string, qos byte, callback MQTT.MessageHandler) MQTT.Token
Subscribe mocks a subscribe message
func (*MockClient) SubscribeMultiple ¶
func (cli *MockClient) SubscribeMultiple(filters map[string]byte, callback MQTT.MessageHandler) MQTT.Token
SubscribeMultiple mocks subscribe messages
func (*MockClient) Unsubscribe ¶
func (cli *MockClient) Unsubscribe(topics ...string) MQTT.Token
Unsubscribe mocks a unsubscribe message
type MockMessage ¶
type MockMessage struct {
// contains filtered or unexported fields
}
MockMessage implements an MQTT message
func (*MockMessage) Duplicate ¶
func (m *MockMessage) Duplicate() bool
Duplicate mocks a duplicate message check
func (*MockMessage) MessageID ¶
func (m *MockMessage) MessageID() uint16
MessageID mocks the message ID
func (*MockMessage) Payload ¶
func (m *MockMessage) Payload() []byte
Payload mocks the payload retrieval
func (*MockMessage) Retained ¶
func (m *MockMessage) Retained() bool
Retained mocks the retained flag
type MockToken ¶
type MockToken struct {
// contains filtered or unexported fields
}
MockToken implements a Token
type Queue ¶
type Queue struct { // Tasks that any worker thread can pick up and perform IncomingEvents chan Task // serialized messages that should be sent back out on MQTT ProcessedEvents chan TaskResult // contains filtered or unexported fields }
func NewQueue ¶
func NewQueue(actions QueueActions, opts QueueOptions) (*Queue, error)
initialize channels set the function that will be used for a go routine in this case will be something along the lines of legacy.performAction
type QueueActions ¶
type QueueActions interface { // The function that should be used for the worker threads Process(context.Context, *TaskInput) (*TaskOutput, error) // The function that should be used for the results returned by the worker threads Send(context.Context, *TaskOutput) error }
type QueueOptions ¶
type QueueOptions struct {
NumWorkerThreads int
}
func ViperQueueOptions ¶
func ViperQueueOptions() QueueOptions
pull options for queue from viper and put into struct
type SubscribeAction ¶
type SubscribeAction struct {
messages.SubscribeAction
}
SubscribeAction is the message format for the action topic
func (*SubscribeAction) Device ¶
func (act *SubscribeAction) Device(orgId, deviceId string) messages.PublishDevice
Device gets details of the device
func (*SubscribeAction) RetrieveLogs ¶
func (act *SubscribeAction) RetrieveLogs() messages.PublishResponse
RetrieveLogs pulls syslog logs from the snapd api and uploads them to an accessible S3 url
func (*SubscribeAction) SSHUserCreate ¶
func (act *SubscribeAction) SSHUserCreate(data *messages.DeviceUser) messages.PublishResponse
func (*SubscribeAction) SSHUserRemove ¶
func (act *SubscribeAction) SSHUserRemove(data *messages.DeviceUser) messages.PublishResponse
func (*SubscribeAction) ShouldRunOnce ¶
func (act *SubscribeAction) ShouldRunOnce() (bool, error)
func (*SubscribeAction) SnapAck ¶
func (act *SubscribeAction) SnapAck() messages.PublishResponse
SnapAck adds an assertion to the device
func (*SubscribeAction) SnapConf ¶
func (act *SubscribeAction) SnapConf() messages.PublishResponse
SnapConf gets the config for a snap
func (*SubscribeAction) SnapDisable ¶
func (act *SubscribeAction) SnapDisable() messages.PublishSnapTask
SnapDisable disables an existing snap
func (*SubscribeAction) SnapEnable ¶
func (act *SubscribeAction) SnapEnable() messages.PublishSnapTask
SnapEnable enables an existing snap
func (*SubscribeAction) SnapInfo ¶
func (act *SubscribeAction) SnapInfo() messages.PublishSnap
SnapInfo gets the info for a snap
func (*SubscribeAction) SnapInstall ¶
func (act *SubscribeAction) SnapInstall() messages.PublishSnapTask
SnapInstall installs a new snap
func (*SubscribeAction) SnapList ¶
func (act *SubscribeAction) SnapList(deviceId string) messages.PublishSnapsV2
SnapList lists installed snaps
func (*SubscribeAction) SnapRefresh ¶
func (act *SubscribeAction) SnapRefresh() messages.PublishSnapTask
SnapRefresh refreshes an existing snap
func (*SubscribeAction) SnapRemove ¶
func (act *SubscribeAction) SnapRemove() messages.PublishSnapTask
SnapRemove removes an existing snap
func (*SubscribeAction) SnapRestart ¶
func (act *SubscribeAction) SnapRestart() messages.PublishSnapTask
SnapRestart sets the config for a snap
func (*SubscribeAction) SnapRevert ¶
func (act *SubscribeAction) SnapRevert() messages.PublishSnapTask
SnapRevert reverts an existing snap
func (*SubscribeAction) SnapServerVersion ¶
func (act *SubscribeAction) SnapServerVersion(deviceId string) messages.PublishDeviceVersion
SnapServerVersion gets details of the device
func (*SubscribeAction) SnapSetConf ¶
func (act *SubscribeAction) SnapSetConf() messages.PublishSnapTask
SnapSetConf sets the config for a snap
func (*SubscribeAction) SnapSnapshot ¶
func (act *SubscribeAction) SnapSnapshot() messages.PublishResponse
SnapSnapshot creates a snapshot of a snap and uploads it to an S3 url
func (*SubscribeAction) SnapStart ¶
func (act *SubscribeAction) SnapStart() messages.PublishSnapTask
SnapStart sets the config for a snap
func (*SubscribeAction) SnapStop ¶
func (act *SubscribeAction) SnapStop() messages.PublishSnapTask
SnapStop sets the config for a snap
func (*SubscribeAction) SnapSwitch ¶
func (act *SubscribeAction) SnapSwitch() messages.PublishSnapTask
SnapSwitch refreshes an existing snap
func (*SubscribeAction) Unregister ¶
func (act *SubscribeAction) Unregister(orgId, deviceId string) messages.PublishDevice
func (*SubscribeAction) User ¶
func (act *SubscribeAction) User() messages.PublishResponse
type TaskOutput ¶
type TaskOutput struct {
// contains filtered or unexported fields
}
type TaskResult ¶
type TaskResult struct {
// contains filtered or unexported fields
}