kv

package
v2.7.11
Published: Nov 26, 2024 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

package kv

The KV package is a set of services and abstractions built around key value storage. There exist in-memory and persisted implementations of the core `Store` family of interfaces outside of this package (see `inmem` and `bolt` packages).

The `Store` interface exposes transactional access to a backing kv persistence layer. It allows for read-only (View) and read-write (Update) transactions to be opened. These methods take a function which is passed an implementation of the transaction interface (Tx). This interface exposes a way to manipulate namespaced keys and values (Buckets).

All keys and values are namespaced (grouped) using buckets. Buckets can only be created on implementations of the `SchemaStore` interface. This is a superset of the `Store` interface, which has the additional bucket creation and deletion methods.

Bucket creation and deletion should be facilitated via a migration (see `kv/migration`).
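
The transaction model described above can be illustrated with a minimal, self-contained sketch. It does not use the real `kv` types: `memStore`, `memTx` and their string-based methods are hypothetical simplifications, and the real `Store`, `Tx` and `Bucket` interfaces differ in shape. It only shows the idea that View hands the callback a read-only transaction while Update hands it a writable one.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Stand-ins for kv.ErrTxNotWritable and kv.ErrBucketNotFound.
var (
	errTxNotWritable  = errors.New("transaction is not writable")
	errBucketNotFound = errors.New("bucket not found")
)

// memStore is a hypothetical store: named buckets of string keys and values.
type memStore struct {
	mu      sync.RWMutex
	buckets map[string]map[string]string
}

// memTx is a hypothetical transaction handed to View and Update callbacks.
type memTx struct {
	store    *memStore
	writable bool
}

func newMemStore() *memStore {
	return &memStore{buckets: map[string]map[string]string{}}
}

// CreateBucket plays the role of the SchemaStore bucket creation method.
func (s *memStore) CreateBucket(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.buckets[name]; !ok {
		s.buckets[name] = map[string]string{}
	}
}

// View runs fn inside a read-only transaction.
func (s *memStore) View(fn func(tx *memTx) error) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return fn(&memTx{store: s})
}

// Update runs fn inside a read-write transaction.
// This sketch does not roll back partial writes when fn fails.
func (s *memStore) Update(fn func(tx *memTx) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fn(&memTx{store: s, writable: true})
}

// Get reads a key from a bucket; ok is false when the bucket or key is missing.
func (tx *memTx) Get(bucket, key string) (value string, ok bool) {
	value, ok = tx.store.buckets[bucket][key]
	return value, ok
}

// Put writes a key into a bucket and fails inside read-only transactions.
func (tx *memTx) Put(bucket, key, value string) error {
	if !tx.writable {
		return errTxNotWritable
	}
	b, ok := tx.store.buckets[bucket]
	if !ok {
		return errBucketNotFound
	}
	b[key] = value
	return nil
}

func main() {
	s := newMemStore()
	s.CreateBucket("users")

	// A read-write transaction may call Put.
	err := s.Update(func(tx *memTx) error {
		return tx.Put("users", "1", "ada")
	})
	fmt.Println("update:", err)

	// A read-only transaction may read, but Put is rejected.
	err = s.View(func(tx *memTx) error {
		v, _ := tx.Get("users", "1")
		fmt.Println("read:", v)
		return tx.Put("users", "2", "bob")
	})
	fmt.Println("view:", err)
}
```

In the sketch, bucket creation is a separate method on the store rather than something a transaction can do, mirroring the split between `Store` and `SchemaStore`.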

Constants

const (
	// DefaultSourceID is the default source identifier
	DefaultSourceID = "020f755c3c082000"
	// DefaultSourceOrganizationID is the default source's organization identifier
	DefaultSourceOrganizationID = "50616e67652c206c"
)
const OpPrefix = "kv/"

OpPrefix is the prefix for kv errors.

Variables

var (
	// ErrScraperNotFound is used when the scraper configuration is not found.
	ErrScraperNotFound = &errors.Error{
		Msg:  "scraper target is not found",
		Code: errors.ENotFound,
	}

	// ErrInvalidScraperID is used when the service was provided
	// an invalid ID format.
	ErrInvalidScraperID = &errors.Error{
		Code: errors.EInvalid,
		Msg:  "provided scraper target ID has invalid format",
	}

	// ErrInvalidScrapersBucketID is used when the service was provided
	// an invalid ID format.
	ErrInvalidScrapersBucketID = &errors.Error{
		Code: errors.EInvalid,
		Msg:  "provided bucket ID has invalid format",
	}

	// ErrInvalidScrapersOrgID is used when the service was provided
	// an invalid ID format.
	ErrInvalidScrapersOrgID = &errors.Error{
		Code: errors.EInvalid,
		Msg:  "provided organization ID has invalid format",
	}
)
var (
	// ErrKeyNotFound is the error returned when the key requested is not found.
	ErrKeyNotFound = errors.New("key not found")
	// ErrBucketNotFound is the error returned when the bucket cannot be found.
	ErrBucketNotFound = errors.New("bucket not found")
	// ErrTxNotWritable is the error returned when a mutable operation is called during
	// a non-writable transaction.
	ErrTxNotWritable = errors.New("transaction is not writable")
	// ErrSeekMissingPrefix is returned when seek bytes is missing the prefix defined via
	// WithCursorPrefix
	ErrSeekMissingPrefix = errors.New("seek missing prefix bytes")
)
var DefaultSource = influxdb.Source{
	Default: true,
	Name:    "autogen",
	Type:    influxdb.SelfSourceType,
}

DefaultSource is the default source.

var (
	// ErrKeyInvalidCharacters is returned when a foreignKey or primaryKey contains
	// a / character.
	ErrKeyInvalidCharacters = errors.New("key: contains invalid characters")
)
var (

	// ErrKeyValueLogBoundsNotFound is returned when oplog entries cannot be located
	// for the provided bounds
	ErrKeyValueLogBoundsNotFound = &errors.Error{
		Code: errors.ENotFound,
		Msg:  "oplog not found",
	}
)
var NotUniqueError = &errors.Error{
	Code: errors.EConflict,
	Msg:  "name already exists",
}

NotUniqueError is used when attempting to create a resource that already exists.

Functions

func CorruptScraperError

func CorruptScraperError(err error) *errors.Error

CorruptScraperError is used when the config cannot be unmarshalled from the bytes stored in the kv.

func DecIndexID

func DecIndexID(key, val []byte) ([]byte, interface{}, error)

DecIndexID decodes the bucket val into an influxdb.ID.

func DecodeOrgNameKey

func DecodeOrgNameKey(k []byte) (platform.ID, string, error)

DecodeOrgNameKey decodes a raw bucket key into the organization id and name used to create it.

func EncBodyJSON

func EncBodyJSON(ent Entity) ([]byte, string, error)

EncBodyJSON JSON encodes the entity body and returns the raw bytes and indicates that it uses the entity body.

func EncIDKey

func EncIDKey(ent Entity) ([]byte, string, error)

EncIDKey encodes an entity into a key that represents the encoded ID provided.

func EncUniqKey

func EncUniqKey(ent Entity) ([]byte, string, error)

EncUniqKey encodes the unique key.

func ErrUnprocessableScraper

func ErrUnprocessableScraper(err error) *errors.Error

ErrUnprocessableScraper is used when a scraper is not able to be converted to JSON.

func IndexKey added in v2.0.3

func IndexKey(foreignKey, primaryKey []byte) (newKey []byte, err error)

IndexKey returns a value suitable for use as the key component when storing values in the index. IndexKey returns an ErrKeyInvalidCharacters error if either the foreignKey or primaryKey contains a /.
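
As an illustration of the rule above, here is a minimal sketch of a composite index key. It assumes the key is laid out as `foreignKey/primaryKey`, which would explain why `/` is rejected in either part; that layout and the lower-case names `indexKey` and `errKeyInvalidCharacters` are assumptions for this example, not the package's confirmed implementation.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errKeyInvalidCharacters stands in for kv.ErrKeyInvalidCharacters.
var errKeyInvalidCharacters = errors.New("key: contains invalid characters")

// indexKey joins the two keys with a "/" separator (assumed layout).
// A "/" inside either key would make the result ambiguous, so it is rejected.
func indexKey(foreignKey, primaryKey []byte) ([]byte, error) {
	if bytes.IndexByte(foreignKey, '/') >= 0 || bytes.IndexByte(primaryKey, '/') >= 0 {
		return nil, errKeyInvalidCharacters
	}
	key := make([]byte, 0, len(foreignKey)+1+len(primaryKey))
	key = append(key, foreignKey...)
	key = append(key, '/')
	key = append(key, primaryKey...)
	return key, nil
}

func main() {
	key, err := indexKey([]byte("user1"), []byte("auth9"))
	fmt.Printf("key=%s err=%v\n", key, err)

	_, err = indexKey([]byte("user/1"), []byte("auth9"))
	fmt.Println("err:", err)
}
```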

func InternalScraperServiceError

func InternalScraperServiceError(err error) *errors.Error

InternalScraperServiceError is used when the error comes from an internal system.

func IsErrUnexpectedDecodeVal

func IsErrUnexpectedDecodeVal(ok bool) error

func IsNotFound

func IsNotFound(err error) bool

IsNotFound returns a boolean indicating whether the error is known to report that a key was not found.

func UnexpectedIndexError

func UnexpectedIndexError(err error) *errors.Error

UnexpectedIndexError is used when the error comes from an internal system.

func UnexpectedScrapersBucketError

func UnexpectedScrapersBucketError(err error) *errors.Error

UnexpectedScrapersBucketError is used when the error comes from an internal system.

func WalkCursor

func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error)

WalkCursor consumes the forward cursor, calling visit for each k/v pair found.

func WithIndexMigrationCleanup

func WithIndexMigrationCleanup(m *IndexMigration)

WithIndexMigrationCleanup removes index entries which point to missing items in the source bucket.

func WithIndexReadPathEnabled

func WithIndexReadPathEnabled(i *Index)

WithIndexReadPathEnabled enables the read paths of the index (Walk). This should be enabled once the index has been fully populated and the Insert and Delete paths are correctly integrated.

Types

type Bucket

type Bucket interface {
	// TODO context?
	// Get returns a key within this bucket. Errors if key does not exist.
	Get(key []byte) ([]byte, error)
	// GetBatch returns a corresponding set of values for the provided
	// set of keys. If a value cannot be found for any provided key its
	// value will be nil at the same index for the provided key.
	GetBatch(keys ...[]byte) ([][]byte, error)
	// Cursor returns a cursor at the beginning of this bucket optionally
	// using the provided hints to improve performance.
	Cursor(hints ...CursorHint) (Cursor, error)
	// Put should error if the transaction it was called in is not writable.
	Put(key, value []byte) error
	// Delete should error if the transaction it was called in is not writable.
	Delete(key []byte) error
	// ForwardCursor returns a forward cursor from the seek position provided.
	// Other options can be supplied to provide direction and hints.
	ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error)
}

Bucket is the abstraction used to perform get/put/delete/get-many operations in a key value store.
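
To make the contract in the method comments concrete, here is a minimal map-backed sketch of the get/put/delete/get-many part of a bucket. The type `memBucket`, its `writable` flag and the error variables are hypothetical stand-ins; the cursor methods are left out, so this type does not satisfy the full `Bucket` interface.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for kv.ErrKeyNotFound and kv.ErrTxNotWritable.
var (
	errKeyNotFound   = errors.New("key not found")
	errTxNotWritable = errors.New("transaction is not writable")
)

// memBucket is a hypothetical in-memory bucket.
type memBucket struct {
	data     map[string][]byte
	writable bool // whether the owning transaction allows writes
}

func newMemBucket(writable bool) *memBucket {
	return &memBucket{data: map[string][]byte{}, writable: writable}
}

// Get errors if the key does not exist.
func (b *memBucket) Get(key []byte) ([]byte, error) {
	v, ok := b.data[string(key)]
	if !ok {
		return nil, errKeyNotFound
	}
	return v, nil
}

// GetBatch returns one value per key, in order; missing keys yield nil.
func (b *memBucket) GetBatch(keys ...[]byte) ([][]byte, error) {
	values := make([][]byte, len(keys))
	for i, k := range keys {
		if v, ok := b.data[string(k)]; ok {
			values[i] = v
		}
	}
	return values, nil
}

// Put errors if the transaction is not writable.
func (b *memBucket) Put(key, value []byte) error {
	if !b.writable {
		return errTxNotWritable
	}
	b.data[string(key)] = value
	return nil
}

// Delete errors if the transaction is not writable.
func (b *memBucket) Delete(key []byte) error {
	if !b.writable {
		return errTxNotWritable
	}
	delete(b.data, string(key))
	return nil
}

func main() {
	b := newMemBucket(true)
	_ = b.Put([]byte("a"), []byte("1"))

	values, _ := b.GetBatch([]byte("a"), []byte("missing"))
	fmt.Printf("%q %v\n", values[0], values[1] == nil)

	_, err := b.Get([]byte("missing"))
	fmt.Println(err)
}
```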

type ConvertValToEntFn

type ConvertValToEntFn func(k []byte, v interface{}) (Entity, error)

ConvertValToEntFn converts a key and decoded bucket value to an entity.

type Cursor

type Cursor interface {
	// Seek moves the cursor forward until reaching prefix in the key name.
	Seek(prefix []byte) (k []byte, v []byte)
	// First moves the cursor to the first key in the bucket.
	First() (k []byte, v []byte)
	// Last moves the cursor to the last key in the bucket.
	Last() (k []byte, v []byte)
	// Next moves the cursor to the next key in the bucket.
	Next() (k []byte, v []byte)
	// Prev moves the cursor to the prev key in the bucket.
	Prev() (k []byte, v []byte)
}

Cursor is an abstraction for iterating/ranging through data. A concrete implementation of a cursor can be found in cursor.go.

func NewStaticCursor

func NewStaticCursor(pairs []Pair) Cursor

NewStaticCursor returns an instance of a StaticCursor. It destructively sorts the provided pairs to be in key ascending order.

type CursorConfig

type CursorConfig struct {
	Direction CursorDirection
	Hints     CursorHints
	Prefix    []byte
	SkipFirst bool
	Limit     *int
}

CursorConfig is a type used to configure a new forward cursor. It includes a direction and a set of hints

func NewCursorConfig

func NewCursorConfig(opts ...CursorOption) CursorConfig

NewCursorConfig constructs and configures a CursorConfig used to configure a forward cursor.

type CursorDirection

type CursorDirection int

CursorDirection is an integer used to define the direction a request cursor operates in.

const (
	// CursorAscending directs a cursor to range in ascending order
	CursorAscending CursorDirection = iota
	// CursorDescending directs a cursor to range in descending order
	CursorDescending
)

type CursorHint

type CursorHint func(*CursorHints)

CursorHint configures CursorHints

func WithCursorHintKeyStart

func WithCursorHintKeyStart(start string) CursorHint

WithCursorHintKeyStart is a hint to the store that the caller is interested in reading keys from start.

func WithCursorHintPredicate

func WithCursorHintPredicate(f CursorPredicateFunc) CursorHint

WithCursorHintPredicate is a hint to the store to return only the key / value pairs for which f returns true.

The primary concern of the predicate is to improve performance. Therefore, it should perform tests on the data at minimal cost. If the predicate has no meaningful impact on reducing memory or CPU usage, there is no benefit to using it.

func WithCursorHintPrefix

func WithCursorHintPrefix(prefix string) CursorHint

WithCursorHintPrefix is a hint to the store that the caller is only interested in keys with the specified prefix.

type CursorHints

type CursorHints struct {
	KeyPrefix   *string
	KeyStart    *string
	PredicateFn CursorPredicateFunc
}

type CursorOption

type CursorOption func(*CursorConfig)

CursorOption is a functional option for configuring a forward cursor

func WithCursorDirection

func WithCursorDirection(direction CursorDirection) CursorOption

WithCursorDirection sets the cursor direction on a provided cursor config

func WithCursorHints

func WithCursorHints(hints ...CursorHint) CursorOption

WithCursorHints configures the provided hints on the cursor config

func WithCursorLimit

func WithCursorLimit(limit int) CursorOption

WithCursorLimit restricts the number of key values returned by the cursor to the provided limit count.

func WithCursorPrefix

func WithCursorPrefix(prefix []byte) CursorOption

WithCursorPrefix configures the forward cursor to retrieve keys with a particular prefix. This implies the cursor will start and end at a specific location based on the prefix [prefix, prefix + 1).

The value of the seek bytes must be prefixed with the provided prefix, otherwise an error will be returned.
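
The half-open range `[prefix, prefix + 1)` can be read as "every key that starts with prefix". The sketch below shows one way such an upper bound and the seek check could be computed. The helpers `prefixEnd` and `checkSeek` are hypothetical names for this example and are not part of the package.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errSeekMissingPrefix stands in for kv.ErrSeekMissingPrefix.
var errSeekMissingPrefix = errors.New("seek missing prefix bytes")

// prefixEnd returns the exclusive upper bound "prefix + 1": the smallest key
// that sorts after every key beginning with prefix. It returns nil when no
// such bound exists (empty prefix or a prefix made only of 0xff bytes).
func prefixEnd(prefix []byte) []byte {
	end := make([]byte, len(prefix))
	copy(end, prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return end[:i+1]
		}
	}
	return nil
}

// checkSeek rejects seek positions that do not start with the prefix.
func checkSeek(seek, prefix []byte) error {
	if !bytes.HasPrefix(seek, prefix) {
		return errSeekMissingPrefix
	}
	return nil
}

func main() {
	prefix := []byte("ab")
	fmt.Printf("range: [%q, %q)\n", prefix, prefixEnd(prefix))
	fmt.Println(checkSeek([]byte("abc"), prefix))
	fmt.Println(checkSeek([]byte("xyz"), prefix))
}
```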

func WithCursorSkipFirstItem

func WithCursorSkipFirstItem() CursorOption

WithCursorSkipFirstItem skips returning the first item found within the seek.
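
The cursor options above follow the functional options pattern: each option is a function that mutates a config, and the constructor applies them in order. The following sketch reproduces the pattern with local, lower-case copies of the types so that it runs on its own; the option bodies are assumptions about what each option sets, and the `Hints` field is omitted.

```go
package main

import "fmt"

type cursorDirection int

const (
	cursorAscending cursorDirection = iota
	cursorDescending
)

// cursorConfig mirrors the documented CursorConfig fields (minus Hints).
type cursorConfig struct {
	Direction cursorDirection
	Prefix    []byte
	SkipFirst bool
	Limit     *int
}

// cursorOption mutates a config, like the documented CursorOption type.
type cursorOption func(*cursorConfig)

func withCursorDirection(d cursorDirection) cursorOption {
	return func(c *cursorConfig) { c.Direction = d }
}

func withCursorLimit(limit int) cursorOption {
	return func(c *cursorConfig) { c.Limit = &limit }
}

func withCursorSkipFirstItem() cursorOption {
	return func(c *cursorConfig) { c.SkipFirst = true }
}

// newCursorConfig starts from the zero value and applies each option in order.
func newCursorConfig(opts ...cursorOption) cursorConfig {
	conf := cursorConfig{}
	for _, opt := range opts {
		opt(&conf)
	}
	return conf
}

func main() {
	conf := newCursorConfig(withCursorDirection(cursorDescending), withCursorLimit(25))
	fmt.Println(conf.Direction == cursorDescending, *conf.Limit, conf.SkipFirst)
}
```

Because `Limit` is a pointer, a nil value can represent "no limit", which a plain int could not distinguish from a limit of zero.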

type CursorPredicateFunc

type CursorPredicateFunc func(key, value []byte) bool

type DecodeBucketValFn

type DecodeBucketValFn func(key, val []byte) (keyRepeat []byte, decodedVal interface{}, err error)

DecodeBucketValFn decodes the raw []byte.

type DeleteOpts

type DeleteOpts struct {
	DeleteRelationFns []DeleteRelationsFn
	FilterFn          FilterFn
}

DeleteOpts provides indicators to the store.Delete call for deleting a given entity. The FilterFn indicates the current value should be deleted when returning true.

type DeleteRelationsFn

type DeleteRelationsFn func(key []byte, decodedVal interface{}) error

DeleteRelationsFn is a hook that a store that composes other stores can use to delete an entity and any relations it may share. An example would be deleting an entity and its associated index.

type DocumentStore

type DocumentStore struct {
	// contains filtered or unexported fields
}

DocumentStore implements influxdb.DocumentStore.

func (*DocumentStore) CreateDocument

func (s *DocumentStore) CreateDocument(ctx context.Context, d *influxdb.Document) error

CreateDocument creates an instance of a document and sets the ID. After which it applies each of the options provided.

func (*DocumentStore) FindDocument

func (s *DocumentStore) FindDocument(ctx context.Context, id platform.ID) (*influxdb.Document, error)

FindDocument retrieves the specified document with all its content and labels.

func (*DocumentStore) FindDocuments

func (s *DocumentStore) FindDocuments(ctx context.Context, _ platform.ID) ([]*influxdb.Document, error)

FindDocuments retrieves all documents returned by the document find options.

func (*DocumentStore) PutDocument

func (s *DocumentStore) PutDocument(ctx context.Context, d *influxdb.Document) error

type EncodeEntFn

type EncodeEntFn func(ent Entity) ([]byte, string, error)

EncodeEntFn encodes the entity. This is used both for the key and vals in the store base.

type EncodeFn

type EncodeFn func() ([]byte, error)

EncodeFn returns an encoding when called. Closures are your friend here.

func EncBytes

func EncBytes(b []byte) EncodeFn

EncBytes is a basic pass through for providing raw bytes.

func EncID

func EncID(id platform.ID) EncodeFn

EncID encodes an influx ID.

func EncString

func EncString(str string) EncodeFn

EncString encodes a string.

func EncStringCaseInsensitive

func EncStringCaseInsensitive(str string) EncodeFn

EncStringCaseInsensitive encodes a string and makes it case insensitive by lower casing everything.

func Encode

func Encode(encodings ...EncodeFn) EncodeFn

Encode concatenates a list of encodings together.
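
A short sketch of how closure-based encoders compose may help here. The lower-case functions below are local re-implementations written for this example under the assumption that each encoder simply returns its bytes; they are not the package's source.

```go
package main

import (
	"fmt"
	"strings"
)

// encodeFn returns an encoding when called, like the documented EncodeFn.
type encodeFn func() ([]byte, error)

// encBytes passes raw bytes through.
func encBytes(b []byte) encodeFn {
	return func() ([]byte, error) { return b, nil }
}

// encString encodes a string as its bytes.
func encString(s string) encodeFn {
	return encBytes([]byte(s))
}

// encStringCaseInsensitive lower-cases the string before encoding it.
func encStringCaseInsensitive(s string) encodeFn {
	return encString(strings.ToLower(s))
}

// encode concatenates the output of each encoder, stopping at the first error.
func encode(encodings ...encodeFn) encodeFn {
	return func() ([]byte, error) {
		var out []byte
		for _, enc := range encodings {
			part, err := enc()
			if err != nil {
				return nil, err
			}
			out = append(out, part...)
		}
		return out, nil
	}
}

func main() {
	// Build a composite key lazily; nothing is encoded until key() is called.
	key := encode(encString("org1/"), encStringCaseInsensitive("MyBucket"))
	b, err := key()
	fmt.Printf("%s %v\n", b, err)
}
```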

type Entity

type Entity struct {
	PK        EncodeFn
	UniqueKey EncodeFn

	Body interface{}
}

type FilterFn

type FilterFn func(key []byte, decodedVal interface{}) bool

FilterFn will provide an indicator to the Find or Delete calls that the entity that was seen is one that is valid and should be either captured or deleted (depending on the caller of the filter func).

type FindCaptureFn

type FindCaptureFn func(key []byte, decodedVal interface{}) error

FindCaptureFn is the mechanism for closing over the key and decoded value pair for adding results to the call site's collection. This generic implementation allows it to be reused. The returned decodedVal should always satisfy whatever decoding of the bucket value was set on the store that calls Find.

type FindOpts

type FindOpts struct {
	Descending  bool
	Offset      int
	Limit       int
	Prefix      []byte
	CaptureFn   FindCaptureFn
	FilterEntFn FilterFn
}

FindOpts provides a means to search through the bucket. When a filter func is provided, it is run against each entity; if the filter returns true, the entity counts towards the number of entries seen and the capture func is run with it.
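
The interplay of filter, offset, limit and capture can be sketched as a simple loop. This is an illustration only: the function `find`, the `pair` type and the exact offset and limit semantics (offset skips matching entries, a limit of zero means unlimited) are assumptions, not the behaviour of the real store.

```go
package main

import "fmt"

// pair holds a key and an already-decoded value (a string in this sketch).
type pair struct {
	Key   []byte
	Value string
}

// findOpts mirrors a subset of the documented FindOpts fields.
type findOpts struct {
	Offset      int
	Limit       int
	CaptureFn   func(key []byte, decodedVal interface{}) error
	FilterEntFn func(key []byte, decodedVal interface{}) bool
}

// find visits pairs in order. Entries rejected by the filter are ignored;
// accepted entries count as seen, the first Offset of them are skipped, and
// at most Limit of the rest are handed to CaptureFn (assumed semantics).
func find(pairs []pair, opts findOpts) error {
	seen, captured := 0, 0
	for _, p := range pairs {
		if opts.FilterEntFn != nil && !opts.FilterEntFn(p.Key, p.Value) {
			continue
		}
		seen++
		if seen <= opts.Offset {
			continue
		}
		if err := opts.CaptureFn(p.Key, p.Value); err != nil {
			return err
		}
		captured++
		if opts.Limit > 0 && captured >= opts.Limit {
			break
		}
	}
	return nil
}

func main() {
	pairs := []pair{{[]byte("a"), "x"}, {[]byte("b"), "skip"}, {[]byte("c"), "y"}, {[]byte("d"), "z"}}

	var keys []string
	err := find(pairs, findOpts{
		Offset:      1,
		Limit:       2,
		FilterEntFn: func(_ []byte, v interface{}) bool { return v.(string) != "skip" },
		CaptureFn: func(k []byte, _ interface{}) error {
			keys = append(keys, string(k)) // close over the caller's collection
			return nil
		},
	})
	fmt.Println(keys, err)
}
```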

type ForwardCursor

type ForwardCursor interface {
	// Next moves the cursor to the next key in the bucket.
	Next() (k, v []byte)
	// Err returns non-nil if an error occurred during cursor iteration.
	// This should always be checked after Next returns a nil key/value.
	Err() error
	// Close is responsible for freeing any resources created by the cursor.
	Close() error
}

ForwardCursor is an abstraction for iterating/ranging through data in one direction.
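
The method comments imply a usage pattern: call Next until it returns a nil key, then check Err, and always Close. A minimal sketch of that loop follows, using a local copy of the interface and a hypothetical slice-backed cursor (`sliceCursor`); the `walk` helper and its visit callback signature are assumptions for illustration.

```go
package main

import "fmt"

// forwardCursor is a local copy of the documented ForwardCursor interface.
type forwardCursor interface {
	Next() (k, v []byte)
	Err() error
	Close() error
}

// sliceCursor is a hypothetical cursor over an in-memory list of pairs.
type sliceCursor struct {
	pairs  [][2]string
	pos    int
	closed bool
}

func (c *sliceCursor) Next() (k, v []byte) {
	if c.pos >= len(c.pairs) {
		return nil, nil // a nil key signals the end of iteration
	}
	p := c.pairs[c.pos]
	c.pos++
	return []byte(p[0]), []byte(p[1])
}

func (c *sliceCursor) Err() error   { return nil }
func (c *sliceCursor) Close() error { c.closed = true; return nil }

// walk drains the cursor, calling visit for each pair. It checks Err once
// Next returns a nil key and closes the cursor on every return path.
func walk(cur forwardCursor, visit func(k, v []byte) error) (err error) {
	defer func() {
		if cerr := cur.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	for k, v := cur.Next(); k != nil; k, v = cur.Next() {
		if err := visit(k, v); err != nil {
			return err
		}
	}
	return cur.Err()
}

func main() {
	cur := &sliceCursor{pairs: [][2]string{{"a", "1"}, {"b", "2"}}}
	err := walk(cur, func(k, v []byte) error {
		fmt.Printf("%s=%s\n", k, v)
		return nil
	})
	fmt.Println("err:", err, "closed:", cur.closed)
}
```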

type Index

type Index struct {
	IndexMapping
	// contains filtered or unexported fields
}

Index is used to define and manage an index for a source bucket.

When using the index you must provide it with an IndexMapping. The IndexMapping provides the index with the contract it needs to populate the entire index and traverse a populated index correctly. The IndexMapping provides a way to retrieve the key on which to index with when provided with the value from the source. It also provides the way to access the source bucket.

The following is an illustration of its use:

byUserID := func(v []byte) ([]byte, error) {
    auth := &influxdb.Authorization{}

    if err := json.Unmarshal(v, auth); err != nil {
        return nil, err
    }

    return auth.UserID.Encode()
}

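// configure a write only index; sourceBucket is a placeholder for the name
// of the bucket that holds the authorizations being indexed
indexByUser := NewIndex(NewIndexMapping(sourceBucket, []byte(`authorizationsbyuserv1`), byUserID))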

indexByUser.Insert(tx, someUserID, someAuthID)

indexByUser.Delete(tx, someUserID, someAuthID)

indexByUser.Walk(ctx, tx, someUserID, func(k, v []byte) error {
    auth := &influxdb.Authorization{}
    if err := json.Unmarshal(v, auth); err != nil {
        return err
    }

    // do something with auth

    return nil
})

// verify the current index against the source and return the differences
// found in each
diff, err := indexByUser.Verify(ctx, store)

func NewIndex

func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index

NewIndex configures and returns a new *Index for a given index mapping. By default the read path (Walk) is disabled. This is because the index needs to be fully populated before depending upon the read path. The read path can be enabled using the WithIndexReadPathEnabled option.

func (*Index) Delete

func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error

Delete removes the foreignKey and primaryKey mapping from the underlying index.

func (*Index) Insert

func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error

Insert creates a single index entry for the provided primary key on the foreign key.

func (*Index) Verify

func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error)

Verify returns the difference between a source and its index. The difference contains items in the source that are not in the index and vice-versa.

func (*Index) Walk

func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error

Walk walks the source bucket using the keys found in the index for the provided foreign key, given the index has been fully populated.

type IndexDiff

type IndexDiff struct {
	// PresentInIndex is a map of foreign key to primary keys
	// present in the index.
	PresentInIndex map[string]map[string]struct{}
	// MissingFromIndex is a map of foreign key to associated primary keys
	// missing from the index given the source bucket.
	// These items could be due to the fact an index populate migration has
	// not yet occurred, the index populate code is incorrect or the write path
	// for your resource type does not yet insert into the index as well (Create actions).
	MissingFromIndex map[string]map[string]struct{}
	// MissingFromSource is a map of foreign key to associated primary keys
	// missing from the source but accounted for in the index.
	// This happens when index items are not properly removed from the index
	// when an item is removed from the source (Delete actions).
	MissingFromSource map[string]map[string]struct{}
}

IndexDiff contains a set of items present in the source not in index, along with a set of things in the index which are not in the source.

func (*IndexDiff) Corrupt

func (i *IndexDiff) Corrupt() (corrupt []string)

Corrupt returns a list of foreign keys which have corrupted (partial) indexes. These are foreign keys which map to a subset of the primary keys which they should be associated with.
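
One way to read "partial" is: a foreign key is corrupt when some of its primary keys are present in the index while others are missing from it. The sketch below implements that reading on a local copy of the struct. The method body is an assumption made for this example, not the package's confirmed implementation, and the sorting is added only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// indexDiff is a local copy of the documented IndexDiff fields.
type indexDiff struct {
	PresentInIndex    map[string]map[string]struct{}
	MissingFromIndex  map[string]map[string]struct{}
	MissingFromSource map[string]map[string]struct{}
}

// corrupt returns foreign keys that appear both in PresentInIndex and in
// MissingFromIndex, meaning the index holds only part of their primary keys.
func (d *indexDiff) corrupt() []string {
	var out []string
	for fk := range d.MissingFromIndex {
		if _, ok := d.PresentInIndex[fk]; ok {
			out = append(out, fk)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	d := indexDiff{
		PresentInIndex: map[string]map[string]struct{}{
			"user1": {"auth1": {}},
			"user2": {"auth3": {}},
		},
		MissingFromIndex: map[string]map[string]struct{}{
			"user1": {"auth2": {}}, // user1 is only partially indexed
			"user3": {"auth4": {}}, // user3 is not indexed at all
		},
	}
	fmt.Println(d.corrupt())
}
```

Under this reading, a foreign key that is entirely absent from the index (such as `user3` in the example) is not reported as corrupt; it may simply not have been populated yet.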

type IndexMapping

type IndexMapping interface {
	SourceBucket() []byte
	IndexBucket() []byte
	IndexSourceOn(value []byte) (foreignKey []byte, err error)
}

IndexMapping is a type which configures an Index to map items from a source bucket to an index bucket via a mapping known as IndexSourceOn. This function is called on the values in the source to derive the foreign key on which to index each item.

func NewIndexMapping

func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping

NewIndexMapping creates an implementation of IndexMapping for the provided source bucket to a destination index bucket.

type IndexMigration

type IndexMigration struct {
	IndexMapping
	// contains filtered or unexported fields
}

IndexMigration is a migration for adding and removing an index. These are constructed via the NewIndexMigration function.

func NewIndexMigration

func NewIndexMigration(mapping IndexMapping, opts ...IndexMigrationOption) *IndexMigration

NewIndexMigration constructs a migration for creating and populating an index.

func (*IndexMigration) Down

func (i *IndexMigration) Down(ctx context.Context, store SchemaStore) error

Down deletes all entries from the index.

func (*IndexMigration) MigrationName

func (i *IndexMigration) MigrationName() string

MigrationName returns a readable name for the index migration.

func (*IndexMigration) Populate

func (i *IndexMigration) Populate(ctx context.Context, store Store) (n int, err error)

Populate does a full population of the index using the IndexSourceOn IndexMapping function. Once completed it marks the index as ready for use. It returns a nil error on success and the count of inserted items.

func (*IndexMigration) Up

func (i *IndexMigration) Up(ctx context.Context, store SchemaStore) (err error)

Up initializes the index bucket and populates the index.

type IndexMigrationOption

type IndexMigrationOption func(*IndexMigration)

IndexMigrationOption is a functional option for the IndexMigration type

func WithIndexMigationBatchSize

func WithIndexMigationBatchSize(n int) IndexMigrationOption

WithIndexMigationBatchSize configures the size of the batches when committing changes to the entire index during migration (e.g. size of put batch on index populate).

type IndexOption

type IndexOption func(*Index)

IndexOption is a function which configures an index

type IndexSourceOnFunc

type IndexSourceOnFunc func([]byte) ([]byte, error)

IndexSourceOnFunc is a function which can be used to derive the foreign key of a value in a source bucket.

type IndexStore

type IndexStore struct {
	Resource   string
	EntStore   *StoreBase
	IndexStore *StoreBase
}

IndexStore provides an entity store that uses an index lookup. The index store manages deleting and creating indexes for the caller. The index is automatically used if the FindEnt entity does not have the primary key.

func (*IndexStore) Delete

func (s *IndexStore) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error

Delete deletes entities and associated indexes.

func (*IndexStore) DeleteEnt

func (s *IndexStore) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error

DeleteEnt deletes an entity and associated index.

func (*IndexStore) Find

func (s *IndexStore) Find(ctx context.Context, tx Tx, opts FindOpts) error

Find provides a mechanism for looking through the bucket via the set options. When a prefix is provided, it will be used within the entity store. To search the index store instead, call the index store directly.

func (*IndexStore) FindEnt

func (s *IndexStore) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{}, error)

FindEnt returns the decoded entity body via the provided entity. An example entity should not include a Body, but rather the ID, Name, or OrgID. If no ID is provided, then the algorithm assumes you are looking up the entity by the index.

func (*IndexStore) Put

func (s *IndexStore) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error

Put will persist the entity into both the entity store and the index store.

type InitialMigration

type InitialMigration struct{}

func (InitialMigration) Down

func (m InitialMigration) Down(ctx context.Context, store SchemaStore) error

Down is a no-op, required for the service to be used as a migration.

func (InitialMigration) MigrationName

func (m InitialMigration) MigrationName() string

MigrationName returns the string "initial migration", which allows this store to be used as a migration.

func (InitialMigration) Up

func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error

Up initializes all the owned buckets of the underlying store

type Pair

type Pair struct {
	Key   []byte
	Value []byte
}

Pair is a struct for key value pairs.

type PutOptionFn

type PutOptionFn func(o *putOption) error

PutOptionFn provides a hint to the store to make some guarantees about the put action. For example, if the entity is new, the store will validate that there is no existing entity with the given PK.

func PutNew

func PutNew() PutOptionFn

PutNew will create an entity that does not already exist. Guarantees uniqueness by the store's uniqueness guarantees.

func PutUpdate

func PutUpdate() PutOptionFn

PutUpdate will update an entity that must already exist.
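
The difference between the two put options can be sketched with a small map-backed example. Everything here is hypothetical: the fields of `putOption`, the `put` helper and the two error values are invented for illustration, since the real `putOption` type is unexported and its fields are not documented.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical errors for this sketch.
var (
	errExists   = errors.New("entity already exists")
	errNotFound = errors.New("entity not found")
)

// putOption records which guarantee the caller asked for (assumed fields).
type putOption struct {
	isNew    bool
	isUpdate bool
}

// putOptionFn has the same shape as the documented PutOptionFn.
type putOptionFn func(o *putOption) error

func putNew() putOptionFn {
	return func(o *putOption) error { o.isNew = true; return nil }
}

func putUpdate() putOptionFn {
	return func(o *putOption) error { o.isUpdate = true; return nil }
}

// put applies the options, then enforces the requested guarantee before writing.
func put(store map[string]string, key, value string, opts ...putOptionFn) error {
	var o putOption
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return err
		}
	}
	_, exists := store[key]
	if o.isNew && exists {
		return errExists
	}
	if o.isUpdate && !exists {
		return errNotFound
	}
	store[key] = value
	return nil
}

func main() {
	store := map[string]string{}
	fmt.Println(put(store, "1", "v1", putNew()))    // first create succeeds
	fmt.Println(put(store, "1", "v2", putNew()))    // second create is rejected
	fmt.Println(put(store, "1", "v3", putUpdate())) // update of existing key succeeds
	fmt.Println(put(store, "2", "v1", putUpdate())) // update of missing key is rejected
}
```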

type SchemaStore

type SchemaStore interface {
	Store

	// CreateBucket creates a bucket on the underlying store if it does not exist
	CreateBucket(ctx context.Context, bucket []byte) error
	// DeleteBucket deletes a bucket on the underlying store if it exists
	DeleteBucket(ctx context.Context, bucket []byte) error
}

SchemaStore is a superset of Store along with store schema change functionality like bucket creation and deletion.

This type is made available via the `kv/migration` package. It should be consumed via this package to create and delete buckets using a migration. Check out the internal tool `cmd/internal/kvmigrate` for building a new migration Go file into the correct location (in kv/migration/all.go). Configuring your bucket here will ensure it is created properly on initialization of InfluxDB.

type Service

type Service struct {
	Config ServiceConfig

	IDGenerator platform.IDGenerator

	// FluxLanguageService is used for parsing flux.
	// If this is unset, operations that require parsing flux
	// will fail.
	FluxLanguageService fluxlang.FluxLanguageService

	TokenGenerator influxdb.TokenGenerator
	// TODO(desa:ariel): this should not be embedded
	influxdb.TimeGenerator
	// contains filtered or unexported fields
}

Service is the struct that influxdb services are implemented on.

func NewService

func NewService(log *zap.Logger, kv Store, orgs influxdb.OrganizationService, configs ...ServiceConfig) *Service

NewService returns an instance of a Service.

func (*Service) AddLogEntry

func (s *Service) AddLogEntry(ctx context.Context, k, v []byte, t time.Time) error

AddLogEntry logs a keyValue for a particular resource type ID pairing.

func (*Service) AddLogEntryTx

func (s *Service) AddLogEntryTx(ctx context.Context, tx Tx, k, v []byte, t time.Time) error

func (*Service) AddRunLog

func (s *Service) AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error

AddRunLog adds a log line to the run.

func (*Service) AddTarget

func (s *Service) AddTarget(ctx context.Context, target *influxdb.ScraperTarget, userID platform.ID) (err error)

AddTarget adds a new scraper target into storage.

func (*Service) Backup

func (s *Service) Backup(ctx context.Context, w io.Writer) error

func (*Service) CancelRun

func (s *Service) CancelRun(ctx context.Context, taskID, runID platform.ID) error

CancelRun cancels a currently running run.

func (*Service) CreateDocumentStore

func (s *Service) CreateDocumentStore(ctx context.Context, ns string) (influxdb.DocumentStore, error)

CreateDocumentStore creates an instance of a document store by instantiating the buckets for the store.

func (*Service) CreateRun

func (s *Service) CreateRun(ctx context.Context, taskID platform.ID, scheduledFor time.Time, runAt time.Time) (*taskmodel.Run, error)

CreateRun creates a run with a scheduledFor time as now.

func (*Service) CreateSource

func (s *Service) CreateSource(ctx context.Context, src *influxdb.Source) error

CreateSource creates an influxdb source and sets src.ID.

func (*Service) CreateTask

func (s *Service) CreateTask(ctx context.Context, tc taskmodel.TaskCreate) (*taskmodel.Task, error)

CreateTask creates a new task. The owner of the task is inferred from the authorizer associated with ctx.

func (*Service) CreateVariable

func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) error

CreateVariable creates a new variable and assigns it an ID

func (*Service) CurrentlyRunning

func (s *Service) CurrentlyRunning(ctx context.Context, taskID platform.ID) ([]*taskmodel.Run, error)

func (*Service) DefaultSource

func (s *Service) DefaultSource(ctx context.Context) (*influxdb.Source, error)

DefaultSource retrieves the default source.

func (*Service) DeleteSource

func (s *Service) DeleteSource(ctx context.Context, id platform.ID) error

DeleteSource deletes a source and prunes it from the index.

func (*Service) DeleteTask

func (s *Service) DeleteTask(ctx context.Context, id platform.ID) error

DeleteTask removes a task by ID and purges all associated data and scheduled runs.

func (*Service) DeleteVariable

func (s *Service) DeleteVariable(ctx context.Context, id platform.ID) error

DeleteVariable removes a single variable from the store by its ID

func (*Service) FindDocumentStore

func (s *Service) FindDocumentStore(ctx context.Context, ns string) (influxdb.DocumentStore, error)

FindDocumentStore finds the buckets associated with the namespace provided.

func (*Service) FindLogs

func (s *Service) FindLogs(ctx context.Context, filter taskmodel.LogFilter) ([]*taskmodel.Log, int, error)

FindLogs returns logs for a run.

func (*Service) FindRunByID

func (s *Service) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)

FindRunByID returns a single run.

func (*Service) FindRuns

func (s *Service) FindRuns(ctx context.Context, filter taskmodel.RunFilter) ([]*taskmodel.Run, int, error)

FindRuns returns a list of runs that match a filter and the total count of returned runs.

func (*Service) FindSourceByID

func (s *Service) FindSourceByID(ctx context.Context, id platform.ID) (*influxdb.Source, error)

FindSourceByID retrieves a source by id.

func (*Service) FindSources

func (s *Service) FindSources(ctx context.Context, opt influxdb.FindOptions) ([]*influxdb.Source, int, error)

FindSources retrieves all sources that match an arbitrary source filter. Filters using ID, or OrganizationID and source Name should be efficient. Other filters will do a linear scan across all sources searching for a match.

func (*Service) FindTaskByID

func (s *Service) FindTaskByID(ctx context.Context, id platform.ID) (*taskmodel.Task, error)

FindTaskByID returns a single task

func (*Service) FindTasks

func (s *Service) FindTasks(ctx context.Context, filter taskmodel.TaskFilter) ([]*taskmodel.Task, int, error)

FindTasks returns a list of tasks that match a filter (limit 100) and the total count of matching tasks.

func (*Service) FindVariableByID

func (s *Service) FindVariableByID(ctx context.Context, id platform.ID) (*influxdb.Variable, error)

FindVariableByID finds a single variable in the store by its ID

func (*Service) FindVariables

func (s *Service) FindVariables(ctx context.Context, filter influxdb.VariableFilter, opt ...influxdb.FindOptions) ([]*influxdb.Variable, error)

FindVariables returns all variables in the store

func (*Service) FinishRun

func (s *Service) FinishRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)

FinishRun removes runID from the list of running tasks and, if its `now` is later than the last completed time, updates it.

func (*Service) FirstLogEntry

func (s *Service) FirstLogEntry(ctx context.Context, k []byte) ([]byte, time.Time, error)

FirstLogEntry retrieves the first log entry for a key value log.

func (*Service) ForEachLogEntry

func (s *Service) ForEachLogEntry(ctx context.Context, k []byte, opts platform.FindOptions, fn func([]byte, time.Time) error) error

ForEachLogEntry retrieves the keyValue log for a resource type ID combination. KeyValues may be returned in ascending or descending order.

func (*Service) ForEachLogEntryTx

func (s *Service) ForEachLogEntryTx(ctx context.Context, tx Tx, k []byte, opts platform.FindOptions, fn func([]byte, time.Time) error) error

func (*Service) ForceRun

func (s *Service) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*taskmodel.Run, error)

ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible. The value of scheduledFor may or may not align with the task's schedule.

func (*Service) GetTargetByID

func (s *Service) GetTargetByID(ctx context.Context, id platform.ID) (*influxdb.ScraperTarget, error)

GetTargetByID retrieves a scraper target by id.

func (*Service) LastLogEntry

func (s *Service) LastLogEntry(ctx context.Context, k []byte) ([]byte, time.Time, error)

LastLogEntry retrieves the last log entry for a key value log.

func (*Service) ListTargets

func (s *Service) ListTargets(ctx context.Context, filter influxdb.ScraperTargetFilter) ([]influxdb.ScraperTarget, error)

ListTargets will list all scrape targets.

func (*Service) ManualRuns

func (s *Service) ManualRuns(ctx context.Context, taskID platform.ID) ([]*taskmodel.Run, error)

func (*Service) PutSource

func (s *Service) PutSource(ctx context.Context, src *influxdb.Source) error

PutSource will put a source without setting an ID.

func (*Service) PutTarget

func (s *Service) PutTarget(ctx context.Context, target *influxdb.ScraperTarget) error

PutTarget will put a scraper target without setting an ID.

func (*Service) RemoveTarget

func (s *Service) RemoveTarget(ctx context.Context, id platform.ID) error

RemoveTarget removes a scraper target from the bucket.

func (*Service) ReplaceVariable

func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) error

ReplaceVariable replaces a variable that exists in the store, or creates it if it does not.

func (*Service) RetryRun

func (s *Service) RetryRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)

RetryRun creates and returns a new run (which is a retry of another run).

func (*Service) StartManualRun

func (s *Service) StartManualRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)

func (*Service) UpdateRunState

func (s *Service) UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, state taskmodel.RunStatus) error

UpdateRunState sets the run state at the respective time.

func (*Service) UpdateSource

func (s *Service) UpdateSource(ctx context.Context, id platform.ID, upd influxdb.SourceUpdate) (*influxdb.Source, error)

UpdateSource updates a source according to the parameters set on upd.

func (*Service) UpdateTarget

func (s *Service) UpdateTarget(ctx context.Context, update *influxdb.ScraperTarget, userID platform.ID) (*influxdb.ScraperTarget, error)

UpdateTarget updates a scraper target.

func (*Service) UpdateTask

func (s *Service) UpdateTask(ctx context.Context, id platform.ID, upd taskmodel.TaskUpdate) (*taskmodel.Task, error)

UpdateTask updates a single task with changeset.

func (*Service) UpdateVariable

func (s *Service) UpdateVariable(ctx context.Context, id platform.ID, update *influxdb.VariableUpdate) (*influxdb.Variable, error)

UpdateVariable updates a single variable in the store with a changeset.

func (*Service) WithResourceLogger

func (s *Service) WithResourceLogger(audit resource.Logger)

WithResourceLogger sets the resource audit logger for the service.

func (*Service) WithStore

func (s *Service) WithStore(store Store)

WithStore sets kv store for the service. Should only be used in tests for mocking.

type ServiceConfig

type ServiceConfig struct {
	Clock               clock.Clock
	FluxLanguageService fluxlang.FluxLanguageService
}

ServiceConfig allows us to configure Services.

type Store

type Store interface {
	// View opens up a transaction that will not write to any data. Implementing interfaces
	// should take care to ensure that all view transactions do not mutate any data.
	View(context.Context, func(Tx) error) error
	// Update opens up a transaction that will mutate data.
	Update(context.Context, func(Tx) error) error
	// Backup copies all K:Vs to a writer, file format determined by implementation.
	Backup(ctx context.Context, w io.Writer) error
	// Restore replaces the underlying data file with the data from r.
	Restore(ctx context.Context, r io.Reader) error
	// RLock takes a read lock on the underlying KV store.
	RLock()
	// RUnlock releases a previously-taken read lock
	RUnlock()
}

Store is an interface for a generic key value store. It is modeled after the boltdb database struct.
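As a rough illustration of the View/Update pattern, the sketch below uses a toy in-memory store. Everything in it is an assumption made for the example: `memStore`, `memTx`, `memBucket`, `put`, `roundTrip`, and `writeInView` are hypothetical, the `Bucket` interface is a simplified stand-in with only `Get` and `Put`, and `Tx` mirrors only the `Bucket` method. It is not the `inmem` implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Bucket is a simplified, hypothetical stand-in for the package's Bucket type.
type Bucket interface {
	Get(key []byte) ([]byte, error)
	Put(key, value []byte) error
}

// Tx mirrors only the Bucket method of the Tx interface.
type Tx interface {
	Bucket(b []byte) (Bucket, error)
}

var errReadOnly = errors.New("read-only transaction")

// memStore is a toy store for illustration; it does not roll back a failed Update.
type memStore struct {
	mu   sync.RWMutex
	data map[string][]byte // bucket name + "\x00" + key -> value
}

type memTx struct {
	s        *memStore
	writable bool
}

type memBucket struct {
	tx   *memTx
	name string
}

// View runs fn in a read-only transaction under a shared lock.
func (s *memStore) View(ctx context.Context, fn func(Tx) error) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return fn(&memTx{s: s})
}

// Update runs fn in a read-write transaction under an exclusive lock.
func (s *memStore) Update(ctx context.Context, fn func(Tx) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fn(&memTx{s: s, writable: true})
}

func (t *memTx) Bucket(b []byte) (Bucket, error) {
	return &memBucket{tx: t, name: string(b)}, nil
}

// Get returns nil for a missing key in this toy version.
func (b *memBucket) Get(key []byte) ([]byte, error) {
	return b.tx.s.data[b.name+"\x00"+string(key)], nil
}

// Put rejects writes made through a read-only transaction.
func (b *memBucket) Put(key, value []byte) error {
	if !b.tx.writable {
		return errReadOnly
	}
	b.tx.s.data[b.name+"\x00"+string(key)] = append([]byte(nil), value...)
	return nil
}

// put writes one key through whatever transaction it is handed.
func put(tx Tx) error {
	bkt, err := tx.Bucket([]byte("sources"))
	if err != nil {
		return err
	}
	return bkt.Put([]byte("id-1"), []byte("default"))
}

// roundTrip writes inside Update and reads the value back inside View.
func roundTrip() (string, error) {
	ctx, s := context.Background(), &memStore{data: map[string][]byte{}}
	if err := s.Update(ctx, put); err != nil {
		return "", err
	}
	var got string
	err := s.View(ctx, func(tx Tx) error {
		bkt, err := tx.Bucket([]byte("sources"))
		if err != nil {
			return err
		}
		v, err := bkt.Get([]byte("id-1"))
		got = string(v)
		return err
	})
	return got, err
}

// writeInView attempts a write inside View, which this toy store rejects.
func writeInView() error {
	s := &memStore{data: map[string][]byte{}}
	return s.View(context.Background(), put)
}

func main() {
	fmt.Println(roundTrip())
	fmt.Println(writeInView())
}
```

Taking a function instead of returning a transaction handle keeps the begin and end of the transaction inside the store, so callers cannot forget to close it.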

type StoreBase

type StoreBase struct {
	Resource string
	BktName  []byte

	EncodeEntKeyFn    EncodeEntFn
	EncodeEntBodyFn   EncodeEntFn
	DecodeEntFn       DecodeBucketValFn
	ConvertValToEntFn ConvertValToEntFn
}

StoreBase is the base behavior for accessing buckets in kv. It provides mechanisms that can be used in composing stores together (i.e. IndexStore).

func NewOrgNameKeyStore

func NewOrgNameKeyStore(resource string, bktName []byte, caseSensitive bool) *StoreBase

NewOrgNameKeyStore creates a store for an entity's unique index on organization ID and name. It is used throughout the kv package to enforce entity uniqueness by name within an organization.
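The idea of a unique index keyed by organization ID and name can be sketched as below. The key layout shown (an 8-byte big-endian organization ID followed by the name, lowercased when the index is case-insensitive) is an assumption for illustration, and `orgNameKey`, `nameIndex`, and `reserve` are hypothetical names rather than the package's API.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"strings"
)

var errNameTaken = errors.New("name already exists in organization")

// orgNameKey builds a hypothetical index key: 8-byte org ID, then the name.
// When caseSensitive is false the name is lowercased so "CPU" and "cpu" collide.
func orgNameKey(orgID uint64, name string, caseSensitive bool) []byte {
	if !caseSensitive {
		name = strings.ToLower(name)
	}
	key := make([]byte, 8, 8+len(name))
	binary.BigEndian.PutUint64(key, orgID)
	return append(key, name...)
}

// nameIndex maps an org+name key to the ID of the entity that owns the name.
type nameIndex struct {
	caseSensitive bool
	ids           map[string]uint64
}

func newNameIndex(caseSensitive bool) *nameIndex {
	return &nameIndex{caseSensitive: caseSensitive, ids: map[string]uint64{}}
}

// reserve claims name within orgID for entID, failing if it is already taken.
func (ix *nameIndex) reserve(orgID uint64, name string, entID uint64) error {
	k := string(orgNameKey(orgID, name, ix.caseSensitive))
	if _, taken := ix.ids[k]; taken {
		return errNameTaken
	}
	ix.ids[k] = entID
	return nil
}

func main() {
	ix := newNameIndex(false)
	fmt.Println(ix.reserve(1, "CPU", 10)) // first claim in org 1
	fmt.Println(ix.reserve(1, "cpu", 11)) // same name, different case, same org
	fmt.Println(ix.reserve(2, "cpu", 12)) // same name in another org
}
```

Prefixing the key with the organization ID scopes uniqueness to one organization and keeps all of an organization's names adjacent in key order.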

func NewStoreBase

func NewStoreBase(resource string, bktName []byte, encKeyFn, encBodyFn EncodeEntFn, decFn DecodeBucketValFn, decToEntFn ConvertValToEntFn) *StoreBase

NewStoreBase creates a new store base.

func (*StoreBase) Delete

func (s *StoreBase) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error

Delete deletes entities by the provided options.

func (*StoreBase) DeleteEnt

func (s *StoreBase) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error

DeleteEnt deletes an entity.

func (*StoreBase) EntKey

func (s *StoreBase) EntKey(ctx context.Context, ent Entity) ([]byte, error)

EntKey returns the key for the entity provided. This is a shortcut for grabbing the EntKey without having to juggle the encoding funcs.

func (*StoreBase) Find

func (s *StoreBase) Find(ctx context.Context, tx Tx, opts FindOpts) error

Find provides a mechanism for looking through the bucket via the set options. When a prefix is provided, the prefix is used to seek the bucket.
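Seeking by prefix generally means jumping to the first key at or after the prefix and reading forward while keys still share it. The sketch below shows that idea over a sorted slice of string keys; `prefixScan` and its `limit` parameter are hypothetical and do not reflect how `FindOpts` is defined.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// prefixScan returns the keys in sortedKeys that start with prefix, in order.
// sortedKeys must be sorted ascending. A limit of 0 or less means no limit.
func prefixScan(sortedKeys []string, prefix string, limit int) []string {
	// Seek: index of the first key >= prefix. Every key that has the prefix
	// sorts at or after this position, and all such keys are contiguous.
	start := sort.SearchStrings(sortedKeys, prefix)
	var out []string
	for _, k := range sortedKeys[start:] {
		if !strings.HasPrefix(k, prefix) {
			break // left the prefix range
		}
		if limit > 0 && len(out) == limit {
			break
		}
		out = append(out, k)
	}
	return out
}

func main() {
	keys := []string{"org1/alpha", "org1/beta", "org2/alpha", "org3/zeta"}
	fmt.Println(prefixScan(keys, "org1/", 0))
	fmt.Println(prefixScan(keys, "org1/", 1))
}
```

The seek costs a binary search, so the scan touches only the matching range rather than every key in the bucket.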

func (*StoreBase) FindEnt

func (s *StoreBase) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{}, error)

FindEnt returns the decoded entity body via the provided entity. An example entity should not include a Body, but rather the ID, Name, or OrgID.

func (*StoreBase) Put

func (s *StoreBase) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error

Put will persist the entity.

type Tx

type Tx interface {
	// Bucket possibly creates and returns bucket, b.
	Bucket(b []byte) (Bucket, error)
	// Context returns the context associated with this Tx.
	Context() context.Context
	// WithContext associates a context with this Tx.
	WithContext(ctx context.Context)
}

Tx is a transaction in the store.

type VisitFunc

type VisitFunc func(k, v []byte) (bool, error)

VisitFunc is called for each k, v byte slice pair from the underlying source bucket which are found in the index bucket for a provided foreign key.
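A visitor of this shape is typically driven by a loop like the one sketched here. The sketch assumes the boolean result means "keep going", which this page does not state, and `pair`, `walk`, and `firstN` are hypothetical helpers written for the example.

```go
package main

import "fmt"

// VisitFunc has the same signature as the type documented above.
type VisitFunc func(k, v []byte) (bool, error)

// pair is a hypothetical key/value holder for the example.
type pair struct{ k, v []byte }

// walk calls visit for each pair. It stops on the first error, or when visit
// returns false (assumed here to mean "do not continue").
func walk(pairs []pair, visit VisitFunc) error {
	for _, p := range pairs {
		cont, err := visit(p.k, p.v)
		if err != nil {
			return err
		}
		if !cont {
			return nil
		}
	}
	return nil
}

// firstN collects at most n keys by asking walk to stop once n are seen.
func firstN(pairs []pair, n int) ([]string, error) {
	if n <= 0 {
		return nil, nil
	}
	var keys []string
	err := walk(pairs, func(k, _ []byte) (bool, error) {
		keys = append(keys, string(k))
		return len(keys) < n, nil
	})
	return keys, err
}

func main() {
	pairs := []pair{
		{[]byte("a"), []byte("1")},
		{[]byte("b"), []byte("2")},
		{[]byte("c"), []byte("3")},
	}
	fmt.Println(firstN(pairs, 2))
}
```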

Directories

Path	Synopsis
migration	package migration
migration/all	package all
mock	Package mock is a generated GoMock package.
