Documentation ¶
Index ¶
- Constants
- func Get(ctx context.Context, url string, linkedAccount store.LinkedAccount, ...) error
- func NewHTTP(requestHandler *RequestHandler, authInterceptor kitNetHttp.Interceptor) (*http.Server, error)
- type APIsConfig
- type AuthorizationConfig
- type Cache
- func (s *Cache) Dump() interface{}
- func (s *Cache) DumpClouds() map[string]*CloudData
- func (s *Cache) DumpDevices() []subscriptionData
- func (s *Cache) DumpLinkedAccounts() []provisionCacheData
- func (s *Cache) DumpTasks() []Task
- func (s *Cache) LoadCloud(cloudID string) (store.LinkedCloud, bool)
- func (s *Cache) LoadDeviceSubscription(cloudID, linkedAccountID, deviceID string) (subscriptionData, bool)
- func (s *Cache) LoadDevicesSubscription(cloudID, linkedAccountID string) (subscriptionData, bool)
- func (s *Cache) LoadOrCreateCloud(cloud store.LinkedCloud) (store.LinkedCloud, bool)
- func (s *Cache) LoadOrCreateLinkedAccount(linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)
- func (s *Cache) LoadOrCreateSubscription(sub Subscription) (subscriptionData, bool, error)
- func (s *Cache) LoadResourceSubscription(cloudID, linkedAccountID, deviceID, href string) (subscriptionData, bool)
- func (s *Cache) LoadSubscription(ID string) (subscriptionData, bool)
- func (s *Cache) PullOutCloud(cloudID string) (*CloudData, bool)
- func (s *Cache) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)
- func (s *Cache) PullOutLinkedAccount(cloudID, linkedAccountID string) (*LinkedAccountData, bool)
- func (s *Cache) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)
- func (s *Cache) PullOutSubscription(subscripionID string) (subscriptionData, bool)
- func (s *Cache) UpdateLinkedAccount(l store.LinkedAccount) error
- type ClientsConfig
- type CloudData
- type Config
- type Device
- type DeviceData
- func (d *DeviceData) Dump() interface{}
- func (d *DeviceData) DumpResources() map[string]*ResourceData
- func (d *DeviceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, ...) []Task
- func (d *DeviceData) LoadOrCreate(sub Subscription) (Subscription, bool)
- func (d *DeviceData) PullOut(sub Subscription) (Subscription, bool)
- func (d *DeviceData) Subscription() (Subscription, bool)
- type DevicesSubscription
- type EventBusConfig
- type GrpcGatewayConfig
- type HTTPConfig
- type HTTPSubscriptionConfig
- type IdentityStoreConfig
- type LinkedAccountData
- func (d *LinkedAccountData) Dump() interface{}
- func (d *LinkedAccountData) DumpDevices() map[string]*DeviceData
- func (d *LinkedAccountData) DumpTasks(linkedCloud store.LinkedCloud) []Task
- func (d *LinkedAccountData) LinkedAccount() store.LinkedAccount
- func (d *LinkedAccountData) LoadOrCreate(sub Subscription) (Subscription, bool)
- func (d *LinkedAccountData) PullOut(sub Subscription) (Subscription, bool)
- func (d *LinkedAccountData) Subscription() (Subscription, bool)
- type LinkedAccountHandler
- type LinkedCloudHandler
- type OnTaskTrigger
- type PullDevicesConfig
- type Representation
- type RequestHandler
- func (rh *RequestHandler) AddLinkedAccount(w http.ResponseWriter, r *http.Request)
- func (rh *RequestHandler) AddLinkedCloud(w http.ResponseWriter, r *http.Request)
- func (rh *RequestHandler) DeleteLinkedAccount(w http.ResponseWriter, r *http.Request)
- func (rh *RequestHandler) DeleteLinkedCloud(w http.ResponseWriter, r *http.Request)
- func (rh *RequestHandler) OAuthCallback(w http.ResponseWriter, r *http.Request)
- func (rh *RequestHandler) ProcessEvent(w http.ResponseWriter, r *http.Request)
- func (rh *RequestHandler) RetrieveLinkedClouds(w http.ResponseWriter, r *http.Request)
- type ResourceAggregateConfig
- type ResourceData
- func (d *ResourceData) Dump() interface{}
- func (d *ResourceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, ...) []Task
- func (d *ResourceData) LoadOrCreate(sub Subscription) (Subscription, bool)
- func (d *ResourceData) PullOut(sub Subscription) (Subscription, bool)
- func (d *ResourceData) Subscription() (Subscription, bool)
- type RetrieveDeviceContentAllResponse
- type RetrieveDeviceWithLinksResponse
- type Server
- type StorageConfig
- type Store
- func (s *Store) Dump() interface{}
- func (s *Store) DumpDevices() []subscriptionData
- func (s *Store) DumpLinkedAccounts() []provisionCacheData
- func (s *Store) DumpTasks() []Task
- func (s *Store) LoadCloud(cloudID string) (store.LinkedCloud, bool)
- func (s *Store) LoadDeviceSubscription(cloudID, linkedAccountID, deviceID string) (subscriptionData, bool)
- func (s *Store) LoadDevicesSubscription(cloudID, linkedAccountID string) (subscriptionData, bool)
- func (s *Store) LoadOrCreateCloud(ctx context.Context, cloud store.LinkedCloud) (store.LinkedCloud, bool, error)
- func (s *Store) LoadOrCreateLinkedAccount(ctx context.Context, linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)
- func (s *Store) LoadOrCreateSubscription(sub Subscription) (subscriptionData, bool, error)
- func (s *Store) LoadResourceSubscription(cloudID, linkedAccountID, deviceID, href string) (subscriptionData, bool)
- func (s *Store) LoadSubscription(subscripionID string) (subscriptionData, bool)
- func (s *Store) PullOutCloud(ctx context.Context, cloudID string) (*CloudData, error)
- func (s *Store) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)
- func (s *Store) PullOutLinkedAccount(ctx context.Context, cloudID, linkedAccountID string) (*LinkedAccountData, error)
- func (s *Store) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)
- func (s *Store) PullOutSubscription(subscripionID string) (subscriptionData, bool)
- func (s *Store) UpdateLinkedAccount(ctx context.Context, linkedAccount store.LinkedAccount) error
- type Subscription
- type SubscriptionConfig
- type SubscriptionManager
- func (s *SubscriptionManager) HandleCancelEvent(ctx context.Context, header events.EventHeader, ...) error
- func (s *SubscriptionManager) HandleDeviceEvent(ctx context.Context, header events.EventHeader, body []byte, ...) error
- func (s *SubscriptionManager) HandleDevicesEvent(ctx context.Context, header events.EventHeader, body []byte, ...) error
- func (s *SubscriptionManager) HandleDevicesOffline(ctx context.Context, d subscriptionData, header events.EventHeader, ...) error
- func (s *SubscriptionManager) HandleDevicesOnline(ctx context.Context, d subscriptionData, header events.EventHeader, ...) error
- func (s *SubscriptionManager) HandleDevicesRegistered(ctx context.Context, d subscriptionData, devices events.DevicesRegistered, ...) error
- func (s *SubscriptionManager) HandleDevicesUnregistered(ctx context.Context, subscriptionData subscriptionData, correlationID string, ...) error
- func (s *SubscriptionManager) HandleEvent(ctx context.Context, header events.EventHeader, body []byte) (int, error)
- func (s *SubscriptionManager) HandleResourceChangedEvent(ctx context.Context, subscriptionData subscriptionData, ...) error
- func (s *SubscriptionManager) HandleResourceEvent(ctx context.Context, header events.EventHeader, body []byte, ...) error
- func (s *SubscriptionManager) HandleResourcesPublished(ctx context.Context, d subscriptionData, header events.EventHeader, ...) error
- func (s *SubscriptionManager) HandleResourcesUnpublished(ctx context.Context, d subscriptionData, header events.EventHeader, ...) error
- func (s *SubscriptionManager) Run(ctx context.Context, interval time.Duration)
- func (s *SubscriptionManager) SubscribeToDevice(ctx context.Context, deviceID string, linkedAccount store.LinkedAccount, ...) error
- func (s *SubscriptionManager) SubscribeToDevices(ctx context.Context, linkedAccount store.LinkedAccount, ...) error
- func (s *SubscriptionManager) SubscribeToResource(ctx context.Context, deviceID, href string, linkedAccount store.LinkedAccount, ...) error
- type Task
- type TaskProcessor
- type TaskProcessorConfig
- type TaskType
- type Type
Constants ¶
View Source
const AcceptHeader string = "Accept"
View Source
const AuthorizationHeader string = "Authorization"
View Source
const CacheExpiration = time.Minute * 10
View Source
const NOT_SUPPORTED_ERR = "not supported"
Variables ¶
This section is empty.
Functions ¶
func Get ¶
func Get(ctx context.Context, url string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud, v interface{}) error
func NewHTTP ¶
func NewHTTP(requestHandler *RequestHandler, authInterceptor kitNetHttp.Interceptor) (*http.Server, error)
NewHTTP returns an HTTP server.
Types ¶
type APIsConfig ¶
type APIsConfig struct {
HTTP HTTPConfig `yaml:"http" json:"http"`
}
func (*APIsConfig) Validate ¶
func (c *APIsConfig) Validate() error
type AuthorizationConfig ¶
type AuthorizationConfig struct { OwnerClaim string `yaml:"ownerClaim" json:"ownerClaim"` oauth2.Config `yaml:",inline" json:",inline"` }
func (*AuthorizationConfig) Validate ¶
func (c *AuthorizationConfig) Validate() error
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
func (*Cache) DumpClouds ¶
func (*Cache) DumpDevices ¶
func (s *Cache) DumpDevices() []subscriptionData
func (*Cache) DumpLinkedAccounts ¶
func (s *Cache) DumpLinkedAccounts() []provisionCacheData
func (*Cache) LoadDeviceSubscription ¶
func (*Cache) LoadDevicesSubscription ¶
func (*Cache) LoadOrCreateCloud ¶
func (s *Cache) LoadOrCreateCloud(cloud store.LinkedCloud) (store.LinkedCloud, bool)
func (*Cache) LoadOrCreateLinkedAccount ¶
func (s *Cache) LoadOrCreateLinkedAccount(linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)
func (*Cache) LoadOrCreateSubscription ¶
func (s *Cache) LoadOrCreateSubscription(sub Subscription) (subscriptionData, bool, error)
func (*Cache) LoadResourceSubscription ¶
func (*Cache) LoadSubscription ¶
func (*Cache) PullOutDevice ¶
func (s *Cache) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)
func (*Cache) PullOutLinkedAccount ¶
func (s *Cache) PullOutLinkedAccount(cloudID, linkedAccountID string) (*LinkedAccountData, bool)
func (*Cache) PullOutResource ¶
func (s *Cache) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)
func (*Cache) PullOutSubscription ¶
func (*Cache) UpdateLinkedAccount ¶
func (s *Cache) UpdateLinkedAccount(l store.LinkedAccount) error
type ClientsConfig ¶
type ClientsConfig struct { IdentityStore IdentityStoreConfig `yaml:"identityStore" json:"identityStore"` Eventbus EventBusConfig `yaml:"eventBus" json:"eventBus"` GrpcGateway GrpcGatewayConfig `yaml:"grpcGateway" json:"grpcGateway"` ResourceAggregate ResourceAggregateConfig `yaml:"resourceAggregate" json:"resourceAggregate"` Storage StorageConfig `yaml:"storage" json:"storage"` Subscription SubscriptionConfig `yaml:"subscription" json:"subscription"` }
func (*ClientsConfig) Validate ¶
func (c *ClientsConfig) Validate() error
type CloudData ¶
type CloudData struct {
// contains filtered or unexported fields
}
func NewCloudData ¶
func NewCloudData(linkedCloud store.LinkedCloud) *CloudData
func (*CloudData) DumpLinkedAccounts ¶
func (d *CloudData) DumpLinkedAccounts() map[string]*LinkedAccountData
type Config ¶
type Config struct { Log log.Config `yaml:"log" json:"log"` APIs APIsConfig `yaml:"apis" json:"apis"` Clients ClientsConfig `yaml:"clients" json:"clients"` TaskProcessor TaskProcessorConfig `yaml:"taskProcessor" json:"taskProcessor"` }
Config represents application configuration
type DeviceData ¶
type DeviceData struct {
// contains filtered or unexported fields
}
func NewDeviceData ¶
func NewDeviceData() *DeviceData
func (*DeviceData) Dump ¶
func (d *DeviceData) Dump() interface{}
func (*DeviceData) DumpResources ¶
func (d *DeviceData) DumpResources() map[string]*ResourceData
func (*DeviceData) DumpTasks ¶
func (d *DeviceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, deviceID string) []Task
func (*DeviceData) LoadOrCreate ¶
func (d *DeviceData) LoadOrCreate(sub Subscription) (Subscription, bool)
func (*DeviceData) PullOut ¶
func (d *DeviceData) PullOut(sub Subscription) (Subscription, bool)
func (*DeviceData) Subscription ¶
func (d *DeviceData) Subscription() (Subscription, bool)
type DevicesSubscription ¶
type DevicesSubscription struct {
// contains filtered or unexported fields
}
func NewDevicesSubscription ¶
func NewDevicesSubscription(ctx context.Context, rdClient pb.GrpcGatewayClient, raClient raService.ResourceAggregateClient, subscriber *subscriber.Subscriber, reconnectInterval time.Duration) *DevicesSubscription
func (*DevicesSubscription) Add ¶
func (c *DevicesSubscription) Add(deviceID string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error
func (*DevicesSubscription) Delete ¶
func (c *DevicesSubscription) Delete(userID, deviceID string) error
type EventBusConfig ¶
type EventBusConfig struct {
NATS natsClient.Config `yaml:"nats" json:"nats"`
}
func (*EventBusConfig) Validate ¶
func (c *EventBusConfig) Validate() error
type GrpcGatewayConfig ¶
type GrpcGatewayConfig struct {
Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}
func (*GrpcGatewayConfig) Validate ¶
func (c *GrpcGatewayConfig) Validate() error
type HTTPConfig ¶
type HTTPConfig struct { EventsURL string `yaml:"eventsURL" json:"eventsUrl"` PullDevices PullDevicesConfig `yaml:"pullDevices" json:"pullDevices"` Connection listener.Config `yaml:",inline" json:",inline"` Authorization AuthorizationConfig `yaml:"authorization" json:"authorization"` }
func (*HTTPConfig) Validate ¶
func (c *HTTPConfig) Validate() error
type HTTPSubscriptionConfig ¶
type HTTPSubscriptionConfig struct { ReconnectInterval time.Duration `yaml:"reconnectInterval" json:"reconnectInterval"` ResubscribeInterval time.Duration `yaml:"resubscribeInterval" json:"resubscribeInterval"` }
func (*HTTPSubscriptionConfig) Validate ¶
func (c *HTTPSubscriptionConfig) Validate() error
type IdentityStoreConfig ¶
type IdentityStoreConfig struct {
Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}
func (*IdentityStoreConfig) Validate ¶
func (c *IdentityStoreConfig) Validate() error
type LinkedAccountData ¶
type LinkedAccountData struct {
// contains filtered or unexported fields
}
func NewLinkedAccountData ¶
func NewLinkedAccountData(linkedAccount store.LinkedAccount) *LinkedAccountData
func (*LinkedAccountData) Dump ¶
func (d *LinkedAccountData) Dump() interface{}
func (*LinkedAccountData) DumpDevices ¶
func (d *LinkedAccountData) DumpDevices() map[string]*DeviceData
func (*LinkedAccountData) DumpTasks ¶
func (d *LinkedAccountData) DumpTasks(linkedCloud store.LinkedCloud) []Task
func (*LinkedAccountData) LinkedAccount ¶
func (d *LinkedAccountData) LinkedAccount() store.LinkedAccount
func (*LinkedAccountData) LoadOrCreate ¶
func (d *LinkedAccountData) LoadOrCreate(sub Subscription) (Subscription, bool)
func (*LinkedAccountData) PullOut ¶
func (d *LinkedAccountData) PullOut(sub Subscription) (Subscription, bool)
func (*LinkedAccountData) Subscription ¶
func (d *LinkedAccountData) Subscription() (Subscription, bool)
type LinkedAccountHandler ¶
type LinkedAccountHandler struct {
// contains filtered or unexported fields
}
func (*LinkedAccountHandler) Handle ¶
func (h *LinkedAccountHandler) Handle(ctx context.Context, iter store.LinkedAccountIter) (err error)
type LinkedCloudHandler ¶
type LinkedCloudHandler struct {
// contains filtered or unexported fields
}
func (*LinkedCloudHandler) Handle ¶
func (h *LinkedCloudHandler) Handle(ctx context.Context, iter store.LinkedCloudIter) (err error)
type OnTaskTrigger ¶
type OnTaskTrigger func(Task)
type PullDevicesConfig ¶
type PullDevicesConfig struct { Disabled bool `yaml:"disabled" json:"disabled"` Interval time.Duration `yaml:"interval" json:"interval"` }
func (*PullDevicesConfig) Validate ¶
func (c *PullDevicesConfig) Validate() error
type Representation ¶
type Representation struct { Href string `json:"href"` Representation interface{} `json:"rep"` }
type RequestHandler ¶
type RequestHandler struct {
// contains filtered or unexported fields
}
RequestHandler handles incoming requests
func NewRequestHandler ¶
func NewRequestHandler( ownerClaim string, provider *pkgOAuth2.PlgdProvider, subManager *SubscriptionManager, store *Store, triggerTask OnTaskTrigger, ) *RequestHandler
func (*RequestHandler) AddLinkedAccount ¶
func (rh *RequestHandler) AddLinkedAccount(w http.ResponseWriter, r *http.Request)
func (*RequestHandler) AddLinkedCloud ¶
func (rh *RequestHandler) AddLinkedCloud(w http.ResponseWriter, r *http.Request)
func (*RequestHandler) DeleteLinkedAccount ¶
func (rh *RequestHandler) DeleteLinkedAccount(w http.ResponseWriter, r *http.Request)
func (*RequestHandler) DeleteLinkedCloud ¶
func (rh *RequestHandler) DeleteLinkedCloud(w http.ResponseWriter, r *http.Request)
func (*RequestHandler) OAuthCallback ¶
func (rh *RequestHandler) OAuthCallback(w http.ResponseWriter, r *http.Request)
func (*RequestHandler) ProcessEvent ¶
func (rh *RequestHandler) ProcessEvent(w http.ResponseWriter, r *http.Request)
func (*RequestHandler) RetrieveLinkedClouds ¶
func (rh *RequestHandler) RetrieveLinkedClouds(w http.ResponseWriter, r *http.Request)
type ResourceAggregateConfig ¶
type ResourceAggregateConfig struct {
Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}
func (*ResourceAggregateConfig) Validate ¶
func (c *ResourceAggregateConfig) Validate() error
type ResourceData ¶
type ResourceData struct {
// contains filtered or unexported fields
}
func NewResourceData ¶
func NewResourceData() *ResourceData
func (*ResourceData) Dump ¶
func (d *ResourceData) Dump() interface{}
func (*ResourceData) DumpTasks ¶
func (d *ResourceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, deviceID, href string) []Task
func (*ResourceData) LoadOrCreate ¶
func (d *ResourceData) LoadOrCreate(sub Subscription) (Subscription, bool)
func (*ResourceData) PullOut ¶
func (d *ResourceData) PullOut(sub Subscription) (Subscription, bool)
func (*ResourceData) Subscription ¶
func (d *ResourceData) Subscription() (Subscription, bool)
type RetrieveDeviceContentAllResponse ¶
type RetrieveDeviceContentAllResponse struct {
Device
Links []Representation `json:"links"`
}
type RetrieveDeviceWithLinksResponse ¶
type RetrieveDeviceWithLinksResponse struct {
Device
Links []schema.ResourceLink `json:"links"`
}
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server handles HTTP requests.
type StorageConfig ¶
func (*StorageConfig) Validate ¶
func (c *StorageConfig) Validate() error
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) DumpDevices ¶
func (s *Store) DumpDevices() []subscriptionData
func (*Store) DumpLinkedAccounts ¶
func (s *Store) DumpLinkedAccounts() []provisionCacheData
func (*Store) LoadDeviceSubscription ¶
func (*Store) LoadDevicesSubscription ¶
func (*Store) LoadOrCreateCloud ¶
func (s *Store) LoadOrCreateCloud(ctx context.Context, cloud store.LinkedCloud) (store.LinkedCloud, bool, error)
func (*Store) LoadOrCreateLinkedAccount ¶
func (s *Store) LoadOrCreateLinkedAccount(ctx context.Context, linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)
func (*Store) LoadOrCreateSubscription ¶
func (s *Store) LoadOrCreateSubscription(sub Subscription) (subscriptionData, bool, error)
func (*Store) LoadResourceSubscription ¶
func (*Store) LoadSubscription ¶
func (*Store) PullOutCloud ¶
func (*Store) PullOutDevice ¶
func (s *Store) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)
func (*Store) PullOutLinkedAccount ¶
func (*Store) PullOutResource ¶
func (s *Store) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)
func (*Store) PullOutSubscription ¶
func (*Store) UpdateLinkedAccount ¶
type Subscription ¶
type SubscriptionConfig ¶
type SubscriptionConfig struct {
HTTP HTTPSubscriptionConfig `yaml:"http" json:"http"`
}
func (*SubscriptionConfig) Validate ¶
func (c *SubscriptionConfig) Validate() error
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
func NewSubscriptionManager ¶
func NewSubscriptionManager( eventsURL string, isClient pbIS.IdentityStoreClient, raClient raService.ResourceAggregateClient, store *Store, devicesSubscription *DevicesSubscription, provider *oauth2.PlgdProvider, triggerTask OnTaskTrigger, ) *SubscriptionManager
func (*SubscriptionManager) HandleCancelEvent ¶
func (s *SubscriptionManager) HandleCancelEvent(ctx context.Context, header events.EventHeader, linkedAccount store.LinkedAccount) error
func (*SubscriptionManager) HandleDeviceEvent ¶
func (s *SubscriptionManager) HandleDeviceEvent(ctx context.Context, header events.EventHeader, body []byte, subscriptionData subscriptionData) error
HandleDeviceEvent handles device events.
func (*SubscriptionManager) HandleDevicesEvent ¶
func (s *SubscriptionManager) HandleDevicesEvent(ctx context.Context, header events.EventHeader, body []byte, d subscriptionData) error
func (*SubscriptionManager) HandleDevicesOffline ¶
func (s *SubscriptionManager) HandleDevicesOffline(ctx context.Context, d subscriptionData, header events.EventHeader, devices events.DevicesOffline) error
HandleDevicesOffline sets devices offline in the resource aggregate and unregisters the devices from the projection.
func (*SubscriptionManager) HandleDevicesOnline ¶
func (s *SubscriptionManager) HandleDevicesOnline(ctx context.Context, d subscriptionData, header events.EventHeader, devices events.DevicesOnline) error
HandleDevicesOnline sets devices online in the resource aggregate and registers the devices with the projection.
func (*SubscriptionManager) HandleDevicesRegistered ¶
func (s *SubscriptionManager) HandleDevicesRegistered(ctx context.Context, d subscriptionData, devices events.DevicesRegistered, header events.EventHeader) error
func (*SubscriptionManager) HandleDevicesUnregistered ¶
func (s *SubscriptionManager) HandleDevicesUnregistered(ctx context.Context, subscriptionData subscriptionData, correlationID string, devices events.DevicesUnregistered) error
func (*SubscriptionManager) HandleEvent ¶
func (s *SubscriptionManager) HandleEvent(ctx context.Context, header events.EventHeader, body []byte) (int, error)
func (*SubscriptionManager) HandleResourceChangedEvent ¶
func (s *SubscriptionManager) HandleResourceChangedEvent(ctx context.Context, subscriptionData subscriptionData, header events.EventHeader, body []byte) error
func (*SubscriptionManager) HandleResourceEvent ¶
func (s *SubscriptionManager) HandleResourceEvent(ctx context.Context, header events.EventHeader, body []byte, subscriptionData subscriptionData) error
func (*SubscriptionManager) HandleResourcesPublished ¶
func (s *SubscriptionManager) HandleResourcesPublished(ctx context.Context, d subscriptionData, header events.EventHeader, links events.ResourcesPublished) error
HandleResourcesPublished publishes resources to the resource aggregate and subscribes to those resources.
func (*SubscriptionManager) HandleResourcesUnpublished ¶
func (s *SubscriptionManager) HandleResourcesUnpublished(ctx context.Context, d subscriptionData, header events.EventHeader, links events.ResourcesUnpublished) error
HandleResourcesUnpublished unpublishes resources from the resource aggregate and cancels the corresponding resource subscriptions.
func (*SubscriptionManager) Run ¶
func (s *SubscriptionManager) Run(ctx context.Context, interval time.Duration)
func (*SubscriptionManager) SubscribeToDevice ¶
func (s *SubscriptionManager) SubscribeToDevice(ctx context.Context, deviceID string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error
func (*SubscriptionManager) SubscribeToDevices ¶
func (s *SubscriptionManager) SubscribeToDevices(ctx context.Context, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error
func (*SubscriptionManager) SubscribeToResource ¶
func (s *SubscriptionManager) SubscribeToResource(ctx context.Context, deviceID, href string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error
type TaskProcessor ¶
type TaskProcessor struct {
// contains filtered or unexported fields
}
func NewTaskProcessor ¶
func NewTaskProcessor(raClient raService.ResourceAggregateClient, maxParallelGets, cacheSize int, timeout, delay time.Duration) *TaskProcessor
func (*TaskProcessor) Run ¶
func (h *TaskProcessor) Run(ctx context.Context, subscriptionManager *SubscriptionManager) error
func (*TaskProcessor) Trigger ¶
func (h *TaskProcessor) Trigger(task Task)
type TaskProcessorConfig ¶
type TaskProcessorConfig struct {
CacheSize int `yaml:"cacheSize" json:"cacheSize"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
MaxParallel int `yaml:"maxParallel" json:"maxParallel"`
Delay time.Duration `yaml:"delay" json:"delay"` // Used for CTT test with 10s.
}
func (*TaskProcessorConfig) Validate ¶
func (c *TaskProcessorConfig) Validate() error
Source Files ¶
- addLinkedAccount.go
- addLinkedCloud.go
- cache.go
- config.go
- deleteLinkedAccount.go
- deleteLinkedCloud.go
- deviceSubscription.go
- deviceSubscriptionHandlers.go
- devicesSubscription.go
- message.go
- oauthCallback.go
- processEvent.go
- publishResource.go
- pull.go
- requestHandler.go
- resourceSubscription.go
- retrieveLinkedClouds.go
- retrieveResource.go
- service.go
- store.go
- subscriptions.go
- taskProcessor.go
- updateResource.go
Click to show internal directories.
Click to hide internal directories.