store

package
v1.19.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package store contains the Store interface and implementations

Index

Constants

View Source
const (
	BucketResources    = "Resources"
	BucketAgents       = "Agents"
	BucketMeasurements = "Measurements"
	BucketArchive      = "Archive"
)

bucket names

Variables

View Source
var ErrDoesNotSupportHistory = errors.New("store does not support resource history")

ErrDoesNotSupportHistory is used when a store does not implement resource history.

View Source
var ErrResourceInUse = errors.New("resource in use")

ErrResourceInUse is used in delete functions to indicate the delete could not be performed because the Resource is a dependency of another. i.e. the Source that is being deleted is being referenced in a Configuration.

View Source
var ErrStoreResourceMissing = errors.New("resource not found")

ErrStoreResourceMissing is used internally in store functions to indicate a delete could not be performed because no such resource exists. It should not ever be returned by a store function as an error

View Source
var UpdatesContextKey key

UpdatesContextKey is the context key for updates

Functions

func AgentKey

func AgentKey(id string) []byte

AgentKey returns the key for an agent in the store.

func AgentPrefix

func AgentPrefix() []byte

AgentPrefix returns the prefix for agents in the store.

func ApplySortOffsetAndLimit

func ApplySortOffsetAndLimit[T any](list []T, opts QueryOptions, fieldAccessor fieldAccessor[T]) []T

ApplySortOffsetAndLimit applies the sort, offset, and limit options to the list.

func CurrentRolloutsForConfiguration

func CurrentRolloutsForConfiguration(idx search.Index, configurationName string) ([]string, error)

CurrentRolloutsForConfiguration returns a list of all rollouts that are currently in progress for the specified configuration.

func DeleteResource

func DeleteResource[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string, emptyResource R) (resource R, exists bool, err error)

DeleteResource removes the resource with the given kind and uniqueKey. Returns ResourceMissingError if the resource wasn't found. Returns DependencyError if the resource is referenced by another.

emptyResource will be populated with the deleted resource. For convenience, if the delete is successful, the populated resource will also be returned. If there was an error, nil will be returned for the resource.

func DeleteResourceAndNotify

func DeleteResourceAndNotify[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string, emptyResource R) (resource R, exists bool, err error)

DeleteResourceAndNotify removes the resource with the given kind and uniqueKey. Returns ResourceMissingError if the resource wasn't found.

func FindAgents

func FindAgents(ctx context.Context, idx search.Index, key string, value string) ([]string, error)

FindAgents returns a list of agent IDs that match the specified key/value pair. The key must be a valid search field

func FindResource

func FindResource[R model.Resource](ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, kind model.Kind, uniqueKey string) (resource R, key []byte, bucket *bbolt.Bucket, exists bool, err error)

FindResource finds a resource by kind and unique key. If the resource is versioned, the latest version is returned.

func FindSuggestions

func FindSuggestions(idx search.Index, key string, prefix string) ([]string, error)

FindSuggestions returns a list of all values for the specified key that start with the specified prefix.

func GetSeedResources

func GetSeedResources(ctx context.Context, files embed.FS, folders []string) ([]model.Resource, error)

GetSeedResources returns all of the resources contained in SeedFolders.

func HandleRolloutUpdates

func HandleRolloutUpdates(ctx context.Context, s Store, logger *zap.Logger)

HandleRolloutUpdates is a blocking call that subscribes to the store AgentRolloutUpdates and calls UpdateRollout on all agent configurations

func InitBoltstoreDB

func InitBoltstoreDB(storageFilePath string) (*bbolt.DB, error)

InitBoltstoreDB takes in the full path to a storage file and returns an opened bbolt database. It will return an error if the file cannot be opened.

func IsNewConfigurationVersion

func IsNewConfigurationVersion(curResource *model.AnyResource, newResource model.Resource) (bool, *model.Configuration, error)

IsNewConfigurationVersion handles the special case for configurations where we only create new versions in certain cases. if the rollout has already started (not pending) and the spec has changed, we create a new version.

func KeyFromResource

func KeyFromResource(r model.Resource) []byte

KeyFromResource returns the key for a resource in the store.

func MaskSensitiveParameters added in v1.19.0

func MaskSensitiveParameters[R model.Resource](ctx context.Context, resource R)

MaskSensitiveParameters masks sensitive parameter values based on the ParameterDefinitions in the ResourceType. This should be called after reading a value from the store before returning it.

func MergeUpdates

func MergeUpdates(into, from BasicEventUpdates) bool

MergeUpdates merges the updates from the given Updates into the current Updates.

func NewBPCookieStore

func NewBPCookieStore(secret string) *sessions.CookieStore

NewBPCookieStore creates a new CookieStore with the specified secret.

func NewDependencyError

func NewDependencyError(d DependentResources) error

NewDependencyError creates a new DependencyError with the given dependencies.

func PreserveSensitiveParameters added in v1.19.0

func PreserveSensitiveParameters(ctx context.Context, updated model.Resource, existing *model.AnyResource) error

PreserveSensitiveParameters will replace sensitive parameters in the current Resource with values from the existing Resource. This should be called before writing an updated value to the store.

func Resource

func Resource[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string) (resource R, exists bool, err error)

Resource returns a resource of the given kind and unique key.

func ResourceKey

func ResourceKey(kind model.Kind, name string) []byte

ResourceKey returns the key for a resource in the store.

func Resources

func Resources[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind) ([]R, error)

Resources returns all resources of the given kind.

func ResourcesByUniqueKeys

func ResourcesByUniqueKeys[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKeys []string, opts QueryOptions) ([]R, error)

ResourcesByUniqueKeys returns the resources of the specified kind with the specified uniqueKeys. If requesting some resources results in an error, the errors will be accumulated and return with the list of resources successfully retrieved.

func ResourcesPrefix

func ResourcesPrefix(kind model.Kind) []byte

ResourcesPrefix returns the prefix for a resource kind in the store.

func Seed

func Seed(ctx context.Context, store Store, logger *zap.Logger, files embed.FS, folders []string) error

Seed adds bundled resources to the store

func SeedSearchIndexes

func SeedSearchIndexes(ctx context.Context, store Store, logger *zap.Logger)

SeedSearchIndexes seeds the search indexes with the current data in the store

func StartedRolloutsFromIndex

func StartedRolloutsFromIndex(_ context.Context, index search.Index) ([]string, error)

StartedRolloutsFromIndex returns a list of all rollouts that are not pending.

func UpdateDependentResources

func UpdateDependentResources(ctx context.Context, store Store, resources []model.Resource, statuses []model.ResourceStatus, errs error) ([]model.ResourceStatus, error)

UpdateDependentResources updates any resources that are dependent on the resources updated. The updated resources will be retrieved from the Updates and additional resources will be added to Updates as they are updated.

This function also takes the existing statuses and errors (joined using errors.Join) and returns them with any additionally modified resources and errors.

func UpdateResource

func UpdateResource[R model.Resource](ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, kind model.Kind, name string, updater func(R) error) (r R, status model.UpdateStatus, err error)

UpdateResource updates a resource in the store. If the resource does not exist, model.StatusUnchanged is returned with the error ErrResourceMissing. If the resource does exist, it is updated using the updater function and the updated resource is returned.

func UpdateRolloutMetrics

func UpdateRolloutMetrics(ctx context.Context, agentIndex search.Index, config *model.Configuration) (int, error)

UpdateRolloutMetrics finds the agent counts for the configuration, updates the

rollout status and returns the number of new agents pending

func UpsertResource

func UpsertResource(ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, r model.Resource) (model.UpdateStatus, error)

UpsertResource upserts a resource into the store. If the resource already exists, it will be updated.

Types

type AgentUpdater

type AgentUpdater func(current *model.Agent)

AgentUpdater is given the current Agent model (possibly empty except for ID) and should update the Agent directly. We take this approach so that appropriate locking and/or transactions can be used for the operation as needed by the Store implementation.

type ArchiveStore

type ArchiveStore interface {
	// ResourceHistory returns all versions of the specified resource.
	ResourceHistory(ctx context.Context, resourceKind model.Kind, resourceName string) ([]*model.AnyResource, error)
}

ArchiveStore provides access to archived resources for version history.

type BasicEventUpdates

type BasicEventUpdates interface {
	// Agents returns a collection of agent events.
	Agents() Events[*model.Agent]
	// AgentVersions returns a collection of agent version events.
	AgentVersions() Events[*model.AgentVersion]
	// Sources returns a collection of source events.
	Sources() Events[*model.Source]
	// SourceTypes returns a collection of source type events.
	SourceTypes() Events[*model.SourceType]
	// Processors returns a collection of processor events.
	Processors() Events[*model.Processor]
	// ProcessorTypes returns a collection of processor type events.
	ProcessorTypes() Events[*model.ProcessorType]
	// Destinations returns a collection of destination events.
	Destinations() Events[*model.Destination]
	// DestinationTypes returns a collection of destination type events.
	DestinationTypes() Events[*model.DestinationType]
	// Configurations returns a collection of configuration events.
	Configurations() Events[*model.Configuration]

	// IncludeResource will add a resource event to Updates.
	IncludeResource(r model.Resource, eventType EventType)
	// IncludeAgent will add an agent event to Updates.
	IncludeAgent(agent *model.Agent, eventType EventType)

	// CouldAffectConfigurations returns true if the updates could affect configurations.
	CouldAffectConfigurations() bool
	// CouldAffectDestinations returns true if the updates could affect destinations.
	CouldAffectDestinations() bool
	// CouldAffectProcessors returns true if the updates could affect processors.
	CouldAffectProcessors() bool
	// CouldAffectSources returns true if the updates could affect sources.
	CouldAffectSources() bool

	// AffectsDestination returns true if the updates affect the given destination.
	AffectsDestination(destination *model.Destination) bool
	// AffectsProcessor returns true if the updates affect the given processor.
	AffectsProcessor(processor *model.Processor) bool
	// AffectsSource returns true if the updates affect the given source.
	AffectsSource(source *model.Source) bool
	// AffectsConfiguration returns true if the updates affect the given configuration.
	AffectsConfiguration(configuration *model.Configuration) bool
	// AffectsResourceProcessors returns true if the updates affect any of the given resource processors.
	AffectsResourceProcessors(processors []model.ResourceConfiguration) bool

	// AddAffectedSources will add the given sources to the updates.
	AddAffectedSources(sources []*model.Source)
	// AddAffectedProcessors will add the given processors to the updates.
	AddAffectedProcessors(processors []*model.Processor)
	// AddAffectedDestinations will add the given destinations to the updates.
	AddAffectedDestinations(destinations []*model.Destination)
	// AddAffectedConfigurations will add the given configurations to the updates.
	AddAffectedConfigurations(configurations []*model.Configuration)

	// TransitiveUpdates returns a list of resources that need to have their resources updated.
	TransitiveUpdates() []model.Resource

	// Size returns the number of events in the updates.
	Size() int
	// Empty returns true if the updates are empty.
	Empty() bool
	// Merge merges another set of updates into this one, returns true
	// if it was able to merge any updates.
	Merge(other BasicEventUpdates) bool
}

BasicEventUpdates is a collection of events created by a store operation.

func NewEventUpdates

func NewEventUpdates() BasicEventUpdates

NewEventUpdates returns a new Updates object.

func UpdatesForContext

func UpdatesForContext(ctx context.Context) (updates BasicEventUpdates, newContext context.Context, shouldNotify bool)

UpdatesForContext returns the Updates for this Context. If there is no Updates on the Context, it creates a new Updates and adds it to the Context. It returns either the existing Context or a child Context with new Updates as appropriate. It returns true if the Updates were already found on the context.

Keeping the Updates on the context allows for recursive calls to ApplyResources without creating multiple Updates which would result in multiple Updates events. Store implementations should use this function instead of NewUpdates and should only notify if shouldNotify is true.

type BoltstoreCommon

type BoltstoreCommon interface {
	Database() *bbolt.DB
	AgentsBucket(ctx context.Context, tx *bbolt.Tx) (*bbolt.Bucket, error)
	MeasurementsBucket(ctx context.Context, tx *bbolt.Tx, metric string) (*bbolt.Bucket, error)
	ResourcesBucket(ctx context.Context, tx *bbolt.Tx, kind model.Kind) (*bbolt.Bucket, error)
	ArchiveBucket(ctx context.Context, tx *bbolt.Tx) (*bbolt.Bucket, error)
	ResourceKey(r model.Resource) []byte
	AgentsIndex(ctx context.Context) search.Index
	ConfigurationsIndex(ctx context.Context) search.Index
	ZapLogger() *zap.Logger
	Notify(ctx context.Context, updates BasicEventUpdates)
	CreateEventUpdate() BasicEventUpdates
}

BoltstoreCommon is an interface for common implementation details between different boltstore implementations

type BoltstoreCore

type BoltstoreCore struct {
	StoreUpdates   *Updates
	DB             *bbolt.DB
	Logger         *zap.Logger
	SessionStorage sessions.Store
	sync.RWMutex
	BoltstoreCommon
}

BoltstoreCore is an implementation of the store interface that uses BoltDB as the underlying storage mechanism

func (*BoltstoreCore) Agent

func (s *BoltstoreCore) Agent(ctx context.Context, id string) (*model.Agent, error)

Agent returns the agent with the given ID.

func (*BoltstoreCore) AgentConfiguration

func (s *BoltstoreCore) AgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error)

AgentConfiguration returns the configuration that should be applied to an agent.

func (*BoltstoreCore) AgentIndex

func (s *BoltstoreCore) AgentIndex(ctx context.Context) search.Index

AgentIndex provides access to the search Index implementation managed by the Store

func (*BoltstoreCore) AgentMetrics

func (s *BoltstoreCore) AgentMetrics(ctx context.Context, ids []string, options ...stats.QueryOption) (stats.MetricData, error)

AgentMetrics provides metrics for an individual agents. They are essentially configuration metrics filtered to a list of agents.

Note: While the same record.Metric struct is used to return the metrics, these are not the same metrics provided to Store. They will be aggregated and counter metrics will be converted into rates.

func (*BoltstoreCore) AgentRolloutUpdates

func (s *BoltstoreCore) AgentRolloutUpdates(_ context.Context) eventbus.Source[RolloutEventUpdates]

AgentRolloutUpdates will receive agent update events that are meant to be processed for purpose of rollouts.

func (*BoltstoreCore) AgentVersion

func (s *BoltstoreCore) AgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)

AgentVersion returns the agent version with the given name.

func (*BoltstoreCore) AgentVersions

func (s *BoltstoreCore) AgentVersions(ctx context.Context) ([]*model.AgentVersion, error)

AgentVersions returns all agent versions in the store with the given options.

func (*BoltstoreCore) Agents

func (s *BoltstoreCore) Agents(ctx context.Context, options ...QueryOption) ([]*model.Agent, error)

Agents returns all agents in the store with the given options.

func (*BoltstoreCore) AgentsCount

func (s *BoltstoreCore) AgentsCount(ctx context.Context, options ...QueryOption) (int, error)

AgentsCount returns the number of agents in the store with the given options.

func (*BoltstoreCore) AgentsIDsMatchingConfiguration

func (s *BoltstoreCore) AgentsIDsMatchingConfiguration(ctx context.Context, configuration *model.Configuration) ([]string, error)

AgentsIDsMatchingConfiguration returns the list of agent IDs that are using the specified configuration

func (*BoltstoreCore) CleanupDisconnectedAgents

func (s *BoltstoreCore) CleanupDisconnectedAgents(ctx context.Context, since time.Time) error

CleanupDisconnectedAgents removes all containerized agents that have been disconnected since the given time

func (*BoltstoreCore) Configuration

func (s *BoltstoreCore) Configuration(ctx context.Context, name string) (*model.Configuration, error)

Configuration returns the configuration with the given name.

func (*BoltstoreCore) ConfigurationIndex

func (s *BoltstoreCore) ConfigurationIndex(ctx context.Context) search.Index

ConfigurationIndex provides access to the search Index for Configurations

func (*BoltstoreCore) ConfigurationMetrics

func (s *BoltstoreCore) ConfigurationMetrics(ctx context.Context, name string, options ...stats.QueryOption) (stats.MetricData, error)

ConfigurationMetrics provides all metrics associated with a configuration aggregated from all agents using the configuration.

Note: While the same record.Metric struct is used to return the metrics, these are not the same metrics provided to Store. They will be aggregated and counter metrics will be converted into rates.

func (*BoltstoreCore) Configurations

func (s *BoltstoreCore) Configurations(ctx context.Context, options ...QueryOption) ([]*model.Configuration, error)

Configurations returns the configurations in the store with the given options.

func (*BoltstoreCore) Database

func (s *BoltstoreCore) Database() *bbolt.DB

Database returns the underlying bbolt database

func (*BoltstoreCore) DeleteAgentVersion

func (s *BoltstoreCore) DeleteAgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)

DeleteAgentVersion deletes the agent version with the given name.

func (*BoltstoreCore) DeleteConfiguration

func (s *BoltstoreCore) DeleteConfiguration(ctx context.Context, name string) (*model.Configuration, error)

DeleteConfiguration deletes the configuration with the given name.

func (*BoltstoreCore) DeleteDestination

func (s *BoltstoreCore) DeleteDestination(ctx context.Context, name string) (*model.Destination, error)

DeleteDestination deletes the destination with the given name.

func (*BoltstoreCore) DeleteDestinationType

func (s *BoltstoreCore) DeleteDestinationType(ctx context.Context, name string) (*model.DestinationType, error)

DeleteDestinationType deletes the destination type with the given name.

func (*BoltstoreCore) DeleteProcessor

func (s *BoltstoreCore) DeleteProcessor(ctx context.Context, name string) (*model.Processor, error)

DeleteProcessor deletes the processor with the given name.

func (*BoltstoreCore) DeleteProcessorType

func (s *BoltstoreCore) DeleteProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)

DeleteProcessorType deletes the processor type with the given name.

func (*BoltstoreCore) DeleteResourcesCore

func (s *BoltstoreCore) DeleteResourcesCore(ctx context.Context, resources []model.Resource) ([]model.ResourceStatus, error)

DeleteResourcesCore iterates threw a slice of resources, and removes them from storage by name.

func (*BoltstoreCore) DeleteSource

func (s *BoltstoreCore) DeleteSource(ctx context.Context, name string) (*model.Source, error)

DeleteSource deletes the source with the given name.

func (*BoltstoreCore) DeleteSourceType

func (s *BoltstoreCore) DeleteSourceType(ctx context.Context, name string) (*model.SourceType, error)

DeleteSourceType deletes the source type with the given name.

func (*BoltstoreCore) Destination

func (s *BoltstoreCore) Destination(ctx context.Context, name string) (*model.Destination, error)

Destination returns the destination with the given name.

func (*BoltstoreCore) DestinationType

func (s *BoltstoreCore) DestinationType(ctx context.Context, name string) (*model.DestinationType, error)

DestinationType returns the destination type with the given name.

func (*BoltstoreCore) DestinationTypes

func (s *BoltstoreCore) DestinationTypes(ctx context.Context) ([]*model.DestinationType, error)

DestinationTypes returns the destination types in the store.

func (*BoltstoreCore) Destinations

func (s *BoltstoreCore) Destinations(ctx context.Context) ([]*model.Destination, error)

Destinations returns the destinations in the store.

func (*BoltstoreCore) FindAgentConfiguration

func (s *BoltstoreCore) FindAgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error)

FindAgentConfiguration uses label matching to find the appropriate configuration for this agent. If a configuration is found that does not match the Current configuration, the configuration will be assigned to Future.

func (*BoltstoreCore) MeasurementsSize

func (s *BoltstoreCore) MeasurementsSize(ctx context.Context) (int, error)

MeasurementsSize returns the count of keys in the store, and is used only for testing

func (*BoltstoreCore) OverviewMetrics

func (s *BoltstoreCore) OverviewMetrics(ctx context.Context, options ...stats.QueryOption) (stats.MetricData, error)

OverviewMetrics provides all metrics needed for the overview page. This page shows configs and destinations.

func (*BoltstoreCore) PauseRollout

func (s *BoltstoreCore) PauseRollout(ctx context.Context, configurationName string) (*model.Configuration, error)

PauseRollout will pause a rollout for the specified configuration. Does nothing if the rollout does not have a RolloutStatusStarted status. Returns the current Configuration with its Rollout status.

func (*BoltstoreCore) ProcessMetrics

func (s *BoltstoreCore) ProcessMetrics(ctx context.Context) error

ProcessMetrics is called in the background at regular intervals and performs metric roll-up and removes old data

func (*BoltstoreCore) Processor

func (s *BoltstoreCore) Processor(ctx context.Context, name string) (*model.Processor, error)

Processor returns the processor with the given name.

func (*BoltstoreCore) ProcessorType

func (s *BoltstoreCore) ProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)

ProcessorType returns the processor type with the given name.

func (*BoltstoreCore) ProcessorTypes

func (s *BoltstoreCore) ProcessorTypes(ctx context.Context) ([]*model.ProcessorType, error)

ProcessorTypes returns the processor types in the store.

func (*BoltstoreCore) Processors

func (s *BoltstoreCore) Processors(ctx context.Context) ([]*model.Processor, error)

Processors returns the processors in the store.

func (*BoltstoreCore) ResourceHistory

func (s *BoltstoreCore) ResourceHistory(ctx context.Context, resourceKind model.Kind, resourceName string) ([]*model.AnyResource, error)

ResourceHistory returns the history of a resource given its kind and name.

func (*BoltstoreCore) ResumeRollout

func (s *BoltstoreCore) ResumeRollout(ctx context.Context, configurationName string) (*model.Configuration, error)

ResumeRollout will resume a rollout for the specified configuration. Does nothing if the Rollout status is not RolloutStatusStarted or RolloutStatusStarted. For RolloutStatusError - it will increase the maxErrors of the rollout by the current number of errors + 1. For RolloutStatusStarted - it will pause the rollout.

func (*BoltstoreCore) SaveAgentMetrics

func (s *BoltstoreCore) SaveAgentMetrics(ctx context.Context, metrics []*record.Metric) error

SaveAgentMetrics saves new metrics. These metrics will be aggregated to determine metrics associated with agents and configurations.

func (*BoltstoreCore) Source

func (s *BoltstoreCore) Source(ctx context.Context, name string) (*model.Source, error)

Source returns the source with the given name.

func (*BoltstoreCore) SourceType

func (s *BoltstoreCore) SourceType(ctx context.Context, name string) (*model.SourceType, error)

SourceType returns the source type with the given name.

func (*BoltstoreCore) SourceTypes

func (s *BoltstoreCore) SourceTypes(ctx context.Context) ([]*model.SourceType, error)

SourceTypes returns the source types in the store.

func (*BoltstoreCore) Sources

func (s *BoltstoreCore) Sources(ctx context.Context) ([]*model.Source, error)

Sources returns the sources in the store.

func (*BoltstoreCore) StartMeasurements

func (s *BoltstoreCore) StartMeasurements(ctx context.Context)

StartMeasurements starts the background process for writing and rolling up metrics

func (*BoltstoreCore) StartRollout

func (s *BoltstoreCore) StartRollout(ctx context.Context, configurationName string, options *model.RolloutOptions) (*model.Configuration, error)

StartRollout will start a rollout for the specified configuration with the specified options. If nil is passed for options, any existing rollout options on the configuration status will be used. If there are no rollout options in the configuration status, default values will be used for the rollout. If there is an existing rollout a different version of this configuration, it will be replaced. Does nothing if the rollout does not have a RolloutStatusPending status. Returns the current Configuration with its Rollout status.

func (*BoltstoreCore) UpdateConfiguration

func (s *BoltstoreCore) UpdateConfiguration(ctx context.Context, name string, updater ConfigurationUpdater) (config *model.Configuration, status model.UpdateStatus, err error)

UpdateConfiguration updates the configuration with the given name using the updater function.

func (*BoltstoreCore) UpdateRollout

func (s *BoltstoreCore) UpdateRollout(ctx context.Context, configuration string) (updatedConfig *model.Configuration, err error)

UpdateRollout updates a rollout in progress. Does nothing if the rollout does not have a RolloutStatusStarted status. Returns the current Configuration with its Rollout status.

func (*BoltstoreCore) UpdateRollouts

func (s *BoltstoreCore) UpdateRollouts(ctx context.Context) ([]*model.Configuration, error)

UpdateRollouts updates all rollouts in progress. It returns each of the Configurations that contains an active rollout.

func (*BoltstoreCore) Updates

Updates returns a channel that will receive updates when resources are added, updated, or deleted.

func (*BoltstoreCore) UpsertAgent

func (s *BoltstoreCore) UpsertAgent(ctx context.Context, id string, updater AgentUpdater) (*model.Agent, error)

UpsertAgent creates or updates the given agent and calls the updater method on it.

func (*BoltstoreCore) UpsertAgents

func (s *BoltstoreCore) UpsertAgents(ctx context.Context, agentIDs []string, updater AgentUpdater) ([]*model.Agent, error)

UpsertAgents upserts the agents with the given IDs. If the agent does not exist, it will be created.

type BroadCastBuilder

type BroadCastBuilder[T any] func(ctx context.Context, options Options, logger *zap.Logger, maxEventsToMerge int) broadcast.Broadcast[T]

BroadCastBuilder is a function that builds a broadcast.Broadcast[BasicUpdates] using routing and broadcast options.

func BuildBasicEventBroadcast

func BuildBasicEventBroadcast() BroadCastBuilder[BasicEventUpdates]

BuildBasicEventBroadcast returns a BroadCastBuilder that builds a broadcast.Broadcast[BasicUpdates] using routing and broadcast options for oss.

func BuildRolloutEventBroadcast

func BuildRolloutEventBroadcast() BroadCastBuilder[RolloutEventUpdates]

BuildRolloutEventBroadcast returns a BroadCastBuilder that builds a broadcast.Broadcast[RolloutEventUpdates] using routing and broadcast options for oss.

type ConfigurationUpdater

type ConfigurationUpdater func(current *model.Configuration)

ConfigurationUpdater is given the current Configuration model and should update the Configuration directly.

type Dependency

type Dependency struct {
	Name string
	Kind model.Kind
}

Dependency is a single item in DependentResources.

type DependencyError

type DependencyError struct {
	Dependencies DependentResources
}

DependencyError is returned when trying to delete a resource that is being referenced by other resources.

func (*DependencyError) Error

func (de *DependencyError) Error() string

type DependentResources

type DependentResources []Dependency

DependentResources is the return type of store.dependentResources and used to construct DependencyError. It has help methods empty(), message(), and add().

func FindDependentResources

func FindDependentResources(ctx context.Context, configurationIndex search.Index, name string, kind model.Kind) (DependentResources, error)

FindDependentResources finds the dependent resources using the ConfigurationIndex provided by the Store.

func (*DependentResources) Add

func (r *DependentResources) Add(d Dependency)

Add adds a new dependency to the list.

func (*DependentResources) Empty

func (r *DependentResources) Empty() bool

Empty returns true if the list of dependencies is empty.

func (*DependentResources) Message

func (r *DependentResources) Message() string

Message returns a string representation of the list of dependencies.

type Event

type Event[T model.HasUniqueKey] struct {
	Type EventType `json:"type"`
	Item T         `json:"item"`
}

Event represents an insert, update, or remove of something stored in Store.

type EventType

type EventType uint8

EventType is the type of event

const (
	EventTypeInsert  EventType = 1
	EventTypeUpdate  EventType = 2
	EventTypeRemove  EventType = 3
	EventTypeLabel   EventType = 4
	EventTypeRollout EventType = 5
)

Insert, Update, Remove, and Label are the possible changes to a resource. Remove can indicate that the resource has been deleted or that it no longer matches a filtered list of resources being observed.

Label is currently used to indicate that Agent labels have changed but there may also be other updates so it can be considered a subset of EventTypeUpdate where every EventTypeLabel is also an EventTypeUpdate.

type EventUpdates

type EventUpdates struct {
	AgentsField           Events[*model.Agent]           `json:"agents"`
	AgentVersionsField    Events[*model.AgentVersion]    `json:"agentVersions"`
	SourcesField          Events[*model.Source]          `json:"sources"`
	SourceTypesField      Events[*model.SourceType]      `json:"sourceTypes"`
	ProcessorsField       Events[*model.Processor]       `json:"processors"`
	ProcessorTypesField   Events[*model.ProcessorType]   `json:"processorTypes"`
	DestinationsField     Events[*model.Destination]     `json:"destinations"`
	DestinationTypesField Events[*model.DestinationType] `json:"destinationTypes"`
	ConfigurationsField   Events[*model.Configuration]   `json:"configurations"`
	// contains filtered or unexported fields
}

EventUpdates is a collection of events created by a store operation.

func (*EventUpdates) AddAffectedConfigurations

func (u *EventUpdates) AddAffectedConfigurations(configurations []*model.Configuration)

AddAffectedConfigurations will add updates for Configurations that are affected by other resource updates.

func (*EventUpdates) AddAffectedDestinations

func (u *EventUpdates) AddAffectedDestinations(destinations []*model.Destination)

AddAffectedDestinations will add updates for Destinations that are affected by other resource updates.

func (*EventUpdates) AddAffectedProcessors

func (u *EventUpdates) AddAffectedProcessors(processors []*model.Processor)

AddAffectedProcessors will add updates for Processors that are affected by other resource updates.

func (*EventUpdates) AddAffectedSources

func (u *EventUpdates) AddAffectedSources(sources []*model.Source)

AddAffectedSources will add updates for Sources that are affected by other resource updates.

func (*EventUpdates) AffectsConfiguration

func (u *EventUpdates) AffectsConfiguration(configuration *model.Configuration) bool

AffectsConfiguration returns true if the updates affect the given configuration.

func (*EventUpdates) AffectsDestination

func (u *EventUpdates) AffectsDestination(destination *model.Destination) bool

AffectsDestination returns true if the updates affect the given destination.

func (*EventUpdates) AffectsProcessor

func (u *EventUpdates) AffectsProcessor(processor *model.Processor) bool

AffectsProcessor returns true if the updates affect the given processor.

func (*EventUpdates) AffectsResourceProcessors

func (u *EventUpdates) AffectsResourceProcessors(processors []model.ResourceConfiguration) bool

AffectsResourceProcessors returns true if the updates affect any of the given resource processors.

func (*EventUpdates) AffectsSource

func (u *EventUpdates) AffectsSource(source *model.Source) bool

AffectsSource returns true if the updates affect the given source.

func (*EventUpdates) AgentVersions

func (u *EventUpdates) AgentVersions() Events[*model.AgentVersion]

AgentVersions returns a collection of agent version events.

func (*EventUpdates) Agents

func (u *EventUpdates) Agents() Events[*model.Agent]

Agents returns a collection of agent events.

func (*EventUpdates) Configurations

func (u *EventUpdates) Configurations() Events[*model.Configuration]

Configurations returns a collection of configuration events.

func (*EventUpdates) CouldAffectConfigurations

func (u *EventUpdates) CouldAffectConfigurations() bool

CouldAffectConfigurations returns true if the updates could affect configurations.

func (*EventUpdates) CouldAffectDestinations

func (u *EventUpdates) CouldAffectDestinations() bool

CouldAffectDestinations returns true if the updates could affect destinations.

func (*EventUpdates) CouldAffectProcessors

func (u *EventUpdates) CouldAffectProcessors() bool

CouldAffectProcessors returns true if the updates could affect processors.

func (*EventUpdates) CouldAffectSources

func (u *EventUpdates) CouldAffectSources() bool

CouldAffectSources returns true if the updates could affect sources.

func (*EventUpdates) DestinationTypes

func (u *EventUpdates) DestinationTypes() Events[*model.DestinationType]

DestinationTypes returns a collection of destination type events.

func (*EventUpdates) Destinations

func (u *EventUpdates) Destinations() Events[*model.Destination]

Destinations returns a collection of destination events.

func (*EventUpdates) Empty

func (u *EventUpdates) Empty() bool

Empty returns true if no events exist.

func (*EventUpdates) HasDestinationEvents

func (u *EventUpdates) HasDestinationEvents() bool

HasDestinationEvents returns true if any destination events exist.

func (*EventUpdates) HasDestinationTypeEvents

func (u *EventUpdates) HasDestinationTypeEvents() bool

HasDestinationTypeEvents returns true if any destination type events exist.

func (*EventUpdates) HasProcessorEvents

func (u *EventUpdates) HasProcessorEvents() bool

HasProcessorEvents returns true if any processor events exist.

func (*EventUpdates) HasProcessorTypeEvents

func (u *EventUpdates) HasProcessorTypeEvents() bool

HasProcessorTypeEvents returns true if any processor type events exist.

func (*EventUpdates) HasSourceEvents

func (u *EventUpdates) HasSourceEvents() bool

HasSourceEvents returns true if any source events exist.

func (*EventUpdates) HasSourceTypeEvents

func (u *EventUpdates) HasSourceTypeEvents() bool

HasSourceTypeEvents returns true if any source type events exist.

func (*EventUpdates) IncludeAgent

func (u *EventUpdates) IncludeAgent(agent *model.Agent, eventType EventType)

IncludeAgent will add an agent event to Updates.

func (*EventUpdates) IncludeAgentVersion

func (u *EventUpdates) IncludeAgentVersion(agentVersion *model.AgentVersion, eventType EventType)

IncludeAgentVersion will add an agent version event to Updates.

func (*EventUpdates) IncludeConfiguration

func (u *EventUpdates) IncludeConfiguration(configuration *model.Configuration, eventType EventType)

IncludeConfiguration will add a configuration event to Updates.

func (*EventUpdates) IncludeDestination

func (u *EventUpdates) IncludeDestination(destination *model.Destination, eventType EventType)

IncludeDestination will add a destination event to Updates.

func (*EventUpdates) IncludeDestinationType

func (u *EventUpdates) IncludeDestinationType(destinationType *model.DestinationType, eventType EventType)

IncludeDestinationType will add a destination type event to Updates.

func (*EventUpdates) IncludeProcessor

func (u *EventUpdates) IncludeProcessor(processor *model.Processor, eventType EventType)

IncludeProcessor will add a processor event to Updates.

func (*EventUpdates) IncludeProcessorType

func (u *EventUpdates) IncludeProcessorType(processorType *model.ProcessorType, eventType EventType)

IncludeProcessorType will add a processor type event to Updates.

func (*EventUpdates) IncludeResource

func (u *EventUpdates) IncludeResource(r model.Resource, eventType EventType)

IncludeResource will add a resource event to Updates. If the resource is not supported by Updates, this will do nothing.

func (*EventUpdates) IncludeSource

func (u *EventUpdates) IncludeSource(source *model.Source, eventType EventType)

IncludeSource will add a source event to Updates.

func (*EventUpdates) IncludeSourceType

func (u *EventUpdates) IncludeSourceType(sourceType *model.SourceType, eventType EventType)

IncludeSourceType will add a source type event to Updates.

func (*EventUpdates) Merge

func (u *EventUpdates) Merge(other BasicEventUpdates) bool

Merge merges another set of updates into this one, returns true if it was able to merge any updates.

func (*EventUpdates) ProcessorTypes

func (u *EventUpdates) ProcessorTypes() Events[*model.ProcessorType]

ProcessorTypes returns a collection of processor type events.

func (*EventUpdates) Processors

func (u *EventUpdates) Processors() Events[*model.Processor]

Processors returns a collection of processor events.

func (*EventUpdates) Size

func (u *EventUpdates) Size() int

Size returns the sum of all events.

func (*EventUpdates) SourceTypes

func (u *EventUpdates) SourceTypes() Events[*model.SourceType]

SourceTypes returns a collection of source type events.

func (*EventUpdates) Sources

func (u *EventUpdates) Sources() Events[*model.Source]

Sources returns a collection of source events.

func (*EventUpdates) TransitiveUpdates

func (u *EventUpdates) TransitiveUpdates() []model.Resource

TransitiveUpdates returns a list of resources that need to have their resources updated.

type Events

type Events[T model.HasUniqueKey] map[string]Event[T]

Events is a map of ID to Event

func NewEvents

func NewEvents[T model.HasUniqueKey]() Events[T]

NewEvents returns a new empty set of events

func NewEventsWithItem

func NewEventsWithItem[T model.HasUniqueKey](item T, eventType EventType) Events[T]

NewEventsWithItem returns a new set of events with an initial item and eventType

func (Events[T]) ByType

func (e Events[T]) ByType(eventType EventType) []Event[T]

ByType returns all events of the specified type

func (Events[T]) CanSafelyMerge

func (e Events[T]) CanSafelyMerge(other Events[T]) bool

CanSafelyMerge returns true if the other events can be merged into this one. Currently we only merge events with different keys.

func (Events[T]) Clone

func (e Events[T]) Clone() Events[T]

Clone copies the event.

func (Events[T]) Contains

func (e Events[T]) Contains(uniqueKey string, eventType EventType) bool

Contains returns true if the item already exists

func (Events[T]) ContainsKey

func (e Events[T]) ContainsKey(uniqueKey string) bool

ContainsKey returns true if the key already exists with any EventType

func (Events[T]) Empty

func (e Events[T]) Empty() bool

Empty returns true if there are no events

func (Events[T]) Include

func (e Events[T]) Include(item T, eventType EventType)

Include an item of with the specified event type.

func (Events[T]) Keys

func (e Events[T]) Keys() []string

Keys returns all keys of the events

func (Events[T]) Merge

func (e Events[T]) Merge(other Events[T])

Merge will add events from the other events. Note that this will currently overwrite any existing events. Use CanSafelyMerge first to determine if a Merge can be done without overwriting and losing events.

func (Events[T]) Updates

func (e Events[T]) Updates() []Event[T]

Updates returns all events of type EventTypeUpdate

type Options

type Options struct {
	// SessionsSecret is used to encode sessions
	SessionsSecret string
	// MaxEventsToMerge is the maximum number of update events (inserts, updates, deletes, etc) to merge into a single
	// event.
	MaxEventsToMerge int
	// DisableMeasurementsCleanup indicates that the store should not clean up measurements. This is useful for testing.
	DisableMeasurementsCleanup bool
	// DisableRolloutUpdater indicates that the store should not update rollouts. This is useful for testing.
	DisableRolloutUpdater bool
}

Options are options that are common to all store implementations

type QueryOption

type QueryOption func(*QueryOptions)

QueryOption is an option used in Store queries

func WithLimit

func WithLimit(limit int) QueryOption

WithLimit sets the maximum number of results to return. For paging, if the pages have 10 items per page, set the limit to 10.

func WithOffset

func WithOffset(offset int) QueryOption

WithOffset sets the offset for the results to return. For paging, if the pages have 10 items per page and this is the 3rd page, set the offset to 20.

func WithQuery

func WithQuery(query *search.Query) QueryOption

WithQuery adds a search query string to the query options

func WithSelector

func WithSelector(selector model.Selector) QueryOption

WithSelector adds a selector to the query options

func WithSort

func WithSort(field string) QueryOption

WithSort sets the sort order for the request. The sort value is the name of the field, sorted ascending. To sort descending, prefix the field with a minus sign (-). Some Stores only allow sorting by certain fields. Sort values not supported will be ignored.

type QueryOptions

type QueryOptions struct {
	Selector model.Selector
	Query    *search.Query
	Offset   int
	Limit    int
	Sort     string
}

QueryOptions represents the set of options available for a store query

func MakeQueryOptions

func MakeQueryOptions(options []QueryOption) QueryOptions

MakeQueryOptions creates a queryOptions struct from a slice of QueryOption

type RolloutEventUpdates

type RolloutEventUpdates interface {
	// Updates retrieves the agent updates
	Updates() Events[*model.ConfigurationVersions]

	// Empty returns true if the updates are empty.
	Empty() bool
	// Merge merges another set of updates into this one, returns true
	// if it was able to merge any updates.
	Merge(other RolloutEventUpdates) bool
}

RolloutEventUpdates is a collection of rollout related events

func NewRolloutUpdates

func NewRolloutUpdates(_ context.Context, agentEvents Events[*model.Agent]) RolloutEventUpdates

NewRolloutUpdates creates a new RolloutUpdates

type RolloutUpdateCreator

type RolloutUpdateCreator func(context.Context, Events[*model.Agent]) RolloutEventUpdates

RolloutUpdateCreator is a function that creates a RolloutEventUpdates from a set of agent events.

type RolloutUpdates

type RolloutUpdates struct {
	UpdatesField Events[*model.ConfigurationVersions] `json:"updates"`
}

RolloutUpdates is a basic implementation of the RolloutEventUpdates interface

func (*RolloutUpdates) Empty

func (r *RolloutUpdates) Empty() bool

Empty returns true if the updates are empty.

func (*RolloutUpdates) Merge

func (r *RolloutUpdates) Merge(other RolloutEventUpdates) bool

Merge merges another set of updates into this one, returns true if it was able to merge any updates.

func (*RolloutUpdates) Updates

Updates retrieves the agent updates

type Store

type Store interface {
	Clear()

	Close() error

	Agent(ctx context.Context, id string) (*model.Agent, error)
	Agents(ctx context.Context, options ...QueryOption) ([]*model.Agent, error)
	AgentsCount(context.Context, ...QueryOption) (int, error)
	// UpsertAgent adds a new Agent to the Store or updates an existing one
	UpsertAgent(ctx context.Context, agentID string, updater AgentUpdater) (*model.Agent, error)
	UpsertAgents(ctx context.Context, agentIDs []string, updater AgentUpdater) ([]*model.Agent, error)
	DeleteAgents(ctx context.Context, agentIDs []string) ([]*model.Agent, error)

	AgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)
	AgentVersions(ctx context.Context) ([]*model.AgentVersion, error)
	DeleteAgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)

	Configurations(ctx context.Context, options ...QueryOption) ([]*model.Configuration, error)
	// Configuration returns a configuration by name.
	//
	// The name can optionally include a specific version, e.g. name:version.
	//
	// name:latest will return the latest version
	// name:current will return the current version that was applied to agents
	//
	// If the name does not include a version, the current version will be returned.
	// If the name does not exist, nil will be returned with no error.
	Configuration(ctx context.Context, name string) (*model.Configuration, error)
	UpdateConfiguration(ctx context.Context, name string, updater ConfigurationUpdater) (config *model.Configuration, status model.UpdateStatus, err error)
	DeleteConfiguration(ctx context.Context, name string) (*model.Configuration, error)

	Source(ctx context.Context, name string) (*model.Source, error)
	Sources(ctx context.Context) ([]*model.Source, error)
	DeleteSource(ctx context.Context, name string) (*model.Source, error)

	SourceType(ctx context.Context, name string) (*model.SourceType, error)
	SourceTypes(ctx context.Context) ([]*model.SourceType, error)
	DeleteSourceType(ctx context.Context, name string) (*model.SourceType, error)

	Processor(ctx context.Context, name string) (*model.Processor, error)
	Processors(ctx context.Context) ([]*model.Processor, error)
	DeleteProcessor(ctx context.Context, name string) (*model.Processor, error)

	ProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)
	ProcessorTypes(ctx context.Context) ([]*model.ProcessorType, error)
	DeleteProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)

	Destination(ctx context.Context, name string) (*model.Destination, error)
	Destinations(ctx context.Context) ([]*model.Destination, error)
	DeleteDestination(ctx context.Context, name string) (*model.Destination, error)

	DestinationType(ctx context.Context, name string) (*model.DestinationType, error)
	DestinationTypes(ctx context.Context) ([]*model.DestinationType, error)
	DeleteDestinationType(ctx context.Context, name string) (*model.DestinationType, error)
	// ApplyResources inserts or updates the specified resources. The resulting status of each resource is returned. The
	// resource may be modified as a result of the operation. If the caller needs to preserve the original resource,
	// model.Clone can be used to create a copy.
	ApplyResources(ctx context.Context, resources []model.Resource) ([]model.ResourceStatus, error)
	// Batch delete of a slice of resources, returns the successfully deleted resources or an error.
	DeleteResources(ctx context.Context, resources []model.Resource) ([]model.ResourceStatus, error)

	// AgentConfiguration returns the configuration that should be applied to an agent.
	//
	// It returns an error if the agent is nil. Returns the pending configuration if a rollout is in progress. Returns the
	// current configuration if there is one associated with the agent. Returns the configuration corresponding to the
	// configuration= label if the label exists on the agent. Returns the configuration matching the agent's labels if one
	// exists. Returns nil if there are no matches. If the agent does not have an associated configuration or the
	// associated configuration does not exist, no error is returned but the configuration will be nil. If there is an
	// error accessing the backend store, the error will be returned.
	AgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error)

	// AgentsIDsMatchingConfiguration returns the list of agent IDs that are using the specified configuration
	AgentsIDsMatchingConfiguration(ctx context.Context, conf *model.Configuration) ([]string, error)

	// CleanupDisconnectedAgents removes agents that have disconnected before the specified time
	CleanupDisconnectedAgents(ctx context.Context, since time.Time) error

	// Updates will receive pipelines and configurations that have been updated or deleted, either because the
	// configuration changed or a component in them was updated. Agents inserted/updated from UpsertAgent and agents
	// removed from CleanupDisconnectedAgents are also sent with Updates.
	Updates(ctx context.Context) eventbus.Source[BasicEventUpdates]

	// AgentRolloutUpdates will receive agent update events that are meant to be processed for purpose of rollouts.
	AgentRolloutUpdates(ctx context.Context) eventbus.Source[RolloutEventUpdates]

	// AgentIndex provides access to the search AgentIndex implementation managed by the Store
	AgentIndex(ctx context.Context) search.Index

	// ConfigurationIndex provides access to the search Index for Configurations
	ConfigurationIndex(ctx context.Context) search.Index

	// UserSessions must implement the gorilla sessions.Store interface
	UserSessions() sessions.Store

	// Measurements stores stats for agents and configurations
	Measurements() stats.Measurements

	// StartRollout will start a rollout for the specified configuration with the specified options. If nil is passed for
	// options, any existing rollout options on the configuration status will be used. If there are no rollout options in
	// the configuration status, default values will be used for the rollout. If there is an existing rollout a different
	// version of this configuration, it will be replaced. Does nothing if the rollout does not have a
	// RolloutStatusPending status. Returns the current Configuration with its Rollout status.
	StartRollout(ctx context.Context, configurationName string, options *model.RolloutOptions) (*model.Configuration, error)

	// ResumeRollout will resume a rollout for the specified configuration.
	// Does nothing if the Rollout status is not RolloutStatusStarted or RolloutStatusStarted.
	// For RolloutStatusError - it will increase the maxErrors of the
	// rollout by the current number of errors + 1.
	// For RolloutStatusStarted - it will pause the rollout.
	PauseRollout(ctx context.Context, configurationName string) (*model.Configuration, error)

	// ResumeRollout will resume a rollout for the specified configuration.
	// Does nothing if the Rollout status is not RolloutStatusStarted or RolloutStatusStarted.
	// For RolloutStatusError - it will increase the maxErrors of the
	// rollout by the current number of errors + 1.
	// For RolloutStatusStarted - it will pause the rollout.
	ResumeRollout(ctx context.Context, configurationName string) (*model.Configuration, error)

	// UpdateRollout updates a rollout in progress. Does nothing if the rollout does not have a RolloutStatusStarted
	// status. Returns the current Configuration with its Rollout status.
	UpdateRollout(ctx context.Context, configurationName string) (*model.Configuration, error)

	// UpdatesRollouts updates all rollouts in progress. It returns each of the Configurations that contains an active
	// rollout. It is possible for this to partially succeed and return some updated rollouts and an error containing
	// errors from updating some other rollouts.
	UpdateRollouts(ctx context.Context) ([]*model.Configuration, error)

	// UpdateAllRollouts updates all active rollouts. The error returned is not intended to be returned to a client but can
	// be logged.
	UpdateAllRollouts(ctx context.Context) error
}

Store handles interacting with a storage backend,

func NewBoltStore

func NewBoltStore(ctx context.Context, db *bbolt.DB, options Options, logger *zap.Logger) Store

NewBoltStore returns a new store boltstore struct that implements the store.Store interface.

func NewMapStore

func NewMapStore(ctx context.Context, options Options, logger *zap.Logger) Store

NewMapStore returns an in memory Store

type Updates

type Updates struct {
	// contains filtered or unexported fields
}

Updates is a wrapped event bus for store updates.

func NewUpdates

func NewUpdates(ctx context.Context, options Options, logger *zap.Logger,
	basicEventBroadcaster BroadCastBuilder[BasicEventUpdates],
	rolloutBroadcaster BroadCastBuilder[RolloutEventUpdates],
	rolloutUpdateCreator RolloutUpdateCreator,
) *Updates

NewUpdates creates a new UpdatesEventBus.

func (*Updates) RolloutUpdates

func (s *Updates) RolloutUpdates() eventbus.Source[RolloutEventUpdates]

RolloutUpdates returns the external channel that can be provided to external clients

func (*Updates) Send

func (s *Updates) Send(ctx context.Context, updates BasicEventUpdates)

Send adds an Updates event to the internal channel where it can be merged and relayed to the external channel.

func (*Updates) Shutdown

func (s *Updates) Shutdown(_ context.Context)

Shutdown stops the event bus.

func (*Updates) Updates

func (s *Updates) Updates() eventbus.Source[BasicEventUpdates]

Updates returns the external channel that can be provided to external clients.

Directories

Path Synopsis
Package search provides a search engine with indexing and suggestions for the store
Package search provides a search engine with indexing and suggestions for the store
Package stats provides the store for measurements associated with agents and configurations.
Package stats provides the store for measurements associated with agents and configurations.
Package storetest has helper functions for setting up storage for tests
Package storetest has helper functions for setting up storage for tests

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL