condqueue

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2023 License: MIT Imports: 3 Imported by: 1

README

condqueue

A small Go package providing a concurrent queue, on which consumers can wait for an item satisfying a given condition, and producers can add items to wake consumers.

Run go get hermannm.dev/condqueue to add it to your project!

Usage

In the example below, we have a simple message, which can either be of type success or error. A producer goroutine adds messages to the queue, while two consumer goroutines wait for a message type each.

import (
	"context"
	"fmt"
	"sync"

	"hermannm.dev/condqueue"
)

type Message struct {
	Type    string
	Content string
}

func main() {
	queue := condqueue.New[Message]()

	var wg sync.WaitGroup
	wg.Add(3)

	// Producer
	go func() {
		fmt.Println("[Producer] Adding success message...")
		queue.Add(Message{Type: "success", Content: "Great success!"})

		fmt.Println("[Producer] Adding error message...")
		queue.Add(Message{Type: "error", Content: "I've made a huge mistake"})

		wg.Done()
	}()

	// Consumer 1
	go func() {
		fmt.Println("[Consumer 1] Waiting for success...")

		msg, _ := queue.AwaitMatchingItem(context.Background(), func(candidate Message) bool {
			return candidate.Type == "success"
		})

		fmt.Printf("[Consumer 1] Received success message: %s\n", msg.Content)
		wg.Done()
	}()

	// Consumer 2
	go func() {
		fmt.Println("[Consumer 2] Waiting for errors...")

		msg, _ := queue.AwaitMatchingItem(context.Background(), func(candidate Message) bool {
			return candidate.Type == "error"
		})

		fmt.Printf("[Consumer 2] Received error message: %s\n", msg.Content)
		wg.Done()
	}()

	wg.Wait()
}

This gives the following output (the order may vary due to concurrency):

[Consumer 2] Waiting for errors...
[Producer] Adding success message...
[Producer] Adding error message...
[Consumer 2] Received error message: I've made a huge mistake
[Consumer 1] Waiting for success...
[Consumer 1] Received success message: Great success!

For more details on how to use condqueue, refer to the documentation.

Documentation

Overview

Package condqueue provides a concurrent queue, on which consumers can wait for an item satisfying a given condition, and producers can add items to wake consumers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CondQueue

type CondQueue[T any] struct {
	// contains filtered or unexported fields
}

CondQueue is a concurrent queue of items of type T. Consumer goroutines can call CondQueue.AwaitMatchingItem to wait for an item matching a given condition to arrive in the queue. Producer goroutines can call CondQueue.Add, which passes the item to a matching consumer.

A CondQueue must never be copied (initialize it with New to ensure this).

func New

func New[T any]() *CondQueue[T]

New initializes a CondQueue for items of type T. It must never be copied.

func (*CondQueue[T]) Add added in v0.2.0

func (queue *CondQueue[T]) Add(item T)

Add checks if the given item is a match for any consumers currently waiting on the queue.

If a matching consumer is found, the consumer is woken and given the item, then removed from the queue. The item is only given to a single consumer (prioritizing older consumers), so any other matching consumers must wait for a later item.

If no matching consumer is found, the item is stored so that future consumers may match on it.

func (*CondQueue[T]) AwaitMatchingItem

func (queue *CondQueue[T]) AwaitMatchingItem(
	ctx context.Context,
	isMatch func(item T) bool,
) (matchingItem T, cancelErr error)

AwaitMatchingItem goes through unconsumed items in the queue, and returns an item where isMatch(item) returns true. If no match is found there, it waits until one arrives in the queue.

If ctx is canceled before a match is found, the context's error is returned by calling context.Cause on it. If the context never cancels, e.g. when using context.Background, the error can safely be ignored. If a matching item is never received, and the context never cancels, this may halt the calling goroutine forever. It is therefore advised to use context.WithTimeout or similar.

If multiple concurrent consumers may match on the same item, only one of them will receive the item - i.e., every call to CondQueue.Add corresponds with one returned match from AwaitMatchingItem.

func (*CondQueue[T]) Clear

func (queue *CondQueue[T]) Clear()

Clear removes all unconsumed items from the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL