Documentation
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	ErrAlreadyStarted = errors.New("GoFlow is already started")
	ErrNotStarted     = errors.New("GoFlow is not started yet")
)
Functions ¶
This section is empty.
Types ¶
type Broker ¶
Broker is an interface that abstracts the messaging systems used by GoFlow. GoFlow requires two brokers: one for submitting tasks to the worker pool, and another for receiving results from the worker pool.
type GoFlow ¶
type GoFlow struct {
// contains filtered or unexported fields
}
GoFlow is the core structure of the framework. It manages interactions with brokers to send tasks and receive results. GoFlow continually polls the results broker, writing incoming results to the results store.
In local mode, GoFlow also manages the worker pool and task handler registry.
func New ¶
New creates and initializes a new GoFlow instance in distributed mode. It sets up the context for cancellation and configures the necessary components. If no options are provided, default values are used (see defaultOptions()).
For detailed configuration options, see options.go.
func NewLocalMode ¶
NewLocalMode creates and initializes a new GoFlow instance configured for local mode. It sets up a worker pool and task/result brokers with specified sizes for task and result queues. The context is also set up for cancellation, and if no options are provided, default values are used (see defaultOptions()).
For detailed configuration options, see options.go.
func (*GoFlow) Close ¶
Close gracefully shuts down the GoFlow instance. It cancels the context to signal all ongoing operations to stop. If a worker pool is configured (i.e., in local mode), it waits for all workers to complete their tasks and shut down before returning.
func (*GoFlow) GetResult ¶
GetResult retrieves the result associated with the specified task ID. It returns the result and a boolean indicating whether the result was found.
If the task with the given ID has completed, the result will be returned. If the task has not yet completed or does not exist, the boolean will be false.
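Because the boolean is false both for an unfinished task and for an unknown ID, callers usually poll with a deadline. The following is a minimal sketch of that pattern, not part of the package. The GetResult signature is not shown on this page, so `resultGetter`, `waitForResult`, `fakeFlow`, and the string ID and result types are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// resultGetter is a hypothetical stand-in for the GetResult method.
// The ID and result types are assumptions; check the real signature.
type resultGetter[ID comparable, R any] interface {
	GetResult(id ID) (R, bool)
}

// waitForResult polls GetResult at the given interval until the result
// is found or ctx is done, whichever happens first.
func waitForResult[ID comparable, R any](
	ctx context.Context,
	g resultGetter[ID, R],
	id ID,
	interval time.Duration,
) (R, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if r, ok := g.GetResult(id); ok {
			return r, nil
		}
		select {
		case <-ctx.Done():
			var zero R
			return zero, ctx.Err()
		case <-ticker.C:
			// Try again on the next loop iteration.
		}
	}
}

// fakeFlow imitates a task in flight: it reports a result for "task-1"
// only from the third call onward.
type fakeFlow struct{ calls int }

func (f *fakeFlow) GetResult(id string) (string, bool) {
	f.calls++
	if id != "task-1" || f.calls < 3 {
		return "", false
	}
	return "done", true
}

// runExample waits up to one second for the fake task to finish.
func runExample() (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return waitForResult[string, string](ctx, &fakeFlow{}, "task-1", 5*time.Millisecond)
}

func main() {
	fmt.Println(runExample())
}
```

The timeout matters here: since a missing ID never becomes available, an unbounded loop would spin forever on a mistyped ID.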
func (*GoFlow) Push ¶
Push submits a new task with the specified type and payload to the task broker. It creates a task, submits it to the broker, and returns the task's ID.
The task is processed by the worker pool, and the caller can use the returned task ID to retrieve the result later.
func (*GoFlow) RegisterHandler ¶
RegisterHandler registers a task handler for the specified task type. It stores the handler in the taskHandlers store for local mode execution. Handlers can be registered dynamically while the GoFlow instance is running.
If the GoFlow instance is not running in local mode (i.e., taskHandlers is nil), a warning is logged, and the handler is not registered. In distributed mode, handlers must be pre-registered when compiling the worker pool.
func (*GoFlow) Start ¶
Start initiates the execution of the GoFlow instance. It checks whether the worker pool and task handlers are configured (i.e., whether the instance is running in local mode). If so, it starts the worker pool to process tasks submitted through the task broker. This step is not necessary in distributed mode.
Additionally, the method launches a goroutine to persist results from the results broker to the results store.
type KVStore ¶
type KVStore[K comparable, V any] interface {
	// Put stores the value associated with the given key.
	Put(k K, v V)
	// Get retrieves the value associated with the given key, returning
	// the value and a boolean indicating whether the key was found.
	Get(k K) (V, bool)
}
KVStore defines a key-value store interface in the GoFlow framework. It provides methods for storing and retrieving values associated with keys.
Users can implement KVStore to create custom key-value storage solutions as needed. Example implementations could include in-memory, database-backed, or other forms of key-value mappings.
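As an illustration of the in-memory case, here is a minimal sketch of a map-backed store guarded by a read-write mutex. The interface is copied from the definition above so the example compiles on its own; `MemoryStore` and `NewMemoryStore` are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// KVStore is copied from the documented interface so that this
// example is self-contained.
type KVStore[K comparable, V any] interface {
	Put(k K, v V)
	Get(k K) (V, bool)
}

// MemoryStore is a hypothetical map-backed KVStore. The RWMutex makes
// it safe for concurrent readers and writers.
type MemoryStore[K comparable, V any] struct {
	mu   sync.RWMutex
	data map[K]V
}

// NewMemoryStore returns an empty store ready for use.
func NewMemoryStore[K comparable, V any]() *MemoryStore[K, V] {
	return &MemoryStore[K, V]{data: make(map[K]V)}
}

// Put stores v under k, replacing any previous value.
func (s *MemoryStore[K, V]) Put(k K, v V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[k] = v
}

// Get returns the value for k and whether it was present.
func (s *MemoryStore[K, V]) Get(k K) (V, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[k]
	return v, ok
}

// Compile-time check that MemoryStore satisfies KVStore.
var _ KVStore[string, int] = (*MemoryStore[string, int])(nil)

func main() {
	store := NewMemoryStore[string, int]()
	store.Put("answer", 42)

	v, ok := store.Get("answer")
	fmt.Println(v, ok)

	_, ok = store.Get("missing")
	fmt.Println(ok)
}
```

A store like this could presumably be injected through WithResultsStore, although the concrete key and value types that option expects are not shown on this page.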
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithNumWorkers ¶
WithNumWorkers allows you to set the number of goroutines that are spawned to listen to the task queue. Has no effect if running in distributed mode.
func WithResultQueueBufferSize ¶
WithResultQueueBufferSize allows you to set the buffer size of the result queue channel. Has no effect if running in distributed mode.
func WithResultsStore ¶
WithResultsStore allows you to inject your own results store. Anything that implements the KVStore interface is viable.
func WithTaskQueueBufferSize ¶
WithTaskQueueBufferSize allows you to set the buffer size of the task queue channel. Has no effect if running in distributed mode.
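The Option interface hides its methods, which is consistent with the common functional-options pattern: start from defaults, then let each option mutate the configuration. The sketch below shows that general pattern only. It is not the package's code, and `config`, `option`, `optionFunc`, `withWorkers`, `withTaskQueueSize`, `newConfig`, and the default values are all hypothetical.

```go
package main

import "fmt"

// config holds hypothetical settings. The real field names and
// defaults are not shown in this documentation.
type config struct {
	numWorkers    int
	taskQueueSize int
}

// option has a single unexported method, so only this package can
// define new options.
type option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// withWorkers overrides the number of workers.
func withWorkers(n int) option {
	return optionFunc(func(c *config) { c.numWorkers = n })
}

// withTaskQueueSize overrides the task queue buffer size.
func withTaskQueueSize(n int) option {
	return optionFunc(func(c *config) { c.taskQueueSize = n })
}

// newConfig starts from defaults and applies each option in order,
// so later options win over earlier ones.
func newConfig(opts ...option) config {
	c := config{numWorkers: 4, taskQueueSize: 16} // illustrative defaults only
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withWorkers(8), withTaskQueueSize(64)))
}
```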
type WorkerPool ¶
type WorkerPool interface {
	// Start initializes the worker pool, with workers listening to taskQueue and
	// submitting results. It should be non-blocking, starting workers in their own
	// goroutines and returning immediately. The worker pool will run until the context
	// is canceled.
	Start(
		ctx context.Context,
		taskQueue task.Dequeuer[task.Task],
		results task.Submitter[task.Result],
		taskHandlers workerpool.HandlerGetter,
	)
	// AwaitShutdown ensures that all workers complete processing after GoFlow's context
	// is canceled, allowing for graceful shutdown without leaving hanging goroutines.
	AwaitShutdown()
}
WorkerPool is implemented only when running GoFlow in local mode. In distributed mode, the worker pool is abstracted away from GoFlow by the task and results brokers.
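The Start and AwaitShutdown contract (non-blocking start, run until the context is canceled, then wait for workers to exit) can be sketched with a sync.WaitGroup. This is a simplified illustration rather than the package's implementation: plain channels and a handler function stand in for task.Dequeuer, task.Submitter, and workerpool.HandlerGetter, whose definitions are not shown here, and `simplePool` and `runPool` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// simplePool is a hypothetical worker pool. Channels stand in for the
// task and results brokers used by the real interface.
type simplePool struct {
	workers int
	wg      sync.WaitGroup
}

// Start launches the workers in their own goroutines and returns
// immediately. Each worker runs until ctx is canceled.
func (p *simplePool) Start(
	ctx context.Context,
	tasks <-chan int,
	results chan<- int,
	handle func(int) int,
) {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case t := <-tasks:
					// Guard the send as well, so a full results
					// channel cannot block shutdown.
					select {
					case results <- handle(t):
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}
}

// AwaitShutdown blocks until every worker goroutine has returned.
func (p *simplePool) AwaitShutdown() { p.wg.Wait() }

// runPool squares three numbers on two workers and returns their sum.
func runPool() int {
	ctx, cancel := context.WithCancel(context.Background())
	tasks := make(chan int, 4)
	results := make(chan int, 4)

	p := &simplePool{workers: 2}
	p.Start(ctx, tasks, results, func(n int) int { return n * n })

	for _, n := range []int{1, 2, 3} {
		tasks <- n
	}
	sum := 0
	for i := 0; i < 3; i++ {
		sum += <-results
	}

	cancel()          // signal the workers to stop
	p.AwaitShutdown() // wait for them to exit
	return sum
}

func main() {
	fmt.Println(runPool()) // prints 14 (1 + 4 + 9)
}
```

Calling cancel before AwaitShutdown mirrors the order described for Close: the context is canceled first, and only then does the caller wait for the workers.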