kewpie

package module
v3.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2023 License: MIT Imports: 13 Imported by: 1

README

Kewpie

Kewpie is a task queue abstraction. It supports pluggable backends with a single common interface.

The format of tasks stored in backends is designed to be simple to write and consume, making it easy to write kewpie libraries in many languages.

The currently supported backends are:

  1. Amazon SQS
  2. PostgreSQL
  3. Memory (designed for testing only)
  4. Google PubSub (in progress, not yet ready for production)

Example

package main

import (
	"context"
	"log"
	"os"
	"time"

	kewpie "github.com/davidbanham/kewpie_go/v3"
)

// demoPayload is the example task body: main marshals it into a kewpie task
// and the handler unmarshals it again when the task is received.
type demoPayload struct {
	// Hello is the text the handler logs after the word "Hello".
	Hello string
}

// demoHandler is a stateless type whose Handle method processes the tasks
// delivered by the queue subscription.
type demoHandler struct{}

// Handle is the method that makes demoHandler satisfy the Handler interface
// kewpie expects. It decodes the task into a demoPayload and logs a greeting.
//
// A non-nil error signals that handling failed. In that case the boolean tells
// kewpie whether or not the task should be retried.
func (demoHandler) Handle(task kewpie.Task) (bool, error) {
	var payload demoPayload

	err := task.Unmarshal(&payload)
	if err != nil {
		// A malformed task will not decode on a second attempt either,
		// so report the error and ask kewpie not to retry.
		return false, err
	}

	log.Println("Hello", payload.Hello)

	return false, nil
}

// main publishes a single demo task to the "demo" queue on the postgres
// backend, then subscribes to the same queue so the task is handled. A
// background goroutine disconnects and exits the process roughly one second
// after the task has been published.
func main() {
	queueName := "demo"

	queue := kewpie.Kewpie{}

	if err := queue.Connect("postgres", []string{queueName}, nil); err != nil {
		log.Fatal(err)
	}

	task := kewpie.Task{}

	// Marshal serialises our payload struct into the internal JSON representation.
	// The handler logs it with log.Println("Hello", ...), which already inserts a
	// space between its operands, so the payload carries no leading punctuation.
	if err := task.Marshal(demoPayload{
		Hello: "world!",
	}); err != nil {
		log.Fatal(err)
	}

	// In production we'd often use a request context, or a context with a cancel function.
	ctx := context.Background()

	if err := queue.Publish(ctx, queueName, &task); err != nil {
		log.Fatal(err)
	}

	// This is just to neatly exit our example code once we've published and consumed a task.
	go func() {
		time.Sleep(1 * time.Second)
		// Disconnect returns an error; surface it instead of silently dropping it.
		if err := queue.Disconnect(); err != nil {
			log.Fatal(err)
		}
		os.Exit(0)
	}()

	// Instantiate our handler. In reality we may want to pass a custom handle
	// function or instantiate some other variables on it.
	handler := &demoHandler{}

	// Subscribe to the queue. This will call the handler's Handle method with any tasks we receive.
	if err := queue.Subscribe(ctx, queueName, handler); err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Backend

// Backend is the common interface implemented by the pluggable queue backends.
// Most of its methods share a name and signature with a method on Kewpie.
//
// NOTE(review): the per-method notes below are derived from the signatures and
// the README example only; confirm details against the backend implementations.
type Backend interface {
	// Healthy returns an error; presumably non-nil when the backend is not usable.
	Healthy(ctx context.Context) error
	// Publish sends the task payload to the queue named queueName.
	Publish(ctx context.Context, queueName string, payload *types.Task) error
	// Subscribe passes tasks received on queueName to the handler.
	Subscribe(ctx context.Context, queueName string, handler types.Handler) error
	// Suck takes the same arguments as Subscribe; how it differs is not
	// visible here — TODO confirm.
	Suck(ctx context.Context, queueName string, handler types.Handler) error
	// Pop takes the same arguments as Subscribe; presumably it handles a
	// single task — TODO confirm.
	Pop(ctx context.Context, queueName string, handler types.Handler) error
	// Init receives the list of queue names; presumably the list given to
	// Kewpie.Connect — TODO confirm.
	Init(queues []string) error
	// Disconnect returns an error if disconnecting from the backend fails.
	Disconnect() error
	// Purge operates on the queue named queueName; presumably it removes its
	// tasks — TODO confirm.
	Purge(ctx context.Context, queueName string) error
	// PurgeMatching is like Purge but additionally takes substr; what substr
	// is matched against is not visible here — TODO confirm.
	PurgeMatching(ctx context.Context, queueName, substr string) error
	// MaxConcurrentDrainWorkers returns an int; presumably a limit used by
	// Kewpie.Drain — TODO confirm.
	MaxConcurrentDrainWorkers() int
}

type HTTPError added in v3.0.9

type HTTPError = types.HTTPError

type Handler added in v3.0.9

type Handler = types.Handler

type Kewpie

type Kewpie struct {
	// contains filtered or unexported fields
}

func (*Kewpie) AddPublishMiddleware added in v3.0.9

func (this *Kewpie) AddPublishMiddleware(f func(context.Context, *Task, string) error)

func (Kewpie) Buffer added in v3.0.9

func (this Kewpie) Buffer(ctx context.Context, queueName string, payload *Task) error

func (*Kewpie) Connect

func (this *Kewpie) Connect(backend string, queues []string, connection interface{}) error

func (Kewpie) Disconnect

func (this Kewpie) Disconnect() error

func (Kewpie) Drain added in v3.0.9

func (this Kewpie) Drain(ctx context.Context) error

func (Kewpie) Healthy

func (this Kewpie) Healthy(ctx context.Context) error

func (Kewpie) Pop

func (this Kewpie) Pop(ctx context.Context, queueName string, handler types.Handler) error

func (Kewpie) PrepareContext added in v3.0.9

func (this Kewpie) PrepareContext(ctx context.Context) context.Context

func (Kewpie) Publish

func (this Kewpie) Publish(ctx context.Context, queueName string, payload *types.Task) error

func (Kewpie) Purge

func (this Kewpie) Purge(ctx context.Context, queueName string) error

func (Kewpie) PurgeMatching

func (this Kewpie) PurgeMatching(ctx context.Context, queueName, substr string) error

func (Kewpie) Subscribe

func (this Kewpie) Subscribe(ctx context.Context, queueName string, handler types.Handler) error

func (Kewpie) SubscribeHTTP added in v3.0.9

func (this Kewpie) SubscribeHTTP(secret string, handler types.Handler, errorHandler func(context.Context, types.HTTPError)) func(w http.ResponseWriter, r *http.Request)

func (Kewpie) Suck added in v3.0.9

func (this Kewpie) Suck(ctx context.Context, queueName string, handler types.Handler) error

type Tags

type Tags = types.Tags

type Task

type Task = types.Task

Directories

Path Synopsis
backends
sqs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL