loader

package
v0.11.0 (latest)
Warning

This package is not in the latest version of its module.

Published: Jun 6, 2021 | License: BSD-3-Clause, MIT, MIT-0, + 1 more | Imports: 4 | Imported by: 0

README

loader - concurrent dependency graph solver

go get -u "tawesoft.co.uk/go"
import "tawesoft.co.uk/go/loader"
Links            License  Stable?
home, docs, src  MIT      no

About

Package loader implements a way to define a graph of tasks and dependencies, classes of synchronous and concurrent workers, and limiting strategies, and to solve the graph either incrementally or all at once.

For example, this could be used to implement a loading screen for a computer game with a progress bar that updates in real time, with images being decoded concurrently with files being loaded from disk, and synchronised with the main thread for safe OpenGL operations such as creating texture objects on the GPU.

While this package is generally suitable for use in real-world applications, we are waiting to gain some experience using it in an internal application before polishing it or committing to a stable API.

TODO: doesn't yet free temporary results

TODO: refactor the load loop to always send/receive at the same time

TODO: clean up generally

TODO: not decided about the API for Loader.Result (but loader.MustResult is ok)

TODO: a step to simplify the DAG to remove passthrough loader.NamedTask steps

Examples

Configure the Loader with a Strategy to limit concurrent connections per host

package main

import (
    "fmt"
    "math/rand"
    "net/url"
    "runtime"
    "strings"
    "time"

    "tawesoft.co.uk/go/loader"
)

// interactive, if true, means we display progress in real time.
// If false, we block until everything has loaded.
const interactive = true

// NetStrategy is a loader.Strategy for limiting the number of concurrent
// connections to a single host
type NetStrategy struct {
    // Limit concurrent connections to a single host
    // Firefox uses 8, Chrome uses 6.
    MaxConcurrentConnectionsPerHost int

    // A count of concurrent connections by hostname
    Hosts map[string]int
}

// Start returns true if the task may start. Checks the current connections by
// hostname to see if it exceeds the limit or not.
func (s *NetStrategy) Start(info interface{}) bool {
    name := info.(string)
    count := s.Hosts[name]

    if count >= s.MaxConcurrentConnectionsPerHost {
        fmt.Printf("Temporarily delaying connection to %s due to too many connections to host\n", name)
        return false
    }

    s.Hosts[name] = count + 1
    return true
}

// End indicates the task has completed, so we no longer have to count it
// towards the limit.
func (s *NetStrategy) End(info interface{}) {
    name := info.(string)
    count := s.Hosts[name]
    s.Hosts[name] = count - 1
}

func init() {
    rand.Seed(time.Now().UnixNano())
}

func main() {
    ldr := loader.New()

    // Initialise a NetStrategy as a method of limiting concurrent connections
    // to a single host. For our example, 2 concurrent connections.
    netStrategy := &NetStrategy{
        MaxConcurrentConnectionsPerHost: 2, // (Chrome uses 6, Firefox uses 8)
        Hosts: make(map[string]int),
    }

    // Define consumerNet on the loader as a class of worker for network files.
    // Allows up to 5 simultaneous downloads (Firefox uses 256!) but the
    // strategy will limit concurrent connections to a single host.
    consumerNet := ldr.NewConsumer(5, netStrategy)

    // Define consumerCPU on the loader as a class of worker for CPU-bound
    // tasks.
    consumerCPU := ldr.NewConsumer(runtime.NumCPU(), nil)

    // A helper function that returns a loader.Task for downloading a file
    // concurrently with consumerNet
    loadNet := func(path string) loader.Task {
        u, err := url.Parse(path)
        if err != nil {
            panic(err)
        }
        hostname := u.Hostname()

        return loader.Task{
            // Info is used by the consumer's netStrategy
            Info: func() interface{} {
                return hostname
            },
            Consumer: consumerNet,

            Load: func(_ ...interface{}) (interface{}, error) {
                // pretend to read a file
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(400)))
                return fmt.Sprintf("I am network file %s!", path), nil
            },
        }
    }

    // A helper function that returns a loader.Task that does something with
    // its subtasks
    loadService := func(name string, tasks ...loader.Task) loader.Task {
        return loader.Task{
            Name: name,
            Keep: true,
            RequiresDirect: tasks,
            Consumer: consumerCPU,

            Load: func(inputs ...interface{}) (interface{}, error) {
                inputStrings := make([]string, 0, len(inputs))
                for _, input := range inputs {
                    inputStrings = append(inputStrings, input.(string))
                }
                result := fmt.Sprintf("I'm task %s and I have the following inputs: %s",
                    name, strings.Join(inputStrings, ", "))
                return result, nil
            },
        }
    }

    tasks := []loader.Task{
        // load files off the internet, with limits on multiple connections to
        // a single host, and do something with the results
        loadService("example.net API",
            loadNet("https://www.example.net/products.json"),
            loadNet("https://www.example.net/servers.json"),
            loadNet("https://www.example.net/news.json"),
        ),
        loadService("anotherhost API",
            loadNet("https://anotherhost.example.org/friends.json"),
            loadNet("https://anotherhost.example.org/recommendations.json"),
            loadNet("https://anotherhost.example.org/notifications.json"),
        ),
    }

    if err := ldr.Add(tasks); err != nil {
        panic(err)
    }

    // We can either load incrementally with a realtime progress bar
    if interactive {
        lastComplete := -1
        for {
            progress, err := ldr.Load(50 * time.Millisecond)
            if err != nil {
                fmt.Printf("Load error: %s\n", err)
                break
            }

            if progress.Completed != lastComplete {
                lastComplete = progress.Completed
                fmt.Printf("Progress: %d/%d\n", progress.Completed, progress.Total)
            }

            if progress.Done {
                break
            }

            time.Sleep(16 * time.Millisecond)
        }

    // Or just block until everything has finished loading
    } else {
        if _, err := ldr.LoadAll(); err != nil {
            fmt.Printf("Load error: %s\n", err)
        }
    }

    // Get results
    fmt.Println(ldr.MustResult("example.net API"))
    fmt.Println(ldr.MustResult("anotherhost API"))
}

Getting Help

This package is part of tawesoft.co.uk/go, a monorepo for small Go modules maintained by Tawesoft®. Check out that URL for more information about other Go modules from Tawesoft plus community and commercial support options.

Documentation

Examples

Configure the Loader with a Strategy to limit concurrent connections per host

https://www.tawesoft.co.uk/go/doc/loader/examples/limit-connections-per-host/

Package Information

License: MIT (see LICENSE.txt)

Stable: no

For more information, documentation, source code, examples, support, and links, please see https://www.tawesoft.co.uk/go and https://www.tawesoft.co.uk/go/loader

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerID

type ConsumerID int

ConsumerID uniquely identifies a consumer in a given Loader.

See the NewConsumer() method on the Loader type.

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

Loader is used to manage a graph of Task items to be completed synchronously or concurrently with different types of work divided among a set of Consumer objects.

func New

func New() *Loader

New returns a new Loader.

func (*Loader) Add

func (l *Loader) Add(tasks []Task) error

Add adds a graph of Tasks to the Loader to later process with its Load() or LoadAll() methods. Tasks may be added even during or after a Load() loop.

Because Task names, even at the top level of the array, are scoped to a single call to Add, two Tasks added in different calls to Add cannot refer to each other by name. If this behaviour is desired, append the Tasks to one array and pass the combined array in a single call to Add.

An error is returned if a named Task requirement is not in scope. In this event, the state of the task dependency graph is undefined and no methods on the Loader, other than Close, may be called.

func (*Loader) Close

func (l *Loader) Close()

Close TODO

func (*Loader) Load

func (l *Loader) Load(budget time.Duration) (Progress, error)

Load completes as many loading tasks as possible within the time budget. If idle while waiting for concurrent results, it may return early.

See also the LoadAll() method.

func (*Loader) LoadAll

func (l *Loader) LoadAll() (Progress, error)

LoadAll completes all loading tasks, blocking until they are finished.

func (*Loader) MustResult

func (l *Loader) MustResult(name string) interface{}

func (*Loader) NewConsumer

func (l *Loader) NewConsumer(concurrency int, strategy Strategy) ConsumerID

NewConsumer creates a task consumer that performs the (possibly asynchronous) completion of tasks at a given level of concurrency (e.g. number of goroutines) and returns an opaque ID that uniquely identifies that consumer with the active Loader.

A concurrency of zero means that the consumer's tasks will be performed sequentially on the same thread as the Loader's Load() or LoadAll() methods.

The strategy argument allows a task to be temporarily delayed. It may be nil, in which case every task is accepted.

The special ConsumerID of zero corresponds to a default builtin consumer that has a concurrency of zero and a nil strategy.

func (*Loader) Result

func (l *Loader) Result(name string) (workResult, bool)

Result returns the result of the named Task. The Task must have Keep set to true, and its Name must be unique across all Tasks where Keep is true.

type Progress

type Progress struct {
	Completed int
	Remaining int
	Total     int
	Done      bool
}

Progress holds the result of a Loader's Load or LoadAll methods and represents progress in completing all Task items.

type Strategy

type Strategy interface {
	// Start takes the result of a task Info() and returns true if the task is
	// accepted, or false if the task must be temporarily delayed.
	Start(info interface{}) bool

	// End takes the result of a task Info() and registers that the task has
	// completed processing.
	End(info interface{})
}

Strategy allows a task to be temporarily delayed based on the other tasks currently in progress. For example, see the examples folder for an implementation that limits the number of concurrent connections to a single host.

Note: do not share the same instance of a Strategy across two consumers without making it thread safe, e.g. by guarding its state with a mutex.

Note: a Strategy must give the same answer for the same sequence of Start and End calls and arguments, i.e. it must not depend on an external factor such as time or user input, only on some innate property of the currently accepted tasks.

Note: if not used carefully, a Strategy, or the interaction between two Strategies, may lead to deadlocks. This may happen if there is any possible Task dependency graph (considering only those Tasks that may be delayed by a Strategy), or any subgraph of it formed by removing leaf nodes, in which the Strategy's lower bound on the number of simultaneous tasks is less than or equal to the number of leaf nodes minus the number of vertex-disjoint subgraphs of that graph.

type Task

type Task struct {
	// Optional name of the task. Used to reference a task as a dependency of
	// another and to retrieve its result, if kept. Does not have to be unique
	// (it is scoped to its subtasks and successor siblings in one call to a
	// Loader Add method), unless it is kept (see the Keep field, below).
	Name string

	// Keep indicates that the task's Load() result will be available from
	// the Loader Result() method by the task Name. If Keep is true, the
	// task's Name must be globally unique across all Tasks kept by the loader.
	// If Keep is false, the task's name does not have to be unique, even if a
	// kept Task has the same name.
	Keep bool

	// Load performs the (possibly asynchronous) completion of the task e.g.
	// reading a file from disk, a unit of computation, etc.
	//
	// The results argument is the ordered results of the tasks in
	// RequiresNamed (if any) followed by the ordered results of the tasks in
	// RequiresDirect (if any).
	Load func(results ...interface{}) (interface{}, error)

	// Free performs the (possibly asynchronous) removal of a task's Load()
	// result e.g. releasing memory. May be nil.
	Free func(i interface{})

	// RequiresNamed is an array of names of tasks required to complete first
	// as a dependency of this task. May be nil.
	//
	// Note that a required named task must be defined before a task can depend
	// on it (e.g. by appearing earlier in the array passed to Loader Add()).
	RequiresNamed []string

	// RequiresDirect is an array of tasks required to complete first as a
	// dependency of this task. May be nil. These "subtasks" are in a new scope
	// for naming purposes.
	RequiresDirect []Task

	// Consumer performs the asynchronous completion of tasks at a given level
	// of concurrency. Use an ID returned by a Loader NewConsumer() method. May
	// be zero, in which case the task is completed in the same thread as the
	// caller Loader's Load() or LoadAll() methods.
	Consumer ConsumerID

	// Info returns some value for a task understood by the given Strategy.
	// May be nil. Must be a constant function i.e. always return the same
	// value for a given Task.
	Info func() interface{}
}

Task is a (possibly recursively nested) unit of work that is to be performed (possibly concurrently), subject to dependencies and limits.

func NamedTask

func NamedTask(name string) Task

NamedTask is used to reference another task by name as a subtask i.e. in a Task's RequiresDirect instead of RequiresNamed.

TODO NamedTasks should be simplified in the DAG to remove the node entirely.

Directories

Path                                 Synopsis
examples/dev                         Configure the loader to limit concurrent connections per host
examples/limit-connections-per-host  Configure the Loader with a Strategy to limit concurrent connections per host
