Documentation ¶
Index ¶
- Constants
- Variables
- func BucketSequenceString(_ *bolt.Tx, bucket *bolt.Bucket) string
- func GetBucketData(tx *bolt.Tx, bucketPath *BucketPath, key []byte) []byte
- func GetBucketsByPrefix(tx *bolt.Tx, bucket *bolt.Bucket, partialName []byte) ([][]byte, error)
- func NewBoltDBError(err error) bacerrors.Error
- type BoltJobStore
- func (b *BoltJobStore) AddExecutionHistory(ctx context.Context, jobID, executionID string, events ...models.Event) error
- func (b *BoltJobStore) AddJobHistory(ctx context.Context, jobID string, events ...models.Event) error
- func (b *BoltJobStore) BeginTx(ctx context.Context) (jobstore.TxContext, error)
- func (b *BoltJobStore) Close(ctx context.Context) error
- func (b *BoltJobStore) CreateEvaluation(ctx context.Context, eval models.Evaluation) error
- func (b *BoltJobStore) CreateExecution(ctx context.Context, execution models.Execution) error
- func (b *BoltJobStore) CreateJob(ctx context.Context, job models.Job) error
- func (b *BoltJobStore) DeleteEvaluation(ctx context.Context, id string) error
- func (b *BoltJobStore) DeleteJob(ctx context.Context, jobID string) error
- func (b *BoltJobStore) GetEvaluation(ctx context.Context, id string) (models.Evaluation, error)
- func (b *BoltJobStore) GetEventStore() watcher.EventStore
- func (b *BoltJobStore) GetExecutions(ctx context.Context, options jobstore.GetExecutionsOptions) ([]models.Execution, error)
- func (b *BoltJobStore) GetInProgressJobs(ctx context.Context, jobType string) ([]models.Job, error)
- func (b *BoltJobStore) GetJob(ctx context.Context, id string) (models.Job, error)
- func (b *BoltJobStore) GetJobHistory(ctx context.Context, jobID string, query jobstore.JobHistoryQuery) (*jobstore.JobHistoryQueryResponse, error)
- func (b *BoltJobStore) GetJobs(ctx context.Context, query jobstore.JobQuery) (*jobstore.JobQueryResponse, error)
- func (b *BoltJobStore) UpdateExecution(ctx context.Context, request jobstore.UpdateExecutionRequest) error
- func (b *BoltJobStore) UpdateJobState(ctx context.Context, request jobstore.UpdateJobStateRequest) error
- type BucketPath
- type Index
- type Option
Constants ¶
const ( DefaultBucketSearchSliceSize = 16 BucketPathDelimiter = "/" )
const ( BoltDBBucketNotFound bacerrors.ErrorCode = "BoltDBBucketNotFound" BoltDBBucketExists bacerrors.ErrorCode = "BoltDBBucketExists" BoltDBTxNotWritable bacerrors.ErrorCode = "BoltDBTxNotWritable" BoltDBIncompatibleValue bacerrors.ErrorCode = "BoltDBIncompatibleValue" BoltDBKeyRequired bacerrors.ErrorCode = "BoltDBKeyRequired" BoltDBKeyTooLarge bacerrors.ErrorCode = "BoltDBKeyTooLarge" BoltDBValueTooLarge bacerrors.ErrorCode = "BoltDBValueTooLarge" )
const ( BucketJobs = "jobs" BucketJobExecutions = "executions" BucketJobEvaluations = "evaluations" BucketJobHistory = "history" BucketTagsIndex = "idx_tags" // tag -> Job id BucketProgressIndex = "idx_inprogress" // job-id -> {} BucketNamespacesIndex = "idx_namespaces" // namespace -> Job id BucketExecutionsIndex = "idx_executions" // execution-id -> Job id BucketEvaluationsIndex = "idx_evaluations" // evaluation-id -> Job id )
const BoltDBComponent = "BoltDB"
Variables ¶
var SpecKey = []byte("spec")
Functions ¶
func BucketSequenceString ¶
BucketSequenceString returns the next sequence in the provided bucket, formatted as a 16 character padded string to ensure that bolt's lexicographic ordering will return them in the correct order
func GetBucketData ¶
func GetBucketData(tx *bolt.Tx, bucketPath *BucketPath, key []byte) []byte
GetBucketData is a helper that will use the provided details to find a key in a specific bucket and return its data.
func GetBucketsByPrefix ¶
GetBucketsByPrefix will search through the provided bucket to find other buckets with a name that starts with the provided partialName.
func NewBoltDBError ¶ added in v1.5.1
Types ¶
type BoltJobStore ¶
type BoltJobStore struct {
// contains filtered or unexported fields
}
func NewBoltJobStore ¶
func NewBoltJobStore(dbPath string, options ...Option) (*BoltJobStore, error)
NewBoltJobStore creates a new job store where data is held in buckets, and indexed by special Index instances, also backed by buckets. Data is currently structured as follows:
bucket Jobs
    bucket jobID
        key    spec
        key    state -> state
        bucket executions -> key executionID -> Execution
        bucket history -> key []sequence -> History
        bucket evaluations -> key evaluationID -> Evaluation
Indexes are structured as:
    TagsIndex        = tag -> Job id
    ProgressIndex    = job-id -> {}
    NamespacesIndex  = namespace -> Job id
    ExecutionsIndex  = execution-id -> Job id
    EvaluationsIndex = evaluation-id -> Job id
func (*BoltJobStore) AddExecutionHistory ¶ added in v1.5.0
func (b *BoltJobStore) AddExecutionHistory(ctx context.Context, jobID, executionID string, events ...models.Event) error
AddExecutionHistory appends a new history entry to the execution history
func (*BoltJobStore) AddJobHistory ¶ added in v1.5.0
func (b *BoltJobStore) AddJobHistory(ctx context.Context, jobID string, events ...models.Event) error
AddJobHistory appends a new history entry to the job history
func (*BoltJobStore) BeginTx ¶ added in v1.3.1
BeginTx starts a new writable transaction for the store
func (*BoltJobStore) CreateEvaluation ¶
func (b *BoltJobStore) CreateEvaluation(ctx context.Context, eval models.Evaluation) error
CreateEvaluation creates a new evaluation
func (*BoltJobStore) CreateExecution ¶
CreateExecution creates a record of a new execution
func (*BoltJobStore) DeleteEvaluation ¶
func (b *BoltJobStore) DeleteEvaluation(ctx context.Context, id string) error
DeleteEvaluation deletes the specified evaluation
func (*BoltJobStore) DeleteJob ¶
func (b *BoltJobStore) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes the specified job from the system entirely
func (*BoltJobStore) GetEvaluation ¶
func (b *BoltJobStore) GetEvaluation(ctx context.Context, id string) (models.Evaluation, error)
GetEvaluation retrieves the specified evaluation
func (*BoltJobStore) GetEventStore ¶ added in v1.5.2
func (b *BoltJobStore) GetEventStore() watcher.EventStore
GetEventStore returns the event store
func (*BoltJobStore) GetExecutions ¶
func (b *BoltJobStore) GetExecutions(ctx context.Context, options jobstore.GetExecutionsOptions) ([]models.Execution, error)
GetExecutions returns the executions that match the provided options
func (*BoltJobStore) GetInProgressJobs ¶
GetInProgressJobs gets a list of the currently in-progress jobs. If a job type is supplied, only jobs of that type will be retrieved.
func (*BoltJobStore) GetJob ¶
GetJob retrieves the Job identified by the id string. If the job isn't found, it will return an error indicating this.
func (*BoltJobStore) GetJobHistory ¶
func (b *BoltJobStore) GetJobHistory(ctx context.Context, jobID string, query jobstore.JobHistoryQuery) (*jobstore.JobHistoryQueryResponse, error)
GetJobHistory retrieves the paginated job history for a given job ID based on the specified query.
This method performs a read transaction on the Bolt DB and fetches the job history for the specified jobID. It supports pagination by processing an offset and limit defined either in the query or via a `NextToken`. Pagination tokens help in fetching the next set of results if the query returns a partial result due to the limit.
Pagination Behavior:
- The `NextToken` in the query allows the caller to continue fetching subsequent pages.
- If the result set reaches the limit specified in the query, a new `NextToken` is generated.
- If no records are found in the current query, but the job or execution is not in a terminal state, the same `NextToken` will be returned to indicate that more history might still be available in the future.
- Pagination only stops when there are no more records to fetch *and* the job/execution is in a terminal state.
func (*BoltJobStore) GetJobs ¶
func (b *BoltJobStore) GetJobs(ctx context.Context, query jobstore.JobQuery) (*jobstore.JobQueryResponse, error)
GetJobs returns all Jobs that match the provided query
func (*BoltJobStore) UpdateExecution ¶
func (b *BoltJobStore) UpdateExecution(ctx context.Context, request jobstore.UpdateExecutionRequest) error
UpdateExecution updates the state of a single execution by loading from storage, updating and then writing back in a single transaction
func (*BoltJobStore) UpdateJobState ¶
func (b *BoltJobStore) UpdateJobState(ctx context.Context, request jobstore.UpdateJobStateRequest) error
UpdateJobState updates the current state for a single Job, appending an entry to the history at the same time
type BucketPath ¶
type BucketPath struct {
// contains filtered or unexported fields
}
func NewBucketPath ¶
func NewBucketPath(sections ...string) *BucketPath
NewBucketPath creates a bucket path which can be used to describe the nested relationship between buckets, rather than calling b.Bucket() on each b found. BucketPaths are typically described using strings like "root/bucket/here", with sections joined by BucketPathDelimiter.
func (*BucketPath) Sub ¶
func (bp *BucketPath) Sub(names ...[]byte) *BucketPath
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index is a bucket type that encodes both a label and an identifier, for use as a sentinel marker to show the presence of a thing. For example, to index job `94b136a3` under the label `gpu`, we would create the `gpu` bucket if it didn't exist, and then a bucket with the job ID.
Most methods will take a label and an identifier, and these serve as the attenuating field and the item itself. So for a client id index, where we want to have a list of job ids for each client, the index will look like:
    jobs_clients
      |----- CLIENT ID 1
               |---- JOBID 1
               |---- JOBID 2
      |----- CLIENT ID 2
      .....
In this case, JOBID 1 is the identifier, and CLIENT ID 1 is the subpath/label. In some cases, such as indices for jobs in a specific state, we may not have/need the label and so the subpath can be omitted. The primary use of this is currently the list of InProgress jobs, where there is no attenuator (e.g. clientid).