dispatch

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

README

dispatch logo

Test Go Reference Apache 2 License Discord

Go package to develop applications with Dispatch.

What is Dispatch?

Dispatch is a cloud service for developing scalable and reliable applications in Go, including:

  • Event-Driven Architectures
  • Background Jobs
  • Transactional Workflows
  • Multi-Tenant Data Pipelines

Dispatch differs from alternative solutions by allowing developers to write simple Go code: it has a minimal API footprint, which usually only requires wrapping a function (no complex framework to learn), failure recovery is built-in by default for transient errors like rate limits or timeouts, with a zero-configuration model.

To get started, follow the instructions to sign up for Dispatch 🚀.

Installation

Installing the Dispatch CLI

As a pre-requisite, we recommend installing the Dispatch CLI to simplify the configuration and execution of applications that use Dispatch. On macOS, this can be done easily using Homebrew:

brew tap dispatchrun/dispatch
brew install dispatch

Alternatively, you can download the latest dispatch binary from the Releases page.

Note that this step is optional, applications that use Dispatch can run without the CLI, passing configuration through environment variables or directly in the code. However, the CLI automates the onboarding flow and simplifies the configuration, so we recommend starting with it.

Installing the Dispatch SDK

The Go SDK can be added as a dependency using:

go get github.com/dispatchrun/dispatch-go@latest

If you're starting fresh, don't forget to run go mod init first (e.g. go mod init dispatch-example).

Usage

Writing Dispatch Applications

The following snippet shows how to write a simple Dispatch application that does the following:

  1. declare a Dispatch function named greet which can run asynchronously
  2. start a Dispatch endpoint to handle function calls
  3. schedule a call to greet with the argument World
# main.go
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/dispatchrun/dispatch-go"
)

func main() {
    greet := dispatch.Func("greet", func(ctx context.Context, msg string) (any, error) {
        fmt.Printf("Hello, %s!\n", msg)
        return nil, nil
    })

    endpoint, err := dispatch.New(greet)
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        if _, err := greet.Dispatch(context.Background(), "World"); err != nil {
            log.Fatal(err)
        }
    }()

    if err := endpoint.ListenAndServe(); err != nil {
        log.Fatal(err)
    }
}

Obviously, this is just an example, a real application would perform much more interesting work, but it's a good start to get a sense of how to use Dispatch.

Running Dispatch Applications

The simplest way to run a Dispatch application is to use the Dispatch CLI, first we need to login:

dispatch login

Then we are ready to run the example program we wrote above:

dispatch run -- go run main.go

Writing Transactional Applications with Dispatch

Dispatch functions are coroutines that can be suspended and resumed at await points. The await points are durability checkpoints; if a function fails midway through execution, it can be retried automatically from these checkpoints.

pipeline := dispatch.Func("pipeline", func (ctx context.Context, msg string) (string, error) {
    // Each await point is a durability step, the functions can be run across the
    // fleet of service instances and retried as needed without losing track of
    // progress through the function execution.
    msg, _ = transform1.Await(ctx, msg)
    msg, _ = transform2.Await(ctx, msg)
    return publish.Await(ctx, msg)
})

publish := dispatch.Func("publish", func (ctx context.Context, msg string) (*dispatchhttp.Response, error) {
    // Each dispatch function runs concurrently to the others, even if it does
    // blocking operations like this POST request, it does not prevent other
    // concurrent operations from carrying on in the program.
    return dispatchhttp.Post("https://somewhere.com/", bytes.NewBufferString(msg))
})

transform1 := dispatch.Func("transform1", func (ctx context.Context, msg string) (string, error) {
    // ...
})

transform2 := dispatch.Func("transform2", func (ctx context.Context, msg string) (string, error) {
    // ...
})

This model is composable and can be used to create fan-out/fan-in control flows. gather can be used to wait on multiple concurrent calls:

process := dispatch.Func("process", func (ctx context.Context, msgs []string) ([]string, error) {
    // Transform messages concurrently and await the results.
    return transform.Gather(ctx, msgs)
})

transform := dispatch.Func("transform", func (ctx context.Context, msg string) (string, error) {
    // ...
})

Dispatch converts Go functions into Distributed Coroutines, which can be suspended and resumed on any instance of a service across a fleet. For a deep dive on these concepts, read our Distributed Coroutines blog post.

Integration with HTTP servers

Dispatch can be integrated into an existing HTTP server.

In the example below, a request to / triggers an asynchronous call to the greet function:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"

    "github.com/dispatchrun/dispatch-go"
)

func main() {
    greet := dispatch.Func("greet", func(ctx context.Context, msg string) (any, error) {
        fmt.Printf("Hello, %s!\n", msg)
        return nil, nil
    })

    endpoint, err := dispatch.New(greet)
    if err != nil {
        log.Fatal(err)
    }

    mux := http.NewServeMux()

    mux.Handle(endpoint.Handler())

    mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        greet.Dispatch(r.Context(), "World")
        w.WriteHeader(http.StatusOK)
    }))

    if err := http.ListenAndServe("localhost:8000", mux); err != nil {
        log.Fatal(err)
    }
}

Configuration

The Dispatch CLI automatically configures the SDK, so manual configuration is usually not required when running Dispatch applications. However, in some advanced cases, it might be useful to explicitly set configuration options.

In order for Dispatch to interact with functions remotely, the SDK needs to be configured with the address at which the server can be reached. The Dispatch API Key must also be set, and optionally, a public signing key should be configured to verify that requests originated from Dispatch. These configuration options can be passed as arguments to the the Dispatch constructor, but by default they will be loaded from environment variables:

Environment Variable Value Example
DISPATCH_API_KEY d4caSl21a5wdx5AxMjdaMeWehaIyXVnN
DISPATCH_ENDPOINT_URL https://service.domain.com
DISPATCH_VERIFICATION_KEY -----BEGIN PUBLIC KEY-----...

Serialization

Inputs & Outputs

Dispatch uses protobuf to serialize input and output values.

The inputs and outputs must either be primitive values, list or maps of primitive values, or have a type that implements one of the following interfaces:

  • encoding.TextMarshaler
  • encoding.BinaryMarshaler
  • json.Marshaler
  • proto.Message
Coroutine State

Dispatch uses the coroutine library to serialize coroutines.

The user must ensure that the contents of their stack frames are serializable.

For help with a serialization issues, please submit a GitHub issue.

Examples

Check out the examples directory for code samples to help you get started with the SDK.

Contributing

Contributions are always welcome! Would you spot a typo or anything that needs to be improved, feel free to send a pull request.

Pull requests need to pass all CI checks before getting merged. Anything that isn't a straightforward change would benefit from being discussed in an issue before submitting a change.

Remember to be respectful and open minded!

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTimeout indicates an operation failed due to a timeout.
	ErrTimeout error = dispatchproto.StatusError(dispatchproto.TimeoutStatus)

	// ErrTimeout indicates an operation failed due to throttling.
	ErrThrottled error = dispatchproto.StatusError(dispatchproto.ThrottledStatus)

	// ErrInvalidArgument indicates an operation failed due to an invalid argument.
	ErrInvalidArgument error = dispatchproto.StatusError(dispatchproto.InvalidArgumentStatus)

	// ErrInvalidResponse indicates an operation failed due to an invalid response.
	ErrInvalidResponse error = dispatchproto.StatusError(dispatchproto.InvalidResponseStatus)

	// ErrTemporary indicates an operation failed with a temporary error.
	ErrTemporary error = dispatchproto.StatusError(dispatchproto.TemporaryErrorStatus)

	// ErrPermanent indicates an operation failed with a permanent error.
	ErrPermanent error = dispatchproto.StatusError(dispatchproto.PermanentErrorStatus)

	// ErrIncompatibleStatus indicates that a function's serialized state is incompatible.
	ErrIncompatibleState error = dispatchproto.StatusError(dispatchproto.IncompatibleStateStatus)

	// ErrDNS indicates an operation failed with a DNS error.
	ErrDNS error = dispatchproto.StatusError(dispatchproto.DNSErrorStatus)

	// ErrTCP indicates an operation failed with a TCP error.
	ErrTCP error = dispatchproto.StatusError(dispatchproto.TCPErrorStatus)

	// ErrTLS indicates an operation failed with a TLS error.
	ErrTLS error = dispatchproto.StatusError(dispatchproto.TLSErrorStatus)

	// ErrHTTP indicates an operation failed with a HTTP error.
	ErrHTTP error = dispatchproto.StatusError(dispatchproto.HTTPErrorStatus)

	// ErrUnauthenticated indicates an operation failed or was not attempted
	// because the caller did not authenticate correctly.
	ErrUnauthenticated error = dispatchproto.StatusError(dispatchproto.UnauthenticatedStatus)

	// ErrPermissionDenied indicates an operation failed or was not attempted
	// because the caller did not have permission.
	ErrPermissionDenied error = dispatchproto.StatusError(dispatchproto.PermissionDeniedStatus)

	// ErrNotFound indicates an operation failed because a resource could not be found.
	ErrNotFound error = dispatchproto.StatusError(dispatchproto.NotFoundStatus)
)

Functions

This section is empty.

Types

type AnyFunction

type AnyFunction interface {
	Option

	Register(*Dispatch) (string, dispatchproto.Function)
}

AnyFunction is a Function[I, O] instance.

type Dispatch

type Dispatch struct {
	// contains filtered or unexported fields
}

Dispatch is a Dispatch endpoint.

func New

func New(opts ...Option) (*Dispatch, error)

New creates a Dispatch endpoint.

func (*Dispatch) Client

func (d *Dispatch) Client() (*dispatchclient.Client, error)

Client returns the Client attached to this endpoint.

func (*Dispatch) Handler

func (d *Dispatch) Handler() (string, http.Handler)

Handler returns an HTTP handler for Dispatch, along with the path that the handler should be registered at.

func (*Dispatch) ListenAndServe

func (d *Dispatch) ListenAndServe() error

ListenAndServe serves the Dispatch endpoint.

func (*Dispatch) Register

func (d *Dispatch) Register(fn AnyFunction)

Register registers a function.

func (*Dispatch) RegisterPrimitive

func (d *Dispatch) RegisterPrimitive(name string, fn dispatchproto.Function)

RegisterPrimitive registers a primitive function.

func (*Dispatch) URL

func (d *Dispatch) URL() string

URL is the URL of the Dispatch endpoint.

type Function

type Function[I, O any] struct {
	// contains filtered or unexported fields
}

Function is a Dispatch Function.

func Func

func Func[I, O any](name string, fn func(context.Context, I) (O, error)) *Function[I, O]

Func creates a Function.

func (*Function[I, O]) Await

func (f *Function[I, O]) Await(input I, opts ...dispatchproto.CallOption) (O, error)

Await calls the function and awaits a result.

Await should only be called within a Dispatch Function (created via Func).

func (*Function[I, O]) BuildCall

func (f *Function[I, O]) BuildCall(input I, opts ...dispatchproto.CallOption) (dispatchproto.Call, error)

BuildCall creates (but does not dispatch) a Call for the function.

func (*Function[I, O]) Dispatch

func (f *Function[I, O]) Dispatch(ctx context.Context, input I, opts ...dispatchproto.CallOption) (dispatchproto.ID, error)

Dispatch dispatches a Call to the function.

func (*Function[I, O]) Gather

func (f *Function[I, O]) Gather(inputs []I, opts ...dispatchproto.CallOption) ([]O, error)

Gather makes many concurrent calls to the function and awaits the results.

Gather should only be called within a Dispatch Function (created via Func).

func (*Function[I, O]) Name

func (f *Function[I, O]) Name() string

Name is the name of the function.

func (*Function[I, O]) Register

func (f *Function[I, O]) Register(endpoint *Dispatch) (string, dispatchproto.Function)

Register is called when the function is registered on a Dispatch endpoint.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option configures a Dispatch endpoint.

func Client

func Client(client *dispatchclient.Client) Option

Client sets the client to use when dispatching calls from functions registered on the endpoint.

By default the Dispatch endpoint will attempt to construct a dispatchclient.Client instance using the DISPATCH_API_KEY and optional DISPATCH_API_URL environment variables. If more control is required over client configuration, the custom client instance can be registered here and used instead.

func EndpointUrl

func EndpointUrl(endpointUrl string) Option

EndpointUrl sets the URL of the Dispatch endpoint.

It defaults to the value of the DISPATCH_ENDPOINT_URL environment variable.

func Env

func Env(env ...string) Option

Env sets the environment variables that a Dispatch endpoint parses its default configuration from.

It defaults to os.Environ().

func ServeAddress

func ServeAddress(addr string) Option

ServeAddress sets the address that the Dispatch endpoint is served on (see Dispatch.Serve).

Note that this is not the same as the endpoint URL, which is the URL that this Dispatch endpoint is publicly accessible from.

It defaults to the value of the DISPATCH_ENDPOINT_ADDR environment variable, which is automatically set by the Dispatch CLI. If this is unset, it defaults to 127.0.0.1:8000.

func VerificationKey

func VerificationKey(verificationKey string) Option

VerificationKey sets the verification key to use when verifying Dispatch request signatures.

The key should be a PEM or base64-encoded ed25519 public key.

It defaults to the value of the DISPATCH_VERIFICATION_KEY environment variable value.

If a verification key is not provided, request signatures will not be validated.

Directories

Path Synopsis
examples
internal
env

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL