fleet

package
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2022 License: MPL-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Timing defaults for the fleet package.
const (
	// HeartbeatFreq is one minute (60 seconds).
	// NOTE(review): presumably the expected interval between agent heartbeats — confirm at the usage site.
	HeartbeatFreq = time.Minute
	// DefaultTimeout is five minutes (300 seconds).
	// NOTE(review): what this timeout bounds is not visible here — confirm at the usage site.
	DefaultTimeout = 5 * time.Minute
)
View Source
// AgentMetricsRPCFunc is the RPC function name "agent_metrics".
// NOTE(review): presumably the value carried in AgentMetricsRPC.Func — confirm.
const AgentMetricsRPCFunc = "agent_metrics"
View Source
// AgentPoliciesReqRPCFunc is the RPC function name "agent_policies_req".
// NOTE(review): presumably identifies requests whose payload is AgentPoliciesReqRPCPayload — confirm.
const AgentPoliciesReqRPCFunc = "agent_policies_req"
View Source
// AgentPolicyRPCFunc is the RPC function name "agent_policy".
// NOTE(review): presumably the value carried in AgentPolicyRPC.Func — confirm.
const AgentPolicyRPCFunc = "agent_policy"
View Source
// AgentResetRPCFunc is the RPC function name "agent_reset".
// NOTE(review): presumably the value carried in AgentResetRPC.Func — confirm.
const AgentResetRPCFunc = "agent_reset"
View Source
// AgentStopRPCFunc is the RPC function name "agent_stop".
// NOTE(review): presumably the value carried in AgentStopRPC.Func — confirm.
const AgentStopRPCFunc = "agent_stop"
View Source
// CapabilitiesTopic is the topic name "agent".
// NOTE(review): presumably the topic on which Capabilities messages travel — confirm.
const CapabilitiesTopic = "agent"
View Source
// CurrentCapabilitiesSchemaVersion is the schema version string "1.0".
// NOTE(review): presumably compared against Capabilities.SchemaVersion — confirm.
const CurrentCapabilitiesSchemaVersion = "1.0"
View Source
// CurrentHeartbeatSchemaVersion is the schema version string "1.0".
// NOTE(review): presumably compared against Heartbeat.SchemaVersion — confirm.
const CurrentHeartbeatSchemaVersion = "1.0"
View Source
// CurrentRPCSchemaVersion is the schema version string "1.0".
// NOTE(review): presumably compared against the SchemaVersion field of the RPC message types — confirm.
const CurrentRPCSchemaVersion = "1.0"
View Source
// DatasetRemovedRPCFunc is the RPC function name "dataset_removed".
// NOTE(review): presumably the value carried in DatasetRemovedRPC.Func — confirm.
const DatasetRemovedRPCFunc = "dataset_removed"
View Source
// GroupMembershipRPCFunc is the RPC function name "group_membership".
// NOTE(review): presumably the value carried in GroupMembershipRPC.Func — confirm.
const GroupMembershipRPCFunc = "group_membership"
View Source
// GroupMembershipReqRPCFunc is the RPC function name "group_membership_req".
// NOTE(review): presumably identifies requests whose payload is GroupMembershipReqRPCPayload — confirm.
const GroupMembershipReqRPCFunc = "group_membership_req"
View Source
// GroupRemovedRPCFunc is the RPC function name "group_removed".
// NOTE(review): presumably the value carried in GroupRemovedRPC.Func — confirm.
const GroupRemovedRPCFunc = "group_removed"
View Source
// HeartbeatsTopic is the topic name "hb".
// NOTE(review): presumably the topic on which Heartbeat messages travel — confirm.
const HeartbeatsTopic = "hb"
View Source
// LogTopic is the topic name "log".
// NOTE(review): the message type carried on this topic is not visible here — confirm.
const LogTopic = "log"
View Source
// MaxMsgPayloadSize is the maximum payload size we will process from a
// client: 1024 * 25 = 25600.
// NOTE(review): the unit is presumably bytes — confirm against the size check.
const MaxMsgPayloadSize = 1024 * 25

MaxMsgPayloadSize is the maximum payload size we will process from a client.

View Source
// RPCFromCoreTopic is the topic name "fromcore".
// NOTE(review): presumably the topic for RPC messages sent from core to agents — confirm.
const RPCFromCoreTopic = "fromcore"
View Source
// RPCToCoreTopic is the topic name "tocore".
// NOTE(review): presumably the topic for RPC messages sent from agents to core — confirm.
const RPCToCoreTopic = "tocore"

Variables

View Source
var (
	// ErrCreateAgentGroup indicates that creating an agent group failed.
	ErrCreateAgentGroup = errors.New("failed to create agent group")

	// ErrMaintainAgentGroupChannels indicates that maintaining agent group
	// channels failed.
	ErrMaintainAgentGroupChannels = errors.New("failed to maintain agent group channels")
)
View Source
var (
	// ErrMalformedEntity indicates a malformed entity specification (e.g.
	// invalid username or password).
	ErrMalformedEntity = errors.New("malformed entity specification")
	// ErrNotFound indicates a request for a non-existent entity.
	ErrNotFound = errors.New("non-existent entity")
	// ErrConflict indicates that the entity already exists.
	ErrConflict = errors.New("entity already exists")
	// ErrUnauthorizedAccess indicates that missing or invalid credentials
	// were provided.
	ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")
	// ErrScanMetadata indicates a problem with metadata in the database.
	ErrScanMetadata = errors.New("failed to scan metadata in db")
	// ErrSelectEntity indicates an error while reading an entity from the database.
	ErrSelectEntity = errors.New("select entity from db error")
	// ErrEntityConnected indicates an error while checking a connection in the database.
	ErrEntityConnected = errors.New("check connection in database error")
	// ErrUpdateEntity indicates an error while updating an entity.
	ErrUpdateEntity = errors.New("failed to update entity")
	// ErrRemoveEntity indicates an error while deleting an entity, e.g. an agent group.
	ErrRemoveEntity = errors.New("failed to remove entity")
)
View Source
var (
	// ErrCreateAgent indicates that creating an agent failed.
	ErrCreateAgent = errors.New("failed to create agent")

	// ErrThings indicates a failure to communicate with the Mainflux Things
	// service. It can be due to a networking error or an invalid/unauthorized
	// request.
	ErrThings = errors.New("failed to receive response from Things service")
)
View Source
var (
	// ErrSchemaVersion indicates that a message was received with a schema
	// version we don't support.
	ErrSchemaVersion = errors.New("unsupported schema version")
	// ErrSchemaMalformed indicates that a message contained a schema we
	// couldn't parse.
	ErrSchemaMalformed = errors.New("schema malformed")
	// ErrPayloadTooBig indicates that a message contained a payload that was
	// abnormally large.
	ErrPayloadTooBig = errors.New("payload too big")
)

Functions

This section is empty.

Types

type Agent

// Agent is the fleet record for a single agent: its name, identifiers, tags,
// metadata, state and most recent heartbeat.
//
// NOTE(review): the MF prefix presumably stands for Mainflux (the ErrThings
// comment mentions the Mainflux Things service) — confirm.
type Agent struct {
	Name           types.Identifier
	MFOwnerID      string
	MFThingID      string
	MFKeyID        string
	MFChannelID    string
	Created        time.Time
	// NOTE(review): OrbTags and AgentTags are separate tag sets; presumably
	// AgentTags are the ones the agent reports (Capabilities.AgentTags) — confirm.
	OrbTags        types.Tags
	AgentTags      types.Tags
	AgentMetadata  types.Metadata
	State          State
	// NOTE(review): presumably the payload and time of the last heartbeat — confirm.
	LastHBData     types.Metadata
	LastHB         time.Time
	// MatchingGroups is a types.Metadata value, not the MatchingGroups struct
	// declared in this package.
	MatchingGroups types.Metadata
}

type AgentCommsService

// AgentCommsService is the interface for communicating with agents over the
// message bus. Besides Start and Stop, every method is a notification sent
// from core to a single Agent or to an AgentGroup.
//
// Only some methods accept a context.Context; the rest take none.
type AgentCommsService interface {
	// Start sets up communication with the message bus to communicate with agents.
	Start() error
	// Stop ends communication with the message bus.
	Stop() error

	// NotifyAgentNewGroupMembership RPC Core -> Agent: Notify a specific Agent of new AgentGroup membership it now belongs to
	NotifyAgentNewGroupMembership(a Agent, ag AgentGroup) error
	// NotifyAgentGroupMemberships RPC Core -> Agent: Notify a specific Agent of all AgentGroup memberships it belongs to
	NotifyAgentGroupMemberships(a Agent) error
	// NotifyAgentAllDatasets RPC Core -> Agent: Notify Agent of all Policy it should currently run based on group membership and current Datasets
	NotifyAgentAllDatasets(a Agent) error
	// NotifyAgentStop RPC Core -> Agent: Notify Agent that it should Stop (Send the message to Agent Channel)
	NotifyAgentStop(agent Agent, reason string) error
	// NotifyGroupNewDataset RPC Core -> Agent: Notify AgentGroup of a newly created Dataset, exposing a new Policy to run
	NotifyGroupNewDataset(ctx context.Context, ag AgentGroup, datasetID string, policyID string, ownerID string) error
	// NotifyGroupRemoval RPC Core -> Agent: Notify AgentGroup that the group has been removed
	NotifyGroupRemoval(ctx context.Context, ag AgentGroup) error
	// NotifyGroupPolicyRemoval RPC Core -> Agent: Notify AgentGroup that a Policy has been removed
	NotifyGroupPolicyRemoval(ag AgentGroup, policyID string, policyName string, backend string) error
	// NotifyGroupDatasetRemoval RPC Core -> Agent: Notify AgentGroup that a Dataset has been removed
	NotifyGroupDatasetRemoval(ag AgentGroup, dsID string, policyID string) error
	// NotifyGroupPolicyUpdate RPC Core -> Agent: Notify AgentGroup that a Policy has been updated
	NotifyGroupPolicyUpdate(ctx context.Context, ag AgentGroup, policyID string, ownerID string) error
	// NotifyAgentReset RPC Core -> Agent: Notify Agent to reset the backend
	NotifyAgentReset(agent Agent, fullReset bool, reason string) error
	// NotifyGroupDatasetEdit RPC Core -> Agent: Notify Agent that an already created Dataset has become invalid or valid
	NotifyGroupDatasetEdit(ctx context.Context, ag AgentGroup, datasetID, policyID, ownerID string, valid bool) error
}

func CommsMetricsMiddleware

func CommsMetricsMiddleware(svc AgentCommsService, counter metrics.Counter, latency metrics.Histogram) AgentCommsService

func NewFleetCommsService

func NewFleetCommsService(logger *zap.Logger, policyClient pb.PolicyServiceClient, agentRepo AgentRepository, agentGroupRepo AgentGroupRepository, agentPubSub mfnats.PubSub) AgentCommsService

type AgentGroup

// AgentGroup is the fleet record for a group of agents: its ID, owner, name,
// description, channel, tags and creation time.
type AgentGroup struct {
	ID             string
	MFOwnerID      string
	Name           types.Identifier
	Description    string
	MFChannelID    string
	// NOTE(review): presumably the tags that agents are matched against
	// (see AgentRepository.RetrieveMatchingAgents) — confirm.
	Tags           types.Tags
	Created        time.Time
	MatchingAgents types.Metadata
}

type AgentGroupRepository

// AgentGroupRepository is the persistence interface for AgentGroup records.
type AgentGroupRepository interface {
	// Save persists the AgentGroup. Successful operation is indicated by a nil
	// error response.
	// NOTE(review): the returned string is presumably the saved group's ID — confirm.
	Save(ctx context.Context, group AgentGroup) (string, error)
	// RetrieveAllByAgent gets all AgentGroups which an Agent belongs to.
	RetrieveAllByAgent(ctx context.Context, a Agent) ([]AgentGroup, error)
	// RetrieveByID gets an AgentGroup by ID.
	RetrieveByID(ctx context.Context, groupID string, ownerID string) (AgentGroup, error)
	// RetrieveAllAgentGroupsByOwner gets all AgentGroups by owner.
	RetrieveAllAgentGroupsByOwner(ctx context.Context, ownerID string, pm PageMetadata) (PageAgentGroup, error)
	// Update updates an existing agent group by owner and ID.
	Update(ctx context.Context, ownerID string, group AgentGroup) (AgentGroup, error)
	// Delete deletes an existing agent group by owner and ID.
	Delete(ctx context.Context, groupID string, ownerID string) error
	// RetrieveMatchingGroups returns the groups this Agent currently belongs to, according to matching agent and group tags.
	RetrieveMatchingGroups(ctx context.Context, ownerID string, thingID string) (MatchingGroups, error)
}

type AgentGroupService

// AgentGroupService is the service interface for creating, viewing, listing,
// editing, removing and validating AgentGroups. All methods except
// ViewAgentGroupByIDInternal take a token.
type AgentGroupService interface {
	// CreateAgentGroup creates a new AgentGroup and its associated channel, and applies it to Agents as appropriate.
	CreateAgentGroup(ctx context.Context, token string, s AgentGroup) (AgentGroup, error)
	// ViewAgentGroupByID retrieves an AgentGroup by ID.
	ViewAgentGroupByID(ctx context.Context, token string, id string) (AgentGroup, error)
	// ViewAgentGroupByIDInternal retrieves an AgentGroup by ID, without a token.
	ViewAgentGroupByIDInternal(ctx context.Context, groupID string, ownerID string) (AgentGroup, error)
	// ListAgentGroups retrieves a list of AgentGroups by owner.
	ListAgentGroups(ctx context.Context, token string, pm PageMetadata) (PageAgentGroup, error)
	// EditAgentGroup edits an existing agent group by ID and owner.
	EditAgentGroup(ctx context.Context, token string, ag AgentGroup) (AgentGroup, error)
	// RemoveAgentGroup removes an existing agent group by owner and ID.
	RemoveAgentGroup(ctx context.Context, token string, id string) error
	// ValidateAgentGroup validates an AgentGroup.
	ValidateAgentGroup(ctx context.Context, token string, s AgentGroup) (AgentGroup, error)
}

type AgentHeartbeatRepository

// AgentHeartbeatRepository is the persistence interface for agent heartbeat
// data. It is embedded in AgentRepository.
type AgentHeartbeatRepository interface {
	// UpdateHeartbeatByIDWithChannel updates the heartbeat data for the Agent having the provided ID and owner.
	UpdateHeartbeatByIDWithChannel(ctx context.Context, agent Agent) error
}

type AgentMetricsRPC

// AgentMetricsRPC is the JSON RPC envelope for agent metrics: a schema
// version, a function name and a list of AgentMetricsRPCPayload entries.
// NOTE(review): Func is presumably AgentMetricsRPCFunc — confirm.
type AgentMetricsRPC struct {
	SchemaVersion string                   `json:"schema_version"`
	Func          string                   `json:"func"`
	Payload       []AgentMetricsRPCPayload `json:"payload"`
}

type AgentMetricsRPCPayload

// AgentMetricsRPCPayload is one entry of AgentMetricsRPC.Payload: metrics data
// tagged with the policy, datasets, format and backend version it belongs to.
type AgentMetricsRPCPayload struct {
	PolicyID   string   `json:"policy_id"`
	PolicyName string   `json:"policy_name"`
	Datasets   []string `json:"datasets"`
	Format     string   `json:"format"`
	BEVersion  string   `json:"be_version"`
	// Data is a []byte, which encoding/json encodes as a base64 string.
	Data       []byte   `json:"data"`
}

type AgentPoliciesReqRPCPayload

// AgentPoliciesReqRPCPayload is the payload type for a policies request; it
// has no fields.
// NOTE(review): presumably paired with AgentPoliciesReqRPCFunc — confirm.
type AgentPoliciesReqRPCPayload struct {
}

type AgentPolicyRPC

// AgentPolicyRPC is the JSON RPC envelope carrying a list of
// AgentPolicyRPCPayload entries.
//
// Unlike GroupMembershipRPC, where FullList lives inside the payload, FullList
// here is a field of the envelope itself.
// NOTE(review): Func is presumably AgentPolicyRPCFunc, and FullList presumably
// marks Payload as the complete policy list — confirm.
type AgentPolicyRPC struct {
	SchemaVersion string                  `json:"schema_version"`
	Func          string                  `json:"func"`
	Payload       []AgentPolicyRPCPayload `json:"payload"`
	FullList      bool                    `json:"full_list"`
}

type AgentPolicyRPCPayload

// AgentPolicyRPCPayload is one entry of AgentPolicyRPC.Payload: an action
// together with the policy, dataset and agent group it refers to.
type AgentPolicyRPCPayload struct {
	// NOTE(review): the set of valid Action values is not visible here — confirm.
	Action       string      `json:"action"`
	ID           string      `json:"id"`
	DatasetID    string      `json:"dataset_id"`
	AgentGroupID string      `json:"agent_group_id"`
	Name         string      `json:"name"`
	Backend      string      `json:"backend"`
	Version      int32       `json:"version"`
	// Data is untyped; its shape is not fixed by this declaration.
	Data         interface{} `json:"data"`
}

type AgentRepository

// AgentRepository is the persistence interface for Agent records. It embeds
// AgentHeartbeatRepository.
type AgentRepository interface {
	AgentHeartbeatRepository // may move this out so it can be in e.g. redis

	// Save persists the Agent. Successful operation is indicated by a nil
	// error response.
	Save(ctx context.Context, agent Agent) error
	// UpdateDataByIDWithChannel updates the tags and metadata for the Agent having the provided ID and owner
	UpdateDataByIDWithChannel(ctx context.Context, agent Agent) error
	// RetrieveByIDWithChannel retrieves the Agent having the provided ID and channelID access (i.e. from a Message)
	RetrieveByIDWithChannel(ctx context.Context, thingID string, channelID string) (Agent, error)
	// RetrieveAll retrieves the subset of Agents owned by the specified user
	RetrieveAll(ctx context.Context, owner string, pm PageMetadata) (Page, error)
	// RetrieveAllByAgentGroupID retrieves Agents in the specified group
	// NOTE(review): the exact filter applied by onlinishOnly is not visible here — confirm.
	RetrieveAllByAgentGroupID(ctx context.Context, owner string, agentGroupID string, onlinishOnly bool) ([]Agent, error)
	// RetrieveMatchingAgents retrieves the matching agents by tags
	RetrieveMatchingAgents(ctx context.Context, owner string, tags types.Tags) (types.Metadata, error)
	// UpdateAgentByID updates the tags and name for the Agent having the provided ID and owner
	UpdateAgentByID(ctx context.Context, ownerID string, agent Agent) error
	// RetrieveByID retrieves the Agent having the provided ID and owner
	RetrieveByID(ctx context.Context, ownerID string, thingID string) (Agent, error)
	// Delete deletes an existing agent by owner and ID
	Delete(ctx context.Context, ownerID string, thingID string) error
	// RetrieveAgentMetadataByOwner retrieves the Metadata having the OwnerID
	RetrieveAgentMetadataByOwner(ctx context.Context, ownerID string) ([]types.Metadata, error)
	// SetStaleStatus changes status to stale according to the provided duration without heartbeats
	// NOTE(review): the parameter is named minutes but its type is time.Duration;
	// confirm what callers pass. The int64 result is presumably the number of
	// agents updated — confirm.
	SetStaleStatus(ctx context.Context, minutes time.Duration) (int64, error)
	// RetrieveAgentInfoByChannelID gRPC version to retrieve ownerID, name and agent tags by a provided channelID
	RetrieveAgentInfoByChannelID(ctx context.Context, channelID string) (Agent, error)
}

type AgentResetRPC

// AgentResetRPC is the JSON RPC envelope carrying an AgentResetRPCPayload.
// NOTE(review): Func is presumably AgentResetRPCFunc — confirm.
type AgentResetRPC struct {
	SchemaVersion string               `json:"schema_version"`
	Func          string               `json:"func"`
	Payload       AgentResetRPCPayload `json:"payload"`
}

type AgentResetRPCPayload

// AgentResetRPCPayload is the payload of AgentResetRPC: a full-reset flag and
// a reason string. The fields mirror the fullReset and reason parameters of
// AgentCommsService.NotifyAgentReset.
type AgentResetRPCPayload struct {
	FullReset bool   `json:"full_reset"`
	Reason    string `json:"reason"`
}

type AgentService

// AgentService is the Agent CRUD interface.
//
// Most methods authenticate with a token. Methods suffixed Internal take an
// owner or channel ID instead of a token.
type AgentService interface {
	// CreateAgent creates a new agent.
	CreateAgent(ctx context.Context, token string, a Agent) (Agent, error)
	// ViewAgentByID retrieves an Agent by the provided thingID.
	ViewAgentByID(ctx context.Context, token string, thingID string) (Agent, error)
	// ViewAgentMatchingGroupsByID returns the groups this Agent currently
	// belongs to, according to matching agent and group tags.
	ViewAgentMatchingGroupsByID(ctx context.Context, token string, thingID string) (MatchingGroups, error)
	// ViewAgentByIDInternal retrieves an Agent by the provided thingID.
	ViewAgentByIDInternal(ctx context.Context, ownerID string, thingID string) (Agent, error)
	// ListAgents retrieves data about the subset of agents that belong to the
	// user identified by the provided key.
	ListAgents(ctx context.Context, token string, pm PageMetadata) (Page, error)
	// EditAgent edits an Agent by the provided thingID.
	EditAgent(ctx context.Context, token string, agent Agent) (Agent, error)
	// ValidateAgent validates an agent.
	ValidateAgent(ctx context.Context, token string, a Agent) (Agent, error)
	// RemoveAgent removes an existing agent by owner and ID.
	RemoveAgent(ctx context.Context, token string, thingID string) error
	// ListAgentBackends lists the available backends from fleet agents.
	ListAgentBackends(ctx context.Context, token string) ([]string, error)
	// ViewAgentBackend retrieves a backend by the provided backend name.
	ViewAgentBackend(ctx context.Context, token string, name string) (interface{}, error)
	// ViewAgentInfoByChannelIDInternal returns the corresponding ownerID, name
	// and agent tags for the provided channel ID.
	ViewAgentInfoByChannelIDInternal(ctx context.Context, channelID string) (Agent, error)
	// ResetAgent resets an agent on the edge, identified by the provided agentID.
	ResetAgent(ctx context.Context, token string, agentID string) error
	// GetPolicyState gets the state of all policies per agent, in a formatted
	// way, for a given existing agent.
	GetPolicyState(ctx context.Context, agent Agent) (map[string]interface{}, error)
	// ViewAgentMatchingGroupsByIDInternal returns the groups this Agent
	// currently belongs to, according to matching agent and group tags.
	ViewAgentMatchingGroupsByIDInternal(ctx context.Context, agentID string, ownerID string) (MatchingGroups, error)
}

AgentService is the Agent CRUD interface.

type AgentStopRPC

// AgentStopRPC is the JSON RPC envelope carrying an AgentStopRPCPayload.
// NOTE(review): Func is presumably AgentStopRPCFunc — confirm.
type AgentStopRPC struct {
	SchemaVersion string              `json:"schema_version"`
	Func          string              `json:"func"`
	Payload       AgentStopRPCPayload `json:"payload"`
}

type AgentStopRPCPayload

// AgentStopRPCPayload is the payload of AgentStopRPC: a single reason string.
// The field mirrors the reason parameter of AgentCommsService.NotifyAgentStop.
type AgentStopRPCPayload struct {
	Reason string `json:"reason"`
}

type BackendInfo

// BackendInfo is the value type of Capabilities.Backends: a version string
// plus free-form data for one backend.
type BackendInfo struct {
	Version string                 `json:"version"`
	Data    map[string]interface{} `json:"data"`
}

type BackendStateInfo

// BackendStateInfo is the value type of Heartbeat.BackendState: the state,
// errors and restart history of one backend.
//
// NOTE(review): encoding/json's omitempty never omits struct values, so
// LastRestartTS is always marshalled, even when it is the zero time.Time.
type BackendStateInfo struct {
	State             string    `json:"state"`
	Error             string    `json:"error,omitempty"`
	RestartCount      int64     `json:"restart_count,omitempty"`
	LastError         string    `json:"last_error,omitempty"`
	LastRestartTS     time.Time `json:"last_restart_ts,omitempty"`
	LastRestartReason string    `json:"last_restart_reason,omitempty"`
}

type Capabilities

// Capabilities is the JSON message describing an agent: its schema version,
// orb agent info, agent tags and backends.
// NOTE(review): Backends is presumably keyed by backend name, and the message
// is presumably sent on CapabilitiesTopic — confirm.
type Capabilities struct {
	SchemaVersion string                 `json:"schema_version"`
	OrbAgent      OrbAgentInfo           `json:"orb_agent"`
	AgentTags     map[string]string      `json:"agent_tags"`
	Backends      map[string]BackendInfo `json:"backends"`
}

type DatasetRemovedRPC

// DatasetRemovedRPC is the JSON RPC envelope carrying a DatasetRemovedRPCPayload.
// NOTE(review): Func is presumably DatasetRemovedRPCFunc — confirm.
type DatasetRemovedRPC struct {
	SchemaVersion string                   `json:"schema_version"`
	Func          string                   `json:"func"`
	Payload       DatasetRemovedRPCPayload `json:"payload"`
}

type DatasetRemovedRPCPayload

// DatasetRemovedRPCPayload is the payload of DatasetRemovedRPC: the IDs of a
// dataset and a policy. The fields mirror the dsID and policyID parameters of
// AgentCommsService.NotifyGroupDatasetRemoval.
type DatasetRemovedRPCPayload struct {
	DatasetID string `json:"dataset_id"`
	PolicyID  string `json:"policy_id"`
}

type Group

// Group is the element type of MatchingGroups.Groups: the ID and name of one
// group.
type Group struct {
	GroupID   string
	GroupName types.Identifier
}

type GroupMembershipData

// GroupMembershipData is the element type of GroupMembershipRPCPayload.Groups:
// the ID, name and channel ID of one group.
type GroupMembershipData struct {
	GroupID   string `json:"group_id"`
	Name      string `json:"name"`
	ChannelID string `json:"channel_id"`
}

type GroupMembershipRPC

// GroupMembershipRPC is the JSON RPC envelope carrying a GroupMembershipRPCPayload.
// NOTE(review): Func is presumably GroupMembershipRPCFunc — confirm.
type GroupMembershipRPC struct {
	SchemaVersion string                    `json:"schema_version"`
	Func          string                    `json:"func"`
	Payload       GroupMembershipRPCPayload `json:"payload"`
}

type GroupMembershipRPCPayload

// GroupMembershipRPCPayload is the payload of GroupMembershipRPC: a list of
// groups plus a FullList flag.
// NOTE(review): FullList presumably marks Groups as the agent's complete
// membership list rather than an addition — confirm.
type GroupMembershipRPCPayload struct {
	Groups   []GroupMembershipData `json:"groups"`
	FullList bool                  `json:"full_list"`
}

type GroupMembershipReqRPCPayload

// GroupMembershipReqRPCPayload is the payload type for a group membership
// request; it has no fields.
// NOTE(review): presumably paired with GroupMembershipReqRPCFunc — confirm.
type GroupMembershipReqRPCPayload struct {
}

type GroupRemovedRPC

// GroupRemovedRPC is the JSON RPC envelope carrying a GroupRemovedRPCPayload.
// NOTE(review): Func is presumably GroupRemovedRPCFunc — confirm.
type GroupRemovedRPC struct {
	SchemaVersion string                 `json:"schema_version"`
	Func          string                 `json:"func"`
	Payload       GroupRemovedRPCPayload `json:"payload"`
}

type GroupRemovedRPCPayload

// GroupRemovedRPCPayload is the payload of GroupRemovedRPC: the agent group
// ID, its channel ID and a list of datasets.
// NOTE(review): Datasets presumably holds dataset IDs tied to the removed
// group — confirm.
type GroupRemovedRPCPayload struct {
	AgentGroupID string   `json:"agent_group_id"`
	ChannelID    string   `json:"channel_id"`
	Datasets     []string `json:"datasets"`
}

type GroupStateInfo

// GroupStateInfo is the value type of Heartbeat.GroupState: the name and
// channel of one group. The JSON keys are "name" and "channel", without the
// Group prefix used by the Go field names.
type GroupStateInfo struct {
	GroupName    string `json:"name"`
	GroupChannel string `json:"channel"`
}

type Heartbeat

// Heartbeat is the JSON heartbeat message: a schema version, a timestamp, the
// agent State, and per-backend, per-policy and per-group state maps.
// NOTE(review): the map keys are presumably backend names, policy IDs and
// group IDs respectively — confirm.
type Heartbeat struct {
	SchemaVersion string                      `json:"schema_version"`
	TimeStamp     time.Time                   `json:"ts"`
	State         State                       `json:"state"`
	BackendState  map[string]BackendStateInfo `json:"backend_state"`
	PolicyState   map[string]PolicyStateInfo  `json:"policy_state"`
	GroupState    map[string]GroupStateInfo   `json:"group_state"`
}

type MatchingGroups

// MatchingGroups is an owner ID together with a list of Groups. It is the
// result type of the RetrieveMatchingGroups and ViewAgentMatchingGroupsByID*
// methods in this package.
type MatchingGroups struct {
	OwnerID string
	Groups  []Group
}

type OrbAgentInfo

// OrbAgentInfo is the type of Capabilities.OrbAgent; it carries a single
// version string.
type OrbAgentInfo struct {
	Version string `json:"version"`
}

type Page

// Page contains page-related metadata as well as the list of agents that
// belong to this page. PageMetadata is embedded, so its fields are promoted
// onto Page.
type Page struct {
	PageMetadata
	Agents []Agent
}

Page contains page-related metadata as well as the list of agents that belong to this page.

type PageAgentGroup

// PageAgentGroup contains page-related metadata as well as the list of agent
// groups that belong to this page. PageMetadata is embedded, so its fields
// are promoted onto PageAgentGroup.
type PageAgentGroup struct {
	PageMetadata
	AgentGroups []AgentGroup
}

type PageMetadata

// PageMetadata contains page metadata that helps navigation.
type PageMetadata struct {
	// Total has no json tag, so encoding/json uses the key "Total" and never
	// omits it, unlike the other fields.
	Total    uint64
	Offset   uint64         `json:"offset,omitempty"`
	Limit    uint64         `json:"limit,omitempty"`
	Name     string         `json:"name,omitempty"`
	// NOTE(review): the accepted values of Order and Dir are not visible here — confirm.
	Order    string         `json:"order,omitempty"`
	Dir      string         `json:"dir,omitempty"`
	Metadata types.Metadata `json:"metadata,omitempty"`
	Tags     types.Tags     `json:"tags,omitempty"`
}

PageMetadata contains page metadata that helps navigation.

type PolicyStateInfo

// PolicyStateInfo is the value type of Heartbeat.PolicyState: the name,
// datasets, state, version, backend and last-scrape details of one policy.
//
// NOTE(review): encoding/json's omitempty never omits struct values, so
// LastScrapeTS is always marshalled, even when it is the zero time.Time.
type PolicyStateInfo struct {
	Name            string    `json:"name"`
	Datasets        []string  `json:"datasets,omitempty"`
	State           string    `json:"state"`
	Error           string    `json:"error,omitempty"`
	Version         int32     `json:"version,omitempty"`
	LastScrapeBytes int64     `json:"last_scrape_bytes,omitempty"`
	LastScrapeTS    time.Time `json:"last_scrape_ts,omitempty"`
	Backend         string    `json:"backend,omitempty"`
}

type RPC

// RPC is the generic JSON RPC envelope: it has the same schema_version, func
// and payload keys as the typed *RPC structs in this package, but leaves
// Payload untyped.
type RPC struct {
	SchemaVersion string      `json:"schema_version"`
	Func          string      `json:"func"`
	Payload       interface{} `json:"payload"`
}

type SchemaVersionCheck

// SchemaVersionCheck holds only the schema_version key of a message.
// NOTE(review): presumably used to decode just the version before decoding
// the full message — confirm.
type SchemaVersionCheck struct {
	SchemaVersion string `json:"schema_version"`
}

type Service

// Service is the combined fleet service interface: it embeds AgentService and
// AgentGroupService and adds no methods of its own.
type Service interface {
	AgentService
	AgentGroupService
}

func NewFleetService

func NewFleetService(logger *zap.Logger, auth mainflux.AuthServiceClient, agentRepo AgentRepository, agentGroupRepository AgentGroupRepository, agentComms AgentCommsService, mfsdk mfsdk.SDK, aDone chan bool) Service

type State

// State is an integer-backed agent state; Agent.State and Heartbeat.State
// use it.
type State int

// State values. They are assigned by iota, so New is 0 and UpgradeRequired is
// 5; inserting or reordering entries changes the numeric values.
// NOTE(review): State has Scan and Value methods, so values are presumably
// persisted — check the stored representation before reordering.
const (
	New State = iota
	Online
	Offline
	Stale
	Removed
	UpgradeRequired
)

func (*State) Scan

func (s *State) Scan(value interface{}) error

func (State) String

func (s State) String() string

func (State) Value

func (s State) Value() (driver.Value, error)

Directories

Path Synopsis
api
Package postgres contains repository implementations using PostgreSQL as the underlying database.
Package postgres contains repository implementations using PostgreSQL as the underlying database.
redis
consumer
Package esconsumer contains events esconsumer for events
Package esconsumer contains events esconsumer for events
producer
Package producer contains the domain events needed to support event sourcing of Sink service actions.
Package producer contains the domain events needed to support event sourcing of Sink service actions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL