cstream

v0.1.5
Published: Apr 17, 2024 License: MIT Imports: 3 Imported by: 1

README

concurrent-stream: High Throughput Generics Stream/Pipeline/Channel Processing in Go.

go get github.com/planxnx/concurrent-stream

Examples

Basic
results := make(chan int)
stream := cstream.NewStream(ctx, 8, results)

go func() {
	for i := 0; i < 10; i++ {
		i := i
		stream.Go(func() int {
			return expensiveFunc(i)
		})
	}

	// Should be called to close the stream
	// after all tasks are submitted.
	stream.Close()
}()

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
	defer wg.Done()
	for result := range results {
		fmt.Println(result)
	}
}()

// Wait for all tasks to finish.
if err := stream.Wait(); err != nil {
	panic(err)
}
close(results)

wg.Wait()
Concurrency Mapping
data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
mapper := cstream.NewMap(ctx, 8, data, func(item int, _ int) int {
	return expensiveFunc(item)
})

.
.
.

result, err := mapper.Result(ctx)
if err != nil {
	panic(err)
}

Documentation

Types

type Map added in v0.1.2

type Map[I, O any] struct {
	// contains filtered or unexported fields
}

Map is a concurrent map function with a concurrency limit.

func NewMap added in v0.1.2

func NewMap[I, O any](ctx context.Context, goroutine int, input []I, iteratee func(item I, index int) O) *Map[I, O]

NewMap creates a new concurrent map with the given context, concurrency limit, input slice, and callback function.
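The idea of mapping a slice under a concurrency limit can be sketched with the standard library alone. The block below is an illustration, not cstream's implementation: `mapConcurrent` and `squares` are hypothetical names, and writing each result at its input index (so output order matches input order) is an assumption about the desired behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// mapConcurrent is a hypothetical helper, not part of cstream. It applies fn
// to every element of input with at most limit calls running at once and
// returns the results in input order. limit must be positive.
func mapConcurrent[I, O any](ctx context.Context, limit int, input []I, fn func(item I, index int) O) ([]O, error) {
	out := make([]O, len(input))
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup

	for i, item := range input {
		select {
		case <-ctx.Done():
			// Best effort: stop submitting, let running calls finish.
			wg.Wait()
			return nil, ctx.Err()
		case sem <- struct{}{}: // acquire a slot, blocking if none is free
		}
		wg.Add(1)
		go func(i int, item I) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			out[i] = fn(item, i)     // each goroutine writes a distinct index
		}(i, item)
	}
	wg.Wait()
	return out, nil
}

// squares runs mapConcurrent over nums with a limit of 4.
func squares(nums []int) ([]int, error) {
	return mapConcurrent(context.Background(), 4, nums, func(item, _ int) int {
		return item * item
	})
}

func main() {
	result, err := squares([]int{1, 2, 3, 4, 5})
	if err != nil {
		panic(err)
	}
	fmt.Println(result) // [1 4 9 16 25]
}
```

Because every goroutine writes to its own slice element, no mutex is needed around `out`.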

func (*Map[I, O]) Close added in v0.1.2

func (m *Map[I, O]) Close()

Close stops the concurrent map. It blocks until the concurrent map is closed.

func (*Map[I, O]) IsDone added in v0.1.2

func (m *Map[I, O]) IsDone() bool

IsDone returns true if the concurrent map is done or closed.

func (*Map[I, O]) IsRunning added in v0.1.2

func (m *Map[I, O]) IsRunning() bool

IsRunning returns true if the concurrent map is running.

func (*Map[I, O]) Result added in v0.1.2

func (m *Map[I, O]) Result(ctx context.Context) ([]O, error)

Result returns the results of the concurrent map. It blocks until all tasks are completed.

func (*Map[I, O]) Wait added in v0.1.2

func (m *Map[I, O]) Wait() error

Wait blocks until the concurrent map is done or the context is canceled.

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream is a simple stream of tasks with a concurrency limit.

func NewStream

func NewStream[T any](ctx context.Context, c int, out chan T) *Stream[T]

NewStream creates a new Stream with the given context, concurrency limit, and output channel.

func (*Stream[T]) Close

func (s *Stream[T]) Close()

Close stops the stream. It should be called after all tasks are submitted, or whenever you want to stop the stream. It blocks until the stream is closed.

func (*Stream[T]) Go

func (s *Stream[T]) Go(task func() T)

Go sends a task to the stream's pool. All tasks are executed concurrently. If the worker pool is full, Go blocks until a worker is available.
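The blocking behavior described above is commonly built on a buffered channel used as a counting semaphore. The sketch below shows that general technique with the standard library only. `limiter`, `newLimiter`, and `peakConcurrency` are hypothetical names, and nothing here is taken from cstream's source.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter is a hypothetical type, not cstream's implementation. It runs tasks
// in goroutines while allowing at most cap(slots) of them to run at once.
type limiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newLimiter(n int) *limiter {
	return &limiter{slots: make(chan struct{}, n)}
}

// Go blocks while all slots are taken, then runs task in a new goroutine.
func (l *limiter) Go(task func()) {
	l.slots <- struct{}{} // blocks when n tasks are already running
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }() // free the slot once the task returns
		task()
	}()
}

// Wait blocks until every submitted task has returned.
func (l *limiter) Wait() { l.wg.Wait() }

// peakConcurrency submits total tasks through a limiter of size limit and
// reports the highest number of tasks observed running at the same time.
func peakConcurrency(limit, total int) int64 {
	l := newLimiter(limit)
	var running, peak atomic.Int64
	for i := 0; i < total; i++ {
		l.Go(func() {
			n := running.Add(1)
			for { // record a new maximum if n exceeds the current peak
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			running.Add(-1)
		})
	}
	l.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(3, 100) <= 3) // true
}
```

Acquiring the slot before starting the goroutine is what makes the caller block. The number of running tasks can never exceed the channel's capacity.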

func (*Stream[T]) IsDone added in v0.1.2

func (s *Stream[T]) IsDone() bool

IsDone returns true if the stream is done or finished executing.

func (*Stream[T]) IsRunning added in v0.1.2

func (s *Stream[T]) IsRunning() bool

IsRunning returns true if the stream is running.

func (*Stream[T]) Out

func (s *Stream[T]) Out() <-chan T

Out returns the output channel.

func (*Stream[T]) Wait

func (s *Stream[T]) Wait() error

Wait blocks until the stream is done or the context is canceled.
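Waiting for "done or canceled" is typically a `select` over a completion channel and `ctx.Done()`. The sketch below shows that pattern with hypothetical names (`waitDone`, `timedOut`). It assumes the returned error is the context's error on cancellation, which the documentation above does not state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitDone is a hypothetical helper, not part of cstream. It blocks until done
// is closed or ctx is canceled, returning ctx.Err() in the latter case.
func waitDone(ctx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether waiting on a channel that never closes ends with
// context.DeadlineExceeded once a short timeout expires.
func timedOut() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	never := make(chan struct{})
	return errors.Is(waitDone(ctx, never), context.DeadlineExceeded)
}

func main() {
	done := make(chan struct{})
	close(done) // simulate work that has already finished
	fmt.Println(waitDone(context.Background(), done)) // <nil>
	fmt.Println(timedOut())                           // true
}
```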
