jorb

package module
v0.0.15
Published: Aug 23, 2024 License: MIT Imports: 15 Imported by: 0

README

JORB - Concentrated Computing

Jorb is a Concentrated (as opposed to Distributed) workflow system. Single computers are BIG and FAST these days. You can do a lot with them, but there aren't many nice workflow-composition systems that don't require all sorts of distributed craziness like a message queue, multiple nodes, and multiple processes.

Jorb is that system for Go.

I use it when I have complex batch jobs that need:

  • To not run in a massively complex distributed computing system
  • To be easy to debug and expand while in flight (who doesn't code in the debugger?)
  • Rate limiting per step
  • Concurrency controls per step
  • Fancy status bars (I found that https://github.com/vbauerster/mpb makes a good status listener; one isn't provided currently)
  • Error recovery
  • State checkpointing
  • To be very easy to stop and restart at whim

For instance, I'm currently using it to power large sets of code changes, where I need to check out packages, make changes, commit, trial build on a fleet, monitor status, fork state depending on build success or failure, cut a CR, send the CR via email/Slack/etc., and check status. Many of these steps have rate limits with services, or need sane rate limits with the OS.

Quick Start

The unit tests in processor_test.go provide a pretty good example to get started, for instance https://github.com/gaffo/jorb/blob/main/processor_test.go#L111

Installation

To install Jorb, use:

go get github.com/gaffo/jorb
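As a hedged sketch pieced together from the types documented below (the `JobContext` struct, the "done" state, and the checkpoint file name are all invented for illustration), a minimal run looks something like this:

```go
package main

import (
	"context"
	"log"

	"github.com/gaffo/jorb"
)

// JC: per-job state; must be JSON-serializable for checkpointing.
type JobContext struct{ Name, Greeting string }

func main() {
	type AC = struct{} // no app context needed for this sketch
	type OC = struct{} // no overall context either

	states := []jorb.State[AC, OC, JobContext]{
		{
			TriggerState: jorb.TRIGGER_STATE_NEW, // "new": where AddJob puts jobs
			Concurrency:  2,
			Exec: func(ctx context.Context, ac AC, oc OC, jc JobContext) (JobContext, string, []jorb.KickRequest[JobContext], error) {
				jc.Greeting = "hello " + jc.Name
				return jc, "done", nil, nil // send this job to the terminal state
			},
		},
		{TriggerState: "done", Terminal: true},
	}

	r := jorb.NewRun[OC, JobContext]("greetings", OC{})
	r.AddJob(JobContext{Name: "world"})

	// JsonSerializer checkpoints the run to disk as jobs complete.
	ser := jorb.NewJsonSerializer[OC, JobContext]("greetings.json")
	p, err := jorb.NewProcessor(AC{}, states, ser, jorb.NilStatusListener{})
	if err != nil {
		log.Fatal(err)
	}
	if err := p.Exec(context.Background(), r); err != nil {
		log.Fatal(err)
	}
}
```

Because the serializer checkpoints as jobs complete, you should be able to kill this process and re-run it against the same file.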

Concepts

When making a program you basically need the following: AC, OC, JC, Job, Run, States, StatusListener, Serializer, and a Processor

AC: Application Context

This is a struct where you can pass job specific application context to the jobs as they execute state transitions. Generally I put implementations or funcs that let me call APIs here, like AWS SDK clients.

This is usually ephemeral per Go process run; don't keep state here, just glue your app into this.

This is passed into each state exec function invocation

OC: Overall Context

This is the overall state for your job. Store state (generally immutable) here for values that all of the jobs in a Run will need. I'll store things like a database name, file name, or work directory that is specific to that run.

This is passed into each state exec function invocation

JC: Job Context

This is per-job state that is specific to this workflow. Go wide-table format here: use a union of all of the fields that this workflow's states will need.

This is passed into each state exec function invocation. You mutate it locally and return the updated JC.

NOTE: These fields need to be JSON serializable.

Job

A job has a State (string) and a JC which contains your workflow specific state for each job. Jobs also track their state transitions, parent jobs, and errors per state.

Run

A run is a serializable group of jobs. Generally you create a run, add jobs to it, then fire it at a processor. Or you load a previous run with a serializer and fire it at a processor. It's meant to be super restartable.

You can also load up a run and spit out reports once it's been fully processed (or really at any time). It contains ALL of the state for a run other than the AC.

States

A State is a description of a possible state that a job can be in. A state has:

  • A TriggerState which is a string matching the state of the jobs you want this state to process
  • An optional ExecFunction which does the actual processing (more in a sec)
  • Terminal: if the state is terminal, it won't process, and a run is considered complete when all jobs are in terminal states. Fun note: you can swap in code for whether a state is terminal to patch up workflows or to stop certain actions (I flip the terminal flag during off hours so I don't send actual CRs, just all the pre-validation). flag.Bool works great for this.
  • Concurrency: the number of concurrent processors for this state. This is nice if the steps take a while, especially on network calls.
  • RateLimit: a rate.Limit that is shared by all processors for this state, great if you are hitting a rate-limited API
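Sketching a single state with the terminal-flag trick from above (the `AppContext`/`OverallContext`/`JobContext` types, the state names, and the `dryRun` flag are all invented; `rate` is golang.org/x/time/rate):

```go
var dryRun = flag.Bool("dry-run", true, "make send_cr terminal so no CRs actually go out")

func sendCRState() jorb.State[AppContext, OverallContext, JobContext] {
	return jorb.State[AppContext, OverallContext, JobContext]{
		TriggerState: "send_cr",
		Terminal:     *dryRun, // toggled at startup; terminal states don't execute
		Concurrency:  3,       // at most 3 of these in flight at once
		// One limiter shared by all of this state's processors.
		RateLimit: rate.NewLimiter(rate.Every(30*time.Second), 1),
		Exec: func(ctx context.Context, ac AppContext, oc OverallContext, jc JobContext) (JobContext, string, []jorb.KickRequest[JobContext], error) {
			// ... actually cut and send the CR here ...
			return jc, "cr_sent", nil, nil
		},
	}
}
```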

Typically you want to be pretty granular with your steps. For instance, in a recent workflow I have separate states for:

  • File modification
  • Git Add
  • Git Commit
  • Git Push
  • Git Fetch
  • Git rev-parse

This really just helps with debugging and "redriving", where you need to quickly send all of the jobs in one state back to another state to patch them up, especially when developing flows and handling errors.

ExecFunc

This is the meat of the work. The general contract is that it is called with:

  • AC - application (execution) context
  • OC - overall context
  • JC - job specific context

And returns:

  • JC - updated with the mutations that state performed
  • NewState - the next state you want this job to go to. It's totally fine to go to the same state or a previous state. On errors I usually come back to the same state, which is basically a retry, or go to a separate terminal state for specific errors (this is good because it allows you to easily redrive by editing the state file with find-and-replace).
  • optional []KickRequests - these are requests to spawn new jobs. This is how you fork a workflow. For instance, if you get data from a table and want to fire a lot of S3 fetches, use this. It's fine to take the job that kicked everything else off and send it to a terminal state while the kicks do the rest of the work, or just re-use it as the first of many. Kicks get a job ID of ${parent_id}->${new_seq}.
  • error - This is logged on the job by state, and will eventually have logic for retries and termination if there are too many errors
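A hedged sketch of an Exec function following that contract (the state names, the `Bucket`/`Key` fields, and the `fetchKeys` helper on the AC are all invented):

```go
func listObjects(ctx context.Context, ac AppContext, oc OverallContext, jc JobContext) (JobContext, string, []jorb.KickRequest[JobContext], error) {
	keys, err := ac.fetchKeys(ctx, jc.Bucket) // hypothetical helper on the AC
	if err != nil {
		// Returning the same state on error is effectively a retry.
		return jc, "list_objects", nil, err
	}

	// Fork the workflow: kick one new job per key.
	kicks := make([]jorb.KickRequest[JobContext], 0, len(keys))
	for _, k := range keys {
		kicks = append(kicks, jorb.KickRequest[JobContext]{
			C:     JobContext{Bucket: jc.Bucket, Key: k},
			State: "fetch_object",
		})
	}

	// The kicking job itself goes to a terminal state; the kicks do the rest.
	return jc, "listed", kicks, nil
}
```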

StatusListener

You can use the provided NilStatusListener, but I hook this up to a hash of progress bars per state to show my status.

You have to pass one because I'm too lazy to deal with nil.
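A minimal custom listener sketch that prints counts instead of progress bars (the `LogStatusListener` name is invented; the StatusCount fields are documented below):

```go
type LogStatusListener struct{}

func (LogStatusListener) StatusUpdate(status []jorb.StatusCount) {
	// Counts arrive in the same order as the states passed to the processor.
	for _, s := range status {
		fmt.Printf("%-12s waiting=%d executing=%d completed=%d\n",
			s.State, s.Waiting, s.Executing, s.Completed)
	}
}
```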

Serializer

I really recommend you use one; there's a JsonSerializer provided, just new it up. This lets you very easily kill and restart processing of the workflow at any time. It also lets you re-hydrate old workflows and report on them.

If you really don't want to use one then there's a NilSerializer you can use.
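A hedged sketch of the kill-and-restart loop (the file name, the `resume` function, and the context types are invented; `states` is assumed to be built elsewhere):

```go
func resume(states []jorb.State[AppContext, OverallContext, JobContext]) error {
	ser := jorb.NewJsonSerializer[OverallContext, JobContext]("runs/my-run.json")

	// Re-hydrate the previous checkpoint if there is one, else start fresh.
	r, err := ser.Deserialize()
	if err != nil {
		r = jorb.NewRun[OverallContext, JobContext]("my-run", OverallContext{})
	}

	p, err := jorb.NewProcessor(AppContext{}, states, ser, jorb.NilStatusListener{})
	if err != nil {
		return err
	}
	// Checkpoints are written as the run progresses, so killing and
	// re-running this should pick up roughly where it left off.
	return p.Exec(context.Background(), r)
}
```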

Processor

This does all the work. New one up with an app context and a set of states, then exec a run with it. It'll block until it finishes, calling the Exec functions, Serializer, and StatusListener as needed.

Other Notes

This is super alpha software. I point-release every breaking change at the v0.0.x level.

There are several places where I just log.Fatal if the app is in an invalid state, especially if an unknown state string is found. The nice thing is that the app tries to checkpoint the run state to file after every job completion, so you usually lose little information.

State saving: about that, work is definitely queueing up before it gets serialized. I need to optimize this with batching so I'm not doing as many expensive saves.

Throughput: there's definitely room for improvement here. Too much work gets queued up for the processors, and too much stacks up on the return queue.

Metrics: I'd really like to get metrics around how long states are taking, how many are executing on average, and where the bottlenecks are. I think many of the states could have their concurrency adjusted dynamically for optimal performance (when you run memory-, disk-, or CPU-heavy jobs).

No adaptive rate limiting or rate-limit error handling. Many APIs tell you when you're hitting their rate limits; I need a way to message that back to the state processor so you don't have to manually dial in rate limits and can just let the system adapt.

It uses slog, but doesn't set up a default logger; you can fix this by creating a file logger. It's VERY spammy if you don't.

Testing and refactoring are needed. It's getting better, but testing a system like this is complex, and I need to pull some of the major logic out into its own functions so I can test a lot of the edge cases without firing up a big job.

Documentation

Index

Constants

View Source
const (
	TRIGGER_STATE_NEW = "new"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job[JC any] struct {
	Id          string              // Id is a unique identifier for the job
	C           JC                  // C holds the job specific context
	State       string              // State represents the current processing state of the job
	StateErrors map[string][]string // StateErrors maps state name to the errors that occurred in that state
	LastUpdate  *time.Time          // The last time this job was fetched
}

Job represents the current processing state of any job

func (Job[JC]) UpdateLastEvent added in v0.0.7

func (j Job[JC]) UpdateLastEvent() Job[JC]

UpdateLastEvent updates the LastUpdate field of the Job struct to the current time.

type JsonSerializer

type JsonSerializer[OC any, JC any] struct {
	File string
}

JsonSerializer is a struct that implements Serializer, storing and loading a Run from the file specified in the File field; an anonymous variable provides a compile-time interface check

func NewJsonSerializer

func NewJsonSerializer[OC any, JC any](file string) *JsonSerializer[OC, JC]

NewJsonSerializer creates a new instance of the JsonSerializer struct. It takes a single argument `file` of type string, which represents the file path where the serialized run data will be stored or loaded from.

func (JsonSerializer[OC, JC]) Deserialize

func (js JsonSerializer[OC, JC]) Deserialize() (*Run[OC, JC], error)

Deserialize reads the serialized Run[OC, JC] data from the file specified when creating the JsonSerializer instance, deserializes the JSON data into a Run[OC, JC] instance, and returns the deserialized Run instance.

If any error occurs during the process, such as opening the file or decoding the JSON data, the function returns a zero-value Run[OC, JC] instance and the error.

Returns:

Run[OC, JC]: The deserialized Run instance.
error: An error value if the deserialization or file reading operation fails, otherwise nil.

func (JsonSerializer[OC, JC]) Serialize

func (js JsonSerializer[OC, JC]) Serialize(run Run[OC, JC]) error

Serialize takes a Run[OC, JC] instance and serializes it to JSON format, writing the serialized data to the file specified when creating the JsonSerializer instance. It creates the parent directory for the file if it doesn't exist, and creates the file if it doesn't exist.

If any error occurs during the process, such as creating the directory, creating the file, or encoding the Run instance, the function returns the error.

Parameters:

run Run[OC, JC]: The Run instance to be serialized.

Returns:

error: An error value if the serialization or file writing operation fails, otherwise nil.

type KickRequest

type KickRequest[JC any] struct {
	C     JC
	State string
}

KickRequest is a job context plus a requested state that the framework will expand into an actual job

type NilSerializer

type NilSerializer[OC any, JC any] struct {
}

NilSerializer implements the Serializer interface with no-op implementations of the Serialize and Deserialize methods. It is useful when you don't need to persist or load Run instances, and is used as the default by NewProcessor if you don't specify one

func (*NilSerializer[OC, JC]) Deserialize

func (n *NilSerializer[OC, JC]) Deserialize() (*Run[OC, JC], error)

Deserialize is a no-op implementation that panics with a "not implemented" message. It satisfies the Serializer interface's Deserialize method requirement, but it should never be called in practice when using the NilSerializer.

func (*NilSerializer[OC, JC]) Serialize

func (n *NilSerializer[OC, JC]) Serialize(run Run[OC, JC]) error

Serialize is a no-op implementation that does nothing and always returns nil. It satisfies the Serializer interface's Serialize method requirement.

type NilStatusListener

type NilStatusListener struct {
}

NilStatusListener is a struct that implements the StatusListener interface with a no-op implementation of the StatusUpdate method. It is useful when you don't need to receive status updates or when you want to use a dummy status listener.

func (NilStatusListener) StatusUpdate

func (n NilStatusListener) StatusUpdate(status []StatusCount)

StatusUpdate is a no-op implementation that does nothing. It satisfies the StatusListener interface's StatusUpdate method requirement.

type Processor

type Processor[AC any, OC any, JC any] struct {
	// contains filtered or unexported fields
}

Processor executes a job

func NewProcessor

func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) (*Processor[AC, OC, JC], error)

func (*Processor[AC, OC, JC]) Exec

func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error

Exec is the big work function: it does all the crunching

type Return

type Return[JC any] struct {
	PriorState   string
	Job          Job[JC]
	KickRequests []KickRequest[JC]
}

Return is a struct that contains a job and a list of kick requests; it is used for returning job updates to the system

type Run

type Run[OC any, JC any] struct {
	Name    string             // Name of the run
	Jobs    map[string]Job[JC] // Map of jobs, where keys are job ids and values are Job states
	Overall OC                 // Overall is overall state that is useful to all jobs; basically context for the whole batch
	// contains filtered or unexported fields
}

Run is basically the overall state of a given run (batch) in the processing framework. It's meant to be re-entrant: if you kill the processor and you have a serializer, you can restart using it at any time

func NewRun

func NewRun[OC any, JC any](name string, oc OC) *Run[OC, JC]

NewRun creates a new Run instance with the given name and overall context

Use the overall context to store any state that all of the jobs will want access to instead of storing it in the specific JobContexts

func (*Run[OC, JC]) AddJob

func (r *Run[OC, JC]) AddJob(jc JC)

AddJob adds a job to the pool; this shouldn't be called once the run is executing

func (*Run[OC, JC]) AddJobWithState added in v0.0.9

func (r *Run[OC, JC]) AddJobWithState(jc JC, state string)
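A short sketch of seeding a run with both entry points (the `OverallContext`/`JobContext` types and the "built" state name are invented):

```go
r := jorb.NewRun[OverallContext, JobContext]("fetches", OverallContext{})
r.AddJob(JobContext{Package: "a"})                   // starts in TRIGGER_STATE_NEW ("new")
r.AddJobWithState(JobContext{Package: "b"}, "built") // starts further along, handy for redrives
```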

func (*Run[OC, JC]) Equal added in v0.0.10

func (r *Run[OC, JC]) Equal(r2 *Run[OC, JC]) bool

func (*Run[OC, JC]) Init added in v0.0.7

func (r *Run[OC, JC]) Init()

func (*Run[OC, JC]) UpdateJob added in v0.0.10

func (r *Run[OC, JC]) UpdateJob(j Job[JC])

type Serializer

type Serializer[OC any, JC any] interface {
	Serialize(r Run[OC, JC]) error
	Deserialize() (*Run[OC, JC], error)
}

Serializer is an interface for job run serialization

type State

type State[AC any, OC any, JC any] struct {
	// TriggerState is the string identifier for this state.
	TriggerState string

	// Exec is a function that executes the logic for jobs in this state.
	// It takes the application context (AC), overall context (OC), and job context (JC) as input,
	// and returns the updated job context (JC), the next state string,
	// a slice of kick requests ([]KickRequest[JC]) for triggering other jobs,
	// and an error (if any).
	Exec func(ctx context.Context, ac AC, oc OC, jc JC) (JC, string, []KickRequest[JC], error)

	// Terminal indicates whether this state is a terminal state,
	// meaning that no further state transitions should occur after reaching this state.
	Terminal bool

	// Concurrency specifies the maximum number of concurrent executions allowed for this state.
	Concurrency int

	// RateLimit is an optional rate limiter for controlling the execution rate of this state. Useful when calling rate limited apis.
	RateLimit *rate.Limiter
}

State represents a state in a state machine for job processing. It defines the behavior and configuration for a particular state.

type StateExec added in v0.0.9

type StateExec[AC any, OC any, JC any] struct {
	// contains filtered or unexported fields
}

func (*StateExec[AC, OC, JC]) Run added in v0.0.9

func (s *StateExec[AC, OC, JC]) Run()

type StatusCount

type StatusCount struct {
	State     string
	Completed int
	Executing int
	Waiting   int
	Terminal  bool
}

type StatusListener

type StatusListener interface {
	// StatusUpdate is called by the processor to provide an update on the current
	// status of job processing. The `status` parameter is a slice of StatusCount
	// instances, where each instance represents the count of jobs in a particular state.
	//
	// The status counts will be in the same order as the states passed to the processor
	StatusUpdate(status []StatusCount)
}

StatusListener is an interface that defines a method for receiving status updates. It is used by the processor to notify interested parties about the current status of job processing.

Directories

Path Synopsis
internal
