fifo

package module
v0.0.0-...-14bf6e6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2024 License: MIT Imports: 4 Imported by: 0

README

FIFO Queue with dedup & closures

Use this queue when you receive many requests and need to avoid processing the same job twice.

Init queue

// typed
type JobID string
type Request struct{...}
type Response struct{...}
queue := fifo.NewQueue[JobID, Request, Response](context.Background())

// untyped
queue := fifo.NewQueue[string, any, any](context.Background())

Send job to queue

// queue a job for background processing
queue.Add(jobID, myRequest)

// queue the job & execute every closure once finished
queue.Add(jobID, myRequest, func(resp any) {
    ...
})

// queue the job & execute only the first closure once finished
queue.AddAndCloseOnce(jobID, myRequest, func(resp any) {
    ...
})

// queue the job & wait for response from worker
ch := make(chan any)
queue.Add(jobID, myRequest, func(resp any) {
    ch <- resp
})
switch resp := (<-ch).(type) {
case Response:
	...
case error:
	...
default:
	...
}

Process queue

for job := range queue.Jobs {
    jobID := job.ID
    request := job.Request
    
    response := Response{...}
    
    queue.Finish(jobID, response)
}

Example (simple)

package main

import (
    "context"
    "fmt"
    "github.com/frifox/fifo"
    "io"
    "net/http"
    "sync"
)

type (
    // Request is the payload handed to a worker: the URL it should fetch.
    Request struct {
        URL string
    }

    // Response is the result a worker reports for a finished job.
    Response struct {
        Body string
    }
)

// main starts ten workers, submits the same job ID ten times, and waits
// until the completion closure has run once per submission.
func main() {
    var pending sync.WaitGroup
    jobs := fifo.NewQueue[string, Request, Response](context.Background())

    // Workers are started before any job is submitted.
    for w := 0; w < 10; w++ {
        go worker(jobs)
    }

    req := Request{URL: "https://google.com/"}

    // onDone reports the result and releases one slot of the wait group.
    onDone := func(r Response) {
        fmt.Printf("[closure] job finished: body=`%s`\n", r.Body)
        pending.Done()
    }

    // Submit the job ten times under the same ID (its URL).
    for n := 0; n < 10; n++ {
        pending.Add(1)
        if jobs.Add(req.URL, req, onDone) {
            fmt.Printf("[queue] job added\n")
            continue
        }
        fmt.Printf("[queue] job is already in the queue\n")
    }

    pending.Wait()
}

// worker receives jobs from queue.Jobs, fetches each job's URL, and reports
// a short preview of the response body through queue.Finish.
//
// Finish is called for failed jobs too, with the error text in Body: main
// blocks on tasks.Wait() until every closure has run, so a job that was
// never finished would leave it waiting.
func worker(queue *fifo.Queue[string, Request, Response]) {
    for job := range queue.Jobs {
        fmt.Printf("[worker] starting: jobID=%s\n", job.ID)

        body, err := fetch(job.Request.URL)
        if err != nil {
            fmt.Printf("[worker] failed: jobID=%s err=%v\n", job.ID, err)
            queue.Finish(job.ID, Response{
                Body: "error: " + err.Error(),
            })
            continue
        }

        fmt.Printf("[worker] processed: jobID=%s\n", job.ID)
        queue.Finish(job.ID, Response{
            Body: preview(body, 20),
        })
    }
}

// fetch performs a GET request and returns the whole response body.
// It lives in its own function so the deferred Close runs once per request
// instead of piling up until the worker loop exits.
func fetch(url string) ([]byte, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    return io.ReadAll(resp.Body)
}

// preview returns at most limit leading bytes of body followed by "...".
// Bodies shorter than limit are returned whole rather than sliced out of range.
func preview(body []byte, limit int) string {
    if len(body) > limit {
        body = body[:limit]
    }
    return string(body) + "..."
}
user@pc:~$ go run example/simple/main.go
[queue] job added
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[worker] starting: jobID=https://google.com/
[worker] processed: jobID=https://google.com/
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Queue

type Queue[ID comparable, Request any, Response any] struct {
	Jobs chan job[ID, Request]
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue[ID comparable, Request any, Response any](ctx context.Context) *Queue[ID, Request, Response]

func (*Queue[ID, Request, Response]) Add

func (q *Queue[ID, Request, Response]) Add(jobID ID, request Request, closures ...func(Response)) (queued bool)

func (*Queue[ID, Request, Response]) AddAndCloseOnce

func (q *Queue[ID, Request, Response]) AddAndCloseOnce(jobID ID, request Request, closures ...func(Response)) (queued bool)

func (*Queue[ID, Request, Response]) Finish

func (q *Queue[ID, Request, Response]) Finish(jobID ID, ret Response)

func (*Queue[ID, Request, Response]) Len

func (q *Queue[ID, Request, Response]) Len() int64

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL