rill

package module
v0.6.0
Published: Nov 24, 2024 License: MIT Imports: 4 Imported by: 12

README


Rill is a toolkit that brings composable concurrency to Go, making it easier to build concurrent programs from simple, reusable parts. It reduces boilerplate while preserving Go's natural channel-based model.

go get -u github.com/destel/rill

Goals

  • Make common tasks easier.
    Rill provides a cleaner and safer way of solving common concurrency problems, such as parallel job execution or real-time event processing. It removes boilerplate and abstracts away the complexities of goroutine, channel, and error management. At the same time, developers retain full control over the concurrency level of all operations.

  • Make concurrent code composable and clean.
    Most functions in the library take Go channels as inputs and return new, transformed channels as outputs. This allows them to be chained in various ways to build reusable pipelines from simpler parts, similar to Unix pipes. As a result, concurrent programs become clear sequences of reusable operations.

  • Centralize error handling.
    Errors are automatically propagated through a pipeline and can be handled in a single place at the end. For more complex scenarios, Rill also provides tools to intercept and handle errors at any point in a pipeline.

  • Simplify stream processing.
    Thanks to Go channels, built-in functions can handle potentially infinite streams, processing items as they arrive. This makes Rill a convenient tool for real-time processing or handling large datasets that don't fit in memory.

  • Provide solutions for advanced tasks.
    Beyond basic operations, the library includes ready-to-use functions for batching, ordered fan-in, map-reduce, stream splitting, merging, and more. Pipelines, while usually linear, can have any cycle-free topology (DAG).

  • Support custom extensions.
    Since Rill operates on standard Go channels, it's easy to write custom functions compatible with the library (see the sketch after this list).

  • Keep it lightweight.
    Rill has a small, type-safe, channel-based API, and zero dependencies, making it straightforward to integrate into existing projects. It's also lightweight in terms of resource usage, ensuring that the number of memory allocations and goroutines does not grow with the input size.
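
Here is a minimal sketch of such a custom stage, referenced from the custom-extensions point above (CountValues and its atomic counter are made up for this example, assuming the standard sync/atomic package). Like the built-in functions, it takes a stream as input and returns a new stream, forwarding errors untouched:

// CountValues is a hypothetical pass-through stage: it forwards every item
// downstream unchanged and counts the successful values it has seen.
func CountValues[A any](in <-chan rill.Try[A], counter *atomic.Int64) <-chan rill.Try[A] {
	out := make(chan rill.Try[A])
	go func() {
		defer close(out)
		for item := range in {
			if item.Error == nil {
				counter.Add(1)
			}
			out <- item
		}
	}()
	return out
}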

Quick Start

Let's look at a practical example: fetch users from an API, activate them, and save the changes back. It shows how to control concurrency at each step while keeping the code clean and manageable.

Try it

func main() {
	ctx := context.Background()

	// Convert a slice of user IDs into a channel
	ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Read users from the API.
	// Concurrency = 3
	users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) {
		return mockapi.GetUser(ctx, id)
	})

	// Activate users.
	// Concurrency = 2
	err := rill.ForEach(users, 2, func(u *mockapi.User) error {
		if u.IsActive {
			fmt.Printf("User %d is already active\n", u.ID)
			return nil
		}

		u.IsActive = true
		err := mockapi.SaveUser(ctx, u)
		if err != nil {
			return err
		}

		fmt.Printf("User saved: %+v\n", u)
		return nil
	})

	// Handle errors
	fmt.Println("Error:", err)
}

Batching

Processing items in batches rather than individually can significantly improve performance in many scenarios, particularly when working with external services or databases. Batching reduces the number of queries and API calls, increases throughput, and typically lowers costs.

To demonstrate batching, let's improve the previous example by using the API's bulk fetching capability. The Batch function transforms a stream of individual IDs into a stream of slices. This enables the use of the GetUsers API to fetch multiple users in a single call, instead of making individual GetUser calls.

Try it

func main() {
	ctx := context.Background()

	// Convert a slice of user IDs into a channel
	ids := rill.FromSlice([]int{
		1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
		21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
	}, nil)

	// Group IDs into batches of 5
	idBatches := rill.Batch(ids, 5, -1)

	// Bulk fetch users from the API
	// Concurrency = 3
	userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) {
		return mockapi.GetUsers(ctx, ids)
	})

	// Transform the stream of batches back into a flat stream of users
	users := rill.Unbatch(userBatches)

	// Activate users.
	// Concurrency = 2
	err := rill.ForEach(users, 2, func(u *mockapi.User) error {
		if u.IsActive {
			fmt.Printf("User %d is already active\n", u.ID)
			return nil
		}

		u.IsActive = true
		err := mockapi.SaveUser(ctx, u)
		if err != nil {
			return err
		}

		fmt.Printf("User saved: %+v\n", u)
		return nil
	})

	// Handle errors
	fmt.Println("Error:", err)
}

Real-Time Batching

Real-world applications often need to handle events or data that arrives at unpredictable rates. While batching is still desirable for efficiency, waiting to collect a full batch might introduce unacceptable delays when the input stream becomes slow or sparse.

Rill solves this with timeout-based batching: batches are emitted either when they're full or after a specified timeout, whichever comes first. This approach ensures optimal batch sizes during high load while maintaining responsiveness during quiet periods.

Consider an application that needs to update users' last_active_at timestamps in a database. The function responsible for this, UpdateUserTimestamp, can be called concurrently, at unpredictable rates, and from different parts of the application. Performing all these updates individually may create too many concurrent queries, potentially overwhelming the database.

In the example below, the updates are queued into the userIDsToUpdate channel and then grouped into batches of up to 5 items, with each batch sent to the database as a single query. The Batch function is used with a timeout of 100ms, ensuring zero latency during high load and up to 100ms of latency with smaller batches during quiet periods.

Try it

func main() {
	// Start the background worker that processes the updates
	go updateUserTimestampWorker()

	// Do some updates. They'll be automatically grouped into
	// batches: [1,2,3,4,5], [6,7], [8]
	UpdateUserTimestamp(1)
	UpdateUserTimestamp(2)
	UpdateUserTimestamp(3)
	UpdateUserTimestamp(4)
	UpdateUserTimestamp(5)
	UpdateUserTimestamp(6)
	UpdateUserTimestamp(7)
	time.Sleep(500 * time.Millisecond) // simulate sparse updates
	UpdateUserTimestamp(8)
}

// This is the queue of user IDs to update.
var userIDsToUpdate = make(chan int)

// UpdateUserTimestamp is the public API for updating the last_active_at column in the users table
func UpdateUserTimestamp(userID int) {
	userIDsToUpdate <- userID
}

// This is a background worker that sends queued updates to the database in batches.
// For simplicity, there are no retries, error handling and synchronization
func updateUserTimestampWorker() {

	ids := rill.FromChan(userIDsToUpdate, nil)

	idBatches := rill.Batch(ids, 5, 100*time.Millisecond)

	_ = rill.ForEach(idBatches, 1, func(batch []int) error {
		fmt.Printf("Executed: UPDATE users SET last_active_at = NOW() WHERE id IN (%v)\n", batch)
		return nil
	})
}

Errors, Termination and Contexts

Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured approach to the problem. Pipelines typically consist of a sequence of non-blocking channel transformations, followed by a blocking stage that returns a final result and an error. The general rule is: any error occurring anywhere in a pipeline is propagated down to the final stage, where it's caught by some blocking function and returned to the caller.

Rill provides a wide selection of blocking functions. Here are some commonly used ones:

  • ForEach: Concurrently applies a user function to each item in the stream. Example
  • ToSlice: Collects all stream items into a slice. Example
  • First: Returns the first item or error encountered in the stream and discards the rest. Example
  • Reduce: Concurrently reduces the stream to a single value, using a user-provided reducer function. Example
  • All: Concurrently checks if all items in the stream satisfy a user-provided condition. Example
  • Err: Returns the first error encountered in the stream or nil, and discards the rest of the stream. Example
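
For instance, here is a minimal sketch of ending a pipeline with ToSlice, assuming the same ctx and mockapi helpers used in the earlier examples; it collects all transformed values and surfaces the first error from any stage:

ids := rill.FromSlice([]int{1, 2, 3}, nil)

// Fetch users concurrently
users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) {
	return mockapi.GetUser(ctx, id)
})

// ToSlice blocks until the stream ends or an error occurs,
// returning the collected values and the first error encountered
allUsers, err := rill.ToSlice(users)
fmt.Println("Users:", allUsers)
fmt.Println("Error:", err)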

All blocking functions share a common behavior. In case of an early termination (before reaching the end of the input stream or in case of an error), such functions initiate background draining of the remaining items. This is done to prevent goroutine leaks by ensuring that all goroutines feeding the stream are allowed to complete.

Rill is context-agnostic, meaning that it does not enforce any specific context usage. However, it's recommended to make user-defined pipeline stages context-aware. This is especially important for the initial stage, as it makes it possible to stop feeding the pipeline with new items after context cancellation. In practice, the first stage is often naturally context-aware through Go's standard APIs for databases, HTTP clients, and other external sources.

In the example below the CheckAllUsersExist function uses several concurrent workers to check if all users
from the given list exist. When an error occurs (like a non-existent user), the function returns that error
and cancels the context, which in turn stops all remaining user fetches.

Try it

func main() {
	ctx := context.Background()

	// ID 999 doesn't exist, so fetching will stop after hitting it.
	err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15})
	fmt.Printf("Check result: %v\n", err)
}

// CheckAllUsersExist uses several concurrent workers to check if all users with given IDs exist.
func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
	// Create new context that will be canceled when this function returns
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Convert the slice into a stream
	idsStream := rill.FromSlice(ids, nil)

	// Fetch users concurrently.
	users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
		u, err := mockapi.GetUser(ctx, id)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch user %d: %w", id, err)
		}

		fmt.Printf("Fetched user %d\n", id)
		return u, nil
	})

	// Return the first error (if any) and cancel remaining fetches via context
	return rill.Err(users)
}

In the example above only the second stage (mockapi.GetUser) of the pipeline is context-aware. FromSlice works well here since the input is small, iteration is fast, and context cancellation prevents expensive API calls regardless. The following code demonstrates how to replace FromSlice with Generate when full context awareness becomes important.

idsStream := rill.Generate(func(send func(int), sendErr func(error)) {
	for _, id := range ids {
		if ctx.Err() != nil {
			return
		}
		send(id)
	}
})

Order Preservation (Ordered Fan-In)

Concurrent processing can boost performance, but since tasks take different amounts of time to complete, the results' order usually differs from the input order. While out-of-order results are acceptable in many scenarios, some cases require preserving the original order. This seemingly simple problem is deceptively challenging to solve correctly.

To address this, Rill provides ordered versions of its core functions, such as OrderedMap or OrderedFilter. These functions perform additional synchronization under the hood to ensure that if value x precedes value y in the input stream, then f(x) will precede f(y) in the output.

Here's a practical example: finding the first occurrence of a specific string among 1000 large files hosted online. Downloading all files at once would consume too much memory, processing them sequentially would be too slow, and traditional concurrency patterns do not preserve the order of files, making it challenging to find the first match.

The combination of OrderedFilter and First functions solves this elegantly, while downloading and keeping in memory at most 5 files at a time.

Try it

func main() {
	ctx := context.Background()

	// The string to search for in the downloaded files
	needle := []byte("26")

	// Generate a stream of URLs from https://example.com/file-0.txt 
	// to https://example.com/file-999.txt
	urls := rill.Generate(func(send func(string), sendErr func(error)) {
		for i := 0; i < 1000 && ctx.Err() == nil; i++ {
			send(fmt.Sprintf("https://example.com/file-%d.txt", i))
		}
	})

	// Download and process the files
	// At most 5 files are downloaded and held in memory at the same time
	matchedUrls := rill.OrderedFilter(urls, 5, func(url string) (bool, error) {
		fmt.Println("Downloading:", url)

		content, err := mockapi.DownloadFile(ctx, url)
		if err != nil {
			return false, err
		}

		// keep only URLs of files that contain the needle
		return bytes.Contains(content, needle), nil
	})

	// Find the first matched URL
	firstMatchedUrl, found, err := rill.First(matchedUrls)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	// Print the result
	if found {
		fmt.Println("Found in:", firstMatchedUrl)
	} else {
		fmt.Println("Not found")
	}
}

Stream Merging and FlatMap

Rill comes with the Merge function that combines multiple streams into a single one. Another, often overlooked, function that can combine streams is FlatMap. It's a powerful tool that transforms each input item into its own stream, and then merges all these streams together.

In the example below, FlatMap transforms each department into a stream of users, then merges these streams into one. Like other Rill functions, FlatMap gives full control over concurrency. In this particular case the concurrency level is 3, meaning that users are fetched from at most 3 departments at the same time.

Additionally, this example demonstrates how to write a reusable streaming wrapper over paginated API calls - the StreamUsers function. This wrapper can be useful both on its own and as part of larger pipelines.

Try it

func main() {
	ctx := context.Background()

	// Start with a stream of department names
	departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)

	// Stream users from all departments concurrently.
	// At most 3 departments at the same time.
	users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] {
		return StreamUsers(ctx, &mockapi.UserQuery{Department: department})
	})

	// Print the users from the combined stream
	err := rill.ForEach(users, 1, func(user *mockapi.User) error {
		fmt.Printf("%+v\n", user)
		return nil
	})
	fmt.Println("Error:", err)
}

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
	return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
		var currentQuery mockapi.UserQuery
		if query != nil {
			currentQuery = *query
		}

		for page := 0; ; page++ {
			currentQuery.Page = page

			users, err := mockapi.ListUsers(ctx, &currentQuery)
			if err != nil {
				sendErr(err)
				return
			}

			if len(users) == 0 {
				break
			}

			for _, user := range users {
				send(user)
			}
		}
	})
}

Go 1.23 Iterators

Starting from Go 1.23, the language added the range-over-function feature, allowing users to define custom iterators for use in for-range loops. This feature enables Rill to integrate seamlessly with existing iterator-based functions in the standard library and third-party packages.

Rill provides FromSeq and FromSeq2 functions to convert an iterator into a stream, and ToSeq2 function to convert a stream back into an iterator.

ToSeq2 can be a good alternative to ForEach when concurrency is not needed. It gives more control and performs all necessary cleanup and draining, even if the loop is terminated early using break or return.

Try it

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Transform each number
	// Concurrency = 3
	squares := rill.Map(numbers, 3, func(x int) (int, error) {
		return square(x), nil
	})

	// Convert the stream into an iterator and use for-range to print the results
	for val, err := range rill.ToSeq2(squares) {
		if err != nil {
			fmt.Println("Error:", err)
			break // cleanup is done regardless of early exit
		}
		fmt.Printf("%+v\n", val)
	}
}

Testing Strategy

Rill has a test coverage of over 95%, with testing focused on:

  • Correctness: ensuring that functions produce accurate results at different levels of concurrency
  • Concurrency: confirming that the correct number of goroutines is spawned and utilized
  • Ordering: ensuring that ordered versions of functions preserve the order, while basic versions do not

Contributing

Contributions are welcome! Whether it's reporting a bug, suggesting a feature, or submitting a pull request, your support helps improve Rill. Please ensure that your code adheres to the existing style and includes relevant tests.

Documentation

Overview

Package rill provides composable channel-based concurrency primitives for Go that simplify parallel processing, batching, and stream handling. It offers building blocks for constructing concurrent pipelines from reusable parts while maintaining precise control over concurrency levels. The package reduces boilerplate, abstracts away goroutine orchestration, features centralized error handling, and has zero external dependencies.

Streams and Try Containers

In this package, a stream refers to a channel of Try containers. A Try container is a simple struct that holds a value and an error. When an "empty stream" is referred to, it means a channel of Try containers that has been closed and was never written to.
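
For illustration, a stream can be built and consumed by hand; the sketch below uses Wrap to construct Try containers (the buffered channel and values are chosen arbitrarily):

stream := make(chan rill.Try[int], 2)
stream <- rill.Wrap(42, nil)                     // a value
stream <- rill.Wrap(0, errors.New("some error")) // an error
close(stream)

for item := range stream {
	if item.Error != nil {
		fmt.Println("error:", item.Error)
		continue
	}
	fmt.Println("value:", item.Value)
}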

Most functions in this package are concurrent, and the level of concurrency can be controlled by the argument n. Some functions share common behaviors and characteristics, which are described below.

Non-blocking functions

Functions such as Map, Filter, and Batch take a stream as an input and return a new stream as an output. They do not block and return the output stream immediately. All the processing is done in the background by the goroutine pools they spawn. These functions forward all errors from the input stream to the output stream. Any errors returned by the user-provided functions are also sent to the output stream. When such a function reaches the end of the input stream, it closes the output stream, stops processing and cleans up resources.

Such functions are designed to be composed together to build complex processing pipelines:

stage2 := rill.Map(input, ...)
stage3 := rill.Batch(stage2, ...)
stage4 := rill.Map(stage3, ...)
results := rill.Unbatch(stage4, ...)
// consume the results and handle errors with some blocking function

Blocking functions

Functions such as ForEach, Reduce and MapReduce are used at the last stage of the pipeline to consume the stream and return the final result or error.

Usually, these functions block until one of the following conditions is met:

  • The end of the stream is reached. In this case, the function returns the final result.
  • An error is encountered either in the input stream or in some user-provided function. In this case, the function returns the error.

In case of an early termination (before reaching the end of the input stream), such functions initiate background draining of the remaining items. This is done to prevent goroutine leaks by ensuring that all goroutines feeding the stream are allowed to complete. The input stream should not be used anymore after calling such functions.

It's also possible to consume the pipeline results manually, for example using a for-range loop. In this case, add a deferred call to DrainNB before the loop to ensure that goroutines are not leaked.

defer rill.DrainNB(results)

for res := range results {
	if res.Error != nil {
		return res.Error
	}
	// process res.Value
}

Unordered functions

Functions such as Map, Filter, and FlatMap write items to the output stream as soon as they become available. Due to the concurrent nature of these functions, the order of items in the output stream may not match the order of items in the input stream. These functions prioritize performance and concurrency over maintaining the original order.

Ordered functions

Functions such as OrderedMap or OrderedFilter preserve the order of items from the input stream. These functions are still concurrent, but use special synchronization techniques to ensure that items are written to the output stream in the same order as they were read from the input stream. This additional synchronization has some overhead, but it is negligible for I/O-bound workloads.

Some other functions, such as ToSlice, Batch, or First, are not concurrent and are ordered by nature.

Error handling

Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured error handling approach. As described above, all errors are automatically propagated down the pipeline to the final stage, where they can be caught. This allows the pipeline to terminate after the first error is encountered and return it to the caller.

In cases where more complex error handling logic is required, the Catch function can be used. It can catch and handle errors at any point in the pipeline, providing the flexibility to handle not only the first error, but any of them.
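
For instance, a minimal sketch of using Catch in the middle of a pipeline (errNotFound is a hypothetical sentinel error, and users is some existing stream):

// Drop items that failed with errNotFound; keep propagating everything else
users = rill.Catch(users, 1, func(err error) error {
	if errors.Is(err, errNotFound) {
		return nil // handled: the failed item is filtered out of the stream
	}
	return err
})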

Example

This example demonstrates a Rill pipeline that fetches users from an API, updates their status to active, and saves them back. Both operations are performed concurrently.

package main

import (
	"context"
	"fmt"

	"github.com/destel/rill"
	"github.com/destel/rill/mockapi"
)

func main() {
	ctx := context.Background()

	// Convert a slice of user IDs into a stream
	ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Read users from the API.
	// Concurrency = 3
	users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) {
		return mockapi.GetUser(ctx, id)
	})

	// Activate users.
	// Concurrency = 2
	err := rill.ForEach(users, 2, func(u *mockapi.User) error {
		if u.IsActive {
			fmt.Printf("User %d is already active\n", u.ID)
			return nil
		}

		u.IsActive = true
		err := mockapi.SaveUser(ctx, u)
		if err != nil {
			return err
		}

		fmt.Printf("User saved: %+v\n", u)
		return nil
	})

	// Handle errors
	fmt.Println("Error:", err)
}
Output:

Example (Batching)

This example demonstrates a Rill pipeline that fetches users from an API, updates their status to active, and saves them back. Users are fetched concurrently and in batches to reduce the number of API calls.

package main

import (
	"context"
	"fmt"

	"github.com/destel/rill"
	"github.com/destel/rill/mockapi"
)

func main() {
	ctx := context.Background()

	// Convert a slice of user IDs into a stream
	ids := rill.FromSlice([]int{
		1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
		21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
	}, nil)

	// Group IDs into batches of 5
	idBatches := rill.Batch(ids, 5, -1)

	// Bulk fetch users from the API
	// Concurrency = 3
	userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) {
		return mockapi.GetUsers(ctx, ids)
	})

	// Transform the stream of batches back into a flat stream of users
	users := rill.Unbatch(userBatches)

	// Activate users.
	// Concurrency = 2
	err := rill.ForEach(users, 2, func(u *mockapi.User) error {
		if u.IsActive {
			fmt.Printf("User %d is already active\n", u.ID)
			return nil
		}

		u.IsActive = true
		err := mockapi.SaveUser(ctx, u)
		if err != nil {
			return err
		}

		fmt.Printf("User saved: %+v\n", u)
		return nil
	})

	// Handle errors
	fmt.Println("Error:", err)
}
Output:

Example (BatchingRealTime)

This example demonstrates how batching can be used to group similar concurrent database updates into a single query. The UpdateUserTimestamp function is used to update the last_active_at column in the users table. Updates are not executed immediately, but are rather queued and then sent to the database in batches of up to 5.

When updates are sparse, it can take some time to collect a full batch. In this case the Batch function emits partial batches, ensuring that updates are delayed by at most 100ms.

For simplicity, this example does not include retries, error handling, or synchronization.

package main

import (
	"fmt"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Start the background worker that processes the updates
	go updateUserTimestampWorker()

	// Do some updates. They'll be automatically grouped into
	// batches: [1,2,3,4,5], [6,7], [8]
	UpdateUserTimestamp(1)
	UpdateUserTimestamp(2)
	UpdateUserTimestamp(3)
	UpdateUserTimestamp(4)
	UpdateUserTimestamp(5)
	UpdateUserTimestamp(6)
	UpdateUserTimestamp(7)
	time.Sleep(500 * time.Millisecond) // simulate sparse updates
	UpdateUserTimestamp(8)

	// Wait for the updates to be processed
	// In a real-world application, different synchronization mechanisms would be used.
	time.Sleep(1 * time.Second)
}

// This is the queue of user IDs to update.
var userIDsToUpdate = make(chan int)

// UpdateUserTimestamp is the public API for updating the last_active_at column in the users table
func UpdateUserTimestamp(userID int) {
	userIDsToUpdate <- userID
}

// This is a background worker that sends queued updates to the database in batches.
// For simplicity, there are no retries, error handling and synchronization
func updateUserTimestampWorker() {

	ids := rill.FromChan(userIDsToUpdate, nil)

	idBatches := rill.Batch(ids, 5, 100*time.Millisecond)

	_ = rill.ForEach(idBatches, 1, func(batch []int) error {
		fmt.Printf("Executed: UPDATE users SET last_active_at = NOW() WHERE id IN (%v)\n", batch)
		return nil
	})
}
Output:

Example (Context)

This example demonstrates how to gracefully stop a pipeline on the first error. The CheckAllUsersExist function uses several concurrent workers and returns an error as soon as it encounters a non-existent user. Such an early return triggers the context cancellation, which in turn stops all remaining user fetches.

package main

import (
	"context"
	"fmt"

	"github.com/destel/rill"
	"github.com/destel/rill/mockapi"
)

func main() {
	ctx := context.Background()

	// ID 999 doesn't exist, so fetching will stop after hitting it.
	err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15})
	fmt.Printf("Check result: %v\n", err)
}

// CheckAllUsersExist uses several concurrent workers to check if all users with given IDs exist.
func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	idsStream := rill.Generate(func(send func(int), sendErr func(error)) {
		for _, id := range ids {
			if ctx.Err() != nil {
				return
			}
			send(id)
		}
	})

	users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
		u, err := mockapi.GetUser(ctx, id)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch user %d: %w", id, err)
		}

		fmt.Printf("Fetched user %d\n", id)
		return u, nil
	})

	return rill.Err(users)
}
Output:

Example (FanIn_FanOut)

This example demonstrates how to use the Fan-in and Fan-out patterns to send messages through multiple servers concurrently.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of messages into a stream
	messages := rill.FromSlice([]string{
		"message1", "message2", "message3", "message4", "message5",
		"message6", "message7", "message8", "message9", "message10",
	}, nil)

	// Fan-out the messages to three servers
	results1 := rill.Map(messages, 2, func(message string) (string, error) {
		return message, sendMessage(message, "server1")
	})

	results2 := rill.Map(messages, 2, func(message string) (string, error) {
		return message, sendMessage(message, "server2")
	})

	results3 := rill.Map(messages, 2, func(message string) (string, error) {
		return message, sendMessage(message, "server3")
	})

	// Fan-in the results from all servers into a single stream
	results := rill.Merge(results1, results2, results3)

	// Handle errors
	err := rill.Err(results)
	fmt.Println("Error:", err)
}

// Helper function that simulates sending a message through a server
func sendMessage(message string, server string) error {
	randomSleep(500 * time.Millisecond)
	fmt.Printf("Sent through %s: %s\n", server, message)
	return nil
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

Example (FlatMap)

This example demonstrates using FlatMap to fetch users from multiple departments concurrently. Additionally, it demonstrates how to write a reusable streaming wrapper over paginated API calls - the StreamUsers function.

package main

import (
	"context"
	"fmt"

	"github.com/destel/rill"
	"github.com/destel/rill/mockapi"
)

func main() {
	ctx := context.Background()

	// Start with a stream of department names
	departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)

	// Stream users from all departments concurrently.
	// At most 3 departments at the same time.
	users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] {
		return StreamUsers(ctx, &mockapi.UserQuery{Department: department})
	})

	// Print the users from the combined stream
	err := rill.ForEach(users, 1, func(user *mockapi.User) error {
		fmt.Printf("%+v\n", user)
		return nil
	})
	fmt.Println("Error:", err)
}

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
	return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
		var currentQuery mockapi.UserQuery
		if query != nil {
			currentQuery = *query
		}

		for page := 0; ; page++ {
			currentQuery.Page = page

			users, err := mockapi.ListUsers(ctx, &currentQuery)
			if err != nil {
				sendErr(err)
				return
			}

			if len(users) == 0 {
				break
			}

			for _, user := range users {
				send(user)
			}
		}
	})
}
Output:

Example (Ordering)

This example demonstrates how to find the first file containing a specific string among 1000 large files hosted online.

Downloading all files at once would consume too much memory, while processing them one by one would take too long. Traditional concurrency patterns do not preserve the order of files, making it challenging to find the first match.

The combination of OrderedFilter and First functions solves the problem, while downloading and holding in memory at most 5 files at the same time.

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/destel/rill"
	"github.com/destel/rill/mockapi"
)

func main() {
	ctx := context.Background()

	// The string to search for in the downloaded files
	needle := []byte("26")

	// Generate a stream of URLs from https://example.com/file-0.txt
	// to https://example.com/file-999.txt
	urls := rill.Generate(func(send func(string), sendErr func(error)) {
		for i := 0; i < 1000 && ctx.Err() == nil; i++ {
			send(fmt.Sprintf("https://example.com/file-%d.txt", i))
		}
	})

	// Download and process the files
	// At most 5 files are downloaded and held in memory at the same time
	matchedUrls := rill.OrderedFilter(urls, 5, func(url string) (bool, error) {
		fmt.Println("Downloading:", url)

		content, err := mockapi.DownloadFile(ctx, url)
		if err != nil {
			return false, err
		}

		// keep only URLs of files that contain the needle
		return bytes.Contains(content, needle), nil
	})

	// Find the first matched URL
	firstMatchedUrl, found, err := rill.First(matchedUrls)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	// Print the result
	if found {
		fmt.Println("Found in:", firstMatchedUrl)
	} else {
		fmt.Println("Not found")
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func All added in v0.2.0

func All[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error)

All checks if all items in the input stream satisfy the condition f. This function returns false as soon as it finds an item that does not satisfy the condition. Otherwise, it returns true, including the case when the stream was empty.

This is a blocking unordered function that processes items concurrently using n goroutines. When n = 1, processing becomes sequential, making the function ordered.

See the package documentation for more information on blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Are all numbers prime?
	// Concurrency = 3
	ok, err := rill.All(numbers, 3, func(x int) (bool, error) {
		return isPrime(x), nil
	})

	fmt.Println("Result:", ok)
	fmt.Println("Error:", err)
}

// helper function that checks if a number is prime
// and simulates some additional work using sleep
func isPrime(n int) bool {
	randomSleep(500 * time.Millisecond)

	if n < 2 {
		return false
	}
	for i := 2; i*i <= n; i++ {
		if n%i == 0 {
			return false
		}
	}
	return true
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func Any added in v0.2.0

func Any[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error)

Any checks if there is an item in the input stream that satisfies the condition f. This function returns true as soon as it finds such an item. Otherwise, it returns false.

Any is a blocking unordered function that processes items concurrently using n goroutines. When n = 1, processing becomes sequential, making the function ordered.

See the package documentation for more information on blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Is there at least one prime number?
	// Concurrency = 3
	ok, err := rill.Any(numbers, 3, func(x int) (bool, error) {
		return isPrime(x), nil
	})

	fmt.Println("Result: ", ok)
	fmt.Println("Error: ", err)
}

// helper function that checks if a number is prime
// and simulates some additional work using sleep
func isPrime(n int) bool {
	randomSleep(500 * time.Millisecond)

	if n < 2 {
		return false
	}
	for i := 2; i*i <= n; i++ {
		if n%i == 0 {
			return false
		}
	}
	return true
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func Batch

func Batch[A any](in <-chan Try[A], size int, timeout time.Duration) <-chan Try[[]A]

Batch takes a stream of items and returns a stream of batches based on a maximum size and a timeout.

A batch is emitted when one of the following conditions is met:

  • The batch reaches the maximum size
  • The time since the first item was added to the batch exceeds the timeout
  • The input stream is closed

This function never emits empty batches. To disable the timeout and emit batches only based on the size, set the timeout to -1. Setting the timeout to zero is not supported and will result in a panic.

This is a non-blocking ordered function that processes items sequentially.

See the package documentation for more information on non-blocking ordered functions and error handling.

Example

Also check out the package-level examples to see Batch in action.

package main

import (
	"fmt"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Generate a stream of numbers 0 to 49, where a new number is emitted every 50ms
	numbers := make(chan rill.Try[int])
	go func() {
		defer close(numbers)
		for i := 0; i < 50; i++ {
			numbers <- rill.Wrap(i, nil)
			time.Sleep(50 * time.Millisecond)
		}
	}()

	// Group numbers into batches of up to 5
	batches := rill.Batch(numbers, 5, 1*time.Second)

	printStream(batches)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}
Output:

func Buffer

func Buffer[A any](in <-chan A, size int) <-chan A

Buffer takes a channel of items and returns a buffered channel of the exact same items in the same order. This can be useful for preventing write operations on the input channel from blocking, especially if subsequent stages in the processing pipeline are slow. Buffering allows up to size items to be held in memory before back pressure is applied to the upstream producer.

Typical usage of Buffer might look like this:

users := getUsers(ctx, companyID)
users = rill.Buffer(users, 100)
// Now work with the users channel as usual.
// Up to 100 users can be buffered if subsequent stages of the pipeline are slow.

func Catch

func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A]

Catch allows handling errors in the middle of a stream processing pipeline. Every error encountered in the input stream is passed to the function f for handling.

The outcome depends on the return value of f:

  • If f returns nil, the error is considered handled and filtered out from the output stream.
  • If f returns a non-nil error, the original error is replaced with the result of f.

This is a non-blocking unordered function that handles errors concurrently using n goroutines. An ordered version of this function, OrderedCatch, is also available.

See the package documentation for more information on non-blocking unordered functions and error handling.

Example
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of strings into a stream
	strs := rill.FromSlice([]string{"1", "2", "3", "4", "5", "not a number 6", "7", "8", "9", "10"}, nil)

	// Convert strings to ints
	// Concurrency = 3
	ids := rill.Map(strs, 3, func(s string) (int, error) {
		randomSleep(500 * time.Millisecond) // simulate some additional work
		return strconv.Atoi(s)
	})

	// Catch and ignore number parsing errors
	// Concurrency = 2
	ids = rill.Catch(ids, 2, func(err error) error {
		if errors.Is(err, strconv.ErrSyntax) {
			return nil // Ignore this error
		}
		return err
	})

	// No error will be printed
	printStream(ids)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func Drain

func Drain[A any](in <-chan A)

Drain consumes and discards all items from an input channel, blocking until the channel is closed.
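
A minimal sketch of one possible use, where a pipeline is run purely for its side effects (logEvent, Event, and the events stream are assumed for illustration). Note that Drain discards errors along with the items, so prefer ForEach or Err when errors matter:

// Run logEvent for each item, then block until the whole stream is consumed
logged := rill.Map(events, 4, func(e Event) (struct{}, error) {
	logEvent(e)
	return struct{}{}, nil
})

rill.Drain(logged)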

func DrainNB

func DrainNB[A any](in <-chan A)

DrainNB is a non-blocking version of Drain. It performs the draining in a separate goroutine.

func Err added in v0.2.0

func Err[A any](in <-chan Try[A]) error

Err returns the first error encountered in the input stream or nil if there were no errors.

This is a blocking ordered function that processes items sequentially. See the package documentation for more information on blocking ordered functions and error handling.

Example
package main

import (
	"context"
	"fmt"

	"github.com/destel/rill"
	"github.com/destel/rill/mockapi"
)

func main() {
	ctx := context.Background()

	// Convert a slice of users into a stream
	users := rill.FromSlice([]*mockapi.User{
		{ID: 1, Name: "foo", Age: 25},
		{ID: 2, Name: "bar", Age: 30},
		{ID: 3}, // empty username is invalid
		{ID: 4, Name: "baz", Age: 35},
		{ID: 5, Name: "qux", Age: 26},
		{ID: 6, Name: "quux", Age: 27},
	}, nil)

	// Save users. Use struct{} as a result type
	// Concurrency = 2
	results := rill.Map(users, 2, func(user *mockapi.User) (struct{}, error) {
		return struct{}{}, mockapi.SaveUser(ctx, user)
	})

	// We only need to know if all users were saved successfully
	err := rill.Err(results)
	fmt.Println("Error:", err)
}
Output:

func Filter

func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A]

Filter takes a stream of items of type A and filters them using a predicate function f. Returns a new stream of items that passed the filter.

This is a non-blocking unordered function that processes items concurrently using n goroutines. An ordered version of this function, OrderedFilter, is also available.

See the package documentation for more information on non-blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Keep only prime numbers
	// Concurrency = 3
	primes := rill.Filter(numbers, 3, func(x int) (bool, error) {
		return isPrime(x), nil
	})

	printStream(primes)
}

// helper function that checks if a number is prime
// and simulates some additional work using sleep
func isPrime(n int) bool {
	randomSleep(500 * time.Millisecond)

	if n < 2 {
		return false
	}
	for i := 2; i*i <= n; i++ {
		if n%i == 0 {
			return false
		}
	}
	return true
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func FilterMap added in v0.3.0

func FilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B]

FilterMap takes a stream of items of type A, applies a function f that can filter and transform them into items of type B. Returns a new stream of transformed items that passed the filter. This operation is equivalent to a Filter followed by a Map.

This is a non-blocking unordered function that processes items concurrently using n goroutines. An ordered version of this function, OrderedFilterMap, is also available.

See the package documentation for more information on non-blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Keep only prime numbers and square them
	// Concurrency = 3
	squares := rill.FilterMap(numbers, 3, func(x int) (int, bool, error) {
		if !isPrime(x) {
			return 0, false, nil
		}

		return x * x, true, nil
	})

	printStream(squares)
}

// helper function that checks if a number is prime
// and simulates some additional work using sleep
func isPrime(n int) bool {
	randomSleep(500 * time.Millisecond)

	if n < 2 {
		return false
	}
	for i := 2; i*i <= n; i++ {
		if n%i == 0 {
			return false
		}
	}
	return true
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func First added in v0.2.0

func First[A any](in <-chan Try[A]) (value A, found bool, err error)

First returns the first item or error encountered in the input stream, whichever comes first. The found return flag is set to false if the stream was empty, otherwise it is set to true.

This is a blocking ordered function that processes items sequentially. See the package documentation for more information on blocking ordered functions and error handling.

Example
package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Keep only the numbers divisible by 4
	// Concurrency = 3; Ordered
	divisibleBy4 := rill.OrderedFilter(numbers, 3, func(x int) (bool, error) {
		return x%4 == 0, nil
	})

	// Get the first number divisible by 4
	first, ok, err := rill.First(divisibleBy4)

	fmt.Println("Result:", first, ok)
	fmt.Println("Error:", err)
}
Output:

func FlatMap

func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B]

FlatMap takes a stream of items of type A and transforms each item into a new sub-stream of items of type B using a function f. Those sub-streams are then flattened into a single output stream, which is returned.

This is a non-blocking unordered function that processes items concurrently using n goroutines. An ordered version of this function, OrderedFlatMap, is also available.

See the package documentation for more information on non-blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)

	// Replace each number in the input stream with three strings
	// Concurrency = 2
	result := rill.FlatMap(numbers, 2, func(x int) <-chan rill.Try[string] {
		randomSleep(500 * time.Millisecond) // simulate some additional work

		return rill.FromSlice([]string{
			fmt.Sprintf("foo%d", x),
			fmt.Sprintf("bar%d", x),
			fmt.Sprintf("baz%d", x),
		}, nil)
	})

	printStream(result)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func ForEach

func ForEach[A any](in <-chan Try[A], n int, f func(A) error) error

ForEach applies a function f to each item in an input stream.

This is a blocking unordered function that processes items concurrently using n goroutines. When n = 1, processing becomes sequential, making the function ordered and similar to a regular for-range loop.

See the package documentation for more information on blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Square each number and print the result
	// Concurrency = 3
	err := rill.ForEach(numbers, 3, func(x int) error {
		y := square(x)
		fmt.Println(y)
		return nil
	})

	// Handle errors
	fmt.Println("Error:", err)
}

// helper function that squares the number
// and simulates some additional work using sleep
func square(x int) int {
	randomSleep(500 * time.Millisecond)
	return x * x
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

Example (Ordered)

There is no ordered version of the ForEach function. To achieve ordered processing, set the concurrency to 1. If you need a concurrent and ordered ForEach, do all processing with OrderedMap, and then use ForEach with concurrency set to 1 at the final stage.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Square each number
	// Concurrency = 3; Ordered
	squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
		return square(x), nil
	})

	// Print results.
	// Concurrency = 1; Ordered
	err := rill.ForEach(squares, 1, func(y int) error {
		fmt.Println(y)
		return nil
	})

	// Handle errors
	fmt.Println("Error:", err)
}

// helper function that squares the number
// and simulates some additional work using sleep
func square(x int) int {
	randomSleep(500 * time.Millisecond)
	return x * x
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func FromChan

func FromChan[A any](values <-chan A, err error) <-chan Try[A]

FromChan converts a regular channel into a stream. Additionally, this function can take an error that will be added to the output stream alongside the values. Either argument can be nil, in which case it is ignored. If both arguments are nil, the function returns nil.

Such function signature allows concise wrapping of functions that return a channel and an error:

stream := rill.FromChan(someFunc())

func FromChans

func FromChans[A any](values <-chan A, errs <-chan error) <-chan Try[A]

FromChans converts a regular channel into a stream. Additionally, this function can take a channel of errors, which will be added to the output stream alongside the values. Either argument can be nil, in which case it is ignored. If both arguments are nil, the function returns nil.

Such function signature allows concise wrapping of functions that return two channels:

stream := rill.FromChans(someFunc())

func FromSeq added in v0.4.0

func FromSeq[A any](seq iter.Seq[A], err error) <-chan Try[A]

FromSeq converts an iterator into a stream. If err is not nil, the function returns a stream with a single error.

Such function signature allows concise wrapping of functions that return an iterator and an error:

stream := rill.FromSeq(someFunc())
Example
// Start with an iterator that yields numbers from 1 to 10
numbersSeq := slices.Values([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})

// Convert the iterator into a stream
numbers := rill.FromSeq(numbersSeq, nil)

// Transform each number
// Concurrency = 3
squares := rill.Map(numbers, 3, func(x int) (int, error) {
	return square(x), nil
})

printStream(squares)
Output:

func FromSeq2 added in v0.4.0

func FromSeq2[A any](seq iter.Seq2[A, error]) <-chan Try[A]

FromSeq2 converts an iterator of value-error pairs into a stream.

Example
// Create an iter.Seq2 iterator that yields numbers from 1 to 10
numberSeq := func(yield func(int, error) bool) {
	for i := 1; i <= 10; i++ {
		if !yield(i, nil) {
			return
		}
	}
}

// Convert the iterator into a stream
numbers := rill.FromSeq2(numberSeq)

// Transform each number
// Concurrency = 3
squares := rill.Map(numbers, 3, func(x int) (int, error) {
	return square(x), nil
})

printStream(squares)
Output:

func FromSlice

func FromSlice[A any](slice []A, err error) <-chan Try[A]

FromSlice converts a slice into a stream. If err is not nil, the function returns a stream with a single error.

Such function signature allows concise wrapping of functions that return a slice and an error:

stream := rill.FromSlice(someFunc())

func Generate added in v0.6.0

func Generate[A any](f func(send func(A), sendErr func(error))) <-chan Try[A]

Generate is a shorthand for creating streams. It provides a more ergonomic way of sending both values and errors to a stream, and manages the goroutine and channel lifecycle.

stream := rill.Generate(func(send func(int), sendErr func(error)) {
	for i := 0; i < 100; i++ {
		send(i)
	}
	sendErr(someError)
})

Here's how the same code would look without Generate:

stream := make(chan rill.Try[int])
go func() {
	defer close(stream)
	for i := 0; i < 100; i++ {
		stream <- rill.Try[int]{Value: i}
	}
	stream <- rill.Try[int]{Error: someError}
}()
Example

Generate a stream of URLs from https://example.com/file-0.txt to https://example.com/file-9.txt

package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	urls := rill.Generate(func(send func(string), sendErr func(error)) {
		for i := 0; i < 10; i++ {
			send(fmt.Sprintf("https://example.com/file-%d.txt", i))
		}
	})

	printStream(urls)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}
Output:

Example (Context)

Generate an infinite stream of natural numbers (1, 2, 3, ...). New numbers are sent to the stream every 500ms until the context is canceled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/destel/rill"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	numbers := rill.Generate(func(send func(int), sendErr func(error)) {
		for i := 1; ctx.Err() == nil; i++ {
			send(i)
			time.Sleep(500 * time.Millisecond)
		}
	})

	printStream(numbers)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}
Output:

func Map

func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B]

Map takes a stream of items of type A and transforms them into items of type B using a function f. Returns a new stream of transformed items.

This is a non-blocking unordered function that processes items concurrently using n goroutines. An ordered version of this function, OrderedMap, is also available.

See the package documentation for more information on non-blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Transform each number
	// Concurrency = 3
	squares := rill.Map(numbers, 3, func(x int) (int, error) {
		return square(x), nil
	})

	printStream(squares)
}

// helper function that squares the number
// and simulates some additional work using sleep
func square(x int) int {
	randomSleep(500 * time.Millisecond)
	return x * x
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func MapReduce added in v0.2.0

func MapReduce[A any, K comparable, V any](in <-chan Try[A], nm int, mapper func(A) (K, V, error), nr int, reducer func(V, V) (V, error)) (map[K]V, error)

MapReduce transforms the input stream into a Go map using mapper and reducer functions. The transformation is performed in two concurrent phases.

  • The mapper function transforms each input item into a key-value pair.
  • The reducer function reduces values for the same key into a single value. This phase has the same semantics as the Reduce function, in particular the reducer function must be commutative and associative.

MapReduce is a blocking unordered function that processes items concurrently using nm and nr goroutines for the mapper and reducer functions respectively. Setting nr = 1 will make the reduce phase sequential and ordered, see Reduce for more information.

See the package documentation for more information on blocking unordered functions and error handling.

Example
package main

import (
	"fmt"
	"regexp"
	"strings"

	"github.com/destel/rill"
)

func main() {
	var re = regexp.MustCompile(`\w+`)
	text := "Early morning brings early birds to the early market. Birds sing, the market buzzes, and the morning shines."

	// Convert a text into a stream of words
	words := rill.FromSlice(re.FindAllString(text, -1), nil)

	// Count the number of occurrences of each word
	mr, err := rill.MapReduce(words,
		// Map phase: Use the word as key and "1" as value
		// Concurrency = 3
		3, func(word string) (string, int, error) {
			return strings.ToLower(word), 1, nil
		},
		// Reduce phase: Sum all "1" values for the same key
		// Concurrency = 2
		2, func(x, y int) (int, error) {
			return x + y, nil
		},
	)

	fmt.Println("Result:", mr)
	fmt.Println("Error:", err)
}
Output:

func Merge

func Merge[A any](ins ...<-chan A) <-chan A

Merge performs a fan-in operation on the list of input channels, returning a single output channel. The resulting channel will contain all items from all inputs, and will be closed when all inputs are fully consumed.

This is a non-blocking function that processes items from each input sequentially.

See the package documentation for more information on non-blocking functions and error handling.

Example
package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	// Convert slices of numbers into streams
	numbers1 := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)
	numbers2 := rill.FromSlice([]int{6, 7, 8, 9, 10}, nil)
	numbers3 := rill.FromSlice([]int{11, 12}, nil)

	numbers := rill.Merge(numbers1, numbers2, numbers3)

	printStream(numbers)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}
Output:

func OrderedCatch

func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A]

OrderedCatch is the ordered version of Catch.

Example

The same example as for Catch, but using ordered versions of the functions.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of strings into a stream
	strs := rill.FromSlice([]string{"1", "2", "3", "4", "5", "not a number 6", "7", "8", "9", "10"}, nil)

	// Convert strings to ints
	// Concurrency = 3; Ordered
	ids := rill.OrderedMap(strs, 3, func(s string) (int, error) {
		randomSleep(500 * time.Millisecond) // simulate some additional work
		return strconv.Atoi(s)
	})

	// Catch and ignore number parsing errors
	// Concurrency = 2; Ordered
	ids = rill.OrderedCatch(ids, 2, func(err error) error {
		if errors.Is(err, strconv.ErrSyntax) {
			return nil // Ignore this error
		}
		return err
	})

	// No error will be printed
	printStream(ids)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func OrderedFilter

func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A]

OrderedFilter is the ordered version of Filter.

Example

The same example as for Filter, but using ordered versions of the functions.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Keep only prime numbers
	// Concurrency = 3; Ordered
	primes := rill.OrderedFilter(numbers, 3, func(x int) (bool, error) {
		return isPrime(x), nil
	})

	printStream(primes)
}

// helper function that checks if a number is prime
// and simulates some additional work using sleep
func isPrime(n int) bool {
	randomSleep(500 * time.Millisecond)

	if n < 2 {
		return false
	}
	for i := 2; i*i <= n; i++ {
		if n%i == 0 {
			return false
		}
	}
	return true
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func OrderedFilterMap added in v0.3.0

func OrderedFilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B]

OrderedFilterMap is the ordered version of FilterMap.

Example

The same example as for FilterMap, but using ordered versions of the functions.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Keep only prime numbers and square them
	// Concurrency = 3; Ordered
	squares := rill.OrderedFilterMap(numbers, 3, func(x int) (int, bool, error) {
		if !isPrime(x) {
			return 0, false, nil
		}

		return x * x, true, nil
	})

	printStream(squares)
}

// helper function that checks if a number is prime
// and simulates some additional work using sleep
func isPrime(n int) bool {
	randomSleep(500 * time.Millisecond)

	if n < 2 {
		return false
	}
	for i := 2; i*i <= n; i++ {
		if n%i == 0 {
			return false
		}
	}
	return true
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func OrderedFlatMap

func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B]

OrderedFlatMap is the ordered version of FlatMap.

Example

The same example as for FlatMap, but using ordered versions of the functions.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)

	// Replace each number in the input stream with three strings
	// Concurrency = 2; Ordered
	result := rill.OrderedFlatMap(numbers, 2, func(x int) <-chan rill.Try[string] {
		randomSleep(500 * time.Millisecond) // simulate some additional work

		return rill.FromSlice([]string{
			fmt.Sprintf("foo%d", x),
			fmt.Sprintf("bar%d", x),
			fmt.Sprintf("baz%d", x),
		}, nil)
	})

	printStream(result)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func OrderedMap

func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B]

OrderedMap is the ordered version of Map.

Example

The same example as for Map, but using ordered versions of the functions.

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Transform each number
	// Concurrency = 3; Ordered
	squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
		return square(x), nil
	})

	printStream(squares)
}

// helper function that squares the number
// and simulates some additional work using sleep
func square(x int) int {
	randomSleep(500 * time.Millisecond)
	return x * x
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func OrderedSplit2

func OrderedSplit2[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (outTrue <-chan Try[A], outFalse <-chan Try[A])

OrderedSplit2 is the ordered version of Split2.
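
The original page provides no example for this function. The sketch below is illustrative only (not from the library's docs): it splits a stream of numbers into even and odd streams, with the ordered variant expected to preserve the relative input order within each output stream, and it assumes both outputs must be drained to keep the pipeline from blocking.

package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Split the stream into even and odd numbers
	// Concurrency = 3; Ordered
	evens, odds := rill.OrderedSplit2(numbers, 3, func(x int) (bool, error) {
		return x%2 == 0, nil
	})

	// Drain both output streams concurrently to avoid blocking the pipeline
	done := make(chan struct{})
	go func() {
		defer close(done)
		evensSlice, err := rill.ToSlice(evens)
		fmt.Println("Evens:", evensSlice, "Error:", err)
	}()

	oddsSlice, err := rill.ToSlice(odds)
	fmt.Println("Odds:", oddsSlice, "Error:", err)
	<-done
}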

func Reduce added in v0.2.0

func Reduce[A any](in <-chan Try[A], n int, f func(A, A) (A, error)) (result A, hasResult bool, err error)

Reduce combines all items from the input stream into a single value using a binary function f. The function f is called for pairs of items, progressively reducing the stream contents until only one value remains.

As an unordered function, Reduce can apply f to any pair of items in any order, which requires f to be:

  • Associative: f(a, f(b, c)) == f(f(a, b), c)
  • Commutative: f(a, b) == f(b, a)

The hasResult return flag is set to false if the stream was empty, otherwise it is set to true.

Reduce is a blocking unordered function that processes items concurrently using n goroutines. The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially, making the function ordered. This also removes the need for the function f to be commutative.

See the package documentation for more information on blocking unordered functions and error handling.

Example
package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Sum all numbers
	sum, ok, err := rill.Reduce(numbers, 3, func(a, b int) (int, error) {
		return a + b, nil
	})

	fmt.Println("Result:", sum, ok)
	fmt.Println("Error:", err)
}
Output:

func Split2

func Split2[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (outTrue <-chan Try[A], outFalse <-chan Try[A])

Split2 divides the input stream into two output streams based on the boolean value returned by the predicate function f. When f returns true, the item is sent to the outTrue stream; otherwise, it is sent to the outFalse stream. In case of an error, the item is sent to one of the output streams in a non-deterministic way.

This is a non-blocking unordered function that processes items concurrently using n goroutines. An ordered version of this function, OrderedSplit2, is also available.

See the package documentation for more information on non-blocking unordered functions and error handling.
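
The original page does not include an example for Split2. The following is a minimal illustrative sketch (not from the library's docs) that routes short words to one stream and long words to another, assuming both output streams need to be consumed for the pipeline to make progress.

package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of words into a stream
	words := rill.FromSlice([]string{"ant", "elephant", "cat", "crocodile", "bee"}, nil)

	// Route short words to one stream and long words to another
	// Concurrency = 3
	short, long := rill.Split2(words, 3, func(w string) (bool, error) {
		return len(w) <= 3, nil
	})

	// Drain both output streams concurrently
	done := make(chan struct{})
	go func() {
		defer close(done)
		shortWords, err := rill.ToSlice(short)
		fmt.Println("Short:", shortWords, "Error:", err)
	}()

	longWords, err := rill.ToSlice(long)
	fmt.Println("Long:", longWords, "Error:", err)
	<-done
}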

func ToChans

func ToChans[A any](in <-chan Try[A]) (<-chan A, <-chan error)

ToChans splits an input stream into two channels: one for values and one for errors. It's an inverse of FromChans. Returns two nil channels if the input is nil.
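
A minimal usage sketch (not part of the original docs): it assumes both returned channels should be drained, since values and errors are delivered on separate channels as the stream is consumed.

package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)

	// Separate the stream into a channel of values and a channel of errors
	values, errs := rill.ToChans(numbers)

	// Drain the error channel in the background
	done := make(chan struct{})
	go func() {
		defer close(done)
		for err := range errs {
			fmt.Println("Error:", err)
		}
	}()

	// Drain the value channel
	for v := range values {
		fmt.Println("Value:", v)
	}
	<-done
}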

func ToSeq2 added in v0.4.0

func ToSeq2[A any](in <-chan Try[A]) iter.Seq2[A, error]

ToSeq2 converts an input stream into an iterator of value-error pairs.

This is a blocking ordered function that processes items sequentially. It does not stop at the first encountered error. Instead, it iterates over all value-error pairs, either until the input stream is fully consumed or until the loop is broken by the caller. So all error handling, if needed, should be done inside the iterator (the for-range loop body).

See the package documentation for more information on blocking ordered functions.

Example
// Convert a slice of numbers into a stream
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Transform each number
// Concurrency = 3
squares := rill.Map(numbers, 3, func(x int) (int, error) {
	return square(x), nil
})

// Convert the stream into an iterator and use for-range to print the results
for val, err := range rill.ToSeq2(squares) {
	if err != nil {
		fmt.Println("Error:", err)
		break // cleanup is done regardless of early exit
	}
	fmt.Printf("%+v\n", val)
}
Output:

func ToSlice

func ToSlice[A any](in <-chan Try[A]) ([]A, error)

ToSlice converts an input stream into a slice.

This is a blocking ordered function that processes items sequentially. See the package documentation for more information on blocking ordered functions and error handling.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/destel/rill"
)

func main() {
	// Convert a slice of numbers into a stream
	numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

	// Transform each number
	// Concurrency = 3; Ordered
	squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
		return square(x), nil
	})

	resultsSlice, err := rill.ToSlice(squares)

	fmt.Println("Result:", resultsSlice)
	fmt.Println("Error:", err)
}

// helper function that squares the number
// and simulates some additional work using sleep
func square(x int) int {
	randomSleep(500 * time.Millisecond)
	return x * x
}

func randomSleep(max time.Duration) {
	time.Sleep(time.Duration(rand.Intn(int(max))))
}
Output:

func Unbatch

func Unbatch[A any](in <-chan Try[[]A]) <-chan Try[A]

Unbatch is the inverse of Batch. It takes a stream of batches and returns a stream of individual items.

This is a non-blocking ordered function that processes items sequentially. See the package documentation for more information on non-blocking ordered functions and error handling.

Example
package main

import (
	"fmt"

	"github.com/destel/rill"
)

func main() {
	// Create a stream of batches
	batches := rill.FromSlice([][]int{
		{1, 2, 3},
		{4, 5},
		{6, 7, 8, 9},
		{10},
	}, nil)

	numbers := rill.Unbatch(batches)

	printStream(numbers)
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
	fmt.Println("Result:")
	err := rill.ForEach(stream, 1, func(x A) error {
		fmt.Printf("%+v\n", x)
		return nil
	})
	fmt.Println("Error:", err)
}
Output:

Types

type Try

type Try[A any] struct {
	Value A
	Error error
}

Try is a container holding a value of type A or an error.
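
As an illustration (a small fragment, not from the original docs), Try values can also be constructed directly, though in practice they are usually produced by FromSlice, Map, and the other functions above:

ok := rill.Try[int]{Value: 42}
bad := rill.Try[int]{Error: errors.New("something went wrong")}

for _, item := range []rill.Try[int]{ok, bad} {
	if item.Error != nil {
		fmt.Println("Error:", item.Error)
		continue
	}
	fmt.Println("Value:", item.Value)
}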

func Wrap

func Wrap[A any](value A, err error) Try[A]

Wrap converts a value and/or an error into a Try container. It's a convenience function that avoids creating a Try container manually and benefits from type inference.

Such a signature also allows concise wrapping of functions that return a value and an error:

item := rill.Wrap(strconv.ParseInt("42", 10, 64))
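
This makes Wrap convenient when writing custom stages that emit a stream of Try values. A minimal sketch (an illustration, not from the original docs; parseAll is a hypothetical helper):

// parseAll is a hypothetical custom stage that parses strings in the background
// and emits the results as a stream of Try[int] values.
func parseAll(strs []string) <-chan rill.Try[int] {
	out := make(chan rill.Try[int])
	go func() {
		defer close(out)
		for _, s := range strs {
			// Wrap packs the (value, error) pair returned by strconv.Atoi
			out <- rill.Wrap(strconv.Atoi(s))
		}
	}()
	return out
}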

Directories

Path	Synopsis
internal/th	Package th provides basic test helpers.
mockapi	Package mockapi provides a very basic mock API for examples and demos.
