Documentation ¶
Overview ¶
package kv
The KV package is a set of services and abstractions built around key value storage. There exist in-memory and persisted implementations of the core `Store` family of interfaces outside of this package (see `inmem` and `bolt` packages).
The `Store` interface exposes transactional access to a backing kv persistence layer. It allows for read-only (View) and read-write (Update) transactions to be opened. These methods take a function which is passed an implementation of the transaction interface (Tx). This interface exposes a way to manipulate namespaced keys and values (Buckets).
All keys and values are namespaced (grouped) using buckets. Buckets can only be created on implementations of the `SchemaStore` interface. This is a superset of the `Store` interface, which has the additional bucket creation and deletion methods.
Bucket creation and deletion should be facilitated via a migration (see `kv/migration`).
Index ¶
- Constants
- Variables
- func CorruptScraperError(err error) *influxdb.Error
- func DecIndexID(key, val []byte) ([]byte, interface{}, error)
- func DecodeOrgNameKey(k []byte) (influxdb.ID, string, error)
- func EncBodyJSON(ent Entity) ([]byte, string, error)
- func EncIDKey(ent Entity) ([]byte, string, error)
- func EncUniqKey(ent Entity) ([]byte, string, error)
- func ErrUnprocessableScraper(err error) *influxdb.Error
- func IndexKey(foreignKey, primaryKey []byte) (newKey []byte, err error)
- func InternalScraperServiceError(err error) *influxdb.Error
- func IsErrUnexpectedDecodeVal(ok bool) error
- func IsNotFound(err error) bool
- func UnexpectedIndexError(err error) *influxdb.Error
- func UnexpectedScrapersBucketError(err error) *influxdb.Error
- func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error)
- func WithIndexMigrationCleanup(m *IndexMigration)
- func WithIndexReadPathEnabled(i *Index)
- type Bucket
- type ConvertValToEntFn
- type Cursor
- type CursorConfig
- type CursorDirection
- type CursorHint
- type CursorHints
- type CursorOption
- type CursorPredicateFunc
- type DecodeBucketValFn
- type DeleteOpts
- type DeleteRelationsFn
- type DocumentStore
- func (s *DocumentStore) CreateDocument(ctx context.Context, d *influxdb.Document) error
- func (s *DocumentStore) FindDocument(ctx context.Context, id influxdb.ID) (*influxdb.Document, error)
- func (s *DocumentStore) FindDocuments(ctx context.Context, _ influxdb.ID) ([]*influxdb.Document, error)
- func (s *DocumentStore) PutDocument(ctx context.Context, d *influxdb.Document) error
- type EncodeEntFn
- type EncodeFn
- type Entity
- type FilterFn
- type FindCaptureFn
- type FindOpts
- type ForwardCursor
- type Index
- func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error
- func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error
- func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error)
- func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error
- type IndexDiff
- type IndexMapping
- type IndexMigration
- type IndexMigrationOption
- type IndexOption
- type IndexSourceOnFunc
- type IndexStore
- func (s *IndexStore) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error
- func (s *IndexStore) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error
- func (s *IndexStore) Find(ctx context.Context, tx Tx, opts FindOpts) error
- func (s *IndexStore) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{}, error)
- func (s *IndexStore) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error
- type InitialMigration
- type Pair
- type PutOptionFn
- type SchemaStore
- type Service
- func (s *Service) AddLogEntry(ctx context.Context, k, v []byte, t time.Time) error
- func (s *Service) AddLogEntryTx(ctx context.Context, tx Tx, k, v []byte, t time.Time) error
- func (s *Service) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error
- func (s *Service) AddTarget(ctx context.Context, target *influxdb.ScraperTarget, userID influxdb.ID) (err error)
- func (s *Service) Backup(ctx context.Context, w io.Writer) error
- func (s *Service) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error
- func (s *Service) CreateDocumentStore(ctx context.Context, ns string) (influxdb.DocumentStore, error)
- func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, ...) (*influxdb.Run, error)
- func (s *Service) CreateSource(ctx context.Context, src *influxdb.Source) error
- func (s *Service) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error)
- func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) error
- func (s *Service) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
- func (s *Service) DefaultSource(ctx context.Context) (*influxdb.Source, error)
- func (s *Service) DeleteSource(ctx context.Context, id influxdb.ID) error
- func (s *Service) DeleteTask(ctx context.Context, id influxdb.ID) error
- func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error
- func (s *Service) FindDocumentStore(ctx context.Context, ns string) (influxdb.DocumentStore, error)
- func (s *Service) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error)
- func (s *Service) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
- func (s *Service) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error)
- func (s *Service) FindSourceByID(ctx context.Context, id influxdb.ID) (*influxdb.Source, error)
- func (s *Service) FindSources(ctx context.Context, opt influxdb.FindOptions) ([]*influxdb.Source, int, error)
- func (s *Service) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error)
- func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error)
- func (s *Service) FindVariableByID(ctx context.Context, id influxdb.ID) (*influxdb.Variable, error)
- func (s *Service) FindVariables(ctx context.Context, filter influxdb.VariableFilter, ...) ([]*influxdb.Variable, error)
- func (s *Service) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
- func (s *Service) FirstLogEntry(ctx context.Context, k []byte) ([]byte, time.Time, error)
- func (s *Service) ForEachLogEntry(ctx context.Context, k []byte, opts platform.FindOptions, ...) error
- func (s *Service) ForEachLogEntryTx(ctx context.Context, tx Tx, k []byte, opts platform.FindOptions, ...) error
- func (s *Service) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error)
- func (s *Service) GetTargetByID(ctx context.Context, id influxdb.ID) (*influxdb.ScraperTarget, error)
- func (s *Service) LastLogEntry(ctx context.Context, k []byte) ([]byte, time.Time, error)
- func (s *Service) ListTargets(ctx context.Context, filter influxdb.ScraperTargetFilter) ([]influxdb.ScraperTarget, error)
- func (s *Service) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
- func (s *Service) PutSource(ctx context.Context, src *influxdb.Source) error
- func (s *Service) PutTarget(ctx context.Context, target *influxdb.ScraperTarget) error
- func (s *Service) RemoveTarget(ctx context.Context, id influxdb.ID) error
- func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) error
- func (s *Service) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
- func (s *Service) StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
- func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, ...) error
- func (s *Service) UpdateSource(ctx context.Context, id influxdb.ID, upd influxdb.SourceUpdate) (*influxdb.Source, error)
- func (s *Service) UpdateTarget(ctx context.Context, update *influxdb.ScraperTarget, userID influxdb.ID) (*influxdb.ScraperTarget, error)
- func (s *Service) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error)
- func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *influxdb.VariableUpdate) (*influxdb.Variable, error)
- func (s *Service) WithResourceLogger(audit resource.Logger)
- func (s *Service) WithStore(store Store)
- type ServiceConfig
- type Store
- type StoreBase
- func (s *StoreBase) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error
- func (s *StoreBase) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error
- func (s *StoreBase) EntKey(ctx context.Context, ent Entity) ([]byte, error)
- func (s *StoreBase) Find(ctx context.Context, tx Tx, opts FindOpts) error
- func (s *StoreBase) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{}, error)
- func (s *StoreBase) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error
- type Tx
- type VisitFunc
Constants ¶
const ( // DefaultSourceID is the default source identifier DefaultSourceID = "020f755c3c082000" // DefaultSourceOrganizationID is the default source's organization identifier DefaultSourceOrganizationID = "50616e67652c206c" )
const OpPrefix = "kv/"
OpPrefix is the prefix for kv errors.
Variables ¶
var ( // ErrScraperNotFound is used when the scraper configuration is not found. ErrScraperNotFound = &influxdb.Error{ Msg: "scraper target is not found", Code: influxdb.ENotFound, } // ErrInvalidScraperID is used when the service was provided // an invalid ID format. ErrInvalidScraperID = &influxdb.Error{ Code: influxdb.EInvalid, Msg: "provided scraper target ID has invalid format", } // ErrInvalidScrapersBucketID is used when the service was provided // an invalid ID format. ErrInvalidScrapersBucketID = &influxdb.Error{ Code: influxdb.EInvalid, Msg: "provided bucket ID has invalid format", } // ErrInvalidScrapersOrgID is used when the service was provided // an invalid ID format. ErrInvalidScrapersOrgID = &influxdb.Error{ Code: influxdb.EInvalid, Msg: "provided organization ID has invalid format", } )
var ( // ErrKeyNotFound is the error returned when the key requested is not found. ErrKeyNotFound = errors.New("key not found") // ErrBucketNotFound is the error returned when the bucket cannot be found. ErrBucketNotFound = errors.New("bucket not found") // ErrTxNotWritable is the error returned when a mutable operation is called during // a non-writable transaction. ErrTxNotWritable = errors.New("transaction is not writable") // ErrSeekMissingPrefix is returned when seek bytes is missing the prefix defined via // WithCursorPrefix ErrSeekMissingPrefix = errors.New("seek missing prefix bytes") )
var DefaultSource = influxdb.Source{ Default: true, Name: "autogen", Type: influxdb.SelfSourceType, }
DefaultSource is the default source.
var ( // ErrKeyInvalidCharacters is returned when a foreignKey or primaryKey contains // a /. ErrKeyInvalidCharacters = errors.New("key: contains invalid characters") )
var ( // ErrKeyValueLogBoundsNotFound is returned when oplog entries cannot be located // for the provided bounds ErrKeyValueLogBoundsNotFound = &platform.Error{ Code: platform.ENotFound, Msg: "oplog not found", } )
var NotUniqueError = &influxdb.Error{
Code: influxdb.EConflict,
Msg: "name already exists",
}
NotUniqueError is used when attempting to create a resource that already exists.
Functions ¶
func CorruptScraperError ¶
func CorruptScraperError(err error) *influxdb.Error
CorruptScraperError is used when the config cannot be unmarshalled from the bytes stored in the kv.
func DecIndexID ¶
DecIndexID decodes the bucket val into an influxdb.ID.
func DecodeOrgNameKey ¶
DecodeOrgNameKey decodes a raw bucket key into the organization id and name used to create it.
func EncBodyJSON ¶
EncBodyJSON JSON encodes the entity body and returns the raw bytes and indicates that it uses the entity body.
func EncUniqKey ¶
EncUniqKey encodes the unique key.
func ErrUnprocessableScraper ¶
func ErrUnprocessableScraper(err error) *influxdb.Error
ErrUnprocessableScraper is used when a scraper is not able to be converted to JSON.
func IndexKey ¶ added in v2.0.3
IndexKey returns a value suitable for use as the key component when storing values in the index. IndexKey returns an ErrKeyInvalidCharacters error if either the foreignKey or primaryKey contains a /.
func InternalScraperServiceError ¶
func InternalScraperServiceError(err error) *influxdb.Error
InternalScraperServiceError is used when the error comes from an internal system.
func IsNotFound ¶
IsNotFound returns a boolean indicating whether the error is known to report that a key was not found.
func UnexpectedIndexError ¶
func UnexpectedIndexError(err error) *influxdb.Error
UnexpectedIndexError is used when the error comes from an internal system.
func UnexpectedScrapersBucketError ¶
func UnexpectedScrapersBucketError(err error) *influxdb.Error
UnexpectedScrapersBucketError is used when the error comes from an internal system.
func WalkCursor ¶
func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error)
WalkCursor consumes the forward cursor, calling visit for each k/v pair found.
func WithIndexMigrationCleanup ¶
func WithIndexMigrationCleanup(m *IndexMigration)
WithIndexMigrationCleanup removes index entries which point to missing items in the source bucket.
func WithIndexReadPathEnabled ¶
func WithIndexReadPathEnabled(i *Index)
WithIndexReadPathEnabled enables the read paths of the index (Walk). This should be enabled once the index has been fully populated and the Insert and Delete paths are correctly integrated.
Types ¶
type Bucket ¶
type Bucket interface { // TODO context? // Get returns a key within this bucket. Errors if key does not exist. Get(key []byte) ([]byte, error) // GetBatch returns a corresponding set of values for the provided // set of keys. If a value cannot be found for any provided key its // value will be nil at the same index for the provided key. GetBatch(keys ...[]byte) ([][]byte, error) // Cursor returns a cursor at the beginning of this bucket optionally // using the provided hints to improve performance. Cursor(hints ...CursorHint) (Cursor, error) // Put should error if the transaction it was called in is not writable. Put(key, value []byte) error // Delete should error if the transaction it was called in is not writable. Delete(key []byte) error // ForwardCursor returns a forward cursor from the seek position provided. // Other options can be supplied to provide direction and hints. ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error) }
Bucket is the abstraction used to perform get/put/delete/get-many operations in a key value store.
type ConvertValToEntFn ¶
ConvertValToEntFn converts a key and decoded bucket value to an entity.
type Cursor ¶
type Cursor interface { // Seek moves the cursor forward until reaching prefix in the key name. Seek(prefix []byte) (k []byte, v []byte) // First moves the cursor to the first key in the bucket. First() (k []byte, v []byte) // Last moves the cursor to the last key in the bucket. Last() (k []byte, v []byte) // Next moves the cursor to the next key in the bucket. Next() (k []byte, v []byte) // Prev moves the cursor to the prev key in the bucket. Prev() (k []byte, v []byte) }
Cursor is an abstraction for iterating/ranging through data. A concrete implementation of a cursor can be found in cursor.go.
func NewStaticCursor ¶
NewStaticCursor returns an instance of a StaticCursor. It destructively sorts the provided pairs to be in key ascending order.
type CursorConfig ¶
type CursorConfig struct { Direction CursorDirection Hints CursorHints Prefix []byte SkipFirst bool Limit *int }
CursorConfig is a type used to configure a new forward cursor. It includes a direction and a set of hints
func NewCursorConfig ¶
func NewCursorConfig(opts ...CursorOption) CursorConfig
NewCursorConfig constructs and configures a CursorConfig used to configure a forward cursor.
type CursorDirection ¶
type CursorDirection int
CursorDirection is an integer used to define the direction a request cursor operates in.
const ( // CursorAscending directs a cursor to range in ascending order CursorAscending CursorDirection = iota // CursorDescending directs a cursor to range in descending order CursorDescending )
type CursorHint ¶
type CursorHint func(*CursorHints)
CursorHint configures CursorHints
func WithCursorHintKeyStart ¶
func WithCursorHintKeyStart(start string) CursorHint
WithCursorHintKeyStart is a hint to the store that the caller is interested in reading keys from start.
func WithCursorHintPredicate ¶
func WithCursorHintPredicate(f CursorPredicateFunc) CursorHint
WithCursorHintPredicate is a hint to the store to return only key / values which return true for the f.
The primary concern of the predicate is to improve performance. Therefore, it should perform tests on the data at minimal cost. If the predicate has no meaningful impact on reducing memory or CPU usage, there is no benefit to using it.
func WithCursorHintPrefix ¶
func WithCursorHintPrefix(prefix string) CursorHint
WithCursorHintPrefix is a hint to the store that the caller is only interested in keys with the specified prefix.
type CursorHints ¶
type CursorHints struct { KeyPrefix *string KeyStart *string PredicateFn CursorPredicateFunc }
type CursorOption ¶
type CursorOption func(*CursorConfig)
CursorOption is a functional option for configuring a forward cursor
func WithCursorDirection ¶
func WithCursorDirection(direction CursorDirection) CursorOption
WithCursorDirection sets the cursor direction on a provided cursor config
func WithCursorHints ¶
func WithCursorHints(hints ...CursorHint) CursorOption
WithCursorHints configs the provided hints on the cursor config
func WithCursorLimit ¶
func WithCursorLimit(limit int) CursorOption
WithCursorLimit restricts the number of key values returned by the cursor to the provided limit count.
func WithCursorPrefix ¶
func WithCursorPrefix(prefix []byte) CursorOption
WithCursorPrefix configures the forward cursor to retrieve keys with a particular prefix. This implies the cursor will start and end at a specific location based on the prefix [prefix, prefix + 1).
The value of the seek bytes must be prefixed with the provided prefix, otherwise an error will be returned.
func WithCursorSkipFirstItem ¶
func WithCursorSkipFirstItem() CursorOption
WithCursorSkipFirstItem skips returning the first item found within the seek.
type CursorPredicateFunc ¶
type DecodeBucketValFn ¶
DecodeBucketValFn decodes the raw []byte.
type DeleteOpts ¶
type DeleteOpts struct { DeleteRelationFns []DeleteRelationsFn FilterFn FilterFn }
DeleteOpts provides indicators to the store.Delete call for deleting a given entity. The FilterFn indicates the current value should be deleted when returning true.
type DeleteRelationsFn ¶
DeleteRelationsFn is a hook that a store that composes other stores can use to delete an entity and any relations it may share. An example would be deleting an entity and its associated index.
type DocumentStore ¶
type DocumentStore struct {
// contains filtered or unexported fields
}
DocumentStore implements influxdb.DocumentStore.
func (*DocumentStore) CreateDocument ¶
func (s *DocumentStore) CreateDocument(ctx context.Context, d *influxdb.Document) error
CreateDocument creates an instance of a document and sets the ID. After which it applies each of the options provided.
func (*DocumentStore) FindDocument ¶
func (s *DocumentStore) FindDocument(ctx context.Context, id influxdb.ID) (*influxdb.Document, error)
FindDocument retrieves the specified document with all its content and labels.
func (*DocumentStore) FindDocuments ¶
func (s *DocumentStore) FindDocuments(ctx context.Context, _ influxdb.ID) ([]*influxdb.Document, error)
FindDocuments retrieves all documents returned by the document find options.
func (*DocumentStore) PutDocument ¶
func (s *DocumentStore) PutDocument(ctx context.Context, d *influxdb.Document) error
type EncodeEntFn ¶
EncodeEntFn encodes the entity. This is used both for the key and vals in the store base.
type EncodeFn ¶
EncodeFn returns an encoding when called. Closures are your friend here.
func EncStringCaseInsensitive ¶
EncStringCaseInsensitive encodes a string and makes it case insensitive by lower casing everything.
type FilterFn ¶
FilterFn will provide an indicator to the Find or Delete calls that the entity that was seen is one that is valid and should be either captured or deleted (depending on the caller of the filter func).
type FindCaptureFn ¶
FindCaptureFn is the mechanism for closing over the key and decoded value pair for adding results to the call site's collection. This generic implementation allows it to be reused. The returned decodedVal should always satisfy whatever decoding of the bucket value was set on the store that calls Find.
type FindOpts ¶
type FindOpts struct { Descending bool Offset int Limit int Prefix []byte CaptureFn FindCaptureFn FilterEntFn FilterFn }
FindOpts provides a means to search through the bucket. When a filter func is provided, that will run against the entity and if the filter responds true, will count it towards the number of entries seen and the capture func will be run with it provided to it.
type ForwardCursor ¶
type ForwardCursor interface { // Next moves the cursor to the next key in the bucket. Next() (k, v []byte) // Err returns non-nil if an error occurred during cursor iteration. // This should always be checked after Next returns a nil key/value. Err() error // Close is responsible for freeing any resources created by the cursor. Close() error }
ForwardCursor is an abstraction for interacting/ranging through data in one direction.
type Index ¶
type Index struct { IndexMapping // contains filtered or unexported fields }
Index is used to define and manage an index for a source bucket.
When using the index you must provide it with an IndexMapping. The IndexMapping provides the index with the contract it needs to populate the entire index and traverse a populated index correctly. The IndexMapping provides a way to retrieve the key on which to index with when provided with the value from the source. It also provides the way to access the source bucket.
The following is an illustration of its use:
byUserID := func(v []byte) ([]byte, error) { auth := &influxdb.Authorization{} if err := json.Unmarshal(v, auth); err != nil { return nil, err } return auth.UserID.Encode() } // configure a write only index indexByUser := NewIndex(NewSource([]byte(`authorizationsbyuserv1`), byUserID)) indexByUser.Insert(tx, someUserID, someAuthID) indexByUser.Delete(tx, someUserID, someAuthID) indexByUser.Walk(ctx, tx, someUserID, func(k, v []byte) error { auth := &influxdb.Authorization{} if err := json.Unmarshal(v, auth); err != nil { return err } // do something with auth return nil }) // verify the current index against the source and return the differences // found in each diff, err := indexByUser.Verify(ctx, store)
func NewIndex ¶
func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index
NewIndex configures and returns a new *Index for a given index mapping. By default the read path (Walk) is disabled. This is because the index needs to be fully populated before depending upon the read path. The read path can be enabled using WithIndexReadPathEnabled option.
func (*Index) Delete ¶
Delete removes the foreignKey and primaryKey mapping from the underlying index.
func (*Index) Insert ¶
Insert creates a single index entry for the provided primary key on the foreign key.
type IndexDiff ¶
type IndexDiff struct { // PresentInIndex is a map of foreign key to primary keys // present in the index. PresentInIndex map[string]map[string]struct{} // MissingFromIndex is a map of foreign key to associated primary keys // missing from the index given the source bucket. // These items could be due to the fact an index populate migration has // not yet occurred, the index populate code is incorrect or the write path // for your resource type does not yet insert into the index as well (Create actions). MissingFromIndex map[string]map[string]struct{} // MissingFromSource is a map of foreign key to associated primary keys // missing from the source but accounted for in the index. // This happens when index items are not properly removed from the index // when an item is removed from the source (Delete actions). MissingFromSource map[string]map[string]struct{} }
IndexDiff contains a set of items present in the source not in index, along with a set of things in the index which are not in the source.
type IndexMapping ¶
type IndexMapping interface { SourceBucket() []byte IndexBucket() []byte IndexSourceOn(value []byte) (foreignKey []byte, err error) }
IndexMapping is a type which configures an Index to map items from a source bucket to an index bucket via a mapping known as IndexSourceOn. This function is called on the values in the source to derive the foreign key on which to index each item.
func NewIndexMapping ¶
func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping
NewIndexMapping creates an implementation of IndexMapping for the provided source bucket to a destination index bucket.
type IndexMigration ¶
type IndexMigration struct { IndexMapping // contains filtered or unexported fields }
IndexMigration is a migration for adding and removing an index. These are constructed via the Index.Migration function.
func NewIndexMigration ¶
func NewIndexMigration(mapping IndexMapping, opts ...IndexMigrationOption) *IndexMigration
NewIndexMigration constructs a migration for creating and populating an index.
func (*IndexMigration) Down ¶
func (i *IndexMigration) Down(ctx context.Context, store SchemaStore) error
Down deletes all entries from the index.
func (*IndexMigration) MigrationName ¶
func (i *IndexMigration) MigrationName() string
MigrationName returns a readable name for the index migration.
func (*IndexMigration) Populate ¶
Populate does a full population of the index using the IndexSourceOn IndexMapping function. Once completed it marks the index as ready for use. It returns a nil error on success and the count of inserted items.
func (*IndexMigration) Up ¶
func (i *IndexMigration) Up(ctx context.Context, store SchemaStore) (err error)
Up initializes the index bucket and populates the index.
type IndexMigrationOption ¶
type IndexMigrationOption func(*IndexMigration)
IndexMigrationOption is a functional option for the IndexMigration type
func WithIndexMigationBatchSize ¶
func WithIndexMigationBatchSize(n int) IndexMigrationOption
WithIndexMigationBatchSize configures the size of the batches when committing changes to entire index during migration (e.g. size of put batch on index populate).
type IndexOption ¶
type IndexOption func(*Index)
IndexOption is a function which configures an index
type IndexSourceOnFunc ¶
IndexSourceOnFunc is a function which can be used to derive the foreign key of a value in a source bucket.
type IndexStore ¶
IndexStore provides an entity store that uses an index lookup. The index store manages deleting and creating indexes for the caller. The index is automatically used if the FindEnt entity does not have the primary key.
func (*IndexStore) Delete ¶
func (s *IndexStore) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error
Delete deletes entities and associated indexes.
func (*IndexStore) Find ¶
Find provides a mechanism for looking through the bucket via the set options. When a prefix is provided, it will be used within the entity store. If you would like to search the index store, then you can by calling the index store directly.
func (*IndexStore) FindEnt ¶
FindEnt returns the decoded entity body via the provided entity. An example entity should not include a Body, but rather the ID, Name, or OrgID. If no ID is provided, then the algorithm assumes you are looking up the entity by the index.
func (*IndexStore) Put ¶
func (s *IndexStore) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error
Put will persist the entity into both the entity store and the index store.
type InitialMigration ¶
type InitialMigration struct{}
func (InitialMigration) Down ¶
func (m InitialMigration) Down(ctx context.Context, store SchemaStore) error
Down is a no operation required for service to be used as a migration
func (InitialMigration) MigrationName ¶
func (m InitialMigration) MigrationName() string
MigrationName returns the string initial migration which allows this store to be used as a migration
func (InitialMigration) Up ¶
func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error
Up initializes all the owned buckets of the underlying store
type PutOptionFn ¶
type PutOptionFn func(o *putOption) error
PutOptionFn provides a hint to the store to make some guarantees about the put action. I.e. If it is new, then will validate there is no existing entity by the given PK.
func PutNew ¶
func PutNew() PutOptionFn
PutNew will create an entity that does not already exist. Guarantees uniqueness by the store's uniqueness guarantees.
func PutUpdate ¶
func PutUpdate() PutOptionFn
PutUpdate will update an entity that must already exist.
type SchemaStore ¶
type SchemaStore interface { Store // CreateBucket creates a bucket on the underlying store if it does not exist CreateBucket(ctx context.Context, bucket []byte) error // DeleteBucket deletes a bucket on the underlying store if it exists DeleteBucket(ctx context.Context, bucket []byte) error }
SchemaStore is a superset of Store along with store schema change functionality like bucket creation and deletion.
This type is made available via the `kv/migration` package. It should be consumed via this package to create and delete buckets using a migration. Check out the internal tool `cmd/internal/kvmigrate` for building a new migration Go file into the correct location (in kv/migration/all.go). Configuring your bucket here will ensure it is created properly on initialization of InfluxDB.
type Service ¶
type Service struct { Config ServiceConfig IDGenerator influxdb.IDGenerator // FluxLanguageService is used for parsing flux. // If this is unset, operations that require parsing flux // will fail. FluxLanguageService influxdb.FluxLanguageService TokenGenerator influxdb.TokenGenerator // TODO(desa:ariel): this should not be embedded influxdb.TimeGenerator // contains filtered or unexported fields }
Service is the struct that influxdb services are implemented on.
func NewService ¶
func NewService(log *zap.Logger, kv Store, orgs influxdb.OrganizationService, configs ...ServiceConfig) *Service
NewService returns an instance of a Service.
func (*Service) AddLogEntry ¶
AddLogEntry logs a keyValue for a particular resource type ID pairing.
func (*Service) AddLogEntryTx ¶
func (*Service) AddRunLog ¶
func (s *Service) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error
AddRunLog adds a log line to the run.
func (*Service) AddTarget ¶
func (s *Service) AddTarget(ctx context.Context, target *influxdb.ScraperTarget, userID influxdb.ID) (err error)
AddTarget adds a new scraper target into storage.
func (*Service) CreateDocumentStore ¶
func (s *Service) CreateDocumentStore(ctx context.Context, ns string) (influxdb.DocumentStore, error)
CreateDocumentStore creates an instance of a document store by instantiating the buckets for the store.
func (*Service) CreateRun ¶
func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error)
CreateRun creates a run with a scheduledFor time as now.
func (*Service) CreateSource ¶
CreateSource creates an influxdb source and sets s.ID.
func (*Service) CreateTask ¶
CreateTask creates a new task. The owner of the task is inferred from the authorizer associated with ctx.
func (*Service) CreateVariable ¶
CreateVariable creates a new variable and assigns it an ID
func (*Service) CurrentlyRunning ¶
func (*Service) DefaultSource ¶
DefaultSource retrieves the default source.
func (*Service) DeleteSource ¶
DeleteSource deletes a source and prunes it from the index.
func (*Service) DeleteTask ¶
DeleteTask removes a task by ID and purges all associated data and scheduled runs.
func (*Service) DeleteVariable ¶
DeleteVariable removes a single variable from the store by its ID
func (*Service) FindDocumentStore ¶
FindDocumentStore finds the buckets associated with the namespace provided.
func (*Service) FindLogs ¶
func (s *Service) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error)
FindLogs returns logs for a run.
func (*Service) FindRunByID ¶
func (s *Service) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
FindRunByID returns a single run.
func (*Service) FindRuns ¶
func (s *Service) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error)
FindRuns returns a list of runs that match a filter and the total count of returned runs.
func (*Service) FindSourceByID ¶
FindSourceByID retrieves a source by id.
func (*Service) FindSources ¶
func (s *Service) FindSources(ctx context.Context, opt influxdb.FindOptions) ([]*influxdb.Source, int, error)
FindSources retrieves all sources that match an arbitrary source filter. Filters using ID, or OrganizationID and source Name should be efficient. Other filters will do a linear scan across all sources searching for a match.
func (*Service) FindTaskByID ¶
FindTaskByID returns a single task
func (*Service) FindTasks ¶
func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error)
FindTasks returns a list of tasks that match a filter (limit 100) and the total count of matching tasks.
func (*Service) FindVariableByID ¶
FindVariableByID finds a single variable in the store by its ID
func (*Service) FindVariables ¶
func (s *Service) FindVariables(ctx context.Context, filter influxdb.VariableFilter, opt ...influxdb.FindOptions) ([]*influxdb.Variable, error)
FindVariables returns all variables in the store
func (*Service) FinishRun ¶
FinishRun removes runID from the list of running tasks and, if its `now` is later than the last completed time, updates it.
func (*Service) FirstLogEntry ¶
FirstLogEntry retrieves the first log entry for a key value log.
func (*Service) ForEachLogEntry ¶
func (s *Service) ForEachLogEntry(ctx context.Context, k []byte, opts platform.FindOptions, fn func([]byte, time.Time) error) error
ForEachLogEntry retrieves the keyValue log for a resource type ID combination. KeyValues may be returned in ascending or descending order.
func (*Service) ForEachLogEntryTx ¶
func (*Service) ForceRun ¶
func (s *Service) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error)
ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible. The value of scheduledFor may or may not align with the task's schedule.
func (*Service) GetTargetByID ¶
func (s *Service) GetTargetByID(ctx context.Context, id influxdb.ID) (*influxdb.ScraperTarget, error)
GetTargetByID retrieves a scraper target by id.
func (*Service) LastLogEntry ¶
LastLogEntry retrieves the last log entry for a key value log.
func (*Service) ListTargets ¶
func (s *Service) ListTargets(ctx context.Context, filter influxdb.ScraperTargetFilter) ([]influxdb.ScraperTarget, error)
ListTargets will list all scrape targets.
func (*Service) ManualRuns ¶
func (*Service) RemoveTarget ¶
RemoveTarget removes a scraper target from the bucket.
func (*Service) ReplaceVariable ¶
ReplaceVariable replaces a variable that exists in the store or creates it if it does not
func (*Service) RetryRun ¶
RetryRun creates and returns a new run (which is a retry of another run).
func (*Service) StartManualRun ¶
func (*Service) UpdateRunState ¶
func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error
UpdateRunState sets the run state at the respective time.
func (*Service) UpdateSource ¶
func (s *Service) UpdateSource(ctx context.Context, id influxdb.ID, upd influxdb.SourceUpdate) (*influxdb.Source, error)
UpdateSource updates a source according to the parameters set on upd.
func (*Service) UpdateTarget ¶
func (s *Service) UpdateTarget(ctx context.Context, update *influxdb.ScraperTarget, userID influxdb.ID) (*influxdb.ScraperTarget, error)
UpdateTarget updates a scraper target.
func (*Service) UpdateTask ¶
func (s *Service) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error)
UpdateTask updates a single task with changeset.
func (*Service) UpdateVariable ¶
func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *influxdb.VariableUpdate) (*influxdb.Variable, error)
UpdateVariable updates a single variable in the store with a changeset
func (*Service) WithResourceLogger ¶
WithResourceLogger sets the resource audit logger for the service.
type ServiceConfig ¶
ServiceConfig allows us to configure Services
type Store ¶
type Store interface { // View opens up a transaction that will not write to any data. Implementing interfaces // should take care to ensure that all view transactions do not mutate any data. View(context.Context, func(Tx) error) error // Update opens up a transaction that will mutate data. Update(context.Context, func(Tx) error) error // Backup copies all K:Vs to a writer, file format determined by implementation. Backup(ctx context.Context, w io.Writer) error // Restore replaces the underlying data file with the data from r. Restore(ctx context.Context, r io.Reader) error }
Store is an interface for a generic key value store. It is modeled after the boltdb database struct.
type StoreBase ¶
type StoreBase struct { Resource string BktName []byte EncodeEntKeyFn EncodeEntFn EncodeEntBodyFn EncodeEntFn DecodeEntFn DecodeBucketValFn ConvertValToEntFn ConvertValToEntFn }
StoreBase is the base behavior for accessing buckets in kv. It provides mechanisms that can be used in composing stores together (i.e. IndexStore).
func NewOrgNameKeyStore ¶
NewOrgNameKeyStore creates a store for an entity's unique index on organization id and name. This is used throughout the kv pkg to provide entity uniqueness by name within an org.
func NewStoreBase ¶
func NewStoreBase(resource string, bktName []byte, encKeyFn, encBodyFn EncodeEntFn, decFn DecodeBucketValFn, decToEntFn ConvertValToEntFn) *StoreBase
NewStoreBase creates a new store base.
func (*StoreBase) EntKey ¶
EntKey returns the key for the entity provided. This is a shortcut for grabbing the EntKey without having to juggle the encoding funcs.
func (*StoreBase) Find ¶
Find provides a mechanism for looking through the bucket via the set options. When a prefix is provided, the prefix is used to seek the bucket.
type Tx ¶
type Tx interface { // Bucket possibly creates and returns bucket, b. Bucket(b []byte) (Bucket, error) // Context returns the context associated with this Tx. Context() context.Context // WithContext associates a context with this Tx. WithContext(ctx context.Context) }
Tx is a transaction in the store.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
package migration This package contains utility types for composing and running schema and data migrations in a strictly serial and ordered nature; against a backing kv.SchemaStore implementation.
|
package migration This package contains utility types for composing and running schema and data migrations in a strictly serial and ordered nature; against a backing kv.SchemaStore implementation. |
all
package all This package is the canonical location for all migrations being made against the single shared kv.Store implementation used by InfluxDB (while it remains a single store).
|
package all This package is the canonical location for all migrations being made against the single shared kv.Store implementation used by InfluxDB (while it remains a single store). |
Package mock is a generated GoMock package.
|
Package mock is a generated GoMock package. |