goflow

package module
v0.0.0-...-4355fa8 Latest
Published: Nov 18, 2024 License: MIT Imports: 8 Imported by: 0

README

goflow - A Distributed Task Queue for Go

GoFlow is a scalable and flexible framework for task orchestration, supporting both local and distributed execution modes. It enables efficient task processing through worker pools, customizable task handlers, and pluggable brokers for seamless task and result communication.

There are three ways to utilize GoFlow, catering to different levels of complexity:

  1. Local Mode (as a library) The simplest method, integrating GoFlow directly into your Go application. This sets up an in-process worker pool, which you interact with via the GoFlow object to send tasks and retrieve results.

  2. Distributed Mode (as a library) Deploy GoFlow in a distributed setup by using a message broker to mediate communication between your application and worker pools. Your application sends tasks and receives results via the GoFlow object, while the worker pools are configured to interact with your chosen message broker.

  3. Distributed Mode (via gRPC client) Deploy GoFlow on a Kubernetes cluster using the CLI. In this mode, you interact with GoFlow using a lightweight gRPC client, eliminating the need to include the full GoFlow library in your application.

Getting Started

Prerequisites
  • Golang (>=1.23)

Local Mode

To use GoFlow as an embedded process in your application, install it via go get:

go get github.com/jamesTait-jt/goflow

This will add GoFlow as a dependency to your project. You can then create a GoFlow object in your code:

// Create an in memory store to keep track of custom handlers
taskHandlerStore := store.NewInMemoryKVStore[string, task.Handler]()

// Inject the store into the GoFlow object
gf := goflow.NewLocalMode(taskHandlerStore)

By default, this initializes the GoFlow object with standard configuration (see options.go). You can customize these settings using functional options, such as WithResultsStore to define a custom results store or WithNumWorkers to set the number of worker goroutines. Once configured, the GoFlow object is ready to start:

if err := gf.Start(); err != nil {
    // Handle the error
}

This starts the worker pool to process tasks. When you're finished, gracefully shut down GoFlow using the Close method:

if err := gf.Close(); err != nil {
    // Handle the error
}

This stops the worker pool and closes any open resources.

Task handlers

In GoFlow, task handlers are functions that process tasks submitted to the framework. A task handler takes a payload (of type any) and returns a task.Result, which contains the result of processing the task. Task handlers are registered to specific task types, allowing GoFlow to route tasks to the appropriate handler when processed.

Below is an example that demonstrates how to define and register a task handler in GoFlow. The task handler will copy the payload sent on the task into the result payload:

repeater := func(payload any) task.Result {
    return task.Result{Payload: payload}
}
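
Because the payload arrives as any, most handlers need a type assertion before they can use it. The following is a minimal sketch of that pattern. It uses a local stand-in Result type with only the Payload field shown above so that it compiles without the library; the name shout and the choice to report a mismatch through the payload are assumptions made for illustration, not GoFlow conventions.

```go
package main

import (
	"fmt"
	"strings"
)

// Result is a local stand-in for task.Result. Only the Payload field,
// which appears in the README example, is assumed here.
type Result struct {
	Payload any
}

// shout (a hypothetical handler) upper-cases string payloads. Any other
// payload type yields a descriptive message instead of a panic from a
// failed type assertion.
func shout(payload any) Result {
	s, ok := payload.(string)
	if !ok {
		return Result{Payload: fmt.Sprintf("unsupported payload type %T", payload)}
	}
	return Result{Payload: strings.ToUpper(s)}
}

func main() {
	fmt.Println(shout("hello").Payload)
	fmt.Println(shout(42).Payload)
}
```

With the real library, a handler of this shape would return task.Result instead of the stand-in type.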

To ensure GoFlow uses the correct handler for a given task type, we register the handler with a specific task type. In this case, we register the handler for the repeater task type:

taskType := "repeater"
gf.RegisterHandler(taskType, repeater)

Handlers can be registered either before starting GoFlow or dynamically while it is running.

Once the handler is registered, we can push tasks to GoFlow for processing. Each task will be picked up by a worker, which will retrieve the appropriate handler from the registry to process the task.

taskID, err := gf.Push("repeater", "Hello, GoFlow!")
if err != nil {
    // Handle error
}

GoFlow assigns a unique task ID and returns it so you can later retrieve the result.

result, ok, err := gf.GetResult(taskID)
if err != nil {
    // Handle error
}

if !ok {
    // Task did not have a corresponding result (it may not be finished)
}
Results store
Task handler store
Configuration

copy your handlers into minikube

Usage

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrAlreadyStarted = errors.New("GoFlow is already started")
	ErrNotStarted     = errors.New("GoFlow is not started yet")
)

Functions

This section is empty.

Types

type Broker

type Broker[T task.TaskOrResult] interface {
	task.Submitter[T]
	task.Dequeuer[T]
	AwaitShutdown()
}

Broker is an interface that abstracts the messaging systems used by GoFlow. GoFlow requires two brokers: one for submitting tasks to the worker pool, and another for receiving results from it.

type GoFlow

type GoFlow struct {
	// contains filtered or unexported fields
}

GoFlow is the core structure of the framework. It manages interactions with brokers to send tasks and receive results. GoFlow continually polls the results broker, writing incoming results to the results store.

In local mode, GoFlow also manages the worker pool and task handler registry.

func New

func New(taskBroker Broker[task.Task], resultsBroker Broker[task.Result], opts ...Option) *GoFlow

New creates and initializes a new GoFlow instance in distributed mode. It sets up the context for cancellation and configures the necessary components. If no options are provided, default values are used (see defaultOptions()).

For detailed configuration options, see options.go.

func NewLocalMode

func NewLocalMode(
	taskHandlers KVStore[string, task.Handler],
	opts ...Option,
) *GoFlow

NewLocalMode creates and initializes a new GoFlow instance configured for local mode. It sets up a worker pool and task/result brokers with specified sizes for task and result queues. The context is also set up for cancellation, and if no options are provided, default values are used (see defaultOptions()).

For detailed configuration options, see options.go.

func (*GoFlow) Close

func (gf *GoFlow) Close() error

Close gracefully shuts down the GoFlow instance. It cancels the context to signal all ongoing operations to stop. If a worker pool is configured (i.e., local mode), it waits for all workers to complete their tasks and shut down before returning.

func (*GoFlow) GetResult

func (gf *GoFlow) GetResult(taskID string) (task.Result, bool, error)

GetResult retrieves the result associated with the specified task ID. It returns the result and a boolean indicating whether the result was found.

If the task with the given ID has completed, the result will be returned. If the task has not yet completed or does not exist, the boolean will be false.

func (*GoFlow) Push

func (gf *GoFlow) Push(taskType string, payload any) (string, error)

Push submits a new task with the specified type and payload to the task broker. It creates a task, submits it to the broker, and returns the task's ID.

The task is processed by the worker pool, and the caller can use the returned task ID to retrieve the result later.

func (*GoFlow) RegisterHandler

func (gf *GoFlow) RegisterHandler(taskType string, handler task.Handler)

RegisterHandler registers a task handler for the specified task type. It stores the handler in the taskHandlers store for local mode execution. Handlers can be registered dynamically while the GoFlow instance is running.

If the GoFlow instance is not running in local mode (i.e., taskHandlers is nil), a warning is logged, and the handler is not registered. In distributed mode, handlers must be pre-registered when compiling the worker pool.

func (*GoFlow) Start

func (gf *GoFlow) Start() error

Start initiates the execution of the GoFlow instance. It checks whether the worker pool and task handlers are configured (i.e., running in local mode). If so, it starts the worker pool to process tasks submitted through the task broker. This step is not needed in distributed mode.

Additionally, the method launches a goroutine to persist results from the results broker to the results store.

type KVStore

type KVStore[K comparable, V any] interface {
	// Put stores the value associated with the given key.
	Put(k K, v V)

	// Get retrieves the value associated with the given key, returning
	// the value and a boolean indicating whether the key was found.
	Get(k K) (V, bool)
}

KVStore defines a key-value store interface in the GoFlow framework. It provides methods for storing and retrieving values associated with keys.

Users can implement KVStore to create custom key-value storage solutions as needed. Example implementations could include in-memory, database-backed, or other forms of key-value mappings.
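
As an illustration, here is a minimal sketch of a custom in-memory implementation guarded by a sync.RWMutex. The interface is redeclared locally so the example compiles on its own; the names mutexStore and newMutexStore are hypothetical and do not come from the library.

```go
package main

import (
	"fmt"
	"sync"
)

// KVStore mirrors the interface documented above so the sketch is
// self-contained.
type KVStore[K comparable, V any] interface {
	Put(k K, v V)
	Get(k K) (V, bool)
}

// mutexStore is a hypothetical map-backed store that is safe for
// concurrent use by multiple goroutines.
type mutexStore[K comparable, V any] struct {
	mu   sync.RWMutex
	data map[K]V
}

func newMutexStore[K comparable, V any]() *mutexStore[K, V] {
	return &mutexStore[K, V]{data: make(map[K]V)}
}

// Put stores v under k, overwriting any previous value.
func (s *mutexStore[K, V]) Put(k K, v V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[k] = v
}

// Get returns the value for k and whether k was present.
func (s *mutexStore[K, V]) Get(k K) (V, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[k]
	return v, ok
}

// Compile-time check that mutexStore satisfies KVStore.
var _ KVStore[string, int] = (*mutexStore[string, int])(nil)

func main() {
	s := newMutexStore[string, int]()
	s.Put("answer", 42)
	v, ok := s.Get("answer")
	fmt.Println(v, ok)
}
```

A store of this shape, instantiated with string keys and task.Result values, matches the parameter type of WithResultsStore.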

type Option

type Option interface {
	// contains filtered or unexported methods
}
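
An interface with only unexported methods can be satisfied only from inside its own package, which is a common way to build functional options. The sketch below shows one way such a type can work. It is not taken from GoFlow's source: the options struct, the apply method, buildOptions, and the default values are all assumptions made for illustration.

```go
package main

import "fmt"

// options is a hypothetical settings struct. Field names and defaults
// are illustrative, not the library's.
type options struct {
	numWorkers    int
	taskQueueSize int
}

// Option has the shape documented above: an interface whose only method
// is unexported, so other packages cannot implement it.
type Option interface {
	apply(*options)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*options)

func (f optionFunc) apply(o *options) { f(o) }

// WithNumWorkers overrides the assumed default worker count.
func WithNumWorkers(n int) Option {
	return optionFunc(func(o *options) { o.numWorkers = n })
}

// WithTaskQueueBufferSize overrides the assumed default queue size.
func WithTaskQueueBufferSize(n int) Option {
	return optionFunc(func(o *options) { o.taskQueueSize = n })
}

// buildOptions starts from defaults and applies each option in order,
// so later options win over earlier ones.
func buildOptions(opts ...Option) options {
	o := options{numWorkers: 4, taskQueueSize: 16}
	for _, opt := range opts {
		opt.apply(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", buildOptions())
	fmt.Printf("%+v\n", buildOptions(WithNumWorkers(8)))
}
```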

func WithNumWorkers

func WithNumWorkers(numWorkers int) Option

WithNumWorkers allows you to set the number of goroutines that will spawn and listen to the task queue. Has no effect if running in distributed mode.

func WithResultQueueBufferSize

func WithResultQueueBufferSize(bufferSize int) Option

WithResultQueueBufferSize allows you to set the buffer size of the result queue channel. Has no effect if running in distributed mode.

func WithResultsStore

func WithResultsStore(resultsStore KVStore[string, task.Result]) Option

WithResultsStore allows you to inject your own results store. Anything that implements the KVStore interface is viable.

func WithTaskQueueBufferSize

func WithTaskQueueBufferSize(bufferSize int) Option

WithTaskQueueBufferSize allows you to set the buffer size of the task queue channel. Has no effect if running in distributed mode.

type WorkerPool

type WorkerPool interface {
	// Start initializes the worker pool, with workers listening to taskQueue and
	// submitting results. It should be non-blocking, starting workers in their own
	// goroutines and returning immediately. The worker pool will run until the context
	// is canceled.
	Start(
		ctx context.Context,
		taskQueue task.Dequeuer[task.Task],
		results task.Submitter[task.Result],
		taskHandlers workerpool.HandlerGetter,
	)

	// AwaitShutdown ensures that all workers complete processing after GoFlow's context
	// is canceled, allowing for graceful shutdown without leaving hanging goroutines.
	AwaitShutdown()
}

WorkerPool is implemented only when running GoFlow in local mode. In distributed mode, the worker pool is abstracted away from GoFlow by the task and results brokers.
