Documentation ¶
Index ¶
- Variables
- func NewInMemRunReaderWriter() *runReaderWriter
- type CreateTaskRequest
- type DesiredState
- type Executor
- type LogReader
- type LogWriter
- type NopLogReader
- func (NopLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)
- func (NopLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)
- func (NopLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)
- type NopLogWriter
- type PointLogWriter
- type PointsWriter
- type QueryLogReader
- func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)
- func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)
- func (qlr *QueryLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)
- type QueuedRun
- type RequestStillQueuedError
- type RunCreation
- type RunLogBase
- type RunNotYetDueError
- type RunPromise
- type RunResult
- type RunStatus
- type Scheduler
- type Store
- type StoreTask
- type StoreTaskMeta
- func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, error)) (RunCreation, error)
- func (*StoreTaskMeta) Descriptor() ([]byte, []int)
- func (stm StoreTaskMeta) Equal(other StoreTaskMeta) bool
- func (stm *StoreTaskMeta) FinishRun(runID platform.ID) bool
- func (m *StoreTaskMeta) GetCurrentlyRunning() []*StoreTaskMetaRun
- func (m *StoreTaskMeta) GetEffectiveCron() string
- func (m *StoreTaskMeta) GetLatestCompleted() int64
- func (m *StoreTaskMeta) GetManualRuns() []*StoreTaskMetaManualRun
- func (m *StoreTaskMeta) GetMaxConcurrency() int32
- func (m *StoreTaskMeta) GetOffset() int32
- func (m *StoreTaskMeta) GetStatus() string
- func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64, makeID func() (platform.ID, error)) error
- func (m *StoreTaskMeta) Marshal() (dAtA []byte, err error)
- func (m *StoreTaskMeta) MarshalTo(dAtA []byte) (int, error)
- func (stm *StoreTaskMeta) NextDueRun() (int64, error)
- func (*StoreTaskMeta) ProtoMessage()
- func (m *StoreTaskMeta) Reset()
- func (m *StoreTaskMeta) Size() (n int)
- func (m *StoreTaskMeta) String() string
- func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error
- func (m *StoreTaskMeta) XXX_DiscardUnknown()
- func (m *StoreTaskMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *StoreTaskMeta) XXX_Merge(src proto.Message)
- func (m *StoreTaskMeta) XXX_Size() int
- func (m *StoreTaskMeta) XXX_Unmarshal(b []byte) error
- type StoreTaskMetaManualRun
- func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int)
- func (m *StoreTaskMetaManualRun) GetEnd() int64
- func (m *StoreTaskMetaManualRun) GetLatestCompleted() int64
- func (m *StoreTaskMetaManualRun) GetRequestedAt() int64
- func (m *StoreTaskMetaManualRun) GetRunID() uint64
- func (m *StoreTaskMetaManualRun) GetStart() int64
- func (m *StoreTaskMetaManualRun) Marshal() (dAtA []byte, err error)
- func (m *StoreTaskMetaManualRun) MarshalTo(dAtA []byte) (int, error)
- func (*StoreTaskMetaManualRun) ProtoMessage()
- func (m *StoreTaskMetaManualRun) Reset()
- func (m *StoreTaskMetaManualRun) Size() (n int)
- func (m *StoreTaskMetaManualRun) String() string
- func (m *StoreTaskMetaManualRun) Unmarshal(dAtA []byte) error
- func (m *StoreTaskMetaManualRun) XXX_DiscardUnknown()
- func (m *StoreTaskMetaManualRun) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *StoreTaskMetaManualRun) XXX_Merge(src proto.Message)
- func (m *StoreTaskMetaManualRun) XXX_Size() int
- func (m *StoreTaskMetaManualRun) XXX_Unmarshal(b []byte) error
- type StoreTaskMetaRun
- func (*StoreTaskMetaRun) Descriptor() ([]byte, []int)
- func (m *StoreTaskMetaRun) GetNow() int64
- func (m *StoreTaskMetaRun) GetRangeEnd() int64
- func (m *StoreTaskMetaRun) GetRangeStart() int64
- func (m *StoreTaskMetaRun) GetRequestedAt() int64
- func (m *StoreTaskMetaRun) GetRunID() uint64
- func (m *StoreTaskMetaRun) GetTry() uint32
- func (m *StoreTaskMetaRun) Marshal() (dAtA []byte, err error)
- func (m *StoreTaskMetaRun) MarshalTo(dAtA []byte) (int, error)
- func (*StoreTaskMetaRun) ProtoMessage()
- func (m *StoreTaskMetaRun) Reset()
- func (m *StoreTaskMetaRun) Size() (n int)
- func (m *StoreTaskMetaRun) String() string
- func (m *StoreTaskMetaRun) Unmarshal(dAtA []byte) error
- func (m *StoreTaskMetaRun) XXX_DiscardUnknown()
- func (m *StoreTaskMetaRun) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *StoreTaskMetaRun) XXX_Merge(src proto.Message)
- func (m *StoreTaskMetaRun) XXX_Size() int
- func (m *StoreTaskMetaRun) XXX_Unmarshal(b []byte) error
- type StoreTaskWithMeta
- type StoreValidation
- type TaskSearchParams
- type TaskStatus
- type TickScheduler
- func (s *TickScheduler) CancelRun(_ context.Context, taskID, runID platform.ID) error
- func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error)
- func (s *TickScheduler) PrometheusCollectors() []prometheus.Collector
- func (s *TickScheduler) ReleaseTask(taskID platform.ID) error
- func (s *TickScheduler) Start(ctx context.Context)
- func (s *TickScheduler) Stop()
- func (s *TickScheduler) Tick(now int64)
- func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error
- type TickSchedulerOption
- type UpdateTaskRequest
- type UpdateTaskResult
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthMeta = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowMeta = fmt.Errorf("proto: integer overflow") )
var ( // ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally. ErrRunCanceled = errors.New("run canceled") // ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not. ErrTaskNotClaimed = errors.New("task not claimed") // ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is. ErrTaskAlreadyClaimed = errors.New("task already claimed") )
var ( // ErrTaskNotFound indicates no task could be found for given parameters. ErrTaskNotFound = errors.New("task not found") // ErrUserNotFound is an error for when we can't find a user ErrUserNotFound = errors.New("user not found") // ErrOrgNotFound is an error for when we can't find an org ErrOrgNotFound = errors.New("org not found") // ErrManualQueueFull is returned when a manual run request cannot be completed. ErrManualQueueFull = errors.New("manual queue at capacity") // ErrRunNotFound is returned when searching for a run that doesn't exist. ErrRunNotFound = errors.New("run not found") // ErrRunNotFinished is returned when a retry is invalid due to the run not being finished yet. ErrRunNotFinished = errors.New("run is still in progress") )
Functions ¶
func NewInMemRunReaderWriter ¶
func NewInMemRunReaderWriter() *runReaderWriter
Types ¶
type CreateTaskRequest ¶
type CreateTaskRequest struct {
// Owners.
Org, User platform.ID
// Script content of the task.
Script string
// Unix timestamp (seconds elapsed since January 1, 1970 UTC).
// The first run of the task will be run according to the earliest time after ScheduleAfter,
// matching the task's schedule via its cron or every option.
ScheduleAfter int64
// The initial task status.
// If empty, will be treated as DefaultTaskStatus.
Status TaskStatus
}
CreateTaskRequest encapsulates state of a new task to be created.
type DesiredState ¶
type DesiredState interface { // CreateNextRun requests the next run from the desired state, delegating to (*StoreTaskMeta).CreateNextRun. // This allows the scheduler to be "dumb" and just tell DesiredState what time the scheduler thinks it is, // and the DesiredState will create the appropriate run according to the task's cron schedule, // and according to what's in progress and what's been finished. // // If a Run is requested and the cron schedule says the schedule isn't ready, a RunNotYetDueError is returned. CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error) // FinishRun indicates that the given run is no longer intended to be executed. // This may be called after a successful or failed execution, or upon cancellation. FinishRun(ctx context.Context, taskID, runID platform.ID) error }
DesiredState persists the desired state of a run.
type Executor ¶
type Executor interface { // Execute attempts to begin execution of a run. // If there is an error invoking execution, that error is returned and RunPromise is nil. // TODO(mr): this assumes you can execute a run just from a taskID and a now time. // We may need to include the script content in this method signature. Execute(ctx context.Context, run QueuedRun) (RunPromise, error) // Wait blocks until all RunPromises created through Execute have finished. // Once Wait has been called, it is an error to call Execute before Wait has returned. // After Wait returns, it is safe to call Execute again. Wait() }
Executor handles execution of a run.
type LogReader ¶
type LogReader interface { // ListRuns returns a list of runs belonging to a task. ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) // FindRunByID finds a run given an orgID and runID. // orgID is necessary to look in the correct system bucket. FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) // ListLogs lists logs for a task or a specified run of a task. ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) }
LogReader reads log information and log data from a store.
type LogWriter ¶
type LogWriter interface { // UpdateRunState sets the run state and the respective time. UpdateRunState(ctx context.Context, base RunLogBase, when time.Time, state RunStatus) error // AddRunLog adds a log line to the run. AddRunLog(ctx context.Context, base RunLogBase, when time.Time, log string) error }
LogWriter writes task logs and task state changes to a store.
type NopLogReader ¶
type NopLogReader struct{}
NopLogReader is a LogReader that doesn't do anything when its methods are called. This is useful for test, but not much else.
func (NopLogReader) FindRunByID ¶
type NopLogWriter ¶
type NopLogWriter struct{}
NopLogWriter is a LogWriter that doesn't do anything when its methods are called. This is useful for test, but not much else.
func (NopLogWriter) AddRunLog ¶
func (NopLogWriter) AddRunLog(context.Context, RunLogBase, time.Time, string) error
func (NopLogWriter) UpdateRunState ¶
func (NopLogWriter) UpdateRunState(context.Context, RunLogBase, time.Time, RunStatus) error
type PointLogWriter ¶
type PointLogWriter struct {
// contains filtered or unexported fields
}
PointLogWriter writes task and run logs as time-series points.
func NewPointLogWriter ¶
func NewPointLogWriter(pw PointsWriter) *PointLogWriter
NewPointLogWriter returns a PointLogWriter.
func (*PointLogWriter) AddRunLog ¶
func (p *PointLogWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error
func (*PointLogWriter) UpdateRunState ¶
func (p *PointLogWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error
type PointsWriter ¶
Copy of storage.PointsWriter interface. Duplicating it here to avoid having tasks/backend depend directly on storage.
type QueryLogReader ¶
type QueryLogReader struct {
// contains filtered or unexported fields
}
func NewQueryLogReader ¶
func NewQueryLogReader(qs query.QueryService) *QueryLogReader
func (*QueryLogReader) FindRunByID ¶
type QueuedRun ¶
type QueuedRun struct {
TaskID, RunID platform.ID
// The Unix timestamp (seconds since January 1, 1970 UTC) that will be set when a run is manually requested.
RequestedAt int64
// The Unix timestamp (seconds since January 1, 1970 UTC) that will be set
// as the "now" option when executing the task.
Now int64
}
QueuedRun is a task run that has been assigned an ID, but whose execution has not necessarily started.
type RequestStillQueuedError ¶
type RequestStillQueuedError struct {
// Unix timestamps matching existing request's start and end.
Start, End int64
}
RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
func ParseRequestStillQueuedError ¶
func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError
ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg. If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.
func (RequestStillQueuedError) Error ¶
func (e RequestStillQueuedError) Error() string
type RunCreation ¶
type RunCreation struct { Created QueuedRun // Unix timestamp for when the next run is due. NextDue int64 // Whether there are any manual runs queued for this task. // If so, the scheduler should begin executing them after handling real-time tasks. HasQueue bool }
RunCreation is returned by CreateNextRun.
type RunLogBase ¶
type RunLogBase struct { // The parent task that owns the run. Task *StoreTask // The ID of the run. RunID platform.ID // The Unix timestamp indicating the run's scheduled time. RunScheduledFor int64 // When the log is requested, should be ignored when it is zero. RequestedAt int64 }
RunLogBase is the base information for logs about an individual run.
type RunNotYetDueError ¶
type RunNotYetDueError struct { // DueAt is the unix timestamp of when the next run is due. DueAt int64 }
RunNotYetDueError is returned from CreateNextRun if a run is not yet due.
func (RunNotYetDueError) Error ¶
func (e RunNotYetDueError) Error() string
type RunPromise ¶
type RunPromise interface { // Run returns the details about the queued run. Run() QueuedRun // Wait blocks until the run completes. // Wait may be called concurrently. // Subsequent calls to Wait will return identical values. Wait() (RunResult, error) // Cancel interrupts the RunPromise. // Calls to Wait() will immediately unblock and return nil, ErrRunCanceled. // Cancel is safe to call concurrently. // If Wait() has already returned, Cancel is a no-op. Cancel() }
RunPromise represents an in-progress run whose result is not yet known.
type Scheduler ¶
type Scheduler interface { // Start allows the scheduler to Tick. A scheduler that has not been started will do nothing. Start(ctx context.Context) // Stop stops the scheduler from ticking. Stop() // ClaimTask begins control of task execution in this scheduler. ClaimTask(task *StoreTask, meta *StoreTaskMeta) error // UpdateTask will update the concurrency and the runners for a task. UpdateTask(task *StoreTask, meta *StoreTaskMeta) error // ReleaseTask immediately cancels any in-progress runs for the given task ID, // and releases any resources related to management of that task. ReleaseTask(taskID platform.ID) error // CancelRun stops an executing run. CancelRun(ctx context.Context, taskID, runID platform.ID) error }
Scheduler accepts tasks and handles their scheduling.
TODO(mr): right now the methods on Scheduler are synchronous. We'll probably want to make them asynchronous in the near future, which likely means we will change the method signatures to something where we can wait for the result to complete and possibly inspect any relevant output.
type Store ¶
type Store interface { // CreateTask creates a task from the given CreateTaskRequest. // If the task is created successfully, the ID of the new task is returned. CreateTask(ctx context.Context, req CreateTaskRequest) (platform.ID, error) // UpdateTask updates an existing task. // It returns an error if there was no task matching the given ID. // If the returned error is not nil, the returned result should not be inspected. UpdateTask(ctx context.Context, req UpdateTaskRequest) (UpdateTaskResult, error) // ListTasks lists the tasks in the store that match the search params. ListTasks(ctx context.Context, params TaskSearchParams) ([]StoreTaskWithMeta, error) // FindTaskByID returns the task with the given ID. // If no task matches the ID, the returned task is nil and ErrTaskNotFound is returned. FindTaskByID(ctx context.Context, id platform.ID) (*StoreTask, error) // FindTaskMetaByID returns the metadata about a task. // If no task meta matches the ID, the returned meta is nil and ErrTaskNotFound is returned. FindTaskMetaByID(ctx context.Context, id platform.ID) (*StoreTaskMeta, error) // FindTaskByIDWithMeta combines finding the task and the meta into a single call. FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*StoreTask, *StoreTaskMeta, error) // DeleteTask returns whether an entry matching the given ID was deleted. // If err is non-nil, deleted is false. // If err is nil, deleted is false if no entry matched the ID, // or deleted is true if there was a matching entry and it was deleted. DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) // CreateNextRun creates the earliest needed run scheduled no later than the given Unix timestamp now. // Internally, the Store should rely on the underlying task's StoreTaskMeta to create the next run. 
CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error) // FinishRun removes runID from the list of running tasks and, if its `now` is later than the last completed time, updates the last completed time. FinishRun(ctx context.Context, taskID, runID platform.ID) error // ManuallyRunTimeRange enqueues a request to run the task with the given ID for all schedules no earlier than start and no later than end (Unix timestamps). // requestedAt is the Unix timestamp when the request was initiated. // ManuallyRunTimeRange must delegate to an underlying StoreTaskMeta's ManuallyRunTimeRange method. ManuallyRunTimeRange(ctx context.Context, taskID platform.ID, start, end, requestedAt int64) (*StoreTaskMetaManualRun, error) // DeleteOrg deletes the org. DeleteOrg(ctx context.Context, orgID platform.ID) error // DeleteUser deletes a user with userID. DeleteUser(ctx context.Context, userID platform.ID) error // Close closes the store for usage and cleans up running processes. Close() error }
Store is the interface around persisted tasks.
func NewInMemStore ¶
func NewInMemStore() Store
NewInMemStore returns a new in-memory store. This store is not designed to be efficient; it is here for testing purposes.
type StoreTask ¶
type StoreTask struct { ID platform.ID // IDs for the owning organization and user. Org, User platform.ID // The user-supplied name of the Task. Name string // The script content of the task. Script string }
StoreTask is a stored representation of a Task.
type StoreTaskMeta ¶
type StoreTaskMeta struct { MaxConcurrency int32 `protobuf:"varint,1,opt,name=max_concurrency,json=maxConcurrency,proto3" json:"max_concurrency,omitempty"` // latest_completed is the unix timestamp of the latest "naturally" completed run. // If a run for time t finishes before a run for time t - u, latest_completed will reflect time t. LatestCompleted int64 `protobuf:"varint,2,opt,name=latest_completed,json=latestCompleted,proto3" json:"latest_completed,omitempty"` // status indicates if the task is enabled or disabled. Status string `protobuf:"bytes,3,opt,name=status,proto3" json:"status,omitempty"` // currently_running is the collection of runs in-progress. // If a runner crashes or otherwise disappears, this indicates to the new runner what needs to be picked up. CurrentlyRunning []*StoreTaskMetaRun `protobuf:"bytes,4,rep,name=currently_running,json=currentlyRunning" json:"currently_running,omitempty"` // effective_cron is the effective cron string as reported by the task's options. EffectiveCron string `protobuf:"bytes,5,opt,name=effective_cron,json=effectiveCron,proto3" json:"effective_cron,omitempty"` // Task's configured delay, in seconds. Offset int32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"` ManualRuns []*StoreTaskMetaManualRun `protobuf:"bytes,16,rep,name=manual_runs,json=manualRuns" json:"manual_runs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
StoreTaskMeta is the internal state of a task.
func NewStoreTaskMeta ¶
func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta
NewStoreTaskMeta returns a new StoreTaskMeta based on the given request and parsed options.
func (*StoreTaskMeta) CreateNextRun ¶
func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, error)) (RunCreation, error)
CreateNextRun attempts to update stm's CurrentlyRunning slice with a new run. The new run's now is assigned the earliest possible time according to stm.EffectiveCron, that is later than any in-progress run and stm's LatestCompleted timestamp. If the run's now would be later than the passed-in now, CreateNextRun returns a RunNotYetDueError.
makeID is a function provided by the caller to create an ID, in case we can create a run. Because a StoreTaskMeta doesn't know the ID of the task it belongs to, it never sets RunCreation.Created.TaskID.
func (*StoreTaskMeta) Descriptor ¶
func (*StoreTaskMeta) Descriptor() ([]byte, []int)
func (StoreTaskMeta) Equal ¶
func (stm StoreTaskMeta) Equal(other StoreTaskMeta) bool
Equal returns true if all of stm's fields compare equal to other. Note that this method operates on values, unlike the other methods which operate on pointers.
Equal is probably not very useful outside of test.
func (*StoreTaskMeta) FinishRun ¶
func (stm *StoreTaskMeta) FinishRun(runID platform.ID) bool
FinishRun removes the run matching runID from stm's CurrentlyRunning slice, and if that run's Now value is greater than stm's LatestCompleted value, updates the value of LatestCompleted to the run's Now value.
If runID matched a run, FinishRun returns true. Otherwise it returns false.
func (*StoreTaskMeta) GetCurrentlyRunning ¶
func (m *StoreTaskMeta) GetCurrentlyRunning() []*StoreTaskMetaRun
func (*StoreTaskMeta) GetEffectiveCron ¶
func (m *StoreTaskMeta) GetEffectiveCron() string
func (*StoreTaskMeta) GetLatestCompleted ¶
func (m *StoreTaskMeta) GetLatestCompleted() int64
func (*StoreTaskMeta) GetManualRuns ¶
func (m *StoreTaskMeta) GetManualRuns() []*StoreTaskMetaManualRun
func (*StoreTaskMeta) GetMaxConcurrency ¶
func (m *StoreTaskMeta) GetMaxConcurrency() int32
func (*StoreTaskMeta) GetOffset ¶
func (m *StoreTaskMeta) GetOffset() int32
func (*StoreTaskMeta) GetStatus ¶
func (m *StoreTaskMeta) GetStatus() string
func (*StoreTaskMeta) ManuallyRunTimeRange ¶
func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64, makeID func() (platform.ID, error)) error
ManuallyRunTimeRange requests a manual run covering the approximate range specified by the Unix timestamps start and end. More specifically, it requests runs scheduled no earlier than start, but possibly later than start, if start does not land on the task's schedule; and as late as, but not necessarily equal to, end. requestedAt is the Unix timestamp indicating when this run range was requested.
There is no schedule validation in this method, so ManuallyRunTimeRange can be used to create a run at a specific time that isn't aligned with the task's schedule.
If adding the range would exceed the queue size, ManuallyRunTimeRange returns ErrManualQueueFull.
func (*StoreTaskMeta) Marshal ¶
func (m *StoreTaskMeta) Marshal() (dAtA []byte, err error)
func (*StoreTaskMeta) NextDueRun ¶
func (stm *StoreTaskMeta) NextDueRun() (int64, error)
NextDueRun returns the Unix timestamp of when the next call to CreateNextRun will be ready. The returned timestamp reflects the task's delay, so it does not necessarily exactly match the schedule time.
func (*StoreTaskMeta) ProtoMessage ¶
func (*StoreTaskMeta) ProtoMessage()
func (*StoreTaskMeta) Reset ¶
func (m *StoreTaskMeta) Reset()
func (*StoreTaskMeta) Size ¶
func (m *StoreTaskMeta) Size() (n int)
func (*StoreTaskMeta) String ¶
func (m *StoreTaskMeta) String() string
func (*StoreTaskMeta) Unmarshal ¶
func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error
func (*StoreTaskMeta) XXX_DiscardUnknown ¶
func (m *StoreTaskMeta) XXX_DiscardUnknown()
func (*StoreTaskMeta) XXX_Marshal ¶
func (m *StoreTaskMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*StoreTaskMeta) XXX_Merge ¶
func (dst *StoreTaskMeta) XXX_Merge(src proto.Message)
func (*StoreTaskMeta) XXX_Size ¶
func (m *StoreTaskMeta) XXX_Size() int
func (*StoreTaskMeta) XXX_Unmarshal ¶
func (m *StoreTaskMeta) XXX_Unmarshal(b []byte) error
type StoreTaskMetaManualRun ¶
type StoreTaskMetaManualRun struct { // start is the earliest allowable unix time stamp for this queue of runs. Start int64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` // end is the latest allowable unix time stamp for this queue of runs. End int64 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"` // latest_completed is the timestamp of the latest completed run from this queue. LatestCompleted int64 `protobuf:"varint,3,opt,name=latest_completed,json=latestCompleted,proto3" json:"latest_completed,omitempty"` // requested_at is the unix timestamp indicating when this run was requested. RequestedAt int64 `protobuf:"varint,4,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"` // run_id is set ahead of time for retries of individual runs. Manually run time ranges do not receive an ID. RunID uint64 `protobuf:"varint,5,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
StoreTaskMetaManualRun indicates a manually requested run for a time range. It has a start and end pair of unix timestamps indicating the time range covered by the request.
func (*StoreTaskMetaManualRun) Descriptor ¶
func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int)
func (*StoreTaskMetaManualRun) GetEnd ¶
func (m *StoreTaskMetaManualRun) GetEnd() int64
func (*StoreTaskMetaManualRun) GetLatestCompleted ¶
func (m *StoreTaskMetaManualRun) GetLatestCompleted() int64
func (*StoreTaskMetaManualRun) GetRequestedAt ¶
func (m *StoreTaskMetaManualRun) GetRequestedAt() int64
func (*StoreTaskMetaManualRun) GetRunID ¶
func (m *StoreTaskMetaManualRun) GetRunID() uint64
func (*StoreTaskMetaManualRun) GetStart ¶
func (m *StoreTaskMetaManualRun) GetStart() int64
func (*StoreTaskMetaManualRun) Marshal ¶
func (m *StoreTaskMetaManualRun) Marshal() (dAtA []byte, err error)
func (*StoreTaskMetaManualRun) MarshalTo ¶
func (m *StoreTaskMetaManualRun) MarshalTo(dAtA []byte) (int, error)
func (*StoreTaskMetaManualRun) ProtoMessage ¶
func (*StoreTaskMetaManualRun) ProtoMessage()
func (*StoreTaskMetaManualRun) Reset ¶
func (m *StoreTaskMetaManualRun) Reset()
func (*StoreTaskMetaManualRun) Size ¶
func (m *StoreTaskMetaManualRun) Size() (n int)
func (*StoreTaskMetaManualRun) String ¶
func (m *StoreTaskMetaManualRun) String() string
func (*StoreTaskMetaManualRun) Unmarshal ¶
func (m *StoreTaskMetaManualRun) Unmarshal(dAtA []byte) error
func (*StoreTaskMetaManualRun) XXX_DiscardUnknown ¶
func (m *StoreTaskMetaManualRun) XXX_DiscardUnknown()
func (*StoreTaskMetaManualRun) XXX_Marshal ¶
func (m *StoreTaskMetaManualRun) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*StoreTaskMetaManualRun) XXX_Merge ¶
func (dst *StoreTaskMetaManualRun) XXX_Merge(src proto.Message)
func (*StoreTaskMetaManualRun) XXX_Size ¶
func (m *StoreTaskMetaManualRun) XXX_Size() int
func (*StoreTaskMetaManualRun) XXX_Unmarshal ¶
func (m *StoreTaskMetaManualRun) XXX_Unmarshal(b []byte) error
type StoreTaskMetaRun ¶
type StoreTaskMetaRun struct { // now is the unix timestamp of the "now" value for the run. Now int64 `protobuf:"varint,1,opt,name=now,proto3" json:"now,omitempty"` Try uint32 `protobuf:"varint,2,opt,name=try,proto3" json:"try,omitempty"` RunID uint64 `protobuf:"varint,3,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"` // range_start is the start of the manual run's time range. RangeStart int64 `protobuf:"varint,4,opt,name=range_start,json=rangeStart,proto3" json:"range_start,omitempty"` // range_end is the end of the manual run's time range. RangeEnd int64 `protobuf:"varint,5,opt,name=range_end,json=rangeEnd,proto3" json:"range_end,omitempty"` // requested_at is the unix timestamp indicating when this run was requested. // It is the same value as the "parent" StoreTaskMetaManualRun, if this run was the result of a manual request. RequestedAt int64 `protobuf:"varint,6,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*StoreTaskMetaRun) Descriptor ¶
func (*StoreTaskMetaRun) Descriptor() ([]byte, []int)
func (*StoreTaskMetaRun) GetNow ¶
func (m *StoreTaskMetaRun) GetNow() int64
func (*StoreTaskMetaRun) GetRangeEnd ¶
func (m *StoreTaskMetaRun) GetRangeEnd() int64
func (*StoreTaskMetaRun) GetRangeStart ¶
func (m *StoreTaskMetaRun) GetRangeStart() int64
func (*StoreTaskMetaRun) GetRequestedAt ¶
func (m *StoreTaskMetaRun) GetRequestedAt() int64
func (*StoreTaskMetaRun) GetRunID ¶
func (m *StoreTaskMetaRun) GetRunID() uint64
func (*StoreTaskMetaRun) GetTry ¶
func (m *StoreTaskMetaRun) GetTry() uint32
func (*StoreTaskMetaRun) Marshal ¶
func (m *StoreTaskMetaRun) Marshal() (dAtA []byte, err error)
func (*StoreTaskMetaRun) ProtoMessage ¶
func (*StoreTaskMetaRun) ProtoMessage()
func (*StoreTaskMetaRun) Reset ¶
func (m *StoreTaskMetaRun) Reset()
func (*StoreTaskMetaRun) Size ¶
func (m *StoreTaskMetaRun) Size() (n int)
func (*StoreTaskMetaRun) String ¶
func (m *StoreTaskMetaRun) String() string
func (*StoreTaskMetaRun) Unmarshal ¶
func (m *StoreTaskMetaRun) Unmarshal(dAtA []byte) error
func (*StoreTaskMetaRun) XXX_DiscardUnknown ¶
func (m *StoreTaskMetaRun) XXX_DiscardUnknown()
func (*StoreTaskMetaRun) XXX_Marshal ¶
func (m *StoreTaskMetaRun) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*StoreTaskMetaRun) XXX_Merge ¶
func (dst *StoreTaskMetaRun) XXX_Merge(src proto.Message)
func (*StoreTaskMetaRun) XXX_Size ¶
func (m *StoreTaskMetaRun) XXX_Size() int
func (*StoreTaskMetaRun) XXX_Unmarshal ¶
func (m *StoreTaskMetaRun) XXX_Unmarshal(b []byte) error
type StoreTaskWithMeta ¶
type StoreTaskWithMeta struct { Task StoreTask Meta StoreTaskMeta }
StoreTaskWithMeta is a single struct with a StoreTask and a StoreTaskMeta.
type StoreValidation ¶
type StoreValidation struct{}
StoreValidation is used for namespacing the store validation methods.
var StoreValidator StoreValidation
StoreValidator is a package-level StoreValidation, so that you can write
backend.StoreValidator.CreateArgs(...)
func (StoreValidation) CreateArgs ¶
func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error)
CreateArgs returns the script's parsed options, and an error if any of the provided fields are invalid for creating a task.
func (StoreValidation) UpdateArgs ¶
func (StoreValidation) UpdateArgs(req UpdateTaskRequest) (options.Options, error)
UpdateArgs validates the UpdateTaskRequest. If the update only includes a new status (i.e. req.Script is empty), the returned options are zero. If the update contains neither a new script nor a new status, or if the script is invalid, an error is returned.
type TaskSearchParams ¶
type TaskSearchParams struct { // Return tasks belonging to this exact organization ID. May be nil. Org platform.ID // Return tasks belonging to this exact user ID. May be nil. User platform.ID // Return tasks starting after this ID. After platform.ID // Size of each page. Must be non-negative. // If zero, the implementation picks an appropriate default page size. // Valid page sizes are implementation-dependent. PageSize int }
TaskSearchParams is used when searching or listing tasks.
type TaskStatus ¶
type TaskStatus string
const ( TaskActive TaskStatus = "active" TaskInactive TaskStatus = "inactive" DefaultTaskStatus TaskStatus = TaskActive )
type TickScheduler ¶
type TickScheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, now int64, opts ...TickSchedulerOption) *TickScheduler
NewScheduler returns a new scheduler with the given desired state and the given now UTC timestamp.
func (*TickScheduler) CancelRun ¶
CancelRun cancels a run. It has an unused Context argument so that it can implement a task.RunController.
func (*TickScheduler) ClaimTask ¶
func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error)
func (*TickScheduler) PrometheusCollectors ¶
func (s *TickScheduler) PrometheusCollectors() []prometheus.Collector
func (*TickScheduler) ReleaseTask ¶
func (s *TickScheduler) ReleaseTask(taskID platform.ID) error
func (*TickScheduler) Start ¶
func (s *TickScheduler) Start(ctx context.Context)
func (*TickScheduler) Stop ¶
func (s *TickScheduler) Stop()
func (*TickScheduler) Tick ¶
func (s *TickScheduler) Tick(now int64)
Tick updates the time of the scheduler. Any owned tasks that are due to execute and that have a free concurrency slot will begin a new execution.
func (*TickScheduler) UpdateTask ¶
func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error
type TickSchedulerOption ¶
type TickSchedulerOption func(*TickScheduler)
TickSchedulerOption is an option you can use to modify the scheduler's behavior.
func WithLogger ¶
func WithLogger(logger *zap.Logger) TickSchedulerOption
WithLogger sets the logger for the scheduler. If not set, the scheduler will use a no-op logger.
func WithTicker ¶
func WithTicker(ctx context.Context, d time.Duration) TickSchedulerOption
WithTicker sets a time.Ticker with period d, and calls TickScheduler.Tick when the ticker rolls over to a new second. With a sub-second d, TickScheduler.Tick should be called roughly no later than d after a second: this can help ensure tasks happen early within a second window.
type UpdateTaskRequest ¶
type UpdateTaskRequest struct { // ID of the task. ID platform.ID // New script content of the task. // If empty, do not modify the existing script. Script string // The new desired task status. // If empty, do not modify the existing status. Status TaskStatus }
UpdateTaskRequest encapsulates requested changes to a task.
type UpdateTaskResult ¶
type UpdateTaskResult struct { OldScript string OldStatus TaskStatus NewTask StoreTask NewMeta StoreTaskMeta }
UpdateTaskResult describes the result of modifying a single task. Having the content returned from UpdateTask makes it much simpler for callers to decide how to notify on status changes, etc.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package bolt provides a bolt-backed store implementation.
|
Package bolt provides a bolt-backed store implementation. |
Package executor contains implementations of backend.Executor that depend on the query service.
|
Package executor contains implementations of backend.Executor that depend on the query service. |