Overview ¶
Package store contains the Store interface and implementations
Index ¶
- Constants
- Variables
- func AgentKey(id string) []byte
- func AgentPrefix() []byte
- func ApplySortOffsetAndLimit[T any](list []T, opts QueryOptions, fieldAccessor fieldAccessor[T]) []T
- func CurrentRolloutsForConfiguration(idx search.Index, configurationName string) ([]string, error)
- func DeleteResource[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string, ...) (resource R, exists bool, err error)
- func DeleteResourceAndNotify[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string, ...) (resource R, exists bool, err error)
- func FindAgents(ctx context.Context, idx search.Index, key string, value string) ([]string, error)
- func FindResource[R model.Resource](ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, kind model.Kind, ...) (resource R, key []byte, bucket *bbolt.Bucket, exists bool, err error)
- func FindSuggestions(idx search.Index, key string, prefix string) ([]string, error)
- func GetSeedResources(ctx context.Context, files embed.FS, folders []string) ([]model.Resource, error)
- func HandleRolloutUpdates(ctx context.Context, s Store, logger *zap.Logger)
- func InitBoltstoreDB(storageFilePath string) (*bbolt.DB, error)
- func IsNewConfigurationVersion(curResource *model.AnyResource, newResource model.Resource) (bool, *model.Configuration, error)
- func KeyFromResource(r model.Resource) []byte
- func MaskSensitiveParameters[R model.Resource](ctx context.Context, resource R)
- func MergeUpdates(into, from BasicEventUpdates) bool
- func NewBPCookieStore(secret string) *sessions.CookieStore
- func NewDependencyError(d DependentResources) error
- func PreserveSensitiveParameters(ctx context.Context, updated model.Resource, existing *model.AnyResource) error
- func Resource[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string) (resource R, exists bool, err error)
- func ResourceKey(kind model.Kind, name string) []byte
- func Resources[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind) ([]R, error)
- func ResourcesByUniqueKeys[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKeys []string, ...) ([]R, error)
- func ResourcesPrefix(kind model.Kind) []byte
- func RunReportConnectedAgentsTests(ctx context.Context, t *testing.T, store Store)
- func RunTestSeedDeprecated(ctx context.Context, t *testing.T, store Store)
- func Seed(ctx context.Context, store Store, logger *zap.Logger, files embed.FS, ...) error
- func SeedSearchIndexes(ctx context.Context, store Store, logger *zap.Logger)
- func StartedRolloutsFromIndex(_ context.Context, index search.Index) ([]string, error)
- func UpdateDependentResources(ctx context.Context, store Store, resources []model.Resource, ...) ([]model.ResourceStatus, error)
- func UpdateResource[R model.Resource](ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, kind model.Kind, ...) (r R, status model.UpdateStatus, err error)
- func UpdateRolloutMetrics(ctx context.Context, agentIndex search.Index, config *model.Configuration) ([]string, int, error)
- func UpsertResource(ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, r model.Resource) (model.UpdateStatus, error)
- type AgentUpdater
- type ArchiveStore
- type BasicEventUpdates
- type BoltstoreCommon
- type BoltstoreCore
- func (s *BoltstoreCore) Agent(ctx context.Context, id string) (*model.Agent, error)
- func (s *BoltstoreCore) AgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error)
- func (s *BoltstoreCore) AgentIndex(ctx context.Context) search.Index
- func (s *BoltstoreCore) AgentMetrics(ctx context.Context, ids []string, options ...stats.QueryOption) (stats.MetricData, error)
- func (s *BoltstoreCore) AgentRolloutUpdates(_ context.Context) eventbus.Source[RolloutEventUpdates]
- func (s *BoltstoreCore) AgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)
- func (s *BoltstoreCore) AgentVersions(ctx context.Context) ([]*model.AgentVersion, error)
- func (s *BoltstoreCore) Agents(ctx context.Context, options ...QueryOption) ([]*model.Agent, error)
- func (s *BoltstoreCore) AgentsCount(ctx context.Context, options ...QueryOption) (int, error)
- func (s *BoltstoreCore) AgentsIDsMatchingConfiguration(ctx context.Context, configuration *model.Configuration) ([]string, error)
- func (s *BoltstoreCore) CleanupDisconnectedAgents(ctx context.Context, since time.Time) error
- func (s *BoltstoreCore) Configuration(ctx context.Context, name string) (*model.Configuration, error)
- func (s *BoltstoreCore) ConfigurationIndex(ctx context.Context) search.Index
- func (s *BoltstoreCore) ConfigurationMetrics(ctx context.Context, name string, options ...stats.QueryOption) (stats.MetricData, error)
- func (s *BoltstoreCore) Configurations(ctx context.Context, options ...QueryOption) ([]*model.Configuration, error)
- func (s *BoltstoreCore) Database() *bbolt.DB
- func (s *BoltstoreCore) DeleteAgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)
- func (s *BoltstoreCore) DeleteConfiguration(ctx context.Context, name string) (*model.Configuration, error)
- func (s *BoltstoreCore) DeleteDestination(ctx context.Context, name string) (*model.Destination, error)
- func (s *BoltstoreCore) DeleteDestinationType(ctx context.Context, name string) (*model.DestinationType, error)
- func (s *BoltstoreCore) DeleteProcessor(ctx context.Context, name string) (*model.Processor, error)
- func (s *BoltstoreCore) DeleteProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)
- func (s *BoltstoreCore) DeleteResourcesCore(ctx context.Context, resources []model.Resource) ([]model.ResourceStatus, error)
- func (s *BoltstoreCore) DeleteSource(ctx context.Context, name string) (*model.Source, error)
- func (s *BoltstoreCore) DeleteSourceType(ctx context.Context, name string) (*model.SourceType, error)
- func (s *BoltstoreCore) Destination(ctx context.Context, name string) (*model.Destination, error)
- func (s *BoltstoreCore) DestinationType(ctx context.Context, name string) (*model.DestinationType, error)
- func (s *BoltstoreCore) DestinationTypes(ctx context.Context) ([]*model.DestinationType, error)
- func (s *BoltstoreCore) Destinations(ctx context.Context) ([]*model.Destination, error)
- func (s *BoltstoreCore) DisconnectUnreportedAgents(ctx context.Context, since time.Time) error
- func (s *BoltstoreCore) FindAgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error)
- func (s *BoltstoreCore) MeasurementsSize(ctx context.Context) (int, error)
- func (s *BoltstoreCore) OverviewMetrics(ctx context.Context, options ...stats.QueryOption) (stats.MetricData, error)
- func (s *BoltstoreCore) PauseRollout(ctx context.Context, configurationName string) (*model.Configuration, error)
- func (s *BoltstoreCore) ProcessMetrics(ctx context.Context) error
- func (s *BoltstoreCore) Processor(ctx context.Context, name string) (*model.Processor, error)
- func (s *BoltstoreCore) ProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)
- func (s *BoltstoreCore) ProcessorTypes(ctx context.Context) ([]*model.ProcessorType, error)
- func (s *BoltstoreCore) Processors(ctx context.Context) ([]*model.Processor, error)
- func (s *BoltstoreCore) ReportConnectedAgents(ctx context.Context, agentIDs []string, time time.Time) error
- func (s *BoltstoreCore) ResourceHistory(ctx context.Context, resourceKind model.Kind, resourceName string) ([]*model.AnyResource, error)
- func (s *BoltstoreCore) ResumeRollout(ctx context.Context, configurationName string) (*model.Configuration, error)
- func (s *BoltstoreCore) SaveAgentMetrics(ctx context.Context, metrics []*record.Metric) error
- func (s *BoltstoreCore) Source(ctx context.Context, name string) (*model.Source, error)
- func (s *BoltstoreCore) SourceType(ctx context.Context, name string) (*model.SourceType, error)
- func (s *BoltstoreCore) SourceTypes(ctx context.Context) ([]*model.SourceType, error)
- func (s *BoltstoreCore) Sources(ctx context.Context) ([]*model.Source, error)
- func (s *BoltstoreCore) StartMeasurements(ctx context.Context)
- func (s *BoltstoreCore) StartRollout(ctx context.Context, configurationName string, options *model.RolloutOptions) (*model.Configuration, error)
- func (s *BoltstoreCore) UpdateConfiguration(ctx context.Context, name string, updater ConfigurationUpdater) (config *model.Configuration, status model.UpdateStatus, err error)
- func (s *BoltstoreCore) UpdateRollout(ctx context.Context, configuration string) (updatedConfig *model.Configuration, err error)
- func (s *BoltstoreCore) UpdateRollouts(ctx context.Context) ([]*model.Configuration, error)
- func (s *BoltstoreCore) Updates(_ context.Context) eventbus.Source[BasicEventUpdates]
- func (s *BoltstoreCore) UpsertAgent(ctx context.Context, id string, updater AgentUpdater) (*model.Agent, error)
- func (s *BoltstoreCore) UpsertAgents(ctx context.Context, agentIDs []string, updater AgentUpdater) ([]*model.Agent, error)
- type BroadCastBuilder
- type ConfigurationUpdater
- type Dependency
- type DependencyError
- type DependentResources
- type Event
- type EventType
- type EventUpdates
- func (u *EventUpdates) AddAffectedConfigurations(configurations []*model.Configuration)
- func (u *EventUpdates) AddAffectedDestinations(destinations []*model.Destination)
- func (u *EventUpdates) AddAffectedProcessors(processors []*model.Processor)
- func (u *EventUpdates) AddAffectedSources(sources []*model.Source)
- func (u *EventUpdates) AffectsConfiguration(configuration *model.Configuration) bool
- func (u *EventUpdates) AffectsDestination(destination *model.Destination) bool
- func (u *EventUpdates) AffectsProcessor(processor *model.Processor) bool
- func (u *EventUpdates) AffectsResourceProcessors(processors []model.ResourceConfiguration) bool
- func (u *EventUpdates) AffectsSource(source *model.Source) bool
- func (u *EventUpdates) AgentVersions() Events[*model.AgentVersion]
- func (u *EventUpdates) Agents() Events[*model.Agent]
- func (u *EventUpdates) Configurations() Events[*model.Configuration]
- func (u *EventUpdates) CouldAffectConfigurations() bool
- func (u *EventUpdates) CouldAffectDestinations() bool
- func (u *EventUpdates) CouldAffectProcessors() bool
- func (u *EventUpdates) CouldAffectSources() bool
- func (u *EventUpdates) DestinationTypes() Events[*model.DestinationType]
- func (u *EventUpdates) Destinations() Events[*model.Destination]
- func (u *EventUpdates) Empty() bool
- func (u *EventUpdates) HasDestinationEvents() bool
- func (u *EventUpdates) HasDestinationTypeEvents() bool
- func (u *EventUpdates) HasProcessorEvents() bool
- func (u *EventUpdates) HasProcessorTypeEvents() bool
- func (u *EventUpdates) HasSourceEvents() bool
- func (u *EventUpdates) HasSourceTypeEvents() bool
- func (u *EventUpdates) IncludeAgent(agent *model.Agent, eventType EventType)
- func (u *EventUpdates) IncludeAgentVersion(agentVersion *model.AgentVersion, eventType EventType)
- func (u *EventUpdates) IncludeConfiguration(configuration *model.Configuration, eventType EventType)
- func (u *EventUpdates) IncludeDestination(destination *model.Destination, eventType EventType)
- func (u *EventUpdates) IncludeDestinationType(destinationType *model.DestinationType, eventType EventType)
- func (u *EventUpdates) IncludeProcessor(processor *model.Processor, eventType EventType)
- func (u *EventUpdates) IncludeProcessorType(processorType *model.ProcessorType, eventType EventType)
- func (u *EventUpdates) IncludeResource(r model.Resource, eventType EventType)
- func (u *EventUpdates) IncludeSource(source *model.Source, eventType EventType)
- func (u *EventUpdates) IncludeSourceType(sourceType *model.SourceType, eventType EventType)
- func (u *EventUpdates) Merge(other BasicEventUpdates) bool
- func (u *EventUpdates) ProcessorTypes() Events[*model.ProcessorType]
- func (u *EventUpdates) Processors() Events[*model.Processor]
- func (u *EventUpdates) Size() int
- func (u *EventUpdates) SourceTypes() Events[*model.SourceType]
- func (u *EventUpdates) Sources() Events[*model.Source]
- func (u *EventUpdates) TransitiveUpdates() []model.Resource
- type Events
- func (e Events[T]) ByType(eventType EventType) []Event[T]
- func (e Events[T]) CanSafelyMerge(other Events[T]) bool
- func (e Events[T]) Clone() Events[T]
- func (e Events[T]) Contains(uniqueKey string, eventType EventType) bool
- func (e Events[T]) ContainsKey(uniqueKey string) bool
- func (e Events[T]) Empty() bool
- func (e Events[T]) Include(item T, eventType EventType)
- func (e Events[T]) Keys() []string
- func (e Events[T]) Merge(other Events[T])
- func (e Events[T]) Updates() []Event[T]
- type Options
- type QueryOption
- type QueryOptions
- type RolloutEventUpdates
- type RolloutUpdateCreator
- type RolloutUpdates
- type Store
- type Updates
Constants ¶
const ( BucketResources = "Resources" BucketAgents = "Agents" BucketMeasurements = "Measurements" BucketArchive = "Archive" )
bucket names
Variables ¶
var ErrDoesNotSupportHistory = errors.New("store does not support resource history")
ErrDoesNotSupportHistory is used when a store does not implement resource history.
var ErrResourceInUse = errors.New("resource in use")
ErrResourceInUse is used in delete functions to indicate the delete could not be performed because the Resource is a dependency of another, e.g. the Source that is being deleted is referenced in a Configuration.
var ErrStoreResourceMissing = errors.New("resource not found")
ErrStoreResourceMissing is used internally in store functions to indicate a delete could not be performed because no such resource exists. It should not ever be returned by a store function as an error
var UpdatesContextKey key
UpdatesContextKey is the context key for updates
Functions ¶
func AgentPrefix ¶
func AgentPrefix() []byte
AgentPrefix returns the prefix for agents in the store.
func ApplySortOffsetAndLimit ¶
func ApplySortOffsetAndLimit[T any](list []T, opts QueryOptions, fieldAccessor fieldAccessor[T]) []T
ApplySortOffsetAndLimit applies the sort, offset, and limit options to the list.
func CurrentRolloutsForConfiguration ¶
CurrentRolloutsForConfiguration returns a list of all rollouts that are currently in progress for the specified configuration.
func DeleteResource ¶
func DeleteResource[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string, emptyResource R) (resource R, exists bool, err error)
DeleteResource removes the resource with the given kind and uniqueKey. Returns ResourceMissingError if the resource wasn't found. Returns DependencyError if the resource is referenced by another.
emptyResource will be populated with the deleted resource. For convenience, if the delete is successful, the populated resource will also be returned. If there was an error, nil will be returned for the resource.
func DeleteResourceAndNotify ¶
func DeleteResourceAndNotify[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string, emptyResource R) (resource R, exists bool, err error)
DeleteResourceAndNotify removes the resource with the given kind and uniqueKey. Returns ResourceMissingError if the resource wasn't found.
func FindAgents ¶
FindAgents returns a list of agent IDs that match the specified key/value pair. The key must be a valid search field
func FindResource ¶
func FindResource[R model.Resource](ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, kind model.Kind, uniqueKey string) (resource R, key []byte, bucket *bbolt.Bucket, exists bool, err error)
FindResource finds a resource by kind and unique key. If the resource is versioned, the latest version is returned.
func FindSuggestions ¶
FindSuggestions returns a list of all values for the specified key that start with the specified prefix.
func GetSeedResources ¶
func GetSeedResources(ctx context.Context, files embed.FS, folders []string) ([]model.Resource, error)
GetSeedResources returns all of the resources contained in SeedFolders.
func HandleRolloutUpdates ¶
HandleRolloutUpdates is a blocking call that subscribes to the store AgentRolloutUpdates and calls UpdateRollout on all agent configurations
func InitBoltstoreDB ¶
InitBoltstoreDB takes in the full path to a storage file and returns an opened bbolt database. It will return an error if the file cannot be opened.
func IsNewConfigurationVersion ¶
func IsNewConfigurationVersion(curResource *model.AnyResource, newResource model.Resource) (bool, *model.Configuration, error)
IsNewConfigurationVersion handles the special case for configurations where we only create new versions in certain cases. If the rollout has already started (not pending) and the spec has changed, we create a new version.
func KeyFromResource ¶
KeyFromResource returns the key for a resource in the store.
func MaskSensitiveParameters ¶ added in v1.19.0
MaskSensitiveParameters masks sensitive parameter values based on the ParameterDefinitions in the ResourceType. This should be called after reading a value from the store before returning it.
func MergeUpdates ¶
func MergeUpdates(into, from BasicEventUpdates) bool
MergeUpdates merges the updates from the second argument (from) into the first argument (into).
func NewBPCookieStore ¶
func NewBPCookieStore(secret string) *sessions.CookieStore
NewBPCookieStore creates a new CookieStore with the specified secret.
func NewDependencyError ¶
func NewDependencyError(d DependentResources) error
NewDependencyError creates a new DependencyError with the given dependencies.
func PreserveSensitiveParameters ¶ added in v1.19.0
func PreserveSensitiveParameters(ctx context.Context, updated model.Resource, existing *model.AnyResource) error
PreserveSensitiveParameters will replace sensitive parameters in the current Resource with values from the existing Resource. This should be called before writing an updated value to the store.
func Resource ¶
func Resource[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKey string) (resource R, exists bool, err error)
Resource returns a resource of the given kind and unique key.
func ResourceKey ¶
ResourceKey returns the key for a resource in the store.
func Resources ¶
func Resources[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind) ([]R, error)
Resources returns all resources of the given kind.
func ResourcesByUniqueKeys ¶
func ResourcesByUniqueKeys[R model.Resource](ctx context.Context, s BoltstoreCommon, kind model.Kind, uniqueKeys []string, opts QueryOptions) ([]R, error)
ResourcesByUniqueKeys returns the resources of the specified kind with the specified uniqueKeys. If requesting some resources results in an error, the errors will be accumulated and returned along with the list of resources successfully retrieved.
func ResourcesPrefix ¶
ResourcesPrefix returns the prefix for a resource kind in the store.
func RunReportConnectedAgentsTests ¶ added in v1.23.0
RunReportConnectedAgentsTests runs tests for reporting connected agents and cleaning up unreported agents
func RunTestSeedDeprecated ¶ added in v1.23.0
RunTestSeedDeprecated runs tests for the deprecated resources in the seed folder
func Seed ¶
func Seed(ctx context.Context, store Store, logger *zap.Logger, files embed.FS, folders []string) error
Seed adds bundled resources to the store
func SeedSearchIndexes ¶
SeedSearchIndexes seeds the search indexes with the current data in the store
func StartedRolloutsFromIndex ¶
StartedRolloutsFromIndex returns a list of all rollouts that are not pending.
func UpdateDependentResources ¶
func UpdateDependentResources(ctx context.Context, store Store, resources []model.Resource, statuses []model.ResourceStatus, errs error) ([]model.ResourceStatus, error)
UpdateDependentResources updates any resources that are dependent on the resources updated. The updated resources will be retrieved from the Updates and additional resources will be added to Updates as they are updated.
This function also takes the existing statuses and errors (joined using errors.Join) and returns them with any additionally modified resources and errors.
func UpdateResource ¶
func UpdateResource[R model.Resource](ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, kind model.Kind, name string, updater func(R) error) (r R, status model.UpdateStatus, err error)
UpdateResource updates a resource in the store. If the resource does not exist, model.StatusUnchanged is returned with the error ErrResourceMissing. If the resource does exist, it is updated using the updater function and the updated resource is returned.
func UpdateRolloutMetrics ¶
func UpdateRolloutMetrics(ctx context.Context, agentIndex search.Index, config *model.Configuration) ([]string, int, error)
UpdateRolloutMetrics finds the agent counts for the configuration, updates the rollout status on the configuration, and returns a slice of the agent IDs that are waiting for a rollout and the number of those agents that should be moved into the pending state.
func UpsertResource ¶
func UpsertResource(ctx context.Context, s BoltstoreCommon, tx *bbolt.Tx, r model.Resource) (model.UpdateStatus, error)
UpsertResource upserts a resource into the store. If the resource already exists, it will be updated.
Types ¶
type AgentUpdater ¶
AgentUpdater is given the current Agent model (possibly empty except for ID) and should update the Agent directly. We take this approach so that appropriate locking and/or transactions can be used for the operation as needed by the Store implementation.
type ArchiveStore ¶
type ArchiveStore interface { // ResourceHistory returns all versions of the specified resource. ResourceHistory(ctx context.Context, resourceKind model.Kind, resourceName string) ([]*model.AnyResource, error) }
ArchiveStore provides access to archived resources for version history.
type BasicEventUpdates ¶
type BasicEventUpdates interface { // Agents returns a collection of agent events. Agents() Events[*model.Agent] // AgentVersions returns a collection of agent version events. AgentVersions() Events[*model.AgentVersion] // Sources returns a collection of source events. Sources() Events[*model.Source] // SourceTypes returns a collection of source type events. SourceTypes() Events[*model.SourceType] // Processors returns a collection of processor events. Processors() Events[*model.Processor] // ProcessorTypes returns a collection of processor type events. ProcessorTypes() Events[*model.ProcessorType] // Destinations returns a collection of destination events. Destinations() Events[*model.Destination] // DestinationTypes returns a collection of destination type events. DestinationTypes() Events[*model.DestinationType] // Configurations returns a collection of configuration events. Configurations() Events[*model.Configuration] // IncludeResource will add a resource event to Updates. IncludeResource(r model.Resource, eventType EventType) // IncludeAgent will add an agent event to Updates. IncludeAgent(agent *model.Agent, eventType EventType) // CouldAffectConfigurations returns true if the updates could affect configurations. CouldAffectConfigurations() bool // CouldAffectDestinations returns true if the updates could affect destinations. CouldAffectDestinations() bool // CouldAffectProcessors returns true if the updates could affect processors. CouldAffectProcessors() bool // CouldAffectSources returns true if the updates could affect sources. CouldAffectSources() bool // AffectsDestination returns true if the updates affect the given destination. AffectsDestination(destination *model.Destination) bool // AffectsProcessor returns true if the updates affect the given processor. AffectsProcessor(processor *model.Processor) bool // AffectsSource returns true if the updates affect the given source. 
AffectsSource(source *model.Source) bool // AffectsConfiguration returns true if the updates affect the given configuration. AffectsConfiguration(configuration *model.Configuration) bool // AffectsResourceProcessors returns true if the updates affect any of the given resource processors. AffectsResourceProcessors(processors []model.ResourceConfiguration) bool // AddAffectedSources will add the given sources to the updates. AddAffectedSources(sources []*model.Source) // AddAffectedProcessors will add the given processors to the updates. AddAffectedProcessors(processors []*model.Processor) // AddAffectedDestinations will add the given destinations to the updates. AddAffectedDestinations(destinations []*model.Destination) // AddAffectedConfigurations will add the given configurations to the updates. AddAffectedConfigurations(configurations []*model.Configuration) // TransitiveUpdates returns a list of resources that need to have their resources updated. TransitiveUpdates() []model.Resource // Size returns the number of events in the updates. Size() int // Empty returns true if the updates are empty. Empty() bool // Merge merges another set of updates into this one, returns true // if it was able to merge any updates. Merge(other BasicEventUpdates) bool }
BasicEventUpdates is a collection of events created by a store operation.
func NewEventUpdates ¶
func NewEventUpdates() BasicEventUpdates
NewEventUpdates returns a new Updates object.
func UpdatesForContext ¶
func UpdatesForContext(ctx context.Context) (updates BasicEventUpdates, newContext context.Context, shouldNotify bool)
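UpdatesForContext returns the Updates for this Context. If there is no Updates on the Context, it creates a new Updates and adds it to the Context. It returns either the existing Context or a child Context with new Updates as appropriate. The shouldNotify result indicates whether the caller is responsible for sending the notification; presumably it is true only when a new Updates was created (i.e. none was already on the Context), so that nested calls do not notify again.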
Keeping the Updates on the context allows for recursive calls to ApplyResources without creating multiple Updates, which would result in multiple Updates events. Store implementations should use this function instead of NewEventUpdates and should only notify if shouldNotify is true.
type BoltstoreCommon ¶
type BoltstoreCommon interface { Database() *bbolt.DB AgentsBucket(ctx context.Context, tx *bbolt.Tx) (*bbolt.Bucket, error) MeasurementsBucket(ctx context.Context, tx *bbolt.Tx, metric string) (*bbolt.Bucket, error) ResourcesBucket(ctx context.Context, tx *bbolt.Tx, kind model.Kind) (*bbolt.Bucket, error) ArchiveBucket(ctx context.Context, tx *bbolt.Tx) (*bbolt.Bucket, error) ResourceKey(r model.Resource) []byte AgentsIndex(ctx context.Context) search.Index ConfigurationsIndex(ctx context.Context) search.Index ZapLogger() *zap.Logger Notify(ctx context.Context, updates BasicEventUpdates) CreateEventUpdate() BasicEventUpdates }
BoltstoreCommon is an interface for common implementation details between different boltstore implementations
type BoltstoreCore ¶
type BoltstoreCore struct { StoreUpdates *Updates DB *bbolt.DB Logger *zap.Logger SessionStorage sessions.Store sync.RWMutex BoltstoreCommon }
BoltstoreCore is an implementation of the store interface that uses BoltDB as the underlying storage mechanism
func (*BoltstoreCore) AgentConfiguration ¶
func (s *BoltstoreCore) AgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error)
AgentConfiguration returns the configuration that should be applied to an agent.
func (*BoltstoreCore) AgentIndex ¶
func (s *BoltstoreCore) AgentIndex(ctx context.Context) search.Index
AgentIndex provides access to the search Index implementation managed by the Store
func (*BoltstoreCore) AgentMetrics ¶
func (s *BoltstoreCore) AgentMetrics(ctx context.Context, ids []string, options ...stats.QueryOption) (stats.MetricData, error)
AgentMetrics provides metrics for individual agents. They are essentially configuration metrics filtered to a list of agents.
Note: While the same record.Metric struct is used to return the metrics, these are not the same metrics provided to Store. They will be aggregated and counter metrics will be converted into rates.
func (*BoltstoreCore) AgentRolloutUpdates ¶
func (s *BoltstoreCore) AgentRolloutUpdates(_ context.Context) eventbus.Source[RolloutEventUpdates]
AgentRolloutUpdates will receive agent update events that are meant to be processed for purpose of rollouts.
func (*BoltstoreCore) AgentVersion ¶
func (s *BoltstoreCore) AgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)
AgentVersion returns the agent version with the given name.
func (*BoltstoreCore) AgentVersions ¶
func (s *BoltstoreCore) AgentVersions(ctx context.Context) ([]*model.AgentVersion, error)
AgentVersions returns all agent versions in the store.
func (*BoltstoreCore) Agents ¶
func (s *BoltstoreCore) Agents(ctx context.Context, options ...QueryOption) ([]*model.Agent, error)
Agents returns all agents in the store with the given options.
func (*BoltstoreCore) AgentsCount ¶
func (s *BoltstoreCore) AgentsCount(ctx context.Context, options ...QueryOption) (int, error)
AgentsCount returns the number of agents in the store with the given options.
func (*BoltstoreCore) AgentsIDsMatchingConfiguration ¶
func (s *BoltstoreCore) AgentsIDsMatchingConfiguration(ctx context.Context, configuration *model.Configuration) ([]string, error)
AgentsIDsMatchingConfiguration returns the list of agent IDs that are using the specified configuration
func (*BoltstoreCore) CleanupDisconnectedAgents ¶
CleanupDisconnectedAgents removes all containerized agents that have been disconnected since the given time
func (*BoltstoreCore) Configuration ¶
func (s *BoltstoreCore) Configuration(ctx context.Context, name string) (*model.Configuration, error)
Configuration returns the configuration with the given name.
func (*BoltstoreCore) ConfigurationIndex ¶
func (s *BoltstoreCore) ConfigurationIndex(ctx context.Context) search.Index
ConfigurationIndex provides access to the search Index for Configurations
func (*BoltstoreCore) ConfigurationMetrics ¶
func (s *BoltstoreCore) ConfigurationMetrics(ctx context.Context, name string, options ...stats.QueryOption) (stats.MetricData, error)
ConfigurationMetrics provides all metrics associated with a configuration aggregated from all agents using the configuration.
Note: While the same record.Metric struct is used to return the metrics, these are not the same metrics provided to Store. They will be aggregated and counter metrics will be converted into rates.
func (*BoltstoreCore) Configurations ¶
func (s *BoltstoreCore) Configurations(ctx context.Context, options ...QueryOption) ([]*model.Configuration, error)
Configurations returns the configurations in the store with the given options.
func (*BoltstoreCore) Database ¶
func (s *BoltstoreCore) Database() *bbolt.DB
Database returns the underlying bbolt database
func (*BoltstoreCore) DeleteAgentVersion ¶
func (s *BoltstoreCore) DeleteAgentVersion(ctx context.Context, name string) (*model.AgentVersion, error)
DeleteAgentVersion deletes the agent version with the given name.
func (*BoltstoreCore) DeleteConfiguration ¶
func (s *BoltstoreCore) DeleteConfiguration(ctx context.Context, name string) (*model.Configuration, error)
DeleteConfiguration deletes the configuration with the given name.
func (*BoltstoreCore) DeleteDestination ¶
func (s *BoltstoreCore) DeleteDestination(ctx context.Context, name string) (*model.Destination, error)
DeleteDestination deletes the destination with the given name.
func (*BoltstoreCore) DeleteDestinationType ¶
func (s *BoltstoreCore) DeleteDestinationType(ctx context.Context, name string) (*model.DestinationType, error)
DeleteDestinationType deletes the destination type with the given name.
func (*BoltstoreCore) DeleteProcessor ¶
DeleteProcessor deletes the processor with the given name.
func (*BoltstoreCore) DeleteProcessorType ¶
func (s *BoltstoreCore) DeleteProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)
DeleteProcessorType deletes the processor type with the given name.
func (*BoltstoreCore) DeleteResourcesCore ¶
func (s *BoltstoreCore) DeleteResourcesCore(ctx context.Context, resources []model.Resource) ([]model.ResourceStatus, error)
DeleteResourcesCore iterates through a slice of resources, and removes them from storage by name.
func (*BoltstoreCore) DeleteSource ¶
DeleteSource deletes the source with the given name.
func (*BoltstoreCore) DeleteSourceType ¶
func (s *BoltstoreCore) DeleteSourceType(ctx context.Context, name string) (*model.SourceType, error)
DeleteSourceType deletes the source type with the given name.
func (*BoltstoreCore) Destination ¶
func (s *BoltstoreCore) Destination(ctx context.Context, name string) (*model.Destination, error)
Destination returns the destination with the given name.
func (*BoltstoreCore) DestinationType ¶
func (s *BoltstoreCore) DestinationType(ctx context.Context, name string) (*model.DestinationType, error)
DestinationType returns the destination type with the given name.
func (*BoltstoreCore) DestinationTypes ¶
func (s *BoltstoreCore) DestinationTypes(ctx context.Context) ([]*model.DestinationType, error)
DestinationTypes returns the destination types in the store.
func (*BoltstoreCore) Destinations ¶
func (s *BoltstoreCore) Destinations(ctx context.Context) ([]*model.Destination, error)
Destinations returns the destinations in the store.
func (*BoltstoreCore) DisconnectUnreportedAgents ¶ added in v1.23.0
DisconnectUnreportedAgents sets the Status of agents to Disconnected if the agent ReportedAt time is before the specified time.
func (*BoltstoreCore) FindAgentConfiguration ¶
func (s *BoltstoreCore) FindAgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error)
FindAgentConfiguration uses label matching to find the appropriate configuration for this agent. If a configuration is found that does not match the Current configuration, the configuration will be assigned to Future.
func (*BoltstoreCore) MeasurementsSize ¶
func (s *BoltstoreCore) MeasurementsSize(ctx context.Context) (int, error)
MeasurementsSize returns the count of keys in the store, and is used only for testing
func (*BoltstoreCore) OverviewMetrics ¶
func (s *BoltstoreCore) OverviewMetrics(ctx context.Context, options ...stats.QueryOption) (stats.MetricData, error)
OverviewMetrics provides all metrics needed for the overview page. This page shows configs and destinations.
func (*BoltstoreCore) PauseRollout ¶
func (s *BoltstoreCore) PauseRollout(ctx context.Context, configurationName string) (*model.Configuration, error)
PauseRollout will pause a rollout for the specified configuration. Does nothing if the rollout does not have a RolloutStatusStarted status. Returns the current Configuration with its Rollout status.
func (*BoltstoreCore) ProcessMetrics ¶
func (s *BoltstoreCore) ProcessMetrics(ctx context.Context) error
ProcessMetrics is called in the background at regular intervals and performs metric roll-up and removes old data
func (*BoltstoreCore) ProcessorType ¶
func (s *BoltstoreCore) ProcessorType(ctx context.Context, name string) (*model.ProcessorType, error)
ProcessorType returns the processor type with the given name.
func (*BoltstoreCore) ProcessorTypes ¶
func (s *BoltstoreCore) ProcessorTypes(ctx context.Context) ([]*model.ProcessorType, error)
ProcessorTypes returns the processor types in the store.
func (*BoltstoreCore) Processors ¶
Processors returns the processors in the store.
func (*BoltstoreCore) ReportConnectedAgents ¶ added in v1.23.0
func (s *BoltstoreCore) ReportConnectedAgents(ctx context.Context, agentIDs []string, time time.Time) error
ReportConnectedAgents sets the ReportedAt time for the specified agents to the specified time. This update should not fire an update event for the agents on the Updates eventbus.
func (*BoltstoreCore) ResourceHistory ¶
func (s *BoltstoreCore) ResourceHistory(ctx context.Context, resourceKind model.Kind, resourceName string) ([]*model.AnyResource, error)
ResourceHistory returns the history of a resource given its kind and name.
func (*BoltstoreCore) ResumeRollout ¶
func (s *BoltstoreCore) ResumeRollout(ctx context.Context, configurationName string) (*model.Configuration, error)
ResumeRollout will resume a rollout for the specified configuration. Does nothing if the Rollout status is not RolloutStatusPaused or RolloutStatusError. For RolloutStatusError - it will increase the maxErrors of the rollout by the current number of errors + 1. For RolloutStatusPaused - it will resume the rollout.
func (*BoltstoreCore) SaveAgentMetrics ¶
SaveAgentMetrics saves new metrics. These metrics will be aggregated to determine metrics associated with agents and configurations.
func (*BoltstoreCore) SourceType ¶
func (s *BoltstoreCore) SourceType(ctx context.Context, name string) (*model.SourceType, error)
SourceType returns the source type with the given name.
func (*BoltstoreCore) SourceTypes ¶
func (s *BoltstoreCore) SourceTypes(ctx context.Context) ([]*model.SourceType, error)
SourceTypes returns the source types in the store.
func (*BoltstoreCore) StartMeasurements ¶
func (s *BoltstoreCore) StartMeasurements(ctx context.Context)
StartMeasurements starts the background process for writing and rolling up metrics
func (*BoltstoreCore) StartRollout ¶
func (s *BoltstoreCore) StartRollout(ctx context.Context, configurationName string, options *model.RolloutOptions) (*model.Configuration, error)
StartRollout will start a rollout for the specified configuration with the specified options. If nil is passed for options, any existing rollout options on the configuration status will be used. If there are no rollout options in the configuration status, default values will be used for the rollout. If there is an existing rollout of a different version of this configuration, it will be replaced. Does nothing if the rollout does not have a RolloutStatusPending status. Returns the current Configuration with its Rollout status.
func (*BoltstoreCore) UpdateConfiguration ¶
func (s *BoltstoreCore) UpdateConfiguration(ctx context.Context, name string, updater ConfigurationUpdater) (config *model.Configuration, status model.UpdateStatus, err error)
UpdateConfiguration updates the configuration with the given name using the updater function.
func (*BoltstoreCore) UpdateRollout ¶
func (s *BoltstoreCore) UpdateRollout(ctx context.Context, configuration string) (updatedConfig *model.Configuration, err error)
UpdateRollout updates a rollout in progress. Does nothing if the rollout does not have a RolloutStatusStarted status. Returns the current Configuration with its Rollout status.
func (*BoltstoreCore) UpdateRollouts ¶
func (s *BoltstoreCore) UpdateRollouts(ctx context.Context) ([]*model.Configuration, error)
UpdateRollouts updates all rollouts in progress. It returns each of the Configurations that contains an active rollout.
func (*BoltstoreCore) Updates ¶
func (s *BoltstoreCore) Updates(_ context.Context) eventbus.Source[BasicEventUpdates]
Updates returns a channel that will receive updates when resources are added, updated, or deleted.
func (*BoltstoreCore) UpsertAgent ¶
func (s *BoltstoreCore) UpsertAgent(ctx context.Context, id string, updater AgentUpdater) (*model.Agent, error)
UpsertAgent creates or updates the given agent and calls the updater method on it.
func (*BoltstoreCore) UpsertAgents ¶
func (s *BoltstoreCore) UpsertAgents(ctx context.Context, agentIDs []string, updater AgentUpdater) ([]*model.Agent, error)
UpsertAgents upserts the agents with the given IDs. If the agent does not exist, it will be created.
type BroadCastBuilder ¶
type BroadCastBuilder[T any] func(ctx context.Context, options Options, logger *zap.Logger, maxEventsToMerge int) broadcast.Broadcast[T]
BroadCastBuilder is a function that builds a broadcast.Broadcast[T] using routing and broadcast options.
func BuildBasicEventBroadcast ¶
func BuildBasicEventBroadcast() BroadCastBuilder[BasicEventUpdates]
BuildBasicEventBroadcast returns a BroadCastBuilder that builds a broadcast.Broadcast[BasicEventUpdates] using routing and broadcast options for oss.
func BuildRolloutEventBroadcast ¶
func BuildRolloutEventBroadcast() BroadCastBuilder[RolloutEventUpdates]
BuildRolloutEventBroadcast returns a BroadCastBuilder that builds a broadcast.Broadcast[RolloutEventUpdates] using routing and broadcast options for oss.
type ConfigurationUpdater ¶
type ConfigurationUpdater func(current *model.Configuration)
ConfigurationUpdater is given the current Configuration model and should update the Configuration directly.
type Dependency ¶
Dependency is a single item in DependentResources.
type DependencyError ¶
type DependencyError struct {
Dependencies DependentResources
DependencyError is returned when trying to delete a resource that is being referenced by other resources.
func (*DependencyError) Error ¶
func (de *DependencyError) Error() string
type DependentResources ¶
type DependentResources []Dependency
DependentResources is the return type of FindDependentResources and is used to construct DependencyError. It has helper methods Empty(), Message(), and Add().
func FindDependentResources ¶
func FindDependentResources(ctx context.Context, configurationIndex search.Index, name string, kind model.Kind) (DependentResources, error)
FindDependentResources finds the dependent resources using the ConfigurationIndex provided by the Store.
func (*DependentResources) Add ¶
func (r *DependentResources) Add(d Dependency)
Add adds a new dependency to the list.
func (*DependentResources) Empty ¶
func (r *DependentResources) Empty() bool
Empty returns true if the list of dependencies is empty.
func (*DependentResources) Message ¶
func (r *DependentResources) Message() string
Message returns a string representation of the list of dependencies.
type Event ¶
type Event[T model.HasUniqueKey] struct { Type EventType `json:"type"` Item T `json:"item"` }
Event represents an insert, update, or remove of something stored in Store.
type EventType ¶
type EventType uint8
EventType is the type of event
const ( EventTypeInsert EventType = 1 EventTypeUpdate EventType = 2 EventTypeRemove EventType = 3 EventTypeLabel EventType = 4 EventTypeRollout EventType = 5 )
Insert, Update, Remove, and Label are the possible changes to a resource. Remove can indicate that the resource has been deleted or that it no longer matches a filtered list of resources being observed.
Label is currently used to indicate that Agent labels have changed but there may also be other updates so it can be considered a subset of EventTypeUpdate where every EventTypeLabel is also an EventTypeUpdate.
type EventUpdates ¶
type EventUpdates struct { AgentsField Events[*model.Agent] `json:"agents"` AgentVersionsField Events[*model.AgentVersion] `json:"agentVersions"` SourcesField Events[*model.Source] `json:"sources"` SourceTypesField Events[*model.SourceType] `json:"sourceTypes"` ProcessorsField Events[*model.Processor] `json:"processors"` ProcessorTypesField Events[*model.ProcessorType] `json:"processorTypes"` DestinationsField Events[*model.Destination] `json:"destinations"` DestinationTypesField Events[*model.DestinationType] `json:"destinationTypes"` ConfigurationsField Events[*model.Configuration] `json:"configurations"` // contains filtered or unexported fields }
EventUpdates is a collection of events created by a store operation.
func (*EventUpdates) AddAffectedConfigurations ¶
func (u *EventUpdates) AddAffectedConfigurations(configurations []*model.Configuration)
AddAffectedConfigurations will add updates for Configurations that are affected by other resource updates.
func (*EventUpdates) AddAffectedDestinations ¶
func (u *EventUpdates) AddAffectedDestinations(destinations []*model.Destination)
AddAffectedDestinations will add updates for Destinations that are affected by other resource updates.
func (*EventUpdates) AddAffectedProcessors ¶
func (u *EventUpdates) AddAffectedProcessors(processors []*model.Processor)
AddAffectedProcessors will add updates for Processors that are affected by other resource updates.
func (*EventUpdates) AddAffectedSources ¶
func (u *EventUpdates) AddAffectedSources(sources []*model.Source)
AddAffectedSources will add updates for Sources that are affected by other resource updates.
func (*EventUpdates) AffectsConfiguration ¶
func (u *EventUpdates) AffectsConfiguration(configuration *model.Configuration) bool
AffectsConfiguration returns true if the updates affect the given configuration.
func (*EventUpdates) AffectsDestination ¶
func (u *EventUpdates) AffectsDestination(destination *model.Destination) bool
AffectsDestination returns true if the updates affect the given destination.
func (*EventUpdates) AffectsProcessor ¶
func (u *EventUpdates) AffectsProcessor(processor *model.Processor) bool
AffectsProcessor returns true if the updates affect the given processor.
func (*EventUpdates) AffectsResourceProcessors ¶
func (u *EventUpdates) AffectsResourceProcessors(processors []model.ResourceConfiguration) bool
AffectsResourceProcessors returns true if the updates affect any of the given resource processors.
func (*EventUpdates) AffectsSource ¶
func (u *EventUpdates) AffectsSource(source *model.Source) bool
AffectsSource returns true if the updates affect the given source.
func (*EventUpdates) AgentVersions ¶
func (u *EventUpdates) AgentVersions() Events[*model.AgentVersion]
AgentVersions returns a collection of agent version events.
func (*EventUpdates) Agents ¶
func (u *EventUpdates) Agents() Events[*model.Agent]
Agents returns a collection of agent events.
func (*EventUpdates) Configurations ¶
func (u *EventUpdates) Configurations() Events[*model.Configuration]
Configurations returns a collection of configuration events.
func (*EventUpdates) CouldAffectConfigurations ¶
func (u *EventUpdates) CouldAffectConfigurations() bool
CouldAffectConfigurations returns true if the updates could affect configurations.
func (*EventUpdates) CouldAffectDestinations ¶
func (u *EventUpdates) CouldAffectDestinations() bool
CouldAffectDestinations returns true if the updates could affect destinations.
func (*EventUpdates) CouldAffectProcessors ¶
func (u *EventUpdates) CouldAffectProcessors() bool
CouldAffectProcessors returns true if the updates could affect processors.
func (*EventUpdates) CouldAffectSources ¶
func (u *EventUpdates) CouldAffectSources() bool
CouldAffectSources returns true if the updates could affect sources.
func (*EventUpdates) DestinationTypes ¶
func (u *EventUpdates) DestinationTypes() Events[*model.DestinationType]
DestinationTypes returns a collection of destination type events.
func (*EventUpdates) Destinations ¶
func (u *EventUpdates) Destinations() Events[*model.Destination]
Destinations returns a collection of destination events.
func (*EventUpdates) Empty ¶
func (u *EventUpdates) Empty() bool
Empty returns true if no events exist.
func (*EventUpdates) HasDestinationEvents ¶
func (u *EventUpdates) HasDestinationEvents() bool
HasDestinationEvents returns true if any destination events exist.
func (*EventUpdates) HasDestinationTypeEvents ¶
func (u *EventUpdates) HasDestinationTypeEvents() bool
HasDestinationTypeEvents returns true if any destination type events exist.
func (*EventUpdates) HasProcessorEvents ¶
func (u *EventUpdates) HasProcessorEvents() bool
HasProcessorEvents returns true if any processor events exist.
func (*EventUpdates) HasProcessorTypeEvents ¶
func (u *EventUpdates) HasProcessorTypeEvents() bool
HasProcessorTypeEvents returns true if any processor type events exist.
func (*EventUpdates) HasSourceEvents ¶
func (u *EventUpdates) HasSourceEvents() bool
HasSourceEvents returns true if any source events exist.
func (*EventUpdates) HasSourceTypeEvents ¶
func (u *EventUpdates) HasSourceTypeEvents() bool
HasSourceTypeEvents returns true if any source type events exist.
func (*EventUpdates) IncludeAgent ¶
func (u *EventUpdates) IncludeAgent(agent *model.Agent, eventType EventType)
IncludeAgent will add an agent event to Updates.
func (*EventUpdates) IncludeAgentVersion ¶
func (u *EventUpdates) IncludeAgentVersion(agentVersion *model.AgentVersion, eventType EventType)
IncludeAgentVersion will add an agent version event to Updates.
func (*EventUpdates) IncludeConfiguration ¶
func (u *EventUpdates) IncludeConfiguration(configuration *model.Configuration, eventType EventType)
IncludeConfiguration will add a configuration event to Updates.
func (*EventUpdates) IncludeDestination ¶
func (u *EventUpdates) IncludeDestination(destination *model.Destination, eventType EventType)
IncludeDestination will add a destination event to Updates.
func (*EventUpdates) IncludeDestinationType ¶
func (u *EventUpdates) IncludeDestinationType(destinationType *model.DestinationType, eventType EventType)
IncludeDestinationType will add a destination type event to Updates.
func (*EventUpdates) IncludeProcessor ¶
func (u *EventUpdates) IncludeProcessor(processor *model.Processor, eventType EventType)
IncludeProcessor will add a processor event to Updates.
func (*EventUpdates) IncludeProcessorType ¶
func (u *EventUpdates) IncludeProcessorType(processorType *model.ProcessorType, eventType EventType)
IncludeProcessorType will add a processor type event to Updates.
func (*EventUpdates) IncludeResource ¶
func (u *EventUpdates) IncludeResource(r model.Resource, eventType EventType)
IncludeResource will add a resource event to Updates. If the resource is not supported by Updates, this will do nothing.
func (*EventUpdates) IncludeSource ¶
func (u *EventUpdates) IncludeSource(source *model.Source, eventType EventType)
IncludeSource will add a source event to Updates.
func (*EventUpdates) IncludeSourceType ¶
func (u *EventUpdates) IncludeSourceType(sourceType *model.SourceType, eventType EventType)
IncludeSourceType will add a source type event to Updates.
func (*EventUpdates) Merge ¶
func (u *EventUpdates) Merge(other BasicEventUpdates) bool
Merge merges another set of updates into this one, returns true if it was able to merge any updates.
func (*EventUpdates) ProcessorTypes ¶
func (u *EventUpdates) ProcessorTypes() Events[*model.ProcessorType]
ProcessorTypes returns a collection of processor type events.
func (*EventUpdates) Processors ¶
func (u *EventUpdates) Processors() Events[*model.Processor]
Processors returns a collection of processor events.
func (*EventUpdates) SourceTypes ¶
func (u *EventUpdates) SourceTypes() Events[*model.SourceType]
SourceTypes returns a collection of source type events.
func (*EventUpdates) Sources ¶
func (u *EventUpdates) Sources() Events[*model.Source]
Sources returns a collection of source events.
func (*EventUpdates) TransitiveUpdates ¶
func (u *EventUpdates) TransitiveUpdates() []model.Resource
TransitiveUpdates returns a list of resources that need to have their resources updated.
type Events ¶
type Events[T model.HasUniqueKey] map[string]Event[T]
Events is a map of ID to Event
func NewEvents ¶
func NewEvents[T model.HasUniqueKey]() Events[T]
NewEvents returns a new empty set of events
func NewEventsWithItem ¶
func NewEventsWithItem[T model.HasUniqueKey](item T, eventType EventType) Events[T]
NewEventsWithItem returns a new set of events with an initial item and eventType
func (Events[T]) CanSafelyMerge ¶
CanSafelyMerge returns true if the other events can be merged into this one. Currently we only merge events with different keys.
func (Events[T]) ContainsKey ¶
ContainsKey returns true if the key already exists with any EventType
func (Events[T]) Merge ¶
Merge will add events from the other events. Note that this will currently overwrite any existing events. Use CanSafelyMerge first to determine if a Merge can be done without overwriting and losing events.
func (Events[T]) Updates ¶
Updates returns all events of type EventTypeUpdate
type Options ¶
type Options struct { // SessionsSecret is used to encode sessions SessionsSecret string // MaxEventsToMerge is the maximum number of update events (inserts, updates, deletes, etc) to merge into a single // event. MaxEventsToMerge int // DisableMeasurementsCleanup indicates that the store should not clean up measurements. This is useful for testing. DisableMeasurementsCleanup bool // DisableRolloutUpdater indicates that the store should not update rollouts. This is useful for testing. DisableRolloutUpdater bool }
Options are options that are common to all store implementations
type QueryOption ¶
type QueryOption func(*QueryOptions)
QueryOption is an option used in Store queries
func WithLimit ¶
func WithLimit(limit int) QueryOption
WithLimit sets the maximum number of results to return. For paging, if the pages have 10 items per page, set the limit to 10.
func WithOffset ¶
func WithOffset(offset int) QueryOption
WithOffset sets the offset for the results to return. For paging, if the pages have 10 items per page and this is the 3rd page, set the offset to 20.
func WithQuery ¶
func WithQuery(query *search.Query) QueryOption
WithQuery adds a search query string to the query options
func WithSelector ¶
func WithSelector(selector model.Selector) QueryOption
WithSelector adds a selector to the query options
func WithSort ¶
func WithSort(field string) QueryOption
WithSort sets the sort order for the request. The sort value is the name of the field, sorted ascending. To sort descending, prefix the field with a minus sign (-). Some Stores only allow sorting by certain fields. Sort values not supported will be ignored.
type QueryOptions ¶
type QueryOptions struct { Selector model.Selector Query *search.Query Offset int Limit int Sort string }
QueryOptions represents the set of options available for a store query
func MakeQueryOptions ¶
func MakeQueryOptions(options []QueryOption) QueryOptions
MakeQueryOptions creates a queryOptions struct from a slice of QueryOption
type RolloutEventUpdates ¶
type RolloutEventUpdates interface { // Updates retrieves the agent updates Updates() Events[*model.ConfigurationVersions] // Empty returns true if the updates are empty. Empty() bool // Merge merges another set of updates into this one, returns true // if it was able to merge any updates. Merge(other RolloutEventUpdates) bool }
RolloutEventUpdates is a collection of rollout related events
func NewRolloutUpdates ¶
NewRolloutUpdates creates a new RolloutUpdates
type RolloutUpdateCreator ¶
RolloutUpdateCreator is a function that creates a RolloutEventUpdates from a set of agent events.
type RolloutUpdates ¶
type RolloutUpdates struct {
UpdatesField Events[*model.ConfigurationVersions] `json:"updates"`
RolloutUpdates is a basic implementation of the RolloutEventUpdates interface
func (*RolloutUpdates) Empty ¶
func (r *RolloutUpdates) Empty() bool
Empty returns true if the updates are empty.
func (*RolloutUpdates) Merge ¶
func (r *RolloutUpdates) Merge(other RolloutEventUpdates) bool
Merge merges another set of updates into this one, returns true if it was able to merge any updates.
func (*RolloutUpdates) Updates ¶
func (r *RolloutUpdates) Updates() Events[*model.ConfigurationVersions]
Updates retrieves the agent updates
type Store ¶
type Store interface { Clear() Close() error Agent(ctx context.Context, id string) (*model.Agent, error) Agents(ctx context.Context, options ...QueryOption) ([]*model.Agent, error) AgentsCount(context.Context, ...QueryOption) (int, error) // UpsertAgent adds a new Agent to the Store or updates an existing one UpsertAgent(ctx context.Context, agentID string, updater AgentUpdater) (*model.Agent, error) UpsertAgents(ctx context.Context, agentIDs []string, updater AgentUpdater) ([]*model.Agent, error) DeleteAgents(ctx context.Context, agentIDs []string) ([]*model.Agent, error) AgentVersion(ctx context.Context, name string) (*model.AgentVersion, error) AgentVersions(ctx context.Context) ([]*model.AgentVersion, error) DeleteAgentVersion(ctx context.Context, name string) (*model.AgentVersion, error) Configurations(ctx context.Context, options ...QueryOption) ([]*model.Configuration, error) // Configuration returns a configuration by name. // // The name can optionally include a specific version, e.g. name:version. // // name:latest will return the latest version // name:current will return the current version that was applied to agents // // If the name does not include a version, the current version will be returned. // If the name does not exist, nil will be returned with no error. 
Configuration(ctx context.Context, name string) (*model.Configuration, error) UpdateConfiguration(ctx context.Context, name string, updater ConfigurationUpdater) (config *model.Configuration, status model.UpdateStatus, err error) DeleteConfiguration(ctx context.Context, name string) (*model.Configuration, error) Source(ctx context.Context, name string) (*model.Source, error) Sources(ctx context.Context) ([]*model.Source, error) DeleteSource(ctx context.Context, name string) (*model.Source, error) SourceType(ctx context.Context, name string) (*model.SourceType, error) SourceTypes(ctx context.Context) ([]*model.SourceType, error) DeleteSourceType(ctx context.Context, name string) (*model.SourceType, error) Processor(ctx context.Context, name string) (*model.Processor, error) Processors(ctx context.Context) ([]*model.Processor, error) DeleteProcessor(ctx context.Context, name string) (*model.Processor, error) ProcessorType(ctx context.Context, name string) (*model.ProcessorType, error) ProcessorTypes(ctx context.Context) ([]*model.ProcessorType, error) DeleteProcessorType(ctx context.Context, name string) (*model.ProcessorType, error) Destination(ctx context.Context, name string) (*model.Destination, error) Destinations(ctx context.Context) ([]*model.Destination, error) DeleteDestination(ctx context.Context, name string) (*model.Destination, error) DestinationType(ctx context.Context, name string) (*model.DestinationType, error) DestinationTypes(ctx context.Context) ([]*model.DestinationType, error) DeleteDestinationType(ctx context.Context, name string) (*model.DestinationType, error) // ApplyResources inserts or updates the specified resources. The resulting status of each resource is returned. The // resource may be modified as a result of the operation. If the caller needs to preserve the original resource, // model.Clone can be used to create a copy. 
ApplyResources(ctx context.Context, resources []model.Resource) ([]model.ResourceStatus, error) // Batch delete of a slice of resources, returns the successfully deleted resources or an error. DeleteResources(ctx context.Context, resources []model.Resource) ([]model.ResourceStatus, error) // AgentConfiguration returns the configuration that should be applied to an agent. // // It returns an error if the agent is nil. Returns the pending configuration if a rollout is in progress. Returns the // current configuration if there is one associated with the agent. Returns the configuration corresponding to the // configuration= label if the label exists on the agent. Returns the configuration matching the agent's labels if one // exists. Returns nil if there are no matches. If the agent does not have an associated configuration or the // associated configuration does not exist, no error is returned but the configuration will be nil. If there is an // error accessing the backend store, the error will be returned. AgentConfiguration(ctx context.Context, agent *model.Agent) (*model.Configuration, error) // AgentsIDsMatchingConfiguration returns the list of agent IDs that are using the specified configuration AgentsIDsMatchingConfiguration(ctx context.Context, conf *model.Configuration) ([]string, error) // ReportConnectedAgents sets the ReportedAt time for the specified agents to the specified time. This update should // not fire an update event for the agents on the Updates eventbus. ReportConnectedAgents(ctx context.Context, agentIDs []string, time time.Time) error // DisconnectUnreportedAgents sets the Status of agents to Disconnected if the agent ReportedAt time is before the // specified time. 
DisconnectUnreportedAgents(ctx context.Context, since time.Time) error // CleanupDisconnectedAgents removes agents that have disconnected before the specified time CleanupDisconnectedAgents(ctx context.Context, since time.Time) error // Updates will receive pipelines and configurations that have been updated or deleted, either because the // configuration changed or a component in them was updated. Agents inserted/updated from UpsertAgent and agents // removed from CleanupDisconnectedAgents are also sent with Updates. Updates(ctx context.Context) eventbus.Source[BasicEventUpdates] // AgentRolloutUpdates will receive agent update events that are meant to be processed for purpose of rollouts. AgentRolloutUpdates(ctx context.Context) eventbus.Source[RolloutEventUpdates] // AgentIndex provides access to the search AgentIndex implementation managed by the Store AgentIndex(ctx context.Context) search.Index // ConfigurationIndex provides access to the search Index for Configurations ConfigurationIndex(ctx context.Context) search.Index // UserSessions must implement the gorilla sessions.Store interface UserSessions() sessions.Store // Measurements stores stats for agents and configurations Measurements() stats.Measurements // StartRollout will start a rollout for the specified configuration with the specified options. If nil is passed for // options, any existing rollout options on the configuration status will be used. If there are no rollout options in // the configuration status, default values will be used for the rollout. If there is an existing rollout of a different // version of this configuration, it will be replaced. Does nothing if the rollout does not have a // RolloutStatusPending status. Returns the current Configuration with its Rollout status. StartRollout(ctx context.Context, configurationName string, options *model.RolloutOptions) (*model.Configuration, error) // PauseRollout will pause a rollout for the specified configuration.
// Does nothing if the rollout does not have a RolloutStatusStarted status. // Returns the current Configuration with its Rollout status. PauseRollout(ctx context.Context, configurationName string) (*model.Configuration, error) // ResumeRollout will resume a rollout for the specified configuration. // Does nothing if the Rollout status is not RolloutStatusPaused or RolloutStatusError. // For RolloutStatusError - it will increase the maxErrors of the // rollout by the current number of errors + 1. // For RolloutStatusPaused - it will resume the rollout. ResumeRollout(ctx context.Context, configurationName string) (*model.Configuration, error) // UpdateRollout updates a rollout in progress. Does nothing if the rollout does not have a RolloutStatusStarted // status. Returns the current Configuration with its Rollout status. UpdateRollout(ctx context.Context, configurationName string) (*model.Configuration, error) // UpdateRollouts updates all rollouts in progress. It returns each of the Configurations that contains an active // rollout. It is possible for this to partially succeed and return some updated rollouts and an error containing // errors from updating some other rollouts. UpdateRollouts(ctx context.Context) ([]*model.Configuration, error) // UpdateAllRollouts updates all active rollouts. The error returned is not intended to be returned to a client but can // be logged. UpdateAllRollouts(ctx context.Context) error ArchiveStore }
Store handles interacting with a storage backend.
func NewBoltStore ¶
NewBoltStore returns a new boltstore struct that implements the store.Store interface.
type Updates ¶
type Updates struct {
// contains filtered or unexported fields
Updates is a wrapped event bus for store updates.
func NewUpdates ¶
func NewUpdates(ctx context.Context, options Options, logger *zap.Logger, basicEventBroadcaster BroadCastBuilder[BasicEventUpdates], rolloutBroadcaster BroadCastBuilder[RolloutEventUpdates], rolloutUpdateCreator RolloutUpdateCreator, ) *Updates
NewUpdates creates a new UpdatesEventBus.
func (*Updates) RolloutUpdates ¶
func (s *Updates) RolloutUpdates() eventbus.Source[RolloutEventUpdates]
RolloutUpdates returns the external channel that can be provided to external clients
func (*Updates) Send ¶
func (s *Updates) Send(ctx context.Context, updates BasicEventUpdates)
Send adds an Updates event to the internal channel where it can be merged and relayed to the external channel.
Source Files
Path | Synopsis |
Package search provides a search engine with indexing and suggestions for the store
Package search provides a search engine with indexing and suggestions for the store |
Package stats provides the store for measurements associated with agents and configurations.
Package stats provides the store for measurements associated with agents and configurations. |
Package storetest has helper functions for setting up storage for tests
Package storetest has helper functions for setting up storage for tests |