worker

package
v0.53.4
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: MIT Imports: 26 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func At

func At(t ...time.Time) scheduled

func Cron

func Cron(c string) cron

func Crons

func Crons(c ...string) cronArr

func Event

func Event(e string) event

func Events

func Events(events ...string) eventsArr

func NoTrigger

func NoTrigger() noTrigger

Types

type Action

type Action interface {
	// Name returns the name of the action
	Name() string

	// Run runs the action
	Run(args ...any) []any

	MethodFn() any

	ConcurrencyFn() GetWorkflowConcurrencyGroupFn

	// Service returns the service that the action belongs to
	Service() string

	Compute() *compute.Compute
}

Action is an individual action that can be run by the worker.

type ActionMap added in v0.51.4

type ActionMap map[string]ActionWithCompute

type ActionPayload added in v0.34.3

type ActionPayload struct {
	*client.Action

	ActionPayload string `json:"actionPayload"`
}

type ActionRegistry added in v0.51.4

type ActionRegistry map[string]Action

type ActionWithCompute added in v0.51.4

type ActionWithCompute struct {
	// contains filtered or unexported fields
}

type GetWorkflowConcurrencyGroupFn added in v0.8.0

type GetWorkflowConcurrencyGroupFn func(ctx HatchetContext) (string, error)

type HatchetContext

type HatchetContext interface {
	context.Context

	SetContext(ctx context.Context)

	GetContext() context.Context

	Worker() HatchetWorkerContext

	StepOutput(step string, target interface{}) error

	TriggeredByEvent() bool

	WorkflowInput(target interface{}) error

	UserData(target interface{}) error

	AdditionalMetadata() map[string]string

	StepName() string

	StepRunId() string

	WorkflowRunId() string

	Log(message string)

	StreamEvent(message []byte)

	SpawnWorkflow(workflowName string, input any, opts *SpawnWorkflowOpts) (*client.Workflow, error)

	SpawnWorkflows(childWorkflows []*SpawnWorkflowsOpts) ([]*client.Workflow, error)

	ReleaseSlot() error

	RefreshTimeout(incrementTimeoutBy string) error

	RetryCount() int
	// contains filtered or unexported methods
}

type HatchetWorkerContext added in v0.40.0

type HatchetWorkerContext interface {
	context.Context

	SetContext(ctx context.Context)

	GetContext() context.Context

	ID() string

	GetLabels() map[string]interface{}

	UpsertLabels(labels map[string]interface{}) error
}

type HealthCheckResponse added in v0.34.0

type HealthCheckResponse struct {
	Actions []string `json:"actions"`
}

type JobRunLookupData

type JobRunLookupData struct {
	Input       map[string]interface{} `json:"input"`
	TriggeredBy TriggeredBy            `json:"triggered_by"`
	Steps       map[string]StepData    `json:"steps,omitempty"`
}

type ManagedCompute added in v0.51.4

type ManagedCompute struct {
	ActionRegistry  *ActionRegistry
	Client          client.Client
	MaxRuns         int
	RuntimeConfigs  []rest.CreateManagedWorkerRuntimeConfigRequest
	CloudRegisterID *string
	Logger          *zerolog.Logger
}

func NewManagedCompute added in v0.51.4

func NewManagedCompute(actionRegistry *ActionRegistry, client client.Client, maxRuns int) *ManagedCompute

func (*ManagedCompute) CloudRegister added in v0.51.4

func (mc *ManagedCompute) CloudRegister(ctx context.Context)

type MiddlewareFunc

type MiddlewareFunc func(ctx HatchetContext, next func(HatchetContext) error) error

type RateLimit added in v0.19.0

type RateLimit struct {
	// Key is the rate limit key
	Key     string  `yaml:"key,omitempty"`
	KeyExpr *string `yaml:"keyExpr,omitempty"`

	// Units is the amount of units this step consumes
	Units          *int    `yaml:"units,omitempty"`
	UnitsExpr      *string `yaml:"unitsExpr,omitempty"`
	LimitValueExpr *string `yaml:"limitValueExpr,omitempty"`
}

type RegisterActionOpt

type RegisterActionOpt func(*registerActionOpts)

func WithActionName

func WithActionName(name string) RegisterActionOpt

func WithCompute added in v0.51.4

func WithCompute(compute *compute.Compute) RegisterActionOpt

type RegisterWebhookWorkerOpts added in v0.34.0

type RegisterWebhookWorkerOpts struct {
	Name   string
	URL    string
	Secret *string
}

type Service

type Service struct {
	Name string
	// contains filtered or unexported fields
}

func (*Service) Call

func (s *Service) Call(verb string) *WorkflowStep

func (*Service) On deprecated

func (s *Service) On(t triggerConverter, workflow workflowConverter) error

Deprecated: Use RegisterWorkflow instead.

func (*Service) RegisterAction

func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error

func (*Service) RegisterWorkflow added in v0.34.0

func (s *Service) RegisterWorkflow(workflow workflowConverter) error

func (*Service) Use

func (s *Service) Use(mws ...MiddlewareFunc)

type SpawnWorkflowOpts added in v0.18.0

type SpawnWorkflowOpts struct {
	Key                *string
	Sticky             *bool
	AdditionalMetadata *map[string]string
}

type SpawnWorkflowsOpts added in v0.50.0

type SpawnWorkflowsOpts struct {
	WorkflowName       string
	Input              any
	Key                *string
	Sticky             *bool
	AdditionalMetadata *map[string]string
}

type Step

type Step struct {
	Id string

	// non-ctx input is not optional
	NonCtxInput reflect.Type

	// non-err output is optional
	NonErrOutput *reflect.Type

	APIStep types.WorkflowStep
}

type StepData

type StepData map[string]interface{}

type StepRunData

type StepRunData struct {
	Input              map[string]interface{} `json:"input"`
	TriggeredBy        TriggeredBy            `json:"triggered_by"`
	Parents            map[string]StepData    `json:"parents"`
	AdditionalMetadata map[string]string      `json:"additional_metadata"`
	UserData           map[string]interface{} `json:"user_data"`
}

type TriggeredBy

type TriggeredBy string

TODO: move this into proto definitions

const (
	TriggeredByEvent    TriggeredBy = "event"
	TriggeredByCron     TriggeredBy = "cron"
	TriggeredBySchedule TriggeredBy = "schedule"
)

type WebhookHandlerOptions added in v0.34.0

type WebhookHandlerOptions struct {
	Secret string
}

type WebhookWorkerOpts added in v0.34.0

type WebhookWorkerOpts struct {
	URL       string
	Secret    string
	WebhookId string
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(fs ...WorkerOpt) (*Worker, error)

NewWorker creates a new worker instance

func (*Worker) Call

func (w *Worker) Call(action string) *WorkflowStep

func (*Worker) NewService

func (w *Worker) NewService(name string) *Service

func (*Worker) On deprecated

func (w *Worker) On(t triggerConverter, workflow workflowConverter) error

Deprecated: Use RegisterWorkflow instead.

func (*Worker) RegisterAction

func (w *Worker) RegisterAction(actionId string, method any) error

RegisterAction can be used to register a single action which can be reused across multiple workflows.

An action should be of the format <service>:<verb>, for example slack:create-channel.

The method must match one of the following signatures:

  - func(ctx context.Context) error
  - func(ctx context.Context, input *Input) error
  - func(ctx context.Context, input *Input) (*Output, error)
  - func(ctx context.Context) (*Output, error)

func (*Worker) RegisterWebhook added in v0.34.0

func (w *Worker) RegisterWebhook(ww RegisterWebhookWorkerOpts) error

func (*Worker) RegisterWorkflow added in v0.34.0

func (w *Worker) RegisterWorkflow(workflow workflowConverter) error

func (*Worker) Run added in v0.52.12

func (w *Worker) Run(ctx context.Context) error

Run starts the worker in blocking fashion, returning an error if the worker could not be started or if the worker stopped due to a networking issue.

func (*Worker) Start

func (w *Worker) Start() (func() error, error)

Start starts the worker in non-blocking fashion, returning a cleanup function and an error if the worker could not be started.

func (*Worker) StartWebhook added in v0.34.0

func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error)

TODO: find a way to avoid exposing this to the end-user client.

func (*Worker) Use

func (w *Worker) Use(mws ...MiddlewareFunc)

func (*Worker) WebhookHttpHandler added in v0.34.0

func (w *Worker) WebhookHttpHandler(opts WebhookHandlerOptions, workflows ...workflowConverter) http.HandlerFunc

type WorkerOpt

type WorkerOpt func(*WorkerOpts)

func WithClient

func WithClient(client client.Client) WorkerOpt

func WithErrorAlerter

func WithErrorAlerter(alerter errors.Alerter) WorkerOpt

func WithIntegration

func WithIntegration(integration integrations.Integration) WorkerOpt

func WithInternalData added in v0.34.0

func WithInternalData(actions []string) WorkerOpt

func WithLabels added in v0.40.0

func WithLabels(labels map[string]interface{}) WorkerOpt

func WithLogLevel added in v0.18.1

func WithLogLevel(lvl string) WorkerOpt

func WithMaxRuns added in v0.12.0

func WithMaxRuns(maxRuns int) WorkerOpt

func WithName

func WithName(name string) WorkerOpt

type WorkerOpts

type WorkerOpts struct {
	// contains filtered or unexported fields
}

type Workflow

type Workflow struct {
	Jobs []WorkflowJob
}

type WorkflowConcurrency added in v0.8.0

type WorkflowConcurrency struct {
	// contains filtered or unexported fields
}

func Concurrency added in v0.8.0

func Expression added in v0.45.0

func Expression(expr string) *WorkflowConcurrency

func (*WorkflowConcurrency) LimitStrategy added in v0.8.0

func (*WorkflowConcurrency) MaxRuns added in v0.8.0

func (c *WorkflowConcurrency) MaxRuns(maxRuns int32) *WorkflowConcurrency

type WorkflowJob

type WorkflowJob struct {
	// The name of the job
	Name string

	Description string

	On triggerConverter

	Concurrency *WorkflowConcurrency

	// The steps that are run in the job
	Steps []*WorkflowStep

	OnFailure *WorkflowJob

	ScheduleTimeout string

	StickyStrategy *types.StickyStrategy
}

func (*WorkflowJob) ToActionMap

func (j *WorkflowJob) ToActionMap(svcName string) ActionMap

func (*WorkflowJob) ToWorkflow

func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflow

func (*WorkflowJob) ToWorkflowJob

func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.WorkflowJob, error)

func (*WorkflowJob) ToWorkflowTrigger added in v0.34.0

func (j *WorkflowJob) ToWorkflowTrigger() triggerConverter

type WorkflowStep

type WorkflowStep struct {
	// The step timeout
	Timeout string

	// The executed function
	Function any

	// The step id/name. If not set, one will be generated from the function name
	Name string

	// The ids of the parents
	Parents []string

	Retries int

	RetryBackoffFactor *float32

	RetryMaxBackoffSeconds *int32

	RateLimit []RateLimit

	DesiredLabels map[string]*types.DesiredWorkerLabel

	Compute *compute.Compute
}

func Fn

func Fn(f any) *WorkflowStep

func (*WorkflowStep) AddParents

func (w *WorkflowStep) AddParents(parents ...string) *WorkflowStep

func (*WorkflowStep) GetActionId

func (w *WorkflowStep) GetActionId(svcName string, index int) string

func (*WorkflowStep) GetStepId

func (w *WorkflowStep) GetStepId(index int) string

func (*WorkflowStep) SetCompute added in v0.51.4

func (w *WorkflowStep) SetCompute(compute *compute.Compute) *WorkflowStep

func (*WorkflowStep) SetDesiredLabels added in v0.40.0

func (w *WorkflowStep) SetDesiredLabels(labels map[string]*types.DesiredWorkerLabel) *WorkflowStep

func (*WorkflowStep) SetName

func (w *WorkflowStep) SetName(name string) *WorkflowStep

func (*WorkflowStep) SetRateLimit added in v0.19.0

func (w *WorkflowStep) SetRateLimit(rateLimit RateLimit) *WorkflowStep

func (*WorkflowStep) SetRetries added in v0.11.0

func (w *WorkflowStep) SetRetries(retries int) *WorkflowStep

func (*WorkflowStep) SetRetryBackoffFactor added in v0.52.0

func (w *WorkflowStep) SetRetryBackoffFactor(retryBackoffFactor float32) *WorkflowStep

func (*WorkflowStep) SetRetryMaxBackoffSeconds added in v0.52.0

func (w *WorkflowStep) SetRetryMaxBackoffSeconds(retryMaxBackoffSeconds int32) *WorkflowStep

func (*WorkflowStep) SetTimeout

func (w *WorkflowStep) SetTimeout(timeout string) *WorkflowStep

func (*WorkflowStep) ToActionMap

func (w *WorkflowStep) ToActionMap(svcName string) ActionMap

func (*WorkflowStep) ToWorkflow

func (w *WorkflowStep) ToWorkflow(svcName string, namespace string) types.Workflow

func (*WorkflowStep) ToWorkflowStep

func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace string) (*Step, error)

func (*WorkflowStep) ToWorkflowTrigger added in v0.34.0

func (w *WorkflowStep) ToWorkflowTrigger() triggerConverter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL