Documentation ¶
Overview ¶
Package dynamodbtask implements the taskdb.TaskDB interface on an AWS DynamoDB backend. Every run or task is stored in a row together with its attributes, which include labels, user, keepalive and start times. Tasks have a RunID column that identifies the run they belong to. Tasks also store the FlowID of the reflow flow, the ResultID, the exec URI, and the stdout, stderr and inspect log ids. To support common queries such as recent runs/tasks, each run/task row stores a day time bucket; the "Date-Keepalive-index" index allows querying runs/tasks by time bucket. Dynamodbtask also uses several secondary indexes to help with run/task querying.

Schema:
run: {ID, ID4, Type="run", Labels, Bundle, Args, Date, Keepalive, StartTime, EndTime, User}
task: {ID, ID4, Type="task", Labels, Date, Attempt, Keepalive, StartTime, EndTime, FlowID, Inspect, Error, ResultID, RunID, RunID4, AllocID, ImgCmdID, Ident, Stderr, Stdout, URI}
alloc: {ID, ID4, Type="alloc", PoolID, AllocID, Resources, URI, Keepalive, StartTime, EndTime}
pool: {ID, ID4, Type="pool", PoolID, PoolType, ClusterID.*, Resources, URI, Keepalive, StartTime, EndTime}

Note:
PoolID: Rows of type "pool" are expected to store the implementation-specific identifier of a pool, whereas rows of type "alloc" contain in this field the digest of the PoolID of the pool they belong to.
AllocID: Similarly, rows of type "alloc" are expected to store the value Alloc.ID(), whereas rows of type "task" contain the digest of Alloc.ID() of the alloc where they are attempted.

Indexes:
1. Date-Keepalive-index - for time-based queries.
2. RunID-index - for finding all tasks that belong to a run.
3. ID-index and ID4-ID-index - for queries looking for specific runs or tasks.
4. ImgCmdID-index and Ident-index - for queries looking for specific execs.
Index ¶
- Constants
- type Items
- type ItemsHandler
- type ItemsHandlerFunc
- type TaskDB
- func (t *TaskDB) Allocs(ctx context.Context, q taskdb.AllocQuery) ([]taskdb.Alloc, error)
- func (t *TaskDB) CreateRun(ctx context.Context, id taskdb.RunID, user string) error
- func (t *TaskDB) CreateTask(ctx context.Context, task taskdb.Task) error
- func (t *TaskDB) Flags(flags *flag.FlagSet)
- func (TaskDB) Help() string
- func (t *TaskDB) Init(sess *session.Session, user *infra2.User, labels pool.Labels) (err error)
- func (t *TaskDB) KeepIDAlive(ctx context.Context, id digest.Digest, keepalive time.Time) error
- func (t *TaskDB) KeepRunAlive(ctx context.Context, id taskdb.RunID, keepalive time.Time) error
- func (t *TaskDB) KeepTaskAlive(ctx context.Context, id taskdb.TaskID, keepalive time.Time) error
- func (t *TaskDB) Pools(ctx context.Context, q taskdb.PoolQuery) ([]taskdb.PoolRow, error)
- func (t *TaskDB) Repository() reflow.Repository
- func (t *TaskDB) Runs(ctx context.Context, runQuery taskdb.RunQuery) ([]taskdb.Run, error)
- func (t *TaskDB) Scan(ctx context.Context, kind taskdb.Kind, mappingHandler taskdb.MappingHandler) error
- func (t *TaskDB) SetEndTime(ctx context.Context, id digest.Digest, end time.Time) error
- func (t *TaskDB) SetResources(ctx context.Context, id digest.Digest, resources reflow.Resources) error
- func (t *TaskDB) SetRunAttrs(ctx context.Context, id taskdb.RunID, bundle digest.Digest, args []string) error
- func (t *TaskDB) SetRunComplete(ctx context.Context, id taskdb.RunID, runlog, evalGraph, trace digest.Digest, ...) error
- func (t *TaskDB) SetTaskAttrs(ctx context.Context, id taskdb.TaskID, stdout, stderr, inspect digest.Digest) error
- func (t *TaskDB) SetTaskComplete(ctx context.Context, id taskdb.TaskID, err error, end time.Time) error
- func (t *TaskDB) SetTaskResult(ctx context.Context, id taskdb.TaskID, result digest.Digest) error
- func (t *TaskDB) SetTaskUri(ctx context.Context, id taskdb.TaskID, uri string) error
- func (t *TaskDB) Setup(sess *session.Session, log *log.Logger) error
- func (t *TaskDB) StartAlloc(ctx context.Context, allocID reflow.StringDigest, poolID digest.Digest, ...) error
- func (t *TaskDB) StartPool(ctx context.Context, pool taskdb.Pool) error
- func (t *TaskDB) String() string
- func (t *TaskDB) Tasks(ctx context.Context, q taskdb.TaskQuery) ([]taskdb.Task, error)
- func (t *TaskDB) Version() int
Constants ¶
const (
	ID taskdb.Kind = iota
	ID4
	RunID
	RunID4
	FlowID
	AllocID
	PoolID
	ResultID
	ImgCmdID
	Ident
	Attempt
	KeepAlive
	StartTime
	Stdout
	Stderr
	ExecInspect
	Error
	URI
	Labels
	User
	Type
	Date
	Bundle
	Args
	EndTime
	RunLog
	ExecLog
	SysLog
	EvalGraph
	Trace
	Resources
	PoolType
	ClusterName
	ReflowVersion
)
const (
// ProviderName is the name of this TaskDB's infra config provider.
ProviderName = "dynamodbtask"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Items ¶
type Items []map[string]*dynamodb.AttributeValue
Items is the response from a dynamoDb scan.
type ItemsHandler ¶
type ItemsHandler interface {
	// HandleItems handles a set of scanned items.
	HandleItems(items Items) error
}
ItemsHandler is an interface for handling Items from a segment scan.
type ItemsHandlerFunc ¶
ItemsHandlerFunc is a convenience type to avoid having to declare a struct to implement the ItemsHandler interface.
func (ItemsHandlerFunc) HandleItems ¶
func (h ItemsHandlerFunc) HandleItems(items Items) error
HandleItems implements the ItemsHandler interface.
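The function-adapter pattern behind ItemsHandlerFunc can be sketched as follows. The types are local stand-ins so that the example compiles without the AWS SDK: Item replaces map[string]*dynamodb.AttributeValue, the underlying type of ItemsHandlerFunc is assumed to be func(Items) error, and process and countByType are hypothetical helpers that are not part of this package.

```go
package main

import "fmt"

// Item stands in for map[string]*dynamodb.AttributeValue.
type Item map[string]string

// Items mirrors the package's Items type using the stand-in Item.
type Items []Item

// ItemsHandler mirrors the interface documented above.
type ItemsHandler interface {
	HandleItems(items Items) error
}

// ItemsHandlerFunc adapts a plain function to ItemsHandler.
// Its underlying type is an assumption made for this sketch.
type ItemsHandlerFunc func(items Items) error

// HandleItems calls the wrapped function.
func (h ItemsHandlerFunc) HandleItems(items Items) error { return h(items) }

// process is a hypothetical driver: it feeds each scanned page to the
// handler and stops at the first error.
func process(pages []Items, h ItemsHandler) error {
	for _, page := range pages {
		if err := h.HandleItems(page); err != nil {
			return err
		}
	}
	return nil
}

// countByType tallies rows per Type attribute using a closure as the handler,
// so no dedicated struct is needed.
func countByType(pages []Items) (map[string]int, error) {
	counts := make(map[string]int)
	h := ItemsHandlerFunc(func(items Items) error {
		for _, it := range items {
			counts[it["Type"]]++
		}
		return nil
	})
	err := process(pages, h)
	return counts, err
}

func main() {
	pages := []Items{
		{{"ID": "a", "Type": "run"}, {"ID": "b", "Type": "task"}},
		{{"ID": "c", "Type": "task"}},
	}
	counts, err := countByType(pages)
	fmt.Println(counts["run"], counts["task"], err)
}
```

The closure captures counts directly, which is the convenience the adapter type offers over declaring a struct with its own HandleItems method.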
type TaskDB ¶
type TaskDB struct {
	infra2.TableNameFlagsTrait
	infra2.BucketNameFlagsTrait

	// DB is the dynamodb.
	DB dynamodbiface.DynamoDBAPI

	// Labels on the run.
	Labels []string

	// User who initiated this run.
	User string

	// Repo is the repository to store large objects referenced from this TaskDB.
	Repo *blobrepo.Repository
	// contains filtered or unexported fields
}
TaskDB implements the dynamodb backed taskdb.TaskDB interface to store run/task state and metadata. Each association is either:
a) RunID and its associated metadata (run labels, user info, and leases)
b) TaskID and its associated metadata (RunID that spawned this task, FlowID of the node, and leases)
func (*TaskDB) CreateRun ¶
CreateRun creates a new run in the taskdb with the given id, labels and user.
func (*TaskDB) CreateTask ¶
CreateTask creates a new task in the taskdb with the provided task.
func (*TaskDB) KeepIDAlive ¶
KeepIDAlive sets the keepalive for the specified id to keepalive.
func (*TaskDB) KeepRunAlive ¶
KeepRunAlive sets the keepalive for run id to keepalive.
func (*TaskDB) KeepTaskAlive ¶
KeepTaskAlive sets the keepalive for task id to keepalive.
func (*TaskDB) Repository ¶
func (t *TaskDB) Repository() reflow.Repository
Repository implements taskdb.TaskDB.
func (*TaskDB) Scan ¶
func (t *TaskDB) Scan(ctx context.Context, kind taskdb.Kind, mappingHandler taskdb.MappingHandler) error
Scan calls the handler function for every association in the mapping. Note that the handler function may be called asynchronously from multiple goroutines.
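Because the handler may be invoked concurrently, any state it touches needs synchronization. The sketch below shows one way to do that with a mutex. The mappingHandler interface, its HandleMapping(key, value string) signature, and scanConcurrently are hypothetical stand-ins used only to imitate concurrent callbacks; they do not reproduce taskdb.MappingHandler or the behavior of Scan.

```go
package main

import (
	"fmt"
	"sync"
)

// mappingHandler is a hypothetical stand-in for taskdb.MappingHandler;
// the real interface's method set may differ.
type mappingHandler interface {
	HandleMapping(key, value string)
}

// keyCounter counts how often each key is seen. The mutex makes it safe
// to call HandleMapping from several goroutines at once.
type keyCounter struct {
	mu   sync.Mutex
	seen map[string]int
}

func newKeyCounter() *keyCounter {
	return &keyCounter{seen: make(map[string]int)}
}

// HandleMapping records one occurrence of key under the lock.
func (c *keyCounter) HandleMapping(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seen[key]++
}

// count returns the number of times key was handled.
func (c *keyCounter) count(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.seen[key]
}

// scanConcurrently imitates a segmented scan: every segment is handled in
// its own goroutine, so the handler is called concurrently.
func scanConcurrently(segments [][]string, h mappingHandler) {
	var wg sync.WaitGroup
	for _, seg := range segments {
		wg.Add(1)
		go func(seg []string) {
			defer wg.Done()
			for _, key := range seg {
				h.HandleMapping(key, "")
			}
		}(seg)
	}
	wg.Wait()
}

func main() {
	c := newKeyCounter()
	scanConcurrently([][]string{{"a", "b"}, {"a"}, {"c", "a"}}, c)
	fmt.Println(c.count("a"), c.count("b"), c.count("c"))
}
```

Without the lock, concurrent writes to the map would be a data race; running such code with the race detector (go run -race) is a quick way to confirm a handler is safe.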
func (*TaskDB) SetEndTime ¶
SetEndTime sets the end time for the given id.
func (*TaskDB) SetResources ¶
func (t *TaskDB) SetResources(ctx context.Context, id digest.Digest, resources reflow.Resources) error
SetResources sets the resources field in the taskdb for the row with the given id.
func (*TaskDB) SetRunAttrs ¶
func (t *TaskDB) SetRunAttrs(ctx context.Context, id taskdb.RunID, bundle digest.Digest, args []string) error
SetRunAttrs sets the reflow bundle and corresponding args for this run.
func (*TaskDB) SetRunComplete ¶
func (t *TaskDB) SetRunComplete(ctx context.Context, id taskdb.RunID, runlog, evalGraph, trace digest.Digest, end time.Time) error
SetRunComplete sets the result of the run post completion.
func (*TaskDB) SetTaskAttrs ¶
func (t *TaskDB) SetTaskAttrs(ctx context.Context, id taskdb.TaskID, stdout, stderr, inspect digest.Digest) error
SetTaskAttrs sets the stdout, stderr and inspect ids for the task.
func (*TaskDB) SetTaskComplete ¶
func (t *TaskDB) SetTaskComplete(ctx context.Context, id taskdb.TaskID, err error, end time.Time) error
SetTaskComplete marks the task as completed as of the given end time.
func (*TaskDB) SetTaskResult ¶
SetTaskResult sets the task result id.
func (*TaskDB) SetTaskUri ¶
SetTaskUri updates the task URI.
func (*TaskDB) StartAlloc ¶
func (t *TaskDB) StartAlloc(ctx context.Context, allocID reflow.StringDigest, poolID digest.Digest, resources reflow.Resources, start time.Time) error
StartAlloc creates a new alloc in the taskdb with the provided parameters.