Documentation ¶
Overview ¶
Example ¶
package main import ( "context" "fmt" "time" "github.com/arvidfm/asyncigo" ) func main() { _ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { task := asyncigo.SpawnTask(ctx, func(ctx context.Context) (int, error) { for i := range 3 { fmt.Printf("in subtask: %d\n", i) _ = asyncigo.Sleep(ctx, time.Second) } return 42, nil }) for j := range 3 { fmt.Printf("in main task: %d\n", j) _ = asyncigo.Sleep(ctx, time.Second) } result, _ := task.Await(ctx) fmt.Printf("task result: %d\n", result) return nil }) }
Output: in main task: 0 in subtask: 0 in main task: 1 in subtask: 1 in main task: 2 in subtask: 2 task result: 42
Example (Polyglot) ¶
As Go's implementation of coroutines doesn't require us to use an "await" keyword every time we call one, it's easy to write "polyglottal" functions that work both synchronously and asynchronously, making the "coloured functions" problem less of an issue.
This example shows how one might write a sleep function that blocks if used outside an event loop, but not if an event loop is available.
package main import ( "context" "fmt" "math" "time" "github.com/arvidfm/asyncigo" ) func main() { ctx := context.Background() _ = sleepPolyglot(ctx, time.Second*2) _ = asyncigo.NewEventLoop().Run(ctx, func(ctx context.Context) error { start := time.Now() numTasks := 100 tasks := asyncigo.Map(asyncigo.Range(numTasks), func(int) asyncigo.Futurer { return asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) { _ = sleepPolyglot(ctx, time.Second*2) return nil, nil }) }).Collect() _ = asyncigo.Wait(ctx, asyncigo.WaitAll, tasks...) elapsed := time.Since(start) fmt.Printf("%d tasks took %d seconds to finish\n", len(tasks), int(math.Round(elapsed.Seconds()))) return nil }) } func sleepPolyglot(ctx context.Context, duration time.Duration) error { if _, ok := asyncigo.RunningLoopMaybe(ctx); ok { return asyncigo.Sleep(ctx, duration) } time.Sleep(duration) return nil }
Output: 100 tasks took 2 seconds to finish
Index ¶
- Variables
- func GetFirstResult[T any](ctx context.Context, coros ...Coroutine2[T]) (T, error)
- func Sleep(ctx context.Context, duration time.Duration) error
- func Wait(ctx context.Context, mode WaitMode, futs ...Futurer) error
- func Zip[T, U any, TI Iterable[T], UI Iterable[U]](it1 TI, it2 UI) iter.Seq2[T, U]
- func ZipLongest[T, U any, TI Iterable[T], UI Iterable[U]](it1 TI, it2 UI) iter.Seq2[T, U]
- type AsyncIterable
- type AsyncReadWriteCloser
- type AsyncStream
- func (a *AsyncStream) Chunks(ctx context.Context, chunkSize int) AsyncIterable[[]byte]
- func (a *AsyncStream) Close() error
- func (a *AsyncStream) Lines(ctx context.Context) AsyncIterable[[]byte]
- func (a *AsyncStream) ReadAll(ctx context.Context) ([]byte, error)
- func (a *AsyncStream) ReadChunk(ctx context.Context, chunkSize int) ([]byte, error)
- func (a *AsyncStream) ReadLine(ctx context.Context) ([]byte, error)
- func (a *AsyncStream) ReadUntil(ctx context.Context, character byte) ([]byte, error)
- func (a *AsyncStream) Stream(ctx context.Context, bufSize int) AsyncIterable[[]byte]
- func (a *AsyncStream) Write(ctx context.Context, data []byte) Awaitable[int]
- type Awaitable
- type Callback
- type Coroutine1
- type Coroutine2
- type EpollAsyncFile
- type EpollPoller
- func (e *EpollPoller) Close() error
- func (e *EpollPoller) Dial(ctx context.Context, network, address string) (conn AsyncReadWriteCloser, err error)
- func (e *EpollPoller) Open(fd uintptr) (file AsyncReadWriteCloser, err error)
- func (e *EpollPoller) Pipe() (r, w AsyncReadWriteCloser, err error)
- func (e *EpollPoller) Subscribe(target *EpollAsyncFile) error
- func (e *EpollPoller) Unsubscribe(target *EpollAsyncFile) error
- func (e *EpollPoller) Wait(timeout time.Duration) error
- func (e *EpollPoller) WakeupThreadsafe() error
- type EpollSocket
- type EventLoop
- func (e *EventLoop) Dial(ctx context.Context, network, address string) (*AsyncStream, error)
- func (e *EventLoop) DialLines(ctx context.Context, network, address string) AsyncIterable[[]byte]
- func (e *EventLoop) Pipe() (r, w *AsyncStream, err error)
- func (e *EventLoop) Run(ctx context.Context, main Coroutine1) error
- func (e *EventLoop) RunCallback(callback func())
- func (e *EventLoop) RunCallbackThreadsafe(ctx context.Context, callback func())
- func (e *EventLoop) ScheduleCallback(delay time.Duration, callback func()) *Callback
- func (e *EventLoop) WaitForCallbacks() *Future[any]
- func (e *EventLoop) Yield(ctx context.Context, fut Futurer) error
- type Fder
- type Future
- func (f *Future[ResType]) AddDoneCallback(callback func(error)) Futurer
- func (f *Future[ResType]) AddResultCallback(callback func(ResType, error)) Awaitable[ResType]
- func (f *Future[ResType]) Await(ctx context.Context) (ResType, error)
- func (f *Future[ResType]) Cancel(err error)
- func (f *Future[ResType]) Err() error
- func (f *Future[ResType]) Future() *Future[ResType]
- func (f *Future[ResType]) HasResult() bool
- func (f *Future[ResType]) MustAwait(ctx context.Context) ResType
- func (f *Future[ResType]) Result() (ResType, error)
- func (f *Future[ResType]) SetResult(result ResType, err error)
- func (f *Future[ResType]) Shield() *Future[ResType]
- func (f *Future[ResType]) WriteResultTo(dest *ResType) Awaitable[ResType]
- type Futurer
- type Iterable
- type Iterable2
- type Iterator
- func AsIterator[V any, VS []V](slice VS) Iterator[V]
- func Chain[T any, TS Iterable[T]](its ...TS) Iterator[T]
- func Count[T constraints.Integer](start T) Iterator[T]
- func Filter[T, TS Iterable[T]](it TS, f func(T) bool) Iterator[T]
- func FlatMap[T, U any, TS Iterable[T], US Iterable[U]](it TS, f func(T) US) Iterator[U]
- func Flatten[T any, TS Iterable[T], TSS Iterable[TS]](its TSS) Iterator[T]
- func Iter[V any, VI Iterable[V]](it VI) Iterator[V]
- func Map[T, U any, TS Iterable[T]](it TS, f func(T) U) Iterator[U]
- func Range[T constraints.Integer](count T) Iterator[T]
- func Uniq[V comparable, VS Iterable[V]](it VS) Iterator[V]
- type MapIterator
- type Mutex
- type Poller
- type Queue
- type Task
- func (t *Task[_]) AddDoneCallback(callback func(error)) Futurer
- func (t *Task[RetType]) AddResultCallback(callback func(result RetType, err error)) Awaitable[RetType]
- func (t *Task[RetType]) Await(ctx context.Context) (RetType, error)
- func (t *Task[_]) Cancel(err error)
- func (t *Task[_]) Err() error
- func (t *Task[RetType]) Future() *Future[RetType]
- func (t *Task[_]) HasResult() bool
- func (t *Task[RetType]) MustAwait(ctx context.Context) RetType
- func (t *Task[RetType]) Result() (RetType, error)
- func (t *Task[RetType]) Shield() *Future[RetType]
- func (t *Task[_]) Stop()
- func (t *Task[RetType]) WriteResultTo(dst *RetType) Awaitable[RetType]
- type WaitMode
Examples ¶
- Package
- Package (Polyglot)
- AsyncIter (Basic)
- AsyncIter (Dial)
- AsyncIterable.UntilErr
- AsyncIterable.UntilErr (AsyncStreams_A)
- AsyncIterable.UntilErr (AsyncStreams_B)
- Chain
- Enumerate
- FlatMap
- Flatten
- Future.Shield
- Iterator.Collect
- MapIterator.Collect
- Range
- SpawnTask
- Task.Cancel
- Uniq
- Wait
- Zip
- ZipLongest
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotImplemented is returned by a Poller implementation for functions it does not support. ErrNotImplemented = errors.New("this function is not supported by this implementation") )
var (
ErrNotReady = errors.New("future is still pending")
)
Functions ¶
func GetFirstResult ¶
func GetFirstResult[T any](ctx context.Context, coros ...Coroutine2[T]) (T, error)
GetFirstResult returns the result of the first successful coroutine. Once a coroutine succeeds, all unfinished tasks will be cancelled. If no coroutine succeeds, the last error is returned.
func Wait ¶
Wait will wait for any or all of the given Futures to complete depending on the WaitMode passed. If any of the futures fail, the most recent error will be returned. Wait will not cancel any futures.
Example ¶
This example creates one future and two tasks and waits for them all to finish. Combining Wait with asyncigo.Awaitable.WriteResultTo allows for very succinct code.
In this case, the future and one of the tasks succeed, while the second task fails with an error. We can see how the error from the failing task is propagated by Wait, while the results are written to the specified locations.
package main import ( "context" "errors" "fmt" "time" "github.com/arvidfm/asyncigo" ) func main() { _ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { fut1 := asyncigo.NewFuture[string]() task1 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (int, error) { _ = asyncigo.Sleep(ctx, time.Second) fut1.SetResult("test", nil) return 20, nil }) task2 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (float64, error) { _ = asyncigo.Sleep(ctx, time.Second) return 25.5, errors.New("oops") }) var result1 string var result2 int var result3 float64 err := asyncigo.Wait( ctx, asyncigo.WaitAll, fut1.WriteResultTo(&result1), task1.WriteResultTo(&result2), task2.WriteResultTo(&result3), ) fmt.Println("results:", result1, result2, result3) fmt.Println("error:", err) return nil }) }
Output: results: test 20 25.5 error: oops
func Zip ¶
Zip returns an iterator which yields each pair of items from the given iterators in turn. If the iterators are of different length, Zip will stop once the end of the shortest iterator has been reached.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.Zip( asyncigo.AsIterator([]int{1, 2, 3, 4}), asyncigo.AsIterator([]string{"a", "b", "c", "d", "e"}), ) for a, b := range it { fmt.Printf("%d: %s\n", a, b) } }
Output: 1: a 2: b 3: c 4: d
func ZipLongest ¶
ZipLongest returns an iterator which yields each pair of items from the given iterators in turn. If the iterators are of different lengths, ZipLongest will continue until the end of the longest iterator, yielding the zero value in place of the items missing from the shorter iterator.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.ZipLongest( asyncigo.AsIterator([]int{1, 2, 3, 4}), asyncigo.AsIterator([]string{"a", "b", "c", "d", "e"}), ) for a, b := range it { fmt.Printf("%d: %s\n", a, b) } }
Output: 1: a 2: b 3: c 4: d 0: e
Types ¶
type AsyncIterable ¶
AsyncIterable is a helper type for iterating over asynchronous streams that may error.
func AsyncIter ¶
func AsyncIter[T any](f func(yield func(T) error) error) AsyncIterable[T]
AsyncIter is a helper function for constructing an AsyncIterable.
Example (Basic) ¶
This is a basic example showing how results and errors are yielded when iterating over an AsyncIterable.
package main import ( "errors" "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.AsyncIter(func(yield func(int) error) error { for i := range 5 { if err := yield(i); err != nil { return err } } return errors.New("oops") }) for i, err := range it { fmt.Printf("%d - %v\n", i, err) } }
Output: 0 - <nil> 1 - <nil> 2 - <nil> 3 - <nil> 4 - <nil> 0 - oops
Example (Dial) ¶
This shows a more advanced usage of AsyncIter, combining yields and awaits. It reads lines of data from an asynchronous stream and yields the length of each line.
package main import ( "context" "errors" "fmt" "io" "net" "os" "path/filepath" "strings" "sync" "syscall" "time" "github.com/arvidfm/asyncigo" ) func main() { defer serveFile("stream_example2.txt", "6172")() if err := asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { it := asyncigo.AsyncIter(func(yield func(int) error) error { stream, err := asyncigo.RunningLoop(ctx).Dial(ctx, "tcp", "localhost:6172") if err != nil { return err } for { line, err := stream.ReadLine(ctx) if errors.Is(err, io.EOF) { return nil } else if err != nil { return err } unicode := []rune(strings.TrimSpace(string(line))) _ = yield(len(unicode)) } }) for lineLength, err := range it { if err != nil { return err } fmt.Println(lineLength) } return nil }); err != nil { panic(err) } } func serveFile(path, port string) (close func()) { data, err := os.ReadFile(filepath.Join("tests", path)) if err != nil { panic(err) } address := net.JoinHostPort("localhost", port) var waiter sync.WaitGroup var l net.Listener waiter.Add(1) go func() { defer waiter.Done() var err error l, err = net.Listen("tcp", address) if err != nil { panic(err) } for { conn, err := l.Accept() if err != nil { return } if _, err := conn.Write(data); err != nil { panic(err) } _ = conn.Close() } }() for { conn, err := net.Dial("tcp", address) if errors.Is(err, syscall.ECONNREFUSED) { time.Sleep(time.Millisecond * 10) } else if err != nil { panic(err) } else { _ = conn.Close() break } } return func() { _ = l.Close() waiter.Wait() } }
Output: 6 12 18 30
func (AsyncIterable[T]) ForEach ¶
func (ai AsyncIterable[T]) ForEach(f func(T) error) error
ForEach calls the given function for each value yielded by this AsyncIterable until the iterator finishes or returns an error. ForEach returns an error if either the iterator or the callback function returns an error.
func (AsyncIterable[T]) UntilErr ¶
func (ai AsyncIterable[T]) UntilErr(err *error) Iterator[T]
UntilErr returns a single-valued iterator that yields each non-error value yielded by this AsyncIterable until the iterator finishes or returns an error. If the AsyncIterable returns an error, the error will be written to the variable referenced by the given error pointer.
Example ¶
Basic usage of UntilErr.
package main import ( "errors" "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.AsyncIter(func(yield func(int) error) error { for i := range 5 { if err := yield(i); err != nil { return err } } return errors.New("oops") }) var err error for i := range it.UntilErr(&err) { fmt.Println(i) } fmt.Println(err) }
Output: 0 1 2 3 4 oops
Example (AsyncStreams_A) ¶
UntilErr is particularly useful for ranging over network streams.
package main import ( "context" "errors" "fmt" "net" "os" "path/filepath" "sync" "syscall" "time" "github.com/arvidfm/asyncigo" ) func main() { defer serveFile("stream_example1.txt", "6172")() if err := asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { stream, _ := asyncigo.RunningLoop(ctx).Dial(ctx, "tcp", "localhost:6172") var err error for line := range stream.Lines(ctx).UntilErr(&err) { fmt.Printf("got line: %s", line) } return err }); err != nil { panic(err) } } func serveFile(path, port string) (close func()) { data, err := os.ReadFile(filepath.Join("tests", path)) if err != nil { panic(err) } address := net.JoinHostPort("localhost", port) var waiter sync.WaitGroup var l net.Listener waiter.Add(1) go func() { defer waiter.Done() var err error l, err = net.Listen("tcp", address) if err != nil { panic(err) } for { conn, err := l.Accept() if err != nil { return } if _, err := conn.Write(data); err != nil { panic(err) } _ = conn.Close() } }() for { conn, err := net.Dial("tcp", address) if errors.Is(err, syscall.ECONNREFUSED) { time.Sleep(time.Millisecond * 10) } else if err != nil { panic(err) } else { _ = conn.Close() break } } return func() { _ = l.Close() waiter.Wait() } }
Output: got line: Lorem ipsum dolor sit amet. got line: Donec non velit consequat. got line: Donec interdum in nulla ac scelerisque. got line: Duis commodo, neque ac luctus eleifend. got line: Fusce lacinia id quam ac porttitor.
Example (AsyncStreams_B) ¶
Since UntilErr returns a standard single-valued iterator, you can easily manipulate the iterator using functions like asyncigo.Map and asyncigo.Enumerate. This example shows how you could combine multiple asynchronous iterators using asyncigo.Chain.
package main import ( "context" "errors" "fmt" "net" "os" "path/filepath" "sync" "syscall" "time" "github.com/arvidfm/asyncigo" ) func main() { defer serveFile("stream_example1.txt", "6172")() defer serveFile("stream_example2.txt", "6173")() if err := asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { loop := asyncigo.RunningLoop(ctx) var err error for line := range asyncigo.Chain( loop.DialLines(ctx, "tcp", "localhost:6172").UntilErr(&err), loop.DialLines(ctx, "tcp", "localhost:6173").UntilErr(&err), ) { fmt.Printf("got line: %s", line) } return err }); err != nil { panic(err) } } func serveFile(path, port string) (close func()) { data, err := os.ReadFile(filepath.Join("tests", path)) if err != nil { panic(err) } address := net.JoinHostPort("localhost", port) var waiter sync.WaitGroup var l net.Listener waiter.Add(1) go func() { defer waiter.Done() var err error l, err = net.Listen("tcp", address) if err != nil { panic(err) } for { conn, err := l.Accept() if err != nil { return } if _, err := conn.Write(data); err != nil { panic(err) } _ = conn.Close() } }() for { conn, err := net.Dial("tcp", address) if errors.Is(err, syscall.ECONNREFUSED) { time.Sleep(time.Millisecond * 10) } else if err != nil { panic(err) } else { _ = conn.Close() break } } return func() { _ = l.Close() waiter.Wait() } }
Output: got line: Lorem ipsum dolor sit amet. got line: Donec non velit consequat. got line: Donec interdum in nulla ac scelerisque. got line: Duis commodo, neque ac luctus eleifend. got line: Fusce lacinia id quam ac porttitor. got line: 生麦生米生卵 got line: すもももももももものうち got line: 東京特許許可局長今日急遽休暇許可却下 got line: 斜め77度の並びで泣く泣く嘶くナナハン7台難なく並べて長眺め
func (AsyncIterable[T]) YieldTo ¶
func (ai AsyncIterable[T]) YieldTo(yield func(T) error) error
YieldTo will yield all the values from this AsyncIterable using the provided yield function for easier chaining of iterables.
type AsyncReadWriteCloser ¶
type AsyncReadWriteCloser interface { io.ReadWriteCloser // WaitForReady suspends the calling coroutine until an I/O event occurs for this file handle. WaitForReady(ctx context.Context) error }
AsyncReadWriteCloser represents a non-blocking file handle.
If an I/O operation is attempted when the underlying stream is not ready, e.g. because data is not yet available, a syscall.EAGAIN error will be returned.
type AsyncStream ¶
type AsyncStream struct {
// contains filtered or unexported fields
}
AsyncStream is a byte stream that can be read from and written to asynchronously.
func NewAsyncStream ¶
func NewAsyncStream(file AsyncReadWriteCloser) *AsyncStream
NewAsyncStream constructs a new AsyncStream.
func (*AsyncStream) Chunks ¶
func (a *AsyncStream) Chunks(ctx context.Context, chunkSize int) AsyncIterable[[]byte]
Chunks returns an AsyncIterable that iterates over the stream in fixed-size chunks of data.
func (*AsyncStream) Lines ¶
func (a *AsyncStream) Lines(ctx context.Context) AsyncIterable[[]byte]
Lines returns an AsyncIterable that iterates over all lines in the stream. The newline character will be included with each line.
func (*AsyncStream) ReadAll ¶
func (a *AsyncStream) ReadAll(ctx context.Context) ([]byte, error)
ReadAll reads until the end of the stream and returns all read data.
func (*AsyncStream) ReadLine ¶
func (a *AsyncStream) ReadLine(ctx context.Context) ([]byte, error)
ReadLine returns all data until a newline is encountered, including the newline.
func (*AsyncStream) ReadUntil ¶
ReadUntil returns all data until the given character is encountered, including the character.
func (*AsyncStream) Stream ¶
func (a *AsyncStream) Stream(ctx context.Context, bufSize int) AsyncIterable[[]byte]
Stream returns an AsyncIterable that yields the next chunk of data as soon as it is available. The chunks will be no larger than the given buffer size.
type Awaitable ¶
type Awaitable[T any] interface { Futurer // Await suspends the current task until this [Awaitable] // has completed and returns its result once completed. // // If the calling [Task] or the given [context.Context] // is cancelled before Await completes, an error will be returned // and the Awaitable will be cancelled as well. // See the [Awaitable.Shield] method if you do not want the Awaitable to be cancelled. Await(ctx context.Context) (T, error) // MustAwait is the same as [Awaitable.Await], but it panics if Await // returns an error. MustAwait(ctx context.Context) T // Shield returns a new [Future] which completes once this Awaitable completes, // but which will not cancel this Awaitable if cancelled. // Allows for awaiting an Awaitable from a [Task] without cancelling // the Awaitable if the Task is cancelled. Shield() *Future[T] // AddResultCallback registers a type-aware callback to run once this Awaitable // completes or is cancelled. If called when the Awaitable has already completed, // the callback will be run immediately. AddResultCallback(callback func(result T, err error)) Awaitable[T] // WriteResultTo registers a pointer to write the result // of this Awaitable to if it completes with no error. // // This method allows for particularly ergonomic use // of functions like [Wait]. WriteResultTo(dst *T) Awaitable[T] // Future returns the underlying [Future] holding the result // of this Awaitable. Returns itself if the Awaitable is a Future. Future() *Future[T] // Result returns the result of this Awaitable. // If this Awaitable has not yet completed, [ErrNotReady] will be returned. Result() (T, error) }
Awaitable is a type that holds the result of an operation that may complete at a later point in time, and which can be awaited to suspend the current coroutine until the operation has completed.
This interface enables polymorphism for Future and Task objects.
type Callback ¶
type Callback struct {
// contains filtered or unexported fields
}
Callback is a handle to a callback scheduled to be run by an EventLoop.
func NewCallback ¶
NewCallback creates a new handle to a callback specified to run after the given amount of time.
Calling this function will not actually schedule the callback to be run. Use EventLoop.ScheduleCallback instead.
type Coroutine1 ¶
Coroutine1 is a coroutine that can return an error.
type Coroutine2 ¶
Coroutine2 is a coroutine that can return a result or an error.
type EpollAsyncFile ¶
type EpollAsyncFile struct {
// contains filtered or unexported fields
}
EpollAsyncFile is an implementation of AsyncReadWriteCloser for EpollPoller.
func NewEpollAsyncFile ¶
func NewEpollAsyncFile(poller *EpollPoller, f Fder) *EpollAsyncFile
NewEpollAsyncFile wraps the given file handle using an EpollAsyncFile.
func (*EpollAsyncFile) Read ¶
func (eaf *EpollAsyncFile) Read(p []byte) (n int, err error)
Read implements io.Reader.
func (*EpollAsyncFile) WaitForReady ¶
func (eaf *EpollAsyncFile) WaitForReady(ctx context.Context) error
WaitForReady implements AsyncReadWriteCloser.
type EpollPoller ¶
type EpollPoller struct {
// contains filtered or unexported fields
}
EpollPoller is an epoll-backed Poller implementation.
func (*EpollPoller) Dial ¶
func (e *EpollPoller) Dial(ctx context.Context, network, address string) (conn AsyncReadWriteCloser, err error)
Dial implements Poller.
func (*EpollPoller) Open ¶
func (e *EpollPoller) Open(fd uintptr) (file AsyncReadWriteCloser, err error)
Open wraps the given file descriptor and subscribes to its events.
func (*EpollPoller) Pipe ¶
func (e *EpollPoller) Pipe() (r, w AsyncReadWriteCloser, err error)
Pipe implements Poller.
func (*EpollPoller) Subscribe ¶
func (e *EpollPoller) Subscribe(target *EpollAsyncFile) error
Subscribe instructs the poller to start listening for events for the given file handle.
func (*EpollPoller) Unsubscribe ¶
func (e *EpollPoller) Unsubscribe(target *EpollAsyncFile) error
Unsubscribe instructs the poller to stop listening for events for the given file handle.
func (*EpollPoller) Wait ¶
func (e *EpollPoller) Wait(timeout time.Duration) error
Wait implements Poller.
func (*EpollPoller) WakeupThreadsafe ¶
func (e *EpollPoller) WakeupThreadsafe() error
WakeupThreadsafe implements Poller.
type EpollSocket ¶
type EpollSocket struct {
// contains filtered or unexported fields
}
EpollSocket is a wrapper for a low-level socket file descriptor.
func NewSocket ¶
func NewSocket(fd int) *EpollSocket
NewSocket wraps the given file descriptor using an EpollSocket.
type EventLoop ¶
type EventLoop struct {
// contains filtered or unexported fields
}
EventLoop implements the core mechanism for processing callbacks and I/O events.
func RunningLoop ¶
RunningLoop returns the EventLoop running in the current context. If no EventLoop is running, this function will panic. This function should not generally be called from a manually launched goroutine.
func RunningLoopMaybe ¶
RunningLoopMaybe returns the EventLoop running in the current context, or nil with ok == false if no loop is running.
func (*EventLoop) DialLines ¶
DialLines is a convenience method that calls EventLoop.Dial followed by AsyncStream.Lines. The connection attempt will be deferred until the AsyncIterable is ranged over. If the connection fails, the connection error will be returned immediately on the first iteration.
func (*EventLoop) Pipe ¶
func (e *EventLoop) Pipe() (r, w *AsyncStream, err error)
Pipe creates two streams, where writing to w will make the written data available from r.
func (*EventLoop) Run ¶
func (e *EventLoop) Run(ctx context.Context, main Coroutine1) error
Run starts the event loop with the given coroutine as the main task. The loop will exit once the main task has exited and there are no pending callbacks.
func (*EventLoop) RunCallback ¶
func (e *EventLoop) RunCallback(callback func())
RunCallback schedules a callback for immediate execution by the event loop. Not threadsafe; use EventLoop.RunCallbackThreadsafe to schedule callbacks from other threads.
func (*EventLoop) RunCallbackThreadsafe ¶
RunCallbackThreadsafe schedules a callback for immediate execution on the event loop's thread.
func (*EventLoop) ScheduleCallback ¶
ScheduleCallback schedules a callback to be executed after the given duration.
func (*EventLoop) WaitForCallbacks ¶
WaitForCallbacks returns a Future that will complete once there are no pending callback functions.
type Fder ¶
type Fder interface { io.ReadWriteCloser // Fd returns the file descriptor of this handle. Fd() uintptr }
Fder represents a file handle that has an associated file descriptor.
type Future ¶
type Future[ResType any] struct { // contains filtered or unexported fields }
Future is a value container representing the result of a pending operation. It will run any callbacks registered using [Futurer.AddDoneCallback] or [Awaitable.AddResultCallback] once populated with a result using either Future.SetResult or [Futurer.Cancel].
func Go ¶
Go launches the given function in a goroutine and returns a Future that will complete when the goroutine finishes.
func NewFuture ¶
NewFuture returns a new Future instance ready to be awaited or populated with a result.
func (*Future[ResType]) AddDoneCallback ¶
AddDoneCallback implements Futurer.
func (*Future[ResType]) AddResultCallback ¶
AddResultCallback implements Awaitable.
func (*Future[ResType]) SetResult ¶
SetResult populates this Future with a result. This will mark the Future as completed, and the provided result will be propagated to any registered callbacks and returned from any future calls to [Awaitable.Result].
func (*Future[ResType]) Shield ¶
Shield implements Awaitable.
Example ¶
package main import ( "context" "fmt" "github.com/arvidfm/asyncigo" ) func main() { _ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { shielded := asyncigo.NewFuture[any]() task1 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) { fmt.Println("waiting for shielded...") return shielded.Shield().Await(ctx) }) unshielded := asyncigo.NewFuture[any]() task2 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) { fmt.Println("waiting for unshielded...") return unshielded.Await(ctx) }) // yield to the event loop for one tick to initialise the tasks _ = asyncigo.RunningLoop(ctx).Yield(ctx, nil) task1.Cancel(nil) task2.Cancel(nil) fmt.Println("task1:", task1.Err()) fmt.Println("task2:", task2.Err()) fmt.Println("shielded:", shielded.Err()) fmt.Println("unshielded:", unshielded.Err()) return nil }) }
Output: waiting for shielded... waiting for unshielded... task1: context canceled task2: context canceled shielded: <nil> unshielded: context canceled
func (*Future[ResType]) WriteResultTo ¶
WriteResultTo implements Awaitable.
type Futurer ¶
type Futurer interface { // HasResult reports whether this Futurer has completed. // This is true if the Futurer has a result, or if it has been cancelled. HasResult() bool // Err returns a non-nil error if it has been cancelled // or completed with an error. Err() error // AddDoneCallback registers a type-unaware callback to run once this Futurer // completes or is cancelled. If called when the Futurer has already completed, // the callback will be run immediately. AddDoneCallback(callback func(error)) Futurer // Cancel cancels this Futurer. If err is nil, the Futurer // will be canceled with [context.Canceled]. If the Futurer // has already completed, this has no effect. Cancel(err error) }
Futurer is an untyped view of an Awaitable, useful for storing heterogeneous Awaitable instances in a container.
type Iterable ¶
Iterable represents any function that can be ranged over and which yields one value.
type Iterable2 ¶
Iterable2 represents any function that can be ranged over and which yields two values.
type Iterator ¶
Iterator is a function that can be ranged over, yielding one value each iteration.
func AsIterator ¶
AsIterator returns an Iterator which yields the values in the slice in turn.
func Chain ¶
Chain yields each value from each of the given iterators as a single flat stream.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.Chain( asyncigo.AsIterator([]int{1, 2, 3, 4}), asyncigo.AsIterator([]int{5, 6, 7}), asyncigo.AsIterator([]int{8, 9, 10, 11, 12}), ) fmt.Println(it.Collect()) }
Output: [1 2 3 4 5 6 7 8 9 10 11 12]
func Count ¶
func Count[T constraints.Integer](start T) Iterator[T]
Count returns an infinite iterator which yields every integer starting at start.
func Filter ¶
Filter returns an iterator which yields the values from the given iterator in turn, skipping any values for which the given filtering function returns false.
func FlatMap ¶
FlatMap returns an iterator which yields the values from the iterators returned by the mapping function as a single flat stream of values.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.FlatMap( asyncigo.Range(5), func(v int) asyncigo.Iterator[int] { return asyncigo.Range(v + 1) }, ) fmt.Println(it.Collect()) }
Output: [0 0 1 0 1 2 0 1 2 3 0 1 2 3 4]
func Flatten ¶
Flatten yields each value from each of the nested iterators yielded by the given iterator as a single flat stream.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.Flatten(func(yield func(iterator asyncigo.Iterator[int]) bool) { _ = yield(asyncigo.AsIterator([]int{1, 2, 3, 4})) && yield(asyncigo.AsIterator([]int{5, 6, 7})) && yield(asyncigo.AsIterator([]int{8, 9, 10, 11, 12})) }) fmt.Println(it.Collect()) }
Output: [1 2 3 4 5 6 7 8 9 10 11 12]
func Map ¶
Map returns an iterator which yields the result of passing each value from the given iterator through the provided mapping function in turn.
func Range ¶
func Range[T constraints.Integer](count T) Iterator[T]
Range returns an iterator which yields every integer from 0 up to, but not including, count.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.Range(5) for i := range it { fmt.Println(i) } }
Output: 0 1 2 3 4
func Uniq ¶
func Uniq[V comparable, VS Iterable[V]](it VS) Iterator[V]
Uniq returns an iterator which yields the values from the given iterator in turn, skipping any already encountered values.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.Uniq( asyncigo.AsIterator([]int{1, 3, 5, 5, 8, 5, 2, 3, 9, 7, 3, 3, 4, 1}), ) fmt.Println(it.Collect()) }
Output: [1 3 5 8 2 9 7 4]
func (Iterator[V]) Collect ¶
func (i Iterator[V]) Collect() []V
Collect consumes the Iterator and returns the yielded values as a slice.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.Iter(func(yield func(int) bool) { a, b := 0, 1 for a < 20 { if !yield(a) { return } a, b = a+b, a } }) fmt.Println(it.Collect()) }
Output: [0 1 1 2 3 5 8 13]
type MapIterator ¶
type MapIterator[K comparable, V any] iter.Seq2[K, V]
MapIterator is a function that can be ranged over, yielding two values each iteration.
func AsIterator2 ¶
func AsIterator2[K comparable, V any](m map[K]V) MapIterator[K, V]
AsIterator2 returns a MapIterator which yields each key-value pair in the map.
func Enumerate ¶
func Enumerate[T constraints.Integer, U any, UI Iterable[U]](start T, it UI) MapIterator[T, U]
Enumerate returns an iterator which yields each value from the given iterator along with its index, where the index of the first value is start and increases by one for each subsequent value.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.Enumerate(5, asyncigo.AsIterator([]string{"a", "b", "c", "d"})) for i, v := range it { fmt.Printf("%d: %s\n", i, v) } }
Output: 5: a 6: b 7: c 8: d
func MapIter ¶
func MapIter[K comparable, V any, I Iterable2[K, V]](it I) MapIterator[K, V]
func (MapIterator[K, V]) Collect ¶
func (mi MapIterator[K, V]) Collect() map[K]V
Collect consumes the MapIterator and returns the yielded values as a map.
Example ¶
package main import ( "fmt" "github.com/arvidfm/asyncigo" ) func main() { it := asyncigo.MapIter(func(yield func(int, string) bool) { _ = yield(5, "a") && yield(10, "b") && yield(15, "c") }) m := it.Collect() for k := range m { fmt.Printf("%d: %s\n", k, m[k]) } }
Output (unordered, since the example ranges over a map and Go does not specify map iteration order): 5: a 10: b 15: c
type Mutex ¶
type Mutex struct {
// contains filtered or unexported fields
}
Mutex provides a simple asynchronous locking mechanism for coroutines. Mutex is not threadsafe.
type Poller ¶
type Poller interface { // Close closes this Poller. Close() error // Wait waits for one or more I/O events. If an I/O event occurs, Wait // should wake up any coroutines waiting on [AsyncReadWriteCloser.WaitForReady] // for the corresponding file handle. Wait(timeout time.Duration) error // WakeupThreadsafe instructs the Poller to stop waiting and return control to the event loop. WakeupThreadsafe() error // Pipe constructs a pair of asynchronous file handles where writing to w // causes the same data to be read from r. Pipe() (r, w AsyncReadWriteCloser, err error) // Dial opens a non-blocking network connection. Dial(ctx context.Context, network, address string) (AsyncReadWriteCloser, error) }
Poller represents a type that can wait for multiple I/O events simultaneously.
type Queue ¶
type Queue[T any] struct { // contains filtered or unexported fields }
Queue provides a basic asynchronous queue. Queue is not threadsafe.
type Task ¶
type Task[RetType any] struct { // contains filtered or unexported fields }
Task is responsible for driving a coroutine, intercepting any Awaitable instances awaited from the coroutine and advancing the coroutine once the pending Awaitable completes.
func SpawnTask ¶
func SpawnTask[RetType any](ctx context.Context, coro Coroutine2[RetType]) *Task[RetType]
SpawnTask starts the given coroutine as a background task.
Example ¶
package main import ( "context" "fmt" "time" "github.com/arvidfm/asyncigo" ) func main() { _ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { var counter int var tasks []asyncigo.Futurer for range 100000 { tasks = append(tasks, asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) { for range 10 { counter++ _ = asyncigo.Sleep(ctx, time.Millisecond*500) } return nil, nil })) } _ = asyncigo.Wait(ctx, asyncigo.WaitAll, tasks...) fmt.Println(counter) return nil }) }
Output: 1000000
func (*Task[_]) AddDoneCallback ¶
AddDoneCallback implements Futurer.
func (*Task[RetType]) AddResultCallback ¶
func (t *Task[RetType]) AddResultCallback(callback func(result RetType, err error)) Awaitable[RetType]
AddResultCallback implements Awaitable.
func (*Task[_]) Cancel ¶
Cancel implements Futurer.
Example ¶
When a task has been cancelled, it will continue running, but any calls to [Awaitable.Await] will immediately return context.Canceled. It's the responsibility of the task to stop early when cancelled.
package main import ( "context" "fmt" "github.com/arvidfm/asyncigo" ) func main() { _ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error { futs := make([]asyncigo.Future[int], 10) task := asyncigo.SpawnTask(ctx, func(ctx context.Context) (int, error) { for i := range futs { result, err := futs[i].Await(ctx) fmt.Printf("%d: (%v, %v)\n", i, result, err) } return 0, nil }) loop := asyncigo.RunningLoop(ctx) for i := range futs { if i == 5 { task.Cancel(nil) } _ = loop.Yield(ctx, nil) futs[i].SetResult(i, nil) } result, err := task.Await(ctx) fmt.Printf("task result: (%v, %v)", result, err) return nil }) }
Output: 0: (0, <nil>) 1: (1, <nil>) 2: (2, <nil>) 3: (3, <nil>) 4: (4, <nil>) 5: (0, context canceled) 6: (0, context canceled) 7: (0, context canceled) 8: (0, context canceled) 9: (0, context canceled) task result: (0, context canceled)
func (*Task[_]) Stop ¶
func (t *Task[_]) Stop()
Stop aborts the coroutine, preventing any further awaits. You should generally use [Futurer.Cancel] instead.
func (*Task[RetType]) WriteResultTo ¶
WriteResultTo implements Awaitable.