worker

package
v0.33.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2024 License: MIT Imports: 17 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func At

func At(t ...time.Time) scheduled

func Cron

func Cron(c string) cron

func Crons

func Crons(c ...string) cronArr

func Event

func Event(e string) event

func Events

func Events(events ...string) eventsArr

func NoTrigger

func NoTrigger() noTrigger

Types

type Action

type Action interface {
	// Name returns the name of the action
	Name() string

	// Run runs the action with the given variadic arguments and returns its
	// results as a slice of untyped values.
	Run(args ...any) []any

	// MethodFn returns an untyped value.
	// NOTE(review): presumably the function that was registered for this
	// action — confirm against the implementation.
	MethodFn() any

	// ConcurrencyFn returns the GetWorkflowConcurrencyGroupFn associated with
	// the action.
	// NOTE(review): whether this can be nil when no concurrency function is
	// configured is not visible here — confirm.
	ConcurrencyFn() GetWorkflowConcurrencyGroupFn

	// Service returns the service that the action belongs to
	Service() string
}

Action is an individual action that can be run by the worker.

type ChildWorkflow added in v0.18.0

type ChildWorkflow struct {
	// contains filtered or unexported fields
}

func (*ChildWorkflow) Result added in v0.18.0

func (c *ChildWorkflow) Result() (*ChildWorkflowResult, error)

type ChildWorkflowResult added in v0.18.0

type ChildWorkflowResult struct {
	// contains filtered or unexported fields
}

func (*ChildWorkflowResult) StepOutput added in v0.18.0

func (r *ChildWorkflowResult) StepOutput(key string, v interface{}) error

type GetWorkflowConcurrencyGroupFn added in v0.8.0

// GetWorkflowConcurrencyGroupFn is a callback that receives a HatchetContext
// and returns a string or an error.
// NOTE(review): judging by the name, the string is presumably the concurrency
// group key for the workflow run — confirm.
type GetWorkflowConcurrencyGroupFn func(ctx HatchetContext) (string, error)

type HatchetContext

// HatchetContext is the context handed to step functions, middleware and
// concurrency callbacks. It embeds context.Context, so it can be passed
// anywhere a standard context is accepted.
type HatchetContext interface {
	context.Context

	// SetContext takes a context.Context.
	// NOTE(review): presumably replaces the wrapped context — confirm.
	SetContext(ctx context.Context)

	// GetContext returns a context.Context.
	// NOTE(review): presumably the wrapped context — confirm.
	GetContext() context.Context

	// StepOutput takes a step name and a target value and returns an error.
	// NOTE(review): presumably decodes the named step's output into target,
	// which would need to be a pointer — confirm.
	StepOutput(step string, target interface{}) error

	// TriggeredByEvent returns a bool.
	// NOTE(review): presumably true when the run was triggered by an event
	// (compare the TriggeredBy constants) — confirm.
	TriggeredByEvent() bool

	// WorkflowInput takes a target value and returns an error.
	// NOTE(review): presumably decodes the workflow input into target — confirm.
	WorkflowInput(target interface{}) error

	// StepName returns the step name as a string.
	StepName() string

	// StepRunId returns the step run id as a string.
	StepRunId() string

	// WorkflowRunId returns the workflow run id as a string.
	WorkflowRunId() string

	// Log takes a message string; the destination of the log line is not
	// visible here.
	Log(message string)

	// StreamEvent takes a raw byte payload; the destination of the event is
	// not visible here.
	StreamEvent(message []byte)

	// SpawnWorkflow takes a workflow name, an input value and options, and
	// returns a *ChildWorkflow handle or an error.
	// NOTE(review): whether opts may be nil is not visible here — confirm.
	SpawnWorkflow(workflowName string, input any, opts *SpawnWorkflowOpts) (*ChildWorkflow, error)

	// ReleaseSlot returns an error.
	// NOTE(review): presumably frees the worker slot held by this step run —
	// confirm.
	ReleaseSlot() error

	// RefreshTimeout takes the increment as a string and returns an error.
	// NOTE(review): presumably a duration string such as "10s" — confirm the
	// accepted format.
	RefreshTimeout(incrementTimeoutBy string) error

	// RetryCount returns an int.
	// NOTE(review): presumably the number of retries so far for this step
	// run — confirm whether it is zero-based.
	RetryCount() int
	// contains filtered or unexported methods
}

type JobRunLookupData

// JobRunLookupData is a JSON-serializable record holding an input map, the
// trigger kind, and a map of per-step data.
type JobRunLookupData struct {
	// Input is a string-keyed map of untyped values, encoded under the JSON key "input".
	Input       map[string]interface{} `json:"input"`
	// TriggeredBy is the trigger kind, encoded under the JSON key "triggered_by".
	TriggeredBy TriggeredBy            `json:"triggered_by"`
	// Steps maps a string key to StepData and is omitted from the JSON when empty.
	// NOTE(review): keys are presumably step names/ids — confirm.
	Steps       map[string]StepData    `json:"steps,omitempty"`
}

type MiddlewareFunc

// MiddlewareFunc is the signature of a middleware registered through
// Worker.Use or Service.Use. It receives the current HatchetContext and a
// next function, and returns an error.
// NOTE(review): presumably a middleware calls next to continue the chain and
// may skip it to short-circuit — confirm.
type MiddlewareFunc func(ctx HatchetContext, next func(HatchetContext) error) error

type RateLimit added in v0.19.0

// RateLimit describes how many units a step consumes against a given rate
// limit key. It is the element type of WorkflowStep.RateLimit and the
// argument to WorkflowStep.SetRateLimit.
type RateLimit struct {
	// Units is the amount of units this step consumes
	Units int

	// Key is the rate limit key
	Key string
}

type RegisterActionOpt

// RegisterActionOpt is a functional option that operates on the unexported
// registerActionOpts. Options are passed to Service.RegisterAction;
// WithActionName returns one.
type RegisterActionOpt func(*registerActionOpts)

func WithActionName

func WithActionName(name string) RegisterActionOpt

type Service

// Service exposes Call, On, RegisterAction and Use methods; instances are
// returned by Worker.NewService.
type Service struct {
	// Name is the service's name.
	// NOTE(review): presumably the name passed to Worker.NewService — confirm.
	Name string
	// contains filtered or unexported fields
}

func (*Service) Call

func (s *Service) Call(verb string) *WorkflowStep

func (*Service) On

func (s *Service) On(t triggerConverter, workflow workflowConverter) error

func (*Service) RegisterAction

func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error

func (*Service) Use

func (s *Service) Use(mws ...MiddlewareFunc)

type SpawnWorkflowOpts added in v0.18.0

// SpawnWorkflowOpts carries the options accepted by
// HatchetContext.SpawnWorkflow.
type SpawnWorkflowOpts struct {
	// Key is an optional string (nil when unset).
	// NOTE(review): its meaning is not visible here; presumably a key
	// identifying the spawned child workflow — confirm.
	Key *string
}

type Step

// Step is the value returned by WorkflowStep.ToWorkflowStep. It holds a step
// id, the reflected input and output types, and the API-level step
// definition.
type Step struct {
	// Id is the step's identifier.
	Id string

	// non-ctx input is not optional
	NonCtxInput reflect.Type

	// non-err output is optional
	// NOTE(review): a nil pointer presumably means the step function has no
	// non-error return value — confirm.
	NonErrOutput *reflect.Type

	// APIStep is the corresponding types.WorkflowStep value.
	APIStep types.WorkflowStep
}

type StepData

// StepData is a string-keyed map of untyped values. It is the value type of
// JobRunLookupData.Steps and StepRunData.Parents.
type StepData map[string]interface{}

type StepRunData

// StepRunData is a JSON-serializable record holding an input map, the
// trigger kind, and a map of parent step data.
type StepRunData struct {
	// Input is a string-keyed map of untyped values, encoded under the JSON key "input".
	Input       map[string]interface{} `json:"input"`
	// TriggeredBy is the trigger kind, encoded under the JSON key "triggered_by".
	TriggeredBy TriggeredBy            `json:"triggered_by"`
	// Parents maps a string key to StepData, encoded under the JSON key "parents".
	// NOTE(review): keys are presumably the parent step names/ids — confirm.
	Parents     map[string]StepData    `json:"parents"`
}

type TriggeredBy

// TriggeredBy is a string type naming what triggered a run. Its defined
// values are TriggeredByEvent, TriggeredByCron and TriggeredBySchedule.
type TriggeredBy string

TODO: move this into proto definitions

// Defined TriggeredBy values.
const (
	// TriggeredByEvent is the TriggeredBy value "event".
	TriggeredByEvent    TriggeredBy = "event"
	// TriggeredByCron is the TriggeredBy value "cron".
	TriggeredByCron     TriggeredBy = "cron"
	// TriggeredBySchedule is the TriggeredBy value "schedule".
	TriggeredBySchedule TriggeredBy = "schedule"
)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(fs ...WorkerOpt) (*Worker, error)

NewWorker creates a new worker instance

func (*Worker) Call

func (w *Worker) Call(action string) *WorkflowStep

func (*Worker) NewService

func (w *Worker) NewService(name string) *Service

func (*Worker) On

func (w *Worker) On(t triggerConverter, workflow workflowConverter) error

func (*Worker) RegisterAction

func (w *Worker) RegisterAction(actionId string, method any) error

RegisterAction can be used to register a single action which can be reused across multiple workflows.

An action should be of the format <service>:<verb>, for example slack:create-channel.

The method must match one of the following signatures:

  - func(ctx context.Context) error
  - func(ctx context.Context, input *Input) error
  - func(ctx context.Context, input *Input) (*Output, error)
  - func(ctx context.Context) (*Output, error)

func (*Worker) Start

func (w *Worker) Start() (func() error, error)

Start starts the worker in blocking fashion

func (*Worker) Use

func (w *Worker) Use(mws ...MiddlewareFunc)

type WorkerOpt

// WorkerOpt is a functional option that operates on a *WorkerOpts. Options
// are passed to NewWorker; the With* functions (WithClient, WithName,
// WithMaxRuns, ...) return them.
type WorkerOpt func(*WorkerOpts)

func WithClient

func WithClient(client client.Client) WorkerOpt

func WithErrorAlerter

func WithErrorAlerter(alerter errors.Alerter) WorkerOpt

func WithIntegration

func WithIntegration(integration integrations.Integration) WorkerOpt

func WithLogLevel added in v0.18.1

func WithLogLevel(lvl string) WorkerOpt

func WithMaxRuns added in v0.12.0

func WithMaxRuns(maxRuns int) WorkerOpt

func WithName

func WithName(name string) WorkerOpt

type WorkerOpts

type WorkerOpts struct {
	// contains filtered or unexported fields
}

type Workflow

// Workflow is a collection of WorkflowJob values.
type Workflow struct {
	// Jobs is the list of jobs in the workflow.
	// NOTE(review): ordering/execution semantics are not visible here — confirm.
	Jobs []WorkflowJob
}

type WorkflowConcurrency added in v0.8.0

type WorkflowConcurrency struct {
	// contains filtered or unexported fields
}

func Concurrency added in v0.8.0

func (*WorkflowConcurrency) LimitStrategy added in v0.8.0

func (*WorkflowConcurrency) MaxRuns added in v0.8.0

func (c *WorkflowConcurrency) MaxRuns(maxRuns int32) *WorkflowConcurrency

type WorkflowJob

// WorkflowJob describes a named job made of steps. It can be converted with
// ToWorkflow, ToWorkflowJob and ToActionMap.
type WorkflowJob struct {
	// The name of the job
	Name string

	// A free-form description of the job
	Description string

	// Optional concurrency settings (nil when unset).
	// NOTE(review): presumably built with Concurrency(...) — confirm.
	Concurrency *WorkflowConcurrency

	// The steps that are run in the job
	Steps []*WorkflowStep

	// Optional job (nil when unset).
	// NOTE(review): presumably run when this job fails — confirm.
	OnFailure *WorkflowJob
}

func (*WorkflowJob) ToActionMap

func (j *WorkflowJob) ToActionMap(svcName string) map[string]any

func (*WorkflowJob) ToWorkflow

func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflow

func (*WorkflowJob) ToWorkflowJob

func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.WorkflowJob, error)

type WorkflowStep

// WorkflowStep describes a single step: the function to execute plus its
// name, timeout, parents, retries and rate limits. Values are returned by
// Fn, Worker.Call and Service.Call, and the Set*/AddParents methods return
// *WorkflowStep so calls can be chained.
type WorkflowStep struct {
	// The step timeout
	// NOTE(review): stored as a string; presumably a duration such as "30s" —
	// confirm the accepted format.
	Timeout string

	// The executed function
	Function any

	// The step id/name. If not set, one will be generated from the function name
	Name string

	// The ids of the parents
	Parents []string

	// The retry count for the step (see SetRetries).
	// NOTE(review): presumably the maximum number of retries — confirm.
	Retries int

	// The rate limits that apply to the step (see SetRateLimit)
	RateLimit []RateLimit
}

func Fn

func Fn(f any) *WorkflowStep

func (*WorkflowStep) AddParents

func (w *WorkflowStep) AddParents(parents ...string) *WorkflowStep

func (*WorkflowStep) GetActionId

func (w *WorkflowStep) GetActionId(svcName string, index int) string

func (*WorkflowStep) GetStepId

func (w *WorkflowStep) GetStepId(index int) string

func (*WorkflowStep) SetName

func (w *WorkflowStep) SetName(name string) *WorkflowStep

func (*WorkflowStep) SetRateLimit added in v0.19.0

func (w *WorkflowStep) SetRateLimit(rateLimit RateLimit) *WorkflowStep

func (*WorkflowStep) SetRetries added in v0.11.0

func (w *WorkflowStep) SetRetries(retries int) *WorkflowStep

func (*WorkflowStep) SetTimeout

func (w *WorkflowStep) SetTimeout(timeout string) *WorkflowStep

func (*WorkflowStep) ToActionMap

func (w *WorkflowStep) ToActionMap(svcName string) map[string]any

func (*WorkflowStep) ToWorkflow

func (w *WorkflowStep) ToWorkflow(svcName string, namespace string) types.Workflow

func (*WorkflowStep) ToWorkflowStep

func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace string) (*Step, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL