service

package
v2.7.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2022 License: Apache-2.0 Imports: 53 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AuthorizationHeader string = "Authorization"
	AcceptHeader        string = "Accept"
)
View Source
const CacheExpiration = time.Minute * 10
View Source
const NOT_SUPPORTED_ERR = "not supported"

Variables

This section is empty.

Functions

func Get

func Get(ctx context.Context, tracerProvider trace.TracerProvider, url string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud, v interface{}) error

func NewHTTP

func NewHTTP(requestHandler *RequestHandler, authInterceptor kitNetHttp.Interceptor, logger log.Logger) (http.Handler, error)

NewHTTP returns HTTP handler

Types

type APIsConfig

type APIsConfig struct {
	HTTP HTTPConfig `yaml:"http" json:"http"`
}

func (*APIsConfig) Validate

func (c *APIsConfig) Validate() error

type AuthorizationConfig

type AuthorizationConfig struct {
	OwnerClaim    string `yaml:"ownerClaim" json:"ownerClaim"`
	oauth2.Config `yaml:",inline" json:",inline"`
}

func (*AuthorizationConfig) Validate

func (c *AuthorizationConfig) Validate() error

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

func NewCache

func NewCache() *Cache

func (*Cache) Dump

func (s *Cache) Dump() interface{}

func (*Cache) DumpClouds

func (s *Cache) DumpClouds() map[string]*CloudData

func (*Cache) DumpDevices

func (s *Cache) DumpDevices() []SubscriptionData

func (*Cache) DumpLinkedAccounts

func (s *Cache) DumpLinkedAccounts() []ProvisionCacheData

func (*Cache) DumpTasks

func (s *Cache) DumpTasks() []Task

func (*Cache) LoadCloud

func (s *Cache) LoadCloud(cloudID string) (store.LinkedCloud, bool)

func (*Cache) LoadDeviceSubscription

func (s *Cache) LoadDeviceSubscription(cloudID, linkedAccountID, deviceID string) (SubscriptionData, bool)

func (*Cache) LoadDevicesSubscription

func (s *Cache) LoadDevicesSubscription(cloudID, linkedAccountID string) (SubscriptionData, bool)

func (*Cache) LoadOrCreateCloud

func (s *Cache) LoadOrCreateCloud(cloud store.LinkedCloud) (store.LinkedCloud, bool)

func (*Cache) LoadOrCreateLinkedAccount

func (s *Cache) LoadOrCreateLinkedAccount(linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)

func (*Cache) LoadOrCreateSubscription

func (s *Cache) LoadOrCreateSubscription(sub Subscription) (SubscriptionData, bool, error)

func (*Cache) LoadResourceSubscription

func (s *Cache) LoadResourceSubscription(cloudID, linkedAccountID, deviceID, href string) (SubscriptionData, bool)

func (*Cache) LoadSubscription

func (s *Cache) LoadSubscription(id string) (SubscriptionData, bool)

func (*Cache) PullOutCloud

func (s *Cache) PullOutCloud(cloudID string) (*CloudData, bool)

func (*Cache) PullOutDevice

func (s *Cache) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)

func (*Cache) PullOutLinkedAccount

func (s *Cache) PullOutLinkedAccount(cloudID, linkedAccountID string) (*LinkedAccountData, bool)

func (*Cache) PullOutResource

func (s *Cache) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)

func (*Cache) PullOutSubscription

func (s *Cache) PullOutSubscription(subscripionID string) (SubscriptionData, bool)

func (*Cache) UpdateLinkedAccount

func (s *Cache) UpdateLinkedAccount(l store.LinkedAccount) error

type ClientsConfig

type ClientsConfig struct {
	IdentityStore          IdentityStoreConfig               `yaml:"identityStore" json:"identityStore"`
	Eventbus               EventBusConfig                    `yaml:"eventBus" json:"eventBus"`
	GrpcGateway            GrpcGatewayConfig                 `yaml:"grpcGateway" json:"grpcGateway"`
	ResourceAggregate      ResourceAggregateConfig           `yaml:"resourceAggregate" json:"resourceAggregate"`
	Storage                StorageConfig                     `yaml:"storage" json:"storage"`
	Subscription           SubscriptionConfig                `yaml:"subscription" json:"subscription"`
	OpenTelemetryCollector http.OpenTelemetryCollectorConfig `yaml:"openTelemetryCollector" json:"openTelemetryCollector"`
}

func (*ClientsConfig) Validate

func (c *ClientsConfig) Validate() error

type CloudData

type CloudData struct {
	// contains filtered or unexported fields
}

func NewCloudData

func NewCloudData(linkedCloud store.LinkedCloud) *CloudData

func (*CloudData) Dump

func (d *CloudData) Dump() interface{}

func (*CloudData) DumpLinkedAccounts

func (d *CloudData) DumpLinkedAccounts() map[string]*LinkedAccountData

func (*CloudData) DumpTasks

func (d *CloudData) DumpTasks() []Task

type Config

type Config struct {
	Log           log.Config          `yaml:"log" json:"log"`
	APIs          APIsConfig          `yaml:"apis" json:"apis"`
	Clients       ClientsConfig       `yaml:"clients" json:"clients"`
	TaskProcessor TaskProcessorConfig `yaml:"taskProcessor" json:"taskProcessor"`
}

Config represents application configuration

func (Config) String

func (c Config) String() string

Return string representation of Config

func (*Config) Validate

func (c *Config) Validate() error

type Device

type Device struct {
	Device device.Device `json:"device"`
	Status string        `json:"status"`
}

type DeviceData

type DeviceData struct {
	// contains filtered or unexported fields
}

func NewDeviceData

func NewDeviceData() *DeviceData

func (*DeviceData) Dump

func (d *DeviceData) Dump() interface{}

func (*DeviceData) DumpResources

func (d *DeviceData) DumpResources() map[string]*ResourceData

func (*DeviceData) DumpTasks

func (d *DeviceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, deviceID string) []Task

func (*DeviceData) LoadOrCreate

func (d *DeviceData) LoadOrCreate(sub Subscription) (Subscription, bool)

func (*DeviceData) PullOut

func (d *DeviceData) PullOut(sub Subscription) (Subscription, bool)

func (*DeviceData) Subscription

func (d *DeviceData) Subscription() (Subscription, bool)

type DevicesSubscription

type DevicesSubscription struct {
	// contains filtered or unexported fields
}

func NewDevicesSubscription

func NewDevicesSubscription(ctx context.Context, tracerProvider trace.TracerProvider, rdClient pb.GrpcGatewayClient, raClient raService.ResourceAggregateClient, subscriber *subscriber.Subscriber, reconnectInterval time.Duration) *DevicesSubscription

func (*DevicesSubscription) Add

func (c *DevicesSubscription) Add(ctx context.Context, deviceID string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

func (*DevicesSubscription) Delete

func (c *DevicesSubscription) Delete(userID, deviceID string) error

type EventBusConfig

type EventBusConfig struct {
	NATS natsClient.Config `yaml:"nats" json:"nats"`
}

func (*EventBusConfig) Validate

func (c *EventBusConfig) Validate() error

type GrpcGatewayConfig

type GrpcGatewayConfig struct {
	Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}

func (*GrpcGatewayConfig) Validate

func (c *GrpcGatewayConfig) Validate() error

type HTTPConfig

type HTTPConfig struct {
	EventsURL     string              `yaml:"eventsURL" json:"eventsUrl"`
	PullDevices   PullDevicesConfig   `yaml:"pullDevices" json:"pullDevices"`
	Connection    listener.Config     `yaml:",inline" json:",inline"`
	Authorization AuthorizationConfig `yaml:"authorization" json:"authorization"`
	Server        server.Config       `yaml:",inline" json:",inline"`
}

func (*HTTPConfig) Validate

func (c *HTTPConfig) Validate() error

type HTTPSubscriptionConfig

type HTTPSubscriptionConfig struct {
	ReconnectInterval   time.Duration `yaml:"reconnectInterval" json:"reconnectInterval"`
	ResubscribeInterval time.Duration `yaml:"resubscribeInterval" json:"resubscribeInterval"`
}

func (*HTTPSubscriptionConfig) Validate

func (c *HTTPSubscriptionConfig) Validate() error

type IdentityStoreConfig

type IdentityStoreConfig struct {
	Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}

func (*IdentityStoreConfig) Validate

func (c *IdentityStoreConfig) Validate() error

type LinkedAccountData

type LinkedAccountData struct {
	// contains filtered or unexported fields
}

func NewLinkedAccountData

func NewLinkedAccountData(linkedAccount store.LinkedAccount) *LinkedAccountData

func (*LinkedAccountData) Dump

func (d *LinkedAccountData) Dump() interface{}

func (*LinkedAccountData) DumpDevices

func (d *LinkedAccountData) DumpDevices() map[string]*DeviceData

func (*LinkedAccountData) DumpTasks

func (d *LinkedAccountData) DumpTasks(linkedCloud store.LinkedCloud) []Task

func (*LinkedAccountData) LinkedAccount

func (d *LinkedAccountData) LinkedAccount() store.LinkedAccount

func (*LinkedAccountData) LoadOrCreate

func (d *LinkedAccountData) LoadOrCreate(sub Subscription) (Subscription, bool)

func (*LinkedAccountData) PullOut

func (d *LinkedAccountData) PullOut(sub Subscription) (Subscription, bool)

func (*LinkedAccountData) Subscription

func (d *LinkedAccountData) Subscription() (Subscription, bool)

type LinkedAccountHandler

type LinkedAccountHandler struct {
	// contains filtered or unexported fields
}

func (*LinkedAccountHandler) Handle

func (h *LinkedAccountHandler) Handle(ctx context.Context, iter store.LinkedAccountIter) (err error)

type LinkedCloudHandler

type LinkedCloudHandler struct {
	// contains filtered or unexported fields
}

func (*LinkedCloudHandler) Handle

func (h *LinkedCloudHandler) Handle(ctx context.Context, iter store.LinkedCloudIter) (err error)

type OnTaskTrigger

type OnTaskTrigger func(Task)

type ProvisionCacheData added in v2.6.2

type ProvisionCacheData struct {
	// contains filtered or unexported fields
}

type PullDevicesConfig

type PullDevicesConfig struct {
	Disabled bool          `yaml:"disabled" json:"disabled"`
	Interval time.Duration `yaml:"interval" json:"interval"`
}

func (*PullDevicesConfig) Validate

func (c *PullDevicesConfig) Validate() error

type Representation

type Representation struct {
	Href           string      `json:"href"`
	Representation interface{} `json:"rep"`
}

type RequestHandler

type RequestHandler struct {
	// contains filtered or unexported fields
}

RequestHandler handles incoming requests

func NewRequestHandler

func NewRequestHandler(
	ownerClaim string,
	provider *pkgOAuth2.PlgdProvider,
	subManager *SubscriptionManager,
	store *Store,
	triggerTask OnTaskTrigger,
	tracerProvider trace.TracerProvider,
) *RequestHandler

func (*RequestHandler) AddLinkedAccount

func (rh *RequestHandler) AddLinkedAccount(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) AddLinkedCloud

func (rh *RequestHandler) AddLinkedCloud(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) DeleteLinkedAccount

func (rh *RequestHandler) DeleteLinkedAccount(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) DeleteLinkedCloud

func (rh *RequestHandler) DeleteLinkedCloud(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) OAuthCallback

func (rh *RequestHandler) OAuthCallback(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) ProcessEvent

func (rh *RequestHandler) ProcessEvent(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveLinkedClouds

func (rh *RequestHandler) RetrieveLinkedClouds(w http.ResponseWriter, r *http.Request)

type ResourceAggregateConfig

type ResourceAggregateConfig struct {
	Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}

func (*ResourceAggregateConfig) Validate

func (c *ResourceAggregateConfig) Validate() error

type ResourceData

type ResourceData struct {
	// contains filtered or unexported fields
}

func NewResourceData

func NewResourceData() *ResourceData

func (*ResourceData) Dump

func (d *ResourceData) Dump() interface{}

func (*ResourceData) DumpTasks

func (d *ResourceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, deviceID, href string) []Task

func (*ResourceData) LoadOrCreate

func (d *ResourceData) LoadOrCreate(sub Subscription) (Subscription, bool)

func (*ResourceData) PullOut

func (d *ResourceData) PullOut(sub Subscription) (Subscription, bool)

func (*ResourceData) Subscription

func (d *ResourceData) Subscription() (Subscription, bool)

type RetrieveDeviceContentAllResponse

type RetrieveDeviceContentAllResponse struct {
	Device
	Links []Representation `json:"links"`
}

type RetrieveDeviceWithLinksResponse

type RetrieveDeviceWithLinksResponse struct {
	Device
	Links []schema.ResourceLink `json:"links"`
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles HTTP request

func New

func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logger log.Logger) (*Server, error)

New parses configuration and creates new Server with provided store and bus

func (*Server) Close added in v2.5.0

func (s *Server) Close() error

Shutdown ends serving

func (*Server) Serve

func (s *Server) Serve() error

Serve starts the service's HTTP server and blocks

type StorageConfig

type StorageConfig struct {
	MongoDB mongodb.Config `yaml:"mongoDB" json:"mongoDb"`
}

func (*StorageConfig) Validate

func (c *StorageConfig) Validate() error

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(ctx context.Context, db store.Store) (*Store, error)

func (*Store) Dump

func (s *Store) Dump() interface{}

func (*Store) DumpDevices

func (s *Store) DumpDevices() []SubscriptionData

func (*Store) DumpLinkedAccounts

func (s *Store) DumpLinkedAccounts() []ProvisionCacheData

func (*Store) DumpTasks

func (s *Store) DumpTasks() []Task

func (*Store) LoadCloud

func (s *Store) LoadCloud(cloudID string) (store.LinkedCloud, bool)

func (*Store) LoadDeviceSubscription

func (s *Store) LoadDeviceSubscription(cloudID, linkedAccountID, deviceID string) (SubscriptionData, bool)

func (*Store) LoadDevicesSubscription

func (s *Store) LoadDevicesSubscription(cloudID, linkedAccountID string) (SubscriptionData, bool)

func (*Store) LoadOrCreateCloud

func (s *Store) LoadOrCreateCloud(ctx context.Context, cloud store.LinkedCloud) (store.LinkedCloud, bool, error)

func (*Store) LoadOrCreateLinkedAccount

func (s *Store) LoadOrCreateLinkedAccount(ctx context.Context, linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)

func (*Store) LoadOrCreateSubscription

func (s *Store) LoadOrCreateSubscription(sub Subscription) (SubscriptionData, bool, error)

func (*Store) LoadResourceSubscription

func (s *Store) LoadResourceSubscription(cloudID, linkedAccountID, deviceID, href string) (SubscriptionData, bool)

func (*Store) LoadSubscription

func (s *Store) LoadSubscription(subscripionID string) (SubscriptionData, bool)

func (*Store) PullOutCloud

func (s *Store) PullOutCloud(ctx context.Context, cloudID string) (*CloudData, error)

func (*Store) PullOutDevice

func (s *Store) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)

func (*Store) PullOutLinkedAccount

func (s *Store) PullOutLinkedAccount(ctx context.Context, cloudID, linkedAccountID string) (*LinkedAccountData, error)

func (*Store) PullOutResource

func (s *Store) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)

func (*Store) PullOutSubscription

func (s *Store) PullOutSubscription(subscripionID string) (SubscriptionData, bool)

func (*Store) UpdateLinkedAccount

func (s *Store) UpdateLinkedAccount(ctx context.Context, linkedAccount store.LinkedAccount) error

type Subscription

type Subscription struct {
	ID              string
	Type            Type
	LinkedAccountID string
	LinkedCloudID   string
	DeviceID        string
	Href            string
	SigningSecret   string
	CorrelationID   string
}

type SubscriptionConfig

type SubscriptionConfig struct {
	HTTP HTTPSubscriptionConfig `yaml:"http" json:"http"`
}

func (*SubscriptionConfig) Validate

func (c *SubscriptionConfig) Validate() error

type SubscriptionData added in v2.6.2

type SubscriptionData struct {
	// contains filtered or unexported fields
}

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

func NewSubscriptionManager

func NewSubscriptionManager(
	eventsURL string,
	isClient pbIS.IdentityStoreClient,
	raClient raService.ResourceAggregateClient,
	store *Store,
	devicesSubscription *DevicesSubscription,
	provider *oauth2.PlgdProvider,
	triggerTask OnTaskTrigger,
	tracerProvider trace.TracerProvider,
) *SubscriptionManager

func (*SubscriptionManager) HandleCancelEvent

func (s *SubscriptionManager) HandleCancelEvent(ctx context.Context, header events.EventHeader, linkedAccount store.LinkedAccount) error

func (*SubscriptionManager) HandleDeviceEvent

func (s *SubscriptionManager) HandleDeviceEvent(ctx context.Context, header events.EventHeader, body []byte, subscriptionData SubscriptionData) error

HandleDeviceEvent handles device events.

func (*SubscriptionManager) HandleDevicesEvent

func (s *SubscriptionManager) HandleDevicesEvent(ctx context.Context, header events.EventHeader, body []byte, d SubscriptionData) error

func (*SubscriptionManager) HandleDevicesOffline

func (s *SubscriptionManager) HandleDevicesOffline(ctx context.Context, d SubscriptionData, header events.EventHeader, devices events.DevicesOffline) error

HandleDevicesOffline sets device off to resource aggregate and unregister device to projection.

func (*SubscriptionManager) HandleDevicesOnline

func (s *SubscriptionManager) HandleDevicesOnline(ctx context.Context, d SubscriptionData, header events.EventHeader, devices events.DevicesOnline) error

HandleDevicesOnline sets device online to resource aggregate and register device to projection.

func (*SubscriptionManager) HandleDevicesRegistered

func (s *SubscriptionManager) HandleDevicesRegistered(ctx context.Context, d SubscriptionData, devices events.DevicesRegistered, header events.EventHeader) error

func (*SubscriptionManager) HandleDevicesUnregistered

func (s *SubscriptionManager) HandleDevicesUnregistered(ctx context.Context, subscriptionData SubscriptionData, correlationID string, devices events.DevicesUnregistered) error

func (*SubscriptionManager) HandleEvent

func (s *SubscriptionManager) HandleEvent(ctx context.Context, header events.EventHeader, body []byte) (int, error)

func (*SubscriptionManager) HandleResourceChangedEvent

func (s *SubscriptionManager) HandleResourceChangedEvent(ctx context.Context, subscriptionData SubscriptionData, header events.EventHeader, body []byte) error

func (*SubscriptionManager) HandleResourceEvent

func (s *SubscriptionManager) HandleResourceEvent(ctx context.Context, header events.EventHeader, body []byte, subscriptionData SubscriptionData) error

func (*SubscriptionManager) HandleResourcesPublished

func (s *SubscriptionManager) HandleResourcesPublished(ctx context.Context, d SubscriptionData, header events.EventHeader, links events.ResourcesPublished) error

HandleResourcesPublished publish resources to resource aggregate and subscribes to resources.

func (*SubscriptionManager) HandleResourcesUnpublished

func (s *SubscriptionManager) HandleResourcesUnpublished(ctx context.Context, d SubscriptionData, header events.EventHeader, links events.ResourcesUnpublished) error

HandleResourcesUnpublished unpublish resources from resource aggregate and cancel resources subscriptions.

func (*SubscriptionManager) Run

func (s *SubscriptionManager) Run(ctx context.Context, interval time.Duration)

func (*SubscriptionManager) SubscribeToDevice

func (s *SubscriptionManager) SubscribeToDevice(ctx context.Context, deviceID string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

func (*SubscriptionManager) SubscribeToDevices

func (s *SubscriptionManager) SubscribeToDevices(ctx context.Context, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

func (*SubscriptionManager) SubscribeToResource

func (s *SubscriptionManager) SubscribeToResource(ctx context.Context, deviceID, href string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

type Task

type Task struct {
	// contains filtered or unexported fields
}

type TaskProcessor

type TaskProcessor struct {
	// contains filtered or unexported fields
}

func NewTaskProcessor

func NewTaskProcessor(raClient raService.ResourceAggregateClient, tracerProvider trace.TracerProvider, maxParallelGets, cacheSize int, timeout, delay time.Duration) *TaskProcessor

func (*TaskProcessor) Run

func (h *TaskProcessor) Run(ctx context.Context, subscriptionManager *SubscriptionManager) error

func (*TaskProcessor) Trigger

func (h *TaskProcessor) Trigger(task Task)

type TaskProcessorConfig

type TaskProcessorConfig struct {
	CacheSize   int           `yaml:"cacheSize" json:"cacheSize"`
	Timeout     time.Duration `yaml:"timeout" json:"timeout"`
	MaxParallel int           `yaml:"maxParallel" json:"maxParallel"`
	Delay       time.Duration `yaml:"delay" json:"delay"` // Used for CTT test with 10s.
}

func (*TaskProcessorConfig) Validate

func (c *TaskProcessorConfig) Validate() error

type TaskType

type TaskType string
const (
	TaskType_PullDevice          TaskType = "pulldevice"
	TaskType_SubscribeToDevices  TaskType = "subdevices"
	TaskType_SubscribeToDevice   TaskType = "subdevice"
	TaskType_SubscribeToResource TaskType = "subresource"
)

type Type

type Type string
const (
	Type_Devices  Type = "devices"
	Type_Device   Type = "device"
	Type_Resource Type = "resource"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL