Documentation ¶
Overview ¶
Package storage contains logic around the Service Manager persistent storage
Index ¶
- Variables
- func NewNotificationQueue(size int) (*notificationQueue, error)
- type CreateInterceptor
- type CreateInterceptorChain
- type CreateInterceptorProvider
- type DeleteInterceptor
- type DeleteInterceptorChain
- type DeleteInterceptorProvider
- type Entity
- type HealthIndicator
- type InterceptCreateAroundTxFunc
- type InterceptCreateOnTxFunc
- type InterceptDeleteAroundTxFunc
- type InterceptDeleteOnTxFunc
- type InterceptUpdateAroundTxFunc
- type InterceptUpdateOnTxFunc
- type InterceptableTransactionalRepository
- func (itr *InterceptableTransactionalRepository) AddCreateInterceptorProvider(objectType types.ObjectType, provider OrderedCreateInterceptorProvider)
- func (itr *InterceptableTransactionalRepository) AddDeleteInterceptorProvider(objectType types.ObjectType, provider OrderedDeleteInterceptorProvider)
- func (itr *InterceptableTransactionalRepository) AddUpdateInterceptorProvider(objectType types.ObjectType, provider OrderedUpdateInterceptorProvider)
- func (itr *InterceptableTransactionalRepository) Create(ctx context.Context, obj types.Object) (types.Object, error)
- func (itr *InterceptableTransactionalRepository) Delete(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)
- func (itr *InterceptableTransactionalRepository) Get(ctx context.Context, objectType types.ObjectType, id string) (types.Object, error)
- func (itr *InterceptableTransactionalRepository) InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error
- func (itr *InterceptableTransactionalRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)
- func (itr *InterceptableTransactionalRepository) Update(ctx context.Context, obj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)
- type InterceptorOrder
- type InterceptorPosition
- type KeyStore
- type Label
- type Named
- type NotificationCleaner
- type NotificationQueue
- type NotificationSettings
- type Notificator
- type OpenCloser
- type OrderedCreateInterceptorProvider
- type OrderedDeleteInterceptorProvider
- type OrderedUpdateInterceptorProvider
- type PingFunc
- type Pinger
- type PositionType
- type ReceiversFilterFunc
- type Repository
- type Settings
- type Storage
- type TransactionalEncryptingRepository
- func (er TransactionalEncryptingRepository) Create(ctx context.Context, obj types.Object) (types.Object, error)
- func (er TransactionalEncryptingRepository) Delete(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)
- func (er TransactionalEncryptingRepository) Get(ctx context.Context, objectType types.ObjectType, id string) (types.Object, error)
- func (er *TransactionalEncryptingRepository) InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error
- func (er TransactionalEncryptingRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)
- func (er TransactionalEncryptingRepository) Update(ctx context.Context, obj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)
- type TransactionalRepository
- type TransactionalRepositoryDecorator
- type UpdateContext
- type UpdateInterceptor
- type UpdateInterceptorChain
- type UpdateInterceptorProvider
Constants ¶
This section is empty.
Variables ¶
var ErrQueueClosed = errors.New("queue closed")
ErrQueueClosed error stating that the queue is closed
var ErrQueueFull = errors.New("queue is full")
ErrQueueFull error stating that the queue is full
Functions ¶
func NewNotificationQueue ¶ added in v0.3.0
NewNotificationQueue returns a new NotificationQueue with the specified size
Types ¶
type CreateInterceptor ¶ added in v0.2.0
type CreateInterceptor interface { AroundTxCreate(h InterceptCreateAroundTxFunc) InterceptCreateAroundTxFunc OnTxCreate(f InterceptCreateOnTxFunc) InterceptCreateOnTxFunc }
CreateInterceptor provides hooks on entity creation
type CreateInterceptorChain ¶ added in v0.2.0
type CreateInterceptorChain struct {
// contains filtered or unexported fields
}
func (*CreateInterceptorChain) AroundTxCreate ¶ added in v0.2.0
func (c *CreateInterceptorChain) AroundTxCreate(f InterceptCreateAroundTxFunc) InterceptCreateAroundTxFunc
func (*CreateInterceptorChain) Name ¶ added in v0.2.0
func (c *CreateInterceptorChain) Name() string
func (*CreateInterceptorChain) OnTxCreate ¶ added in v0.2.0
func (c *CreateInterceptorChain) OnTxCreate(f InterceptCreateOnTxFunc) InterceptCreateOnTxFunc
type CreateInterceptorProvider ¶ added in v0.2.0
type CreateInterceptorProvider interface { Named Provide() CreateInterceptor }
CreateInterceptorProvider provides CreateInterceptors for each request
type DeleteInterceptor ¶ added in v0.2.0
type DeleteInterceptor interface { AroundTxDelete(h InterceptDeleteAroundTxFunc) InterceptDeleteAroundTxFunc OnTxDelete(f InterceptDeleteOnTxFunc) InterceptDeleteOnTxFunc }
DeleteInterceptor provides hooks on entity deletion
type DeleteInterceptorChain ¶ added in v0.2.0
type DeleteInterceptorChain struct {
// contains filtered or unexported fields
}
func (*DeleteInterceptorChain) AroundTxDelete ¶ added in v0.2.0
func (d *DeleteInterceptorChain) AroundTxDelete(f InterceptDeleteAroundTxFunc) InterceptDeleteAroundTxFunc
func (*DeleteInterceptorChain) Name ¶ added in v0.2.0
func (d *DeleteInterceptorChain) Name() string
func (*DeleteInterceptorChain) OnTxDelete ¶ added in v0.2.0
func (d *DeleteInterceptorChain) OnTxDelete(f InterceptDeleteOnTxFunc) InterceptDeleteOnTxFunc
type DeleteInterceptorProvider ¶ added in v0.2.0
type DeleteInterceptorProvider interface { Named Provide() DeleteInterceptor }
DeleteInterceptorProvider provides DeleteInterceptors for each request
type HealthIndicator ¶ added in v0.1.1
type HealthIndicator struct {
Pinger Pinger
}
HealthIndicator returns a new indicator for the storage
func (*HealthIndicator) Health ¶ added in v0.1.1
func (i *HealthIndicator) Health() *health.Health
Health returns the health of the storage component
func (*HealthIndicator) Name ¶ added in v0.1.1
func (i *HealthIndicator) Name() string
Name returns the name of the storage component
type InterceptCreateAroundTxFunc ¶ added in v0.2.0
InterceptCreateAroundTxFunc hook for entity creation outside of transaction
type InterceptCreateOnTxFunc ¶ added in v0.2.0
type InterceptCreateOnTxFunc func(ctx context.Context, txStorage Repository, obj types.Object) (types.Object, error)
InterceptCreateOnTxFunc hook for entity creation in transaction
type InterceptDeleteAroundTxFunc ¶ added in v0.2.0
type InterceptDeleteAroundTxFunc func(ctx context.Context, deletionCriteria ...query.Criterion) (types.ObjectList, error)
InterceptDeleteAroundTxFunc hook for entity deletion outside of transaction
type InterceptDeleteOnTxFunc ¶ added in v0.2.0
type InterceptDeleteOnTxFunc func(ctx context.Context, txStorage Repository, objects types.ObjectList, deletionCriteria ...query.Criterion) (types.ObjectList, error)
InterceptDeleteOnTxFunc hook for entity deletion in transaction
type InterceptUpdateAroundTxFunc ¶ added in v0.2.0
type InterceptUpdateAroundTxFunc func(ctx context.Context, newObj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)
InterceptUpdateAroundTxFunc hook for entity update outside of transaction
type InterceptUpdateOnTxFunc ¶ added in v0.2.0
type InterceptUpdateOnTxFunc func(ctx context.Context, txStorage Repository, oldObj, newObj types.Object, labelChanges ...*query.LabelChange) (types.Object, error)
InterceptUpdateOnTxFunc hook for entity update in transaction
type InterceptableTransactionalRepository ¶ added in v0.2.0
type InterceptableTransactionalRepository struct {
// contains filtered or unexported fields
}
func NewInterceptableTransactionalRepository ¶ added in v0.2.0
func NewInterceptableTransactionalRepository(repository TransactionalRepository) *InterceptableTransactionalRepository
func (*InterceptableTransactionalRepository) AddCreateInterceptorProvider ¶ added in v0.2.0
func (itr *InterceptableTransactionalRepository) AddCreateInterceptorProvider(objectType types.ObjectType, provider OrderedCreateInterceptorProvider)
func (*InterceptableTransactionalRepository) AddDeleteInterceptorProvider ¶ added in v0.2.0
func (itr *InterceptableTransactionalRepository) AddDeleteInterceptorProvider(objectType types.ObjectType, provider OrderedDeleteInterceptorProvider)
func (*InterceptableTransactionalRepository) AddUpdateInterceptorProvider ¶ added in v0.2.0
func (itr *InterceptableTransactionalRepository) AddUpdateInterceptorProvider(objectType types.ObjectType, provider OrderedUpdateInterceptorProvider)
func (*InterceptableTransactionalRepository) Delete ¶ added in v0.2.0
func (itr *InterceptableTransactionalRepository) Delete(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)
func (*InterceptableTransactionalRepository) Get ¶ added in v0.2.0
func (itr *InterceptableTransactionalRepository) Get(ctx context.Context, objectType types.ObjectType, id string) (types.Object, error)
func (*InterceptableTransactionalRepository) InTransaction ¶ added in v0.2.0
func (itr *InterceptableTransactionalRepository) InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error
func (*InterceptableTransactionalRepository) List ¶ added in v0.2.0
func (itr *InterceptableTransactionalRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)
type InterceptorOrder ¶ added in v0.2.0
type InterceptorOrder struct { OnTxPosition InterceptorPosition AroundTxPosition InterceptorPosition }
type InterceptorPosition ¶ added in v0.2.0
type InterceptorPosition struct { PositionType PositionType Name string }
type KeyStore ¶ added in v0.3.3
type KeyStore interface { // Lock locks the storage so that only one process can manipulate the encryption key. Returns an error if the process has already acquired the lock Lock(ctx context.Context) error // Unlock releases the acquired lock. Unlock(ctx context.Context) error // GetEncryptionKey returns the encryption key from the storage after applying the specified transformation function GetEncryptionKey(ctx context.Context, transformationFunc func(context.Context, []byte, []byte) ([]byte, error)) ([]byte, error) // SetEncryptionKey sets the provided encryption key in the KeyStore after applying the specified transformation function SetEncryptionKey(ctx context.Context, key []byte, transformationFunc func(context.Context, []byte, []byte) ([]byte, error)) error }
KeyStore interface for encryption key operations
type Named ¶ added in v0.2.0
type Named interface {
Name() string
}
Named interface for named entities
type NotificationCleaner ¶ added in v0.3.0
type NotificationCleaner struct { Storage Repository Settings Settings // contains filtered or unexported fields }
NotificationCleaner schedules a goroutine which cleans old notifications
type NotificationQueue ¶ added in v0.3.0
type NotificationQueue interface { // Enqueue adds a new notification for processing. Enqueue(notification *types.Notification) error // Channel returns the go channel with received notifications which have to be processed. Channel() <-chan *types.Notification // Close closes the queue. Close() // ID returns a unique queue identifier ID() string }
NotificationQueue is used for receiving notifications
type NotificationSettings ¶ added in v0.3.0
type NotificationSettings struct { QueuesSize int `mapstructure:"queues_size" description:"maximum number of notifications queued for sending to a client"` MinReconnectInterval time.Duration `mapstructure:"min_reconnect_interval" description:"minimum timeout between storage listen reconnects"` MaxReconnectInterval time.Duration `mapstructure:"max_reconnect_interval" description:"maximum timeout between storage listen reconnects"` CleanInterval time.Duration `mapstructure:"clean_interval" description:"time between notification clean-up"` KeepFor time.Duration `mapstructure:"keep_for" description:"the time to keep a notification in the storage"` }
NotificationSettings type to be loaded from the environment
func DefaultNotificationSettings ¶ added in v0.3.0
func DefaultNotificationSettings() *NotificationSettings
DefaultNotificationSettings returns default values for Notificator settings
func (*NotificationSettings) Validate ¶ added in v0.3.0
func (s *NotificationSettings) Validate() error
Validate validates the Notification settings
type Notificator ¶ added in v0.3.0
type Notificator interface { // Start starts the Notificator Start(ctx context.Context, group *sync.WaitGroup) error // RegisterConsumer returns notification queue, last_known_revision and error if any. // Notifications after lastKnownRevision will be added to the queue. // If lastKnownRevision is -1 no previous notifications will be sent. // When consumer wants to stop listening for notifications it must unregister the notification queue. RegisterConsumer(consumer *types.Platform, lastKnownRevision int64) (NotificationQueue, int64, error) // UnregisterConsumer must be called to stop receiving notifications in the queue UnregisterConsumer(queue NotificationQueue) error // RegisterFilter adds a new filter which decides if a platform should receive given notification RegisterFilter(f ReceiversFilterFunc) }
Notificator is used for receiving notifications for SM events
type OpenCloser ¶ added in v0.1.2
type OpenCloser interface { // Open initializes the storage, e.g. opens a connection to the underlying storage Open(options *Settings) error // Close clears resources associated with this storage, e.g. closes the connection to the underlying storage Close() error }
OpenCloser represents an openable and closeable storage
type OrderedCreateInterceptorProvider ¶ added in v0.2.0
type OrderedCreateInterceptorProvider struct { InterceptorOrder CreateInterceptorProvider }
type OrderedDeleteInterceptorProvider ¶ added in v0.2.0
type OrderedDeleteInterceptorProvider struct { InterceptorOrder DeleteInterceptorProvider }
type OrderedUpdateInterceptorProvider ¶ added in v0.2.0
type OrderedUpdateInterceptorProvider struct { InterceptorOrder UpdateInterceptorProvider }
type PingFunc ¶ added in v0.1.2
type PingFunc func() error
PingFunc is an adapter that allows using regular functions as a Pinger
type Pinger ¶ added in v0.1.2
type Pinger interface { // Ping verifies a connection to the database is still alive, establishing a connection if necessary. Ping() error }
Pinger allows pinging the storage to check liveness
type PositionType ¶ added in v0.2.0
type PositionType string
PositionType could be "before", "after" or "none"
const ( // PositionNone states that a position is not set and the item will be appended PositionNone PositionType = "none" // PositionBefore states that a position should be calculated before another position PositionBefore PositionType = "before" // PositionAfter states that a position should be calculated after another position PositionAfter PositionType = "after" )
type ReceiversFilterFunc ¶ added in v0.3.3
type ReceiversFilterFunc func(recipients []*types.Platform, notification *types.Notification) (filteredRecipients []*types.Platform)
ReceiversFilterFunc filters recipients for a given notification
type Repository ¶ added in v0.1.2
type Repository interface { // Create stores an object in SM DB Create(ctx context.Context, obj types.Object) (types.Object, error) // Get retrieves an object of the given type using the provided id from SM DB Get(ctx context.Context, objectType types.ObjectType, id string) (types.Object, error) // List retrieves objects of the given type from SM DB List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) // Delete deletes objects of the given type from SM DB Delete(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) // Update updates an object in SM DB Update(ctx context.Context, obj types.Object, labelChanges ...*query.LabelChange) (types.Object, error) }
type Settings ¶
type Settings struct { URI string `mapstructure:"uri" description:"URI of the storage"` MigrationsURL string `mapstructure:"migrations_url" description:"location of a directory containing sql migrations scripts"` EncryptionKey string `mapstructure:"encryption_key" description:"key to use for encrypting database entries"` SkipSSLValidation bool `mapstructure:"skip_ssl_validation" description:"whether to skip ssl verification when connecting to the storage"` MaxIdleConnections int `mapstructure:"max_idle_connections" description:"sets the maximum number of connections in the idle connection pool"` Notification *NotificationSettings `mapstructure:"notification"` }
Settings type to be loaded from the environment
func DefaultSettings ¶
func DefaultSettings() *Settings
DefaultSettings returns default values for storage settings
type Storage ¶
type Storage interface { OpenCloser Pinger TransactionalRepository Introduce(entity Entity) }
Storage interface provides entity-specific storages
type TransactionalEncryptingRepository ¶ added in v0.3.3
type TransactionalEncryptingRepository struct {
// contains filtered or unexported fields
}
TransactionalEncryptingRepository is a TransactionalRepository that also encrypts credentials of Secured objects before storing them in the database and decrypts credentials of Secured objects when reading them from the database
func NewEncryptingRepository ¶ added in v0.3.3
func NewEncryptingRepository(repository TransactionalRepository, encrypter security.Encrypter, key []byte) (*TransactionalEncryptingRepository, error)
NewEncryptingRepository creates a new TransactionalEncryptingRepository using the specified encrypter and encryption key
func (TransactionalEncryptingRepository) Delete ¶ added in v0.3.3
func (er TransactionalEncryptingRepository) Delete(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)
func (*TransactionalEncryptingRepository) InTransaction ¶ added in v0.3.3
func (er *TransactionalEncryptingRepository) InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error
InTransaction wraps the repository passed in the transaction to also encrypt/decrypt credentials
type TransactionalRepository ¶ added in v0.2.0
type TransactionalRepository interface { Repository // InTransaction initiates a transaction and allows passing a function to be executed within the transaction InTransaction(ctx context.Context, f func(ctx context.Context, storage Repository) error) error }
TransactionalRepository is a storage repository that can initiate a transaction
func InitializeWithSafeTermination ¶ added in v0.3.3
func InitializeWithSafeTermination(ctx context.Context, s Storage, settings *Settings, wg *sync.WaitGroup, decorators ...TransactionalRepositoryDecorator) (TransactionalRepository, error)
type TransactionalRepositoryDecorator ¶ added in v0.3.3
type TransactionalRepositoryDecorator func(TransactionalRepository) (TransactionalRepository, error)
TransactionalRepositoryDecorator allows decorating a TransactionalRepository
func EncryptingDecorator ¶ added in v0.3.3
func EncryptingDecorator(ctx context.Context, encrypter security.Encrypter, keyStore KeyStore) TransactionalRepositoryDecorator
EncryptingDecorator creates a TransactionalRepositoryDecorator that can be used to add encrypting/decrypting logic to a TransactionalRepository
type UpdateContext ¶ added in v0.2.0
type UpdateContext struct { Object types.Object ObjectChanges []byte LabelChanges []*query.LabelChange }
UpdateContext provides changes done by the update operation
type UpdateInterceptor ¶ added in v0.2.0
type UpdateInterceptor interface { AroundTxUpdate(h InterceptUpdateAroundTxFunc) InterceptUpdateAroundTxFunc OnTxUpdate(f InterceptUpdateOnTxFunc) InterceptUpdateOnTxFunc }
UpdateInterceptor provides hooks on entity update
type UpdateInterceptorChain ¶ added in v0.2.0
type UpdateInterceptorChain struct {
// contains filtered or unexported fields
}
func (*UpdateInterceptorChain) AroundTxUpdate ¶ added in v0.2.0
func (u *UpdateInterceptorChain) AroundTxUpdate(f InterceptUpdateAroundTxFunc) InterceptUpdateAroundTxFunc
func (*UpdateInterceptorChain) Name ¶ added in v0.2.0
func (u *UpdateInterceptorChain) Name() string
func (*UpdateInterceptorChain) OnTxUpdate ¶ added in v0.2.0
func (u *UpdateInterceptorChain) OnTxUpdate(f InterceptUpdateOnTxFunc) InterceptUpdateOnTxFunc
type UpdateInterceptorProvider ¶ added in v0.2.0
type UpdateInterceptorProvider interface { Named Provide() UpdateInterceptor }
UpdateInterceptorProvider provides UpdateInterceptors for each request
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
notification_connection/notification_connectionfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
postgresfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
Code generated by counterfeiter.
|
Code generated by counterfeiter. |