Documentation
¶
Index ¶
- Constants
- type AssignResults
- type AssignedItem
- type AssignmentRepository
- type CandidateEventMatch
- type ChildWorkflowSignalCreatedData
- type CompleteTaskOpts
- type ConcurrencyRepository
- type ConcurrencyRepositoryImpl
- func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (res *RunConcurrencyResult, err error)
- func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
- type CreateLogLineOpts
- type CreateMatchOpts
- type CreateTaskOpts
- type DAGWithData
- type ErrNamesNotFound
- type EventTriggerOpts
- type EventType
- type FailTaskOpts
- type FailTasksResponse
- type FinalizedTaskResponse
- type GroupMatchCondition
- type InternalEventMatchResults
- type InternalTaskEvent
- type LeaseRepository
- type ListActiveWorkersResult
- type ListFinalizedWorkflowRunsResponse
- type ListLogsOpts
- type ListTaskRunOpts
- type ListWorkflowRunOpts
- type LogLineRepository
- type MatchData
- type MatchRepository
- type MatchRepositoryImpl
- func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)
- func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- type OLAPRepository
- type OLAPRepositoryImpl
- func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
- func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, ...) error
- func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error
- func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, ...) ([]*sqlcv1.GetTaskPointMetricsRow, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, ...) ([]*sqlcv1.ListTaskEventsRow, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
- func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
- func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
- func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
- func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
- func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, ...) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
- func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
- func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
- func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
- type QueueFactoryRepository
- type QueueRepository
- type RateLimitRepository
- type RateLimitResult
- type ReadTaskRunMetricsOpts
- type ReadableTaskStatus
- type RefreshTimeoutBy
- type ReplayTaskOpts
- type ReplayTasksResult
- type Repository
- type RetriedTask
- type RunConcurrencyResult
- type SchedulerRepository
- type Sticky
- type TaskIdEventKeyTuple
- type TaskIdInsertedAtRetryCount
- type TaskInput
- type TaskMetadata
- type TaskOutputEvent
- func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
- func NewCancelledTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
- func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
- func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
- func NewFailedTaskOutputEventFromTask(task *sqlcv1.V1Task, errorMsg string) *TaskOutputEvent
- func NewSkippedTaskOutputEventFromTask(task *sqlcv1.V1Task, output []byte) *TaskOutputEvent
- type TaskRepository
- type TaskRepositoryImpl
- func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
- func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
- func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
- func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error)
- func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
- func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
- func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
- func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskReassignmentsRow, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskTimeoutsRow, bool, error)
- func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
- func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- type TaskRunMetric
- type TaskWithCancelledReason
- type TaskWithQueue
- type TriggerRepository
- type TriggerRepositoryImpl
- func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
- func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
- type TriggerTaskData
- type UpdateDAGStatusRow
- type UpdateTaskStatusRow
- type V1StepRunData
- type V1WorkflowRunPopulator
- type WorkerRepository
- type WorkflowNameTriggerOpts
- type WorkflowRepository
- type WorkflowRunData
Constants ¶
const MAX_INTERNAL_RETRIES = 3
const MAX_TENANT_RATE_LIMITS = 10000
const NUM_PARTITIONS = 4
TODO: make this dynamic for the instance
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssignResults ¶
type AssignResults struct { Assigned []*AssignedItem Unassigned []*sqlcv1.V1QueueItem SchedulingTimedOut []*sqlcv1.V1QueueItem RateLimited []*RateLimitResult }
type AssignedItem ¶
type AssignedItem struct { WorkerId pgtype.UUID QueueItem *sqlcv1.V1QueueItem }
type AssignmentRepository ¶
type AssignmentRepository interface { ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error) ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error) }
type CandidateEventMatch ¶
type ChildWorkflowSignalCreatedData ¶
type ChildWorkflowSignalCreatedData struct { // The external id of the target child task ChildExternalId string `json:"external_id"` // The external id of the parent task ParentExternalId string `json:"parent_external_id"` // The index of the child task ChildIndex int64 `json:"child_index"` // The key of the child task ChildKey *string `json:"child_key"` }
func (*ChildWorkflowSignalCreatedData) Bytes ¶
func (c *ChildWorkflowSignalCreatedData) Bytes() []byte
type CompleteTaskOpts ¶
type CompleteTaskOpts struct { *TaskIdInsertedAtRetryCount // (required) the output bytes for the task Output []byte }
type ConcurrencyRepository ¶
type ConcurrencyRepository interface { // Checks whether the concurrency strategy is active, and if not, sets is_active=False UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error) }
type ConcurrencyRepositoryImpl ¶
type ConcurrencyRepositoryImpl struct {
// contains filtered or unexported fields
}
func (ConcurrencyRepositoryImpl) DesiredWorkerId ¶
func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy ¶
func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error)
func (ConcurrencyRepositoryImpl) ToV1StepRunData ¶
func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive ¶
func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) error
type CreateLogLineOpts ¶
type CreateLogLineOpts struct { TaskId int64 TaskInsertedAt pgtype.Timestamptz // (optional) The time when the log line was created. CreatedAt *time.Time // (required) The message of the log line. Message string `validate:"required,min=1,max=10000"` // (optional) The level of the log line. Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"` // (optional) The metadata of the log line. Metadata []byte }
type CreateMatchOpts ¶
type CreateMatchOpts struct { Kind sqlcv1.V1MatchKind Conditions []GroupMatchCondition TriggerDAGId *int64 TriggerDAGInsertedAt pgtype.Timestamptz TriggerExternalId *string TriggerWorkflowRunId *string TriggerStepId *string TriggerStepIndex pgtype.Int8 TriggerExistingTaskId *int64 TriggerExistingTaskInsertedAt pgtype.Timestamptz TriggerParentTaskExternalId pgtype.UUID TriggerParentTaskId pgtype.Int8 TriggerParentTaskInsertedAt pgtype.Timestamptz TriggerChildIndex pgtype.Int8 TriggerChildKey pgtype.Text SignalTaskId *int64 SignalTaskInsertedAt pgtype.Timestamptz SignalExternalId *string SignalKey *string }
type CreateTaskOpts ¶
type CreateTaskOpts struct { // (required) the external id ExternalId string `validate:"required,uuid"` // (required) the workflow run id. note this may be the same as the external id if this is a // single-task workflow, otherwise it represents the external id of the DAG. WorkflowRunId string `validate:"required,uuid"` // (required) the step id StepId string `validate:"required,uuid"` // (required) the input bytes to the task Input *TaskInput // (required) the step index for the task StepIndex int // (optional) the additional metadata for the task AdditionalMetadata []byte // (optional) the desired worker id DesiredWorkerId *string // (optional) the DAG id for the task DagId *int64 // (optional) the DAG inserted at for the task DagInsertedAt pgtype.Timestamptz // (required) the initial state for the task InitialState sqlcv1.V1TaskInitialState // (optional) the parent task external id ParentTaskExternalId *string // (optional) the parent task id ParentTaskId *int64 // (optional) the parent task inserted at ParentTaskInsertedAt *time.Time // (optional) the child index for the task ChildIndex *int64 // (optional) the child key for the task ChildKey *string }
type DAGWithData ¶
type ErrNamesNotFound ¶
type ErrNamesNotFound struct {
Names []string
}
func (*ErrNamesNotFound) Error ¶
func (e *ErrNamesNotFound) Error() string
type EventTriggerOpts ¶
type EventType ¶
type EventType string
const ( EVENT_TYPE_REQUEUED_NO_WORKER EventType = "REQUEUED_NO_WORKER" EVENT_TYPE_REQUEUED_RATE_LIMIT EventType = "REQUEUED_RATE_LIMIT" EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT" EVENT_TYPE_ASSIGNED EventType = "ASSIGNED" EVENT_TYPE_STARTED EventType = "STARTED" EVENT_TYPE_FINISHED EventType = "FINISHED" EVENT_TYPE_FAILED EventType = "FAILED" EVENT_TYPE_RETRYING EventType = "RETRYING" EVENT_TYPE_CANCELLED EventType = "CANCELLED" EVENT_TYPE_TIMED_OUT EventType = "TIMED_OUT" EVENT_TYPE_REASSIGNED EventType = "REASSIGNED" EVENT_TYPE_SLOT_RELEASED EventType = "SLOT_RELEASED" EVENT_TYPE_TIMEOUT_REFRESHED EventType = "TIMEOUT_REFRESHED" EVENT_TYPE_RETRIED_BY_USER EventType = "RETRIED_BY_USER" EVENT_TYPE_SENT_TO_WORKER EventType = "SENT_TO_WORKER" EVENT_TYPE_RATE_LIMIT_ERROR EventType = "RATE_LIMIT_ERROR" EVENT_TYPE_ACKNOWLEDGED EventType = "ACKNOWLEDGED" EVENT_TYPE_CREATED EventType = "CREATED" EVENT_TYPE_QUEUED EventType = "QUEUED" )
type FailTaskOpts ¶
type FailTaskOpts struct { *TaskIdInsertedAtRetryCount // (required) whether this is an application-level error or an internal error on the Hatchet side IsAppError bool // (optional) the error message for the task ErrorMessage string }
type FailTasksResponse ¶
type FailTasksResponse struct { *FinalizedTaskResponse RetriedTasks []RetriedTask }
type FinalizedTaskResponse ¶
type FinalizedTaskResponse struct { ReleasedTasks []*sqlcv1.ReleaseTasksRow InternalEvents []InternalTaskEvent }
type GroupMatchCondition ¶
type GroupMatchCondition struct { GroupId string `validate:"required,uuid"` EventType sqlcv1.V1EventType EventKey string // (optional) a hint for querying the event data EventResourceHint *string // the data key which gets inserted into the returned data from a satisfied match condition ReadableDataKey string Expression string Action sqlcv1.V1MatchConditionAction // (optional) the data which was used to satisfy the condition (relevant for replays) Data []byte }
type InternalTaskEvent ¶
type InternalTaskEvent struct { TenantID string `json:"tenant_id"` TaskID int64 `json:"task_id"` TaskExternalID string `json:"task_external_id"` RetryCount int32 `json:"retry_count"` EventType sqlcv1.V1TaskEventType `json:"event_type"` EventKey string `json:"event_key"` Data []byte `json:"data"` }
InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.
type LeaseRepository ¶
type LeaseRepository interface { ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error) ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error) ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error) AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error) ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error }
type ListActiveWorkersResult ¶
type ListActiveWorkersResult struct { ID string MaxRuns int Labels []*sqlcv1.ListManyWorkerLabelsRow }
type ListFinalizedWorkflowRunsResponse ¶
type ListFinalizedWorkflowRunsResponse struct { WorkflowRunId string OutputEvents []*TaskOutputEvent }
type ListLogsOpts ¶
type ListLogsOpts struct { // (optional) number of logs to skip Offset *int // (optional) number of logs to return Limit *int `validate:"omitnil,min=1,max=1000"` // (optional) a list of log levels to filter by Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"` // (optional) a search query Search *string }
type ListTaskRunOpts ¶
type ListWorkflowRunOpts ¶
type LogLineRepository ¶
type LogLineRepository interface { ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error) PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error }
type MatchData ¶
type MatchData struct {
// contains filtered or unexported fields
}
MatchData parses the aggregated data of a match.
func NewMatchData ¶
func (*MatchData) Action ¶
func (m *MatchData) Action() sqlcv1.V1MatchConditionAction
func (*MatchData) DataValueAsTaskOutputEvent ¶
func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent
Helper function for internal events
type MatchRepository ¶
type MatchRepository interface {
ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)
}
type MatchRepositoryImpl ¶
type MatchRepositoryImpl struct {
// contains filtered or unexported fields
}
func (MatchRepositoryImpl) DesiredWorkerId ¶
func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*MatchRepositoryImpl) ProcessInternalEventMatches ¶
func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)
ProcessInternalEventMatches processes a list of internal events
func (MatchRepositoryImpl) ToV1StepRunData ¶
func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
type OLAPRepository ¶
type OLAPRepository interface { UpdateTablePartitions(ctx context.Context) error ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error) CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error) ListTasksByDAGId(ctx context.Context, tenantId 
string, dagIds []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error) // ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG. // In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG. ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error) }
type OLAPRepositoryImpl ¶
type OLAPRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*OLAPRepositoryImpl) CreateDAGs ¶
func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
func (*OLAPRepositoryImpl) CreateTaskEvents ¶
func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
func (*OLAPRepositoryImpl) CreateTasks ¶
func (OLAPRepositoryImpl) DesiredWorkerId ¶
func (*OLAPRepositoryImpl) GetTaskPointMetrics ¶
func (*OLAPRepositoryImpl) ListTaskRunEvents ¶
func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId ¶
func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
func (*OLAPRepositoryImpl) ListTasks ¶
func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
func (*OLAPRepositoryImpl) ListTasksByDAGId ¶
func (*OLAPRepositoryImpl) ListTasksByExternalIds ¶
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt ¶
func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames ¶
func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRuns ¶
func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*OLAPRepositoryImpl) ReadDAG ¶
func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRun ¶
func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRunData ¶
func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)
func (*OLAPRepositoryImpl) ReadTaskRunMetrics ¶
func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
func (*OLAPRepositoryImpl) ReadWorkflowRun ¶
func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
func (OLAPRepositoryImpl) ToV1StepRunData ¶
func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*OLAPRepositoryImpl) UpdateDAGStatuses ¶
func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
func (*OLAPRepositoryImpl) UpdateTablePartitions ¶
func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
func (*OLAPRepositoryImpl) UpdateTaskStatuses ¶
func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
type QueueFactoryRepository ¶
type QueueFactoryRepository interface {
NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}
type QueueRepository ¶
type QueueRepository interface { ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error) MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error) GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error) GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error) Cleanup() }
type RateLimitRepository ¶
type RateLimitResult ¶
type ReadTaskRunMetricsOpts ¶
type ReadableTaskStatus ¶
type ReadableTaskStatus string
const ( READABLE_TASK_STATUS_QUEUED ReadableTaskStatus = "QUEUED" READABLE_TASK_STATUS_RUNNING ReadableTaskStatus = "RUNNING" READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED" READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED" READABLE_TASK_STATUS_FAILED ReadableTaskStatus = "FAILED" )
func StringToReadableStatus ¶
func StringToReadableStatus(status string) ReadableTaskStatus
func (ReadableTaskStatus) EnumValue ¶
func (s ReadableTaskStatus) EnumValue() int
type RefreshTimeoutBy ¶
type ReplayTaskOpts ¶
type ReplayTaskOpts struct { // (required) the task id TaskId int64 // (required) the inserted at time InsertedAt pgtype.Timestamptz // (required) the external id ExternalId string // (required) the step id StepId string // (optional) the input bytes to the task, uses the existing input if not set Input *TaskInput // (required) the initial state for the task InitialState sqlcv1.V1TaskInitialState // (optional) the additional metadata for the task AdditionalMetadata []byte }
type ReplayTasksResult ¶
type ReplayTasksResult struct { ReplayedTasks []TaskIdInsertedAtRetryCount QueuedTasks []*sqlcv1.V1Task InternalEventResults *InternalEventMatchResults }
type Repository ¶
type Repository interface { Triggers() TriggerRepository Tasks() TaskRepository Scheduler() SchedulerRepository Matches() MatchRepository OLAP() OLAPRepository OverwriteOLAPRepository(o OLAPRepository) Logs() LogLineRepository Workers() WorkerRepository Workflows() WorkflowRepository }
func NewRepository ¶
type RetriedTask ¶
type RunConcurrencyResult ¶
type RunConcurrencyResult struct { // The tasks which were enqueued Queued []TaskWithQueue // If the strategy involves cancelling a task, these are the tasks to cancel Cancelled []TaskWithCancelledReason // If the step has multiple concurrency strategies, these are the next ones to notify NextConcurrencyStrategies []int64 }
type SchedulerRepository ¶
type SchedulerRepository interface { Concurrency() ConcurrencyRepository Lease() LeaseRepository QueueFactory() QueueFactoryRepository RateLimit() RateLimitRepository Assignment() AssignmentRepository }
type TaskIdEventKeyTuple ¶
type TaskIdInsertedAtRetryCount ¶
type TaskIdInsertedAtRetryCount struct { // (required) the task id Id int64 `validate:"required"` // (required) the inserted at time InsertedAt pgtype.Timestamptz // (required) the retry count RetryCount int32 }
type TaskInput ¶
type TaskMetadata ¶
type TaskMetadata struct { TaskID int64 `json:"task_id"` TaskInsertedAt time.Time `json:"task_inserted_at"` }
func ParseTaskMetadata ¶
func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)
type TaskOutputEvent ¶
type TaskOutputEvent struct { IsFailure bool `json:"is_failure"` EventType sqlcv1.V1TaskEventType `json:"event_type"` TaskExternalId string `json:"task_external_id"` TaskId int64 `json:"task_id"` RetryCount int32 `json:"retry_count"` WorkerId *string `json:"worker_id"` Output []byte `json:"output"` ErrorMessage string `json:"error_message"` StepReadableID string `json:"step_readable_id"` }
func NewCancelledTaskOutputEvent ¶
func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
func NewCancelledTaskOutputEventFromTask ¶
func NewCancelledTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
func NewCompletedTaskOutputEvent ¶
func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
func NewFailedTaskOutputEvent ¶
func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
func NewFailedTaskOutputEventFromTask ¶
func NewFailedTaskOutputEventFromTask(task *sqlcv1.V1Task, errorMsg string) *TaskOutputEvent
func NewSkippedTaskOutputEventFromTask ¶
func NewSkippedTaskOutputEventFromTask(task *sqlcv1.V1Task, output []byte) *TaskOutputEvent
func (*TaskOutputEvent) Bytes ¶
func (e *TaskOutputEvent) Bytes() []byte
func (*TaskOutputEvent) IsCancelled ¶
func (e *TaskOutputEvent) IsCancelled() bool
func (*TaskOutputEvent) IsCompleted ¶
func (e *TaskOutputEvent) IsCompleted() bool
func (*TaskOutputEvent) IsFailed ¶
func (e *TaskOutputEvent) IsFailed() bool
type TaskRepository ¶
type TaskRepository interface { UpdateTablePartitions(ctx context.Context) error // GetTaskByExternalId is a heavily cached method to return task metadata by its external id GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error) // FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids. // This is non-cacheable because tasks can be added to a workflow run as it executes. FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error) FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error) ProcessTaskTimeouts(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskTimeoutsRow, bool, error) ProcessTaskReassignments(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskReassignmentsRow, bool, error) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error) GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error) ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error) }
type TaskRepositoryImpl ¶
type TaskRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*TaskRepositoryImpl) CancelTasks ¶
func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
func (*TaskRepositoryImpl) CompleteTasks ¶
func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
func (TaskRepositoryImpl) DesiredWorkerId ¶
func (*TaskRepositoryImpl) FailTasks ¶
func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
func (*TaskRepositoryImpl) FlattenExternalIds ¶
func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) GetQueueCounts ¶
func (*TaskRepositoryImpl) GetTaskByExternalId ¶
func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns ¶
func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
func (*TaskRepositoryImpl) ListTaskMetas ¶
func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and checks whether they already exist.
func (*TaskRepositoryImpl) ProcessTaskReassignments ¶
func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskReassignmentsRow, bool, error)
func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems ¶
func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
func (*TaskRepositoryImpl) ProcessTaskTimeouts ¶
func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskTimeoutsRow, bool, error)
func (*TaskRepositoryImpl) RefreshTimeoutBy ¶
func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReleaseSlot ¶
func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReplayTasks ¶
func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
func (TaskRepositoryImpl) ToV1StepRunData ¶
func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TaskRepositoryImpl) UpdateTablePartitions ¶
func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
type TaskRunMetric ¶
type TaskWithCancelledReason ¶
type TaskWithCancelledReason struct { *TaskIdInsertedAtRetryCount CancelledReason string TaskExternalId string WorkflowRunId string }
type TaskWithQueue ¶
type TaskWithQueue struct { *TaskIdInsertedAtRetryCount Queue string }
type TriggerRepository ¶
type TriggerRepository interface { TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error }
type TriggerRepositoryImpl ¶
type TriggerRepositoryImpl struct {
// contains filtered or unexported fields
}
func (TriggerRepositoryImpl) DesiredWorkerId ¶
func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and checks whether they already exist.
func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts ¶
func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
func (TriggerRepositoryImpl) ToV1StepRunData ¶
func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TriggerRepositoryImpl) TriggerFromEvents ¶
func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
func (*TriggerRepositoryImpl) TriggerFromWorkflowNames ¶
func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
type TriggerTaskData ¶
type TriggerTaskData struct { // (required) the workflow name WorkflowName string `json:"workflow_name" validate:"required"` // (optional) the workflow run data Data []byte `json:"data"` // (optional) the workflow run metadata AdditionalMetadata []byte `json:"additional_metadata"` // (optional) the desired worker id DesiredWorkerId *string `json:"desired_worker_id"` // (optional) the parent external id ParentExternalId *string `json:"parent_external_id"` // (optional) the parent task id ParentTaskId *int64 `json:"parent_task_id"` // (optional) the parent inserted_at ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"` // (optional) the child index ChildIndex *int64 `json:"child_index"` // (optional) the child key ChildKey *string `json:"child_key"` }
type UpdateDAGStatusRow ¶
type UpdateDAGStatusRow struct { DagId int64 DagInsertedAt pgtype.Timestamptz ReadableStatus sqlcv1.V1ReadableStatusOlap ExternalId pgtype.UUID }
type UpdateTaskStatusRow ¶
type UpdateTaskStatusRow struct { TaskId int64 TaskInsertedAt pgtype.Timestamptz ReadableStatus sqlcv1.V1ReadableStatusOlap ExternalId pgtype.UUID }
type V1StepRunData ¶
type V1StepRunData struct { Input map[string]interface{} `json:"input"` TriggeredBy string `json:"triggered_by"` Parents map[string]map[string]interface{} `json:"parents"` // custom-set user data for the step UserData map[string]interface{} `json:"user_data"` // overrides set from the playground Overrides map[string]interface{} `json:"overrides"` // errors in upstream steps (only used in on-failure step) StepRunErrors map[string]string `json:"step_run_errors,omitempty"` }
func (*V1StepRunData) Bytes ¶
func (v1 *V1StepRunData) Bytes() []byte
type V1WorkflowRunPopulator ¶
type V1WorkflowRunPopulator struct { WorkflowRun *WorkflowRunData TaskMetadata []TaskMetadata }
type WorkerRepository ¶
type WorkerRepository interface { ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error) GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error) ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error) }
type WorkflowNameTriggerOpts ¶
type WorkflowNameTriggerOpts struct { *TriggerTaskData ExternalId string // Whether to skip the creation of the child workflow ShouldSkip bool }
type WorkflowRepository ¶
type WorkflowRunData ¶
type WorkflowRunData struct { AdditionalMetadata []byte `json:"additional_metadata"` CreatedAt pgtype.Timestamptz `json:"created_at"` DisplayName string `json:"display_name"` ErrorMessage string `json:"error_message"` ExternalID pgtype.UUID `json:"external_id"` FinishedAt pgtype.Timestamptz `json:"finished_at"` Input []byte `json:"input"` InsertedAt pgtype.Timestamptz `json:"inserted_at"` Kind sqlcv1.V1RunKind `json:"kind"` Output *[]byte `json:"output,omitempty"` ReadableStatus sqlcv1.V1ReadableStatusOlap `json:"readable_status"` StepId *pgtype.UUID `json:"step_id,omitempty"` StartedAt pgtype.Timestamptz `json:"started_at"` TaskExternalId *pgtype.UUID `json:"task_external_id,omitempty"` TaskId *int64 `json:"task_id,omitempty"` TaskInsertedAt *pgtype.Timestamptz `json:"task_inserted_at,omitempty"` TenantID pgtype.UUID `json:"tenant_id"` WorkflowID pgtype.UUID `json:"workflow_id"` WorkflowVersionId pgtype.UUID `json:"workflow_version_id"` }