v1

package
v0.55.0-alpha.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2025 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// MAX_INTERNAL_RETRIES is fixed at 3.
// NOTE(review): presumably the cap on retries for internal (non-application)
// failures — confirm against its usage.
const MAX_INTERNAL_RETRIES = 3
View Source
// MAX_TENANT_RATE_LIMITS is fixed at 10000.
// NOTE(review): presumably an upper bound on rate limits handled per tenant —
// confirm against its usage.
const MAX_TENANT_RATE_LIMITS = 10000
View Source
// NUM_PARTITIONS is a hard-coded partition count of 4; it is not yet
// configurable per instance.
const NUM_PARTITIONS = 4

TODO: make this dynamic for the instance

Variables

This section is empty.

Functions

This section is empty.

Types

type AssignResults

// AssignResults buckets items by outcome: assigned items, unassigned queue
// items, queue items whose scheduling timed out, and rate-limit results.
// It is the input to QueueRepository.MarkQueueItemsProcessed.
type AssignResults struct {
	Assigned           []*AssignedItem
	Unassigned         []*sqlcv1.V1QueueItem
	SchedulingTimedOut []*sqlcv1.V1QueueItem
	RateLimited        []*RateLimitResult
}

type AssignedItem

// AssignedItem pairs a queue item with a worker ID.
type AssignedItem struct {
	// WorkerId is the UUID of the worker associated with QueueItem.
	WorkerId pgtype.UUID

	// QueueItem is the queue item associated with WorkerId.
	QueueItem *sqlcv1.V1QueueItem
}

type AssignmentRepository

// AssignmentRepository exposes tenant-scoped worker lookups: the actions for a
// set of workers and the available slots for workers.
type AssignmentRepository interface {
	// ListActionsForWorkers lists action rows for the given worker IDs within a tenant.
	ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error)
	// ListAvailableSlotsForWorkers lists available-slot rows within a tenant, filtered by params.
	ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error)
}

type CandidateEventMatch

// CandidateEventMatch describes a single event passed to
// MatchRepository.ProcessInternalEventMatches.
type CandidateEventMatch struct {
	// A UUID for the event
	ID string

	// A timestamp for the event
	EventTimestamp time.Time

	// Key for the event
	Key string

	// Resource hint for the event (optional; nil when absent)
	ResourceHint *string

	// Data for the event
	Data []byte
}

type ChildWorkflowSignalCreatedData

// ChildWorkflowSignalCreatedData is a JSON-tagged payload that identifies a
// child task and its parent task. Note that ChildExternalId is serialized
// under the key "external_id", not "child_external_id".
type ChildWorkflowSignalCreatedData struct {
	// The external id of the target child task
	ChildExternalId string `json:"external_id"`

	// The external id of the parent task
	ParentExternalId string `json:"parent_external_id"`

	// The index of the child task
	ChildIndex int64 `json:"child_index"`

	// The key of the child task (pointer; may be nil)
	ChildKey *string `json:"child_key"`
}

func (*ChildWorkflowSignalCreatedData) Bytes

func (c *ChildWorkflowSignalCreatedData) Bytes() []byte

type CompleteTaskOpts

// CompleteTaskOpts is the per-task argument to TaskRepository.CompleteTasks.
// The task is identified by the embedded TaskIdInsertedAtRetryCount.
type CompleteTaskOpts struct {
	*TaskIdInsertedAtRetryCount

	// (required) the output bytes for the task
	Output []byte
}

type ConcurrencyRepository

// ConcurrencyRepository operates on a single step concurrency strategy
// (sqlcv1.V1StepConcurrency) within a tenant.
type ConcurrencyRepository interface {
	// Checks whether the concurrency strategy is active, and if not, sets is_active=False
	UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error

	// RunConcurrencyStrategy runs the given strategy and reports the outcome as
	// a RunConcurrencyResult (queued tasks, cancelled tasks, next strategies).
	RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error)
}

type ConcurrencyRepositoryImpl

// ConcurrencyRepositoryImpl is an opaque type whose pointer receiver provides
// RunConcurrencyStrategy and UpdateConcurrencyStrategyIsActive, i.e. the
// ConcurrencyRepository method set.
type ConcurrencyRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (ConcurrencyRepositoryImpl) DesiredWorkerId

func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow

func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.

func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy

func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(
	ctx context.Context,
	tenantId pgtype.UUID,
	strategy *sqlcv1.V1StepConcurrency,
) (res *RunConcurrencyResult, err error)

func (ConcurrencyRepositoryImpl) ToV1StepRunData

func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive

func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(
	ctx context.Context,
	tenantId pgtype.UUID,
	strategy *sqlcv1.V1StepConcurrency,
) error

type CreateLogLineOpts

// CreateLogLineOpts is the argument to LogLineRepository.PutLog and describes
// one log line to store.
type CreateLogLineOpts struct {
	// NOTE(review): presumably the id of the task the log line belongs to — confirm.
	TaskId int64

	// NOTE(review): presumably the inserted-at timestamp of that task — confirm.
	TaskInsertedAt pgtype.Timestamptz

	// (optional) The time when the log line was created.
	CreatedAt *time.Time

	// (required) The message of the log line. The validate tag bounds its
	// length to 1..10000.
	Message string `validate:"required,min=1,max=10000"`

	// (optional) The level of the log line. When set, the validate tag
	// restricts it to INFO, ERROR, WARN or DEBUG.
	Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`

	// (optional) The metadata of the log line.
	Metadata []byte
}

type CreateMatchOpts

// CreateMatchOpts describes a match to create: its kind, its conditions, and
// two families of optional fields prefixed Trigger* and Signal*.
// NOTE(review): which fields are required for which Kind is not visible
// here — confirm against the code that consumes this struct.
type CreateMatchOpts struct {
	// Kind is the kind of match.
	Kind sqlcv1.V1MatchKind

	// Conditions are the match conditions; each carries a GroupId.
	Conditions []GroupMatchCondition

	// Trigger* fields: DAG reference.
	TriggerDAGId *int64

	TriggerDAGInsertedAt pgtype.Timestamptz

	// Trigger* fields: external id, workflow run and step reference.
	TriggerExternalId *string

	TriggerWorkflowRunId *string

	TriggerStepId *string

	TriggerStepIndex pgtype.Int8

	// Trigger* fields: reference to an existing task.
	TriggerExistingTaskId *int64

	TriggerExistingTaskInsertedAt pgtype.Timestamptz

	// Trigger* fields: parent task reference and child index/key.
	TriggerParentTaskExternalId pgtype.UUID

	TriggerParentTaskId pgtype.Int8

	TriggerParentTaskInsertedAt pgtype.Timestamptz

	TriggerChildIndex pgtype.Int8

	TriggerChildKey pgtype.Text

	// Signal* fields: task reference, external id and key.
	SignalTaskId *int64

	SignalTaskInsertedAt pgtype.Timestamptz

	SignalExternalId *string

	SignalKey *string
}

type CreateTaskOpts

// CreateTaskOpts describes a task to create. ExternalId, WorkflowRunId and
// StepId carry validate tags requiring a UUID.
type CreateTaskOpts struct {
	// (required) the external id
	ExternalId string `validate:"required,uuid"`

	// (required) the workflow run id. note this may be the same as the external id if this is a
	// single-task workflow, otherwise it represents the external id of the DAG.
	WorkflowRunId string `validate:"required,uuid"`

	// (required) the step id
	StepId string `validate:"required,uuid"`

	// (required) the input to the task (a *TaskInput, not raw bytes)
	Input *TaskInput

	// (required) the step index for the task
	StepIndex int

	// (optional) the additional metadata for the task
	AdditionalMetadata []byte

	// (optional) the desired worker id
	DesiredWorkerId *string

	// (optional) the DAG id for the task
	DagId *int64

	// (optional) the DAG inserted at for the task
	DagInsertedAt pgtype.Timestamptz

	// (required) the initial state for the task
	InitialState sqlcv1.V1TaskInitialState

	// (optional) the parent task external id
	ParentTaskExternalId *string

	// (optional) the parent task id
	ParentTaskId *int64

	// (optional) the parent task inserted at
	ParentTaskInsertedAt *time.Time

	// (optional) the child index for the task
	ChildIndex *int64

	// (optional) the child key for the task
	ChildKey *string
}

type DAGWithData

// DAGWithData embeds a *sqlcv1.V1Dag and adds its input, additional metadata
// and an optional parent task external id. It is returned by the
// TriggerRepository trigger methods and accepted by OLAPRepository.CreateDAGs.
type DAGWithData struct {
	*sqlcv1.V1Dag

	// Input is the DAG input as raw bytes.
	Input []byte

	// AdditionalMetadata is the DAG's additional metadata as raw bytes.
	AdditionalMetadata []byte

	// ParentTaskExternalID is a pointer and may be nil.
	ParentTaskExternalID *pgtype.UUID
}

type ErrNamesNotFound

// ErrNamesNotFound is an error type (its pointer receiver has an Error method)
// that carries a list of names.
// NOTE(review): presumably the names that could not be resolved — confirm.
type ErrNamesNotFound struct {
	Names []string
}

func (*ErrNamesNotFound) Error

func (e *ErrNamesNotFound) Error() string

type EventTriggerOpts

// EventTriggerOpts is the per-event argument to
// TriggerRepository.TriggerFromEvents.
type EventTriggerOpts struct {
	// EventId is the id of the event.
	EventId string

	// Key is the event key.
	Key string

	// Data is the event payload as raw bytes.
	Data []byte

	// AdditionalMetadata is the event's additional metadata as raw bytes.
	AdditionalMetadata []byte
}

type EventType

// EventType is a string-backed enumeration of event types.
type EventType string

// Known EventType values. Each constant's string value equals its identifier
// without the EVENT_TYPE_ prefix.
const (
	EVENT_TYPE_REQUEUED_NO_WORKER   EventType = "REQUEUED_NO_WORKER"
	EVENT_TYPE_REQUEUED_RATE_LIMIT  EventType = "REQUEUED_RATE_LIMIT"
	EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT"
	EVENT_TYPE_ASSIGNED             EventType = "ASSIGNED"
	EVENT_TYPE_STARTED              EventType = "STARTED"
	EVENT_TYPE_FINISHED             EventType = "FINISHED"
	EVENT_TYPE_FAILED               EventType = "FAILED"
	EVENT_TYPE_RETRYING             EventType = "RETRYING"
	EVENT_TYPE_CANCELLED            EventType = "CANCELLED"
	EVENT_TYPE_TIMED_OUT            EventType = "TIMED_OUT"
	EVENT_TYPE_REASSIGNED           EventType = "REASSIGNED"
	EVENT_TYPE_SLOT_RELEASED        EventType = "SLOT_RELEASED"
	EVENT_TYPE_TIMEOUT_REFRESHED    EventType = "TIMEOUT_REFRESHED"
	EVENT_TYPE_RETRIED_BY_USER      EventType = "RETRIED_BY_USER"
	EVENT_TYPE_SENT_TO_WORKER       EventType = "SENT_TO_WORKER"
	EVENT_TYPE_RATE_LIMIT_ERROR     EventType = "RATE_LIMIT_ERROR"
	EVENT_TYPE_ACKNOWLEDGED         EventType = "ACKNOWLEDGED"
	EVENT_TYPE_CREATED              EventType = "CREATED"
	EVENT_TYPE_QUEUED               EventType = "QUEUED"
)

type FailTaskOpts

// FailTaskOpts is the per-task argument to TaskRepository.FailTasks. The task
// is identified by the embedded TaskIdInsertedAtRetryCount.
type FailTaskOpts struct {
	*TaskIdInsertedAtRetryCount

	// (required) whether this is an application-level error or an internal error on the Hatchet side
	IsAppError bool

	// (optional) the error message for the task
	ErrorMessage string
}

type FailTasksResponse

// FailTasksResponse is the result of TaskRepository.FailTasks: the embedded
// FinalizedTaskResponse plus a list of retried tasks.
type FailTasksResponse struct {
	*FinalizedTaskResponse

	// RetriedTasks lists retried tasks together with their retry settings.
	RetriedTasks []RetriedTask
}

type FinalizedTaskResponse

// FinalizedTaskResponse is returned by TaskRepository.CompleteTasks and
// TaskRepository.CancelTasks, and is embedded in FailTasksResponse.
type FinalizedTaskResponse struct {
	// ReleasedTasks holds the rows of the released tasks.
	ReleasedTasks []*sqlcv1.ReleaseTasksRow

	// InternalEvents holds internal task events accompanying the released tasks.
	InternalEvents []InternalTaskEvent
}

type GroupMatchCondition

// GroupMatchCondition is one condition of a match (see
// CreateMatchOpts.Conditions). GroupId carries a validate tag requiring a UUID.
type GroupMatchCondition struct {
	// (required) the id of the group this condition belongs to
	GroupId string `validate:"required,uuid"`

	// the type of event this condition applies to
	EventType sqlcv1.V1EventType

	// the key of the event this condition applies to
	EventKey string

	// (optional) a hint for querying the event data
	EventResourceHint *string

	// the data key which gets inserted into the returned data from a satisfied match condition
	ReadableDataKey string

	// NOTE(review): expression language and evaluation semantics are not
	// visible here — confirm against the evaluator.
	Expression string

	// the action associated with this condition
	Action sqlcv1.V1MatchConditionAction

	// (optional) the data which was used to satisfy the condition (relevant for replays)
	Data []byte
}

type InternalEventMatchResults

// InternalEventMatchResults is the result of
// MatchRepository.ProcessInternalEventMatches.
type InternalEventMatchResults struct {
	// The list of tasks which were created from the matches
	CreatedTasks []*sqlcv1.V1Task

	// The list of tasks which were replayed from the matches
	ReplayedTasks []*sqlcv1.V1Task
}

type InternalTaskEvent

// InternalTaskEvent is a JSON-tagged task event record keyed by tenant, task
// id, task external id and retry count, carrying an event type, an event key
// and a raw data payload.
type InternalTaskEvent struct {
	TenantID       string                 `json:"tenant_id"`
	TaskID         int64                  `json:"task_id"`
	TaskExternalID string                 `json:"task_external_id"`
	RetryCount     int32                  `json:"retry_count"`
	EventType      sqlcv1.V1TaskEventType `json:"event_type"`
	EventKey       string                 `json:"event_key"`
	Data           []byte                 `json:"data"`
}

InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.

type LeaseRepository

// LeaseRepository lists tenant-scoped resources (queues, active workers,
// concurrency strategies) and manages leases on resource IDs.
type LeaseRepository interface {
	// Listing methods, each scoped to a single tenant.
	ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error)
	ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
	ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error)

	// AcquireOrExtendLeases takes a lease kind, the resource IDs of interest
	// and the caller's existing leases, and returns a list of leases.
	AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error)
	// ReleaseLeases releases the given leases.
	ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error
}

type ListActiveWorkersResult

// ListActiveWorkersResult is the element type returned by
// LeaseRepository.ListActiveWorkers: a worker's ID, its MaxRuns value and its
// label rows.
type ListActiveWorkersResult struct {
	ID      string
	MaxRuns int
	Labels  []*sqlcv1.ListManyWorkerLabelsRow
}

type ListFinalizedWorkflowRunsResponse

// ListFinalizedWorkflowRunsResponse is the element type returned by
// TaskRepository.ListFinalizedWorkflowRuns: a workflow run id together with
// its task output events.
type ListFinalizedWorkflowRunsResponse struct {
	WorkflowRunId string

	OutputEvents []*TaskOutputEvent
}

type ListLogsOpts

// ListLogsOpts holds the optional paging and filtering options accepted by
// LogLineRepository.ListLogLines.
type ListLogsOpts struct {
	// (optional) number of logs to skip
	Offset *int

	// (optional) number of logs to return; when set, the validate tag bounds
	// it to 1..1000
	Limit *int `validate:"omitnil,min=1,max=1000"`

	// (optional) a list of log levels to filter by; each entry must be one of
	// INFO, ERROR, WARN or DEBUG per the validate tag
	Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`

	// (optional) a search query
	Search *string
}

type ListTaskRunOpts

// ListTaskRunOpts holds the filter and pagination options accepted by
// OLAPRepository.ListTasks.
// NOTE(review): how zero values (e.g. a zero StartedAfter or an empty
// Statuses slice) are treated is not visible here — confirm in the query.
type ListTaskRunOpts struct {
	// Time filter on creation time (lower bound, by name).
	CreatedAfter time.Time

	// Status filter.
	Statuses []sqlcv1.V1ReadableStatusOlap

	// Workflow id filter.
	WorkflowIds []uuid.UUID

	// Worker id filter (pointer; may be nil).
	WorkerId *uuid.UUID

	// Time filter on start time (lower bound, by name).
	StartedAfter time.Time

	// Time filter on finish time (upper bound, by name; pointer, may be nil).
	FinishedBefore *time.Time

	// Additional-metadata filter.
	AdditionalMetadata map[string]interface{}

	// Pagination: maximum number of results.
	Limit int64

	// Pagination: number of results to skip.
	Offset int64
}

type ListWorkflowRunOpts

// ListWorkflowRunOpts holds the filter and pagination options accepted by
// OLAPRepository.ListWorkflowRuns.
// NOTE(review): how zero values (e.g. a zero StartedAfter or an empty
// Statuses slice) are treated is not visible here — confirm in the query.
type ListWorkflowRunOpts struct {
	// Time filter on creation time (lower bound, by name).
	CreatedAfter time.Time

	// Status filter.
	Statuses []sqlcv1.V1ReadableStatusOlap

	// Workflow id filter.
	WorkflowIds []uuid.UUID

	// Time filter on start time (lower bound, by name).
	StartedAfter time.Time

	// Time filter on finish time (upper bound, by name; pointer, may be nil).
	FinishedBefore *time.Time

	// Additional-metadata filter.
	AdditionalMetadata map[string]interface{}

	// Pagination: maximum number of results.
	Limit int64

	// Pagination: number of results to skip.
	Offset int64

	// Parent task external id filter (pointer; may be nil).
	ParentTaskExternalId *pgtype.UUID
}

type LogLineRepository

// LogLineRepository reads and writes task log lines for a tenant.
type LogLineRepository interface {
	// ListLogLines lists log lines for the task identified by taskId and
	// taskInsertedAt, applying the paging/filter options in opts.
	ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error)

	// PutLog stores the log line described by opts.
	PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error
}

type MatchData

// MatchData is an opaque type with no exported fields; it is constructed from
// aggregated match data bytes via NewMatchData.
type MatchData struct {
	// contains filtered or unexported fields
}

MatchData parses match aggregated data.

func NewMatchData

func NewMatchData(mcAggregatedData []byte) (*MatchData, error)

func (*MatchData) Action

func (*MatchData) DataKeys

func (m *MatchData) DataKeys() []string

func (*MatchData) DataValueAsTaskOutputEvent

func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent

DataValueAsTaskOutputEvent is a helper function for internal events.

type MatchRepository

// MatchRepository processes candidate events against matches for a tenant.
type MatchRepository interface {
	// ProcessInternalEventMatches processes a list of internal events and
	// returns the tasks created and replayed as a result.
	ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)
}

type MatchRepositoryImpl

// MatchRepositoryImpl is an opaque type whose pointer receiver provides
// ProcessInternalEventMatches, i.e. the MatchRepository method set.
type MatchRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (MatchRepositoryImpl) DesiredWorkerId

func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow

func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.

func (*MatchRepositoryImpl) ProcessInternalEventMatches

func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)

ProcessInternalEventMatches processes a list of internal events

func (MatchRepositoryImpl) ToV1StepRunData

func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

type OLAPRepository

// OLAPRepository is the read/write interface over the OLAP-side task, DAG and
// workflow-run data (the sqlcv1 *Olap row types and related rows).
// NOTE(review): the meaning of the bare int and bool results below is not
// documented in this listing — confirm against the implementation.
type OLAPRepository interface {
	// Partition maintenance and single-entity reads.
	UpdateTablePartitions(ctx context.Context) error
	ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
	ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
	ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)

	// Listing, metrics, bulk creation and status updates.
	ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
	ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
	ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
	ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
	ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
	ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
	CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error
	CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
	CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
	GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)
	UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
	UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
	ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
	ListTasksByDAGId(ctx context.Context, tenantId string, dagIds []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
	ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)

	// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
	// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
	ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
}

func NewOLAPRepositoryFromPool

func NewOLAPRepositoryFromPool(pool *pgxpool.Pool, l *zerolog.Logger, olapRetentionPeriod time.Duration) (OLAPRepository, func() error)

type OLAPRepositoryImpl

// OLAPRepositoryImpl is an opaque type whose pointer receiver provides the
// OLAPRepository methods (CreateDAGs, ListTasks, ReadTaskRun, ...).
// NewOLAPRepositoryFromPool returns an OLAPRepository built from a pgx pool.
type OLAPRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (*OLAPRepositoryImpl) CreateDAGs

func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error

func (*OLAPRepositoryImpl) CreateTaskEvents

func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error

func (*OLAPRepositoryImpl) CreateTasks

func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error

func (OLAPRepositoryImpl) DesiredWorkerId

func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (*OLAPRepositoryImpl) GetTaskPointMetrics

func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)

func (*OLAPRepositoryImpl) ListTaskRunEvents

func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)

func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId

func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)

func (*OLAPRepositoryImpl) ListTasks

func (*OLAPRepositoryImpl) ListTasksByDAGId

func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)

func (*OLAPRepositoryImpl) ListTasksByExternalIds

func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)

func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt

func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)

func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames

func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)

func (*OLAPRepositoryImpl) ListWorkflowRuns

func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)

func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow

func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.

func (*OLAPRepositoryImpl) ReadDAG

func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)

func (*OLAPRepositoryImpl) ReadTaskRun

func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)

func (*OLAPRepositoryImpl) ReadTaskRunData

func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)

func (*OLAPRepositoryImpl) ReadTaskRunMetrics

func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)

func (*OLAPRepositoryImpl) ReadWorkflowRun

func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)

func (OLAPRepositoryImpl) ToV1StepRunData

func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*OLAPRepositoryImpl) UpdateDAGStatuses

func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)

func (*OLAPRepositoryImpl) UpdateTablePartitions

func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error

func (*OLAPRepositoryImpl) UpdateTaskStatuses

func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)

type QueueFactoryRepository

// QueueFactoryRepository creates QueueRepository values.
type QueueFactoryRepository interface {
	// NewQueue returns a QueueRepository for the given tenant and queue name.
	NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}

type QueueRepository

// QueueRepository operates on a single queue (see
// QueueFactoryRepository.NewQueue, which takes the tenant and queue name).
type QueueRepository interface {
	// ListQueueItems lists up to limit queue items.
	ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error)
	// MarkQueueItemsProcessed takes the assignment results and reports which
	// assigned items succeeded and which failed.
	MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)

	// GetTaskRateLimits returns a nested map for the given queue items.
	// NOTE(review): presumably task id -> rate limit key -> units — confirm.
	GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error)
	// GetDesiredLabels returns desired-label rows for the given step IDs.
	// NOTE(review): presumably keyed by step id — confirm.
	GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error)
	// Cleanup takes no arguments and returns nothing.
	// NOTE(review): what it releases is not visible here — confirm.
	Cleanup()
}

type RateLimitRepository

// RateLimitRepository lists and updates rate limits for a tenant.
type RateLimitRepository interface {
	// ListCandidateRateLimits returns a list of strings for the tenant.
	// NOTE(review): presumably rate limit keys — confirm.
	ListCandidateRateLimits(ctx context.Context, tenantId pgtype.UUID) ([]string, error)
	// UpdateRateLimits applies a string-keyed map of integer updates and
	// returns a string-keyed map of integers.
	// NOTE(review): presumably the resulting values per rate limit key — confirm.
	UpdateRateLimits(ctx context.Context, tenantId pgtype.UUID, updates map[string]int) (map[string]int, error)
}

type RateLimitResult

// RateLimitResult is the element type of AssignResults.RateLimited. It names
// a rate limit key together with two int32 values (ExceededUnits and
// ExceededVal) and identifies a task by id, inserted-at time and retry count.
// NOTE(review): the exact meaning of ExceededUnits vs ExceededVal is not
// visible here — confirm against the scheduler.
type RateLimitResult struct {
	ExceededKey    string
	ExceededUnits  int32
	ExceededVal    int32
	TaskId         int64
	TaskInsertedAt pgtype.Timestamptz
	RetryCount     int32
}

type ReadTaskRunMetricsOpts

// ReadTaskRunMetricsOpts holds the filter options accepted by
// OLAPRepository.ReadTaskRunMetrics.
type ReadTaskRunMetricsOpts struct {
	// Time filter on creation time (lower bound, by name).
	CreatedAfter time.Time

	// Workflow id filter.
	WorkflowIds []uuid.UUID

	// Parent task external id filter (pointer; may be nil).
	ParentTaskExternalID *pgtype.UUID
}

type ReadableTaskStatus

// ReadableTaskStatus is a string-backed enumeration of task statuses.
type ReadableTaskStatus string

// Known ReadableTaskStatus values. Each constant's string value equals its
// identifier without the READABLE_TASK_STATUS_ prefix.
const (
	READABLE_TASK_STATUS_QUEUED    ReadableTaskStatus = "QUEUED"
	READABLE_TASK_STATUS_RUNNING   ReadableTaskStatus = "RUNNING"
	READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED"
	READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED"
	READABLE_TASK_STATUS_FAILED    ReadableTaskStatus = "FAILED"
)

func StringToReadableStatus

func StringToReadableStatus(status string) ReadableTaskStatus

func (ReadableTaskStatus) EnumValue

func (s ReadableTaskStatus) EnumValue() int

type RefreshTimeoutBy

// RefreshTimeoutBy is the argument to TaskRepository.RefreshTimeoutBy. Both
// fields are required by their validate tags.
type RefreshTimeoutBy struct {
	// (required) the external id of the task; must be a UUID per the validate tag
	TaskExternalId string `validate:"required,uuid"`

	// (required) the amount to increment the timeout by; must satisfy the
	// "duration" validation rule
	IncrementTimeoutBy string `validate:"required,duration"`
}

type ReplayTaskOpts

// ReplayTaskOpts describes a single task to replay.
type ReplayTaskOpts struct {
	// (required) the task id
	TaskId int64

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the external id
	ExternalId string

	// (required) the step id
	StepId string

	// (optional) the input to the task, uses the existing input if not set
	Input *TaskInput

	// (required) the initial state for the task
	InitialState sqlcv1.V1TaskInitialState

	// (optional) the additional metadata for the task
	AdditionalMetadata []byte
}

type ReplayTasksResult

// ReplayTasksResult is the result of TaskRepository.ReplayTasks.
type ReplayTasksResult struct {
	// ReplayedTasks identifies the replayed tasks by id, inserted-at time and
	// retry count.
	ReplayedTasks []TaskIdInsertedAtRetryCount

	// QueuedTasks holds the queued task rows.
	QueuedTasks []*sqlcv1.V1Task

	// InternalEventResults holds created and replayed tasks in the same shape
	// that MatchRepository.ProcessInternalEventMatches returns (pointer; may
	// be nil).
	InternalEventResults *InternalEventMatchResults
}

type Repository

// Repository is the top-level entry point that hands out the individual
// sub-repositories. NewRepository constructs one from a pgx pool.
type Repository interface {
	Triggers() TriggerRepository
	Tasks() TaskRepository
	Scheduler() SchedulerRepository
	Matches() MatchRepository
	OLAP() OLAPRepository
	// OverwriteOLAPRepository takes an OLAPRepository.
	// NOTE(review): presumably replaces the value returned by OLAP() — confirm.
	OverwriteOLAPRepository(o OLAPRepository)
	Logs() LogLineRepository
	Workers() WorkerRepository
	Workflows() WorkflowRepository
}

func NewRepository

func NewRepository(pool *pgxpool.Pool, l *zerolog.Logger, taskRetentionPeriod, olapRetentionPeriod time.Duration) (Repository, func() error)

type RetriedTask

// RetriedTask is the element type of FailTasksResponse.RetriedTasks. The task
// is identified by the embedded TaskIdInsertedAtRetryCount.
type RetriedTask struct {
	*TaskIdInsertedAtRetryCount

	// AppRetryCount is a retry counter distinct from the embedded RetryCount.
	// NOTE(review): presumably counts application-level retries — confirm.
	AppRetryCount int32

	// RetryBackoffFactor is a nullable float (pgtype.Float8).
	RetryBackoffFactor pgtype.Float8

	// RetryMaxBackoff is a nullable int (pgtype.Int4); units are not visible here.
	RetryMaxBackoff pgtype.Int4
}

type RunConcurrencyResult

// RunConcurrencyResult is the result of
// ConcurrencyRepository.RunConcurrencyStrategy.
type RunConcurrencyResult struct {
	// The tasks which were enqueued
	Queued []TaskWithQueue

	// If the strategy involves cancelling a task, these are the tasks to cancel
	Cancelled []TaskWithCancelledReason

	// If the step has multiple concurrency strategies, these are the next ones to notify
	NextConcurrencyStrategies []int64
}

type SchedulerRepository

// SchedulerRepository hands out the sub-repositories for concurrency, leases,
// queues, rate limits and assignment. It is returned by Repository.Scheduler.
type SchedulerRepository interface {
	Concurrency() ConcurrencyRepository
	Lease() LeaseRepository
	QueueFactory() QueueFactoryRepository
	RateLimit() RateLimitRepository
	Assignment() AssignmentRepository
}

type Sticky

// Sticky is a string-backed enumeration with the values HARD, SOFT and NONE.
type Sticky string

// Known Sticky values. Each constant's string value equals its identifier
// without the STICKY_ prefix.
const (
	STICKY_HARD Sticky = "HARD"
	STICKY_SOFT Sticky = "SOFT"
	STICKY_NONE Sticky = "NONE"
)

type TaskIdEventKeyTuple

// TaskIdEventKeyTuple pairs a task id with an event key. Both fields are
// required by their validate tags.
type TaskIdEventKeyTuple struct {
	// (required) the task id
	Id int64 `validate:"required"`

	// (required) the event key
	EventKey string `validate:"required"`
}

type TaskIdInsertedAtRetryCount

// TaskIdInsertedAtRetryCount identifies a task by its numeric id, inserted-at
// timestamp and retry count. It is embedded in several option/result structs
// (e.g. CompleteTaskOpts, FailTaskOpts, RetriedTask) and passed directly to
// TaskRepository.CancelTasks and TaskRepository.ReplayTasks.
type TaskIdInsertedAtRetryCount struct {
	// (required) the task id (numeric; external ids in this package are strings)
	Id int64 `validate:"required"`

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the retry count
	RetryCount int32
}

type TaskInput

// TaskInput is the JSON-tagged input of a task: a free-form input map plus
// optional trigger data.
type TaskInput struct {
	// Input is the input map, serialized under the key "input".
	Input map[string]interface{} `json:"input"`

	// TriggerData is serialized under the key "trigger_datas" (note the
	// plural spelling of the JSON key); pointer, may be nil.
	TriggerData *MatchData `json:"trigger_datas"`
}

func (*TaskInput) Bytes

func (t *TaskInput) Bytes() []byte

type TaskMetadata

// TaskMetadata identifies a task by id and inserted-at time. It is the element
// type returned by ParseTaskMetadata and accepted by
// OLAPRepository.ListTasksByIdAndInsertedAt.
type TaskMetadata struct {
	TaskID         int64     `json:"task_id"`
	TaskInsertedAt time.Time `json:"task_inserted_at"`
}

func ParseTaskMetadata

func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)

type TaskOutputEvent

// TaskOutputEvent is a JSON-tagged record of a task's outcome. Values are
// built by the New*TaskOutputEvent constructors (cancelled, completed,
// failed, skipped) and can be classified with IsCancelled, IsCompleted and
// IsFailed.
type TaskOutputEvent struct {
	// IsFailure is a boolean failure flag.
	IsFailure bool `json:"is_failure"`

	// EventType is the task event type of this outcome.
	EventType sqlcv1.V1TaskEventType `json:"event_type"`

	// TaskExternalId is the external id of the task.
	TaskExternalId string `json:"task_external_id"`

	// TaskId is the numeric id of the task.
	TaskId int64 `json:"task_id"`

	// RetryCount is the retry count of the task.
	RetryCount int32 `json:"retry_count"`

	// WorkerId is the id of the worker (pointer; may be nil).
	WorkerId *string `json:"worker_id"`

	// Output is the task output as raw bytes.
	Output []byte `json:"output"`

	// ErrorMessage is the error message, if any.
	ErrorMessage string `json:"error_message"`

	// StepReadableID is the readable id of the step.
	StepReadableID string `json:"step_readable_id"`
}

func NewCancelledTaskOutputEvent

func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent

func NewCancelledTaskOutputEventFromTask

func NewCancelledTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent

func NewCompletedTaskOutputEvent

func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent

func NewFailedTaskOutputEvent

func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent

func NewFailedTaskOutputEventFromTask

func NewFailedTaskOutputEventFromTask(task *sqlcv1.V1Task, errorMsg string) *TaskOutputEvent

func NewSkippedTaskOutputEventFromTask

func NewSkippedTaskOutputEventFromTask(task *sqlcv1.V1Task, output []byte) *TaskOutputEvent

func (*TaskOutputEvent) Bytes

func (e *TaskOutputEvent) Bytes() []byte

func (*TaskOutputEvent) IsCancelled

func (e *TaskOutputEvent) IsCancelled() bool

func (*TaskOutputEvent) IsCompleted

func (e *TaskOutputEvent) IsCompleted() bool

func (*TaskOutputEvent) IsFailed

func (e *TaskOutputEvent) IsFailed() bool

type TaskRepository

// TaskRepository is the tenant-scoped interface for reading tasks and moving
// them through their lifecycle (complete, fail, cancel, replay, timeouts,
// reassignments, retries).
// NOTE(review): the meaning of the bool returned by the Process* methods is
// not documented in this listing — confirm against the implementation.
type TaskRepository interface {
	UpdateTablePartitions(ctx context.Context) error

	// GetTaskByExternalId is a heavily cached method to return task metadata by its external id
	GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)

	// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
	// This is non-cacheable because tasks can be added to a workflow run as it executes.
	FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)

	// CompleteTasks takes one CompleteTaskOpts (task identity plus output) per task.
	CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)

	// FailTasks takes one FailTaskOpts per task; the response additionally
	// lists retried tasks.
	FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error)

	// CancelTasks takes the identities of the tasks to cancel.
	CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)

	// ListTasks returns task rows for the given numeric task ids.
	ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)

	// ListTaskMetas returns task meta rows for the given numeric task ids.
	ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)

	// ListFinalizedWorkflowRuns takes root external ids and returns workflow
	// run ids with their task output events.
	ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)

	ProcessTaskTimeouts(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskTimeoutsRow, bool, error)

	ProcessTaskReassignments(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskReassignmentsRow, bool, error)

	ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)

	// GetQueueCounts returns a string-keyed map of counts.
	// NOTE(review): presumably keyed by queue name — confirm.
	GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error)

	// ReplayTasks takes the identities of the tasks to replay.
	ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)

	// RefreshTimeoutBy takes a task external id and a timeout increment and
	// returns the task runtime row.
	RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)

	// ReleaseSlot takes a task external id and returns the task runtime row.
	ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error)
}

type TaskRepositoryImpl

// TaskRepositoryImpl is an opaque type whose pointer receiver provides the
// TaskRepository methods (CompleteTasks, FailTasks, CancelTasks, ...).
type TaskRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (*TaskRepositoryImpl) CancelTasks

func (*TaskRepositoryImpl) CompleteTasks

func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)

func (TaskRepositoryImpl) DesiredWorkerId

func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (*TaskRepositoryImpl) FailTasks

func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)

func (*TaskRepositoryImpl) FlattenExternalIds

func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)

func (*TaskRepositoryImpl) GetQueueCounts

func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error)

func (*TaskRepositoryImpl) GetTaskByExternalId

func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)

func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns

func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)

func (*TaskRepositoryImpl) ListTaskMetas

func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)

func (*TaskRepositoryImpl) ListTasks

func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)

func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow

func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.

func (*TaskRepositoryImpl) ProcessTaskReassignments

func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskReassignmentsRow, bool, error)

func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems

func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)

func (*TaskRepositoryImpl) ProcessTaskTimeouts

func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskTimeoutsRow, bool, error)

func (*TaskRepositoryImpl) RefreshTimeoutBy

func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)

func (*TaskRepositoryImpl) ReleaseSlot

func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)

func (*TaskRepositoryImpl) ReplayTasks

func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)

func (TaskRepositoryImpl) ToV1StepRunData

func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*TaskRepositoryImpl) UpdateTablePartitions

func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error

type TaskRunMetric

// TaskRunMetric is a status/count pair that serializes to JSON under the keys
// "status" and "count".
// NOTE(review): presumably Count is the number of task runs currently in
// Status — confirm against the query that populates this type.
type TaskRunMetric struct {
	Status string `json:"status"`
	Count  uint64 `json:"count"`
}

type TaskWithCancelledReason

// TaskWithCancelledReason extends an embedded *TaskIdInsertedAtRetryCount with
// a cancellation reason and two string identifiers.
type TaskWithCancelledReason struct {
	// Embedded by pointer: its fields are promoted onto this struct, and the
	// pointer must be non-nil before any promoted field is accessed.
	*TaskIdInsertedAtRetryCount

	// CancelledReason holds the cancellation reason as a free-form string.
	CancelledReason string

	// TaskExternalId is the task's external id, carried as a string.
	TaskExternalId string

	// WorkflowRunId is a workflow run id, carried as a string.
	// NOTE(review): presumably the run that owns the task — confirm with callers.
	WorkflowRunId string
}

type TaskWithQueue

// TaskWithQueue extends an embedded *TaskIdInsertedAtRetryCount with a queue
// name.
type TaskWithQueue struct {
	// Embedded by pointer: its fields are promoted onto this struct, and the
	// pointer must be non-nil before any promoted field is accessed.
	*TaskIdInsertedAtRetryCount

	// Queue is the queue name, carried as a string.
	// NOTE(review): presumably the queue the task belongs to — confirm with callers.
	Queue string
}

type TriggerRepository

// TriggerRepository groups the tenant-scoped trigger operations. Every method
// takes a context.Context first and the tenant id (as a string) second.
type TriggerRepository interface {
	// TriggerFromEvents accepts a batch of EventTriggerOpts (by value) and
	// returns a slice of tasks, a slice of DAGWithData, and an error.
	// NOTE(review): presumably the returned tasks/DAGs are the ones created by
	// this call — confirm against the implementation.
	TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)

	// TriggerFromWorkflowNames has the same return shape as TriggerFromEvents
	// but is driven by a batch of *WorkflowNameTriggerOpts.
	TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)

	// PopulateExternalIdsForWorkflow takes the opts by pointer and returns only
	// an error.
	// NOTE(review): presumably it fills in fields on opts in place — confirm
	// against the implementation.
	PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

	// PreflightVerifyWorkflowNameOpts takes the opts by pointer and returns only
	// an error.
	// NOTE(review): presumably a validation step run before triggering — confirm.
	PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
}

type TriggerRepositoryImpl

// TriggerRepositoryImpl is a concrete type whose exported fields are all
// hidden from this listing. Its method set (value receivers for
// DesiredWorkerId, PopulateExternalIdsForWorkflow and ToV1StepRunData; pointer
// receivers for PreflightVerifyWorkflowNameOpts, TriggerFromEvents and
// TriggerFromWorkflowNames) means *TriggerRepositoryImpl provides every method
// declared by TriggerRepository.
type TriggerRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (TriggerRepositoryImpl) DesiredWorkerId

func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow

func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.

func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts

func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

func (TriggerRepositoryImpl) ToV1StepRunData

func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*TriggerRepositoryImpl) TriggerFromEvents

func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)

func (*TriggerRepositoryImpl) TriggerFromWorkflowNames

func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)

type TriggerTaskData

// TriggerTaskData is the JSON-serializable payload describing a workflow to
// trigger by name. Only WorkflowName carries a `validate:"required"` tag; all
// other fields are optional, and the pointer-typed ones are nil when unset.
// The parent/child fields are only meaningful when the run is spawned from
// another task.
type TriggerTaskData struct {
	// (required) the workflow name
	WorkflowName string `json:"workflow_name" validate:"required"`

	// (optional) the workflow run data
	Data []byte `json:"data"`

	// (optional) the workflow run metadata
	AdditionalMetadata []byte `json:"additional_metadata"`

	// (optional) the desired worker id
	DesiredWorkerId *string `json:"desired_worker_id"`

	// (optional) the parent external id
	ParentExternalId *string `json:"parent_external_id"`

	// (optional) the parent task id
	ParentTaskId *int64 `json:"parent_task_id"`

	// (optional) the parent inserted_at
	ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"`

	// (optional) the child index
	ChildIndex *int64 `json:"child_index"`

	// (optional) the child key
	ChildKey *string `json:"child_key"`
}

type UpdateDAGStatusRow

// UpdateDAGStatusRow carries a DAG's numeric id and inserted-at timestamp
// together with a readable status and an external id.
// NOTE(review): the name suggests one row of a DAG status update — confirm
// against the code that produces or consumes it.
type UpdateDAGStatusRow struct {
	DagId          int64
	DagInsertedAt  pgtype.Timestamptz
	ReadableStatus sqlcv1.V1ReadableStatusOlap
	ExternalId     pgtype.UUID
}

type UpdateTaskStatusRow

// UpdateTaskStatusRow carries a task's numeric id and inserted-at timestamp
// together with a readable status and an external id.
// NOTE(review): the name suggests one row of a task status update — confirm
// against the code that produces or consumes it.
type UpdateTaskStatusRow struct {
	TaskId         int64
	TaskInsertedAt pgtype.Timestamptz
	ReadableStatus sqlcv1.V1ReadableStatusOlap
	ExternalId     pgtype.UUID
}

type V1StepRunData

// V1StepRunData is the JSON-serializable data attached to a step run. All
// fields are always emitted when marshaled except StepRunErrors, which is
// dropped when empty (omitempty).
type V1StepRunData struct {
	// Input is the step input as a generic JSON object.
	Input       map[string]interface{}            `json:"input"`
	// TriggeredBy is a string describing what triggered the run.
	TriggeredBy string                            `json:"triggered_by"`
	// Parents maps a string key to a generic JSON object.
	// NOTE(review): presumably parent step name -> that parent's output — confirm.
	Parents     map[string]map[string]interface{} `json:"parents"`

	// custom-set user data for the step
	UserData map[string]interface{} `json:"user_data"`

	// overrides set from the playground
	Overrides map[string]interface{} `json:"overrides"`

	// errors in upstream steps (only used in on-failure step)
	StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}

func (*V1StepRunData) Bytes

func (v1 *V1StepRunData) Bytes() []byte

type V1WorkflowRunPopulator

// V1WorkflowRunPopulator bundles a pointer to a WorkflowRunData with a slice
// of TaskMetadata values.
// NOTE(review): presumably TaskMetadata lists the tasks belonging to
// WorkflowRun — confirm against the code that fills this struct.
type V1WorkflowRunPopulator struct {
	WorkflowRun  *WorkflowRunData
	TaskMetadata []TaskMetadata
}

type WorkerRepository

// WorkerRepository groups the worker read operations. Unlike the other
// repository interfaces on this page, none of its methods accept a
// context.Context.
type WorkerRepository interface {
	// ListWorkers returns rows of type ListWorkersWithSlotCountRow for the given
	// tenant, driven by opts.
	ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
	// GetWorkerById returns a single GetWorkerByIdRow for workerId; no tenant id
	// is passed.
	GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
	// ListWorkerState returns two slices for the given tenant and worker:
	// semaphore-slot rows and step-run rows.
	// NOTE(review): presumably maxRuns caps the number of step runs returned —
	// confirm against the implementation.
	ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)
}

type WorkflowNameTriggerOpts

// WorkflowNameTriggerOpts is the per-workflow option set accepted (by pointer)
// by TriggerFromWorkflowNames, PopulateExternalIdsForWorkflow and
// PreflightVerifyWorkflowNameOpts. It wraps a TriggerTaskData with an external
// id and a skip flag.
type WorkflowNameTriggerOpts struct {
	// Embedded by pointer: TriggerTaskData's fields are promoted onto this
	// struct, and the pointer must be non-nil before they are accessed.
	*TriggerTaskData

	// ExternalId is an external id, carried as a string.
	// NOTE(review): presumably the id assigned to the run being triggered — confirm.
	ExternalId string

	// Whether to skip the creation of the child workflow
	ShouldSkip bool
}

type WorkflowRepository

// WorkflowRepository exposes workflow lookups scoped to a tenant.
type WorkflowRepository interface {
	// ListWorkflowNamesByIds takes a batch of workflow ids and returns a map
	// from UUID to string, or an error.
	// NOTE(review): presumably the map is keyed by workflow id with the
	// workflow name as value; behavior for ids that are not found is not
	// visible here — confirm against the implementation.
	ListWorkflowNamesByIds(ctx context.Context, tenantId string, workflowIds []pgtype.UUID) (map[pgtype.UUID]string, error)
}

type WorkflowRunData

// WorkflowRunData is the JSON-serializable record of a workflow run: its
// identifiers (tenant, workflow, workflow version, external id), timestamps
// (created, inserted, started, finished), kind, readable status, display name,
// error message, and raw input/metadata bytes.
//
// The pointer-typed fields (Output, StepId, TaskExternalId, TaskId,
// TaskInsertedAt) are tagged omitempty, so they are left out of the JSON
// output when nil; every other field is always emitted.
// NOTE(review): presumably the task-related pointer fields are only set when
// the run is backed by a single task — confirm against the code that fills
// this struct.
type WorkflowRunData struct {
	AdditionalMetadata []byte                      `json:"additional_metadata"`
	CreatedAt          pgtype.Timestamptz          `json:"created_at"`
	DisplayName        string                      `json:"display_name"`
	ErrorMessage       string                      `json:"error_message"`
	ExternalID         pgtype.UUID                 `json:"external_id"`
	FinishedAt         pgtype.Timestamptz          `json:"finished_at"`
	Input              []byte                      `json:"input"`
	InsertedAt         pgtype.Timestamptz          `json:"inserted_at"`
	Kind               sqlcv1.V1RunKind            `json:"kind"`
	Output             *[]byte                     `json:"output,omitempty"`
	ReadableStatus     sqlcv1.V1ReadableStatusOlap `json:"readable_status"`
	StepId             *pgtype.UUID                `json:"step_id,omitempty"`
	StartedAt          pgtype.Timestamptz          `json:"started_at"`
	TaskExternalId     *pgtype.UUID                `json:"task_external_id,omitempty"`
	TaskId             *int64                      `json:"task_id,omitempty"`
	TaskInsertedAt     *pgtype.Timestamptz         `json:"task_inserted_at,omitempty"`
	TenantID           pgtype.UUID                 `json:"tenant_id"`
	WorkflowID         pgtype.UUID                 `json:"workflow_id"`
	WorkflowVersionId  pgtype.UUID                 `json:"workflow_version_id"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL