service

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2020 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const ContentQuery = "content"
View Source
const ContentQueryAllValue = "all"
View Source
const ContentQueryBaseValue = "base"
View Source
const ContentQueryDefault = ContentQueryBaseValue
View Source
const HrefKey = "Href"

Variables

This section is empty.

Functions

func NewHTTP

func NewHTTP(requestHandler *RequestHandler, authInterceptor kitNetHttp.Interceptor) *http.Server

NewHTTP returns HTTP server

Types

type Config

type Config struct {
	grpc.Config
	ResourceDirectoryAddr string         `envconfig:"RESOURCE_DIRECTORY_ADDRESS"  default:"127.0.0.1:9100"`
	FQDN                  string         `envconfig:"FQDN" default:"cloud2cloud.pluggedin.cloud"`
	ReconnectInterval     time.Duration  `envconfig:"RECONNECT_INTERVAL" default:"10s"`
	JwksURL               string         `envconfig:"JWKS_URL"`
	OAuth                 manager.Config `envconfig:"OAUTH"`
	EmitEventTimeout      time.Duration  `envconfig:"EMIT_EVENT_TIMEOUT" default:"5s"`
}

Config represent application configuration

func (Config) String

func (c Config) String() string

String return string representation of Config

type Device

type Device struct {
	Device schema.Device `json:"device"`
	Status Status        `json:"status"`
}

type DialCertManager

type DialCertManager = interface {
	GetClientTLSConfig() *tls.Config
}

type ErrFunc

type ErrFunc func(err error)

ErrFunc used by handler to report error from observation

type Event

type Event struct {
	Id             string
	EventType      events.EventType
	DeviceID       string
	Href           string
	Representation interface{}
}

type GoroutinePoolGoFunc

type GoroutinePoolGoFunc func(func()) error

GoroutinePoolGoFunc processes actions via provided function

type GoroutinePoolHandler

type GoroutinePoolHandler struct {
	// contains filtered or unexported fields
}

GoroutinePoolHandler submit events to goroutine pool for process them.

func NewGoroutinePoolHandler

func NewGoroutinePoolHandler(
	goroutinePoolGo GoroutinePoolGoFunc,
	eventsHandler Handler,
	errFunc ErrFunc) *GoroutinePoolHandler

NewGoroutinePoolHandler creates new event processor.

func (*GoroutinePoolHandler) Handle

func (ep *GoroutinePoolHandler) Handle(ctx context.Context, event Event) (err error)

Handle pushes event to queue and process the queue by goroutine pool.

type Handler

type Handler interface {
	Handle(ctx context.Context, iter Iter) (err error)
}

type Iter

type Iter interface {
	Next(ctx context.Context, event *Event) bool
	Err() error
}

Iter provides iterator over events from eventstore or eventbus.

type ListDevicesOfUserFunc

type ListDevicesOfUserFunc func(ctx context.Context, correlationID, userID, accessToken string) (deviceIds []string, statusCode int, err error)

type ListenCertManager

type ListenCertManager = interface {
	GetServerTLSConfig() *tls.Config
}

type Representation

type Representation struct {
	Href           string        `json:"href"`
	Representation interface{}   `json:"rep"`
	Status         pbGRPC.Status `json:"-"`
}

type RequestHandler

type RequestHandler struct {
	// contains filtered or unexported fields
}

RequestHandler for handling incoming request

func NewRequestHandler

func NewRequestHandler(
	rdClient pbGRPC.GrpcGatewayClient,
	subMgr *SubscriptionManager,
	emitEvent emitEventFunc,
) *RequestHandler

NewRequestHandler factory for new RequestHandler

func (*RequestHandler) GetDevices

func (rh *RequestHandler) GetDevices(ctx context.Context, deviceIdsFilter []string) ([]Device, error)
func (rh *RequestHandler) GetResourceLinks(ctx context.Context, deviceIdsFilter []string) (map[string]schema.ResourceLinks, error)

func (*RequestHandler) RetrieveDevice

func (rh *RequestHandler) RetrieveDevice(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveDeviceSubscription

func (rh *RequestHandler) RetrieveDeviceSubscription(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveDeviceWithContentQuery

func (rh *RequestHandler) RetrieveDeviceWithContentQuery(ctx context.Context, w http.ResponseWriter, routeVars map[string]string, contentQuery string, encoder responseWriterEncoderFunc) (int, error)
func (rh *RequestHandler) RetrieveDeviceWithLinks(ctx context.Context, w http.ResponseWriter, deviceID string, encoder responseWriterEncoderFunc) (int, error)

func (*RequestHandler) RetrieveDeviceWithRepresentations

func (rh *RequestHandler) RetrieveDeviceWithRepresentations(ctx context.Context, w http.ResponseWriter, deviceID string, encoder responseWriterEncoderFunc) (int, error)

func (*RequestHandler) RetrieveDevices

func (rh *RequestHandler) RetrieveDevices(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveDevicesAll

func (rh *RequestHandler) RetrieveDevicesAll(ctx context.Context, w http.ResponseWriter, encoder responseWriterEncoderFunc) (int, error)

func (*RequestHandler) RetrieveDevicesBase

func (rh *RequestHandler) RetrieveDevicesBase(ctx context.Context, w http.ResponseWriter, encoder responseWriterEncoderFunc) (int, error)

func (*RequestHandler) RetrieveDevicesSubscription

func (rh *RequestHandler) RetrieveDevicesSubscription(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveDevicesWithContentQuery

func (rh *RequestHandler) RetrieveDevicesWithContentQuery(ctx context.Context, w http.ResponseWriter, routeVars map[string]string, contentQuery string, encoder responseWriterEncoderFunc) (statusCode int, err error)

func (*RequestHandler) RetrieveResource

func (rh *RequestHandler) RetrieveResource(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveResourceBase

func (rh *RequestHandler) RetrieveResourceBase(ctx context.Context, w http.ResponseWriter, resourceID pbGRPC.ResourceId, encoder responseWriterEncoderFunc) (int, error)

func (*RequestHandler) RetrieveResourceSubscription

func (rh *RequestHandler) RetrieveResourceSubscription(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveResourceWithContentQuery

func (rh *RequestHandler) RetrieveResourceWithContentQuery(ctx context.Context, w http.ResponseWriter, routeVars map[string]string, contentQuery string, encoder responseWriterEncoderFunc) (int, error)

func (*RequestHandler) RetrieveResourcesValues

func (rh *RequestHandler) RetrieveResourcesValues(ctx context.Context, resourceIdsFilter []*pbGRPC.ResourceId, deviceIdsFilter []string) (map[string][]Representation, error)

func (*RequestHandler) SubscribeToDevice

func (rh *RequestHandler) SubscribeToDevice(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) SubscribeToDevices

func (rh *RequestHandler) SubscribeToDevices(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) SubscribeToResource

func (rh *RequestHandler) SubscribeToResource(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) UnsubscribeFromDevice

func (rh *RequestHandler) UnsubscribeFromDevice(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) UnsubscribeFromDevices

func (rh *RequestHandler) UnsubscribeFromDevices(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) UnsubscribeFromResource

func (rh *RequestHandler) UnsubscribeFromResource(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) UpdateResource

func (rh *RequestHandler) UpdateResource(w http.ResponseWriter, r *http.Request)

type RetrieveDeviceAllResponse

type RetrieveDeviceAllResponse struct {
	Device
	Links []Representation `json:"links"`
}

type RetrieveDeviceContentAllResponse

type RetrieveDeviceContentAllResponse struct {
	Device
	Links []Representation `json:"links"`
}

type RetrieveDeviceWithLinksResponse

type RetrieveDeviceWithLinksResponse struct {
	Device
	Links []schema.ResourceLink `json:"links"`
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handle HTTP request

func New

func New(
	config Config,
	dialCertManager DialCertManager,
	listenCertManager ListenCertManager,
	authInterceptor kitNetHttp.Interceptor,
	subscriptionStore store.Store,
) *Server

New create new Server with provided store and bus

func (*Server) Serve

func (s *Server) Serve() error

Serve starts the service's HTTP server and blocks.

func (*Server) Shutdown

func (s *Server) Shutdown() error

Shutdown ends serving

type Status

type Status string
const Status_OFFLINE Status = "offline"
const Status_ONLINE Status = "online"

type Subscription

type Subscription interface {
	Cancel() (func(), error)
}

type SubscriptionData

type SubscriptionData struct {
	// contains filtered or unexported fields
}

func (*SubscriptionData) Connect

func (s *SubscriptionData) Connect(ctx context.Context, emitEvent emitEventFunc, deleteSub func(ctx context.Context, subID, userID string) (store.Subscription, error)) error

func (*SubscriptionData) Data

func (*SubscriptionData) IncrementSequenceNumber

func (s *SubscriptionData) IncrementSequenceNumber(ctx context.Context) (uint64, error)

func (*SubscriptionData) Store

func (s *SubscriptionData) Store(sub Subscription)

func (*SubscriptionData) Subscription

func (s *SubscriptionData) Subscription() Subscription

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

func NewSubscriptionManager

func NewSubscriptionManager(ctx context.Context, store store.Store, gwClient pb.GrpcGatewayClient, reconnectInterval time.Duration, emitEvent emitEventFunc) *SubscriptionManager

func (*SubscriptionManager) Connect

func (s *SubscriptionManager) Connect(ID string) error

func (*SubscriptionManager) DumpNotConnectedSubscriptionDatas

func (s *SubscriptionManager) DumpNotConnectedSubscriptionDatas() map[string]*SubscriptionData

func (*SubscriptionManager) Load

func (s *SubscriptionManager) Load(ID, userID string) (store.Subscription, bool)

func (*SubscriptionManager) LoadSubscriptions

func (s *SubscriptionManager) LoadSubscriptions() error

func (*SubscriptionManager) PullOut

func (s *SubscriptionManager) PullOut(ctx context.Context, ID, userID string) (store.Subscription, error)

func (*SubscriptionManager) Run

func (s *SubscriptionManager) Run()

func (*SubscriptionManager) Store

type SubscriptionResponse

type SubscriptionResponse struct {
	SubscriptionID string `json:"subscriptionId"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL