Documentation ¶
Index ¶
- Constants
- Variables
- type Agent
- type AgentCommsService
- type AgentGroup
- type AgentGroupRepository
- type AgentGroupService
- type AgentHeartbeatRepository
- type AgentMetricsRPC
- type AgentMetricsRPCPayload
- type AgentPoliciesReqRPCPayload
- type AgentPolicyRPC
- type AgentPolicyRPCPayload
- type AgentRepository
- type AgentResetRPC
- type AgentResetRPCPayload
- type AgentService
- type AgentStopRPC
- type AgentStopRPCPayload
- type BackendInfo
- type BackendStateInfo
- type Capabilities
- type DatasetRemovedRPC
- type DatasetRemovedRPCPayload
- type Group
- type GroupMembershipData
- type GroupMembershipRPC
- type GroupMembershipRPCPayload
- type GroupMembershipReqRPCPayload
- type GroupRemovedRPC
- type GroupRemovedRPCPayload
- type GroupStateInfo
- type Heartbeat
- type MatchingGroups
- type OrbAgentInfo
- type Page
- type PageAgentGroup
- type PageMetadata
- type PolicyStateInfo
- type RPC
- type SchemaVersionCheck
- type Service
- type State
Constants ¶
View Source
const ( HeartbeatFreq = 60 * time.Second DefaultTimeout = 300 * time.Second )
View Source
const AgentMetricsRPCFunc = "agent_metrics"
View Source
const AgentPoliciesReqRPCFunc = "agent_policies_req"
View Source
const AgentPolicyRPCFunc = "agent_policy"
View Source
const AgentResetRPCFunc = "agent_reset"
View Source
const AgentStopRPCFunc = "agent_stop"
View Source
const CapabilitiesTopic = "agent"
View Source
const CurrentCapabilitiesSchemaVersion = "1.0"
View Source
const CurrentHeartbeatSchemaVersion = "1.0"
View Source
const CurrentRPCSchemaVersion = "1.0"
View Source
const DatasetRemovedRPCFunc = "dataset_removed"
View Source
const GroupMembershipRPCFunc = "group_membership"
View Source
const GroupMembershipReqRPCFunc = "group_membership_req"
View Source
const GroupRemovedRPCFunc = "group_removed"
View Source
const HeartbeatsTopic = "hb"
View Source
const LogTopic = "log"
View Source
const MaxMsgPayloadSize = 1024 * 25
MaxMsgPayloadSize is the maximum payload size we will process from a client.
View Source
const RPCFromCoreTopic = "fromcore"
View Source
const RPCToCoreTopic = "tocore"
Variables ¶
View Source
var ( ErrCreateAgentGroup = errors.New("failed to create agent group") ErrMaintainAgentGroupChannels = errors.New("failed to maintain agent group channels") )
View Source
var ( // ErrMalformedEntity indicates malformed entity specification (e.g. // invalid username or password). ErrMalformedEntity = errors.New("malformed entity specification") // ErrNotFound indicates a non-existent entity request. ErrNotFound = errors.New("non-existent entity") // ErrConflict indicates that entity already exists. ErrConflict = errors.New("entity already exists") ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided") // ErrScanMetadata indicates problem with metadata in db ErrScanMetadata = errors.New("failed to scan metadata in db") // ErrSelectEntity indicates error while reading entity from database ErrSelectEntity = errors.New("select entity from db error") // ErrEntityConnected indicates error while checking connection in database ErrEntityConnected = errors.New("check connection in database error") // ErrUpdateEntity indicates error while updating an entity ErrUpdateEntity = errors.New("failed to update entity") // ErrRemoveEntity indicates an error while deleting an agent group ErrRemoveEntity = errors.New("failed to remove entity") )
View Source
var ( ErrCreateAgent = errors.New("failed to create agent") // ErrThings indicates failure to communicate with Mainflux Things service. // It can be due to networking error or invalid/unauthorized request. ErrThings = errors.New("failed to receive response from Things service") )
View Source
var ( // ErrSchemaVersion a message was received indicating a version we don't support ErrSchemaVersion = errors.New("unsupported schema version") // ErrSchemaMalformed a message contained a schema we couldn't parse ErrSchemaMalformed = errors.New("schema malformed") // ErrPayloadTooBig a message contained a payload that was abnormally large ErrPayloadTooBig = errors.New("payload too big") )
Functions ¶
This section is empty.
Types ¶
type AgentCommsService ¶
type AgentCommsService interface { // Start set up communication with the message bus to communicate with agents Start() error // Stop end communication with the message bus Stop() error // NotifyAgentNewGroupMembership RPC Core -> Agent: Notify a specific Agent of new AgentGroup membership it now belongs to NotifyAgentNewGroupMembership(a Agent, ag AgentGroup) error // NotifyAgentGroupMemberships RPC Core -> Agent: Notify a specific Agent of all AgentGroup memberships it belongs to NotifyAgentGroupMemberships(a Agent) error // NotifyAgentAllDatasets RPC Core -> Agent: Notify Agent of all Policy it should currently run based on group membership and current Datasets NotifyAgentAllDatasets(a Agent) error // NotifyAgentStop RPC Core -> Agent: Notify Agent that it should Stop (Send the message to Agent Channel) NotifyAgentStop(agent Agent, reason string) error // NotifyGroupNewDataset RPC Core -> Agent: Notify AgentGroup of a newly created Dataset, exposing a new Policy to run NotifyGroupNewDataset(ctx context.Context, ag AgentGroup, datasetID string, policyID string, ownerID string) error // NotifyGroupRemoval RPC core -> Agent: Notify AgentGroup that the group has been removed NotifyGroupRemoval(ctx context.Context, ag AgentGroup) error // NotifyGroupPolicyRemoval RPC core -> Agent: Notify AgentGroup that a Policy has been removed NotifyGroupPolicyRemoval(ag AgentGroup, policyID string, policyName string, backend string) error // NotifyGroupDatasetRemoval RPC core -> Agent: Notify AgentGroup that a Dataset has been removed NotifyGroupDatasetRemoval(ag AgentGroup, dsID string, policyID string) error // NotifyGroupPolicyUpdate RPC core -> Agent: Notify AgentGroup that a Policy has been updated NotifyGroupPolicyUpdate(ctx context.Context, ag AgentGroup, policyID string, ownerID string) error //NotifyAgentReset RPC core -> Agent: Notify Agent to reset the backend NotifyAgentReset(agent Agent, fullReset bool, reason string) error // NotifyGroupDatasetEdit RPC core -> 
Agent: Notify Agent an already created Dataset goes invalid or valid NotifyGroupDatasetEdit(ctx context.Context, ag AgentGroup, datasetID, policyID, ownerID string, valid bool) error }
func CommsMetricsMiddleware ¶
func CommsMetricsMiddleware(svc AgentCommsService, counter metrics.Counter, latency metrics.Histogram) AgentCommsService
func NewFleetCommsService ¶
func NewFleetCommsService(logger *zap.Logger, policyClient pb.PolicyServiceClient, agentRepo AgentRepository, agentGroupRepo AgentGroupRepository, agentPubSub mfnats.PubSub) AgentCommsService
type AgentGroup ¶
type AgentGroupRepository ¶
type AgentGroupRepository interface { // Save persists the AgentGroup. Successful operation is indicated by a nil // error response. Save(ctx context.Context, group AgentGroup) (string, error) // RetrieveAllByAgent get all AgentGroup which an Agent belongs to. RetrieveAllByAgent(ctx context.Context, a Agent) ([]AgentGroup, error) // RetrieveByID get an AgentGroup by id RetrieveByID(ctx context.Context, groupID string, ownerID string) (AgentGroup, error) // RetrieveAllAgentGroupsByOwner get all AgentGroup by owner. RetrieveAllAgentGroupsByOwner(ctx context.Context, ownerID string, pm PageMetadata) (PageAgentGroup, error) // Update an existing agent group by owner and id Update(ctx context.Context, ownerID string, group AgentGroup) (AgentGroup, error) // Delete an existing agent group by owner and id Delete(ctx context.Context, groupID string, ownerID string) error // RetrieveMatchingGroups Groups this Agent currently belongs to, according to matching agent and group tags RetrieveMatchingGroups(ctx context.Context, ownerID string, thingID string) (MatchingGroups, error) }
type AgentGroupService ¶
type AgentGroupService interface { // CreateAgentGroup creates new AgentGroup, associated channel, applies to Agents as appropriate CreateAgentGroup(ctx context.Context, token string, s AgentGroup) (AgentGroup, error) // ViewAgentGroupByID Retrieve an AgentGroup by id ViewAgentGroupByID(ctx context.Context, token string, id string) (AgentGroup, error) // ViewAgentGroupByIDInternal Retrieve an AgentGroup by id, without a token ViewAgentGroupByIDInternal(ctx context.Context, groupID string, ownerID string) (AgentGroup, error) // ListAgentGroups Retrieve a list of AgentGroups by owner ListAgentGroups(ctx context.Context, token string, pm PageMetadata) (PageAgentGroup, error) // EditAgentGroup edits an existing agent group by id and owner EditAgentGroup(ctx context.Context, token string, ag AgentGroup) (AgentGroup, error) // RemoveAgentGroup removes an existing agent group by owner and id RemoveAgentGroup(ctx context.Context, token string, id string) error // ValidateAgentGroup validate AgentGroup ValidateAgentGroup(ctx context.Context, token string, s AgentGroup) (AgentGroup, error) }
type AgentMetricsRPC ¶
type AgentMetricsRPC struct { SchemaVersion string `json:"schema_version"` Func string `json:"func"` Payload []AgentMetricsRPCPayload `json:"payload"` }
type AgentMetricsRPCPayload ¶
type AgentPoliciesReqRPCPayload ¶
type AgentPoliciesReqRPCPayload struct { }
type AgentPolicyRPC ¶
type AgentPolicyRPC struct { SchemaVersion string `json:"schema_version"` Func string `json:"func"` Payload []AgentPolicyRPCPayload `json:"payload"` FullList bool `json:"full_list"` }
type AgentPolicyRPCPayload ¶
type AgentRepository ¶
type AgentRepository interface { AgentHeartbeatRepository // may move this out so it can be in e.g. redis // Save persists the Agent. Successful operation is indicated by a nil // error response. Save(ctx context.Context, agent Agent) error // UpdateDataByIDWithChannel update the tags and metadata for the Agent having the provided ID and owner UpdateDataByIDWithChannel(ctx context.Context, agent Agent) error // RetrieveByIDWithChannel retrieves the Agent having the provided ID and channelID access (i.e. from a Message) RetrieveByIDWithChannel(ctx context.Context, thingID string, channelID string) (Agent, error) // RetrieveAll retrieves the subset of Agents owned by the specified user RetrieveAll(ctx context.Context, owner string, pm PageMetadata) (Page, error) // RetrieveAllByAgentGroupID retrieves Agents in the specified group RetrieveAllByAgentGroupID(ctx context.Context, owner string, agentGroupID string, onlinishOnly bool) ([]Agent, error) // RetrieveMatchingAgents retrieve the matching agents by tags RetrieveMatchingAgents(ctx context.Context, owner string, tags types.Tags) (types.Metadata, error) // UpdateAgentByID updates the tags and name for the Agent having the provided ID and owner UpdateAgentByID(ctx context.Context, ownerID string, agent Agent) error // RetrieveByID retrieves the Agent having the provided ID and owner RetrieveByID(ctx context.Context, ownerID string, thingID string) (Agent, error) // Delete an existing agent by owner and id Delete(ctx context.Context, ownerID string, thingID string) error // RetrieveAgentMetadataByOwner retrieves the Metadata having the OwnerID RetrieveAgentMetadataByOwner(ctx context.Context, ownerID string) ([]types.Metadata, error) // SetStaleStatus changes status to stale according to the provided duration without heartbeats SetStaleStatus(ctx context.Context, minutes time.Duration) (int64, error) // RetrieveAgentInfoByChannelID gRPC version to retrieve ownerID, name and agent tags by a provided channelID 
RetrieveAgentInfoByChannelID(ctx context.Context, channelID string) (Agent, error) }
type AgentResetRPC ¶
type AgentResetRPC struct { SchemaVersion string `json:"schema_version"` Func string `json:"func"` Payload AgentResetRPCPayload `json:"payload"` }
type AgentResetRPCPayload ¶
type AgentService ¶
type AgentService interface { // CreateAgent creates new agent CreateAgent(ctx context.Context, token string, a Agent) (Agent, error) // ViewAgentByID retrieves an Agent by provided thingID ViewAgentByID(ctx context.Context, token string, thingID string) (Agent, error) // ViewAgentMatchingGroupsByID Groups this Agent currently belongs to, according to matching agent and group tags ViewAgentMatchingGroupsByID(ctx context.Context, token string, thingID string) (MatchingGroups, error) // ViewAgentByIDInternal retrieves an Agent by provided thingID ViewAgentByIDInternal(ctx context.Context, ownerID string, thingID string) (Agent, error) // ListAgents retrieves data about subset of agents that belongs to the // user identified by the provided key. ListAgents(ctx context.Context, token string, pm PageMetadata) (Page, error) // EditAgent edits an Agent by provided thingID EditAgent(ctx context.Context, token string, agent Agent) (Agent, error) // ValidateAgent validates agent ValidateAgent(ctx context.Context, token string, a Agent) (Agent, error) // RemoveAgent removes an existing agent by owner and id RemoveAgent(ctx context.Context, token string, thingID string) error // ListAgentBackends List the available backends from fleet agents ListAgentBackends(ctx context.Context, token string) ([]string, error) // ViewAgentBackend retrieves a Backend by provided backend name ViewAgentBackend(ctx context.Context, token string, name string) (interface{}, error) // ViewAgentInfoByChannelIDInternal returns the corresponding ownerID, name and agent tags for a provided channel id ViewAgentInfoByChannelIDInternal(ctx context.Context, channelID string) (Agent, error) // ResetAgent resets an agent on edge by a provided agent ID ResetAgent(ct context.Context, token string, agentID string) error // GetPolicyState gets the state of all policies, in a formatted way, for a given existing agent GetPolicyState(ctx context.Context, agent Agent) (map[string]interface{}, error) // 
ViewAgentMatchingGroupsByIDInternal Groups this Agent currently belongs to, according to matching agent and group tags ViewAgentMatchingGroupsByIDInternal(ctx context.Context, agentID string, ownerID string) (MatchingGroups, error) }
AgentService is the Agent CRUD interface.
type AgentStopRPC ¶
type AgentStopRPC struct { SchemaVersion string `json:"schema_version"` Func string `json:"func"` Payload AgentStopRPCPayload `json:"payload"` }
type AgentStopRPCPayload ¶
type AgentStopRPCPayload struct {
Reason string `json:"reason"`
}
type BackendInfo ¶
type BackendStateInfo ¶
type BackendStateInfo struct { State string `json:"state"` Error string `json:"error,omitempty"` RestartCount int64 `json:"restart_count,omitempty"` LastError string `json:"last_error,omitempty"` LastRestartTS time.Time `json:"last_restart_ts,omitempty"` LastRestartReason string `json:"last_restart_reason,omitempty"` }
type Capabilities ¶
type Capabilities struct { SchemaVersion string `json:"schema_version"` OrbAgent OrbAgentInfo `json:"orb_agent"` AgentTags map[string]string `json:"agent_tags"` Backends map[string]BackendInfo `json:"backends"` }
type DatasetRemovedRPC ¶
type DatasetRemovedRPC struct { SchemaVersion string `json:"schema_version"` Func string `json:"func"` Payload DatasetRemovedRPCPayload `json:"payload"` }
type Group ¶
type Group struct { GroupID string GroupName types.Identifier }
type GroupMembershipData ¶
type GroupMembershipRPC ¶
type GroupMembershipRPC struct { SchemaVersion string `json:"schema_version"` Func string `json:"func"` Payload GroupMembershipRPCPayload `json:"payload"` }
type GroupMembershipRPCPayload ¶
type GroupMembershipRPCPayload struct { Groups []GroupMembershipData `json:"groups"` FullList bool `json:"full_list"` }
type GroupMembershipReqRPCPayload ¶
type GroupMembershipReqRPCPayload struct { }
type GroupRemovedRPC ¶
type GroupRemovedRPC struct { SchemaVersion string `json:"schema_version"` Func string `json:"func"` Payload GroupRemovedRPCPayload `json:"payload"` }
type GroupRemovedRPCPayload ¶
type GroupStateInfo ¶
type Heartbeat ¶
type Heartbeat struct { SchemaVersion string `json:"schema_version"` TimeStamp time.Time `json:"ts"` State State `json:"state"` BackendState map[string]BackendStateInfo `json:"backend_state"` PolicyState map[string]PolicyStateInfo `json:"policy_state"` GroupState map[string]GroupStateInfo `json:"group_state"` }
type MatchingGroups ¶
type OrbAgentInfo ¶
type OrbAgentInfo struct {
Version string `json:"version"`
}
type Page ¶
type Page struct { PageMetadata Agents []Agent }
Page contains page-related metadata as well as the list of agents that belong to this page.
type PageAgentGroup ¶
type PageAgentGroup struct { PageMetadata AgentGroups []AgentGroup }
type PageMetadata ¶
type PageMetadata struct { Total uint64 Offset uint64 `json:"offset,omitempty"` Limit uint64 `json:"limit,omitempty"` Name string `json:"name,omitempty"` Order string `json:"order,omitempty"` Dir string `json:"dir,omitempty"` Metadata types.Metadata `json:"metadata,omitempty"` Tags types.Tags `json:"tags,omitempty"` }
PageMetadata contains page metadata that helps navigation.
type PolicyStateInfo ¶
type PolicyStateInfo struct { Name string `json:"name"` Datasets []string `json:"datasets,omitempty"` State string `json:"state"` Error string `json:"error,omitempty"` Version int32 `json:"version,omitempty"` LastScrapeBytes int64 `json:"last_scrape_bytes,omitempty"` LastScrapeTS time.Time `json:"last_scrape_ts,omitempty"` Backend string `json:"backend,omitempty"` }
type SchemaVersionCheck ¶
type SchemaVersionCheck struct {
SchemaVersion string `json:"schema_version"`
}
type Service ¶
type Service interface { AgentService AgentGroupService }
func NewFleetService ¶
func NewFleetService(logger *zap.Logger, auth mainflux.AuthServiceClient, agentRepo AgentRepository, agentGroupRepository AgentGroupRepository, agentComms AgentCommsService, mfsdk mfsdk.SDK, aDone chan bool) Service
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
api
|
|
Package postgres contains repository implementations using PostgreSQL as the underlying database.
|
Package postgres contains repository implementations using PostgreSQL as the underlying database. |
redis
|
|
consumer
Package esconsumer contains the consumer for events.
|
Package esconsumer contains the consumer for events. |
producer
Package producer contains the domain events needed to support event sourcing of Sink service actions.
|
Package producer contains the domain events needed to support event sourcing of Sink service actions. |
Click to show internal directories.
Click to hide internal directories.