storage

package
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2019 License: Apache-2.0, BSD-3-Clause, MIT Imports: 15 Imported by: 13

Documentation

Overview

Package storage contains logic around the Service Manager persistent storage

Index

Constants

This section is empty.

Variables

View Source
var ErrQueueClosed = errors.New("queue closed")

ErrQueueClosed error stating that the queue is closed

View Source
var ErrQueueFull = errors.New("queue is full")

ErrQueueFull error stating that the queue is full

Functions

func NewNotificationQueue added in v0.3.0

func NewNotificationQueue(size int) (*notificationQueue, error)

NewNotificationQueue returns new NotificationQueue with specific size

Types

type CreateInterceptor added in v0.2.0

type CreateInterceptor interface {
	AroundTxCreate(h InterceptCreateAroundTxFunc) InterceptCreateAroundTxFunc
	OnTxCreate(f InterceptCreateOnTxFunc) InterceptCreateOnTxFunc
}

CreateInterceptor provides hooks on entity creation

type CreateInterceptorChain added in v0.2.0

type CreateInterceptorChain struct {
	// contains filtered or unexported fields
}

func (*CreateInterceptorChain) AroundTxCreate added in v0.2.0

func (*CreateInterceptorChain) Name added in v0.2.0

func (c *CreateInterceptorChain) Name() string

func (*CreateInterceptorChain) OnTxCreate added in v0.2.0

type CreateInterceptorProvider added in v0.2.0

type CreateInterceptorProvider interface {
	Named
	Provide() CreateInterceptor
}

CreateInterceptorProvider provides CreateInterceptors for each request

type DeleteInterceptor added in v0.2.0

type DeleteInterceptor interface {
	AroundTxDelete(h InterceptDeleteAroundTxFunc) InterceptDeleteAroundTxFunc
	OnTxDelete(f InterceptDeleteOnTxFunc) InterceptDeleteOnTxFunc
}

DeleteInterceptor provides hooks on entity deletion

type DeleteInterceptorChain added in v0.2.0

type DeleteInterceptorChain struct {
	// contains filtered or unexported fields
}

func (*DeleteInterceptorChain) AroundTxDelete added in v0.2.0

func (*DeleteInterceptorChain) Name added in v0.2.0

func (d *DeleteInterceptorChain) Name() string

func (*DeleteInterceptorChain) OnTxDelete added in v0.2.0

type DeleteInterceptorProvider added in v0.2.0

type DeleteInterceptorProvider interface {
	Named
	Provide() DeleteInterceptor
}

DeleteInterceptorProvider provides DeleteInterceptorChain for each request

type Entity added in v0.2.0

type Entity interface {
	GetID() string
	ToObject() types.Object
	FromObject(object types.Object) (Entity, bool)
	BuildLabels(labels types.Labels, newLabel func(id, key, value string) Label) ([]Label, error)
	NewLabel(id, key, value string) Label
}

type HealthIndicator added in v0.1.1

type HealthIndicator struct {
	Pinger Pinger
}

HealthIndicator returns a new indicator for the storage

func (*HealthIndicator) Health added in v0.1.1

func (i *HealthIndicator) Health() *health.Health

Health returns the health of the storage component

func (*HealthIndicator) Name added in v0.1.1

func (i *HealthIndicator) Name() string

Name returns the name of the storage component

type InterceptCreateAroundTxFunc added in v0.2.0

type InterceptCreateAroundTxFunc func(ctx context.Context, obj types.Object) (types.Object, error)

InterceptCreateAroundTxFunc hook for entity creation outside of transaction

type InterceptCreateOnTxFunc added in v0.2.0

type InterceptCreateOnTxFunc func(ctx context.Context, txStorage Repository, obj types.Object) (types.Object, error)

InterceptCreateOnTxFunc hook for entity creation in transaction

type InterceptDeleteAroundTxFunc added in v0.2.0

type InterceptDeleteAroundTxFunc func(ctx context.Context, deletionCriteria ...query.Criterion) (types.ObjectList, error)

InterceptDeleteAroundTxFunc hook for entity deletion outside of transaction

type InterceptDeleteOnTxFunc added in v0.2.0

type InterceptDeleteOnTxFunc func(ctx context.Context, txStorage Repository, objects types.ObjectList, deletionCriteria ...query.Criterion) (types.ObjectList, error)

InterceptDeleteOnTxFunc hook for entity deletion in transaction

type InterceptUpdateAroundTxFunc added in v0.2.0

type InterceptUpdateAroundTxFunc func(ctx context.Context, newObj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)

InterceptUpdateAroundTxFunc hook for entity update outside of transaction

type InterceptUpdateOnTxFunc added in v0.2.0

type InterceptUpdateOnTxFunc func(ctx context.Context, txStorage Repository, oldObj, newObj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)

InterceptUpdateOnTxFunc hook for entity update in transaction

type InterceptableTransactionalRepository added in v0.2.0

type InterceptableTransactionalRepository struct {
	// contains filtered or unexported fields
}

func NewInterceptableTransactionalRepository added in v0.2.0

func NewInterceptableTransactionalRepository(repository TransactionalRepository) *InterceptableTransactionalRepository

func (*InterceptableTransactionalRepository) AddCreateInterceptorProvider added in v0.2.0

func (itr *InterceptableTransactionalRepository) AddCreateInterceptorProvider(objectType types.ObjectType, provider OrderedCreateInterceptorProvider)

func (*InterceptableTransactionalRepository) AddDeleteInterceptorProvider added in v0.2.0

func (itr *InterceptableTransactionalRepository) AddDeleteInterceptorProvider(objectType types.ObjectType, provider OrderedDeleteInterceptorProvider)

func (*InterceptableTransactionalRepository) AddUpdateInterceptorProvider added in v0.2.0

func (itr *InterceptableTransactionalRepository) AddUpdateInterceptorProvider(objectType types.ObjectType, provider OrderedUpdateInterceptorProvider)

func (*InterceptableTransactionalRepository) Create added in v0.2.0

func (*InterceptableTransactionalRepository) Delete added in v0.2.0

func (*InterceptableTransactionalRepository) Get added in v0.2.0

func (*InterceptableTransactionalRepository) InTransaction added in v0.2.0

func (itr *InterceptableTransactionalRepository) InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error

func (*InterceptableTransactionalRepository) List added in v0.2.0

func (*InterceptableTransactionalRepository) Update added in v0.2.0

type InterceptorOrder added in v0.2.0

type InterceptorOrder struct {
	OnTxPosition     InterceptorPosition
	AroundTxPosition InterceptorPosition
}

type InterceptorPosition added in v0.2.0

type InterceptorPosition struct {
	PositionType PositionType
	Name         string
}

type KeyStore added in v0.3.3

type KeyStore interface {
	// Lock locks the storage so that only one process can manipulate the encryption key. Returns an error if the process has already acquired the lock
	Lock(ctx context.Context) error

	// Unlock releases the acquired lock.
	Unlock(ctx context.Context) error

	// GetEncryptionKey returns the encryption key from the storage after applying the specified transformation function
	GetEncryptionKey(ctx context.Context, transformationFunc func(context.Context, []byte, []byte) ([]byte, error)) ([]byte, error)

	// SetEncryptionKey sets the provided encryption key in the KeyStore after applying the specified transformation function
	SetEncryptionKey(ctx context.Context, key []byte, transformationFunc func(context.Context, []byte, []byte) ([]byte, error)) error
}

KeyStore interface for encryption key operations

type Label added in v0.2.0

type Label interface {
	GetKey() string
	GetValue() string
}

type Named added in v0.2.0

type Named interface {
	Name() string
}

Named interface for named entities

type NotificationCleaner added in v0.3.0

type NotificationCleaner struct {
	Storage  Repository
	Settings Settings
	// contains filtered or unexported fields
}

NotificationCleaner schedules a go routine which cleans old notifications

func (*NotificationCleaner) Start added in v0.3.0

func (nc *NotificationCleaner) Start(ctx context.Context, group *sync.WaitGroup) error

Start schedules the cleaner. It cannot be used concurrently.

type NotificationQueue added in v0.3.0

type NotificationQueue interface {
	// Enqueue adds a new notification for processing.
	Enqueue(notification *types.Notification) error

	// Channel returns the go channel with received notifications which has to be processed.
	Channel() <-chan *types.Notification

	// Close closes the queue.
	Close()

	// ID returns unique queue identifier
	ID() string
}

NotificationQueue is used for receiving notifications

type NotificationSettings added in v0.3.0

type NotificationSettings struct {
	QueuesSize           int           `mapstructure:"queues_size" description:"maximum number of notifications queued for sending to a client"`
	MinReconnectInterval time.Duration `mapstructure:"min_reconnect_interval" description:"minimum timeout between storage listen reconnects"`
	MaxReconnectInterval time.Duration `mapstructure:"max_reconnect_interval" description:"maximum timeout between storage listen reconnects"`
	CleanInterval        time.Duration `mapstructure:"clean_interval" description:"time between notification clean-up"`
	KeepFor              time.Duration `mapstructure:"keep_for" description:"the time to keep a notification in the storage"`
}

NotificationSettings type to be loaded from the environment

func DefaultNotificationSettings added in v0.3.0

func DefaultNotificationSettings() *NotificationSettings

DefaultNotificationSettings returns default values for Notificator settings

func (*NotificationSettings) Validate added in v0.3.0

func (s *NotificationSettings) Validate() error

Validate validates the Notification settings

type Notificator added in v0.3.0

type Notificator interface {
	// Start starts the Notificator
	Start(ctx context.Context, group *sync.WaitGroup) error

	// RegisterConsumer returns notification queue, last_known_revision and error if any.
	// Notifications after lastKnownRevision will be added to the queue.
	// If lastKnownRevision is -1 no previous notifications will be sent.
	// When consumer wants to stop listening for notifications it must unregister the notification queue.
	RegisterConsumer(consumer *types.Platform, lastKnownRevision int64) (NotificationQueue, int64, error)

	// UnregisterConsumer must be called to stop receiving notifications in the queue
	UnregisterConsumer(queue NotificationQueue) error

	// RegisterFilter adds a new filter which decides if a platform should receive given notification
	RegisterFilter(f ReceiversFilterFunc)
}

Notificator is used for receiving notifications for SM events

type OpenCloser added in v0.1.2

type OpenCloser interface {
	// Open initializes the storage, e.g. opens a connection to the underlying storage
	Open(options *Settings) error

	// Close clears resources associated with this storage, e.g. closes the connection the underlying storage
	Close() error
}

OpenCloser represents an openable and closeable storage

type OrderedCreateInterceptorProvider added in v0.2.0

type OrderedCreateInterceptorProvider struct {
	InterceptorOrder
	CreateInterceptorProvider
}

type OrderedDeleteInterceptorProvider added in v0.2.0

type OrderedDeleteInterceptorProvider struct {
	InterceptorOrder
	DeleteInterceptorProvider
}

type OrderedUpdateInterceptorProvider added in v0.2.0

type OrderedUpdateInterceptorProvider struct {
	InterceptorOrder
	UpdateInterceptorProvider
}

type PingFunc added in v0.1.2

type PingFunc func() error

PingFunc is an adapter that allows to use regular functions as Pinger

func (PingFunc) Ping added in v0.1.2

func (mf PingFunc) Ping() error

Ping allows PingFunc to act as a Pinger

type Pinger added in v0.1.2

type Pinger interface {
	// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
	Ping() error
}

Pinger allows pinging the storage to check liveliness

type PositionType added in v0.2.0

type PositionType string

PositionType could be "before", "after" or "none"

const (
	// PositionNone states that a position is not set and the item will be appended
	PositionNone PositionType = "none"

	// PositionBefore states that a position should be calculated before another position
	PositionBefore PositionType = "before"

	// PositionAfter states that a position should be calculated after another position
	PositionAfter PositionType = "after"
)

type ReceiversFilterFunc added in v0.3.3

type ReceiversFilterFunc func(recipients []*types.Platform, notification *types.Notification) (filteredRecipients []*types.Platform)

ReceiversFilterFunc filters recipients for a given notifications

type Repository added in v0.1.2

type Repository interface {
	// Create stores a broker in SM DB
	Create(ctx context.Context, obj types.Object) (types.Object, error)

	// Get retrieves a broker using the provided id from SM DB
	Get(ctx context.Context, objectType types.ObjectType, id string) (types.Object, error)

	// List retrieves all brokers from SM DB
	List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)

	// Delete deletes a broker from SM DB
	Delete(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)

	// Update updates a broker from SM DB
	Update(ctx context.Context, obj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)
}

type Settings

type Settings struct {
	URI                string                `mapstructure:"uri" description:"URI of the storage"`
	MigrationsURL      string                `mapstructure:"migrations_url" description:"location of a directory containing sql migrations scripts"`
	EncryptionKey      string                `mapstructure:"encryption_key" description:"key to use for encrypting database entries"`
	SkipSSLValidation  bool                  `mapstructure:"skip_ssl_validation" description:"whether to skip ssl verification when connecting to the storage"`
	MaxIdleConnections int                   `mapstructure:"max_idle_connections" description:"sets the maximum number of connections in the idle connection pool"`
	Notification       *NotificationSettings `mapstructure:"notification"`
}

Settings type to be loaded from the environment

func DefaultSettings

func DefaultSettings() *Settings

DefaultSettings returns default values for storage settings

func (*Settings) Validate

func (s *Settings) Validate() error

Validate validates the storage settings

type Storage

type Storage interface {
	OpenCloser
	Pinger
	TransactionalRepository

	Introduce(entity Entity)
}

Storage interface provides entity-specific storages

type TransactionalEncryptingRepository added in v0.3.3

type TransactionalEncryptingRepository struct {
	// contains filtered or unexported fields
}

TransactionalEncryptingRepository is a TransactionalRepository with that also encrypts credentials of Secured objects before storing in the database them and decrypts credentials of Secured objects when reading them from the database

func NewEncryptingRepository added in v0.3.3

func NewEncryptingRepository(repository TransactionalRepository, encrypter security.Encrypter, key []byte) (*TransactionalEncryptingRepository, error)

NewEncryptingRepository creates a new TransactionalEncryptingRepository using the specified encrypter and encryption key

func (TransactionalEncryptingRepository) Create added in v0.3.3

func (er TransactionalEncryptingRepository) Create(ctx context.Context, obj types.Object) (types.Object, error)

func (TransactionalEncryptingRepository) Delete added in v0.3.3

func (er TransactionalEncryptingRepository) Delete(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)

func (TransactionalEncryptingRepository) Get added in v0.3.3

func (er TransactionalEncryptingRepository) Get(ctx context.Context, objectType types.ObjectType, id string) (types.Object, error)

func (*TransactionalEncryptingRepository) InTransaction added in v0.3.3

func (er *TransactionalEncryptingRepository) InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error

InTransaction wraps repository passed in the transaction to also encypt/decrypt credentials

func (TransactionalEncryptingRepository) List added in v0.3.3

func (er TransactionalEncryptingRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)

func (TransactionalEncryptingRepository) Update added in v0.3.3

func (er TransactionalEncryptingRepository) Update(ctx context.Context, obj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)

type TransactionalRepository added in v0.2.0

type TransactionalRepository interface {
	Repository

	// InTransaction initiates a transaction and allows passing a function to be executed within the transaction
	InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error
}

TransactionalRepository is a storage repository that can initiate a transaction

func InitializeWithSafeTermination added in v0.3.3

func InitializeWithSafeTermination(ctx context.Context, s Storage, settings *Settings, wg *sync.WaitGroup, decorators ...TransactionalRepositoryDecorator) (TransactionalRepository, error)

type TransactionalRepositoryDecorator added in v0.3.3

type TransactionalRepositoryDecorator func(TransactionalRepository) (TransactionalRepository, error)

TransactionalRepositoryDecorator allows decorating a TransactionalRepository

func EncryptingDecorator added in v0.3.3

func EncryptingDecorator(ctx context.Context, encrypter security.Encrypter, keyStore KeyStore) TransactionalRepositoryDecorator

EncryptingDecorator creates a TransactionalRepositoryDecorator that can be used to add encrypting/decrypting logic to a TransactionalRepository

type UpdateContext added in v0.2.0

type UpdateContext struct {
	Object        types.Object
	ObjectChanges []byte
	LabelChanges  []*query.LabelChange
}

UpdateContext provides changes done by the update operation

type UpdateInterceptor added in v0.2.0

type UpdateInterceptor interface {
	AroundTxUpdate(h InterceptUpdateAroundTxFunc) InterceptUpdateAroundTxFunc
	OnTxUpdate(f InterceptUpdateOnTxFunc) InterceptUpdateOnTxFunc
}

UpdateInterceptor provides hooks on entity update

type UpdateInterceptorChain added in v0.2.0

type UpdateInterceptorChain struct {
	// contains filtered or unexported fields
}

func (*UpdateInterceptorChain) AroundTxUpdate added in v0.2.0

func (*UpdateInterceptorChain) Name added in v0.2.0

func (u *UpdateInterceptorChain) Name() string

func (*UpdateInterceptorChain) OnTxUpdate added in v0.2.0

type UpdateInterceptorProvider added in v0.2.0

type UpdateInterceptorProvider interface {
	Named
	Provide() UpdateInterceptor
}

UpdateInterceptorProvider provides UpdateInterceptors for each request

Directories

Path Synopsis
notification_connection/notification_connectionfakes
Code generated by counterfeiter.
Code generated by counterfeiter.
postgresfakes
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL