repository

package
v0.53.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrAlreadyQueued = fmt.Errorf("step run is already queued")
View Source
var ErrAlreadyRunning = fmt.Errorf("step run is already running")
View Source
var ErrDagParentNotFound = errors.New("dag parent not found")
View Source
var ErrDuplicateKey = fmt.Errorf("duplicate key error")
View Source
var ErrNoWorkerAvailable = fmt.Errorf("no worker available")
View Source
var ErrPreflightReplayChildStepRunNotInFinalState = fmt.Errorf("child step run is not in a final state")
View Source
var ErrPreflightReplayStepRunNotInFinalState = fmt.Errorf("step run is not in a final state")
View Source
var ErrRateLimitExceeded = fmt.Errorf("rate limit exceeded")
View Source
var ErrStepRunIsNotAssigned = fmt.Errorf("step run is not assigned")
View Source
var (
	ErrWorkflowRunNotFound = fmt.Errorf("workflow run not found")
)

Functions

func BoolPtr

func BoolPtr(b bool) *bool

func HashPassword

func HashPassword(pw string) (*string, error)

func IsFinalJobRunStatus

func IsFinalJobRunStatus(status dbsqlc.JobRunStatus) bool

func IsFinalStepRunStatus

func IsFinalStepRunStatus(status dbsqlc.StepRunStatus) bool

func IsFinalWorkflowRunStatus

func IsFinalWorkflowRunStatus(status dbsqlc.WorkflowRunStatus) bool

func JobRunStatusPtr

func JobRunStatusPtr(status db.JobRunStatus) *db.JobRunStatus

func RunPostCommit added in v0.52.12

func RunPostCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])

func RunPreCommit added in v0.52.12

func RunPreCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])

func StepRunEventReasonPtr

func StepRunEventReasonPtr(reason dbsqlc.StepRunEventReason) *dbsqlc.StepRunEventReason

func StepRunEventSeverityPtr

func StepRunEventSeverityPtr(severity dbsqlc.StepRunEventSeverity) *dbsqlc.StepRunEventSeverity

func StepRunStatusPtr

func StepRunStatusPtr(status db.StepRunStatus) *db.StepRunStatus

func StringPtr

func StringPtr(s string) *string

func VerifyPassword

func VerifyPassword(hashedPW, candidate string) (bool, error)

Types

type APIRepository

// APIRepository aggregates the per-resource repository interfaces behind a
// single value. Every method takes no arguments and returns the repository
// interface for one resource area (health, API tokens, events, logs, tenants,
// workflows, runs, integrations, workers, users, ...).
//
// NOTE(review): presumably this is the entry point used by the API server as
// opposed to the engine — confirm against the callers.
type APIRepository interface {
	Health() HealthRepository
	APIToken() APITokenRepository
	Event() EventAPIRepository
	Log() LogsAPIRepository
	Tenant() TenantAPIRepository
	TenantAlertingSettings() TenantAlertingAPIRepository
	TenantInvite() TenantInviteRepository
	Workflow() WorkflowAPIRepository
	WorkflowRun() WorkflowRunAPIRepository
	JobRun() JobRunAPIRepository
	StepRun() StepRunAPIRepository
	Slack() SlackRepository
	SNS() SNSRepository
	Step() StepRepository
	Worker() WorkerAPIRepository
	UserSession() UserSessionRepository
	User() UserRepository
	SecurityCheck() SecurityCheckRepository
	WebhookWorker() WebhookWorkerRepository
}

type APITokenRepository

type APITokenRepository interface {
	GetAPITokenById(id string) (*db.APITokenModel, error)
	RevokeAPIToken(id string) error
	ListAPITokensByTenant(tenantId string) ([]db.APITokenModel, error)
}

type ApiUpdateWorkerOpts added in v0.35.0

type ApiUpdateWorkerOpts struct {
	IsPaused *bool
}

type AssignResults added in v0.52.12

type AssignResults struct {
	Assigned           []*AssignedItem
	Unassigned         []*dbsqlc.QueueItem
	SchedulingTimedOut []*dbsqlc.QueueItem
	RateLimited        []*RateLimitResult
}

type AssignedItem added in v0.52.12

type AssignedItem struct {
	WorkerId pgtype.UUID

	QueueItem *dbsqlc.QueueItem
}

type AssignmentRepository added in v0.52.12

type AssignmentRepository interface {
	ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*dbsqlc.ListActionsForWorkersRow, error)
	ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params dbsqlc.ListAvailableSlotsForWorkersParams) ([]*dbsqlc.ListAvailableSlotsForWorkersRow, error)
}

type BulkCreateEventOpts added in v0.46.0

type BulkCreateEventOpts struct {
	TenantId string `validate:"required,uuid"`
	Events   []*CreateEventOpts
}

type BulkCreateEventResult added in v0.46.0

type BulkCreateEventResult struct {
	Events []*dbsqlc.Event
}

type CallbackOptFunc added in v0.52.12

type CallbackOptFunc[T any] func(*TenantCallbackOpts[T])

func WithPostCommitCallback added in v0.52.12

func WithPostCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]

func WithPreCommitCallback added in v0.52.12

func WithPreCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]

type ChildWorkflowRun added in v0.50.0

// ChildWorkflowRun carries the parent workflow run id, the parent step run id,
// a child index and an optional child key.
//
// NOTE(review): field meanings are taken from the field names only — confirm
// against the code that populates this struct.
type ChildWorkflowRun struct {
	ParentId        string
	ParentStepRunId string
	ChildIndex      int
	// NOTE(review): spelled `Childkey` (lowercase k), unlike the `ChildKey`
	// field on CreateWorkflowRunOpts. Renaming would break callers, so it is
	// only flagged here.
	Childkey        *string
}

type CreateAPITokenOpts

type CreateAPITokenOpts struct {
	// The id of the token
	ID string `validate:"required,uuid"`

	// When the token expires
	ExpiresAt time.Time

	// (optional) A tenant ID for this API token
	TenantId *string `validate:"omitempty,uuid"`

	// (optional) A name for this API token
	Name *string `validate:"omitempty,max=255"`

	Internal bool
}

type CreateCronWorkflowTriggerOpts added in v0.52.0

type CreateCronWorkflowTriggerOpts struct {
	// (required) the workflow id
	WorkflowId string `validate:"required,uuid"`

	// (required) the workflow name
	Name string `validate:"required"`

	Cron string `validate:"required,cron"`

	Input              map[string]interface{}
	AdditionalMetadata map[string]interface{}
}

type CreateDispatcherOpts

type CreateDispatcherOpts struct {
	ID string `validate:"required,uuid"`
}

type CreateEventOpts

type CreateEventOpts struct {
	// (required) the tenant id
	TenantId string `validate:"required,uuid"`

	// (required) the event key
	Key string `validate:"required"`

	// (optional) the event data
	Data []byte

	// (optional) the event that this event is replaying
	ReplayedEvent *string `validate:"omitempty,uuid"`

	// (optional) the event metadata
	AdditionalMetadata []byte
}

type CreateExpressionEvalOpt added in v0.47.0

type CreateExpressionEvalOpt struct {
	Key      string
	ValueStr *string
	ValueInt *int
	Kind     dbsqlc.StepExpressionKind
}

type CreateGroupKeyRunOpts

type CreateGroupKeyRunOpts struct {
	// (optional) the input data
	Input []byte
}

type CreateLogLineOpts

type CreateLogLineOpts struct {
	// The step run id
	StepRunId string `validate:"required,uuid"`

	// (optional) The time when the log line was created.
	CreatedAt *time.Time

	// (required) The message of the log line.
	Message string `validate:"required,min=1,max=10000"`

	// (optional) The level of the log line.
	Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`

	// (optional) The metadata of the log line.
	Metadata []byte
}

type CreateSNSIntegrationOpts

type CreateSNSIntegrationOpts struct {
	TopicArn string `validate:"required,min=1,max=255"`
}

type CreateScheduledWorkflowRunForWorkflowOpts added in v0.52.0

type CreateScheduledWorkflowRunForWorkflowOpts struct {
	WorkflowId string `validate:"required,uuid"`

	ScheduledTrigger time.Time

	Input              map[string]interface{}
	AdditionalMetadata map[string]interface{}
}

type CreateSessionOpts

type CreateSessionOpts struct {
	ID string `validate:"required,uuid"`

	ExpiresAt time.Time `validate:"required"`

	// (optional) the user id, can be nil if session is unauthenticated
	UserId *string `validate:"omitempty,uuid"`

	Data *types.JSON
}

type CreateStepRunEventOpts

type CreateStepRunEventOpts struct {
	StepRunId string `validate:"required,uuid"`

	EventMessage *string

	EventReason *dbsqlc.StepRunEventReason

	EventSeverity *dbsqlc.StepRunEventSeverity

	Timestamp *time.Time

	EventData map[string]interface{}
}

type CreateStreamEventOpts

// CreateStreamEventOpts holds the fields supplied when creating a stream
// event for a step run.
type CreateStreamEventOpts struct {
	// (required) The step run id; must be a UUID.
	StepRunId string `validate:"required,uuid"`

	// (optional) The time when the StreamEvent was created.
	CreatedAt *time.Time

	// (required) The message of the Stream Event.
	// NOTE(review): documented as required, but the field carries no
	// `validate` tag, so an empty or nil message is not rejected by struct
	// validation — confirm whether it is enforced elsewhere.
	Message []byte

	// (optional) The metadata of the Stream Event.
	Metadata []byte
}

type CreateTenantAlertGroupOpts

type CreateTenantAlertGroupOpts struct {
	Emails []string `validate:"required,dive,email,max=255"`
}

type CreateTenantInviteOpts

// CreateTenantInviteOpts holds the fields supplied when creating a tenant
// invite.
type CreateTenantInviteOpts struct {
	// (required) the invitee email
	InviteeEmail string `validate:"required,email"`

	// (required) the inviter email
	InviterEmail string `validate:"required,email"`

	// (required) when the invite expires
	ExpiresAt time.Time `validate:"required"`

	// the role of the invitee; when non-empty it must be one of OWNER, ADMIN
	// or MEMBER.
	// NOTE(review): this was documented as required, but the tag starts with
	// `omitempty`, so an empty role passes validation. Confirm whether the
	// tag or the documentation reflects the intent.
	Role string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
}

type CreateTenantMemberOpts

type CreateTenantMemberOpts struct {
	Role   string `validate:"required,oneof=OWNER ADMIN MEMBER"`
	UserId string `validate:"required,uuid"`
}

type CreateTenantOpts

type CreateTenantOpts struct {
	// (required) the tenant name
	Name string `validate:"required"`

	// (required) the tenant slug
	Slug string `validate:"required,hatchetName"`

	// (optional) the tenant ID
	ID *string `validate:"omitempty,uuid"`

	// (optional) the tenant data retention period
	DataRetentionPeriod *string `validate:"omitempty,duration"`
}

type CreateTickerOpts

type CreateTickerOpts struct {
	ID string `validate:"required,uuid"`
}

type CreateUserOpts

type CreateUserOpts struct {
	Email         string `validate:"required,email"`
	EmailVerified *bool
	Name          *string

	// auth options
	Password *string    `validate:"omitempty,excluded_with=OAuth"`
	OAuth    *OAuthOpts `validate:"omitempty,excluded_with=Password"`
}

type CreateWebhookWorkerOpts added in v0.42.13

type CreateWebhookWorkerOpts struct {
	Name       string
	URL        string `validate:"required,url"`
	Secret     string
	TenantId   string `validate:"uuid"`
	Deleted    *bool
	TokenValue *string
	TokenID    *string
}

type CreateWorkerOpts

type CreateWorkerOpts struct {
	// The id of the dispatcher
	DispatcherId string `validate:"required,uuid"`

	// The maximum number of runs this worker can run at a time
	MaxRuns *int `validate:"omitempty,gte=1"`

	// The name of the worker
	Name string `validate:"required,hatchetName"`

	// The name of the service
	Services []string `validate:"dive,hatchetName"`

	// A list of actions this worker can run
	Actions []string `validate:"dive,actionId"`

	// (optional) Webhook Id associated with the worker (if any)
	WebhookId *string `validate:"omitempty,uuid"`

	// (optional) Runtime info for the worker
	RuntimeInfo *RuntimeInfo `validate:"omitempty"`
}

type CreateWorkflowConcurrencyOpts

type CreateWorkflowConcurrencyOpts struct {
	// (optional) the action id for getting the concurrency group
	Action *string `validate:"omitempty,actionId"`

	// (optional) the maximum number of concurrent workflow runs, default 1
	MaxRuns *int32

	// (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS
	LimitStrategy *string `validate:"omitnil,oneof=CANCEL_IN_PROGRESS GROUP_ROUND_ROBIN CANCEL_NEWEST"`

	// (optional) a concurrency expression for evaluating the concurrency key
	Expression *string `validate:"omitempty,celworkflowrunstr"`
}

type CreateWorkflowJobOpts

type CreateWorkflowJobOpts struct {
	// (required) the job name
	Name string `validate:"required,hatchetName"`

	// (optional) the job description
	Description *string

	// (required) the job steps
	Steps []CreateWorkflowStepOpts `validate:"required,min=1,dive"`

	Kind string `validate:"required,oneof=DEFAULT ON_FAILURE"`
}

type CreateWorkflowRunOpt

type CreateWorkflowRunOpt func(*CreateWorkflowRunOpts)

func WithParent

func WithParent(
	parentId, parentStepRunId string,
	childIndex int,
	childKey *string,
	additionalMetadata map[string]interface{},
	parentAdditionalMetadata map[string]interface{},
) CreateWorkflowRunOpt

type CreateWorkflowRunOpts

type CreateWorkflowRunOpts struct {
	// (optional) the workflow run display name
	DisplayName *string

	TenantId string `validate:"required,uuid"`

	// (required) the workflow version id
	WorkflowVersionId string `validate:"required,uuid"`

	ManualTriggerInput *string `` /* 197-byte string literal not displayed */

	// (optional) the event id that triggered the workflow run
	TriggeringEventId *string `` /* 204-byte string literal not displayed */

	// (optional) the cron schedule that triggered the workflow run
	Cron         *string `` /* 230-byte string literal not displayed */
	CronParentId *string `` /* 230-byte string literal not displayed */
	CronName     *string `validate:"omitnil"`

	// (optional) the scheduled trigger
	ScheduledWorkflowId *string `` /* 200-byte string literal not displayed */

	InputData []byte

	TriggeredBy string

	GetGroupKeyRun *CreateGroupKeyRunOpts `validate:"omitempty"`

	// (optional) the parent workflow run which this workflow run was triggered from
	ParentId *string `validate:"omitempty,uuid"`

	// (optional) the parent step run id which this workflow run was triggered from
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// (optional) the child key of the workflow run, if this is a child run of a different workflow
	ChildKey *string

	// (optional) the child index of the workflow run, if this is a child run of a different workflow
	// python sdk uses -1 as default value
	ChildIndex *int `validate:"omitempty,min=-1"`

	// (optional) additional metadata for the workflow run
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`

	// (optional) the desired worker id for sticky state
	DesiredWorkerId *string `validate:"omitempty,uuid"`

	// (optional) the deduplication value for the workflow run
	DedupeValue *string `validate:"omitempty"`

	// (optional) the priority of the workflow run
	Priority *int32 `validate:"omitempty,min=1,max=3"`
}

func GetCreateWorkflowRunOptsFromCron

func GetCreateWorkflowRunOptsFromCron(
	cron,
	cronParentId string,
	cronName *string,
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error)

func GetCreateWorkflowRunOptsFromEvent

func GetCreateWorkflowRunOptsFromEvent(
	eventId string,
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error)

func GetCreateWorkflowRunOptsFromManual

func GetCreateWorkflowRunOptsFromManual(
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error)

func GetCreateWorkflowRunOptsFromParent

func GetCreateWorkflowRunOptsFromParent(
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	parentId, parentStepRunId string,
	childIndex int,
	childKey *string,
	additionalMetadata map[string]interface{},
	parentAdditionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error)

func GetCreateWorkflowRunOptsFromSchedule

func GetCreateWorkflowRunOptsFromSchedule(
	scheduledWorkflowId string,
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
	fs ...CreateWorkflowRunOpt,
) (*CreateWorkflowRunOpts, error)

type CreateWorkflowRunPullRequestOpts

type CreateWorkflowRunPullRequestOpts struct {
	RepositoryOwner       string
	RepositoryName        string
	PullRequestID         int
	PullRequestTitle      string
	PullRequestNumber     int
	PullRequestHeadBranch string
	PullRequestBaseBranch string
	PullRequestState      string
}

type CreateWorkflowSchedulesOpts

type CreateWorkflowSchedulesOpts struct {
	ScheduledTriggers []time.Time

	Input              []byte
	AdditionalMetadata []byte
}

type CreateWorkflowStepOpts

// CreateWorkflowStepOpts describes a single step of a workflow job: its
// name, action, timeout, parent steps, retry settings, rate limits and
// desired worker labels.
type CreateWorkflowStepOpts struct {
	// (required) the step name
	// NOTE(review): the tag has no `required` rule; whether an empty name is
	// rejected depends on the custom `hatchetName` validator, which is not
	// visible here — confirm.
	ReadableId string `validate:"hatchetName"`

	// (required) the step action id
	Action string `validate:"required,actionId"`

	// (optional) the step timeout, validated with the `duration` rule
	Timeout *string `validate:"omitnil,duration"`

	// (optional) the parents that this step depends on
	Parents []string `validate:"dive,hatchetName"`

	// (optional) the custom user data for the step, serialized as a json string
	UserData *string `validate:"omitnil,json"`

	// (optional) the step retry max; must be >= 0 when set
	Retries *int `validate:"omitempty,min=0"`

	// (optional) rate limits for this step
	RateLimits []CreateWorkflowStepRateLimitOpts `validate:"dive"`

	// (optional) desired worker affinity state for this step
	DesiredWorkerLabels map[string]DesiredWorkerLabelOpts `validate:"omitempty"`

	// (optional) the step retry backoff factor; between 1 and 1000 when set
	RetryBackoffFactor *float64 `validate:"omitnil,min=1,max=1000"`

	// (optional) the step retry backoff max seconds (can't be greater than 86400)
	RetryBackoffMaxSeconds *int `validate:"omitnil,min=1,max=86400"`
}

type CreateWorkflowStepRateLimitOpts

// CreateWorkflowStepRateLimitOpts describes one rate limit attached to a
// workflow step: the key (static or CEL expression), the units to consume
// (static or CEL expression), an optional dynamic limit and the window
// duration.
type CreateWorkflowStepRateLimitOpts struct {
	// (required) the rate limit key
	Key string `validate:"required"`

	// (optional) a CEL expression for the rate limit key
	// NOTE(review): `required_without=Key` is declared here, but Key itself
	// is tagged `required`, so validation already fails whenever Key is
	// empty — confirm whether Key was meant to be optional when KeyExpr is
	// set.
	KeyExpr *string `validate:"omitnil,celsteprunstr,required_without=Key"`

	// (optional) the rate limit units to consume; the tag declares it
	// required when UnitsExpr is absent.
	// NOTE(review): `omitnil` precedes `required_without`; verify that the
	// validator still evaluates `required_without` when this pointer is nil.
	Units *int `validate:"omitnil,required_without=UnitsExpr"`

	// (optional) a CEL expression for the rate limit units; the tag declares
	// it required when Units is absent (same `omitnil` caveat as Units).
	UnitsExpr *string `validate:"omitnil,celsteprunstr,required_without=Units"`

	// (optional) a CEL expression for a dynamic limit value for the rate limit
	LimitExpr *string `validate:"omitnil,celsteprunstr"`

	// (optional) the rate limit duration, defaults to MINUTE
	Duration *string `validate:"omitnil,oneof=SECOND MINUTE HOUR DAY WEEK MONTH YEAR"`
}

type CreateWorkflowTagOpts

type CreateWorkflowTagOpts struct {
	// (required) the tag name
	Name string `validate:"required"`

	// (optional) the tag color
	Color *string
}

type CreateWorkflowVersionOpts

// CreateWorkflowVersionOpts holds everything supplied when creating a new
// workflow version: name, tags, triggers (event, cron, scheduled), jobs, an
// optional on-failure job, concurrency settings and scheduling defaults.
type CreateWorkflowVersionOpts struct {
	// (required) the workflow name
	Name string `validate:"required,hatchetName"`

	// (optional) tags for the workflow; each element is validated
	Tags []CreateWorkflowTagOpts `validate:"dive"`

	// (optional) the workflow description
	Description *string `json:"description,omitempty"`

	// (optional) the workflow version
	Version *string `json:"version,omitempty"`

	// (optional) event triggers for the workflow
	EventTriggers []string

	// (optional) cron triggers for the workflow
	CronTriggers []string `validate:"dive,cron"`

	// (optional) the input bytes for the cron triggers
	CronInput []byte

	// (optional) scheduled triggers for the workflow
	ScheduledTriggers []time.Time

	// (required) the workflow jobs
	Jobs []CreateWorkflowJobOpts `validate:"required,min=1,dive"`

	// (optional) a job attached as the on-failure job
	OnFailureJob *CreateWorkflowJobOpts `json:"onFailureJob,omitempty" validate:"omitempty"`

	// (optional) the workflow concurrency groups
	// NOTE(review): the tag key below is `validator`, while every other field
	// in this struct uses `validate`. Unless the validator instance is
	// configured with a custom tag name, the `omitnil` rule here is ignored —
	// confirm whether this is a typo.
	Concurrency *CreateWorkflowConcurrencyOpts `json:"concurrency,omitempty" validator:"omitnil"`

	// (optional) the amount of time for step runs to wait to be scheduled before timing out
	ScheduleTimeout *string `validate:"omitempty,duration"`

	// (optional) sticky strategy; one of SOFT or HARD
	Sticky *string `validate:"omitempty,oneof=SOFT HARD"`

	// (optional) the workflow kind; one of FUNCTION, DURABLE or DAG
	Kind *string `validate:"omitempty,oneof=FUNCTION DURABLE DAG"`

	// (optional) the default priority for steps in the workflow (1-3)
	DefaultPriority *int32 `validate:"omitempty,min=1,max=3"`
}

func (*CreateWorkflowVersionOpts) Checksum

func (o *CreateWorkflowVersionOpts) Checksum() (string, error)

type DesiredWorkerLabelOpts added in v0.40.0

// DesiredWorkerLabelOpts describes one desired worker label for a step: a
// key, an integer or string value, and optional scheduling hints (required
// flag, weight, comparator).
type DesiredWorkerLabelOpts struct {
	// (required) the label key
	Key string `validate:"required"`

	// (required if StrValue is nil) the label integer value
	// NOTE(review): `omitnil` precedes `required_without`; verify that the
	// validator still evaluates `required_without` when this pointer is nil.
	IntValue *int32 `validate:"omitnil,required_without=StrValue"`

	// (required if IntValue is nil) the label string value
	StrValue *string `validate:"omitnil,required_without=IntValue"`

	// (optional) if the label is required
	Required *bool `validate:"omitempty"`

	// (optional) the weight of the label for scheduling (default: 100)
	Weight *int32 `validate:"omitempty"`

	// (optional) the label comparator for scheduling (default: EQUAL)
	Comparator *string `validate:"omitempty,oneof=EQUAL NOT_EQUAL GREATER_THAN LESS_THAN GREATER_THAN_OR_EQUAL LESS_THAN_OR_EQUAL"`
}

type DispatcherEngineRepository

type DispatcherEngineRepository interface {
	// CreateNewDispatcher creates a new dispatcher for a given tenant.
	CreateNewDispatcher(ctx context.Context, opts *CreateDispatcherOpts) (*dbsqlc.Dispatcher, error)

	// UpdateDispatcher updates a dispatcher for a given tenant.
	UpdateDispatcher(ctx context.Context, dispatcherId string, opts *UpdateDispatcherOpts) (*dbsqlc.Dispatcher, error)

	Delete(ctx context.Context, dispatcherId string) error

	UpdateStaleDispatchers(ctx context.Context, onStale func(dispatcherId string, getValidDispatcherId func() string) error) error
}

type EngineTokenRepository

type EngineTokenRepository interface {
	CreateAPIToken(ctx context.Context, opts *CreateAPITokenOpts) (*dbsqlc.APIToken, error)
	GetAPITokenById(ctx context.Context, id string) (*dbsqlc.APIToken, error)
}

type EntitlementsRepository

type EntitlementsRepository interface {
	TenantLimit() TenantLimitRepository
}

type ErrDedupeValueExists added in v0.40.0

type ErrDedupeValueExists struct {
	DedupeValue string
}

func (ErrDedupeValueExists) Error added in v0.40.0

func (e ErrDedupeValueExists) Error() string

type EventAPIRepository

type EventAPIRepository interface {
	// ListEvents returns all events for a given tenant.
	ListEvents(ctx context.Context, tenantId string, opts *ListEventOpts) (*ListEventResult, error)

	// ListEventKeys returns all unique event keys for a given tenant.
	ListEventKeys(tenantId string) ([]string, error)

	// GetEventById returns an event by id.
	GetEventById(id string) (*db.EventModel, error)

	// ListEventsById returns a list of events by id.
	ListEventsById(tenantId string, ids []string) ([]db.EventModel, error)
}

type EventEngineRepository

// EventEngineRepository is the engine-side repository interface for events:
// creating them (singly or in bulk), reading them back, and expiring or
// trimming old ones.
type EventEngineRepository interface {
	// RegisterCreateCallback registers a tenant-scoped callback that receives
	// a *dbsqlc.Event.
	// NOTE(review): presumably invoked after an event is created — confirm
	// against the implementation.
	RegisterCreateCallback(callback TenantScopedCallback[*dbsqlc.Event])

	// CreateEvent creates a new event for a given tenant.
	CreateEvent(ctx context.Context, opts *CreateEventOpts) (*dbsqlc.Event, error)

	// BulkCreateEvent creates new events for a given tenant.
	BulkCreateEvent(ctx context.Context, opts *BulkCreateEventOpts) (*BulkCreateEventResult, error)

	// BulkCreateEventSharedTenant creates new events for multiple tenants.
	BulkCreateEventSharedTenant(ctx context.Context, opts []*CreateEventOpts) ([]*dbsqlc.Event, error)

	// GetEventForEngine returns an event for the engine by id.
	GetEventForEngine(ctx context.Context, tenantId, id string) (*dbsqlc.Event, error)

	// ListEventsByIds returns the events of a tenant matching the given ids.
	ListEventsByIds(ctx context.Context, tenantId string, ids []string) ([]*dbsqlc.Event, error)

	// SoftDeleteExpiredEvents soft-deletes events that were created before the given time.
	// NOTE(review): the signature returns a single bool, not counts as the
	// earlier comment claimed; presumably it reports whether more matching
	// events remain to be processed — confirm against the implementation.
	SoftDeleteExpiredEvents(ctx context.Context, tenantId string, before time.Time) (bool, error)

	// ClearEventPayloadData removes the potentially large payload data of events.
	// NOTE(review): the signature takes no cutoff time and returns a single
	// bool rather than counts; the selection criteria and the meaning of the
	// bool are not visible here — confirm against the implementation.
	ClearEventPayloadData(ctx context.Context, tenantId string) (bool, error)
}

type GetGroupKeyRunEngineRepository

// GetGroupKeyRunEngineRepository is the engine-side repository interface for
// get-group-key runs: listing runs to requeue or reassign, assigning them to
// a worker or ticker, and reading or updating a single run.
type GetGroupKeyRunEngineRepository interface {
	// ListGetGroupKeyRunsToRequeue returns a list of get group key runs which are in a requeueable state.
	ListGetGroupKeyRunsToRequeue(ctx context.Context, tenantId string) ([]*dbsqlc.GetGroupKeyRun, error)

	// ListGetGroupKeyRunsToReassign returns a list of get group key runs for the tenant.
	// NOTE(review): presumably those which need to be reassigned — the exact
	// criteria are not visible here.
	ListGetGroupKeyRunsToReassign(ctx context.Context, tenantId string) ([]*dbsqlc.GetGroupKeyRun, error)

	// AssignGetGroupKeyRunToWorker returns the worker id and dispatcher id for the given run.
	AssignGetGroupKeyRunToWorker(ctx context.Context, tenantId, getGroupKeyRunId string) (workerId string, dispatcherId string, err error)
	// AssignGetGroupKeyRunToTicker returns the ticker id for the given run.
	AssignGetGroupKeyRunToTicker(ctx context.Context, tenantId, getGroupKeyRunId string) (tickerId string, err error)

	// UpdateGetGroupKeyRun applies opts to the given run and returns the engine row.
	UpdateGetGroupKeyRun(ctx context.Context, tenantId, getGroupKeyRunId string, opts *UpdateGetGroupKeyRunOpts) (*dbsqlc.GetGroupKeyRunForEngineRow, error)

	// GetGroupKeyRunForEngine returns the engine row for the given run id.
	GetGroupKeyRunForEngine(ctx context.Context, tenantId, getGroupKeyRunId string) (*dbsqlc.GetGroupKeyRunForEngineRow, error)
}

type GetQueueMetricsOpts

type GetQueueMetricsOpts struct {
	// (optional) a list of workflow ids to filter by
	WorkflowIds []string `validate:"omitempty,dive,uuid"`

	// (optional) exact metadata to filter by
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}

type GetQueueMetricsResponse

type GetQueueMetricsResponse struct {
	Total QueueMetric `json:"total"`

	ByWorkflowId map[string]QueueMetric `json:"by_workflow"`
}

type GetStepRunFull added in v0.44.8

type GetStepRunFull struct {
	*dbsqlc.StepRun
	ChildWorkflowRuns []string
}

type GetTenantAlertingSettingsResponse

type GetTenantAlertingSettingsResponse struct {
	Settings *dbsqlc.TenantAlertingSettings

	SlackWebhooks []*dbsqlc.SlackAppWebhook

	EmailGroups []*TenantAlertEmailGroupForSend

	Tenant *dbsqlc.Tenant
}

type GetWorkflowMetricsOpts

type GetWorkflowMetricsOpts struct {
	// (optional) the group key to filter by
	GroupKey *string

	// (optional) the workflow run status to filter by
	Status *string `validate:"omitnil,oneof=PENDING QUEUED RUNNING SUCCEEDED FAILED"`
}

type HealthRepository

type HealthRepository interface {
	IsHealthy() bool
	PgStat() *pgxpool.Stat
}

type JobRunAPIRepository

type JobRunAPIRepository interface {
	RegisterWorkflowRunRunningCallback(callback TenantScopedCallback[pgtype.UUID])

	// SetJobRunStatusRunning resets the status of a job run to a RUNNING status. This is useful if a step
	// run is being manually replayed, but shouldn't be used by most callers.
	SetJobRunStatusRunning(tenantId, jobRunId string) error

	ListJobRunByWorkflowRunId(ctx context.Context, tenantId, WorkflowRunId string) ([]*dbsqlc.ListJobRunsForWorkflowRunFullRow, error)
}

type JobRunEngineRepository

type JobRunEngineRepository interface {
	RegisterWorkflowRunRunningCallback(callback TenantScopedCallback[pgtype.UUID])

	// SetJobRunStatusRunning resets the status of a job run to a RUNNING status. This is useful if a step
	// run is being manually replayed, but shouldn't be used by most callers.
	SetJobRunStatusRunning(ctx context.Context, tenantId, jobRunId string) error

	ListJobRunsForWorkflowRun(ctx context.Context, tenantId, workflowRunId string) ([]*dbsqlc.ListJobRunsForWorkflowRunRow, error)

	GetJobRunByWorkflowRunIdAndJobId(ctx context.Context, tenantId, workflowRunId, jobId string) (*dbsqlc.GetJobRunByWorkflowRunIdAndJobIdRow, error)

	GetJobRunsByWorkflowRunId(ctx context.Context, tenantId, workflowRunId string) ([]*dbsqlc.GetJobRunsByWorkflowRunIdRow, error)

	ClearJobRunPayloadData(ctx context.Context, tenantId string) (bool, error)

	StartJobRun(ctx context.Context, tenantId, jobId string) ([]*dbsqlc.GetStepRunForEngineRow, error)
}

type JobRunHasCycleError

type JobRunHasCycleError struct {
	JobName string
}

func (*JobRunHasCycleError) Error

func (e *JobRunHasCycleError) Error() string

type LeaseRepository added in v0.52.12

type LeaseRepository interface {
	ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*dbsqlc.Queue, error)
	ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)

	AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind dbsqlc.LeaseKind, resourceIds []string, existingLeases []*dbsqlc.Lease) ([]*dbsqlc.Lease, error)
	ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*dbsqlc.Lease) error
}

type Limit added in v0.33.2

type Limit struct {
	Resource         dbsqlc.LimitResource
	Limit            int32
	Alarm            int32
	Window           *time.Duration
	CustomValueMeter bool
}

type ListActiveWorkersResult added in v0.52.12

type ListActiveWorkersResult struct {
	ID     pgtype.UUID
	Labels []*dbsqlc.ListManyWorkerLabelsRow
}

type ListAllJobRunsOpts

type ListAllJobRunsOpts struct {
	TickerId *string

	NoTickerId *bool

	Status *db.JobRunStatus
}

type ListCronWorkflowsOpts added in v0.51.0

// ListCronWorkflowsOpts holds optional paging, ordering and filter settings
// for listing cron workflows. Every field is optional.
type ListCronWorkflowsOpts struct {
	// (optional) number of results to skip
	Offset *int

	// (optional) number of results to return
	Limit *int

	// (optional) the order by field; one of createdAt or name
	OrderBy *string `validate:"omitempty,oneof=createdAt name"`

	// (optional) the order direction; ASC or DESC
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) the workflow id
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) additional metadata for the workflow run
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}

TODO: move this type (ListCronWorkflowsOpts) to workflow.go.

type ListEventOpts

type ListEventOpts struct {
	// (optional) a list of event keys to filter by
	Keys []string

	// (optional) a list of workflow IDs to filter by
	Workflows []string

	// (optional) a list of workflow run statuses to filter by
	WorkflowRunStatus []db.WorkflowRunStatus

	// (optional) number of events to skip
	Offset *int

	// (optional) number of events to return
	Limit *int

	// (optional) a search query
	Search *string

	// (optional) the event that this event is replaying
	ReplayedEvent *string `validate:"omitempty,uuid"`

	// (optional) the order by field
	OrderBy *string `validate:"omitempty,oneof=createdAt"`

	// (optional) the order direction
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) the event metadata
	AdditionalMetadata []byte

	// (optional) event ids to filter by
	Ids []string
}

type ListEventResult

type ListEventResult struct {
	Rows  []*dbsqlc.ListEventsRow
	Count int
}

type ListGetGroupKeyRunsOpts

type ListGetGroupKeyRunsOpts struct {
	Status *db.StepRunStatus
}

type ListLogsOpts

type ListLogsOpts struct {
	// (optional) number of logs to skip
	Offset *int

	// (optional) number of logs to return
	Limit *int `validate:"omitnil,min=1,max=1000"`

	// (optional) a list of log levels to filter by
	Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`

	// (optional) a step run id to filter by
	StepRunId *string `validate:"omitempty,uuid"`

	// (optional) a search query
	Search *string

	// (optional) the order by field
	OrderBy *string `validate:"omitempty,oneof=createdAt"`

	// (optional) the order direction
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
}

type ListLogsResult

type ListLogsResult struct {
	Rows  []*dbsqlc.LogLine
	Count int
}

type ListPullRequestsForWorkflowRunOpts

type ListPullRequestsForWorkflowRunOpts struct {
	State *string
}

type ListRateLimitOpts added in v0.47.0

// ListRateLimitOpts holds optional search, paging and ordering settings for
// listing rate limits. Every field is optional.
type ListRateLimitOpts struct {
	// (optional) a search query for the key
	Search *string

	// (optional) number of results to skip
	Offset *int

	// (optional) number of results to return
	Limit *int

	// (optional) the order by field; one of key, value or limitValue
	OrderBy *string `validate:"omitempty,oneof=key value limitValue"`

	// (optional) the order direction; ASC or DESC
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
}

type ListRateLimitsResult added in v0.47.0

type ListRateLimitsResult struct {
	Rows  []*dbsqlc.ListRateLimitsForTenantNoMutateRow
	Count int
}

type ListScheduledWorkflowsOpts added in v0.51.0

// ListScheduledWorkflowsOpts holds optional paging, ordering and filter
// settings for listing scheduled workflows. Every field is optional.
type ListScheduledWorkflowsOpts struct {
	// (optional) number of results to skip
	Offset *int

	// (optional) number of results to return
	Limit *int

	// (optional) the order by field; one of createdAt or triggerAt
	OrderBy *string `validate:"omitempty,oneof=createdAt triggerAt"`

	// (optional) the order direction; ASC or DESC
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) the workflow id
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) the parent workflow run id
	ParentWorkflowRunId *string `validate:"omitempty,uuid"`

	// (optional) the parent step run id
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// (optional) statuses to filter by
	Statuses *[]db.WorkflowRunStatus

	// (optional) include scheduled runs that are in the future
	IncludeFuture *bool

	// (optional) additional metadata for the workflow run
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}

type ListStepRunArchivesOpts added in v0.34.2

// ListStepRunArchivesOpts holds the pagination options for listing step run archives.
type ListStepRunArchivesOpts struct {
	// (optional) number of archives to skip
	Offset *int

	// (optional) number of archives to return
	Limit *int
}

type ListStepRunArchivesResult added in v0.34.2

// ListStepRunArchivesResult is the result of listing step run archives: the
// returned rows plus a count.
// NOTE(review): Count is presumably the total number of matching archives rather
// than len(Rows) — confirm against the implementation.
type ListStepRunArchivesResult struct {
	Rows  []*dbsqlc.StepRunResultArchive
	Count int
}

type ListStepRunEventOpts

// ListStepRunEventOpts holds the pagination options for listing step run events.
type ListStepRunEventOpts struct {
	// (optional) number of events to skip
	Offset *int

	// (optional) number of events to return
	Limit *int
}

type ListStepRunEventResult

// ListStepRunEventResult is the result of listing step run events: the returned
// rows plus a count.
// NOTE(review): Count is presumably the total number of matching events rather
// than len(Rows) — confirm against the implementation.
type ListStepRunEventResult struct {
	Rows  []*dbsqlc.StepRunEvent
	Count int
}

type ListStepRunsOpts

// ListStepRunsOpts holds the filters available when listing step runs.
type ListStepRunsOpts struct {
	// (optional) a job run id; must be a UUID when set
	JobRunId *string `validate:"omitempty,uuid"`

	// workflow run ids; each element must be a UUID
	WorkflowRunIds []string `validate:"dive,uuid"`

	// (optional) a step run status
	Status *dbsqlc.StepRunStatus
}

type ListTenantInvitesOpts

// ListTenantInvitesOpts holds the filters available when listing tenant invites.
type ListTenantInvitesOpts struct {
	// (optional) the status of the invite
	Status *string `validate:"omitempty,oneof=PENDING ACCEPTED REJECTED"`

	// (optional) whether the invite has expired
	Expired *bool `validate:"omitempty"`
}

type ListTickerOpts

// ListTickerOpts holds the filters available when listing tickers.
type ListTickerOpts struct {
	// Set this to only return tickers whose heartbeat is more recent than this time
	LatestHeartbeatAfter *time.Time

	// (optional) presumably filters on whether the ticker is active — confirm
	// against the implementation.
	Active *bool
}

type ListWorkersOpts

// ListWorkersOpts holds the filters available when listing workers.
type ListWorkersOpts struct {
	// (optional) an action id, checked by the custom "actionId" validation rule
	Action *string `validate:"omitempty,actionId"`

	// (optional) presumably restricts results to workers whose last heartbeat
	// is after this time — confirm against the implementation.
	LastHeartbeatAfter *time.Time

	// (optional) presumably restricts results to workers that can be assigned
	// work — confirm against the implementation.
	Assignable *bool
}

type ListWorkflowRunRoundRobinsOpts

// ListWorkflowRunRoundRobinsOpts holds the filter and pagination options for a
// round-robin workflow run listing. Every field is optional.
type ListWorkflowRunRoundRobinsOpts struct {
	// (optional) the workflow id
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) the workflow version id
	WorkflowVersionId *string `validate:"omitempty,uuid"`

	// (optional) the status of the workflow run
	Status *db.WorkflowRunStatus

	// (optional) number of workflow runs to skip
	Offset *int

	// (optional) number of workflow runs to return
	Limit *int
}

type ListWorkflowRunsOpts

// ListWorkflowRunsOpts holds the filter, pagination and ordering options for
// listing workflow runs. Every field is optional.
type ListWorkflowRunsOpts struct {
	// (optional) the workflow id
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) the workflow version id
	WorkflowVersionId *string `validate:"omitempty,uuid"`

	// (optional) a list of workflow run ids to filter by
	Ids []string `validate:"omitempty,dive,uuid"`

	// (optional) the parent workflow run id
	ParentId *string `validate:"omitempty,uuid"`

	// (optional) the parent step run id
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// (optional) the event id that triggered the workflow run
	EventId *string `validate:"omitempty,uuid"`

	// (optional) the group key for the workflow run
	GroupKey *string

	// (optional) a list of workflow run statuses to filter by
	Statuses *[]db.WorkflowRunStatus

	// (optional) a list of kinds to filter by
	Kinds *[]dbsqlc.WorkflowKind

	// (optional) number of workflow runs to skip
	Offset *int

	// (optional) number of workflow runs to return
	Limit *int

	// (optional) the order by field
	OrderBy *string `validate:"omitempty,oneof=createdAt finishedAt startedAt duration"`

	// (optional) the order direction
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) a time after which the run was created
	CreatedAfter *time.Time

	// (optional) a time before which the run was created
	CreatedBefore *time.Time

	// (optional) a time after which the run was finished
	FinishedAfter *time.Time

	// (optional) a time before which the run was finished
	FinishedBefore *time.Time

	// (optional) exact metadata to filter by
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}

type ListWorkflowRunsResult

// ListWorkflowRunsResult is the result of listing workflow runs: the returned
// rows plus a count.
// NOTE(review): Count is presumably the total number of matching runs rather
// than len(Rows) — confirm against the implementation.
type ListWorkflowRunsResult struct {
	Rows  []*dbsqlc.ListWorkflowRunsRow
	Count int
}

type ListWorkflowsOpts

// ListWorkflowsOpts holds the pagination and name-filter options for listing workflows.
type ListWorkflowsOpts struct {
	// (optional) number of workflows to skip
	Offset *int

	// (optional) number of workflows to return
	Limit *int

	// (optional) the workflow name to filter by
	Name *string
}

type ListWorkflowsResult

// ListWorkflowsResult is the result of listing workflows: the returned rows plus a count.
// NOTE(review): Count is presumably the total number of matching workflows rather
// than len(Rows) — confirm against the implementation.
type ListWorkflowsResult struct {
	Rows  []*dbsqlc.Workflow
	Count int
}

type LogsAPIRepository

// LogsAPIRepository is the read-side repository interface for log lines.
type LogsAPIRepository interface {
	// ListLogLines returns a list of log lines for a given step run.
	ListLogLines(tenantId string, opts *ListLogsOpts) (*ListLogsResult, error)
}

type LogsEngineRepository

// LogsEngineRepository is the write-side repository interface for log lines.
type LogsEngineRepository interface {
	// PutLog creates a new log line.
	PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) (*dbsqlc.LogLine, error)
}

type MessageQueueRepository added in v0.53.0

// MessageQueueRepository groups the pub/sub, queue-management and message
// operations of a message queue backend.
type MessageQueueRepository interface {
	// PubSub: Listen registers f to be called with notifications for name;
	// Notify publishes payload under name.
	Listen(ctx context.Context, name string, f func(ctx context.Context, notification *PubMessage) error) error
	Notify(ctx context.Context, name string, payload string) error

	// Queues
	BindQueue(ctx context.Context, queue string, durable, autoDeleted, exclusive bool, exclusiveConsumer *string) error
	UpdateQueueLastActive(ctx context.Context, queue string) error
	CleanupQueues(ctx context.Context) error

	// Messages
	// NOTE(review): qos on ReadMessages presumably bounds how many messages are
	// read at once — confirm against the implementation.
	AddMessage(ctx context.Context, queue string, payload []byte) error
	ReadMessages(ctx context.Context, queue string, qos int) ([]*dbsqlc.ReadMessagesRow, error)
	AckMessage(ctx context.Context, id int64) error
	CleanupMessageQueueItems(ctx context.Context) error
}

type OAuthOpts

// OAuthOpts carries the OAuth credentials for a user. Provider must be "google"
// or "github"; ProviderUserId and AccessToken are required and non-empty;
// RefreshToken and ExpiresAt are optional.
type OAuthOpts struct {
	Provider       string     `validate:"required,oneof=google github"`
	ProviderUserId string     `validate:"required,min=1"`
	AccessToken    []byte     `validate:"required,min=1"`
	RefreshToken   *[]byte    // optional
	ExpiresAt      *time.Time // optional
}

type PlanLimitMap added in v0.33.2

// PlanLimitMap maps a string key to a list of limits.
// NOTE(review): the key is presumably the plan name — confirm against callers
// of SetPlanLimitMap.
type PlanLimitMap map[string][]Limit

type ProcessStepRunUpdatesResultV2 added in v0.49.1

// ProcessStepRunUpdatesResultV2 is the value returned by
// StepRunEngineRepository.ProcessStepRunUpdatesV2.
// NOTE(review): Continue presumably signals that more updates remain to be
// processed — confirm against the implementation.
type ProcessStepRunUpdatesResultV2 struct {
	SucceededStepRuns     []*dbsqlc.GetStepRunForEngineRow
	CompletedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow
	Continue              bool
}

type PubMessage added in v0.53.0

// PubMessage is the notification passed to the handler registered with
// MessageQueueRepository.Listen: a channel name and a string payload.
type PubMessage struct {
	Channel string
	Payload string
}

type QueueFactoryRepository added in v0.52.12

// QueueFactoryRepository constructs QueueRepository values.
type QueueFactoryRepository interface {
	// NewQueue returns a QueueRepository for the given tenant and queue name.
	NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}

type QueueMetric

// QueueMetric holds step run counts for a queue, broken down by state. It
// serializes to JSON with snake_case keys.
type QueueMetric struct {
	// the total number of PENDING_ASSIGNMENT step runs in the queue
	PendingAssignment int `json:"pending_assignment"`

	// the total number of PENDING step runs in the queue
	Pending int `json:"pending"`

	// the total number of RUNNING step runs in the queue
	Running int `json:"running"`
}

type QueueRepository added in v0.52.12

// QueueRepository is the set of operations available on a single queue, as
// created by QueueFactoryRepository.NewQueue.
// NOTE(review): the key semantics of the returned maps (GetStepRunRateLimits,
// GetDesiredLabels) are not visible here — confirm against the implementation.
type QueueRepository interface {
	ListQueueItems(ctx context.Context, limit int) ([]*dbsqlc.QueueItem, error)
	MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)
	GetStepRunRateLimits(ctx context.Context, queueItems []*dbsqlc.QueueItem) (map[string]map[string]int32, error)
	GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*dbsqlc.GetDesiredLabelsRow, error)
	Cleanup()
}

type QueueStepRunOpts added in v0.44.0

// QueueStepRunOpts holds the options passed to StepRunEngineRepository.QueueStepRun.
type QueueStepRunOpts struct {
	// IsRetry is presumably true when the step run is being queued as a
	// (user-visible) retry — confirm against the implementation.
	IsRetry bool

	// IsInternalRetry is true if the step run is being retried internally by the system, for example if
	// it was sent to an invalid dispatcher. This does not count towards the retry limit but still gets
	// highest priority in the queue.
	IsInternalRetry bool

	// Input is the raw input bytes for the step run.
	Input []byte

	// ExpressionEvals lists expression evaluation options to attach to the step run.
	ExpressionEvals []CreateExpressionEvalOpt
}

type QueuedStepRun added in v0.42.0

// QueuedStepRun ties a step run id to a worker id and a dispatcher id.
// NOTE(review): presumably the worker the step run was assigned to and the
// dispatcher responsible for that worker — confirm against usage.
type QueuedStepRun struct {
	StepRunId    string
	WorkerId     string
	DispatcherId string
}

type RateLimitEngineRepository

// RateLimitEngineRepository lists and upserts rate limit records for a tenant.
type RateLimitEngineRepository interface {
	// ListRateLimits lists the tenant's rate limits according to opts.
	ListRateLimits(ctx context.Context, tenantId string, opts *ListRateLimitOpts) (*ListRateLimitsResult, error)

	// UpsertRateLimit upserts the rate limit record for the given key
	UpsertRateLimit(ctx context.Context, tenantId string, key string, opts *UpsertRateLimitOpts) (*dbsqlc.RateLimit, error)
}

type RateLimitRepository added in v0.52.12

// RateLimitRepository is the rate limit repository exposed through
// SchedulerRepository.RateLimit.
// NOTE(review): the maps passed to and returned from UpdateRateLimits are
// presumably keyed by rate limit key; the meaning of the int values is not
// visible here — confirm against the implementation.
type RateLimitRepository interface {
	ListCandidateRateLimits(ctx context.Context, tenantId pgtype.UUID) ([]string, error)
	UpdateRateLimits(ctx context.Context, tenantId pgtype.UUID, updates map[string]int) (map[string]int, error)
}

type RateLimitResult added in v0.52.12

// RateLimitResult records a rate limit key, units and value alongside a step run id.
// NOTE(review): presumably describes which rate limit a step run exceeded
// (the requested units versus the value available) — confirm against usage.
type RateLimitResult struct {
	ExceededKey   string
	ExceededUnits int32
	ExceededVal   int32
	StepRunId     pgtype.UUID
}

type RefreshTimeoutBy

// RefreshTimeoutBy holds the options passed to
// StepRunEngineRepository.RefreshTimeoutBy.
type RefreshTimeoutBy struct {
	// IncrementTimeoutBy is required and is checked by the custom "duration"
	// validation rule.
	IncrementTimeoutBy string `validate:"required,duration"`
}

type RuntimeInfo added in v0.50.4

// RuntimeInfo describes an SDK runtime: SDK version, language, language
// version, operating system and a free-form extra string. Every field is optional.
type RuntimeInfo struct {
	SdkVersion      *string         `validate:"omitempty"`
	Language        *contracts.SDKS `validate:"omitempty"`
	LanguageVersion *string         `validate:"omitempty"`
	Os              *string         `validate:"omitempty"`
	Extra           *string         `validate:"omitempty"`
}

type SNSRepository

// SNSRepository is the set of queries on SNS integrations.
type SNSRepository interface {
	// GetSNSIntegration looks up an SNS integration by tenant id and topic ARN.
	GetSNSIntegration(tenantId, topicArn string) (*db.SNSIntegrationModel, error)

	// GetSNSIntegrationById looks up an SNS integration by its id.
	GetSNSIntegrationById(id string) (*db.SNSIntegrationModel, error)

	// CreateSNSIntegration creates an SNS integration for the tenant.
	CreateSNSIntegration(tenantId string, opts *CreateSNSIntegrationOpts) (*db.SNSIntegrationModel, error)

	// ListSNSIntegrations lists the tenant's SNS integrations.
	ListSNSIntegrations(tenantId string) ([]db.SNSIntegrationModel, error)

	// DeleteSNSIntegration deletes the SNS integration with the given id for the tenant.
	DeleteSNSIntegration(tenantId, id string) error
}

type SchedulerRepository added in v0.52.12

// SchedulerRepository aggregates the repositories used for scheduling; each
// method returns one of the sub-repositories.
type SchedulerRepository interface {
	Lease() LeaseRepository
	QueueFactory() QueueFactoryRepository
	RateLimit() RateLimitRepository
	Assignment() AssignmentRepository
}

type SecurityCheckRepository added in v0.34.2

// SecurityCheckRepository exposes the identifier used for security checks.
type SecurityCheckRepository interface {
	// GetIdent returns an identifier string.
	// NOTE(review): presumably a stable identifier for this instance — confirm.
	GetIdent() (string, error)
}

type SlackRepository

// SlackRepository is the set of queries on Slack app webhooks.
type SlackRepository interface {
	// UpsertSlackWebhook upserts a Slack webhook for the tenant.
	UpsertSlackWebhook(tenantId string, opts *UpsertSlackWebhookOpts) (*db.SlackAppWebhookModel, error)

	// ListSlackWebhooks lists the tenant's Slack webhooks.
	ListSlackWebhooks(tenantId string) ([]db.SlackAppWebhookModel, error)

	// GetSlackWebhookById looks up a Slack webhook by its id.
	GetSlackWebhookById(id string) (*db.SlackAppWebhookModel, error)

	// DeleteSlackWebhook deletes the Slack webhook with the given id for the tenant.
	DeleteSlackWebhook(tenantId string, id string) error
}

type StepRepository

// StepRepository is the set of queries on steps.
type StepRepository interface {
	// ListStepExpressions lists the expressions for the step with the given id.
	ListStepExpressions(ctx context.Context, stepId string) ([]*dbsqlc.StepExpression, error)
}

type StepRunAPIRepository

// StepRunAPIRepository is the API-facing set of read queries on step runs.
type StepRunAPIRepository interface {
	// GetStepRunById returns the step run with the given id.
	GetStepRunById(stepRunId string) (*GetStepRunFull, error)

	// ListStepRunEvents lists events for a step run, paginated by opts.
	ListStepRunEvents(stepRunId string, opts *ListStepRunEventOpts) (*ListStepRunEventResult, error)

	// ListStepRunEventsByWorkflowRunId lists step run events for a workflow run.
	// NOTE(review): lastId presumably restricts results to events after that id — confirm.
	ListStepRunEventsByWorkflowRunId(ctx context.Context, tenantId, workflowRunId string, lastId *int32) (*ListStepRunEventResult, error)

	// ListStepRunArchives lists archived results for a step run, paginated by opts.
	ListStepRunArchives(tenantId, stepRunId string, opts *ListStepRunArchivesOpts) (*ListStepRunArchivesResult, error)
}

type StepRunEngineRepository

// StepRunEngineRepository is the engine-facing repository for step runs: it
// lists step runs, records lifecycle transitions (acked, started, succeeded,
// cancelled, failed), queues/retries/replays them and cleans up queue items.
type StepRunEngineRepository interface {
	// RegisterWorkflowRunCompletedCallback registers a tenant-scoped callback
	// that receives a resolved workflow run status row.
	RegisterWorkflowRunCompletedCallback(callback TenantScopedCallback[*dbsqlc.ResolveWorkflowRunStatusRow])

	ListStepRuns(ctx context.Context, tenantId string, opts *ListStepRunsOpts) ([]*dbsqlc.GetStepRunForEngineRow, error)

	ListStepRunsToCancel(ctx context.Context, tenantId, jobRunId string) ([]*dbsqlc.GetStepRunForEngineRow, error)

	// ListStepRunsToReassign returns a list of step runs which are in a reassignable state.
	ListStepRunsToReassign(ctx context.Context, tenantId string) (reassignedStepRunIds []string, failedStepRuns []*dbsqlc.GetStepRunForEngineRow, err error)

	InternalRetryStepRuns(ctx context.Context, tenantId string, srIdsIn []string) (reassignedStepRunIds []string, failedStepRuns []*dbsqlc.GetStepRunForEngineRow, err error)

	// NOTE(review): the meaning of the leading bool result is not visible here
	// — confirm against the implementation.
	ListStepRunsToTimeout(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error)

	StepRunAcked(ctx context.Context, tenantId, workflowRunId, stepRunId string, ackedAt time.Time) error

	StepRunStarted(ctx context.Context, tenantId, workflowRunId, stepRunId string, startedAt time.Time) error

	StepRunSucceeded(ctx context.Context, tenantId, workflowRunId, stepRunId string, finishedAt time.Time, output []byte) error

	StepRunCancelled(ctx context.Context, tenantId, workflowRunId, stepRunId string, cancelledAt time.Time, cancelledReason string, propagate bool) error

	StepRunFailed(ctx context.Context, tenantId, workflowRunId, stepRunId string, failedAt time.Time, errStr string, retryCount int) error

	StepRunRetryBackoff(ctx context.Context, tenantId, stepRunId string, retryAfter time.Time) error

	RetryStepRuns(ctx context.Context, tenantId string) (bool, error)

	ReplayStepRun(ctx context.Context, tenantId, stepRunId string, input []byte) (*dbsqlc.GetStepRunForEngineRow, error)

	// PreflightCheckReplayStepRun checks if a step run can be replayed. If it can, it will return nil.
	PreflightCheckReplayStepRun(ctx context.Context, tenantId, stepRunId string) error

	ReleaseStepRunSemaphore(ctx context.Context, tenantId, stepRunId string, isUserTriggered bool) error

	// UpdateStepRunOverridesData updates the overrides data field in the input for a step run. This returns the input
	// bytes.
	UpdateStepRunOverridesData(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOverridesDataOpts) ([]byte, error)

	UpdateStepRunInputSchema(ctx context.Context, tenantId, stepRunId string, schema []byte) ([]byte, error)

	GetStepRunForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunForEngineRow, error)

	GetStepRunDataForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunDataForEngineRow, error)

	GetStepRunBulkDataForEngine(ctx context.Context, tenantId string, stepRunIds []string) ([]*dbsqlc.GetStepRunBulkDataForEngineRow, error)

	GetStepRunMetaForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunMetaRow, error)

	// QueueStepRun is like UpdateStepRun, except that it will only update the step run if it is in
	// a pending state.
	QueueStepRun(ctx context.Context, tenantId, stepRunId string, opts *QueueStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error)

	GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error)

	ProcessStepRunUpdatesV2(ctx context.Context, qlp *zerolog.Logger, tenantId string) (ProcessStepRunUpdatesResultV2, error)

	CleanupQueueItems(ctx context.Context, tenantId string) error

	CleanupInternalQueueItems(ctx context.Context, tenantId string) error

	CleanupRetryQueueItems(ctx context.Context, tenantId string) error

	ListInitialStepRunsForJobRun(ctx context.Context, tenantId, jobRunId string) ([]*dbsqlc.GetStepRunForEngineRow, error)

	// ListStartableStepRuns returns a list of step runs that are in a startable state, assuming that the parentStepRunId has succeeded.
	// The singleParent flag is used to determine if we should reject listing step runs with many parents. This is important to avoid
	// race conditions where a step run is started by multiple parents completing at the same time. As a result, singleParent=false should
	// be called from a serializable process after processing step run status updates.
	ListStartableStepRuns(ctx context.Context, tenantId, parentStepRunId string, singleParent bool) ([]*dbsqlc.GetStepRunForEngineRow, error)

	ArchiveStepRunResult(ctx context.Context, tenantId, stepRunId string, err *string) error

	RefreshTimeoutBy(ctx context.Context, tenantId, stepRunId string, opts RefreshTimeoutBy) (pgtype.Timestamp, error)

	// DeferredStepRunEvent takes no context and returns nothing.
	// NOTE(review): presumably records the event asynchronously — confirm.
	DeferredStepRunEvent(
		tenantId string,
		opts CreateStepRunEventOpts,
	)

	ClearStepRunPayloadData(ctx context.Context, tenantId string) (bool, error)
}

type StepRunForJobRun added in v0.44.8

// StepRunForJobRun embeds a GetStepRunsForJobRunsWithOutputRow and adds a
// count of child workflows.
type StepRunForJobRun struct {
	*dbsqlc.GetStepRunsForJobRunsWithOutputRow
	ChildWorkflowsCount int
}

type StepRunUpdateInfo

// StepRunUpdateInfo carries workflow run information alongside a step run
// update: the workflow run id, its status, and whether that status is final.
// NOTE(review): field meanings are inferred from their names — confirm against usage.
type StepRunUpdateInfo struct {
	WorkflowRunFinalState bool
	WorkflowRunId         string
	WorkflowRunStatus     string
}

type StreamEventsEngineRepository

// StreamEventsEngineRepository is the engine-facing repository for stream events.
type StreamEventsEngineRepository interface {
	// PutStreamEvent creates a new StreamEvent line.
	PutStreamEvent(ctx context.Context, tenantId string, opts *CreateStreamEventOpts) (*dbsqlc.StreamEvent, error)

	// GetStreamEvent returns a StreamEvent line by id.
	GetStreamEvent(ctx context.Context, tenantId string, streamEventId int64) (*dbsqlc.StreamEvent, error)

	// CleanupStreamEvents deletes all stale StreamEvents.
	CleanupStreamEvents(ctx context.Context) error

	// GetStreamEventMeta returns the stream event metadata row for the given step run.
	GetStreamEventMeta(ctx context.Context, tenantId string, stepRunId string) (*dbsqlc.GetStreamEventMetaRow, error)
}

type TenantAPIRepository

// TenantAPIRepository is the API-facing repository for tenants and tenant members.
type TenantAPIRepository interface {
	// CreateTenant creates a new tenant.
	CreateTenant(opts *CreateTenantOpts) (*dbsqlc.Tenant, error)

	// UpdateTenant updates the tenant with the given id.
	UpdateTenant(tenantId string, opts *UpdateTenantOpts) (*db.TenantModel, error)

	// GetTenantByID returns the tenant with the given id
	GetTenantByID(tenantId string) (*db.TenantModel, error)

	// GetTenantBySlug returns the tenant with the given slug
	GetTenantBySlug(slug string) (*db.TenantModel, error)

	// CreateTenantMember creates a new member in the tenant
	CreateTenantMember(tenantId string, opts *CreateTenantMemberOpts) (*db.TenantMemberModel, error)

	// GetTenantMemberByID returns the tenant member with the given id
	GetTenantMemberByID(memberId string) (*db.TenantMemberModel, error)

	// GetTenantMemberByUserID returns the tenant member with the given user id
	GetTenantMemberByUserID(tenantId string, userId string) (*db.TenantMemberModel, error)

	// GetTenantMemberByEmail returns the tenant member with the given email
	GetTenantMemberByEmail(tenantId string, email string) (*db.TenantMemberModel, error)

	// ListTenantMembers returns the list of tenant members for the given tenant
	ListTenantMembers(tenantId string) ([]db.TenantMemberModel, error)

	// UpdateTenantMember updates the tenant member with the given id
	UpdateTenantMember(memberId string, opts *UpdateTenantMemberOpts) (*db.TenantMemberModel, error)

	// DeleteTenantMember deletes the tenant member with the given id
	DeleteTenantMember(memberId string) (*db.TenantMemberModel, error)

	// GetQueueMetrics returns the queue metrics for the given tenant
	GetQueueMetrics(ctx context.Context, tenantId string, opts *GetQueueMetricsOpts) (*GetQueueMetricsResponse, error)
}

type TenantAlertEmailGroupForSend

// TenantAlertEmailGroupForSend pairs a tenant id with a list of email addresses.
// Emails is required; each entry must be a valid email of at most 255 characters.
type TenantAlertEmailGroupForSend struct {
	TenantId pgtype.UUID `json:"tenantId"`
	Emails   []string    `validate:"required,dive,email,max=255"`
}

type TenantAlertingAPIRepository

// TenantAlertingAPIRepository is the API-facing repository for tenant alerting
// settings and alert email groups.
type TenantAlertingAPIRepository interface {
	// UpsertTenantAlertingSettings upserts the alerting settings for the tenant.
	UpsertTenantAlertingSettings(tenantId string, opts *UpsertTenantAlertingSettingsOpts) (*db.TenantAlertingSettingsModel, error)

	// GetTenantAlertingSettings returns the alerting settings for the tenant.
	GetTenantAlertingSettings(tenantId string) (*db.TenantAlertingSettingsModel, error)

	// CreateTenantAlertGroup creates an alert email group for the tenant.
	CreateTenantAlertGroup(tenantId string, opts *CreateTenantAlertGroupOpts) (*db.TenantAlertEmailGroupModel, error)

	// UpdateTenantAlertGroup updates the alert email group with the given id.
	UpdateTenantAlertGroup(id string, opts *UpdateTenantAlertGroupOpts) (*db.TenantAlertEmailGroupModel, error)

	// ListTenantAlertGroups lists the tenant's alert email groups.
	ListTenantAlertGroups(tenantId string) ([]db.TenantAlertEmailGroupModel, error)

	// GetTenantAlertGroupById returns the alert email group with the given id.
	GetTenantAlertGroupById(id string) (*db.TenantAlertEmailGroupModel, error)

	// DeleteTenantAlertGroup deletes the alert email group with the given id for the tenant.
	DeleteTenantAlertGroup(tenantId string, id string) error
}

type TenantAlertingEngineRepository

// TenantAlertingEngineRepository is the engine-facing repository for tenant
// alerting settings and resource limit state.
type TenantAlertingEngineRepository interface {
	// GetTenantAlertingSettings returns the alerting settings for the tenant.
	GetTenantAlertingSettings(ctx context.Context, tenantId string) (*GetTenantAlertingSettingsResponse, error)

	// UpdateTenantAlertingSettings updates the alerting settings for the tenant.
	UpdateTenantAlertingSettings(ctx context.Context, tenantId string, opts *UpdateTenantAlertingSettingsOpts) error

	// GetTenantResourceLimitState returns the limit row for the given tenant and resource.
	GetTenantResourceLimitState(ctx context.Context, tenantId string, resource string) (*dbsqlc.GetTenantResourceLimitRow, error)
}

type TenantCallbackOpts added in v0.52.12

// TenantCallbackOpts is a generic options type for tenant callbacks over values
// of type T. Its fields are unexported; use the Run method.
type TenantCallbackOpts[T any] struct {
	// contains filtered or unexported fields
}

func (*TenantCallbackOpts[T]) Run added in v0.52.12

// Run takes a logger, a tenant id and a value of type T and returns nothing.
// NOTE(review): presumably invokes the configured callback(s) for the tenant
// and logs failures via l — the body is not shown here; confirm.
func (o *TenantCallbackOpts[T]) Run(l *zerolog.Logger, tenantId string, v T)

type TenantEngineRepository

// TenantEngineRepository is the engine-facing repository for tenants and for
// the controller, scheduler and tenant-worker partitions they are assigned to.
type TenantEngineRepository interface {
	// ListTenants lists all tenants in the instance
	ListTenants(ctx context.Context) ([]*dbsqlc.Tenant, error)

	// ListTenantsByControllerPartition lists all tenants in the given controller partition
	ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string) ([]*dbsqlc.Tenant, error)

	// ListTenantsByWorkerPartition lists all tenants in the given worker partition
	ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string) ([]*dbsqlc.Tenant, error)

	// ListTenantsBySchedulerPartition lists all tenants in the given scheduler partition
	ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string) ([]*dbsqlc.Tenant, error)

	// CreateControllerPartition creates a new controller partition for tenants within the engine
	CreateControllerPartition(ctx context.Context) (string, error)

	// UpdateControllerPartitionHeartbeat updates the heartbeat for the given partition. If the partition no longer exists,
	// it creates a new partition and returns the new partition id. Otherwise, it returns the existing partition id.
	UpdateControllerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteControllerPartition(ctx context.Context, id string) error

	RebalanceAllControllerPartitions(ctx context.Context) error

	RebalanceInactiveControllerPartitions(ctx context.Context) error

	CreateSchedulerPartition(ctx context.Context) (string, error)

	UpdateSchedulerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteSchedulerPartition(ctx context.Context, id string) error

	RebalanceAllSchedulerPartitions(ctx context.Context) error

	RebalanceInactiveSchedulerPartitions(ctx context.Context) error

	CreateTenantWorkerPartition(ctx context.Context) (string, error)

	UpdateWorkerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteTenantWorkerPartition(ctx context.Context, id string) error

	RebalanceAllTenantWorkerPartitions(ctx context.Context) error

	RebalanceInactiveTenantWorkerPartitions(ctx context.Context) error

	// GetTenantByID returns the tenant with the given id
	GetTenantByID(ctx context.Context, tenantId string) (*dbsqlc.Tenant, error)
}

type TenantInviteRepository

// TenantInviteRepository is the set of queries on tenant invite links.
type TenantInviteRepository interface {
	// CreateTenantInvite creates a new tenant invite with the given options
	CreateTenantInvite(tenantId string, opts *CreateTenantInviteOpts) (*db.TenantInviteLinkModel, error)

	// GetTenantInvite returns the tenant invite with the given id
	GetTenantInvite(id string) (*db.TenantInviteLinkModel, error)

	// ListTenantInvitesByEmail returns the list of tenant invites for the given invitee email for invites
	// which are not expired
	ListTenantInvitesByEmail(email string) ([]db.TenantInviteLinkModel, error)

	// ListTenantInvitesByTenantId returns the list of tenant invites for the given tenant id
	ListTenantInvitesByTenantId(tenantId string, opts *ListTenantInvitesOpts) ([]db.TenantInviteLinkModel, error)

	// UpdateTenantInvite updates the tenant invite with the given id
	UpdateTenantInvite(id string, opts *UpdateTenantInviteOpts) (*db.TenantInviteLinkModel, error)

	// DeleteTenantInvite deletes the tenant invite with the given id
	DeleteTenantInvite(id string) error
}

type TenantLimitConfig

// TenantLimitConfig holds configuration for tenant resource limits.
type TenantLimitConfig struct {
	// EnforceLimits presumably toggles whether tenant resource limits are
	// enforced — confirm against the implementation.
	EnforceLimits bool
}

type TenantLimitRepository

// TenantLimitRepository is the set of queries and operations on tenant resource limits.
type TenantLimitRepository interface {
	// GetLimits returns the resource limits for the tenant.
	GetLimits(ctx context.Context, tenantId string) ([]*dbsqlc.TenantResourceLimit, error)

	// CanCreate checks if the tenant can create numberOfResources of the given resource.
	// NOTE(review): the meaning of the int result is not visible here — confirm
	// against the implementation.
	CanCreate(ctx context.Context, resource dbsqlc.LimitResource, tenantId string, numberOfResources int32) (bool, int, error)

	// Meter increments the tenant's count for the given resource
	Meter(ctx context.Context, resource dbsqlc.LimitResource, tenantId string, numberOfResources int32) (*dbsqlc.TenantResourceLimit, error)

	// Create new Tenant Resource Limits for a tenant
	SelectOrInsertTenantLimits(ctx context.Context, tenantId string, plan *string) error

	// UpsertTenantLimits updates or inserts new tenant limits
	UpsertTenantLimits(ctx context.Context, tenantId string, plan *string) error

	// Resolve all tenant resource limits
	ResolveAllTenantResourceLimits(ctx context.Context) error

	// SetPlanLimitMap sets the plan limit map
	SetPlanLimitMap(planLimitMap PlanLimitMap) error

	// DefaultLimits returns the default set of limits.
	DefaultLimits() []Limit
}

type TenantScopedCallback added in v0.51.1

// TenantScopedCallback is a callback that receives a string and a value of
// type T and may return an error. Per the Do method's signature, the string
// argument is the tenant id.
type TenantScopedCallback[T any] func(string, T) error

func (TenantScopedCallback[T]) Do added in v0.51.1

// Do takes a logger, a tenant id and a value of type T and returns nothing.
// NOTE(review): presumably invokes the callback with (tenantId, v) and logs any
// returned error via l — the body is not shown here; confirm.
func (c TenantScopedCallback[T]) Do(l *zerolog.Logger, tenantId string, v T)

type TickerEngineRepository

// TickerEngineRepository is the engine-facing repository for tickers and for
// the items tickers poll for.
type TickerEngineRepository interface {
	// CreateNewTicker creates a new ticker.
	CreateNewTicker(ctx context.Context, opts *CreateTickerOpts) (*dbsqlc.Ticker, error)

	// UpdateTicker updates a ticker.
	UpdateTicker(ctx context.Context, tickerId string, opts *UpdateTickerOpts) (*dbsqlc.Ticker, error)

	// ListTickers lists tickers.
	ListTickers(ctx context.Context, opts *ListTickerOpts) ([]*dbsqlc.Ticker, error)

	// DeactivateTicker deactivates the ticker with the given id.
	// NOTE(review): the previous comment said "deletes a ticker" — confirm
	// whether the record is removed or only marked inactive.
	DeactivateTicker(ctx context.Context, tickerId string) error

	// PollGetGroupKeyRuns looks for get group key runs who are close to past their timeoutAt value and are in a running state
	PollGetGroupKeyRuns(ctx context.Context, tickerId string) ([]*dbsqlc.GetGroupKeyRun, error)

	// PollCronSchedules returns all cron schedules which should be managed by the ticker
	PollCronSchedules(ctx context.Context, tickerId string) ([]*dbsqlc.PollCronSchedulesRow, error)

	PollScheduledWorkflows(ctx context.Context, tickerId string) ([]*dbsqlc.PollScheduledWorkflowsRow, error)

	PollTenantAlerts(ctx context.Context, tickerId string) ([]*dbsqlc.PollTenantAlertsRow, error)

	PollExpiringTokens(ctx context.Context) ([]*dbsqlc.PollExpiringTokensRow, error)

	PollTenantResourceLimitAlerts(ctx context.Context) ([]*dbsqlc.TenantResourceLimitAlert, error)

	PollUnresolvedFailedStepRuns(ctx context.Context) ([]*dbsqlc.PollUnresolvedFailedStepRunsRow, error)
}

type UnscopedCallback added in v0.51.1

// UnscopedCallback is a callback that receives a value of type T and may
// return an error. Unlike TenantScopedCallback it takes no tenant id.
type UnscopedCallback[T any] func(T) error

func (UnscopedCallback[T]) Do added in v0.51.1

// Do takes a logger and a value of type T and returns nothing.
// NOTE(review): presumably invokes the callback with v and logs any returned
// error via l — the body is not shown here; confirm.
func (c UnscopedCallback[T]) Do(l *zerolog.Logger, v T)

type UpdateDispatcherOpts

// UpdateDispatcherOpts holds the fields that can be updated on a dispatcher.
type UpdateDispatcherOpts struct {
	// (optional) the time of the dispatcher's last heartbeat
	LastHeartbeatAt *time.Time
}

type UpdateGetGroupKeyRunOpts

// UpdateGetGroupKeyRunOpts holds the fields that can be updated on a get group
// key run. Every field is a pointer.
// NOTE(review): a nil field presumably leaves the stored value unchanged —
// confirm against the implementation.
type UpdateGetGroupKeyRunOpts struct {
	RequeueAfter *time.Time

	ScheduleTimeoutAt *time.Time

	Status *db.StepRunStatus

	StartedAt *time.Time

	FailedAt *time.Time

	FinishedAt *time.Time

	CancelledAt *time.Time

	CancelledReason *string

	Error *string

	Output *string
}

type UpdateJobRunLookupDataOpts

// UpdateJobRunLookupDataOpts holds a field path and raw data for updating a job
// run's lookup data.
// NOTE(review): FieldPath is presumably the path within the lookup data at
// which Data is written — confirm against the implementation.
type UpdateJobRunLookupDataOpts struct {
	FieldPath []string
	Data      []byte
}

type UpdateSessionOpts

// UpdateSessionOpts holds the fields that can be updated on a user session.
type UpdateSessionOpts struct {
	// (optional) a user id; must be a UUID when set
	UserId *string `validate:"omitempty,uuid"`

	// (optional) the session's JSON data
	Data *types.JSON
}

type UpdateStepRunOverridesDataOpts

// UpdateStepRunOverridesDataOpts holds the options passed to
// StepRunEngineRepository.UpdateStepRunOverridesData: an override key, the raw
// data for it, and an optional caller file.
type UpdateStepRunOverridesDataOpts struct {
	OverrideKey string
	Data        []byte
	CallerFile  *string
}

type UpdateTenantAlertGroupOpts

// UpdateTenantAlertGroupOpts holds the fields that can be updated on a tenant
// alert email group.
type UpdateTenantAlertGroupOpts struct {
	// (required) email addresses; each must be a valid email of at most 255 characters
	Emails []string `validate:"required,dive,email,max=255"`
}

type UpdateTenantAlertingSettingsOpts

// UpdateTenantAlertingSettingsOpts holds the options passed to
// TenantAlertingEngineRepository.UpdateTenantAlertingSettings.
type UpdateTenantAlertingSettingsOpts struct {
	// (optional) the time the tenant was last alerted
	LastAlertedAt *time.Time
}

type UpdateTenantInviteOpts

// UpdateTenantInviteOpts holds the fields that can be updated on a tenant invite.
type UpdateTenantInviteOpts struct {
	// (optional) the new status of the invite; ACCEPTED or REJECTED
	Status *string `validate:"omitempty,oneof=ACCEPTED REJECTED"`

	// (optional) the role of the invitee
	Role *string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
}

type UpdateTenantMemberOpts

// UpdateTenantMemberOpts holds the fields that can be updated on a tenant member.
type UpdateTenantMemberOpts struct {
	// (optional) the role of the member; OWNER, ADMIN or MEMBER
	Role *string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
}

type UpdateTenantOpts

// UpdateTenantOpts holds the fields that can be updated on a tenant. Every
// field is optional.
type UpdateTenantOpts struct {
	// (optional) the tenant name
	Name *string

	// (optional) the analytics opt-out flag
	AnalyticsOptOut *bool `validate:"omitempty"`

	// (optional) presumably whether tenant members receive alert emails —
	// confirm against the implementation.
	AlertMemberEmails *bool `validate:"omitempty"`
}

type UpdateTickerOpts

// UpdateTickerOpts holds the options passed to TickerEngineRepository.UpdateTicker.
type UpdateTickerOpts struct {
	// (optional) the time of the ticker's last heartbeat
	LastHeartbeatAt *time.Time
}

type UpdateUserOpts

// UpdateUserOpts holds the options passed to UserRepository.UpdateUser.
// Password and OAuth are tagged as mutually exclusive (excluded_with).
//
// NOTE(review): both auth tags start with omitempty, which likely skips the
// remaining rules when the pointer is nil, so required_without may never fire
// and an update with neither auth option set would pass validation. This may be
// intentional for an update; confirm against the validator's tag semantics.
type UpdateUserOpts struct {
	EmailVerified *bool
	Name          *string

	// auth options
	Password *string    `validate:"omitempty,required_without=OAuth,excluded_with=OAuth"`
	OAuth    *OAuthOpts `validate:"omitempty,required_without=Password,excluded_with=Password"`
}

type UpdateWebhookWorkerTokenOpts added in v0.42.13

// UpdateWebhookWorkerTokenOpts holds the options passed to
// WebhookWorkerEngineRepository.UpdateWebhookWorkerToken: an optional token
// value and an optional token id.
type UpdateWebhookWorkerTokenOpts struct {
	TokenValue *string
	TokenID    *string
}

type UpdateWorkerOpts

// UpdateWorkerOpts holds the fields that can be updated on a worker.
type UpdateWorkerOpts struct {
	// The id of the dispatcher
	DispatcherId *string `validate:"omitempty,uuid"`

	// When the last worker heartbeat was
	LastHeartbeatAt *time.Time

	// If the worker is active and accepting new runs
	IsActive *bool

	// A list of actions this worker can run
	Actions []string `validate:"dive,actionId"`
}

type UpdateWorkflowOpts added in v0.48.0

// UpdateWorkflowOpts holds the fields that can be updated on a workflow.
type UpdateWorkflowOpts struct {
	// (optional) is paused -- if true, the workflow will not be scheduled
	IsPaused *bool
}

type UpdateWorkflowRunFromGroupKeyEvalOpts added in v0.45.0

// UpdateWorkflowRunFromGroupKeyEvalOpts carries the outcome of a group key
// evaluation for a workflow run.
// NOTE(review): presumably exactly one of GroupKey (success) or Error (failure)
// is set — confirm against the caller.
type UpdateWorkflowRunFromGroupKeyEvalOpts struct {
	GroupKey *string

	Error *string
}

type UpsertRateLimitOpts

// UpsertRateLimitOpts holds the options passed to
// RateLimitEngineRepository.UpsertRateLimit.
type UpsertRateLimitOpts struct {
	// The rate limit max value
	Limit int

	// The rate limit duration
	Duration *string `validate:"omitnil,oneof=SECOND MINUTE HOUR DAY WEEK MONTH YEAR"`
}

type UpsertSlackWebhookOpts

// UpsertSlackWebhookOpts holds the options passed to
// SlackRepository.UpsertSlackWebhook. Every field is required; the string
// fields must be between 1 and 255 characters long.
type UpsertSlackWebhookOpts struct {
	TeamId string `validate:"required,min=1,max=255"`

	TeamName string `validate:"required,min=1,max=255"`

	ChannelId string `validate:"required,min=1,max=255"`

	ChannelName string `validate:"required,min=1,max=255"`

	// WebhookURL is stored as bytes and must be non-empty.
	// NOTE(review): presumably an encrypted form of the URL — confirm.
	WebhookURL []byte `validate:"required,min=1"`
}

type UpsertTenantAlertingSettingsOpts

// UpsertTenantAlertingSettingsOpts holds the options passed to
// TenantAlertingAPIRepository.UpsertTenantAlertingSettings. Every field is
// optional; MaxFrequency, when set, is checked by the custom "duration"
// validation rule.
type UpsertTenantAlertingSettingsOpts struct {
	MaxFrequency                    *string `validate:"omitnil,duration"`
	EnableExpiringTokenAlerts       *bool   `validate:"omitnil"`
	EnableWorkflowRunFailureAlerts  *bool   `validate:"omitnil"`
	EnableTenantResourceLimitAlerts *bool   `validate:"omitnil"`
}

type UpsertWorkerLabelOpts added in v0.40.0

// UpsertWorkerLabelOpts describes a worker label: a key with an optional
// integer value and an optional string value.
// NOTE(review): presumably only one of IntValue or StrValue is set for a given
// label — confirm against the implementation.
type UpsertWorkerLabelOpts struct {
	Key      string
	IntValue *int32
	StrValue *string
}

type UpsertWorkflowDeploymentConfigOpts

// UpsertWorkflowDeploymentConfigOpts holds the GitHub installation and
// repository details for a workflow deployment config. Every field is required.
type UpsertWorkflowDeploymentConfigOpts struct {
	// (required) the github app installation id
	GithubAppInstallationId string `validate:"required,uuid"`

	// (required) the github repository name
	GitRepoName string `validate:"required"`

	// (required) the github repository owner
	GitRepoOwner string `validate:"required"`

	// (required) the github repository branch
	GitRepoBranch string `validate:"required"`
}

type UserRepository

// UserRepository is the set of queries on users.
type UserRepository interface {
	// RegisterCreateCallback registers a callback that receives a user model.
	// NOTE(review): presumably invoked after a user is created — confirm.
	RegisterCreateCallback(callback UnscopedCallback[*db.UserModel])

	// GetUserByID returns the user with the given id
	GetUserByID(id string) (*db.UserModel, error)

	// GetUserByEmail returns the user with the given email
	GetUserByEmail(email string) (*db.UserModel, error)

	// GetUserPassword returns the user password with the given id
	GetUserPassword(id string) (*db.UserPasswordModel, error)

	// CreateUser creates a new user with the given options
	CreateUser(*CreateUserOpts) (*db.UserModel, error)

	// UpdateUser updates the user with the given id
	UpdateUser(id string, opts *UpdateUserOpts) (*db.UserModel, error)

	// ListTenantMemberships returns the list of tenant memberships for the given user
	ListTenantMemberships(userId string) ([]db.TenantMemberModel, error)
}

type UserSessionRepository

// UserSessionRepository is the set of create, update, delete and lookup
// operations on user sessions.
type UserSessionRepository interface {
	// Create creates a new session with the given options.
	Create(opts *CreateSessionOpts) (*db.UserSessionModel, error)
	// Update updates the session with the given id.
	Update(sessionId string, opts *UpdateSessionOpts) (*db.UserSessionModel, error)
	// Delete deletes the session with the given id and returns a session model.
	Delete(sessionId string) (*db.UserSessionModel, error)
	// GetById returns the session with the given id.
	GetById(sessionId string) (*db.UserSessionModel, error)
}

UserSessionRepository represents the set of queries on the UserSession model

type WebhookWorkerEngineRepository added in v0.34.0

// WebhookWorkerEngineRepository is the engine-facing repository for webhook
// workers and the requests made to them.
type WebhookWorkerEngineRepository interface {
	// ListWebhookWorkersByPartitionId returns the list of webhook workers for a worker partition
	ListWebhookWorkersByPartitionId(ctx context.Context, partitionId string) ([]*dbsqlc.WebhookWorker, error)

	// ListActiveWebhookWorkers returns the list of active webhook workers for the given tenant
	ListActiveWebhookWorkers(ctx context.Context, tenantId string) ([]*dbsqlc.WebhookWorker, error)

	// ListWebhookWorkerRequests returns the list of webhook worker requests for the given webhook worker id
	ListWebhookWorkerRequests(ctx context.Context, webhookWorkerId string) ([]*dbsqlc.WebhookWorkerRequest, error)

	// InsertWebhookWorkerRequest inserts a new webhook worker request with the given options
	InsertWebhookWorkerRequest(ctx context.Context, webhookWorkerId string, method string, statusCode int32) error

	// CreateWebhookWorker creates a new webhook worker with the given options
	CreateWebhookWorker(ctx context.Context, opts *CreateWebhookWorkerOpts) (*dbsqlc.WebhookWorker, error)

	// UpdateWebhookWorkerToken updates a webhook worker with the given id and tenant id
	UpdateWebhookWorkerToken(ctx context.Context, id string, tenantId string, opts *UpdateWebhookWorkerTokenOpts) (*dbsqlc.WebhookWorker, error)

	// SoftDeleteWebhookWorker flags a webhook worker for delete with the given id and tenant id
	SoftDeleteWebhookWorker(ctx context.Context, id string, tenantId string) error

	// HardDeleteWebhookWorker deletes a webhook worker with the given id and tenant id
	HardDeleteWebhookWorker(ctx context.Context, id string, tenantId string) error
}

type WebhookWorkerRepository added in v0.34.0

// WebhookWorkerRepository is the lookup interface for webhook workers that returns
// db model types.
type WebhookWorkerRepository interface {
	// GetWebhookWorkerByID returns the webhook worker with the given id
	GetWebhookWorkerByID(id string) (*db.WebhookWorkerModel, error)
}

type WorkerAPIRepository

// WorkerAPIRepository is the set of worker queries that do not take a context.Context
// and return dbsqlc row types.
type WorkerAPIRepository interface {
	// ListWorkers lists workers for the tenant
	ListWorkers(tenantId string, opts *ListWorkersOpts) ([]*dbsqlc.ListWorkersWithSlotCountRow, error)

	// ListWorkerState returns the semaphore slots (with state) and the step runs for a given worker.
	// NOTE(review): maxRuns presumably caps the number of recent step runs returned — confirm
	// against the implementation.
	ListWorkerState(tenantId, workerId string, maxRuns int) ([]*dbsqlc.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)

	// GetWorkerActionsByWorkerId returns a list of actions for a worker
	GetWorkerActionsByWorkerId(tenantId, workerId string) ([]pgtype.Text, error)

	// GetWorkerById returns a worker by its id.
	GetWorkerById(workerId string) (*dbsqlc.GetWorkerByIdRow, error)

	// ListWorkerLabels returns a list of labels config for a worker
	ListWorkerLabels(tenantId, workerId string) ([]*dbsqlc.ListWorkerLabelsRow, error)

	// UpdateWorker updates a worker for a given tenant.
	UpdateWorker(tenantId string, workerId string, opts ApiUpdateWorkerOpts) (*dbsqlc.Worker, error)
}

type WorkerEngineRepository

// WorkerEngineRepository is the set of context-aware operations on workers: creation,
// updates, heartbeats, labels and cleanup.
type WorkerEngineRepository interface {
	// CreateNewWorker creates a new worker for a given tenant.
	CreateNewWorker(ctx context.Context, tenantId string, opts *CreateWorkerOpts) (*dbsqlc.Worker, error)

	// UpdateWorker updates a worker for a given tenant.
	UpdateWorker(ctx context.Context, tenantId, workerId string, opts *UpdateWorkerOpts) (*dbsqlc.Worker, error)

	// UpdateWorkerHeartbeat updates the last heartbeat time of a worker in the repository.
	// It will only update the worker if there is no lock on the worker, else it will skip.
	UpdateWorkerHeartbeat(ctx context.Context, tenantId, workerId string, lastHeartbeatAt time.Time) error

	// DeleteWorker removes the worker from the database
	DeleteWorker(ctx context.Context, tenantId, workerId string) error

	// UpdateWorkersByWebhookId updates workers using the given params.
	// NOTE(review): this comment previously read "removes the worker from the database",
	// which looks copied from DeleteWorker — confirm the actual behavior.
	UpdateWorkersByWebhookId(ctx context.Context, opts dbsqlc.UpdateWorkersByWebhookIdParams) error

	// GetWorkerForEngine returns the engine row for the worker with the given tenant id and worker id.
	GetWorkerForEngine(ctx context.Context, tenantId, workerId string) (*dbsqlc.GetWorkerForEngineRow, error)

	// UpdateWorkerActiveStatus sets the active status of a worker and returns the worker.
	// NOTE(review): how timestamp is used is not visible here — confirm against the implementation.
	UpdateWorkerActiveStatus(ctx context.Context, tenantId, workerId string, isActive bool, timestamp time.Time) (*dbsqlc.Worker, error)

	// UpsertWorkerLabels upserts the given labels for a worker and returns the resulting labels.
	UpsertWorkerLabels(ctx context.Context, workerId pgtype.UUID, opts []UpsertWorkerLabelOpts) ([]*dbsqlc.WorkerLabel, error)

	// DeleteOldWorkers deletes workers for a tenant based on lastHeartbeatBefore.
	// NOTE(review): the bool result presumably reports whether more workers remain to be
	// deleted — confirm.
	DeleteOldWorkers(ctx context.Context, tenantId string, lastHeartbeatBefore time.Time) (bool, error)

	// DeleteOldWorkerEvents deletes old worker events for a tenant.
	// NOTE(review): the parameter is named lastHeartbeatAfter while DeleteOldWorkers takes
	// lastHeartbeatBefore — confirm which direction the cutoff applies.
	DeleteOldWorkerEvents(ctx context.Context, tenantId string, lastHeartbeatAfter time.Time) error

	// GetDispatcherIdsForWorkers returns dispatcher ids for the given worker ids as a map of
	// string to string slices.
	// NOTE(review): whether the map is keyed by dispatcher id or by worker id is not visible here.
	GetDispatcherIdsForWorkers(ctx context.Context, tenantId string, workerIds []string) (map[string][]string, error)
}

type WorkerWithStepCount

// WorkerWithStepCount pairs a worker model with a step run count.
type WorkerWithStepCount struct {
	// Worker is the worker model.
	Worker *db.WorkerModel
	// StepRunCount is the step run count associated with Worker.
	// NOTE(review): which step runs are counted (e.g. only active ones) is not visible here.
	StepRunCount int
}

type WorkflowAPIRepository

// WorkflowAPIRepository is the set of workflow queries covering workflows, workflow
// versions, metrics, and cron/scheduled triggers.
type WorkflowAPIRepository interface {
	// ListWorkflows returns all workflows for a given tenant.
	ListWorkflows(tenantId string, opts *ListWorkflowsOpts) (*ListWorkflowsResult, error)

	// GetWorkflowById returns a workflow by its id. It will return db.ErrNotFound if the workflow does not exist.
	GetWorkflowById(ctx context.Context, workflowId string) (*dbsqlc.GetWorkflowByIdRow, error)

	// GetWorkflowVersionById returns a workflow version by its id, together with its cron,
	// event and scheduled trigger refs. It will return db.ErrNotFound if the workflow
	// version does not exist.
	GetWorkflowVersionById(tenantId, workflowVersionId string) (*dbsqlc.GetWorkflowVersionByIdRow,
		[]*dbsqlc.WorkflowTriggerCronRef,
		[]*dbsqlc.WorkflowTriggerEventRef,
		[]*dbsqlc.WorkflowTriggerScheduledRef,
		error)

	// DeleteWorkflow deletes a workflow for a given tenant.
	DeleteWorkflow(ctx context.Context, tenantId, workflowId string) (*dbsqlc.Workflow, error)

	// GetWorkflowMetrics returns the metrics for a given workflow.
	GetWorkflowMetrics(tenantId, workflowId string, opts *GetWorkflowMetricsOpts) (*WorkflowMetrics, error)

	// UpdateWorkflow updates a workflow for a given tenant.
	UpdateWorkflow(ctx context.Context, tenantId, workflowId string, opts *UpdateWorkflowOpts) (*dbsqlc.Workflow, error)

	// GetWorkflowWorkerCount returns the number of workers for a given workflow.
	// NOTE(review): two ints are returned; what distinguishes them is not visible here — confirm.
	GetWorkflowWorkerCount(tenantId, workflowId string) (int, int, error)

	// CreateCronWorkflow creates a cron trigger
	CreateCronWorkflow(ctx context.Context, tenantId string, opts *CreateCronWorkflowTriggerOpts) (*dbsqlc.ListCronWorkflowsRow, error)

	// ListCronWorkflows lists workflows by cron trigger, along with an int64 count.
	// NOTE(review): the count is presumably the total number of matching rows — confirm.
	ListCronWorkflows(ctx context.Context, tenantId string, opts *ListCronWorkflowsOpts) ([]*dbsqlc.ListCronWorkflowsRow, int64, error)

	// GetCronWorkflow gets a cron workflow trigger by id
	GetCronWorkflow(ctx context.Context, tenantId, cronWorkflowId string) (*dbsqlc.ListCronWorkflowsRow, error)

	// DeleteCronWorkflow deletes a cron workflow trigger by id
	DeleteCronWorkflow(ctx context.Context, tenantId, id string) error

	// CreateScheduledWorkflow creates a scheduled workflow run
	CreateScheduledWorkflow(ctx context.Context, tenantId string, opts *CreateScheduledWorkflowRunForWorkflowOpts) (*dbsqlc.ListScheduledWorkflowsRow, error)
}

type WorkflowEngineRepository

// WorkflowEngineRepository is the set of context-aware queries for creating and looking up
// workflows and workflow versions.
type WorkflowEngineRepository interface {
	// CreateNewWorkflow creates a new workflow for a given tenant. It will create the parent
	// workflow based on the version's name.
	CreateNewWorkflow(ctx context.Context, tenantId string, opts *CreateWorkflowVersionOpts) (*dbsqlc.GetWorkflowVersionForEngineRow, error)

	// CreateWorkflowVersion creates a new workflow version for a given tenant. This will fail if there is
	// not a parent workflow with the same name already in the database.
	CreateWorkflowVersion(ctx context.Context, tenantId string, opts *CreateWorkflowVersionOpts, oldWorkflowVersion *dbsqlc.GetWorkflowVersionForEngineRow) (*dbsqlc.GetWorkflowVersionForEngineRow, error)

	// CreateSchedules creates schedules for a given workflow version.
	CreateSchedules(ctx context.Context, tenantId, workflowVersionId string, opts *CreateWorkflowSchedulesOpts) ([]*dbsqlc.WorkflowTriggerScheduledRef, error)

	// GetLatestWorkflowVersion returns the latest workflow version for the given workflow id.
	GetLatestWorkflowVersion(ctx context.Context, tenantId, workflowId string) (*dbsqlc.GetWorkflowVersionForEngineRow, error)

	// GetLatestWorkflowVersions returns the latest workflow versions for the given workflow ids.
	GetLatestWorkflowVersions(ctx context.Context, tenantId string, workflowIds []string) ([]*dbsqlc.GetWorkflowVersionForEngineRow, error)

	// GetWorkflowByName returns a workflow by its name. It will return db.ErrNotFound if the workflow does not exist.
	GetWorkflowByName(ctx context.Context, tenantId, workflowName string) (*dbsqlc.Workflow, error)

	// GetWorkflowsByNames returns all workflows matching the given names.
	// NOTE(review): the earlier comment stated that db.ErrNotFound is returned if the workflow
	// does not exist; confirm whether that applies when only some of the names are missing.
	GetWorkflowsByNames(ctx context.Context, tenantId string, workflowNames []string) ([]*dbsqlc.Workflow, error)

	// ListWorkflowsForEvent returns the latest workflow versions for a given tenant that are triggered by the
	// given event.
	ListWorkflowsForEvent(ctx context.Context, tenantId, eventKey string) ([]*dbsqlc.GetWorkflowVersionForEngineRow, error)

	// GetWorkflowVersionById returns a workflow version by its id. It will return db.ErrNotFound if the workflow
	// version does not exist.
	GetWorkflowVersionById(ctx context.Context, tenantId, workflowVersionId string) (*dbsqlc.GetWorkflowVersionForEngineRow, error)
}

type WorkflowMetrics

// WorkflowMetrics holds concurrency group key counts for a workflow. Both fields are
// omitted from JSON output when zero.
type WorkflowMetrics struct {
	// GroupKeyRunsCount is the number of runs for a specific group key.
	GroupKeyRunsCount int `json:"groupKeyRunsCount,omitempty"`

	// GroupKeyCount is the total number of concurrency group keys.
	GroupKeyCount int `json:"groupKeyCount,omitempty"`
}

type WorkflowRunAPIRepository

// WorkflowRunAPIRepository is the set of queries on workflow runs and scheduled workflow
// runs, including lookups of the steps and step runs that belong to them.
type WorkflowRunAPIRepository interface {
	// RegisterCreateCallback registers a tenant-scoped callback that receives a *dbsqlc.WorkflowRun.
	// NOTE(review): presumably invoked when a workflow run is created — confirm.
	RegisterCreateCallback(callback TenantScopedCallback[*dbsqlc.WorkflowRun])

	// ListWorkflowRuns returns workflow runs for a given workflow version id.
	ListWorkflowRuns(ctx context.Context, tenantId string, opts *ListWorkflowRunsOpts) (*ListWorkflowRunsResult, error)

	// WorkflowRunMetricsCount returns workflow run counts by status
	WorkflowRunMetricsCount(ctx context.Context, tenantId string, opts *WorkflowRunsMetricsOpts) (*dbsqlc.WorkflowRunsMetricsCountRow, error)

	// ListScheduledWorkflows lists workflows by scheduled trigger, along with an int64 count.
	// NOTE(review): the count is presumably the total number of matching rows — confirm.
	ListScheduledWorkflows(ctx context.Context, tenantId string, opts *ListScheduledWorkflowsOpts) ([]*dbsqlc.ListScheduledWorkflowsRow, int64, error)

	// DeleteScheduledWorkflow deletes a scheduled workflow run
	DeleteScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) error

	// GetScheduledWorkflow gets a scheduled workflow run
	GetScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) (*dbsqlc.ListScheduledWorkflowsRow, error)

	// UpdateScheduledWorkflow updates a scheduled workflow run with the given trigger time
	UpdateScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string, triggerAt time.Time) error

	// CreateNewWorkflowRun creates a new workflow run for a workflow version.
	CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *CreateWorkflowRunOpts) (*dbsqlc.WorkflowRun, error)

	// GetWorkflowRunById returns a workflow run by id.
	GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunByIdRow, error)

	// GetWorkflowRunByIds returns the workflow runs for the given run ids.
	GetWorkflowRunByIds(ctx context.Context, tenantId string, runIds []string) ([]*dbsqlc.GetWorkflowRunByIdsRow, error)

	// GetStepsForJobs returns the steps for the given job ids.
	GetStepsForJobs(ctx context.Context, tenantId string, jobIds []string) ([]*dbsqlc.GetStepsForJobsRow, error)

	// GetStepRunsForJobRuns returns the step runs for the given job run ids.
	GetStepRunsForJobRuns(ctx context.Context, tenantId string, jobRunIds []string) ([]*StepRunForJobRun, error)
}

type WorkflowRunEngineRepository

// WorkflowRunEngineRepository is the set of workflow run operations covering creation,
// lookup, child runs, concurrency-strategy pops, queueing, replay and cleanup.
type WorkflowRunEngineRepository interface {
	// RegisterCreateCallback registers a tenant-scoped callback that receives a *dbsqlc.WorkflowRun.
	// NOTE(review): presumably invoked when a workflow run is created — confirm.
	RegisterCreateCallback(callback TenantScopedCallback[*dbsqlc.WorkflowRun])
	// RegisterQueuedCallback registers a tenant-scoped callback that receives a pgtype.UUID.
	// NOTE(review): presumably the id of a workflow run that was queued — confirm.
	RegisterQueuedCallback(callback TenantScopedCallback[pgtype.UUID])

	// ListWorkflowRuns returns workflow runs for a given workflow version id.
	ListWorkflowRuns(ctx context.Context, tenantId string, opts *ListWorkflowRunsOpts) (*ListWorkflowRunsResult, error)

	// GetChildWorkflowRun returns the child workflow run identified by its parent run, parent
	// step run, child index and optional child key.
	GetChildWorkflowRun(ctx context.Context, parentId, parentStepRunId string, childIndex int, childKey *string) (*dbsqlc.WorkflowRun, error)

	// GetChildWorkflowRuns returns the workflow runs for the given child workflow run descriptors.
	GetChildWorkflowRuns(ctx context.Context, childWorkflowRuns []ChildWorkflowRun) ([]*dbsqlc.WorkflowRun, error)

	// GetScheduledChildWorkflowRun returns the scheduled trigger ref for the child identified by
	// its parent run, parent step run, child index and optional child key.
	GetScheduledChildWorkflowRun(ctx context.Context, parentId, parentStepRunId string, childIndex int, childKey *string) (*dbsqlc.WorkflowTriggerScheduledRef, error)

	// PopWorkflowRunsCancelInProgress returns the workflow runs to cancel and the workflow runs
	// to start for a workflow version.
	// NOTE(review): maxRuns is presumably the concurrency limit — confirm.
	PopWorkflowRunsCancelInProgress(ctx context.Context, tenantId, workflowVersionId string, maxRuns int) (toCancel []*dbsqlc.WorkflowRun, toStart []*dbsqlc.WorkflowRun, err error)

	// PopWorkflowRunsCancelNewest returns the workflow runs to cancel and the workflow runs
	// to start for a workflow version.
	// NOTE(review): maxRuns is presumably the concurrency limit — confirm.
	PopWorkflowRunsCancelNewest(ctx context.Context, tenantId, workflowVersionId string, maxRuns int) (toCancel []*dbsqlc.WorkflowRun, toStart []*dbsqlc.WorkflowRun, err error)

	// PopWorkflowRunsRoundRobin returns workflow runs and step runs for a workflow version.
	// NOTE(review): the exact meaning of the two returned slices is not visible here — confirm.
	PopWorkflowRunsRoundRobin(ctx context.Context, tenantId, workflowVersionId string, maxRuns int) ([]*dbsqlc.WorkflowRun, []*dbsqlc.GetStepRunForEngineRow, error)

	// CreateNewWorkflowRun creates a new workflow run for a workflow version.
	CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *CreateWorkflowRunOpts) (*dbsqlc.WorkflowRun, error)

	// CreateNewWorkflowRuns creates new workflow runs in bulk
	CreateNewWorkflowRuns(ctx context.Context, tenantId string, opts []*CreateWorkflowRunOpts) ([]*dbsqlc.WorkflowRun, error)

	// CreateDeDupeKey creates a dedupe key from the given workflow run id, workflow version id
	// and dedupe value.
	CreateDeDupeKey(ctx context.Context, tenantId, workflowRunId, workflowVersionId, dedupeValue string) error

	// GetWorkflowRunInputData returns the input data of a workflow run as a generic map.
	GetWorkflowRunInputData(tenantId, workflowRunId string) (map[string]any, error)

	// ProcessWorkflowRunUpdates processes workflow run updates for a tenant.
	// NOTE(review): the bool result presumably reports whether more work remains — confirm.
	ProcessWorkflowRunUpdates(ctx context.Context, tenantId string) (bool, error)

	// UpdateWorkflowRunFromGroupKeyEval updates a workflow run using the given group key
	// evaluation options.
	UpdateWorkflowRunFromGroupKeyEval(ctx context.Context, tenantId, workflowRunId string, opts *UpdateWorkflowRunFromGroupKeyEvalOpts) error

	// GetWorkflowRunById returns a workflow run by id.
	GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunRow, error)

	// GetWorkflowRunByIds returns the workflow runs for the given run ids.
	// TODO maybe we don't need this?
	GetWorkflowRunByIds(ctx context.Context, tenantId string, runIds []string) ([]*dbsqlc.GetWorkflowRunRow, error)

	// QueuePausedWorkflowRun queues a paused workflow run.
	QueuePausedWorkflowRun(ctx context.Context, tenantId, workflowId, workflowRunId string) error

	// QueueWorkflowRunJobs queues the jobs of a workflow run and returns step runs.
	// NOTE(review): tenant and workflowRun are presumably the tenant id and workflow run id — confirm.
	QueueWorkflowRunJobs(ctx context.Context, tenant string, workflowRun string) ([]*dbsqlc.GetStepRunForEngineRow, error)

	// ProcessUnpausedWorkflowRuns processes unpaused workflow runs for a tenant and returns workflow runs.
	// NOTE(review): the bool result presumably reports whether more work remains — confirm.
	ProcessUnpausedWorkflowRuns(ctx context.Context, tenantId string) ([]*dbsqlc.GetWorkflowRunRow, bool, error)

	// GetWorkflowRunAdditionalMeta returns the additional metadata row for a workflow run.
	GetWorkflowRunAdditionalMeta(ctx context.Context, tenantId, workflowRunId string) (*dbsqlc.GetWorkflowRunAdditionalMetaRow, error)

	// ReplayWorkflowRun replays a workflow run and returns the workflow run.
	ReplayWorkflowRun(ctx context.Context, tenantId, workflowRunId string) (*dbsqlc.GetWorkflowRunRow, error)

	// ListActiveQueuedWorkflowVersions lists the active queued workflow versions for a tenant.
	ListActiveQueuedWorkflowVersions(ctx context.Context, tenantId string) ([]*dbsqlc.ListActiveQueuedWorkflowVersionsRow, error)

	// SoftDeleteExpiredWorkflowRuns soft-deletes workflow runs with the given statuses that were
	// created before the given time.
	// NOTE(review): the earlier comment described two numeric results, but the signature returns
	// a single bool — presumably whether more matching runs remain to be deleted; confirm.
	SoftDeleteExpiredWorkflowRuns(ctx context.Context, tenantId string, statuses []dbsqlc.WorkflowRunStatus, before time.Time) (bool, error)
}

type WorkflowRunMetricsCountOpts

// WorkflowRunMetricsCountOpts holds optional filters for counting workflow run metrics.
// Each field, when set, is validated as a UUID.
type WorkflowRunMetricsCountOpts struct {
	// WorkflowId is the (optional) workflow id.
	WorkflowId *string `validate:"omitempty,uuid"`

	// WorkflowVersionId is the (optional) workflow version id.
	WorkflowVersionId *string `validate:"omitempty,uuid"`
}

type WorkflowRunsMetricsOpts

// WorkflowRunsMetricsOpts holds the filters used when computing workflow run metrics.
// Every field is optional.
type WorkflowRunsMetricsOpts struct {
	// WorkflowId is the workflow id; validated as a UUID when set.
	WorkflowId *string `validate:"omitempty,uuid"`

	// WorkflowVersionId is the workflow version id; validated as a UUID when set.
	WorkflowVersionId *string `validate:"omitempty,uuid"`

	// ParentId is the parent workflow run id; validated as a UUID when set.
	ParentId *string `validate:"omitempty,uuid"`

	// ParentStepRunId is the parent step run id; validated as a UUID when set.
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// EventId is the id of the event that triggered the workflow run; validated as a UUID when set.
	EventId *string `validate:"omitempty,uuid"`

	// AdditionalMetadata is the exact metadata to filter by.
	AdditionalMetadata map[string]any `validate:"omitempty"`

	// CreatedBefore is the time the workflow run was created before.
	CreatedBefore *time.Time `validate:"omitempty"`

	// CreatedAfter is the time the workflow run was created after.
	CreatedAfter *time.Time `validate:"omitempty"`
}

Directories

Path Synopsis
db

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL