service

package
v2.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2022 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ResourceAggregate_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "resourceaggregate.pb.ResourceAggregate",
	HandlerType: (*ResourceAggregateServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "PublishResourceLinks",
			Handler:    _ResourceAggregate_PublishResourceLinks_Handler,
		},
		{
			MethodName: "UnpublishResourceLinks",
			Handler:    _ResourceAggregate_UnpublishResourceLinks_Handler,
		},
		{
			MethodName: "NotifyResourceChanged",
			Handler:    _ResourceAggregate_NotifyResourceChanged_Handler,
		},
		{
			MethodName: "UpdateResource",
			Handler:    _ResourceAggregate_UpdateResource_Handler,
		},
		{
			MethodName: "ConfirmResourceUpdate",
			Handler:    _ResourceAggregate_ConfirmResourceUpdate_Handler,
		},
		{
			MethodName: "RetrieveResource",
			Handler:    _ResourceAggregate_RetrieveResource_Handler,
		},
		{
			MethodName: "ConfirmResourceRetrieve",
			Handler:    _ResourceAggregate_ConfirmResourceRetrieve_Handler,
		},
		{
			MethodName: "DeleteResource",
			Handler:    _ResourceAggregate_DeleteResource_Handler,
		},
		{
			MethodName: "ConfirmResourceDelete",
			Handler:    _ResourceAggregate_ConfirmResourceDelete_Handler,
		},
		{
			MethodName: "CreateResource",
			Handler:    _ResourceAggregate_CreateResource_Handler,
		},
		{
			MethodName: "ConfirmResourceCreate",
			Handler:    _ResourceAggregate_ConfirmResourceCreate_Handler,
		},
		{
			MethodName: "UpdateDeviceMetadata",
			Handler:    _ResourceAggregate_UpdateDeviceMetadata_Handler,
		},
		{
			MethodName: "ConfirmDeviceMetadataUpdate",
			Handler:    _ResourceAggregate_ConfirmDeviceMetadataUpdate_Handler,
		},
		{
			MethodName: "CancelPendingMetadataUpdates",
			Handler:    _ResourceAggregate_CancelPendingMetadataUpdates_Handler,
		},
		{
			MethodName: "CancelPendingCommands",
			Handler:    _ResourceAggregate_CancelPendingCommands_Handler,
		},
		{
			MethodName: "DeleteDevices",
			Handler:    _ResourceAggregate_DeleteDevices_Handler,
		},
		{
			MethodName: "BatchNotifyResourceChanged",
			Handler:    _ResourceAggregate_BatchNotifyResourceChanged_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "resource-aggregate/pb/service.proto",
}

ResourceAggregate_ServiceDesc is the grpc.ServiceDesc for ResourceAggregate service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)

Functions

func DeviceMetadataFactoryModel

func DeviceMetadataFactoryModel(ctx context.Context) (cqrsAggregate.AggregateModel, error)

func New

func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logger log.Logger) (*service.Service, error)

func NewService

func NewService(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider, eventStore EventStore, publisher cqrsEventBus.Publisher) (*service.Service, error)

New creates new Server with provided store and publisher.

func PublishEvents

func PublishEvents(publisher eventbus.Publisher, owner, deviceID, resourceID string, events []eventbus.Event) error

func RegisterResourceAggregateServer

func RegisterResourceAggregateServer(s grpc.ServiceRegistrar, srv ResourceAggregateServer)

func ResourceLinksFactoryModel

func ResourceLinksFactoryModel(ctx context.Context) (cqrsAggregate.AggregateModel, error)

func ResourceStateFactoryModel

func ResourceStateFactoryModel(ctx context.Context) (cqrsAggregate.AggregateModel, error)

Types

type APIsConfig

type APIsConfig struct {
	GRPC GRPCConfig `yaml:"grpc" json:"grpc"`
}

func (*APIsConfig) Validate

func (c *APIsConfig) Validate() error

type Aggregate added in v2.7.0

type Aggregate struct {
	// contains filtered or unexported fields
}

func NewAggregate

func NewAggregate(resourceID *commands.ResourceId, snapshotThreshold int, eventstore EventStore, factoryModel cqrsAggregate.FactoryModelFunc, retry cqrsAggregate.RetryFunc) (*Aggregate, error)

NewAggregate creates new resource aggreate - it must be created for every run command.

func (*Aggregate) CancelPendingMetadataUpdates added in v2.7.0

func (a *Aggregate) CancelPendingMetadataUpdates(ctx context.Context, request *commands.CancelPendingMetadataUpdatesRequest) (events []eventstore.Event, err error)

func (*Aggregate) CancelResourceCommand added in v2.7.0

func (a *Aggregate) CancelResourceCommand(ctx context.Context, request *commands.CancelPendingCommandsRequest) (events []eventstore.Event, err error)

func (*Aggregate) ConfirmDeviceMetadataUpdate added in v2.7.0

func (a *Aggregate) ConfirmDeviceMetadataUpdate(ctx context.Context, request *commands.ConfirmDeviceMetadataUpdateRequest) (events []eventstore.Event, err error)

func (*Aggregate) ConfirmResourceCreate added in v2.7.0

func (a *Aggregate) ConfirmResourceCreate(ctx context.Context, request *commands.ConfirmResourceCreateRequest) (events []eventstore.Event, err error)

func (*Aggregate) ConfirmResourceDelete added in v2.7.0

func (a *Aggregate) ConfirmResourceDelete(ctx context.Context, request *commands.ConfirmResourceDeleteRequest) (events []eventstore.Event, err error)

func (*Aggregate) ConfirmResourceRetrieve added in v2.7.0

func (a *Aggregate) ConfirmResourceRetrieve(ctx context.Context, request *commands.ConfirmResourceRetrieveRequest) (events []eventstore.Event, err error)

func (*Aggregate) ConfirmResourceUpdate added in v2.7.0

func (a *Aggregate) ConfirmResourceUpdate(ctx context.Context, request *commands.ConfirmResourceUpdateRequest) (events []eventstore.Event, err error)

func (*Aggregate) CreateResource added in v2.7.0

func (a *Aggregate) CreateResource(ctx context.Context, request *commands.CreateResourceRequest) (events []eventstore.Event, err error)

CreateResource handles a command CreateResource

func (*Aggregate) DeleteResource added in v2.7.0

func (a *Aggregate) DeleteResource(ctx context.Context, request *commands.DeleteResourceRequest) (events []eventstore.Event, err error)

DeleteResource handles a command DeleteResource

func (*Aggregate) DeviceID added in v2.7.0

func (a *Aggregate) DeviceID() string

func (*Aggregate) NotifyResourceChanged added in v2.7.0

func (a *Aggregate) NotifyResourceChanged(ctx context.Context, request *commands.NotifyResourceChangedRequest) (events []eventstore.Event, err error)

NotifyContentChanged handles a command NotifyContentChanged

func (a *Aggregate) PublishResourceLinks(ctx context.Context, request *commands.PublishResourceLinksRequest) (events []eventstore.Event, err error)

HandlePublishResource handles a command PublishResourceLinks

func (*Aggregate) ResourceID added in v2.7.0

func (a *Aggregate) ResourceID() string

func (*Aggregate) RetrieveResource added in v2.7.0

func (a *Aggregate) RetrieveResource(ctx context.Context, request *commands.RetrieveResourceRequest) (events []eventstore.Event, err error)

RetrieveResource handles a command RetriveResource

func (a *Aggregate) UnpublishResourceLinks(ctx context.Context, request *commands.UnpublishResourceLinksRequest) (events []eventstore.Event, err error)

HandleUnpublishResource handles a command UnpublishResourceLinks

func (*Aggregate) UpdateDeviceMetadata added in v2.7.0

func (a *Aggregate) UpdateDeviceMetadata(ctx context.Context, request *commands.UpdateDeviceMetadataRequest) (events []eventstore.Event, err error)

func (*Aggregate) UpdateResource added in v2.7.0

func (a *Aggregate) UpdateResource(ctx context.Context, request *commands.UpdateResourceRequest) (events []eventstore.Event, err error)

HandleUpdateResourceContent handles a command UpdateResource

type ClientsConfig

type ClientsConfig struct {
	Eventbus               EventBusConfig      `yaml:"eventBus" json:"eventBus"`
	Eventstore             EventStoreConfig    `yaml:"eventStore" json:"eventStore"`
	IdentityStore          IdentityStoreConfig `yaml:"identityStore" json:"identityStore"`
	OpenTelemetryCollector otelClient.Config   `yaml:"openTelemetryCollector" json:"openTelemetryCollector"`
}

func (*ClientsConfig) Validate

func (c *ClientsConfig) Validate() error

type Config

type Config struct {
	Log     log.Config    `yaml:"log" json:"log"`
	APIs    APIsConfig    `yaml:"apis" json:"apis"`
	Clients ClientsConfig `yaml:"clients" json:"clients"`
}

Config represent application configuration

func (Config) String

func (c Config) String() string

String return string representation of Config

func (*Config) Validate

func (c *Config) Validate() error

type EventBusConfig

type EventBusConfig struct {
	NATS natsClient.ConfigPublisher `yaml:"nats" json:"nats"`
}

func (*EventBusConfig) Validate

func (c *EventBusConfig) Validate() error

type EventStore

type EventStore interface {
	cqrsEventStore.EventStore
	cqrsMaintenance.EventStore
}

type EventStoreConfig

type EventStoreConfig struct {
	SnapshotThreshold            int                     `yaml:"snapshotThreshold" json:"snapshotThreshold"`
	ConcurrencyExceptionMaxRetry int                     `yaml:"occMaxRetry" json:"occMaxRetry"`
	DefaultCommandTimeToLive     time.Duration           `yaml:"defaultCommandTimeToLive" json:"defaultCommandTimeToLive"`
	Connection                   eventstoreConfig.Config `yaml:",inline" json:",inline"`
}

func (*EventStoreConfig) Validate

func (c *EventStoreConfig) Validate() error

type GRPCConfig

type GRPCConfig struct {
	OwnerCacheExpiration time.Duration `yaml:"ownerCacheExpiration" json:"ownerCacheExpiration"`
	grpcServer.Config    `yaml:",inline" json:",inline"`
}

func (*GRPCConfig) Validate

func (c *GRPCConfig) Validate() error

type IdentityStoreConfig

type IdentityStoreConfig struct {
	Connection client.Config `yaml:"grpc" json:"grpc"`
}

func (*IdentityStoreConfig) Validate

func (c *IdentityStoreConfig) Validate() error

type LogPublishErrFunc

type LogPublishErrFunc func(err error)

type RequestHandler

type RequestHandler struct {
	UnimplementedResourceAggregateServer
	// contains filtered or unexported fields
}

RequestHandler for handling incoming request

func NewRequestHandler

func NewRequestHandler(config Config, eventstore EventStore, publisher eventbus.Publisher, getOwnerDevicesFunc getOwnerDevicesFunc) *RequestHandler

NewRequestHandler factory for new RequestHandler

func (RequestHandler) BatchNotifyResourceChanged added in v2.6.0

func (RequestHandler) CreateResource

func (RequestHandler) DeleteDevices

Delete documents from events database for devices selected by query

Using empty deviceIdFilter in DeleteDevicesRequest is interpreting as requesting to delete all documents for devices owned by the user.

Function returns error or a non-empty DeleteDevicesResponse message, where the DeviceIds field is filled with list of device ids. The list is an intersection of the list provided by DeleteDevicesRequest and device ids owned by the user (ie. from the original list of device ids it filters out devices that are not owned by the user).

func (RequestHandler) DeleteResource

func (RequestHandler) RetrieveResource

func (RequestHandler) UpdateResource

type ResourceAggregateClient

type ResourceAggregateClient interface {
	PublishResourceLinks(ctx context.Context, in *commands.PublishResourceLinksRequest, opts ...grpc.CallOption) (*commands.PublishResourceLinksResponse, error)
	UnpublishResourceLinks(ctx context.Context, in *commands.UnpublishResourceLinksRequest, opts ...grpc.CallOption) (*commands.UnpublishResourceLinksResponse, error)
	NotifyResourceChanged(ctx context.Context, in *commands.NotifyResourceChangedRequest, opts ...grpc.CallOption) (*commands.NotifyResourceChangedResponse, error)
	UpdateResource(ctx context.Context, in *commands.UpdateResourceRequest, opts ...grpc.CallOption) (*commands.UpdateResourceResponse, error)
	ConfirmResourceUpdate(ctx context.Context, in *commands.ConfirmResourceUpdateRequest, opts ...grpc.CallOption) (*commands.ConfirmResourceUpdateResponse, error)
	RetrieveResource(ctx context.Context, in *commands.RetrieveResourceRequest, opts ...grpc.CallOption) (*commands.RetrieveResourceResponse, error)
	ConfirmResourceRetrieve(ctx context.Context, in *commands.ConfirmResourceRetrieveRequest, opts ...grpc.CallOption) (*commands.ConfirmResourceRetrieveResponse, error)
	DeleteResource(ctx context.Context, in *commands.DeleteResourceRequest, opts ...grpc.CallOption) (*commands.DeleteResourceResponse, error)
	ConfirmResourceDelete(ctx context.Context, in *commands.ConfirmResourceDeleteRequest, opts ...grpc.CallOption) (*commands.ConfirmResourceDeleteResponse, error)
	CreateResource(ctx context.Context, in *commands.CreateResourceRequest, opts ...grpc.CallOption) (*commands.CreateResourceResponse, error)
	ConfirmResourceCreate(ctx context.Context, in *commands.ConfirmResourceCreateRequest, opts ...grpc.CallOption) (*commands.ConfirmResourceCreateResponse, error)
	UpdateDeviceMetadata(ctx context.Context, in *commands.UpdateDeviceMetadataRequest, opts ...grpc.CallOption) (*commands.UpdateDeviceMetadataResponse, error)
	ConfirmDeviceMetadataUpdate(ctx context.Context, in *commands.ConfirmDeviceMetadataUpdateRequest, opts ...grpc.CallOption) (*commands.ConfirmDeviceMetadataUpdateResponse, error)
	CancelPendingMetadataUpdates(ctx context.Context, in *commands.CancelPendingMetadataUpdatesRequest, opts ...grpc.CallOption) (*commands.CancelPendingMetadataUpdatesResponse, error)
	CancelPendingCommands(ctx context.Context, in *commands.CancelPendingCommandsRequest, opts ...grpc.CallOption) (*commands.CancelPendingCommandsResponse, error)
	DeleteDevices(ctx context.Context, in *commands.DeleteDevicesRequest, opts ...grpc.CallOption) (*commands.DeleteDevicesResponse, error)
	BatchNotifyResourceChanged(ctx context.Context, in *commands.BatchNotifyResourceChangedRequest, opts ...grpc.CallOption) (*commands.BatchNotifyResourceChangedResponse, error)
}

ResourceAggregateClient is the client API for ResourceAggregate service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.

type ResourceAggregateServer

type ResourceAggregateServer interface {
	PublishResourceLinks(context.Context, *commands.PublishResourceLinksRequest) (*commands.PublishResourceLinksResponse, error)
	UnpublishResourceLinks(context.Context, *commands.UnpublishResourceLinksRequest) (*commands.UnpublishResourceLinksResponse, error)
	NotifyResourceChanged(context.Context, *commands.NotifyResourceChangedRequest) (*commands.NotifyResourceChangedResponse, error)
	UpdateResource(context.Context, *commands.UpdateResourceRequest) (*commands.UpdateResourceResponse, error)
	ConfirmResourceUpdate(context.Context, *commands.ConfirmResourceUpdateRequest) (*commands.ConfirmResourceUpdateResponse, error)
	RetrieveResource(context.Context, *commands.RetrieveResourceRequest) (*commands.RetrieveResourceResponse, error)
	ConfirmResourceRetrieve(context.Context, *commands.ConfirmResourceRetrieveRequest) (*commands.ConfirmResourceRetrieveResponse, error)
	DeleteResource(context.Context, *commands.DeleteResourceRequest) (*commands.DeleteResourceResponse, error)
	ConfirmResourceDelete(context.Context, *commands.ConfirmResourceDeleteRequest) (*commands.ConfirmResourceDeleteResponse, error)
	CreateResource(context.Context, *commands.CreateResourceRequest) (*commands.CreateResourceResponse, error)
	ConfirmResourceCreate(context.Context, *commands.ConfirmResourceCreateRequest) (*commands.ConfirmResourceCreateResponse, error)
	UpdateDeviceMetadata(context.Context, *commands.UpdateDeviceMetadataRequest) (*commands.UpdateDeviceMetadataResponse, error)
	ConfirmDeviceMetadataUpdate(context.Context, *commands.ConfirmDeviceMetadataUpdateRequest) (*commands.ConfirmDeviceMetadataUpdateResponse, error)
	CancelPendingMetadataUpdates(context.Context, *commands.CancelPendingMetadataUpdatesRequest) (*commands.CancelPendingMetadataUpdatesResponse, error)
	CancelPendingCommands(context.Context, *commands.CancelPendingCommandsRequest) (*commands.CancelPendingCommandsResponse, error)
	DeleteDevices(context.Context, *commands.DeleteDevicesRequest) (*commands.DeleteDevicesResponse, error)
	BatchNotifyResourceChanged(context.Context, *commands.BatchNotifyResourceChangedRequest) (*commands.BatchNotifyResourceChangedResponse, error)
	// contains filtered or unexported methods
}

ResourceAggregateServer is the server API for ResourceAggregate service. All implementations must embed UnimplementedResourceAggregateServer for forward compatibility

type UnimplementedResourceAggregateServer

type UnimplementedResourceAggregateServer struct {
}

UnimplementedResourceAggregateServer must be embedded to have forward compatible implementations.

func (UnimplementedResourceAggregateServer) BatchNotifyResourceChanged added in v2.6.0

type UnsafeResourceAggregateServer

type UnsafeResourceAggregateServer interface {
	// contains filtered or unexported methods
}

UnsafeResourceAggregateServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to ResourceAggregateServer will result in compilation errors.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL