concurrency

package
v0.0.0-...-654d6e8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2024 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel[T any] struct {
}

Channel is a logic object which can generate or manipulate go channel all methods of Channel are in the book tilted《Concurrency in Go》

func NewChannel

func NewChannel[T any]() *Channel[T]

NewChannel return a Channel instance

func (*Channel[T]) Bridge

func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T

Bridge link multiply channels into one channel. Play: https://go.dev/play/p/qmWSy1NVF-Y

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
genVals := func() <-chan <-chan int {
	out := make(chan (<-chan int))
	go func() {
		defer close(out)
		for i := 1; i <= 5; i++ {
			stream := make(chan int, 1)
			stream <- i
			close(stream)
			out <- stream
		}
	}()
	return out
}

for v := range c.Bridge(ctx, genVals()) {
	fmt.Println(v)
}
Output:

1
2
3
4
5

func (*Channel[T]) FanIn

func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T

FanIn merge multiple channels into one channel. Play: https://go.dev/play/p/2VYFMexEvTm

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
channels := make([]<-chan int, 2)

for i := 0; i < 2; i++ {
	channels[i] = c.Take(ctx, c.Repeat(ctx, i), 2)
}

chs := c.FanIn(ctx, channels...)

for v := range chs {
	fmt.Println(v) //1 1 0 0 or 0 0 1 1
}
Output:

func (*Channel[T]) Generate

func (c *Channel[T]) Generate(ctx context.Context, values ...T) <-chan T

Generate creates channel, then put values into the channel. Play: https://go.dev/play/p/7aB4KyMMp9A

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Generate(ctx, 1, 2, 3)

fmt.Println(<-intStream)
fmt.Println(<-intStream)
fmt.Println(<-intStream)
Output:

1
2
3

func (*Channel[T]) Or

func (c *Channel[T]) Or(channels ...<-chan T) <-chan T

Or read one or more channels into one channel, will close when any readin channel is closed. Play: https://go.dev/play/p/Wqz9rwioPww

Example
sig := func(after time.Duration) <-chan any {
	c := make(chan any)
	go func() {
		defer close(c)
		time.Sleep(after)
	}()
	return c
}

start := time.Now()

c := NewChannel[any]()
<-c.Or(
	sig(1*time.Second),
	sig(2*time.Second),
	sig(3*time.Second),
)

if time.Since(start).Seconds() < 2 {
	fmt.Println("ok")
}
Output:

ok

func (*Channel[T]) OrDone

func (c *Channel[T]) OrDone(ctx context.Context, channel <-chan T) <-chan T

OrDone read a channel into another channel, will close until cancel context. Play: https://go.dev/play/p/lm_GoS6aDjo

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Take(ctx, c.Repeat(ctx, 1), 3)

for v := range c.OrDone(ctx, intStream) {
	fmt.Println(v)
}
Output:

1
1
1

func (*Channel[T]) Repeat

func (c *Channel[T]) Repeat(ctx context.Context, values ...T) <-chan T

Repeat create channel, put values into the channel repeatly until cancel the context. Play: https://go.dev/play/p/k5N_ALVmYjE

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4)

for v := range intStream {
	fmt.Println(v)
}
Output:

1
2
1
2

func (*Channel[T]) RepeatFn

func (c *Channel[T]) RepeatFn(ctx context.Context, fn func() T) <-chan T

RepeatFn create a channel, excutes fn repeatly, and put the result into the channel until close context. Play: https://go.dev/play/p/4J1zAWttP85

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fn := func() string {
	return "hello"
}

c := NewChannel[string]()
intStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3)

for v := range intStream {
	fmt.Println(v)
}
Output:

hello
hello
hello

func (*Channel[T]) Take

func (c *Channel[T]) Take(ctx context.Context, valueStream <-chan T, number int) <-chan T

Take create a channel whose values are taken from another channel with limit number. Play: https://go.dev/play/p/9Utt-1pDr2J

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

numbers := make(chan int, 5)
numbers <- 1
numbers <- 2
numbers <- 3
numbers <- 4
numbers <- 5
defer close(numbers)

c := NewChannel[int]()
intStream := c.Take(ctx, numbers, 3)

for v := range intStream {
	fmt.Println(v)
}
Output:

1
2
3

func (*Channel[T]) Tee

func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T)

Tee split one chanel into two channels, until cancel the context. Play: https://go.dev/play/p/3TQPKnCirrP

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Take(ctx, c.Repeat(ctx, 1), 2)

ch1, ch2 := c.Tee(ctx, intStream)

for v := range ch1 {
	fmt.Println(v)
	fmt.Println(<-ch2)
}
Output:

1
1
1
1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL