boltjobstore

package
v1.5.1

Warning: This package is not in the latest version of its module.

Published: Oct 25, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Constants

const (
	DefaultBucketSearchSliceSize = 16
	BucketPathDelimiter          = "/"
)
const (
	BoltDBBucketNotFound    bacerrors.ErrorCode = "BoltDBBucketNotFound"
	BoltDBBucketExists      bacerrors.ErrorCode = "BoltDBBucketExists"
	BoltDBTxNotWritable     bacerrors.ErrorCode = "BoltDBTxNotWritable"
	BoltDBIncompatibleValue bacerrors.ErrorCode = "BoltDBIncompatibleValue"
	BoltDBKeyRequired       bacerrors.ErrorCode = "BoltDBKeyRequired"
	BoltDBKeyTooLarge       bacerrors.ErrorCode = "BoltDBKeyTooLarge"
	BoltDBValueTooLarge     bacerrors.ErrorCode = "BoltDBValueTooLarge"
)
const (
	BucketJobs           = "jobs"
	BucketJobExecutions  = "executions"
	BucketJobEvaluations = "evaluations"
	BucketJobHistory     = "history"

	BucketTagsIndex        = "idx_tags"        // tag -> Job id
	BucketProgressIndex    = "idx_inprogress"  // job-id -> {}
	BucketNamespacesIndex  = "idx_namespaces"  // namespace -> Job id
	BucketExecutionsIndex  = "idx_executions"  // execution-id -> Job id
	BucketEvaluationsIndex = "idx_evaluations" // evaluation-id -> Job id
)
const BoltDBComponent = "BoltDB"

Variables

var SpecKey = []byte("spec")

Functions

func BucketSequenceString

func BucketSequenceString(_ *bolt.Tx, bucket *bolt.Bucket) string

BucketSequenceString returns the next sequence in the provided bucket, formatted as a 16-character padded string so that bolt's lexicographic ordering returns the values in the correct order.
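
The sketch below illustrates why padding matters for lexicographic ordering. It does not call this package: sequenceKey is a hypothetical helper, and the use of zero-padding via "%016d" is an assumption, since the description above only says the string is padded to 16 characters.

```go
package main

import (
	"fmt"
	"sort"
)

// sequenceKey is a hypothetical helper (not part of this package). It
// zero-pads n to 16 characters so that byte-wise (lexicographic) order
// matches numeric order. Zero-padding is an assumption of this sketch.
func sequenceKey(n uint64) string {
	return fmt.Sprintf("%016d", n)
}

func main() {
	// Unpadded decimal strings do not sort numerically as strings.
	plain := []string{"9", "10", "100"}
	sort.Strings(plain)
	fmt.Println(plain)

	// Padded keys keep their numeric order when sorted as strings.
	padded := []string{sequenceKey(100), sequenceKey(9), sequenceKey(10)}
	sort.Strings(padded)
	fmt.Println(padded)
}
```

A fixed width is what makes the comparison safe: two keys of equal length compare digit by digit, so the smaller number always sorts first.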

func GetBucketData

func GetBucketData(tx *bolt.Tx, bucketPath *BucketPath, key []byte) []byte

GetBucketData is a helper that will use the provided details to find a key in a specific bucket and return its data.

func GetBucketsByPrefix

func GetBucketsByPrefix(tx *bolt.Tx, bucket *bolt.Bucket, partialName []byte) ([][]byte, error)

GetBucketsByPrefix searches the provided bucket for nested buckets whose names start with the provided partialName.
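
As a rough illustration of a prefix search over sorted names, the sketch below works on an in-memory slice rather than a bolt bucket. namesWithPrefix is a hypothetical helper, and nothing here is claimed to match how GetBucketsByPrefix is implemented; it only shows the general seek-then-scan idea that sorted keys make possible.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// namesWithPrefix is a hypothetical stand-in for a prefix scan. It assumes
// names is sorted in byte order, as keys in a bolt bucket are.
func namesWithPrefix(names [][]byte, prefix []byte) [][]byte {
	// Seek to the first name that is >= prefix.
	start := sort.Search(len(names), func(i int) bool {
		return bytes.Compare(names[i], prefix) >= 0
	})

	var matches [][]byte
	// Matching names are contiguous, so stop at the first non-match.
	for i := start; i < len(names) && bytes.HasPrefix(names[i], prefix); i++ {
		matches = append(matches, names[i])
	}
	return matches
}

func main() {
	names := [][]byte{
		[]byte("94b136a3"),
		[]byte("94b1ffff"),
		[]byte("a1c2d3e4"),
	}
	for _, m := range namesWithPrefix(names, []byte("94b1")) {
		fmt.Println(string(m))
	}
}
```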

func NewBoltDBError added in v1.5.1

func NewBoltDBError(err error) bacerrors.Error

Types

type BoltJobStore

type BoltJobStore struct {
	// contains filtered or unexported fields
}

func NewBoltJobStore

func NewBoltJobStore(dbPath string, options ...Option) (*BoltJobStore, error)

NewBoltJobStore creates a new job store where data is held in buckets, and indexed by special Index instances, also backed by buckets. Data is currently structured as follows:

bucket Jobs
	bucket jobID
		key spec
		key state -> state
		bucket executions -> key executionID -> Execution
		bucket history -> key []sequence -> History
		bucket evaluations -> key evaluationID -> Evaluation

Indexes are structured as:

TagsIndex        = tag -> Job id
ProgressIndex    = job-id -> {}
NamespacesIndex  = namespace -> Job id
ExecutionsIndex  = execution-id -> Job id
EvaluationsIndex = evaluation-id -> Job id

func (*BoltJobStore) AddExecutionHistory added in v1.5.0

func (b *BoltJobStore) AddExecutionHistory(ctx context.Context, jobID, executionID string, events ...models.Event) error

AddExecutionHistory appends a new history entry to the execution history

func (*BoltJobStore) AddJobHistory added in v1.5.0

func (b *BoltJobStore) AddJobHistory(ctx context.Context, jobID string, events ...models.Event) error

AddJobHistory appends a new history entry to the job history

func (*BoltJobStore) BeginTx added in v1.3.1

func (b *BoltJobStore) BeginTx(ctx context.Context) (jobstore.TxContext, error)

BeginTx starts a new writable transaction for the store

func (*BoltJobStore) Close

func (b *BoltJobStore) Close(ctx context.Context) error

func (*BoltJobStore) CreateEvaluation

func (b *BoltJobStore) CreateEvaluation(ctx context.Context, eval models.Evaluation) error

CreateEvaluation creates a new evaluation

func (*BoltJobStore) CreateExecution

func (b *BoltJobStore) CreateExecution(ctx context.Context, execution models.Execution) error

CreateExecution creates a record of a new execution

func (*BoltJobStore) CreateJob

func (b *BoltJobStore) CreateJob(ctx context.Context, job models.Job) error

CreateJob creates a new record of a job in the data store

func (*BoltJobStore) DeleteEvaluation

func (b *BoltJobStore) DeleteEvaluation(ctx context.Context, id string) error

DeleteEvaluation deletes the specified evaluation

func (*BoltJobStore) DeleteJob

func (b *BoltJobStore) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes the specified job from the system entirely

func (*BoltJobStore) GetEvaluation

func (b *BoltJobStore) GetEvaluation(ctx context.Context, id string) (models.Evaluation, error)

GetEvaluation retrieves the specified evaluation

func (*BoltJobStore) GetExecutions

func (b *BoltJobStore) GetExecutions(ctx context.Context, options jobstore.GetExecutionsOptions) ([]models.Execution, error)

GetExecutions returns the executions that match the provided options

func (*BoltJobStore) GetInProgressJobs

func (b *BoltJobStore) GetInProgressJobs(ctx context.Context, jobType string) ([]models.Job, error)

GetInProgressJobs gets a list of the currently in-progress jobs. If a job type is supplied, only jobs of that type are retrieved.

func (*BoltJobStore) GetJob

func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error)

GetJob retrieves the Job identified by the id string. If the job isn't found, it returns an error indicating this.

func (*BoltJobStore) GetJobHistory

GetJobHistory retrieves the paginated job history for a given job ID based on the specified query.

This method performs a read transaction on the Bolt DB and fetches the job history for the specified jobID. It supports pagination by processing an offset and limit defined either in the query or via a `NextToken`. Pagination tokens help in fetching the next set of results if the query returns a partial result due to the limit.

Pagination Behavior:

  • The `NextToken` in the query allows the caller to continue fetching subsequent pages.
  • If the result set reaches the limit specified in the query, a new `NextToken` is generated.
  • If no records are found in the current query, but the job or execution is not in a terminal state, the same `NextToken` will be returned to indicate that more history might still be available in the future.
  • Pagination only stops when there are no more records to fetch *and* the job/execution is in a terminal state.
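
The pagination rules above can be sketched with an in-memory history. Everything in this example is hypothetical: the page type, the fetchPage helper, and the modelling of the token as a decimal offset are assumptions made for illustration, not the store's actual types or token format. How a partial page is handled for a non-terminal job is also an assumption, since the rules above only cover the full and empty cases.

```go
package main

import (
	"fmt"
	"strconv"
)

// page is a hypothetical response type for this sketch.
type page struct {
	Items     []string
	NextToken string // empty means there is nothing further to fetch
}

// fetchPage sketches the rules above. The token is modelled as a decimal
// offset into history, which is an assumption made for illustration.
func fetchPage(history []string, token string, limit int, terminal bool) (page, error) {
	if limit <= 0 {
		return page{}, fmt.Errorf("limit must be positive")
	}
	offset := 0
	if token != "" {
		n, err := strconv.Atoi(token)
		if err != nil || n < 0 {
			return page{}, fmt.Errorf("invalid token %q", token)
		}
		offset = n
	}
	if offset > len(history) {
		offset = len(history)
	}
	end := offset + limit
	if end > len(history) {
		end = len(history)
	}
	items := history[offset:end]

	// Stop only when the page was not full and the job is terminal.
	if len(items) < limit && terminal {
		return page{Items: items}, nil
	}
	// Otherwise hand back a token. For an empty page it equals the incoming
	// offset, so the caller can poll again from the same position later.
	return page{Items: items, NextToken: strconv.Itoa(end)}, nil
}

func main() {
	history := []string{"created", "queued", "running"}
	token := ""
	for {
		p, err := fetchPage(history, token, 2, true)
		if err != nil {
			panic(err)
		}
		fmt.Println(p.Items, p.NextToken)
		if p.NextToken == "" {
			break
		}
		token = p.NextToken
	}
}
```

A caller following this pattern treats an empty NextToken as the only stop signal, and treats an empty page with a non-empty token as a cue to retry later.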

func (*BoltJobStore) GetJobs

GetJobs returns all Jobs that match the provided query

func (*BoltJobStore) UpdateExecution

func (b *BoltJobStore) UpdateExecution(ctx context.Context, request jobstore.UpdateExecutionRequest) error

UpdateExecution updates the state of a single execution by loading from storage, updating and then writing back in a single transaction

func (*BoltJobStore) UpdateJobState

func (b *BoltJobStore) UpdateJobState(ctx context.Context, request jobstore.UpdateJobStateRequest) error

UpdateJobState updates the current state for a single Job, appending an entry to the history at the same time

func (*BoltJobStore) Watch

type BucketPath

type BucketPath struct {
	// contains filtered or unexported fields
}

func NewBucketPath

func NewBucketPath(sections ...string) *BucketPath

NewBucketPath creates a bucket path that describes the nested relationship between buckets, so that callers do not have to call b.Bucket() on each bucket in turn. BucketPaths are typically described using strings like "root/bucket/here" (sections joined with BucketPathDelimiter, "/").

func (*BucketPath) Get

func (bp *BucketPath) Get(tx *bolt.Tx, create bool) (*bolt.Bucket, error)

Get retrieves the Bucket found at this path, or returns an error.
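
To make the idea of walking a nested path concrete, here is a sketch over an in-memory tree. The bucket type, joinPath, and get are hypothetical and do not use bolt. The sketch assumes that sections are joined with "/" (the value of BucketPathDelimiter) and that the create flag means "create missing buckets along the way", which is an interpretation of the Get signature rather than documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// delimiter mirrors the value of BucketPathDelimiter.
const delimiter = "/"

// bucket is a hypothetical in-memory stand-in for a nested bolt bucket.
type bucket struct {
	children map[string]*bucket
}

// joinPath is a hypothetical helper that builds a path from its sections.
func joinPath(sections ...string) string {
	return strings.Join(sections, delimiter)
}

// get walks the path one section at a time. When create is true, missing
// buckets are created; otherwise a missing bucket is an error.
func (b *bucket) get(path string, create bool) (*bucket, error) {
	cur := b
	for _, name := range strings.Split(path, delimiter) {
		next, ok := cur.children[name]
		if !ok {
			if !create {
				return nil, fmt.Errorf("bucket %q not found", name)
			}
			next = &bucket{children: map[string]*bucket{}}
			cur.children[name] = next
		}
		cur = next
	}
	return cur, nil
}

func main() {
	root := &bucket{children: map[string]*bucket{}}
	path := joinPath("jobs", "94b136a3", "executions")

	if _, err := root.get(path, false); err != nil {
		fmt.Println("before create:", err)
	}
	if _, err := root.get(path, true); err == nil {
		fmt.Println("created:", path)
	}
}
```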

func (*BucketPath) Sub

func (bp *BucketPath) Sub(names ...[]byte) *BucketPath

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index is a bucket type that encodes both a label and an identifier, for use as a sentinel marker to show the presence of a thing. For example, to index job `94b136a3` under the label `gpu`, we would create the `gpu` bucket if it didn't exist, and then a bucket with the job ID inside it.

Most methods take a label and an identifier, which serve as the attenuating field and the item itself. So for a client id index, where we want a list of job ids for each client, the index will look like:

jobs_clients
   |----- CLIENT ID 1
             |---- JOBID 1
             |---- JOBID 2
   |----- CLIENT ID 2
    .....

In this case, JOBID 1 is the identifier, and CLIENT ID 1 is the subpath/label. In some cases, such as indices for jobs in a specific state, we may not have or need the label, and so the subpath can be omitted. The primary use of this is currently the list of InProgress jobs, where there is no attenuator (e.g. a client id).
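
The label/identifier relationship can be sketched with nested maps. memIndex below is a hypothetical in-memory analogue and does not use bolt or this package's Index type. Using the empty string as the label for an index without a subpath is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// memIndex is a hypothetical in-memory analogue of Index: each label maps to
// a set of identifiers. The empty label stands in for "no subpath".
type memIndex struct {
	entries map[string]map[string]struct{}
}

func newMemIndex() *memIndex {
	return &memIndex{entries: map[string]map[string]struct{}{}}
}

// Add records identifier under label, creating the label on first use.
func (i *memIndex) Add(identifier, label string) {
	if i.entries[label] == nil {
		i.entries[label] = map[string]struct{}{}
	}
	i.entries[label][identifier] = struct{}{}
}

// Remove deletes identifier from label; removing a missing entry is a no-op.
func (i *memIndex) Remove(identifier, label string) {
	delete(i.entries[label], identifier)
}

// List returns the identifiers under label in sorted order.
func (i *memIndex) List(label string) []string {
	ids := make([]string, 0, len(i.entries[label]))
	for id := range i.entries[label] {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	clients := newMemIndex()
	clients.Add("JOBID 1", "CLIENT ID 1")
	clients.Add("JOBID 2", "CLIENT ID 1")
	fmt.Println(clients.List("CLIENT ID 1"))

	// An index with no attenuator, such as in-progress jobs.
	inProgress := newMemIndex()
	inProgress.Add("94b136a3", "")
	fmt.Println(inProgress.List(""))
}
```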

func NewIndex

func NewIndex(bucketPath string) *Index

func (*Index) Add

func (i *Index) Add(tx *bolt.Tx, identifier []byte, subpath ...[]byte) error

func (*Index) List

func (i *Index) List(tx *bolt.Tx, subpath ...[]byte) ([][]byte, error)

func (*Index) Remove

func (i *Index) Remove(tx *bolt.Tx, identifier []byte, subpath ...[]byte) error

type Option

type Option func(store *BoltJobStore)

func WithClock

func WithClock(clock clock.Clock) Option
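
Option follows the functional options pattern: each Option is a function that mutates the store during construction. The sketch below shows the pattern with hypothetical lower-case types (store, option, clock, fixedClock) rather than this package's own; the clock interface with a single Now method is an assumption, and a common reason to inject a clock this way is to get deterministic timestamps in tests.

```go
package main

import (
	"fmt"
	"time"
)

// clock is a hypothetical minimal clock interface for this sketch.
type clock interface {
	Now() time.Time
}

// realClock reports the wall-clock time.
type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }

// fixedClock always reports the same instant, which is useful in tests.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now() time.Time { return c.t }

// store and option are hypothetical analogues of BoltJobStore and Option.
type store struct {
	clock clock
}

type option func(*store)

// withClock returns an option that replaces the store's clock.
func withClock(c clock) option {
	return func(s *store) { s.clock = c }
}

// newStore applies defaults first, then each option in order.
func newStore(opts ...option) *store {
	s := &store{clock: realClock{}}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	at := time.Date(2024, time.October, 25, 0, 0, 0, 0, time.UTC)
	s := newStore(withClock(fixedClock{t: at}))
	fmt.Println(s.clock.Now().Equal(at))
}
```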
