Documentation ¶
Index ¶
- Constants
- Variables
- func DecIndexID(key, val []byte) ([]byte, interface{}, error)
- func DecodeOrgNameKey(k []byte) (platform.ID, string, error)
- func EncBodyJSON(ent Entity) ([]byte, string, error)
- func EncIDKey(ent Entity) ([]byte, string, error)
- func EncUniqKey(ent Entity) ([]byte, string, error)
- func IndexKey(foreignKey, primaryKey []byte) (newKey []byte, err error)
- func IsErrUnexpectedDecodeVal(ok bool) error
- func IsNotFound(err error) bool
- func UnexpectedIndexError(err error) *errors.Error
- func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error)
- func WithIndexMigrationCleanup(m *IndexMigration)
- func WithIndexReadPathEnabled(i *Index)
- type Bucket
- type ConvertValToEntFn
- type Cursor
- type CursorConfig
- type CursorDirection
- type CursorHint
- type CursorHints
- type CursorOption
- type CursorPredicateFunc
- type DecodeBucketValFn
- type DeleteOpts
- type DeleteRelationsFn
- type EncodeEntFn
- type EncodeFn
- type Entity
- type FilterFn
- type FindCaptureFn
- type FindOpts
- type ForwardCursor
- type Index
- func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error
- func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error
- func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error)
- func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error
- type IndexDiff
- type IndexMapping
- type IndexMigration
- type IndexMigrationOption
- type IndexOption
- type IndexSourceOnFunc
- type IndexStore
- func (s *IndexStore) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error
- func (s *IndexStore) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error
- func (s *IndexStore) Find(ctx context.Context, tx Tx, opts FindOpts) error
- func (s *IndexStore) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{}, error)
- func (s *IndexStore) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error
- type InitialMigration
- type PutOptionFn
- type SchemaStore
- type Service
- func (s *Service) AddLogEntry(ctx context.Context, k, v []byte, t time.Time) error
- func (s *Service) AddLogEntryTx(ctx context.Context, tx Tx, k, v []byte, t time.Time) error
- func (s *Service) AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error
- func (s *Service) CancelRun(ctx context.Context, taskID, runID platform.ID) error
- func (s *Service) CreateRun(ctx context.Context, taskID platform.ID, scheduledFor time.Time, ...) (*taskmodel.Run, error)
- func (s *Service) CreateTask(ctx context.Context, tc taskmodel.TaskCreate) (*taskmodel.Task, error)
- func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) error
- func (s *Service) CurrentlyRunning(ctx context.Context, taskID platform.ID) ([]*taskmodel.Run, error)
- func (s *Service) DeleteTask(ctx context.Context, id platform.ID) error
- func (s *Service) DeleteVariable(ctx context.Context, id platform.ID) error
- func (s *Service) FindLogs(ctx context.Context, filter taskmodel.LogFilter) ([]*taskmodel.Log, int, error)
- func (s *Service) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
- func (s *Service) FindRuns(ctx context.Context, filter taskmodel.RunFilter) ([]*taskmodel.Run, int, error)
- func (s *Service) FindTaskByID(ctx context.Context, id platform.ID) (*taskmodel.Task, error)
- func (s *Service) FindTasks(ctx context.Context, filter taskmodel.TaskFilter) ([]*taskmodel.Task, int, error)
- func (s *Service) FindVariableByID(ctx context.Context, id platform.ID) (*influxdb.Variable, error)
- func (s *Service) FindVariables(ctx context.Context, filter influxdb.VariableFilter, ...) ([]*influxdb.Variable, error)
- func (s *Service) FinishRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
- func (s *Service) FirstLogEntry(ctx context.Context, k []byte) ([]byte, time.Time, error)
- func (s *Service) ForEachLogEntry(ctx context.Context, k []byte, opts platform.FindOptions, ...) error
- func (s *Service) ForEachLogEntryTx(ctx context.Context, tx Tx, k []byte, opts platform.FindOptions, ...) error
- func (s *Service) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*taskmodel.Run, error)
- func (s *Service) LastLogEntry(ctx context.Context, k []byte) ([]byte, time.Time, error)
- func (s *Service) ManualRuns(ctx context.Context, taskID platform.ID) ([]*taskmodel.Run, error)
- func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) error
- func (s *Service) RetryRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
- func (s *Service) StartManualRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
- func (s *Service) UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, ...) error
- func (s *Service) UpdateTask(ctx context.Context, id platform.ID, upd taskmodel.TaskUpdate) (*taskmodel.Task, error)
- func (s *Service) UpdateVariable(ctx context.Context, id platform.ID, update *influxdb.VariableUpdate) (*influxdb.Variable, error)
- func (s *Service) WithResourceLogger(audit resource.Logger)
- func (s *Service) WithStore(store Store)
- type ServiceConfig
- type Store
- type StoreBase
- func (s *StoreBase) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error
- func (s *StoreBase) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error
- func (s *StoreBase) EntKey(ctx context.Context, ent Entity) ([]byte, error)
- func (s *StoreBase) Find(ctx context.Context, tx Tx, opts FindOpts) error
- func (s *StoreBase) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{}, error)
- func (s *StoreBase) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error
- type Tx
- type VisitFunc
Constants ¶
const OpPrefix = "kv/"
OpPrefix is the prefix for kv errors.
Variables ¶
var ( // ErrKeyNotFound is the error returned when the key requested is not found. ErrKeyNotFound = errors.New("key not found") // ErrBucketNotFound is the error returned when the bucket cannot be found. ErrBucketNotFound = errors.New("bucket not found") // ErrTxNotWritable is the error returned when an mutable operation is called during // a non-writable transaction. ErrTxNotWritable = errors.New("transaction is not writable") // ErrSeekMissingPrefix is returned when seek bytes is missing the prefix defined via // WithCursorPrefix ErrSeekMissingPrefix = errors.New("seek missing prefix bytes") )
var ( // ErrKeyInvalidCharacters is returned when a foreignKey or primaryKey contains // ErrKeyInvalidCharacters = errors.New("key: contains invalid characters") )
var ( // ErrKeyValueLogBoundsNotFound is returned when oplog entries cannot be located // for the provided bounds ErrKeyValueLogBoundsNotFound = &errors.Error{ Code: errors.ENotFound, Msg: "oplog not found", } )
var NotUniqueError = &errors.Error{ Code: errors.EConflict, Msg: "name already exists", }
NotUniqueError is used when attempting to create a resource that already exists.
Functions ¶
func DecIndexID ¶
DecIndexID decodes the bucket val into an influxdb.ID.
func DecodeOrgNameKey ¶
DecodeOrgNameKey decodes a raw bucket key into the organization id and name used to create it.
func EncBodyJSON ¶
EncBodyJSON JSON encodes the entity body and returns the raw bytes and indicates that it uses the entity body.
func EncUniqKey ¶
EncUniqKey encodes the unique key.
func IndexKey ¶
IndexKey returns a value suitable for use as the key component when storing values in the index. IndexKey returns an ErrKeyInvalidCharacters error if either the foreignKey or primaryKey contains a /.
func IsNotFound ¶
IsNotFound returns a boolean indicating whether the error is known to report that a key or was not found.
func UnexpectedIndexError ¶
UnexpectedIndexError is used when the error comes from an internal system.
func WalkCursor ¶
func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error)
WalkCursor consumers the forward cursor call visit for each k/v pair found
func WithIndexMigrationCleanup ¶
func WithIndexMigrationCleanup(m *IndexMigration)
WithIndexMigrationCleanup removes index entries which point to missing items in the source bucket.
func WithIndexReadPathEnabled ¶
func WithIndexReadPathEnabled(i *Index)
WithIndexReadPathEnabled enables the read paths of the index (Walk) This should be enabled once the index has been fully populated and the Insert and Delete paths are correctly integrated.
Types ¶
type Bucket ¶
type Bucket interface { // TODO context? // Get returns a key within this bucket. Errors if key does not exist. Get(key []byte) ([]byte, error) // GetBatch returns a corresponding set of values for the provided // set of keys. If a value cannot be found for any provided key its // value will be nil at the same index for the provided key. GetBatch(keys ...[]byte) ([][]byte, error) // Cursor returns a cursor at the beginning of this bucket optionally // using the provided hints to improve performance. Cursor(hints ...CursorHint) (Cursor, error) // Put should error if the transaction it was called in is not writable. Put(key, value []byte) error // Delete should error if the transaction it was called in is not writable. Delete(key []byte) error // ForwardCursor returns a forward cursor from the seek position provided. // Other options can be supplied to provide direction and hints. ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error) }
Bucket is the abstraction used to perform get/put/delete/get-many operations in a key value store.
type ConvertValToEntFn ¶
ConvertValToEntFn converts a key and decoded bucket value to an entity.
type Cursor ¶
type Cursor interface { // Seek moves the cursor forward until reaching prefix in the key name. Seek(prefix []byte) (k []byte, v []byte) // First moves the cursor to the first key in the bucket. First() (k []byte, v []byte) // Last moves the cursor to the last key in the bucket. Last() (k []byte, v []byte) // Next moves the cursor to the next key in the bucket. Next() (k []byte, v []byte) // Prev moves the cursor to the prev key in the bucket. Prev() (k []byte, v []byte) }
Cursor is an abstraction for iterating/ranging through data. A concrete implementation of a cursor can be found in cursor.go.
type CursorConfig ¶
type CursorConfig struct { Direction CursorDirection Hints CursorHints Prefix []byte SkipFirst bool Limit *int }
CursorConfig is a type used to configure a new forward cursor. It includes a direction and a set of hints
func NewCursorConfig ¶
func NewCursorConfig(opts ...CursorOption) CursorConfig
NewCursorConfig constructs and configures a CursorConfig used to configure a forward cursor.
type CursorDirection ¶
type CursorDirection int
CursorDirection is an integer used to define the direction a request cursor operates in.
const ( // CursorAscending directs a cursor to range in ascending order CursorAscending CursorDirection = iota // CursorAscending directs a cursor to range in descending order CursorDescending )
type CursorHint ¶
type CursorHint func(*CursorHints)
CursorHint configures CursorHints
func WithCursorHintKeyStart ¶
func WithCursorHintKeyStart(start string) CursorHint
WithCursorHintKeyStart is a hint to the store that the caller is interested in reading keys from start.
func WithCursorHintPredicate ¶
func WithCursorHintPredicate(f CursorPredicateFunc) CursorHint
WithCursorHintPredicate is a hint to the store to return only key / values which return true for the f.
The primary concern of the predicate is to improve performance. Therefore, it should perform tests on the data at minimal cost. If the predicate has no meaningful impact on reducing memory or CPU usage, there is no benefit to using it.
func WithCursorHintPrefix ¶
func WithCursorHintPrefix(prefix string) CursorHint
WithCursorHintPrefix is a hint to the store that the caller is only interested keys with the specified prefix.
type CursorHints ¶
type CursorHints struct { KeyPrefix *string KeyStart *string PredicateFn CursorPredicateFunc }
type CursorOption ¶
type CursorOption func(*CursorConfig)
CursorOption is a functional option for configuring a forward cursor
func WithCursorDirection ¶
func WithCursorDirection(direction CursorDirection) CursorOption
WithCursorDirection sets the cursor direction on a provided cursor config
func WithCursorHints ¶
func WithCursorHints(hints ...CursorHint) CursorOption
WithCursorHints configs the provided hints on the cursor config
func WithCursorLimit ¶
func WithCursorLimit(limit int) CursorOption
WithCursorLimit restricts the number of key values return by the cursor to the provided limit count.
func WithCursorPrefix ¶
func WithCursorPrefix(prefix []byte) CursorOption
WithCursorPrefix configures the forward cursor to retrieve keys with a particular prefix. This implies the cursor will start and end at a specific location based on the prefix [prefix, prefix + 1).
The value of the seek bytes must be prefixed with the provided prefix, otherwise an error will be returned.
func WithCursorSkipFirstItem ¶
func WithCursorSkipFirstItem() CursorOption
WithCursorSkipFirstItem skips returning the first item found within the seek.
type CursorPredicateFunc ¶
type DecodeBucketValFn ¶
DecodeBucketValFn decodes the raw []byte.
type DeleteOpts ¶
type DeleteOpts struct { DeleteRelationFns []DeleteRelationsFn FilterFn FilterFn }
DeleteOpts provides indicators to the store.Delete call for deleting a given entity. The FilterFn indicates the current value should be deleted when returning true.
type DeleteRelationsFn ¶
DeleteRelationsFn is a hook that a store that composes other stores can use to delete an entity and any relations it may share. An example would be deleting an an entity and its associated index.
type EncodeEntFn ¶
EncodeEntFn encodes the entity. This is used both for the key and vals in the store base.
type EncodeFn ¶
EncodeFn returns an encoding when called. Closures are your friend here.
func EncStringCaseInsensitive ¶
EncStringCaseInsensitive encodes a string and makes it case insensitive by lower casing everything.
type FilterFn ¶
FilterFn will provide an indicator to the Find or Delete calls that the entity that was seen is one that is valid and should be either captured or deleted (depending on the caller of the filter func).
type FindCaptureFn ¶
FindCaptureFn is the mechanism for closing over the key and decoded value pair for adding results to the call sites collection. This generic implementation allows it to be reused. The returned decodedVal should always satisfy whatever decoding of the bucket value was set on the storeo that calls Find.
type FindOpts ¶
type FindOpts struct { Descending bool Offset int Limit int Prefix []byte CaptureFn FindCaptureFn FilterEntFn FilterFn }
FindOpts provided a means to search through the bucket. When a filter func is provided, that will run against the entity and if the filter responds true, will count it towards the number of entries seen and the capture func will be run with it provided to it.
type ForwardCursor ¶
type ForwardCursor interface { // Next moves the cursor to the next key in the bucket. Next() (k, v []byte) // Err returns non-nil if an error occurred during cursor iteration. // This should always be checked after Next returns a nil key/value. Err() error // Close is reponsible for freeing any resources created by the cursor. Close() error }
ForwardCursor is an abstraction for interacting/ranging through data in one direction.
type Index ¶
type Index struct { IndexMapping // contains filtered or unexported fields }
Index is used to define and manage an index for a source bucket.
When using the index you must provide it with an IndexMapping. The IndexMapping provides the index with the contract it needs to populate the entire index and traverse a populated index correctly. The IndexMapping provides a way to retrieve the key on which to index with when provided with the value from the source. It also provides the way to access the source bucket.
The following is an illustration of its use:
byUserID := func(v []byte) ([]byte, error) { auth := &influxdb.Authorization{} if err := json.Unmarshal(v, auth); err != nil { return err } return auth.UserID.Encode() } // configure a write only index indexByUser := NewIndex(NewSource([]byte(`authorizationsbyuserv1/), byUserID)) indexByUser.Insert(tx, someUserID, someAuthID) indexByUser.Delete(tx, someUserID, someAuthID) indexByUser.Walk(tx, someUserID, func(k, v []byte) error { auth := &influxdb.Authorization{} if err := json.Unmarshal(v, auth); err != nil { return err } // do something with auth return nil }) // verify the current index against the source and return the differences // found in each diff, err := indexByUser.Verify(ctx, tx)
func NewIndex ¶
func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index
NewIndex configures and returns a new *Index for a given index mapping. By default the read path (Walk) is disabled. This is because the index needs to be fully populated before depending upon the read path. The read path can be enabled using WithIndexReadPathEnabled option.
func (*Index) Delete ¶
Delete removes the foreignKey and primaryKey mapping from the underlying index.
func (*Index) Insert ¶
Insert creates a single index entry for the provided primary key on the foreign key.
type IndexDiff ¶
type IndexDiff struct { // PresentInIndex is a map of foreign key to primary keys // present in the index. PresentInIndex map[string]map[string]struct{} // MissingFromIndex is a map of foreign key to associated primary keys // missing from the index given the source bucket. // These items could be due to the fact an index populate migration has // not yet occurred, the index populate code is incorrect or the write path // for your resource type does not yet insert into the index as well (Create actions). MissingFromIndex map[string]map[string]struct{} // MissingFromSource is a map of foreign key to associated primary keys // missing from the source but accounted for in the index. // This happens when index items are not properly removed from the index // when an item is removed from the source (Delete actions). MissingFromSource map[string]map[string]struct{} }
IndexDiff contains a set of items present in the source not in index, along with a set of things in the index which are not in the source.
type IndexMapping ¶
type IndexMapping interface { SourceBucket() []byte IndexBucket() []byte IndexSourceOn(value []byte) (foreignKey []byte, err error) }
IndexMapping is a type which configures and Index to map items from a source bucket to an index bucket via a mapping known as IndexSourceOn. This function is called on the values in the source to derive the foreign key on which to index each item.
func NewIndexMapping ¶
func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping
NewIndexMapping creates an implementation of IndexMapping for the provided source bucket to a destination index bucket.
type IndexMigration ¶
type IndexMigration struct { IndexMapping // contains filtered or unexported fields }
IndexMigration is a migration for adding and removing an index. These are constructed via the Index.Migration function.
func NewIndexMigration ¶
func NewIndexMigration(mapping IndexMapping, opts ...IndexMigrationOption) *IndexMigration
NewIndexMigration construct a migration for creating and populating an index
func (*IndexMigration) Down ¶
func (i *IndexMigration) Down(ctx context.Context, store SchemaStore) error
Down deletes all entries from the index.
func (*IndexMigration) MigrationName ¶
func (i *IndexMigration) MigrationName() string
Name returns a readable name for the index migration.
func (*IndexMigration) Populate ¶
Populate does a full population of the index using the IndexSourceOn IndexMapping function. Once completed it marks the index as ready for use. It return a nil error on success and the count of inserted items.
func (*IndexMigration) Up ¶
func (i *IndexMigration) Up(ctx context.Context, store SchemaStore) (err error)
Up initializes the index bucket and populates the index.
type IndexMigrationOption ¶
type IndexMigrationOption func(*IndexMigration)
IndexMigrationOption is a functional option for the IndexMigration type
func WithIndexMigationBatchSize ¶
func WithIndexMigationBatchSize(n int) IndexMigrationOption
WithIndexMigationBatchSize configures the size of the batches when committing changes to entire index during migration (e.g. size of put batch on index populate).
type IndexOption ¶
type IndexOption func(*Index)
IndexOption is a function which configures an index
type IndexSourceOnFunc ¶
IndexSourceOnFunc is a function which can be used to derive the foreign key of a value in a source bucket.
type IndexStore ¶
IndexStore provides a entity store that uses an index lookup. The index store manages deleting and creating indexes for the caller. The index is automatically used if the FindEnt entity entity does not have the primary key.
func (*IndexStore) Delete ¶
func (s *IndexStore) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error
Delete deletes entities and associated indexes.
func (*IndexStore) Find ¶
Find provides a mechanism for looking through the bucket via the set options. When a prefix is provided, it will be used within the entity store. If you would like to search the index store, then you can by calling the index store directly.
func (*IndexStore) FindEnt ¶
FindEnt returns the decoded entity body via teh provided entity. An example entity should not include a Body, but rather the ID, Name, or OrgID. If no ID is provided, then the algorithm assumes you are looking up the entity by the index.
func (*IndexStore) Put ¶
func (s *IndexStore) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error
Put will persist the entity into both the entity store and the index store.
type InitialMigration ¶
type InitialMigration struct{}
func (InitialMigration) Down ¶
func (m InitialMigration) Down(ctx context.Context, store SchemaStore) error
Down is a no operation required for service to be used as a migration
func (InitialMigration) MigrationName ¶
func (m InitialMigration) MigrationName() string
MigrationName returns the string initial migration which allows this store to be used as a migration
func (InitialMigration) Up ¶
func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error
Up initializes all the owned buckets of the underlying store
type PutOptionFn ¶
type PutOptionFn func(o *putOption) error
PutOptionFn provides a hint to the store to make some guarantees about the put action. I.e. If it is new, then will validate there is no existing entity by the given PK.
func PutNew ¶
func PutNew() PutOptionFn
PutNew will create an entity that is not does not already exist. Guarantees uniqueness by the store's uniqueness guarantees.
func PutUpdate ¶
func PutUpdate() PutOptionFn
PutUpdate will update an entity that must already exist.
type SchemaStore ¶
type SchemaStore interface { Store // CreateBucket creates a bucket on the underlying store if it does not exist CreateBucket(ctx context.Context, bucket []byte) error // DeleteBucket deletes a bucket on the underlying store if it exists DeleteBucket(ctx context.Context, bucket []byte) error }
SchemaStore is a superset of Store along with store schema change functionality like bucket creation and deletion.
This type is made available via the `kv/migration` package. It should be consumed via this package to create and delete buckets using a migration. Checkout the internal tool `cmd/internal/kvmigrate` for building a new migration Go file into the correct location (in kv/migration/all.go). Configuring your bucket here will ensure it is created properly on initialization of InfluxDB.
type Service ¶
type Service struct { Config ServiceConfig IDGenerator platform.IDGenerator // FluxLanguageService is used for parsing flux. // If this is unset, operations that require parsing flux // will fail. FluxLanguageService fluxlang.FluxLanguageService TokenGenerator influxdb.TokenGenerator // TODO(desa:ariel): this should not be embedded influxdb.TimeGenerator // contains filtered or unexported fields }
Service is the struct that influxdb services are implemented on.
func NewService ¶
func NewService(log *zap.Logger, kv Store, orgs influxdb.OrganizationService, configs ...ServiceConfig) *Service
NewService returns an instance of a Service.
func (*Service) AddLogEntry ¶
AddLogEntry logs an keyValue for a particular resource type ID pairing.
func (*Service) AddLogEntryTx ¶
func (*Service) AddRunLog ¶
func (s *Service) AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error
AddRunLog adds a log line to the run.
func (*Service) CreateRun ¶
func (s *Service) CreateRun(ctx context.Context, taskID platform.ID, scheduledFor time.Time, runAt time.Time) (*taskmodel.Run, error)
CreateRun creates a run with a scheduledFor time as now.
func (*Service) CreateTask ¶
CreateTask creates a new task. The owner of the task is inferred from the authorizer associated with ctx.
func (*Service) CreateVariable ¶
CreateVariable creates a new variable and assigns it an ID
func (*Service) CurrentlyRunning ¶
func (*Service) DeleteTask ¶
DeleteTask removes a task by ID and purges all associated data and scheduled runs.
func (*Service) DeleteVariable ¶
DeleteVariable removes a single variable from the store by its ID
func (*Service) FindLogs ¶
func (s *Service) FindLogs(ctx context.Context, filter taskmodel.LogFilter) ([]*taskmodel.Log, int, error)
FindLogs returns logs for a run.
func (*Service) FindRunByID ¶
func (s *Service) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
FindRunByID returns a single run.
func (*Service) FindRuns ¶
func (s *Service) FindRuns(ctx context.Context, filter taskmodel.RunFilter) ([]*taskmodel.Run, int, error)
FindRuns returns a list of runs that match a filter and the total count of returned runs.
func (*Service) FindTaskByID ¶
FindTaskByID returns a single task
func (*Service) FindTasks ¶
func (s *Service) FindTasks(ctx context.Context, filter taskmodel.TaskFilter) ([]*taskmodel.Task, int, error)
FindTasks returns a list of tasks that match a filter (limit 100) and the total count of matching tasks.
func (*Service) FindVariableByID ¶
FindVariableByID finds a single variable in the store by its ID
func (*Service) FindVariables ¶
func (s *Service) FindVariables(ctx context.Context, filter influxdb.VariableFilter, opt ...influxdb.FindOptions) ([]*influxdb.Variable, error)
FindVariables returns all variables in the store
func (*Service) FinishRun ¶
FinishRun removes runID from the list of running tasks and if its `now` is later then last completed update it.
func (*Service) FirstLogEntry ¶
FirstLogEntry retrieves the first log entry for a key value log.
func (*Service) ForEachLogEntry ¶
func (s *Service) ForEachLogEntry(ctx context.Context, k []byte, opts platform.FindOptions, fn func([]byte, time.Time) error) error
ForEachLogEntry retrieves the keyValue log for a resource type ID combination. KeyValues may be returned in ascending and descending order.
func (*Service) ForEachLogEntryTx ¶
func (*Service) ForceRun ¶
func (s *Service) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*taskmodel.Run, error)
ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible. The value of scheduledFor may or may not align with the task's schedule.
func (*Service) LastLogEntry ¶
LastLogEntry retrieves the first log entry for a key value log.
func (*Service) ManualRuns ¶
func (*Service) ReplaceVariable ¶
ReplaceVariable replaces a variable that exists in the store or creates it if it does not
func (*Service) RetryRun ¶
RetryRun creates and returns a new run (which is a retry of another run).
func (*Service) StartManualRun ¶
func (*Service) UpdateRunState ¶
func (s *Service) UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, state taskmodel.RunStatus) error
UpdateRunState sets the run state at the respective time.
func (*Service) UpdateTask ¶
func (s *Service) UpdateTask(ctx context.Context, id platform.ID, upd taskmodel.TaskUpdate) (*taskmodel.Task, error)
UpdateTask updates a single task with changeset.
func (*Service) UpdateVariable ¶
func (s *Service) UpdateVariable(ctx context.Context, id platform.ID, update *influxdb.VariableUpdate) (*influxdb.Variable, error)
UpdateVariable updates a single variable in the store with a changeset
func (*Service) WithResourceLogger ¶
WithResourceLogger sets the resource audit logger for the service.
type ServiceConfig ¶
type ServiceConfig struct { Clock clock.Clock FluxLanguageService fluxlang.FluxLanguageService }
ServiceConfig allows us to configure Services
type Store ¶
type Store interface { // View opens up a transaction that will not write to any data. Implementing interfaces // should take care to ensure that all view transactions do not mutate any data. View(context.Context, func(Tx) error) error // Update opens up a transaction that will mutate data. Update(context.Context, func(Tx) error) error // Backup copies all K:Vs to a writer, file format determined by implementation. Backup(ctx context.Context, w io.Writer) error // Restore replaces the underlying data file with the data from r. Restore(ctx context.Context, r io.Reader) error // RLock takes a read lock on the underlying KV store. RLock() // RUnlock releases a previously-taken read lock RUnlock() }
Store is an interface for a generic key value store. It is modeled after the boltdb database struct.
type StoreBase ¶
type StoreBase struct { Resource string BktName []byte EncodeEntKeyFn EncodeEntFn EncodeEntBodyFn EncodeEntFn DecodeEntFn DecodeBucketValFn ConvertValToEntFn ConvertValToEntFn }
StoreBase is the base behavior for accessing buckets in kv. It provides mechanisms that can be used in composing stores together (i.e. IndexStore).
func NewOrgNameKeyStore ¶
NewOrgNameKeyStore creates a store for an entity's unique index on organization id and name. This is used throughout the kv pkg here to provide an entity uniquness by name within an org.
func NewStoreBase ¶
func NewStoreBase(resource string, bktName []byte, encKeyFn, encBodyFn EncodeEntFn, decFn DecodeBucketValFn, decToEntFn ConvertValToEntFn) *StoreBase
NewStoreBase creates a new store base.
func (*StoreBase) EntKey ¶
EntKey returns the key for the entity provided. This is a shortcut for grabbing the EntKey without having to juggle the encoding funcs.
func (*StoreBase) Find ¶
Find provides a mechanism for looking through the bucket via the set options. When a prefix is provided, the prefix is used to seek the bucket.
type Tx ¶
type Tx interface { // Bucket possibly creates and returns bucket, b. Bucket(b []byte) (Bucket, error) // Context returns the context associated with this Tx. Context() context.Context // WithContext associates a context with this Tx. WithContext(ctx context.Context) }
Tx is a transaction in the store.