README ¶
Go: concurrency
If you look at the programming languages of today, you probably get this idea that the world is objected-oriented. But it’s not. It’s actually parallel. Multi-core machines, users, networking, etc. All these things are happening simultaneously and yet computing tools that we have are not good at expressing this kind of world-views. […] Go is a concurrent language.
Concurrency is not parallelism by Rob Pike
- Reference
- Concurrency is not parallelism
- goroutine ≠ thread
defer
,recover
- Be careful with
defer
and deadlock - channel to communicate
- questions
- select for channel:
select
≠switch
- receive
nil
from channel sync.Mutex
, race condition- Share memory by communicating
- memory leak
sync/atomic
- web server
sync.Mutex
is just a valuesync.Once
goroutine
, closure- rate limit
select
,continue
,break
- Counting problem
- Count: simulate web requests
- Count:
NaiveCounter
- Count:
MutexCounter
- Count:RWMutexCounter
- Count:AtomicIntCounter
- Count:AtomicCounter
- Count:
ChannelCounter
(No Buffer) - Count:
ChannelCounter
(Buffer) - Count: benchmark results
- Dequeue problem
- Find duplicates with concurrency
- Concurrency: Merge Sort
- Concurrency: Prime Sieve
- close channel
- blocking defer
- buffered channel copy
select
closed channelselect
defaultsync.Cond
- blocking
sync.Mutex
- empty buffered channel
- close two channels
- atomic, defer
- channel capacity
- select continue
- select nil chan
- select multiple
Reference
- Thread
- Process
- Asynchronous I/O
- Context switch
- Green threads
- Concurrency is Not Parallelism by Rob Pike (Slide)
- Go Concurrency Patterns by Rob Pike (Slide)
- Effective Go — Concurrency
- Advanced Go Concurrency Patterns by Sameer Ajmani (Slide)
- Go Concurrency Patterns : Pipelines and cancellation by Sameer Ajmani
- Go Memory Model
- Five things that make Go fast by Dave Cheney
- Why is a goroutine’s stack infinite by Dave Cheney
- High performance servers without the event loop by Dave Cheney
- Goroutines vs OS Threads
- Bjorn Rabenstein - Prometheus: Designing and Implementing a Modern Monitoring Solution in Go
- beorn7/concurrentcount
Concurrency is not parallelism
My YouTube video on Go concurrency:
You write any concurrent code, but you run with a single processor, then your program is not parallel because it is not executing anything in parallel. But Go code can still be concurrent with a single processor: when there are multiple processors available, the code runs in parallel automatically.
Go is a concurrent language. Concurrency and parallelism are not the same thing. Concurrency is the composition of independently executing processes(computations). Parallelism is the simultaneous execution of (possibly related) computations. Concurrency is about dealing with a lot of things at once. Parallelism is about doing a lot of things at once. Concurrency is about programming structure. Parallelism is about execution. Concurrency provides a way to structure a solution to solve a problem that may (but not necessariliy) be parallelizable.
Independently Executing Procedure + Coordination = Concurrency
Go’s concurrency is coordination, communication of independently executing
procedures. Go concurrency model is like communication of UNIX pipelines: ls -l | grep key | less
. Go concurrency is more a type-safe
generalization of Unix
pipes. goroutine is like ampersand &
in a shell command, which runs
things in the background but does not wait for it to end, as
here and
here:
package main
import "fmt"
func main() {
// launch goroutine in background
go func() {
fmt.Println("Hello, playground")
}()
//
// Does not print anything
//
// when main returns
// the program exits
// and the goroutine will not be run
// and gets garbage-collected
}
package main
import (
"fmt"
"time"
)
func b() {
fmt.Println("b is still running")
fmt.Println("because although a exited but main hasn't exited yet!")
}
func a() {
fmt.Println("a exits")
go b()
}
func main() {
a()
time.Sleep(time.Second)
// a exits
// b is still running
// because although a exited but main hasn't exited yet!
go func() {
fmt.Println("Hello, playground")
}()
time.Sleep(time.Second)
// Hello, playground
}
goroutine ≠ thread
Thread is a lightweight process since it executes within the context of one process. Both threads and processes are independent units of execution. Threads under the same process run in one shared memory space, while process run in separate memory spaces.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
A thread is the entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread’s set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread’s process. Threads can also have their own security context, which can be used for impersonating clients.
When you say 8-core machine, the core
represents the actual physical
processors. 8-core machine has 8 independent processing units (cores or
CPUs). Not to be confused with processor, a process
is a computer program
instance that is being executed. A process
can be made up of multiple
threads
executing instructions concurrently. Again, core
is an actual
physical processor
, and process
and thread
are independent units of
program execution: threads
under the same process
run in a shared memory
space, whereas processes
run in separate memory spaces. threads
are more
dependent on an operating system, than a hardware or CPU. Normally one CPU can
handle one thread
at a time, but one CPU with
hyper threading
can handle two threads
simultaneously.
[Threads] are conceptually the same as processes, but share the same memory space.
As threads share address space, they are lighter than processes so are faster to create and faster to switch between.
Threads still have an expensive context switch cost, a lot of state must be retained.
Goroutines take the idea of threads a step further.
Many goroutines are multiplexed onto a single operating system thread.
- Super cheap to create.
- Super cheap to switch between as it all happens in user space.
- Tens of thousands of goroutines in a single process are the norm, hundreds of thousands not unexpected.
This results in relatively few operating system threads per Go process, with the Go runtime taking care of assigning a runnable Goroutine to a free operating system thread.
High performance servers without the event loop by Dave Cheney
goroutine is an independently executing function, launched with go statement. goroutine is NOT a thread. Think of goroutine as a very cheap, lightweight thread. A program may have thousands of goroutines but with only one thread.
In telecommunications and computer networks, multiplexing (sometimes contracted to muxing) is a method by which multiple analog message signals or digital data streams are combined into one signal over a shared medium.
Multiplexing by Wikipedia
Go runtime multiplexes goroutines into multiple OS threads: when one goroutine blocks such as waiting for I/O, the thread blocks too but no other goroutine blocks. When a goroutine blocks on a thread, Go runtime moves other goroutines to a different, available thread, so they won't be blocked.
As of Go 1.4, the garbage collector has become precise enough that goroutine stack now takes only 2048 bytes of memory. goroutine has its own call stack that grows and shrinks as required. It starts small and allocates, frees heap storage automatically. Go allows you to write high-performance program without much expert knowledge or dealing with OS threads.
Each goroutine starts with a small stack, allocated from the heap. The size has fluctuated over time, but in Go 1.5 each goroutine starts with a 2k allocation.
Instead of using guard pages, the Go compiler inserts a check as part of every function call to test if there is sufficient stack for the function to run.
If there is insufficient space, the runtime will allocate a large stack segment on the heap, copy the contents of the current stack to the new segment, free the old segment, and the function call restarted.
Goroutine stack growth by Dave Cheney
Note that when the main function returns, the program exists. goroutines that were running in background get garbage-collected, like here:
package main
import (
"fmt"
"time"
)
func a() {
fmt.Println("a() called")
go func() {
time.Sleep(10 * time.Second)
fmt.Println("go func() called")
// this is not called
//
// you can get this printed with channel
}()
go b()
}
func b() {
time.Sleep(1 * time.Second)
fmt.Println("b() called")
}
func main() {
a()
time.Sleep(5 * time.Second)
// when main returns all others return as well
}
/*
a() called
b() called
*/
defer
, recover
defer
delays the function execution until just before the enclosing
function exit(return). The order of execution is:
defer
: Stack(Last-In-First-Out)goroutine
: Queue(First-In-First-Out)
LAST defer statement runs FIRST, like here:
package main
import "fmt"
func main() {
defer println("Defer 1")
defer println("Defer 2")
defer println("Defer 3")
defer func() {
fmt.Println("Recover:", recover())
}()
panic("Panic!!!")
/*
Recover: Panic!!!
Defer 3
Defer 2
Defer 1
*/
// recover stops the panic
// recover returns the value from panic
// panic function is to cause a run time error
// for "cannot happen" situations
// And stops the program to begin panicking
// So even if it's recovered
// the next lines after panic won't be run.
for {
fmt.Println("This does not print! Anything below not being run!")
}
}
FIRST goroutine runs FIRST, like here:
package main
import "time"
func main() {
// goroutine #01 : Queue
go println(1)
// goroutine #02
// Anonymous Function Closure
// Not function literal
// So we need parenthesis at the end
go func() {
println(2)
}()
// goroutine #03
// Anonymous Function Closure with input
go func(n int) {
println(n)
}(3)
// 1
// 2
// 3
time.Sleep(time.Nanosecond)
// main goroutine does not wait(block) for goroutine's return
// Without this, we just reach the end of main and goroutine does not run
}
Note that defer still gets executed_ even when a function panic
s,
like here:
package main
import (
"fmt"
"time"
)
func main() {
go func() {
defer fmt.Println("Hello, playground")
panic(1)
}()
time.Sleep(time.Second)
}
/*
Hello, playground
panic: 1
*/
panic
is a built-in function that stops the ordinary flow of control and begins panicking. When the function F calls panic, execution of F stops, any deferred functions in F are executed normally, and then F returns to its caller. To the caller, F then behaves like a call to panic. The process continues up the stack until all functions in the current goroutine have returned, at which point the program crashes. Panics can be initiated by invoking panic directly. They can also be caused by runtime errors, such as out-of-bounds array accesses.
recover
is a built-in function that regains control of a panicking goroutine. recover is only useful inside defer-red functions. During normal execution, a call to recover will return nil and have no other effect. If the current goroutine is panicking, a call to recover will capture the value given to panic and resume normal execution.
Note that when a function panic
s, the function execution stops and it
runs any defer statements inside the function, and it returns.
So you won't see "Hello World", from this
code:
package main
import "fmt"
func main() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
panic("Panic!")
fmt.Println("Hello, World!")
// NOT printed
}
/*
Panic!
*/
When it panic
s, the main goroutine (main
function) exits. That's why
we didn't see "Hello World!" in the code above.
package main
import (
"fmt"
"time"
)
func panicAndrecover() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
panic("Panic!")
}
func main() {
panicAndrecover()
fmt.Println("Hello, World!")
/*
Panic!
Hello, World!
*/
recursiveRecover()
/*
Restarting after error: [ 0 ] Panic
Restarting after error: [ 1 ] Panic
Restarting after error: [ 2 ] Panic
Restarting after error: [ 3 ] Panic
Restarting after error: [ 4 ] Panic
Too much panic: 5
*/
}
var count int
func recursiveRecover() {
defer func() {
if err := recover(); err != nil {
fmt.Println("Restarting after error:", err)
time.Sleep(time.Second)
count++
if count == 5 {
fmt.Printf("Too much panic: %d", count)
return
}
recursiveRecover()
}
}()
panic(fmt.Sprintf("[ %d ] Panic", count))
}
package main
import "fmt"
func main() {
m := make(map[string]int)
m["A"] = 1
m["B"] = 2
for k, v := range m {
func() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err, "at", k, v)
}
}()
panic("panic")
}()
}
}
/*
panic at A 1
panic at B 2
*/
The code prints out "Hello World!" because panic
only exits the function
panicAndRecover
, not the main
goroutine(main
function). And the function
recursiveRecover
shows an interesting usage to self-recover your program.
Here's another example:
package main
import (
"fmt"
"log"
"time"
)
func main() {
keepRunning(5)
}
/*
Restarting after error: 2009-11-10 23:00:00 +0000 UTC
Restarting after error: 2009-11-10 23:00:00.001 +0000 UTC
Restarting after error: 2009-11-10 23:00:00.002 +0000 UTC
Restarting after error: 2009-11-10 23:00:00.003 +0000 UTC
Restarting after error: 2009-11-10 23:00:00.004 +0000 UTC
Too much panic: 5
2009/11/10 23:00:00 2009-11-10 23:00:00.004 +0000 UTC
*/
var count int
func keepRunning(limit int) {
defer func() {
if err := recover(); err != nil {
fmt.Println("Restarting after error:", err)
time.Sleep(time.Millisecond)
count++
if count == limit {
fmt.Printf("Too much panic: %d\n", count)
log.Fatal(err)
}
keepRunning(limit)
}
}()
run()
}
func run() {
panic(time.Now().String())
}
Be careful with defer
and deadlock
Again, defer
delays the function execution until just
before the enclosing function exit(return). That means if the function
does not exit defer
statement never gets executed:
package main
import (
"fmt"
"net/http"
"sync"
)
type storage struct {
sync.Mutex
data string
}
var globalStorage storage
func handler(w http.ResponseWriter, r *http.Request) {
globalStorage.Lock()
defer globalStorage.Unlock()
fmt.Fprintf(w, "Hi %s, I love %s!", globalStorage.data, r.URL.Path[1:])
}
func main() {
globalStorage.Lock()
// (X) deadlock!
// defer globalStorage.Unlock()
globalStorage.data = "start"
globalStorage.Unlock()
http.HandleFunc("/", handler)
if err := http.ListenAndServe(":8080", nil); err != nil {
panic(err)
}
}
channel to communicate
Go concurrency is about composition of independently executing functions. Suppose multiple goroutines are running independently at the same time. Then how would we compose and coordinate them? Go has channel:
ch1 := make(chan int)
// same as
ch2 := make(chan int, 0) // unbuffered
ch3 := make(chan int, 1) // make channel with buffer 1
ch3 <- 1 // doesn't block
ch3 <- 2 // blocks until another goroutine receives from the channel
// fatal error: all goroutines are asleep - deadlock!
And try this:
package main
import "fmt"
func main() {
func() {
ch := make(chan int, 0) // make channel with buffer 0
go func() {
ch <- 1
}()
v, ok := <-ch
fmt.Println(v, ok) // 1 true
close(ch)
v2, ok2 := <-ch
fmt.Println(v2, ok2) // 0 false
}()
func() {
ch := make(chan int, 1)
ch <- 1
close(ch)
v, ok := <-ch
fmt.Println(v, ok) // 1 true
v2, ok2 := <-ch
fmt.Println(v2, ok2) // 0 false
}()
func() {
ch := make(chan int, 1)
close(ch)
v, ok := <-ch
fmt.Println(v, ok) // 0 false
v2, ok2 := <-ch
fmt.Println(v2, ok2) // 0 false
}()
}
Channel can communicate and signal between goroutines, as here:
package main
import "fmt"
func main() {
ch := make(chan struct{})
go func() {
fmt.Println("Hello, playground")
ch <- struct{}{}
}()
// wait until we receive from channel ch
<-ch
// Hello, playground
}
You can either send to or receive from a channel. A receiver always blocks until it receives data from a channel. A sender only blocks until an unbuffered channel receiver has received the value, or buffered channel receiver has copied the value to the buffer (when the buffer is full, it waits until some receiver has retrieved the value). Unbuffered channel has a pending receiver that would receive the value as soon as the sender sends a value:
Again, a receiver always blocks until it receives data from a channel. A sender only blocks until an unbuffered channel receiver has received a value, or buffered channel receiver has copied the value to the buffer. Unbuffered channel has a pending receiver that would receive the value as soon as the sender sends a value.
A sender and receiver must both be ready to play their part in the communication. Otherwise we wait until they are. It’s a blocking operation. Thus channels both communicate and synchronize (in a single operation). Synchronize by sending on sender's side and receiving on receiver's side. You don’t really need locking if you use channel. You can just use the channel to pass the data back and forth between goroutines.
Go also has sync package for low-level
synchronization. sync.WaitGroup
is useful for a collection of goroutines,
as here:
package main
import "sync"
func main() {
ch := make(chan struct{})
var wg sync.WaitGroup
go func() {
println(1)
ch <- struct{}{}
}()
wg.Add(1)
go func() {
println(2)
wg.Done()
}()
<-ch
wg.Wait()
// 1
// 2
}
We can also use channels to spawn many goroutines and exit the program after the first receive, as follows:
package main
import (
"math/rand"
)
func main() {
ch := make(chan int)
for {
go func() {
ch <- rand.Intn(10)
}()
}
<-ch
// process took too long
}
But this code would consume all your machine memories, because for
loop
runs forever in this code, never reaching the channel receivers. You have
to set your own limit, like here:
package main
import "fmt"
func main() {
ch := make(chan int)
for i := 0; i < 5; i++ {
go func() {
ch <- i
}()
}
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
}
questions:
- Senders only block until an unbuffered channel receiver has received the value, or buffered channel receiver has copied the value to the buffer (when the buffer is full, it waits until some receiver has retrieved the value). Then is channel synchronous or asynchronous?
- Why in the example above, is it receiving only ONE value 5, not 0, 1, 2, 3, 4?
- Is there any easier way to receive all values from channel?
#1-1. synchronous, asynchronous channel
By default, channel is UN-buffered. And unbuffered channel is synchronous. The sender blocks until the receiver has received the value. The receiver also blocks until there’s a value to receive from the sender. Without buffer, every single send will block until another goroutine receives from the channel.
This allows goroutines to synchronize without explicit locks or condition variables.
Buffered channel is asynchronous, sending or receiving does not need to wait(block): it won’t wait for other goroutines to finish. It only blocks when all the buffers are full. goroutine waits until some receiver has retrieved a value and created available buffers. Buffered channels can be useful when we do not need to synchronize all goroutines completely. The capacity(buffer) of the channel limits the number of the simultaneous calls.
package main
import "fmt"
func main() {
ch := make(chan int, 100)
ch <- 1
ch <- 2
ch <- 3
fmt.Println(ch, len(ch), cap(ch)) // 0xc420072380 3 100
fmt.Println(<-ch) // 1
fmt.Println(ch, len(ch), cap(ch)) // 0xc420072380 2 100
fmt.Println(<-ch) // 2
fmt.Println(ch, len(ch), cap(ch)) // 0xc420072380 1 100
fmt.Println(<-ch) // 3
// fmt.Println(<-ch)
// fatal error: all goroutines are asleep - deadlock!
ch <- 5
ch <- 10
fmt.Println(ch, len(ch), cap(ch)) // 0xc42005e380 2 100
}
Buffered channel operates in a non-blocking way. When running 100 million goroutines with Intel(R) Core(TM) i7–4910MQ CPU @ 2.90GHz, non-blocking(buffered channel) performs 7 times faster than non-buffered channel, as here:
package main
import (
"fmt"
"log"
"runtime"
"time"
)
func main() {
num := 100000000
sendOneTo := func(c chan int) {
for i := 0; i < num; i++ {
c <- 1
}
}
connect := func(cin, cout chan int) {
for {
x := <-cin
cout <- x
}
}
round := func(ch1, ch2, ch3, ch4 chan int) {
go connect(ch1, ch2)
go connect(ch2, ch3)
go connect(ch3, ch4)
go sendOneTo(ch1)
for i := 0; i < num; i++ {
_ = <-ch4
}
}
startBfCh := time.Now()
bfCh1 := make(chan int, num)
bfCh2 := make(chan int, num)
bfCh3 := make(chan int, num)
bfCh4 := make(chan int, num)
round(bfCh1, bfCh2, bfCh3, bfCh4)
fmt.Println("[Asynchronous, Non-Blocking] Buffered took", time.Since(startBfCh))
startUnCh := time.Now()
unCh1 := make(chan int)
unCh2 := make(chan int)
unCh3 := make(chan int)
unCh4 := make(chan int)
round(unCh1, unCh2, unCh3, unCh4)
fmt.Println("[Synchronous, Blocking] UnBuffered took", time.Since(startUnCh))
}
/*
[Asynchronous, Non-Blocking] Buffered took 32.96282781s (30 seconds)
[Synchronous, Blocking] UnBuffered took 3m17.140920286s (3 minutes)
*/
func init() {
maxCPU := runtime.NumCPU()
runtime.GOMAXPROCS(runtime.NumCPU())
log.Println("Concurrent execution with", maxCPU, "CPUs.")
}
#1-2. buffered channel faster because it’s non-blocking?
Not always. Lack of buffers is, in most cases, inconsequential to the performance, because unbuffered channel has a pending receiver that would receive the value as soon as the sender sends a value. And you also need to consider the memory overhead.
Today,
c = make(chan int, 1 << 31)
is prohibitively expensive.
Synchronous channel operation is more deterministic and rigorous because we know which communication is actually being proceeded, which gives more control over readers and writers on channels. You need synchronous(unbuffered) channel when all communications need to remain in lock-step synchronization. Asynchronous(buffered) channel is useful where you need more throughput and responsiveness.
A buffered channel can be used like a semaphore, for instance to limit throughput. In this example, incoming requests are passed to handle, which sends a value into the channel, processes the request, and then receives a value from the channel to ready the “semaphore” for the next consumer. The capacity of the channel buffer limits the number of simultaneous calls to process.
// http://golang.org/doc/effective_go.html#channels
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
sem <- 1 // Wait for active queue to drain.
process(r) // May take a long time.
<-sem // Done; enable next request to run.
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // Don't wait for handle to finish.
}
}
#1-3. be careful with buffered channel!
just fixed a bug where we blocked during shutdown sending to an error channel; if it was buffered we'd have silently ignored them! #golang
@davecheney but eventually we'd hit the limit of that buffer on shutdown and it'd freeze; all the while we would not actually report errs
@jmoiron the bug sounds like the author didn't consider "what happens if the reader of this channel never comes along to pick up this value"
#1-4. non-deterministic receive from buffered channel
By default, channel is UNbuffered. And unbuffered channel is synchronous:
sender
blocks until the receiver has received the value.receiver
also blocks until there’s a value to receive from the sender.
Without buffer(unbuffered), every single send
will block until another goroutine receives from the channel.
Unbuffered channel has a pending receiver that would receive the value as soon as the sender sends a value.
Buffered channel is asynchronous, sending or receiving does not need to wait(block): it won’t wait for other goroutines to finish.
sender
andreceiver
do not block, as long as the buffers are not full.- You can send values to buffered
receiver
channel as long as buffers are not full yet. - You can receive values from buffered
sender
channel as long as buffers are not full yet.
Therefore, receiving from a buffered channel can be non-deterministic because it does not block whether the values are ready to be received or not. Try this code:
package main
import (
"fmt"
"log"
)
func main() {
bufferedSenderChan := make(chan<- int, 3)
bufferedReceiverChan := make(<-chan int, 3)
bufferedSenderChan <- 0
bufferedSenderChan <- 1
bufferedSenderChan <- 2
// defer func() {
// if err := recover(); err != nil {
// fmt.Println(err)
// }
// }()
// panic(1)
// You cannot recover from deadlock!
// <-bufferedReceiverChan
// fatal error: all goroutines are asleep - deadlock!
// close(bufferedReceiverChan) // (cannot close receive-only channel)
// fmt.Println(<-bufferedReceiverChan)
_ = bufferedReceiverChan
bufferedChan := make(chan int, 3)
bufferedChan <- 0
bufferedChan <- 1
bufferedChan <- 2
fmt.Println(<-bufferedChan)
fmt.Println(<-bufferedChan)
fmt.Println(<-bufferedChan)
/*
0
1
2
*/
fmt.Println()
for i := 0; i < 10; i++ {
go func(i int) {
bufferedChan <- i
}(i)
}
for i := 0; i < 10; i++ {
fmt.Printf("%v ", <-bufferedChan)
}
fmt.Println()
/*
9 0 1 6 7 5 2 3 8 4
*/
fmt.Println()
slice := []float64{23.0, 23, 23, -123.2, 23, 123.2, -2.2, 23.1, -101.2, 17.2}
sum := 0.0
for _, elem := range slice {
sum += elem
}
counter1 := NewChannelCounter(0)
defer counter1.Done()
defer counter1.Close()
for _, elem := range slice {
counter1.Add(elem)
}
val1 := counter1.Get()
if val1 != sum {
log.Fatalf("NewChannelCounter with No Buffer got wrong. Expected %v but got %v\n", sum, val1)
}
counter2 := NewChannelCounter(10)
defer counter2.Done()
defer counter2.Close()
for _, elem := range slice {
counter2.Add(elem)
}
val2 := counter2.Get()
if val2 != sum {
log.Fatalf("NewChannelCounter with Buffer got wrong. Expected %v but got %v\n", sum, val2)
}
// 2015/08/08 14:03:24 NewChannelCounter with Buffer got wrong. Expected 28.167699999999993 but got 23
}
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// ChannelCounter counts through channels.
type ChannelCounter struct {
valueChan chan float64
deltaChan chan float64
done chan struct{}
}
func NewChannelCounter(buf int) *ChannelCounter {
c := &ChannelCounter{
make(chan float64, buf),
make(chan float64, buf),
make(chan struct{}),
}
go c.Run()
return c
}
func (c *ChannelCounter) Run() {
var value float64
for {
// "select" statement chooses which of a set of
// possible send or receive operations will proceed.
select {
case delta := <-c.deltaChan:
value += delta
case <-c.done:
return
case c.valueChan <- value:
// Do nothing.
// If there is no default case, the "select" statement
// blocks until at least one of the communications can proceed.
}
}
}
func (c *ChannelCounter) Get() float64 {
return <-c.valueChan
}
func (c *ChannelCounter) Add(delta float64) {
c.deltaChan <- delta
}
func (c *ChannelCounter) Done() {
c.done <- struct{}{}
}
func (c *ChannelCounter) Close() {
close(c.deltaChan)
}
#2. why is this receiving only one value?
package main
import "fmt"
func main() {
ch := make(chan int)
for i := 0; i < 5; i++ {
go func() {
ch <- i
}()
}
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
fmt.Println(<-ch) // 5
}
Go FAQ explains about closures running as goroutines:
package main
import (
"fmt"
"time"
)
// defer function runs in Last In First Out order
// after the surrounding function returns.
// NOT AFTER FOR-LOOP
//
// variables that are defined ON or INSIDE for-loop
// should be passed as arguments to the closure
func main() {
func() {
for _, i1 := range []int{0, 1, 2, 3} {
defer func() {
fmt.Println("defer i1:", i1)
}()
}
fmt.Println()
for _, i2 := range []int{0, 1, 2, 3} {
defer func(i2 int) {
fmt.Println("defer i2:", i2)
}(i2)
}
fmt.Println()
i := 0
for _, i3 := range []int{0, 1, 2, 3} {
i++
defer func(i3 int) {
fmt.Println("defer i, i3:", i, i3)
}(i3)
}
fmt.Println()
j := 0
for _, i4 := range []int{0, 1, 2, 3} {
j++
defer func(j, i4 int) {
fmt.Println("defer j, i4:", j, i4)
}(j, i4)
}
fmt.Println()
}()
/*
defer j, i4: 4 3
defer j, i4: 3 2
defer j, i4: 2 1
defer j, i4: 1 0
defer i, i3: 4 3
defer i, i3: 4 2
defer i, i3: 4 1
defer i, i3: 4 0
defer i2: 3
defer i2: 2
defer i2: 1
defer i2: 0
defer i1: 3
defer i1: 3
defer i1: 3
defer i1: 3
*/
func() {
for _, i1 := range []int{0, 1, 2, 3} {
go func() {
fmt.Println("go i1:", i1)
}()
}
fmt.Println()
time.Sleep(time.Second)
for _, i2 := range []int{0, 1, 2, 3} {
go func(i2 int) {
fmt.Println("go i2:", i2)
}(i2)
}
fmt.Println()
time.Sleep(time.Second)
i := 0
for _, i3 := range []int{0, 1, 2, 3} {
i++
go func(i3 int) {
fmt.Println("go i, i3:", i, i3)
}(i3)
}
fmt.Println()
time.Sleep(time.Second)
j := 0
for _, i4 := range []int{0, 1, 2, 3} {
j++
go func(j, i4 int) {
fmt.Println("go j, i4:", j, i4)
}(j, i4)
}
fmt.Println()
time.Sleep(time.Second)
}()
/*
go i1: 3
go i1: 3
go i1: 3
go i1: 3
go i2: 0
go i2: 1
go i2: 3
go i2: 2
go i, i3: 4 3
go i, i3: 4 0
go i, i3: 4 1
go i, i3: 4 2
go j, i4: 1 0
go j, i4: 2 1
go j, i4: 4 3
go j, i4: 3 2
*/
}
Variables that are defined ON or INSIDE for-loop must be passed as arguments to the closure. Again. Variables that are defined ON for-loop must be passed as arguments to the closure..
#3. wait for all goroutines to finish
range
can also be used for iterating and receiving from a channel:
package main
import "fmt"
func main() {
ch := make(chan int)
for i := 0; i < 5; i++ {
go func() {
ch <- i
}()
}
for v := range ch {
fmt.Println(v)
}
}
/*
5
5
5
5
5
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/tmp/sandbox982202598/main.go:14 +0x1e0
*/
This panics with deadlock message
because when we iterate a channel, range
ends only after the
channel is closed. We MUST make sure to close the channel
after the last sent value is received by the channel:
package main
import "fmt"
func main() {
{
ch := make(chan int)
limit := 5
for i := 0; i < limit; i++ {
go func(i int) {
ch <- i
}(i)
}
cn := 0
for v := range ch {
fmt.Println(v)
cn++
if cn == limit {
close(ch)
}
}
// 0
// 1
// 2
// 3
// 4
v, ok := <-ch
fmt.Println(v, ok) // 0 false
// any value received from closed channel succeeds without blocking
// , returning the zero value of channel type and false.
}
{
done, errChan := make(chan struct{}), make(chan error)
limit := 5
for i := 0; i < limit; i++ {
go func(i int) {
fmt.Println("Done at", i)
done <- struct{}{}
}(i)
}
cn := 0
for cn != limit {
select {
case err := <-errChan:
panic(err)
case <-done:
cn++
}
}
close(done)
close(errChan)
/*
Done at 4
Done at 0
Done at 1
Done at 2
Done at 3
*/
}
}
Note that received values from a channel are in order:
For channels, the iteration values produced are the successive values sent on the channel until the channel is closed. If the channel is
nil
, the range expression blocks forever.
select for channel: select
≠ switch
select
is like
switch
for
channels:
package main
import "fmt"
func typeName1(v interface{}) string {
switch typedValue := v.(type) {
case int:
fmt.Println("Value:", typedValue)
return "int"
case string:
fmt.Println("Value:", typedValue)
return "string"
default:
fmt.Println("Value:", typedValue)
return "unknown"
}
panic("unreachable")
}
func typeName2(v interface{}) string {
switch v.(type) {
case int:
return "int"
case string:
return "string"
default:
return "unknown"
}
panic("unreachable")
}
type Stringer interface {
String() string
}
type fakeString struct {
content string
}
// function used to implement the Stringer interface
func (s *fakeString) String() string {
return s.content
}
func printString(value interface{}) {
switch str := value.(type) {
case string:
fmt.Println(str)
case Stringer:
fmt.Println(str.String())
}
}
func main() {
fmt.Println(typeName1(1))
fmt.Println(typeName1("Hello"))
fmt.Println(typeName1(-.1))
/*
Value: 1
int
Value: Hello
string
Value: -0.1
unknown
*/
fmt.Println(typeName2(1)) // int
fmt.Println(typeName2("Hello")) // string
fmt.Println(typeName2(-.1)) // unknown
s := &fakeString{"Ceci n'est pas un string"}
printString(s) // Ceci n'est pas un string
printString("Hello, Gophers") // Hello, Gophers
}
select
chooses the one that is firstly ready to
send or
receive:
If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection.
Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
Try this:
package main
import (
"fmt"
"time"
)
func send(msg string) <-chan string {
ch := make(chan string)
go func() {
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
if i == 5 {
fmt.Println("Sleeping 2 seconds...")
time.Sleep(2 * time.Second)
}
}
}()
return ch
}
func main() {
ch := send("Hello")
for {
select {
case v := <-ch:
fmt.Println("Received:", v)
case <-time.After(time.Second):
fmt.Println("Done!")
return
}
}
}
/*
Received: Hello 0
Received: Hello 1
Received: Hello 2
Received: Hello 3
Received: Hello 4
Received: Hello 5
Sleeping 2 seconds...
Done!
*/
Also try this code from this thread:
package main
import (
"log"
"time"
)
func main() {
chs := make([]chan struct{}, 100)
// init
for i := range chs {
chs[i] = make(chan struct{}, 1)
}
// close
for _, ch := range chs {
close(ch)
}
// receive
for _, ch := range chs {
select {
case <-ch:
// https://golang.org/ref/spec#Close
// After calling close, and after any previously sent values
// have been received, receive operations will return the zero
// value for the channel's type without blocking.
log.Println("Succeed")
// http://golang.org/ref/spec#Select_statements
// time.After _is_ evaluated each time.
// https://groups.google.com/d/msg/golang-nuts/1tjcV80ccq8/hcoP9uMNiUcJ
case <-time.After(time.Millisecond):
log.Fatalf("Receive Delayed!")
}
}
}
/*
...
2015/06/27 14:34:48 Succeed
2015/06/27 14:34:48 Succeed
2015/06/27 14:34:48 Succeed
2015/06/27 14:34:48 Succeed
*/
Another example:
package main
import (
"fmt"
"net/http"
"time"
)
var sitesToPing = []string{
"http://www.google.com",
"http://www.amazon.com",
"http://nowebsite.net",
}
func main() {
respChan, errChan := make(chan string), make(chan error)
for _, target := range sitesToPing {
go head(target, respChan, errChan)
}
for i := 0; i < len(sitesToPing); i++ {
select {
case res := <-respChan:
fmt.Println(res)
case err := <-errChan:
fmt.Println(err)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
}
close(respChan)
close(errChan)
}
/*
200 / http://www.google.com:OK
405 / http://www.amazon.com:Method Not Allowed
Timeout!
*/
func head(
target string,
respChan chan string,
errChan chan error,
) {
req, err := http.NewRequest("HEAD", target, nil)
if err != nil {
errChan <- fmt.Errorf("0 / %s:None with %v", target, err)
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
errChan <- fmt.Errorf("0 / %s:None with %v", target, err)
return
}
defer resp.Body.Close()
stCode := resp.StatusCode
stText := http.StatusText(resp.StatusCode)
respChan <- fmt.Sprintf("%d / %s:%s", stCode, target, stText)
return
}
receive nil
from channel
Try this code. Note that
even if it send nil
to a channel, it receives:
package main
import (
"fmt"
"time"
)
func main() {
{
errChan := make(chan error)
go func() {
errChan <- nil
}()
select {
case v := <-errChan:
fmt.Println("even if nil, it still receives", v)
case <-time.After(time.Second):
fmt.Println("time-out!")
}
// even if nil, it still receives <nil>
}
{
errChan := make(chan error)
errChan = nil
go func() {
errChan <- nil
}()
select {
case v := <-errChan:
fmt.Println("even if nil, it still receives", v)
case <-time.After(time.Second):
fmt.Println("time-out!")
}
// time-out!
}
}
sync.Mutex
, race condition
Let’s say we need synchronization between concurrent tasks. Go recommends
channels for higher-level synchronization, which will be covered shortly. We
can also use mutex
es. Go package sync has
mutexes and they are useful for low-level libraries.
mutex is mutual exclusion,
in order to ensure that no two processes or threads be in the critical section
at the same time. It is important to prevent this kind of race
conditions:
There is no benign race condition.
Lock is a synchronization mechanism to enforce the limits on resource access when there are many executing threads, therefore preventing the race condition. Go mutex is a binary semaphore (record of a particular resource’s availability) of either locked or unlocked. This can be used to prevent race conditions.
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
Use Lock to acquire the mutex, and Unlock to release the Lock. Calling Lock on the same mutex twice causes deadlock, where two or more competing actions are each waiting for the other to finish and thus neither ever ends. Code is thread-safe if it manipulates shared data structures in the way that guarantees safe execution of multiple threads at the same time. Take a look at the following code, where it has to ensure the mutual exclusion for Go map data structure, which is not thread-safe:
/*
go run -race 31_no_race_surbl_with_mutex.go
*/
package main
import (
"log"
"net"
"net/url"
"strings"
"sync"
)
// Data is a set of data in map data structure.
// Every element is unique, and it is unordered.
// It maps its value to frequency.
type Data struct {
// m maps an element to its frequency
m map[interface{}]int
// RWMutex is more expensive
// https://blogs.oracle.com/roch/entry/beware_of_the_performance_of
// sync.RWMutex
//
// to synchronize access to shared state across multiple goroutines.
//
sync.Mutex
}
// NewData returns a new Data.
// Map supports the built-in function "make"
// so we do not have to use "new" and
// "make" does not return pointer.
func NewData() *Data {
nmap := make(map[interface{}]int)
return &Data{
m: nmap,
}
// return make(Data)
}
// Init initializes the Data.
func (d *Data) Init() {
// (X) d = NewData()
// This only updates its pointer
// , not the Data itself
//
*d = *NewData()
}
// GetSize returns the size of set.
func (d Data) GetSize() int {
return len(d.m)
}
// IsEmpty returns true if the set is empty.
func (d Data) IsEmpty() bool {
return d.GetSize() == 0
}
// Insert insert values to the set.
func (d *Data) Insert(items ...interface{}) {
for _, value := range items {
d.Lock()
v, ok := d.m[value]
d.Unlock()
if ok {
d.Lock()
d.m[value] = v + 1
d.Unlock()
continue
}
d.Lock()
d.m[value] = 1
d.Unlock()
}
}
func main() {
d := NewData()
d.Insert(1, 2, -.9, "A", 0, 2, 2, 2)
if d.IsEmpty() {
log.Fatalf("IsEmpty() should return false: %#v", d)
}
if d.GetSize() != 5 {
log.Fatalf("GetSize() should return 5: %#v", d)
}
rmap2 := Check(goodSlice...)
for k, v := range rmap2 {
if v.IsSpam {
log.Fatalf("Check | Unexpected %+v %+v but it's ok", k, v)
}
}
}
var goodSlice = []string{
"google.com",
}
// DomainInfo contains domain information from Surbl.org.
type DomainInfo struct {
IsSpam bool
Types []string
}
var nonSpam = DomainInfo{
IsSpam: false,
Types: []string{"none"},
}
var addressMap = map[string]string{
"2": "SC: SpamCop web sites",
"4": "WS: sa-blacklist web sited",
"8": "AB: AbuseButler web sites",
"16": "PH: Phishing sites",
"32": "MW: Malware sites",
"64": "JP: jwSpamSpy + Prolocation sites",
"68": "WS JP: sa-blacklist web sited jwSpamSpy + Prolocation sites",
}
// Check concurrently checks SURBL spam list.
// http://www.surbl.org/guidelines
// http://www.surbl.org/surbl-analysis
func Check(domains ...string) map[string]DomainInfo {
final := make(map[string]DomainInfo)
var wg sync.WaitGroup
var mutex sync.Mutex
for _, domain := range domains {
dom := hosten(domain)
dmToLook := dom + ".multi.surbl.org"
wg.Add(1)
go func() {
defer wg.Done()
ads, err := net.LookupHost(dmToLook)
if err != nil {
switch err.(type) {
case net.Error:
if err.(*net.DNSError).Err == "no such host" {
mutex.Lock()
final[dom] = nonSpam
mutex.Unlock()
}
default:
log.Fatal(err)
}
} else {
stypes := []string{}
for _, add := range ads {
tempSlice := strings.Split(add, ".")
flag := tempSlice[len(tempSlice)-1]
if val, ok := addressMap[flag]; !ok {
stypes = append(stypes, "unknown_source")
} else {
stypes = append(stypes, val)
}
}
info := DomainInfo{
IsSpam: true,
Types: stypes,
}
mutex.Lock()
final[dom] = info
mutex.Unlock()
}
}()
}
wg.Wait()
return final
}
// hosten returns the host of url.
func hosten(dom string) string {
dom = strings.TrimSpace(dom)
var domain string
if strings.HasPrefix(dom, "http:") ||
strings.HasPrefix(dom, "https:") {
dmt, err := url.Parse(dom)
if err != nil {
log.Fatal(err)
}
domain = dmt.Host
} else {
domain = dom
}
return domain
}
Share memory by communicating
Do not communicate by sharing memory; instead, share memory by communicating.
Go has race detection tool. And following carefully channel-based patterns, we can prevent race conditions even without locking.
A receiver blocks until it receives data from a channel.
Channel is synchronization and communication. You don't really need locking if you use channel. High-level synchronization is better done via communication of channels. Then what do we mean by share memory by communicating?
Let's first create some race conditions, with
go run -race 32_race.go
as below:
/*
go run -race 32_race.go
*/
package main
import "sync"
func updateSliceData(sliceData *[]int, num int, wg *sync.WaitGroup) {
defer wg.Done()
*sliceData = append(*sliceData, num)
}
func updateMapData(mapData *map[int]bool, num int, wg *sync.WaitGroup) {
defer wg.Done()
(*mapData)[num] = true
}
func main() {
var wg sync.WaitGroup
var sliceData = []int{}
wg.Add(100)
for i := 0; i < 100; i++ {
go updateSliceData(&sliceData, i, &wg)
}
wg.Wait()
var mapData = map[int]bool{}
wg.Add(100)
for i := 0; i < 100; i++ {
go updateMapData(&mapData, i, &wg)
}
wg.Wait()
}
/*
==================
WARNING: DATA RACE
Read by goroutine 5:
main.updateSliceData()
/home/ubuntu/race.go:7 +0x5f
Previous write by goroutine 4:
main.updateSliceData()
/home/ubuntu/race.go:7 +0x147
Goroutine 5 (running) created at:
main.main()
/home/ubuntu/race.go:21 +0xfd
Goroutine 4 (finished) created at:
main.main()
/home/ubuntu/race.go:21 +0xfd
==================
==================
WARNING: DATA RACE
Read by goroutine 5:
runtime.growslice()
/usr/local/go/src/runtime/slice.go:37 +0x0
main.updateSliceData()
/home/ubuntu/race.go:7 +0xcd
Previous write by goroutine 4:
main.updateSliceData()
/home/ubuntu/race.go:7 +0x104
Goroutine 5 (running) created at:
main.main()
/home/ubuntu/race.go:21 +0xfd
Goroutine 4 (finished) created at:
main.main()
/home/ubuntu/race.go:21 +0xfd
==================
==================
WARNING: DATA RACE
Write by goroutine 5:
runtime.mapassign1()
/usr/local/go/src/runtime/hashmap.go:383 +0x0
main.updateMapData()
/home/ubuntu/race.go:12 +0x94
Previous write by goroutine 4:
runtime.mapassign1()
/usr/local/go/src/runtime/hashmap.go:383 +0x0
main.updateMapData()
/home/ubuntu/race.go:12 +0x94
Goroutine 5 (running) created at:
main.main()
/home/ubuntu/race.go:28 +0x1cf
Goroutine 4 (finished) created at:
main.main()
/home/ubuntu/race.go:28 +0x1cf
==================
Found 3 data race(s)
exit status 66
*/
This code is creating race conditions. Go slice and map are NOT thread-safe data structure. They do not prevent you from race-conditions. In the code above, race conditions occur when several goroutines try to communicate—sharing and writing to non thread-safe data structure—by sharing memory—running concurrently.
Then what can we do to prevent this? Go has Lock:
/*
go run -race 33_no_race_with_mutex.go
*/
package main
import (
"fmt"
"sync"
)
func updateSliceDataWithLock(sliceData *[]int, num int, wg *sync.WaitGroup, mutex *sync.Mutex) {
defer wg.Done()
mutex.Lock()
*sliceData = append(*sliceData, num)
mutex.Unlock()
}
func updateMapDataWithLock(mapData *map[int]bool, num int, wg *sync.WaitGroup, mutex *sync.Mutex) {
defer wg.Done()
mutex.Lock()
(*mapData)[num] = true
mutex.Unlock()
}
// Mutexes can be created as part of other structures
type sliceData struct {
sync.Mutex
s []int
}
func updateSliceDataWithLockStruct(data *sliceData, num int, wg *sync.WaitGroup) {
defer wg.Done()
data.Lock()
data.s = append(data.s, num)
data.Unlock()
}
// Mutexes can be created as part of other structures
type mapData struct {
sync.Mutex
m map[int]bool
}
func updateMapDataWithLockStruct(data *mapData, num int, wg *sync.WaitGroup) {
defer wg.Done()
data.Lock()
data.m[num] = true
data.Unlock()
}
func main() {
var (
wg sync.WaitGroup
mutex sync.Mutex
)
var ds1 = []int{}
wg.Add(100)
for i := 0; i < 100; i++ {
go updateSliceDataWithLock(&ds1, i, &wg, &mutex)
}
wg.Wait()
fmt.Println(ds1)
var dm1 = map[int]bool{}
wg.Add(100)
for i := 0; i < 100; i++ {
go updateMapDataWithLock(&dm1, i, &wg, &mutex)
}
wg.Wait()
fmt.Println(dm1)
ds2 := sliceData{}
ds2.s = []int{}
wg.Add(100)
for i := 0; i < 100; i++ {
go updateSliceDataWithLockStruct(&ds2, i, &wg)
}
wg.Wait()
fmt.Println(ds2)
dm2 := mapData{}
dm2.m = make(map[int]bool)
wg.Add(100)
for i := 0; i < 100; i++ {
go updateMapDataWithLockStruct(&dm2, i, &wg)
}
wg.Wait()
fmt.Println(dm2)
}
But idiomatic Go should use channels:
Concurrent programming in many environments is made difficult by the subtleties required to implement correct access to shared variables. Go encourages a different approach in which shared values are passed around on channels and, in fact, never actively shared by separate threads of execution. Only one goroutine has access to the value at any given time. Data races cannot occur, by design. To encourage this way of thinking we have reduced it to a slogan:
Do not communicate by sharing memory; instead, share memory by communicating.
Try this code with
go run -race 34_no_race_with_channel.go
.
Note that we do not need to pass pointer of channel,
because channels, like map
and slice
, are syntactically pointer,
as explained here:
/*
go run -race 34_no_race_with_channel.go
*/
package main
// channels were syntactically pointers.
// No need to pass reference.
func sendWithChannel(ch chan int, num int) {
ch <- num
}
func main() {
ch1 := make(chan int)
for i := 0; i < 100; i++ {
go sendWithChannel(ch1, i)
}
cn := 0
var sliceData = []int{}
for v := range ch1 {
sliceData = append(sliceData, v)
cn++
if cn == 100 {
close(ch1)
}
}
ch2 := make(chan int)
var mapData = map[int]bool{}
for i := 0; i < 100; i++ {
go sendWithChannel(ch2, i)
}
cn = 0
for v := range ch2 {
mapData[v] = true
cn++
if cn == 100 {
close(ch2)
}
}
}
There is no race condition in this code. There is NO sync.Mutex
either.
This is what Go means by:
Do not communicate by sharing memory; instead, share memory by communicating.
With channel, you do not need low-level sync.Mutex
for synchronization.
Thread is a lightweight process since it executes within the context of one
process. Both threads and processes are independent units of execution.
Threads under the same process run in one shared memory space,
while process run in separate memory spaces.
Again multiple threads share the same address space (memory), reading
and writing on shared data. That is why, in multi-threaded programming,
you need to synchronize access to memory between threads (not across processes)
with Mutex
.
Why goroutines, instead of threads? explains:
Goroutines are part of making concurrency easy to use. The idea, is to multiplex independently executing functions(coroutines) onto a set of threads. When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread to a different, runnable thread so they won't be blocked. The programmer sees none of this, which is the point. The result, which we call goroutines, can be very cheap: they have little overhead beyond the memory for the stack, which is just a few kilobytes.
To make the stacks small, Go's run-time uses resizable, bounded stacks. A newly minted goroutine is given a few kilobytes, which is almost always enough. When it isn't, the run-time grows (and shrinks) the memory for storing the stack automatically, allowing many goroutines to live in a modest amount of memory. The CPU overhead averages about three cheap instructions per function call. It is practical to create hundreds of thousands of goroutines in the same address space.
If goroutines were just threads, system resources would run out at a much smaller number.
goroutines are multiplexed onto multiple OS threads. When a goroutine blocks on a thread, Go run-time moves other goroutines to a different, available thread, so they won't be blocked. goroutine is cheaper than threads, because goroutines are multiplexed onto a small number of OS threads. A program may run thousands of goroutines in one thread. We do not need to allocate one-thread-per-one-goroutine. We don't need to worry about threads in Go. Go handles synchronization. One goroutine may be blocked by waiting for I/O, and the thread would block as well, but other goroutines would never block because Go automatically moves other goroutines to another available thread. Therefore, Go uses relatively fewer OS threads per Go process.
To summarize:
- goroutines: non-blocking, light-weight thread.
- channel: let the channel handle the synchronization for you.
Again, the idea of Do not communicate by sharing memory; instead, share memory by communicating.
is to:
- avoid using locking(
sync.Mutex
) if possible, because it's a blocking operation and easy to cause deadlocks. - use channel instead, then do not worry about locking.
If you communicate by sharing memory
, you need to manually
synchronize access to memory between threads with locking,
because it shares the same address space. If you share memory by communicating
, which means you use channel and let the channel
handle synchronization, you do not worry about locking and race
conditions.
Now you can refactor this code above, using channel instead of
sync.Mutex
(full code can be found here
https://github.com/gyuho/learn/tree/master/doc/go_concurrency/code/surbl):
/*
go run -race 31_no_race_surbl_with_mutex.go
*/
package main
import (
"log"
"net"
"net/url"
"strings"
"sync"
)
// Data is a set of data in map data structure.
// Every element is unique, and it is unordered.
// It maps its value to frequency.
type Data struct {
// m maps an element to its frequency
m map[interface{}]int
// RWMutex is more expensive
// https://blogs.oracle.com/roch/entry/beware_of_the_performance_of
// sync.RWMutex
//
// to synchronize access to shared state across multiple goroutines.
//
sync.Mutex
}
// NewData returns a new Data.
// Map supports the built-in function "make"
// so we do not have to use "new" and
// "make" does not return pointer.
func NewData() *Data {
nmap := make(map[interface{}]int)
return &Data{
m: nmap,
}
// return make(Data)
}
// Init initializes the Data.
func (d *Data) Init() {
// (X) d = NewData()
// This only updates its pointer
// , not the Data itself
//
*d = *NewData()
}
// GetSize returns the size of set.
func (d Data) GetSize() int {
return len(d.m)
}
// IsEmpty returns true if the set is empty.
func (d Data) IsEmpty() bool {
return d.GetSize() == 0
}
// Insert insert values to the set.
func (d *Data) Insert(items ...interface{}) {
for _, value := range items {
d.Lock()
v, ok := d.m[value]
d.Unlock()
if ok {
d.Lock()
d.m[value] = v + 1
d.Unlock()
continue
}
d.Lock()
d.m[value] = 1
d.Unlock()
}
}
func main() {
d := NewData()
d.Insert(1, 2, -.9, "A", 0, 2, 2, 2)
if d.IsEmpty() {
log.Fatalf("IsEmpty() should return false: %#v", d)
}
if d.GetSize() != 5 {
log.Fatalf("GetSize() should return 5: %#v", d)
}
rmap2 := Check(goodSlice...)
for k, v := range rmap2 {
if v.IsSpam {
log.Fatalf("Check | Unexpected %+v %+v but it's ok", k, v)
}
}
}
var goodSlice = []string{
"google.com",
}
// DomainInfo contains domain information from Surbl.org.
type DomainInfo struct {
IsSpam bool
Types []string
}
var nonSpam = DomainInfo{
IsSpam: false,
Types: []string{"none"},
}
var addressMap = map[string]string{
"2": "SC: SpamCop web sites",
"4": "WS: sa-blacklist web sited",
"8": "AB: AbuseButler web sites",
"16": "PH: Phishing sites",
"32": "MW: Malware sites",
"64": "JP: jwSpamSpy + Prolocation sites",
"68": "WS JP: sa-blacklist web sited jwSpamSpy + Prolocation sites",
}
// Check concurrently checks SURBL spam list.
// http://www.surbl.org/guidelines
// http://www.surbl.org/surbl-analysis
func Check(domains ...string) map[string]DomainInfo {
final := make(map[string]DomainInfo)
var wg sync.WaitGroup
var mutex sync.Mutex
for _, domain := range domains {
dom := hosten(domain)
dmToLook := dom + ".multi.surbl.org"
wg.Add(1)
go func() {
defer wg.Done()
ads, err := net.LookupHost(dmToLook)
if err != nil {
switch err.(type) {
case net.Error:
if err.(*net.DNSError).Err == "no such host" {
mutex.Lock()
final[dom] = nonSpam
mutex.Unlock()
}
default:
log.Fatal(err)
}
} else {
stypes := []string{}
for _, add := range ads {
tempSlice := strings.Split(add, ".")
flag := tempSlice[len(tempSlice)-1]
if val, ok := addressMap[flag]; !ok {
stypes = append(stypes, "unknown_source")
} else {
stypes = append(stypes, val)
}
}
info := DomainInfo{
IsSpam: true,
Types: stypes,
}
mutex.Lock()
final[dom] = info
mutex.Unlock()
}
}()
}
wg.Wait()
return final
}
// hosten returns the host of url.
func hosten(dom string) string {
dom = strings.TrimSpace(dom)
var domain string
if strings.HasPrefix(dom, "http:") ||
strings.HasPrefix(dom, "https:") {
dmt, err := url.Parse(dom)
if err != nil {
log.Fatal(err)
}
domain = dmt.Host
} else {
domain = dom
}
return domain
}
With channel:
/*
go run -race 35_no_race_surbl_with_channel.go
*/
package main
import (
"fmt"
"log"
"net"
"net/url"
"strings"
)
// DomainInfo contains domain information from Surbl.org.
type DomainInfo struct {
Domain string
IsSpam bool
Types []string
}
var addressMap = map[string]string{
"2": "SC: SpamCop web sites",
"4": "WS: sa-blacklist web sited",
"8": "AB: AbuseButler web sites",
"16": "PH: Phishing sites",
"32": "MW: Malware sites",
"64": "JP: jwSpamSpy + Prolocation sites",
"68": "WS JP: sa-blacklist web sited jwSpamSpy + Prolocation sites",
}
// Check concurrently checks SURBL spam list.
// http://www.surbl.org/guidelines
// http://www.surbl.org/surbl-analysis
func Check(domains ...string) map[string]DomainInfo {
ch := make(chan DomainInfo)
for _, domain := range domains {
go func(domain string) {
dom := hosten(domain)
dmToLook := dom + ".multi.surbl.org"
ads, err := net.LookupHost(dmToLook)
if err != nil {
switch err.(type) {
case net.Error:
if err.(*net.DNSError).Err == "no such host" {
nonSpam := DomainInfo{
Domain: domain,
IsSpam: false,
Types: []string{"none"},
}
ch <- nonSpam
}
default:
log.Fatal(err)
}
} else {
stypes := []string{}
for _, add := range ads {
tempSlice := strings.Split(add, ".")
flag := tempSlice[len(tempSlice)-1]
if val, ok := addressMap[flag]; !ok {
stypes = append(stypes, "unknown_source")
} else {
stypes = append(stypes, val)
}
}
info := DomainInfo{
Domain: domain,
IsSpam: true,
Types: stypes,
}
ch <- info
}
}(domain)
}
final := make(map[string]DomainInfo)
checkSize := len(domains)
cn := 0
for info := range ch {
final[info.Domain] = info
cn++
if cn == checkSize {
close(ch)
}
}
return final
}
// hosten returns the host of url.
func hosten(dom string) string {
dom = strings.TrimSpace(dom)
var domain string
if strings.HasPrefix(dom, "http:") ||
strings.HasPrefix(dom, "https:") {
dmt, err := url.Parse(dom)
if err != nil {
log.Fatal(err)
}
domain = dmt.Host
} else {
domain = dom
}
return domain
}
var goodSlice = []string{
"google.com", "amazon.com", "yahoo.com", "gmail.com", "walmart.com",
"stanford.edu", "intel.com", "github.com", "surbl.org", "wikipedia.org",
}
func main() {
fmt.Println(Check(goodSlice...))
}
If you benchmark two versions, you can see that the code with channel
is faster than the one with sync.Mutex
, as
here:
BenchmarkCheckWithLock 100 73032395 ns/op 149328 B/op 1961 allocs/op
BenchmarkCheckWithLock-2 100 73151925 ns/op 149371 B/op 1962 allocs/op
BenchmarkCheckWithLock-4 50 124766761 ns/op 149474 B/op 1963 allocs/op
BenchmarkCheckWithLock-8 50 22952625 ns/op 149879 B/op 1964 allocs/op
BenchmarkCheckWithLock-16 50 126122965 ns/op 150508 B/op 1967 allocs/op
BenchmarkCheck 100 184853661 ns/op 149780 B/op 1974 allocs/op
BenchmarkCheck-2 100 124283447 ns/op 149742 B/op 1974 allocs/op
BenchmarkCheck-4 100 128578550 ns/op 149758 B/op 1974 allocs/op
BenchmarkCheck-8 100 74226839 ns/op 149833 B/op 1975 allocs/op
BenchmarkCheck-16 50 24317567 ns/op 149880 B/op 1975 allocs/op
(But not all the time. It depends on the code. Sometimes channel takes too much memory and slows down the program.)
memory leak
When there is a defer
statement that never gets run
because the function that contains defer
is long-running
and never returns or etc:
package main
import (
"fmt"
"io/ioutil"
"os"
"time"
)
func main() {
fpath := "file.txt"
f, err := os.OpenFile(fpath, os.O_RDWR|os.O_TRUNC, 0777)
if err != nil {
f, err = os.Create(fpath)
if err != nil {
panic(err)
}
}
defer f.Close()
i := 0
for {
i++
// if this is inside a long-running function
// this never gets run and causes memory leak
defer func() {
if _, err := f.WriteString(fmt.Sprintf("LINE %d\n", i)); err != nil {
panic(err)
}
}()
if i == 100 {
break
}
}
time.Sleep(time.Second)
fc, err := toString(fpath)
fmt.Println(fpath, "contents:", fc)
// file.txt contents:
defer func() {
if err := os.Remove(fpath); err != nil {
panic(err)
}
}()
}
func toString(fpath string) (string, error) {
file, err := os.Open(fpath)
if err != nil {
// NOT retur nil, err
// []byte can be null but not string
return "", err
}
defer file.Close()
// func ReadAll(r io.Reader) ([]byte, error)
tbytes, err := ioutil.ReadAll(file)
if err != nil {
return "", err
}
return string(tbytes), nil
}
sync/atomic
In concurrent programming, an operation (or set of operations) is atomic, linearizable, indivisible or un-interruptible if it occurs instantaneously to the rest of the system. Atomicity is a guarantee of isolation from concurrent processes. For example, let’s say that we have a web application that has a shared variable declared globally. Every request spawns its own goroutine, and if each goroutine tries to manipulate the shared variable, there is a high probability of race condition. If you just need a reference counter in a global scope, Go sync/atomic would be the simplest way to ensure the atomicity of a global variable between several goroutines, like here:
package main
import (
"fmt"
"log"
"sync/atomic"
"time"
)
func main() {
var refCounter int32 = 0
fmt.Println(atomic.LoadInt32(&refCounter))
fmt.Println(atomic.AddInt32(&refCounter, 1))
fmt.Println(atomic.LoadInt32(&refCounter))
fmt.Println(refCounter)
go func() {
time.Sleep(10 * time.Second)
atomic.AddInt32(&refCounter, -1)
}()
for atomic.LoadInt32(&refCounter) != 0 {
log.Println("Sleeping 20 seconds")
time.Sleep(20 * time.Second)
fmt.Println(refCounter)
}
atomic.AddInt32(&refCounter, 1)
atomic.AddInt32(&refCounter, -1)
}
web server
Go’s HTTP server spawns a goroutine per request—neither a process nor a thread:
func (srv *Server) Serve(l net.Listener) error
Serve accepts incoming connections on the Listener
l
, creating a new service goroutine for each. The service goroutines read requests and then callsrv.Handler
to reply to them.
// Serve accepts incoming connections on the Listener l, creating a
// new service goroutine for each. The service goroutines read requests and
// then call srv.Handler to reply to them.
func (srv *Server) Serve(l net.Listener) error {
defer l.Close()
var tempDelay time.Duration // how long to sleep on accept failure
for {
rw, e := l.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
c, err := srv.newConn(rw)
if err != nil {
continue
}
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve()
}
}
And try this:
package main
import (
"net/http"
)
func main() {
http.HandleFunc("/", foo)
http.ListenAndServe(":3000", nil)
}
func foo(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "A Go Web Server")
w.WriteHeader(200)
}
/*
curl -i localhost:3000
HTTP/1.1 200 OK
Server: A Go Web Server
Date: Fri, 17 Oct 2014 20:01:38 GMT
Content-Length: 0
Content-Type: text/plain; charset=utf-8
*/
sync.Mutex
is just a value
Try this:
package main
import (
"fmt"
"sync"
"time"
)
/*
Ian Lance Taylor (https://groups.google.com/d/msg/golang-nuts/7Xi2APcqpM0/UzHJnabiDQAJ):
You are looking at this incorrectly in some way that I don't
understand. A sync.Mutex is a value with two methods: Lock and
Unlock. Lock acquires a lock on the mutex. Unlock releases it. Only
one goroutine can acquire the lock on the mutex at a time.
That's all there is. A mutex doesn't have a scope. It can be a field
of a struct but it doesn't have to be. A mutex doesn't protect
anything in particular by itself. You have to write your code to call
Lock, do the protected operations, and then call Unlock.
Your example code looks fine.
*/
func main() {
var hits struct {
sync.Mutex
n int
}
hits.Lock()
hits.n++
hits.Unlock()
fmt.Println(hits)
// {{0 0} 1}
m := map[string]time.Time{}
// without this:
// Found 1 data race(s)
var mutex sync.Mutex
done := make(chan struct{})
for range []int{0, 1} {
go func() {
mutex.Lock()
m[time.Now().String()] = time.Now()
mutex.Unlock()
done <- struct{}{}
}()
}
cn := 0
for range done {
cn++
if cn == 2 {
close(done)
}
}
fmt.Println(m)
/*
map[2015-11-05 20:42:36.516629792 -0800 PST:2015-11-05 20:42:36.516678634 -0800 PST 2015-11-05 20:42:36.516685141 -0800 PST:2015-11-05 20:42:36.516686379 -0800 PST]
*/
}
sync.Once
package main
import (
"fmt"
"sync"
)
type Mine struct {
createOnce *sync.Once
}
func main() {
m := Mine{}
m.createOnce = &sync.Once{}
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
// m.createOnce = &sync.Once{}
m.createOnce.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
fmt.Println()
for i := 0; i < 10; i++ {
go func() {
m.createOnce = &sync.Once{}
m.createOnce.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
/*
Only once
Only once
Only once
Only once
Only once
Only once
Only once
Only once
Only once
Only once
Only once
*/
goroutine
, closure
Try this:
package main
import (
"fmt"
"time"
)
func main() {
for _, d := range []int{1, 2} {
x := d
func() {
fmt.Printf("%d(x: %d)\n", d, x)
}()
}
time.Sleep(time.Second)
fmt.Println()
/*
1(x: 1)
2(x: 2)
*/
// (X) DON'T DO THIS
for _, d := range []int{10, 20} {
x := d
go func() {
fmt.Printf("%d(x: %d)\n", d, x)
}()
}
time.Sleep(time.Second)
fmt.Println()
/*
20(x: 10)
20(x: 20)
*/
for _, d := range []int{100, 200} {
go func(d int) {
fmt.Printf("%d\n", d)
}(d)
}
time.Sleep(time.Second)
fmt.Println()
/*
200
100
*/
// https://github.com/coreos/etcd/pull/3880#issuecomment-157442671
// 'forever' is first evaluated without creating a new goroutine.
// And then 'wrap(forever())' is evaluated with a new goroutine.
go func() { wrap(forever()) }()
// calling forever...
// this is running in the background(goroutine)
// 'forever' is first evaluated without creating a new goroutine.
// There is no goroutine created to run this in the background.
// So this is blocking forever!!!
go wrap(forever())
// calling forever...
}
func forever() error {
fmt.Println("calling forever...")
time.Sleep(time.Hour)
return nil
}
func wrap(err error) {
_ = err
}
rate limit
package main
import (
"fmt"
"log"
"sync"
"time"
)
func main() {
func() {
q := NewQueue(10, time.Second/3)
for range []int{0, 1, 2, 3, 4, 5, 6, 7, 8} {
if q.Push(time.Now()) {
log.Fatalf("should not have exceeded the rate limit: %+v", q)
}
}
for range []int{0, 1, 2, 3, 4, 5, 6, 7, 8} {
if !q.Push(time.Now()) {
log.Fatalf("should have exceeded the rate limit: %+v", q)
}
}
if q.slice.length() != 10 {
log.Fatalf("Queue should only have 10 timestamps: %+v", q)
}
}()
func() {
q := NewQueue(10, time.Second/10)
tick := time.NewTicker(time.Second / 8)
done := make(chan struct{})
go func() {
now := time.Now()
for tk := range tick.C {
log.Println("took:", time.Since(now))
now = time.Now()
isExceeded := q.Push(tk)
if isExceeded {
log.Println(tk, "has exceeded the rate limit", q.rate)
tick.Stop()
done <- struct{}{}
break
}
}
}()
select {
case <-time.After(3 * time.Second):
log.Fatalln("time out!")
case <-done:
log.Println("success")
}
}()
}
// timeSlice stores a slice of time.Time
// in a thread-safe way.
type timeSlice struct {
// RWMutex is more expensive
// https://blogs.oracle.com/roch/entry/beware_of_the_performance_of
// sync.RWMutex
//
// to synchronize access to shared state across multiple goroutines.
//
mu sync.Mutex
times []time.Time
}
func newTimeSlice() *timeSlice {
tslice := timeSlice{}
sl := make([]time.Time, 0)
tslice.times = sl
return &tslice
}
func (t *timeSlice) push(ts time.Time) {
t.mu.Lock()
t.times = append(t.times, ts)
t.mu.Unlock()
}
func (t *timeSlice) length() int {
t.mu.Lock()
d := len(t.times)
t.mu.Unlock()
return d
}
func (t *timeSlice) pop() {
if t.length() != 0 {
t.mu.Lock()
t.times = t.times[1:len(t.times):len(t.times)]
t.mu.Unlock()
}
}
func (t *timeSlice) first() (time.Time, bool) {
if t.length() == 0 {
return time.Time{}, false
}
t.mu.Lock()
v := t.times[0]
t.mu.Unlock()
return v, true
}
// Queue contains the slice of timestamps
// and other rate limiter configurations.
type Queue struct {
slice *timeSlice
// burstSize is like a buffer.
// If burstSize is 5, it allows rate exceeding
// for the fist 5 elements.
burstSize int
rate time.Duration
}
// NewQueue returns a new Queue.
func NewQueue(burstSize int, rate time.Duration) *Queue {
tslice := newTimeSlice()
q := Queue{}
q.slice = tslice
q.burstSize = burstSize
q.rate = rate
return &q
}
// Push appends the timestamp to the Queue.
// It return true if rate has exceeded.
// We need a pointer of Queue, where it defines
// timeSlice with pointer as well. To append to slice
// and update struct members, we need pointer types.
func (q *Queue) Push(ts time.Time) bool {
if q.slice.length() == q.burstSize {
q.slice.pop()
}
q.slice.push(ts)
if q.slice.length() < q.burstSize {
return false
}
ft, ok := q.slice.first()
if !ok {
return false
}
diff := ft.Sub(ts)
return q.rate > diff
}
func (q *Queue) String() string {
return fmt.Sprintf("times: %+v / burstSize: %d / rate: %v", q.slice.times, q.burstSize, q.rate)
}
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
func main() {
var (
num int
mu sync.Mutex
qps = 10
wg sync.WaitGroup
N = 10000
)
wg.Add(N)
limiter := rate.NewLimiter(rate.Every(time.Second), qps)
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
for limiter.Wait(context.TODO()) == nil {
mu.Lock()
num++
mu.Unlock()
}
}(i)
}
time.Sleep(time.Second)
mu.Lock()
fmt.Println("num:", num)
mu.Unlock()
fmt.Println("burst:", limiter.Burst())
fmt.Println("blocking...")
donec := make(chan struct{})
go func() {
wg.Wait()
close(donec)
}()
select {
case <-donec:
fmt.Println("Done!")
case <-time.After(time.Second):
fmt.Println("Timed out!")
}
}
/*
num: 11
burst: 10
blocking...
Timed out!
*/
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
func main() {
var (
num int
mu sync.Mutex
qps = 10
wg sync.WaitGroup
N = 10000
)
wg.Add(N)
limiter := rate.NewLimiter(rate.Every(time.Second), qps)
donec := make(chan struct{})
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
go func() {
for limiter.Wait(context.TODO()) == nil {
mu.Lock()
num++
mu.Unlock()
}
}()
<-donec
}(i)
}
time.Sleep(time.Second)
mu.Lock()
fmt.Println("num:", num)
mu.Unlock()
fmt.Println("burst:", limiter.Burst())
fmt.Println("closing...")
close(donec)
wg.Wait()
fmt.Println("Done!")
}
/*
num: 11
burst: 10
closing...
Done!
*/
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
func main() {
var (
num int
mu sync.Mutex
qps = 10
wg sync.WaitGroup
N = 10000
)
wg.Add(N)
ctx, cancel := context.WithCancel(context.Background())
limiter := rate.NewLimiter(rate.Every(time.Second), qps)
for i := 0; i < N; i++ {
go func() {
defer wg.Done()
for {
if err := limiter.Wait(ctx); err == context.Canceled {
return
}
mu.Lock()
num++
mu.Unlock()
}
}()
}
time.Sleep(time.Second)
mu.Lock()
fmt.Println("num:", num)
mu.Unlock()
fmt.Println("burst:", limiter.Burst())
fmt.Println("canceling...")
cancel()
wg.Wait()
fmt.Println("Done!")
}
/*
num: 11
burst: 10
canceling...
Done!
*/
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
func main() {
st := &stresser{
qps: 100000,
N: 100,
}
go st.Start()
time.Sleep(time.Second)
st.StopAndWait()
fmt.Println("st.success", st.success)
println()
go st.Start()
time.Sleep(time.Second)
st.StopAndWait()
fmt.Println("st.success", st.success)
println()
go st.Start()
time.Sleep(time.Second)
st.StopAndWait()
fmt.Println("st.success", st.success)
}
/*
s.cancel() 1
s.cancel() 2
Start finished with context canceled
wg.Wait() 1
wg.Wait() 2
st.success 100001
s.cancel() 1
s.cancel() 2
wg.Wait() 1
Start finished with context canceled
wg.Wait() 2
st.success 200002
s.cancel() 1
s.cancel() 2
wg.Wait() 1
Start finished with context canceled
wg.Wait() 2
st.success 300003
*/
type stresser struct {
qps int
N int
mu sync.Mutex
wg *sync.WaitGroup
rateLimiter *rate.Limiter
cancel func()
canceled bool
success int
}
func (s *stresser) Start() {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(s.N)
s.mu.Lock()
s.wg = wg
s.rateLimiter = rate.NewLimiter(rate.Every(time.Second), s.qps)
s.cancel = cancel
s.canceled = false
s.mu.Unlock()
for i := 0; i < s.N; i++ {
go s.run(ctx)
}
<-ctx.Done()
fmt.Println("Start finished with", ctx.Err())
}
func (s *stresser) run(ctx context.Context) {
defer s.wg.Done()
for {
if err := s.rateLimiter.Wait(ctx); err == context.Canceled || ctx.Err() == context.Canceled {
return
}
s.mu.Lock()
canceled := s.canceled
s.mu.Unlock()
if canceled {
panic("canceled but got context...")
}
s.mu.Lock()
s.success++
s.mu.Unlock()
}
}
func (s *stresser) StopAndWait() {
s.mu.Lock()
fmt.Println("s.cancel() 1")
s.cancel()
fmt.Println("s.cancel() 2")
s.canceled = true
wg := s.wg
s.mu.Unlock()
fmt.Println("wg.Wait() 1")
wg.Wait()
fmt.Println("wg.Wait() 2")
}
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
func main() {
st := &stresser{
qps: 100,
N: 10,
}
go st.start()
time.Sleep(2 * time.Second)
st.StopAndWait()
fmt.Println("success", st.success) // success 299
}
type stresser struct {
qps int
N int
mu sync.Mutex
wg *sync.WaitGroup
rateLimiter *rate.Limiter
cancel func()
success int
}
func (s *stresser) start() {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(s.N)
s.mu.Lock()
s.wg = wg
s.rateLimiter = rate.NewLimiter(rate.Limit(s.qps), s.qps)
s.cancel = cancel
s.mu.Unlock()
for i := 0; i < s.N; i++ {
go s.run(ctx)
}
<-ctx.Done()
}
func (s *stresser) run(ctx context.Context) {
defer s.wg.Done()
for {
if err := s.rateLimiter.Wait(ctx); err == context.Canceled || ctx.Err() == context.Canceled {
return
}
s.mu.Lock()
s.success++
s.mu.Unlock()
}
}
func (s *stresser) StopAndWait() {
s.mu.Lock()
s.cancel()
wg := s.wg
s.mu.Unlock()
wg.Wait()
}
select
, continue
, break
package main
import (
"fmt"
"time"
)
func main() {
d1, d2 := time.Millisecond, time.Second
for {
select {
case <-time.After(d1):
d1 = time.Hour
fmt.Println("d1 = time.Hour")
continue // continue to the for-loop
case <-time.After(d2):
break // break and go to the lines below select
}
d2 = time.Nanosecond
fmt.Println("d2 = time.Nanosecond")
break // otherwise infinite for-loop
}
ch := make(chan string, 5000)
ch <- "a"
ch <- "a"
ch <- "a"
ch <- "a"
ch <- "a"
done := make(chan struct{})
go func() {
here:
for {
select {
case s, ok := <-ch:
if !ok {
fmt.Println("break 1")
break // closed
}
fmt.Println(s, ok)
case <-time.After(time.Second):
fmt.Println("break 2")
break here
}
}
fmt.Println("break 3")
done <- struct{}{}
}()
<-done
fmt.Println("done")
}
/*
a true
a true
a true
a true
a true
break 2
break 3
done
*/
Counting problem
Suppose millions of concurrent web requests coming to your web application. And you want to count visits, or any other metrics per request. Counting should not hurt the performance of your application. Counting is an inherently sequential problem. There's one resource to be updated while concurrent, multiple requests can cause contentions. Then what would be the best way to count with concurrency?
Count: simulate web requests
Here's how I would simulate the web requests:
func RunCountHandler(b *testing.B, isDebug bool, counter Counter, delta float64) {
countHandler := func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
original := counter.Get()
counter.Add(delta)
fmt.Fprintf(w, "Original: %v / Added: %v / Current: %v", original, delta, counter.Get())
default:
http.Error(w, "Method Not Allowed", 405)
}
}
mainRouter := http.NewServeMux()
mainRouter.HandleFunc("/", httpLog(isDebug, countHandler))
numberOfRequests := 100
// don't do this at Travis
for i := 0; i < b.N; i++ {
b.StopTimer()
ts := httptest.NewServer(mainRouter)
var wg sync.WaitGroup
wg.Add(numberOfRequests)
for i := 0; i < numberOfRequests; i++ {
go func() {
defer wg.Done()
if resp, err := http.Get(ts.URL); err != nil {
panic(err)
} else {
// bstr, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
// if err != nil {
// panic(err)
// }
// fmt.Println(string(bstr))
// without Close
// 2015/08/02 16:49:00 http: Accept error: accept tcp6 [::1]:38096: accept4: too many open files; retrying in 1s
// 2015/08/02 16:49:01 http: Accept error: accept tcp6 [::1]:38096: accept4: too many open files; retrying in 1s
}
}()
}
b.StartTimer()
wg.Wait()
ts.Close()
}
}
Counting operation takes only about nanoseconds while http
request
takes much milliseconds. Benchmarking by mocking web server won't be able to isolate
the performance of counting
as below, except that channel
method is slower because
it allocates more memory:
BenchmarkServer_NaiveCounter 100 13641425 ns/op 1724319 B/op 10551 allocs/op
BenchmarkServer_NaiveCounter-2 200 5577538 ns/op 1761024 B/op 10465 allocs/op
BenchmarkServer_NaiveCounter-4 300 3970441 ns/op 1736143 B/op 10392 allocs/op
BenchmarkServer_NaiveCounter-8 500 3054495 ns/op 1636052 B/op 9846 allocs/op
BenchmarkServer_NaiveCounter-16 500 2754022 ns/op 1446608 B/op 8784 allocs/op
BenchmarkServer_MutexCounter 100 10334728 ns/op 1739715 B/op 10570 allocs/op
BenchmarkServer_MutexCounter-2 200 6533533 ns/op 1737853 B/op 10466 allocs/op
BenchmarkServer_MutexCounter-4 300 4217715 ns/op 1703817 B/op 10349 allocs/op
BenchmarkServer_MutexCounter-8 500 3072379 ns/op 1599124 B/op 9745 allocs/op
BenchmarkServer_MutexCounter-16 500 2721123 ns/op 1417956 B/op 8579 allocs/op
BenchmarkServer_RWMutexCounter 100 11248896 ns/op 1736902 B/op 10579 allocs/op
BenchmarkServer_RWMutexCounter-2 200 7160659 ns/op 1759653 B/op 10481 allocs/op
BenchmarkServer_RWMutexCounter-4 300 4439413 ns/op 1718228 B/op 10390 allocs/op
BenchmarkServer_RWMutexCounter-8 500 3340555 ns/op 1679569 B/op 10077 allocs/op
BenchmarkServer_RWMutexCounter-16 500 3053389 ns/op 1438662 B/op 8698 allocs/op
BenchmarkServer_AtomicIntCounter 100 12053604 ns/op 1743955 B/op 10590 allocs/op
BenchmarkServer_AtomicIntCounter-2 200 8204060 ns/op 1750468 B/op 10477 allocs/op
BenchmarkServer_AtomicIntCounter-4 300 4443112 ns/op 1710413 B/op 10370 allocs/op
BenchmarkServer_AtomicIntCounter-8 500 3961467 ns/op 1630977 B/op 9897 allocs/op
BenchmarkServer_AtomicIntCounter-16 500 2926347 ns/op 1441098 B/op 8780 allocs/op
BenchmarkServer_AtomicCounter 100 11159504 ns/op 1736091 B/op 10570 allocs/op
BenchmarkServer_AtomicCounter-2 200 7661146 ns/op 1741652 B/op 10482 allocs/op
BenchmarkServer_AtomicCounter-4 300 4450239 ns/op 1725751 B/op 10406 allocs/op
BenchmarkServer_AtomicCounter-8 500 3121161 ns/op 1627260 B/op 9925 allocs/op
BenchmarkServer_AtomicCounter-16 500 2963900 ns/op 1465410 B/op 8873 allocs/op
BenchmarkServer_ChannelCounter_NoBuffer 100 113879946 ns/op 1801659 B/op 10602 allocs/op
BenchmarkServer_ChannelCounter_NoBuffer-2 20 111064393 ns/op 1742514 B/op 10512 allocs/op
BenchmarkServer_ChannelCounter_NoBuffer-4 20 110180521 ns/op 1801574 B/op 10566 allocs/op
BenchmarkServer_ChannelCounter_NoBuffer-8 100 30717707 ns/op 1990469 B/op 10692 allocs/op
BenchmarkServer_ChannelCounter_NoBuffer-16 100 24029631 ns/op 1689640 B/op 9902 allocs/op
BenchmarkServer_ChannelCounter_Buffer 2 1126995870 ns/op 1576520 B/op 10680 allocs/op
BenchmarkServer_ChannelCounter_Buffer-2 3 684545001 ns/op 1710218 B/op 10609 allocs/op
BenchmarkServer_ChannelCounter_Buffer-4 3 417227794 ns/op 1782202 B/op 10636 allocs/op
BenchmarkServer_ChannelCounter_Buffer-8 10 188985058 ns/op 1850097 B/op 10654 allocs/op
BenchmarkServer_ChannelCounter_Buffer-16 10 119519447 ns/op 1591680 B/op 9212 allocs/op
Count: NaiveCounter
[**_NaiveCounter
is the fastest way to count but subject to race
conditions, as here.
This is not thread-safe:
package main
import (
"fmt"
"sync"
)
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// NaiveCounter counts in a naive way.
// Do not use this with concurrency.
// It will cause race conditions.
type NaiveCounter float64
func (c *NaiveCounter) Get() float64 {
// return (*c).(float64)
// (X) (*c).(float64) (non-interface type NaiveCounter on left)
return float64(*c)
}
func (c *NaiveCounter) Add(delta float64) {
*c += NaiveCounter(delta)
}
func main() {
counter := new(NaiveCounter)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
counter.Add(1.347)
counter.Get()
counter.Add(-5.5)
counter.Get()
counter.Add(0.340)
counter.Get()
}()
}
wg.Wait()
fmt.Println(counter.Get())
// -38.12999999999999
}
/*
go run -race 00_naive.go
==================
WARNING: DATA RACE
Read by goroutine 7:
main.main.func1()
/home/ubuntu/go/src/github.com/gyuho/learn/doc/go_concurrent_count/code/00_naive.go:43 +0x70
Previous write by goroutine 6:
main.main.func1()
/home/ubuntu/go/src/github.com/gyuho/learn/doc/go_concurrent_count/code/00_naive.go:43 +0x88
Goroutine 7 (running) created at:
main.main()
/home/ubuntu/go/src/github.com/gyuho/learn/doc/go_concurrent_count/code/00_naive.go:49 +0xc5
Goroutine 6 (finished) created at:
main.main()
/home/ubuntu/go/src/github.com/gyuho/learn/doc/go_concurrent_count/code/00_naive.go:49 +0xc5
==================
-38.12999999999999
Found 1 data race(s)
exit status 66
*/
Count: MutexCounter
Try this:
package main
import (
"fmt"
"sync"
)
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// MutexCounter implements Counter with sync.Mutex.
type MutexCounter struct {
mu sync.Mutex // guards the following
value float64
}
func (c *MutexCounter) Get() float64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func (c *MutexCounter) Add(delta float64) {
c.mu.Lock()
defer c.mu.Unlock()
c.value += delta
}
func main() {
counter := new(MutexCounter)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
counter.Add(1.347)
counter.Get()
counter.Add(-5.5)
counter.Get()
counter.Add(0.340)
counter.Get()
}()
}
wg.Wait()
fmt.Println(counter.Get())
// 962.0000000000002
}
Count: RWMutexCounter
Try this:
package main
import (
"fmt"
"sync"
)
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// RWMutexCounter implements Counter with sync.RWMutex.
type RWMutexCounter struct {
mu sync.RWMutex // guards the following sync.
value float64
}
func (c *RWMutexCounter) Get() float64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.value
}
func (c *RWMutexCounter) Add(delta float64) {
c.mu.Lock()
defer c.mu.Unlock()
c.value += delta
}
func main() {
counter := new(RWMutexCounter)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
counter.Add(1.347)
counter.Get()
counter.Add(-5.5)
counter.Get()
counter.Add(0.340)
counter.Get()
}()
}
wg.Wait()
fmt.Println(counter.Get())
// -38.12999999999999
}
Count: AtomicIntCounter
Try this:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// AtomicIntCounter implements Counter with atomic package.
// Go has only int64 atomic variable.
// This truncates float value into integer.
type AtomicIntCounter int64
func (c *AtomicIntCounter) Get() float64 {
return float64(atomic.LoadInt64((*int64)(c)))
}
// Add ignores the non-integer part of delta.
func (c *AtomicIntCounter) Add(delta float64) {
atomic.AddInt64((*int64)(c), int64(delta))
}
func main() {
counter := new(AtomicIntCounter)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
counter.Add(1.347)
counter.Get()
counter.Add(-5.5)
counter.Get()
counter.Add(0.340)
counter.Get()
}()
}
wg.Wait()
fmt.Println(counter.Get())
// -40
}
Count: AtomicCounter
Try this:
package main
import (
"fmt"
"math"
"sync"
"sync/atomic"
)
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// AtomicCounter implements Counter with atomic package.
// Go has only int64 atomic variable.
// This uses math.Float64frombits package for the floating
// point number corresponding the IEEE 754 binary representation
type AtomicCounter uint64
func (c *AtomicCounter) Get() float64 {
return math.Float64frombits(atomic.LoadUint64((*uint64)(c)))
}
// Add ignores the non-integer part of delta.
func (c *AtomicCounter) Add(delta float64) {
for {
oldBits := atomic.LoadUint64((*uint64)(c))
newBits := math.Float64bits(math.Float64frombits(oldBits) + delta)
if atomic.CompareAndSwapUint64((*uint64)(c), oldBits, newBits) {
return
}
}
}
func main() {
counter := new(AtomicCounter)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
counter.Add(1.347)
counter.Get()
counter.Add(-5.5)
counter.Get()
counter.Add(0.340)
counter.Get()
}()
}
wg.Wait()
fmt.Println(counter.Get())
// -38.12999999999999
}
Count: ChannelCounter
(No Buffer)
Try this:
package main
import (
"fmt"
"sync"
)
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// ChannelCounter counts through channels.
type ChannelCounter struct {
valueChan chan float64
deltaChan chan float64
done chan struct{}
}
func NewChannelCounter(buf int) *ChannelCounter {
c := &ChannelCounter{
make(chan float64),
make(chan float64, buf), // only buffer the deltaChan
make(chan struct{}),
}
go c.Run()
return c
}
func (c *ChannelCounter) Run() {
var value float64
for {
// "select" statement chooses which of a set of
// possible send or receive operations will proceed.
select {
case delta := <-c.deltaChan:
value += delta
case <-c.done:
return
case c.valueChan <- value:
// Do nothing.
// If there is no default case, the "select" statement
// blocks until at least one of the communications can proceed.
}
}
}
func (c *ChannelCounter) Get() float64 {
return <-c.valueChan
}
func (c *ChannelCounter) Add(delta float64) {
c.deltaChan <- delta
}
func (c *ChannelCounter) Done() {
c.done <- struct{}{}
}
func (c *ChannelCounter) Close() {
close(c.deltaChan)
}
func main() {
counter := NewChannelCounter(0)
defer counter.Done()
defer counter.Close()
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
counter.Add(1.347)
counter.Get()
counter.Add(-5.5)
counter.Get()
counter.Add(0.340)
counter.Get()
}()
}
wg.Wait()
fmt.Println(counter.Get())
// -38.12999999999997
}
Count: ChannelCounter
(Buffer)
Try this:
package main
import (
"fmt"
"sync"
)
// Counter is an interface for counting.
// It contains counting data as long as a type
// implements all the methods in the interface.
type Counter interface {
// Get returns the current count.
Get() float64
// Add adds the delta value to the counter.
Add(delta float64)
}
// ChannelCounter counts through channels.
type ChannelCounter struct {
valueChan chan float64
deltaChan chan float64
done chan struct{}
}
func NewChannelCounter(buf int) *ChannelCounter {
c := &ChannelCounter{
make(chan float64),
make(chan float64, buf), // only buffer the deltaChan
make(chan struct{}),
}
go c.Run()
return c
}
func (c *ChannelCounter) Run() {
var value float64
for {
// "select" statement chooses which of a set of
// possible send or receive operations will proceed.
select {
case delta := <-c.deltaChan:
value += delta
case <-c.done:
return
case c.valueChan <- value:
// Do nothing.
// If there is no default case, the "select" statement
// blocks until at least one of the communications can proceed.
}
}
}
func (c *ChannelCounter) Get() float64 {
return <-c.valueChan
}
func (c *ChannelCounter) Add(delta float64) {
c.deltaChan <- delta
}
func (c *ChannelCounter) Done() {
c.done <- struct{}{}
}
func (c *ChannelCounter) Close() {
close(c.deltaChan)
}
func main() {
counter := NewChannelCounter(10)
defer counter.Done()
defer counter.Close()
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
counter.Add(1.347)
counter.Get()
counter.Add(-5.5)
counter.Get()
counter.Add(0.340)
counter.Get()
}()
}
wg.Wait()
fmt.Println(counter.Get())
// -38.12999999999997
}
Count: benchmark results
Add
, in the descending order of time per operation:
NaiveCounter
but should be ignored. Not thread-safeAtomicIntCounter
but only supportsint64
typeAtomicCounter
MutexCounter
RWMutexCounter
ChannelCounter_Buffer
is faster than non-buffered channelChannelCounter_NoBuffer
Get
, in the descending order of time per operation:
NaiveCounter
but should be ignored. Not thread-safeAtomicIntCounter
but only supportsint64
typeAtomicCounter
MutexCounter
RWMutexCounter
ChannelCounter_Buffer
is faster than non-buffered channelChannelCounter_NoBuffer
And channel
is slower than sync.Mutex
because it allocates more memory.
Dequeue problem
[In 2002] I got asked to fix the log analysis problem we had there. We were getting things like web query logs coming at a literally exponential rate at the time. And the analysis was written in Python and was getting close to one day per day, and at some point it was clear that that wasn’t going to last and we had to be able to generate the bills. … We’re talking massive, massive injection rates. You can’t do queries while you’re that much data. Even if you build a database to hold that much data, the incremental changes would kill you. … We designed a system called Sawmil … Time to process a day’s log dropped from 24 hours to 2 minutes. Program runs on thousands of machines in parallel.
Here's similar situation. Suppose a queue where you have to:
- Consume the queue.
- Decode raw messages into meaningful data.
- Only insert the unique data into a database.
- Messaging queue can be anything. In this case, use SQS from AWS.
- And PostgreSQL to store the data.
Solution #1: In-Memory
Given no memory constraint, I would hash every message into a Python dictionary, and use it as a lookup table and only insert the ones that are not in the database yet.
It would take only a few minutes to write a Python script for this:
def do():
# connect to AWS SQS
conn = boto.sqs.connect_to_region(...)
queue = conn.get_queue('message')
# messages need to be unique
existing_message = {}
cur = database.execute("select msg from table")
for r in cur:
existing_message[r['msg']] = True
while True:
msgs = queue.get_messages(...)
for msg in msgs:
raw = msg.get_body()
# transform(decode) the raw message
parsed = json.loads(raw)
# skip duplicate message
if parsed['msg'] in existing_message:
continue
# insert into database
database.execute("insert...")
print "Done"
This would work only with a small amount of data. With data growing at an exponential rate, messages would keep piling up and processing would take longer than it should.
And even worse when we add more challenges as below:
- Messages are NOT sorted and contain some duplicate entries.
- One message in database must be unique against all entries in the database.
- There are over millions of messages coming every day.
- They are too big to fit in memory.
Solution #2: Disk Key/Value Storage + Concurrency
But if you have millions of messages everyday, in-memory method cannot work. MySQL and PostgreSQL have INSERT syntax to handle entries with primary key conflicts. However:
- **
INSERTING
**ing each row manually is too slow. - Using a temporary table to join data into the database is also slow.
We need
COPY
command and import local csv files, which is much faster. But this does not
ensure the message uniqueness or prevent primary key conflicts. One primary key
conflict in a csv file can fail the whole csv import command.
So I first tried with a separate database:
- Rewrite Python code in Go in order to dequeue and decode raw messages with concurrency
- Set up a separate database LevelDB to maintain the data uniqueness.
- Import the filtered data in csv format, with
COPY
command.
Dequeue result
- Python version took 3 minutes to process 5,000 messages.
- Go version takes only 15 seconds: 12x faster.
package main
import (
"encoding/base64"
"fmt"
"log"
"os"
"runtime"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/sqs"
)
const (
awsAccessKey = "YOUR_ACCESS_KEY"
awsSecretKey = "YOUR_SECRET_KEY"
queueURL = "YOUR_SQS_ENDPOINT"
)
func init() {
println()
maxCPU := runtime.NumCPU()
runtime.GOMAXPROCS(maxCPU)
log.Println("Running with", maxCPU, "cpus")
println()
}
func main() {
msgs, receipts := getMessages(1)
for _, msg := range msgs {
fmt.Println(msg)
}
deleteMessageBatch(receipts)
}
type data struct {
// Use your own data type
// with your parsing function
rawMessage string
// receipt can be used later
// to delete messages
// after you process them
receipt *string
}
// in the actual code, I have my own JSON parsing
// function, which will be parallelized with these goroutines
func parse(bt []byte) string {
return string(bt)
}
func getMessages(maxSet int) ([]string, []*string) {
if err := os.Setenv("AWS_ACCESS_KEY", awsAccessKey); err != nil {
log.Fatal(err)
}
if err := os.Setenv("AWS_SECRET_KEY", awsSecretKey); err != nil {
log.Fatal(err)
}
creds := credentials.NewEnvCredentials()
config := aws.Config{}
config.Credentials = creds
config.Region = "us-west-2"
sqsClient := sqs.New(&config)
rcvInput := &sqs.ReceiveMessageInput{}
rcvInput.QueueURL = aws.String(queueURL)
var (
maxNum int64 = 10
vizTimeout int64 = 120
waitTimeout int64 = 10
)
rcvInput.MaxNumberOfMessages = &maxNum
rcvInput.VisibilityTimeout = &vizTimeout
rcvInput.WaitTimeSeconds = &waitTimeout
decodedEvents := []string{}
receiptHandlesToDelete := []*string{}
iterCount := 0
for {
fmt.Println("Dequeuing ", iterCount*10)
rcvOutput, err := sqsClient.ReceiveMessage(rcvInput)
if err != nil {
log.Fatal(err)
}
if rcvOutput == nil {
log.Println("No messages")
break
}
if rcvOutput.Messages == nil {
log.Println("No messages")
break
}
msgSize := len(rcvOutput.Messages)
if msgSize == 0 {
log.Println("No messages")
break
}
dataCh := make(chan data, 10)
for _, msg := range rcvOutput.Messages {
go func(msg *sqs.Message) {
if msg.Body == nil {
return
}
byteData, err := base64.StdEncoding.DecodeString(*msg.Body)
if err != nil {
log.Fatalf("sqs.Message: %+v %+v", msg, err)
}
oneData := data{}
oneData.rawMessage = parse(byteData)
oneData.receipt = msg.ReceiptHandle
dataCh <- oneData
}(msg)
}
i := 0
for data := range dataCh {
decodedEvents = append(decodedEvents, data.rawMessage)
receiptHandlesToDelete = append(receiptHandlesToDelete, data.receipt)
i++
if i == msgSize {
close(dataCh)
}
}
iterCount++
if iterCount == maxSet {
break
}
}
return decodedEvents, receiptHandlesToDelete
}
func deleteMessageBatch(receiptHandlesToDelete []*string) {
if err := os.Setenv("AWS_ACCESS_KEY", awsAccessKey); err != nil {
log.Fatal(err)
}
if err := os.Setenv("AWS_SECRET_KEY", awsSecretKey); err != nil {
log.Fatal(err)
}
creds := credentials.NewEnvCredentials()
config := aws.Config{}
config.Credentials = creds
config.Region = "us-west-2"
sqsClient := sqs.New(&config)
slices := []*sqs.DeleteMessageBatchInput{}
tempEntries := []*sqs.DeleteMessageBatchRequestEntry{}
for i, id := range receiptHandlesToDelete {
one := &sqs.DeleteMessageBatchRequestEntry{}
one.ID = aws.String(fmt.Sprintf("%d", i))
one.ReceiptHandle = id
tempEntries = append(tempEntries, one)
if len(tempEntries) == 10 {
dmbInput := &sqs.DeleteMessageBatchInput{}
dmbInput.QueueURL = aws.String(queueURL)
entries := []*sqs.DeleteMessageBatchRequestEntry{}
for _, elem := range tempEntries {
entries = append(entries, elem)
}
dmbInput.Entries = entries
tempEntries = []*sqs.DeleteMessageBatchRequestEntry{}
slices = append(slices, dmbInput)
}
}
for i, dinput := range slices {
if i%100 == 0 {
log.Println("Deleting", i*10, "/", len(receiptHandlesToDelete))
}
if _, err := sqsClient.DeleteMessageBatch(dinput); err != nil {
log.Fatal(err)
}
}
}
If you want even more concurrency:
package main
import (
"encoding/base64"
"fmt"
"log"
"os"
"runtime"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/sqs"
)
const (
awsAccessKey = "YOUR_ACCESS_KEY"
awsSecretKey = "YOUR_SECRET_KEY"
queueURL = "YOUR_SQS_ENDPOINT"
)
func init() {
println()
maxCPU := runtime.NumCPU()
runtime.GOMAXPROCS(maxCPU)
log.Println("Running with", maxCPU, "cpus")
println()
}
func main() {
msgs, receipts := getMessages(1)
for _, msg := range msgs {
fmt.Println(msg)
}
deleteMessageBatch(receipts)
}
type data struct {
// Use your own data type
// with your parsing function
rawMessage string
// receipt can be used later
// to delete messages
// after you process them
receipt *string
}
// in the actual code, I have my own JSON parsing
// function, which will be parallelized with these goroutines
func parse(bt []byte) string {
return string(bt)
}
func getMessages(maxSet int) ([]string, []*string) {
if err := os.Setenv("AWS_ACCESS_KEY", awsAccessKey); err != nil {
log.Fatal(err)
}
if err := os.Setenv("AWS_SECRET_KEY", awsSecretKey); err != nil {
log.Fatal(err)
}
creds := credentials.NewEnvCredentials()
config := aws.Config{}
config.Credentials = creds
config.Region = "us-west-2"
sqsClient := sqs.New(&config)
rcvInput := &sqs.ReceiveMessageInput{}
rcvInput.QueueURL = aws.String(queueURL)
var (
maxNum int64 = 10
vizTimeout int64 = 120
waitTimeout int64 = 10
)
rcvInput.MaxNumberOfMessages = &maxNum
rcvInput.VisibilityTimeout = &vizTimeout
rcvInput.WaitTimeSeconds = &waitTimeout
decodedEvents := []string{}
receiptHandlesToDelete := []*string{}
iterCount := 0
dataCh := make(chan data, 10*maxSet)
msgSize := 0
for {
fmt.Println("Dequeuing ", iterCount*10)
rcvOutput, err := sqsClient.ReceiveMessage(rcvInput)
if err != nil {
log.Fatal(err)
}
if rcvOutput == nil {
log.Println("No messages")
break
}
if rcvOutput.Messages == nil {
log.Println("No messages")
break
}
size := len(rcvOutput.Messages)
msgSize += size
if size == 0 {
log.Println("No messages")
break
}
for _, msg := range rcvOutput.Messages {
go func(msg *sqs.Message) {
if msg.Body == nil {
return
}
byteData, err := base64.StdEncoding.DecodeString(*msg.Body)
if err != nil {
log.Fatalf("sqs.Message: %+v %+v", msg, err)
}
oneData := data{}
oneData.rawMessage = parse(byteData)
oneData.receipt = msg.ReceiptHandle
dataCh <- oneData
}(msg)
}
iterCount++
if iterCount == maxSet {
break
}
}
i := 0
for data := range dataCh {
decodedEvents = append(decodedEvents, data.rawMessage)
receiptHandlesToDelete = append(receiptHandlesToDelete, data.receipt)
i++
if i == msgSize {
close(dataCh)
}
}
return decodedEvents, receiptHandlesToDelete
}
func deleteMessageBatch(receiptHandlesToDelete []*string) {
if err := os.Setenv("AWS_ACCESS_KEY", awsAccessKey); err != nil {
log.Fatal(err)
}
if err := os.Setenv("AWS_SECRET_KEY", awsSecretKey); err != nil {
log.Fatal(err)
}
creds := credentials.NewEnvCredentials()
config := aws.Config{}
config.Credentials = creds
config.Region = "us-west-2"
sqsClient := sqs.New(&config)
slices := []*sqs.DeleteMessageBatchInput{}
tempEntries := []*sqs.DeleteMessageBatchRequestEntry{}
for i, id := range receiptHandlesToDelete {
one := &sqs.DeleteMessageBatchRequestEntry{}
one.ID = aws.String(fmt.Sprintf("%d", i))
one.ReceiptHandle = id
tempEntries = append(tempEntries, one)
if len(tempEntries) == 10 {
dmbInput := &sqs.DeleteMessageBatchInput{}
dmbInput.QueueURL = aws.String(queueURL)
entries := []*sqs.DeleteMessageBatchRequestEntry{}
for _, elem := range tempEntries {
entries = append(entries, elem)
}
dmbInput.Entries = entries
tempEntries = []*sqs.DeleteMessageBatchRequestEntry{}
slices = append(slices, dmbInput)
}
}
for i, dinput := range slices {
if i%100 == 0 {
log.Println("Deleting", i*10, "/", len(receiptHandlesToDelete))
}
if _, err := sqsClient.DeleteMessageBatch(dinput); err != nil {
log.Fatal(err)
}
}
}
And here's how you would use LevelDB for this case:
package main
import (
"log"
"runtime"
"strings"
"time"
"github.com/syndtr/goleveldb/leveldb"
)
func init() {
maxCPU := runtime.NumCPU()
runtime.GOMAXPROCS(runtime.NumCPU())
log.Println("Concurrent execution with", maxCPU, "CPUs.")
}
func main() {
start := time.Now()
levelDBpath := "./db"
ldb, err := leveldb.OpenFile(levelDBpath, nil)
if err != nil {
log.Fatal(err)
}
defer ldb.Close()
maxSet := 450 // import maxSet * 10 messages
decodedEvents, receiptHandlersToDelete := getMessages(maxSet)
foundEvent := make(map[string]bool)
idsToPut := []string{}
for _, msg := range decodedEvents {
// check id collision within a batch
if _, ok := foundEvent[msg.EventHash]; ok {
continue
} else {
foundEvent[msg.EventHash] = true
data, err := ldb.Get([]byte(msg.EventHash), nil)
if err != nil {
if !strings.Contains(err.Error(), "not found") {
log.Fatal(err)
}
}
if data != nil {
log.Printf("Found Duplicate Event: %s", string(data))
continue
}
idsToPut = append(idsToPut, msg.EventHash)
}
}
// do your data import job here
log.Println("maintain the lookup table")
for _, id := range idsToPut {
// maintain the lookup table
if err := ldb.Put([]byte(id), []byte("true"), nil); err != nil {
log.Fatal(err)
}
}
}
Or you can even write directly to local disk storage:
package main
import (
"crypto/sha512"
"encoding/hex"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
)
func exist(fpath string) bool {
// Does a directory exist
st, err := os.Stat(fpath)
if err != nil {
return false
}
if st.IsDir() {
return true
}
if _, err := os.Stat(fpath); err != nil {
if strings.Contains(err.Error(), "no such file") {
return false
}
}
return true
}
func getHash(bt []byte) string {
if bt == nil {
return ""
}
h := sha512.New()
h.Write(bt)
sha512Hash := hex.EncodeToString(h.Sum(nil))
return sha512Hash
}
func main() {
var events = []string{
"Hello World!",
"Hello World!",
"different",
}
for _, event := range events {
eventHash := getHash([]byte(event))
if !exist(eventHash) {
if err := ioutil.WriteFile(eventHash, nil, 0644); err != nil {
log.Fatal(err)
}
fmt.Println("Saved", event, "with", eventHash)
} else {
fmt.Println("Found duplicate events:", event)
}
}
}
/*
Saved Hello World! with 861844d6704e8573fec34d967e20bcfef3d424cf48be04e6dc08f2bd58c729743371015ead891cc3cf1c9d34b49264b510751b1ff9e537937bc46b5d6ff4ecc8
Found duplicate events: Hello World!
Saved different with 49d5b8799558e22d3890d03b56a6c7a46faa1a7d216c2df22507396242ab3540e2317b870882b2384d707254333a8439fd3ca191e93293f745786ff78ef069f8
*/
But, don't do this!
My coworker pointed out the unnecessary use of external database. For this particular problem, we didn’t need a separate database. Just creating indexes in PostgreSQL was enough.
CREATE UNIQUE INDEX event_id
ON events_table (event_id)
;
SELECT event_id AS duplicate_id
FROM events_table
WHERE event_id IN ('EVENT_IDs from 5,000 Messages, ...')
;
This query finds the duplicates that could have failed the COPY
command. It
is slightly slower than LevelDB but is a good compromise considering the cost
of maintaining the separate database. And if I still need a separate
key-value database, I would switch to a Redis server
to avoid the data loss in local disk storage.
Lesson learned. Don’t introduce complexity unless it’s needed. The simplest are the best., and:
The competent programmer is fully aware of the strictly limited size of his own skull; therefore he approaches the programming task in full humility, and among other things he avoids clever tricks like the plague.
We shall do a much better programming job, provided that we approach the task with a full appreciation of its tremendous difficulty, provided that we stick to modest and elegant programming languages, provided that we respect the intrinsic limitations of the human mind and approach the task as Very Humble Programmers.
Dequeue Summary
And back to our original question: You have billions of events that do not fit in memory. How would you detect the duplicates?
- Most preferable is using traditional databases, such as MySQL or PostgreSQL. And create an index (*primary key) for its unique identifiers. You can get this by hashing the raw message.
- You can also have a separate key-value database, such as LevelDB or Redis. And maintain the unique event identifier table to filter out the duplicate events. This would be fastest but you need to maintain the separate database.
Find duplicates with concurrency
What if you don’t have PostgreSQL or LevelDB? How would you find duplicates out of big data that is too big to fit in memory? It's an interesting problem and I thought concurrency would be a good fit for this.
Simplest way is to sort the whole data and traverse them in order until duplicates are found. With concurrency, you can break the big chunk of data into smaller parts in order to sort each chunk independently, and later to merge.
Concurrency: Merge Sort
Merge sort seems perfect for this because it uses divide and conquer algorithm. That is, it divides(breaks) the problem into several sub-problems. And then it conquers(solves) the sub-problems recursively, and combine those solutions to the original problem. Merge sort divides a list of elements into a single element, and then merge those single elements into one sorted list.
And code:
package main
import "fmt"
func main() {
fmt.Println(mergeSort([]int{-5, 1, 43, 6, 3, 6, 7}))
// [-5 1 3 6 6 7 43]
}
// O(n * log n)
// Recursively splits the array into subarrays, until only one element.
// From each subarray, merge them into a sorted array.
func mergeSort(slice []int) []int {
if len(slice) < 2 {
return slice
}
idx := len(slice) / 2
left := mergeSort(slice[:idx])
right := mergeSort(slice[idx:])
return merge(left, right)
}
// O(n)
func merge(s1, s2 []int) []int {
final := make([]int, len(s1)+len(s2))
i, j := 0, 0
for i < len(s1) && j < len(s2) {
if s1[i] <= s2[j] {
final[i+j] = s1[i]
i++
continue
}
final[i+j] = s2[j]
j++
}
for i < len(s1) {
final[i+j] = s1[i]
i++
}
for j < len(s2) {
final[i+j] = s2[j]
j++
}
return final
}
They seem to be doing the same thing in two parts(left and right). Then what if we parallelize those two operations, like here?
package main
import (
"fmt"
"log"
"runtime"
)
func init() {
maxCPU := runtime.NumCPU()
runtime.GOMAXPROCS(runtime.NumCPU())
log.Println("Concurrent execution with", maxCPU, "CPUs.")
}
func main() {
result := make(chan []int)
go concurrentMergeSort([]int{-5, 1, 43, 6, 3, 6, 7}, result)
fmt.Println(<-result)
// [-5 1 3 6 6 7 43]
}
func concurrentMergeSort(slice []int, result chan []int) {
if len(slice) < 2 {
result <- slice
return
}
idx := len(slice) / 2
ch1, ch2 := make(chan []int), make(chan []int)
go concurrentMergeSort(slice[:idx], ch1)
go concurrentMergeSort(slice[idx:], ch2)
left := <-ch1
right := <-ch2
result <- merge(left, right)
}
func merge(s1, s2 []int) []int {
final := make([]int, len(s1)+len(s2))
i, j := 0, 0
for i < len(s1) && j < len(s2) {
if s1[i] <= s2[j] {
final[i+j] = s1[i]
i++
continue
}
final[i+j] = s2[j]
j++
}
for i < len(s1) {
final[i+j] = s1[i]
i++
}
for j < len(s2) {
final[i+j] = s2[j]
j++
}
return final
}
Theoretically, this concurrent code should sort faster since it's parallel. However, benchmark results show the opposite:
package merge_sort
import (
"math/rand"
"sort"
"testing"
)
func merge(s1, s2 []int) []int {
final := make([]int, len(s1)+len(s2))
i, j := 0, 0
for i < len(s1) && j < len(s2) {
if s1[i] <= s2[j] {
final[i+j] = s1[i]
i++
continue
}
final[i+j] = s2[j]
j++
}
for i < len(s1) {
final[i+j] = s1[i]
i++
}
for j < len(s2) {
final[i+j] = s2[j]
j++
}
return final
}
func mergeSort(slice []int) []int {
if len(slice) < 2 {
return slice
}
idx := len(slice) / 2
left := mergeSort(slice[:idx])
right := mergeSort(slice[idx:])
return merge(left, right)
}
func concurrentMergeSort(slice []int, result chan []int) {
if len(slice) < 2 {
result <- slice
return
}
idx := len(slice) / 2
ch1, ch2 := make(chan []int), make(chan []int)
go concurrentMergeSort(slice[:idx], ch1)
go concurrentMergeSort(slice[idx:], ch2)
left := <-ch1
right := <-ch2
result <- merge(left, right)
}
func BenchmarkStandardPackage(b *testing.B) {
var sampleIntSlice = []int{}
sampleIntSlice = rand.New(rand.NewSource(123123)).Perm(999999)
for i := 0; i < b.N; i++ {
sort.Ints(sampleIntSlice)
}
}
func BenchmarkMergeSort(b *testing.B) {
var sampleIntSlice = []int{}
sampleIntSlice = rand.New(rand.NewSource(123123)).Perm(999999)
for i := 0; i < b.N; i++ {
mergeSort(sampleIntSlice)
}
}
func BenchmarkConcurrentMergeSort(b *testing.B) {
var sampleIntSlice = []int{}
sampleIntSlice = rand.New(rand.NewSource(123123)).Perm(999999)
for i := 0; i < b.N; i++ {
result := make(chan []int)
go concurrentMergeSort(sampleIntSlice, result)
<-result
}
}
/*
go get github.com/cespare/prettybench
go test -bench . -benchmem -cpu 1,2,4 | prettybench
benchmark iter time/iter bytes alloc allocs
--------- ---- --------- ----------- ------
BenchmarkStandardPackage 10 131.37 ms/op 800929 B/op 1 allocs/op
BenchmarkStandardPackage-2 10 132.84 ms/op 800929 B/op 1 allocs/op
BenchmarkStandardPackage-4 10 131.81 ms/op 800929 B/op 1 allocs/op
BenchmarkMergeSort 5 204.42 ms/op 166369507 B/op 999998 allocs/op
BenchmarkMergeSort-2 5 202.03 ms/op 166369507 B/op 999998 allocs/op
BenchmarkMergeSort-4 5 229.62 ms/op 166369507 B/op 999998 allocs/op
BenchmarkConcurrentMergeSort 1 3994.73 ms/op 488144848 B/op 3537113 allocs/op
BenchmarkConcurrentMergeSort-2 1 2134.87 ms/op 377522704 B/op 3199159 allocs/op
BenchmarkConcurrentMergeSort-4 1 1242.12 ms/op 377254480 B/op 3194968 allocs/op
ok github.com/gyuho/kway/benchmarks/merge_sort 18.784s
*/
As you see, the concurrent merge-sort code is much slower because of the heavy memory usage in concurrency.
Why does using GOMAXPROCS > 1 sometimes make my program slower??
It depends on the nature of your program. Problems that are intrinsically sequential cannot be sped up by adding more goroutines. Concurrency only becomes parallelism when the problem is intrinsically parallel.
Merge sort works by merging the list of sub-lists that constantly requires additional memories. And merge has to happen in sequential order in order to combine every single sub-list. That is, it is not a intrinsically parallel problem. You only loose when you use concurrency for inherently sequential problems.
Or maybe I am just doing it wrong... I will get back to this later.
Concurrency: Prime Sieve
One legendary example is concurrent prime sieve:
// A concurrent prime sieve
// https://golang.org/doc/play/sieve.go
package main
import "fmt"
// Send the sequence 2, 3, 4, ... to channel 'ch'.
func Generate(ch chan<- int) {
for i := 2; ; i++ {
ch <- i // Send 'i' to channel 'ch'.
}
}
// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func Filter(in <-chan int, out chan<- int, prime int) {
for {
i := <-in // Receive value from 'in'.
if i%prime != 0 {
out <- i // Send 'i' to 'out'.
}
}
}
// The prime sieve: Daisy-chain Filter processes.
func main() {
ch := make(chan int) // Create a new channel.
go Generate(ch) // Launch Generate goroutine.
for i := 0; i < 10; i++ {
prime := <-ch
fmt.Println(prime)
ch1 := make(chan int)
go Filter(ch, ch1, prime)
ch = ch1
}
}
close channel
package main
import "fmt"
func channelClose() <-chan int {
ch := make(chan int)
close(ch)
return ch
}
func channelCloseArg(ch chan int) {
close(ch)
}
func main() {
cc := make(chan int, 1)
cc <- 1
v, open := <-cc
fmt.Println(v, open) // 1 true
close(cc)
v, open = <-cc
fmt.Println(v, open) // 0 false
v, open = <-cc
fmt.Println(v, open) // 0 false
fmt.Println()
rc := channelClose()
v, open = <-rc
fmt.Println(v, open) // 0 false
v, open = <-rc
fmt.Println(v, open) // 0 false
v, open = <-rc
fmt.Println()
ch := make(chan int)
channelCloseArg(ch)
v, open = <-ch
fmt.Println(v, open) // 0 false
v, open = <-ch
fmt.Println(v, open) // 0 false
v, open = <-ch
}
blocking defer
package main
import (
"fmt"
"time"
)
func runGoroutine() {
go func() {
time.Sleep(time.Hour)
}()
}
func runDefer() {
defer func() {
time.Sleep(time.Hour)
}()
}
func main() {
fmt.Println("before runGoroutine #0")
runGoroutine()
fmt.Println("after runGoroutine #0") // return regardless of goroutine
fmt.Println("before runDefer #1")
runDefer()
fmt.Println("after runDefer #1") // does not return until defer is done
}
/*
before runGoroutine #0
after runGoroutine #0
before runDefer #1
...
*/
buffered channel copy
package main
import "fmt"
func main() {
ch1 := make(chan string, 5000)
fmt.Println("ch1:", len(ch1), cap(ch1)) // ch1: 0 5000
ch1 <- "aaa"
fmt.Println("ch1:", len(ch1), cap(ch1)) // ch1: 1 5000
fmt.Println(<-ch1) // aaa
fmt.Println("ch1:", len(ch1), cap(ch1)) // ch1: 0 5000
ch2 := getCh2()
fmt.Println("ch2:", len(ch2), cap(ch2)) // ch2: 0 5000
ch2 <- "aaa"
fmt.Println("ch2:", len(ch2), cap(ch2)) // ch2: 1 5000
ds := createDatas()
for _, d := range ds {
fmt.Println("ds ch:", len(d.ch), cap(d.ch))
}
var di DataInterface
di = &ds[0]
chd := di.Chan()
fmt.Println("chd:", len(chd), cap(chd)) // chd: 1 5000
}
func getCh2() chan string {
return make(chan string, 5000)
}
type Data struct {
ch chan string
}
func createDatas() []Data {
ds := make([]Data, 5)
bufCh := make(chan string, 5000)
for i := range ds {
ds[i].ch = bufCh
}
return ds
}
type DataInterface interface {
Chan() chan string
}
func (d *Data) Chan() chan string {
return d.ch
}
select
closed channel
package main
import (
"fmt"
"time"
)
func main() {
stream1 := make(chan string, 5000)
stream2 := make(chan string, 5000)
go func() {
for i := 0; i < 3; i++ {
stream1 <- fmt.Sprintf("%d", i)
time.Sleep(time.Second)
}
close(stream1)
}()
go func() {
for i := 0; i < 5; i++ {
stream2 <- fmt.Sprintf("%d", i)
time.Sleep(time.Microsecond)
}
close(stream2)
}()
escape:
for {
select {
case s, ok := <-stream1:
if !ok {
fmt.Println("stream1 closed")
break escape // when the channel is closed
// without escape, infinite loop
}
fmt.Println("stream1:", s)
case s, ok := <-stream2:
if !ok {
fmt.Println("stream2 closed")
break escape // when the channel is closed
}
fmt.Println("stream2:", s)
case <-time.After(time.Second):
// drain channel until it takes longer than 1 second
fmt.Println("escaping")
break escape
}
}
/*
stream1 couldn't finish!
stream1: 0
stream2: 0
stream2: 1
stream2: 2
stream2: 3
stream2: 4
stream2 closed
*/
}
package main
import (
"fmt"
"time"
)
func main() {
stream1 := make(chan string, 5000)
stream2 := make(chan string, 5000)
go func() {
for i := 0; i < 5; i++ {
stream1 <- fmt.Sprintf("%d", i)
time.Sleep(time.Second)
}
// close(stream1)
}()
go func() {
for i := 0; i < 5; i++ {
stream2 <- fmt.Sprintf("%d", i)
time.Sleep(time.Microsecond)
}
// close(stream2)
}()
escape1:
for {
select {
case s := <-stream1:
fmt.Println("stream1:", s)
case s := <-stream2:
fmt.Println("stream2:", s)
case <-time.After(500 * time.Millisecond):
fmt.Println("escape1")
break escape1
}
}
fmt.Println()
fmt.Println("first done")
escape2:
for {
select {
case s := <-stream1:
fmt.Println("stream1:", s)
case s := <-stream2:
fmt.Println("stream2:", s)
case <-time.After(5 * time.Second):
fmt.Println("escape2")
break escape2
}
}
/*
stream1: 0
stream2: 0
stream2: 1
stream2: 2
stream2: 3
stream2: 4
escape1
first done
stream1: 1
stream1: 2
stream1: 3
stream1: 4
escape2
*/
}
select
default
package main
import (
"fmt"
"time"
)
func main() {
select {
case <-time.After(time.Nanosecond):
fmt.Println("received from time.Nanosecond")
default:
fmt.Println("default")
}
// default
done := make(chan struct{})
close(done)
select {
case <-time.After(time.Nanosecond):
fmt.Println("received from time.Nanosecond")
case <-done:
fmt.Println("received from done")
default:
fmt.Println("default")
}
// received from done
}
sync.Cond
package main
import (
"fmt"
"sync"
)
type data struct {
cond *sync.Cond
lines []string
}
var nums = []int{0, 1, 2}
func main() {
d := data{
cond: &sync.Cond{L: &sync.Mutex{}},
lines: make([]string, 0),
}
for i := range nums {
go func(i int) {
d.cond.L.Lock()
d.lines = append(d.lines, fmt.Sprintf("%d: Hello World!", i))
d.cond.L.Unlock()
d.cond.Signal()
}(i)
}
for {
d.cond.L.Lock()
if len(d.lines) != len(nums) {
d.cond.Wait()
} else {
d.cond.L.Unlock()
break
}
d.cond.L.Unlock()
}
fmt.Println(d.lines)
// [2: Hello World! 0: Hello World! 1: Hello World!]
}
package main
import (
"fmt"
"sync"
"time"
)
func main() {
cond := &sync.Cond{L: &sync.Mutex{}}
sig := make(chan struct{})
for i := 0; i < 3; i++ {
go func(i int) {
cond.L.Lock()
sig <- struct{}{}
fmt.Println("Wait begin:", i)
cond.Wait()
fmt.Println("Wait end:", i)
cond.L.Unlock()
}(i)
}
for range []int{0, 1, 2} {
<-sig
}
// for i := 0; i < 3; i++ {
// cond.L.Lock()
// fmt.Println("Signal")
// cond.Signal()
// cond.L.Unlock()
// }
cond.L.Lock()
fmt.Println("Broadcast")
cond.Broadcast()
cond.L.Unlock()
fmt.Println("Sleep")
time.Sleep(time.Second)
}
/*
Wait begin: 2
Wait begin: 1
Wait begin: 0
Broadcast
Sleep
Wait end: 2
Wait end: 1
Wait end: 0
*/
blocking sync.Mutex
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
fmt.Println("Holding Lock!")
mu.Lock()
time.Sleep(time.Second)
mu.Unlock()
fmt.Println("Released Lock!")
donec := make(chan struct{})
go func() {
fmt.Println("goroutine is trying to holding the same Lock!")
mu.Lock()
fmt.Println("goroutine got the Lock!")
mu.Unlock()
fmt.Println("goroutine just released the Lock!")
close(donec)
}()
<-donec
fmt.Println("DONE")
}
/*
Holding Lock!
Released Lock!
goroutine is trying to holding the same Lock!
goroutine got the Lock!
goroutine just released the Lock!
DONE
*/
empty buffered channel
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string, 1)
donec := make(chan struct{})
go func() {
time.Sleep(time.Second)
ch <- "hello"
close(donec)
}()
fmt.Println(<-ch) // hello
<-donec
}
close two channels
package main
import (
"fmt"
"time"
)
func main() {
closed, donec := make(chan struct{}), make(chan struct{})
go func() {
select {
case <-time.After(3 * time.Second):
fmt.Println("close(closed) took too long")
case <-closed:
}
close(donec)
}()
close(closed)
select {
case <-time.After(3 * time.Second):
fmt.Println("close(donec) took too long")
case <-donec:
}
fmt.Println("DONE!")
// DONE!
}
atomic, defer
package main
import (
"fmt"
"sync/atomic"
"time"
)
var proposeCounter int32
func main() {
atomic.AddInt32(&proposeCounter, 1)
defer atomic.AddInt32(&proposeCounter, -1)
fmt.Println(atomic.LoadInt32(&proposeCounter)) // 1
time.Sleep(time.Second)
}
channel capacity
package main
import "fmt"
func main() {
ch := make(chan int, 1)
fmt.Println(len(ch), cap(ch)) // 0 1
ch <- 1
fmt.Println(len(ch), cap(ch)) // 1 1
if len(ch) == cap(ch) {
fmt.Println("channel is full")
} // channel is full
fmt.Println(<-ch)
fmt.Println(len(ch), cap(ch)) // 0 1
}
select continue
package main
import "fmt"
func main() {
donec := make(chan struct{})
for i := range []int{1, 2, 3} {
fmt.Println(i)
if i == 1 {
close(donec)
}
select {
case <-donec:
continue
default:
}
fmt.Println("hey")
}
}
/*
0
hey
1
2
*/
select nil chan
package main
import "fmt"
func main() {
var c chan struct{}
select { // select doesn't select nil channel, but without default it panics
case <-c:
panic(1)
default:
fmt.Println(c == nil) // true
}
}
select multiple
package main
import "fmt"
func main() {
c1, c2, c3 := make(chan struct{}), make(chan struct{}), make(chan struct{})
close(c1)
close(c2)
close(c3)
select { // select randomly
case <-c1:
fmt.Println("c1")
case <-c2:
fmt.Println("c2")
case <-c3:
fmt.Println("c3")
}
select { // select randomly
case <-c3:
fmt.Println("c3")
case <-c2:
fmt.Println("c2")
case <-c1:
fmt.Println("c1")
}
}
/*
c1
c2
*/
Directories ¶
Path | Synopsis |
---|---|
go run -race 31_no_race_surbl_with_mutex.go
|
go run -race 31_no_race_surbl_with_mutex.go |
Package count counts with concurrency.
|
Package count counts with concurrency. |
Package surbl checks URLs with surbl.org spam list using concurrency.
|
Package surbl checks URLs with surbl.org spam list using concurrency. |