FIFO Queue with dedup & closures
Use this queue when you receive many overlapping requests and need to avoid processing the same job twice.
Init queue
// typed
type JobID string
type Request struct{...}
type Response struct{...}
queue := fifo.NewQueue[JobID, Request, Response](context.Background())
// untyped
queue := fifo.NewQueue[string, any, any](context.Background())
Send job to queue
// queue a job for background processing
queue.Add(jobID, myRequest)
// queue the job; every closure registered for this job ID is executed once the job finishes
queue.Add(jobID, myRequest, func(resp any) {
// queue the job & execute only the first closure once finished
queue.AddAndCloseOnce(jobID, myRequest, func(resp any) {
// queue the job & wait for the response from the worker
ch := make(chan any)
queue.Add(jobID, myRequest, func(resp any) {
ch <- resp
switch resp := (<-ch).(type) {
case Response:
case error:
Process queue
for job := range queue.Jobs {
jobID := job.ID
request := job.Request
response := Response{...}
queue.Finish(jobID, response)
Example (simple)
package main
import (
type Request struct {
URL string
type Response struct {
Body string
func main() {
queue := fifo.NewQueue[string, Request, Response](context.Background())
tasks := sync.WaitGroup{}
request := Request{
URL: "",
closure := func(r Response) {
fmt.Printf("[closure] job finished: body=`%s`\n", r.Body)
// launch 10 queue workers
for i := 0; i < 10; i++ {
go worker(queue)
// queue job 10 times
for i := 0; i < 10; i++ {
queued := queue.Add(request.URL, request, closure)
if queued {
fmt.Printf("[queue] job added\n")
} else {
fmt.Printf("[queue] job is already in the queue\n")
func worker(queue *fifo.Queue[string, Request, Response]) {
for job := range queue.Jobs {
fmt.Printf("[worker] starting: jobID=%s\n", job.ID)
resp, _ := http.Get(job.Request.URL)
body, _ := io.ReadAll(resp.Body)
fmt.Printf("[worker] processed: jobID=%s\n", job.ID)
queue.Finish(job.ID, Response{
Body: string(body[:20]) + "...",
user@pc:~$ go run example/simple/main.go
[queue] job added
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[queue] job is already in the queue
[worker] starting: jobID=
[worker] processed: jobID=
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`
[closure] job finished: body=`<!doctype html><html...`