observer

v0.0.0-...-ec668f9 Latest
Published: Jul 21, 2022 License: BSD-3-Clause Imports: 5 Imported by: 0

README

observer

Experimental concurrency primitives!

Subject[T]

Subject is a generic, unbounded, concurrent observer. It is built on fixed-length arrays, each with its own atomic counter. Adding an element increments the counter to select the insert position. If the counter exceeds the array size, a new array is allocated and the process repeats on the new array. Each reader holds a pointer to a position in the array. Subject is intended to replace a list of channels.

var s observer.Subject[string]
go s.Set("hello")

go func() {
	for v := s.View(); ; v = v.Next() {
		fmt.Println(v.Value())
	}
}()
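
The insert path described above can be sketched without the package itself. The following is a minimal illustration, not the package's implementation: the names chunkLog, chunk, add and snapshot are hypothetical, the chunk size is arbitrary, and blocking readers are left out. It assumes Go 1.19 or later for the typed atomics.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const chunkSize = 4 // deliberately tiny so the example crosses chunk boundaries

// chunk is one fixed-length array with its own atomic insert counter.
type chunk[T any] struct {
	count atomic.Uint64          // slots claimed so far (may exceed chunkSize)
	ready [chunkSize]atomic.Bool // set once the slot's value has been written
	items [chunkSize]T
	next  atomic.Pointer[chunk[T]] // following chunk, allocated on overflow
}

// chunkLog is a hypothetical append-only log; it is not the package's Subject.
type chunkLog[T any] struct {
	head *chunk[T]
	tail atomic.Pointer[chunk[T]]
}

func newChunkLog[T any]() *chunkLog[T] {
	l := &chunkLog[T]{head: &chunk[T]{}}
	l.tail.Store(l.head)
	return l
}

// add claims a slot by incrementing the chunk's counter. If the chunk is
// full, it installs (or adopts) the next chunk and retries there.
func (l *chunkLog[T]) add(v T) {
	c := l.tail.Load()
	for {
		if i := c.count.Add(1) - 1; i < chunkSize {
			c.items[i] = v
			c.ready[i].Store(true) // publish the slot to readers
			return
		}
		c.next.CompareAndSwap(nil, &chunk[T]{}) // only one writer wins
		next := c.next.Load()
		l.tail.CompareAndSwap(c, next) // best-effort tail advance
		c = next
	}
}

// snapshot walks from the head and returns every published value in slot order.
func (l *chunkLog[T]) snapshot() []T {
	var out []T
	for c := l.head; c != nil; c = c.next.Load() {
		for i := 0; i < chunkSize; i++ {
			if !c.ready[i].Load() {
				return out
			}
			out = append(out, c.items[i])
		}
	}
	return out
}

func main() {
	l := newChunkLog[int]()
	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < 5; i++ {
				l.add(w*100 + i)
			}
		}(w)
	}
	wg.Wait()
	fmt.Println(len(l.snapshot())) // prints 15
}
```

Writers never take a lock in this sketch. The cost is that a slot can be claimed but not yet published, which is why readers check the ready flag before using a value.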
Benchmark

Observer allocations are amortized over the array length. It is faster than a single channel, and its cost grows linearly as the number of readers increases, unlike an array of channels. The trade-off is that the queue is unbounded.

BenchmarkObserver/1-8    35960995  34.22 ns/op  9 B/op  0 allocs/op
BenchmarkObserver/8-8     7084509  145.5 ns/op  9 B/op  0 allocs/op
BenchmarkObserver/32-8   14361388  94.58 ns/op  9 B/op  0 allocs/op
BenchmarkChannel/1-8     20000000   74.2 ns/op  0 B/op  0 allocs/op
BenchmarkChannel/8-8      1000000   1353 ns/op  0 B/op  0 allocs/op
BenchmarkChannel/32-8      200000   6261 ns/op  0 B/op  0 allocs/op
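
For context on the channel numbers, here is a rough sketch of what an "array of channels" broadcast looks like. It is an assumption about the shape of the baseline, not the benchmark's actual code, and the helper names fanOut and runFanOut are hypothetical. The point is that the writer performs one send per reader for every value.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut sends every value to every reader channel, then closes them.
// The writer does len(readers) sends per value.
func fanOut[T any](readers []chan T, vals []T) {
	for _, v := range vals {
		for _, ch := range readers {
			ch <- v
		}
	}
	for _, ch := range readers {
		close(ch)
	}
}

// runFanOut starts n readers that each sum what they receive and
// returns the per-reader sums once the broadcast has finished.
func runFanOut(n int, vals []int) []int {
	readers := make([]chan int, n)
	sums := make([]int, n)
	var wg sync.WaitGroup
	for i := range readers {
		readers[i] = make(chan int, 1)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for v := range readers[i] {
				sums[i] += v
			}
		}(i)
	}
	fanOut(readers, vals)
	wg.Wait()
	return sums
}

func main() {
	fmt.Println(runFanOut(3, []int{1, 2, 3})) // prints [6 6 6]
}
```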

Map

Map is a concurrent map similar to sync.Map from the standard library, but with a different locking strategy. The map is duplicated into a read/write pair that is switched using three atomic counters: one counter shared between the two maps and one counter per map. On a load, the shared counter is incremented, which flags which map to read from. Once the read finishes, that map's own counter is incremented. Writers wait for all readers of the map to leave, then swap the read/write map flag so the next write can be processed.
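
One way to picture the three counters is the sketch below. It is a simplified interpretation of the description above, not the package's code: pairMap and its methods are hypothetical names, keys and values are fixed to string and int, the side flag is packed into the top bit of the shared counter, and each write is applied to both maps so they stay in sync. It assumes Go 1.19 or later for the typed atomics.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

const (
	sideBit   = uint64(1) << 63 // top bit selects the map readers should use
	countMask = sideBit - 1     // low bits count readers that have arrived
)

// pairMap is a hypothetical read/write map pair; it is not the package's Map.
type pairMap struct {
	state atomic.Uint64    // shared counter: side bit plus reader arrivals
	done  [2]atomic.Uint64 // per-map counters: readers that have left
	mu    sync.Mutex       // serialises writers
	maps  [2]map[string]int
}

func newPairMap() *pairMap {
	return &pairMap{maps: [2]map[string]int{map[string]int{}, map[string]int{}}}
}

// get registers on the shared counter, reads the flagged map, then leaves.
func (p *pairMap) get(key string) (int, bool) {
	side := p.state.Add(1) >> 63
	v, ok := p.maps[side][key]
	p.done[side].Add(1)
	return v, ok
}

// set writes to the idle map, swaps the flag, waits for readers of the
// old read map to leave, then repeats the write there.
func (p *pairMap) set(key string, val int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	read := p.state.Load() >> 63
	write := read ^ 1
	p.maps[write][key] = val // no reader is on the write map

	// Flip the side bit and reset the arrival count in one atomic step.
	arrived := p.state.Swap(write<<63) & countMask
	for p.done[read].Load() != arrived {
		runtime.Gosched() // readers of the old map are still leaving
	}
	p.done[read].Store(0)
	p.maps[read][key] = val // bring the old read map up to date
}

func main() {
	m := newPairMap()
	var wg sync.WaitGroup
	for r := 0; r < 4; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				m.get("k")
			}
		}()
	}
	for i := 1; i <= 100; i++ {
		m.set("k", i)
	}
	wg.Wait()
	fmt.Println(m.get("k")) // prints 100 true
}
```

Readers only perform two atomic additions and a plain map lookup in this sketch. All waiting is pushed onto the writer, which is where the write-heavy benchmarks would be expected to show the cost.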

This has some benefits and trade-offs in read/write workloads; see the benchmarks. No concurrent process, such as compaction, runs alongside the map, which helps maintain throughput under sustained load.

The Range operation isn't supported, as it would lock the entire map, unlike sync.Map. In exchange, Map provides a Tx method that supports a transaction-like read/write on a single key. This is useful for counters and optional updates.

var m observer.Map
m.Set("key", 1)

// Increment the value only if the key already exists.
m.Tx("key", func(val interface{}, ok bool) (interface{}, bool) {
	if ok {
		return val.(int) + 1, ok
	}
	return nil, false
})
val, ok := m.Get("key")
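
To make the transaction idea concrete without depending on the package, the sketch below applies the same TxFn signature to a plain mutex-guarded map. The type lockedMap and the helper increment are hypothetical. The sketch also assumes that returning false from the callback leaves the stored value untouched, which the documentation does not spell out.

```go
package main

import (
	"fmt"
	"sync"
)

// txFn mirrors the TxFn signature from the package documentation.
type txFn func(val interface{}, ok bool) (interface{}, bool)

// lockedMap is a hypothetical stand-in guarded by a single mutex.
type lockedMap struct {
	mu sync.Mutex
	m  map[interface{}]interface{}
}

// tx runs fn on the current value while holding the lock, so the read
// and the write form one atomic step.
// ASSUMPTION: a false result from fn means "do not store anything".
func (l *lockedMap) tx(key interface{}, fn txFn) (interface{}, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.m == nil {
		l.m = make(map[interface{}]interface{})
	}
	old, ok := l.m[key]
	val, store := fn(old, ok)
	if store {
		l.m[key] = val
	}
	return val, store
}

// increment treats a missing key as zero and adds one.
func increment(val interface{}, ok bool) (interface{}, bool) {
	if !ok {
		return 1, true
	}
	return val.(int) + 1, true
}

func main() {
	var m lockedMap
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.tx("hits", increment)
		}()
	}
	wg.Wait()

	// A read-only transaction: return the value but ask for no update.
	v, _ := m.tx("hits", func(val interface{}, ok bool) (interface{}, bool) {
		return val, false
	})
	fmt.Println(v) // prints 100
}
```

Because the callback runs under the lock, no increment is lost even with 100 concurrent callers, which is the property a separate Get followed by Set cannot provide.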
Benchmark

The benchmark is based on Dgraph's cache testing.

Key: Map is this package's Map, SyncMap is sync.Map, and RWMap is a Go map protected by a sync.Mutex.

BenchmarkCaches/MapZipfRead
BenchmarkCaches/MapZipfRead-8           21317090                54.80 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/SyncMapZipfRead
BenchmarkCaches/SyncMapZipfRead-8       22807447                54.58 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/RWMapZipfRead
BenchmarkCaches/RWMapZipfRead-8         27089253                45.28 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/MapOneKeyRead
BenchmarkCaches/MapOneKeyRead-8         31896468                38.77 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/SyncMapOneKeyRead
BenchmarkCaches/SyncMapOneKeyRead-8     100000000               10.91 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/RWMapOneKeyRead
BenchmarkCaches/RWMapOneKeyRead-8       28684920                41.93 ns/op            0 B/op          0 allocs/op
BenchmarkCaches/MapZipIfWrite
BenchmarkCaches/MapZipIfWrite-8          2000820               610.6 ns/op            55 B/op          3 allocs/op
BenchmarkCaches/SyncMapZipfWrite
BenchmarkCaches/SyncMapZipfWrite-8       2193643               550.7 ns/op            71 B/op          5 allocs/op
BenchmarkCaches/RWMapZipfWrite
BenchmarkCaches/RWMapZipfWrite-8         4045146               298.6 ns/op            15 B/op          1 allocs/op
BenchmarkCaches/MapOneIfWrite
BenchmarkCaches/MapOneIfWrite-8          2679439               446.5 ns/op            56 B/op          4 allocs/op
BenchmarkCaches/SyncMapOneKeyWrite
BenchmarkCaches/SyncMapOneKeyWrite-8     3835924               312.0 ns/op            72 B/op          5 allocs/op
BenchmarkCaches/RWMapOneKeyWrite
BenchmarkCaches/RWMapOneKeyWrite-8       5000916               240.5 ns/op            16 B/op          2 allocs/op
BenchmarkCaches/MapZipfMixed
BenchmarkCaches/MapZipfMixed-8          16913481                68.68 ns/op            2 B/op          0 allocs/op
BenchmarkCaches/SyncMapZipfMixed
BenchmarkCaches/SyncMapZipfMixed-8      19314884                58.52 ns/op           14 B/op          0 allocs/op
BenchmarkCaches/RWMapZipfMixed
BenchmarkCaches/RWMapZipfMixed-8         2572059               443.6 ns/op             1 B/op          0 allocs/op
BenchmarkCaches/MapOneKeyMixed
BenchmarkCaches/MapOneKeyMixed-8        26252911                45.45 ns/op            2 B/op          0 allocs/op
BenchmarkCaches/SyncMapOneKeyMixed
BenchmarkCaches/SyncMapOneKeyMixed-8    64579701                18.14 ns/op            6 B/op          0 allocs/op
BenchmarkCaches/RWMapOneKeyMixed
BenchmarkCaches/RWMapOneKeyMixed-8       8419248               141.7 ns/op             0 B/op          0 allocs/op

Documentation

Types

type Map

type Map struct {
	// contains filtered or unexported fields
}

func (*Map) Del

func (m *Map) Del(key interface{})

func (*Map) Get

func (m *Map) Get(key interface{}) (val interface{}, ok bool)

func (*Map) Set

func (m *Map) Set(key, val interface{})

func (*Map) Tx

func (m *Map) Tx(key interface{}, fn TxFn) (val interface{}, ok bool)

type Subject

type Subject[T any] struct {
	// contains filtered or unexported fields
}

A Subject controls broadcasting events to multiple viewers.

func (*Subject[T]) Set

func (s *Subject[T]) Set(val T) (v View[T])

Set the latest view to val and notify waiting viewers.

func (*Subject[T]) View

func (s *Subject[T]) View() View[T]

View returns the latest value for the subject. Blocks if Set has not been called.

type TxFn

type TxFn func(val interface{}, ok bool) (interface{}, bool)

type View

type View[T any] struct {
	// contains filtered or unexported fields
}

View is a value seen by the observer.

func (View[T]) Len

func (v View[T]) Len() int

Len returns the current length.

func (View[T]) Next

func (v View[T]) Next() View[T]

Next returns the next view or blocks until a new value is set.
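
The blocking behaviour of Next can be illustrated with a much simpler structure than the one the package uses. The sketch below swaps in a mutex-guarded linked list where each node carries a channel that is closed when its successor is set. The names broadcaster, cursor, watch, set, next and value are hypothetical and only loosely mirror Subject and View.

```go
package main

import (
	"fmt"
	"sync"
)

// node is one value in a singly linked list of broadcasts.
type node[T any] struct {
	val   T
	next  *node[T]
	ready chan struct{} // closed once next has been set
}

// broadcaster is a hypothetical stand-in; it is not the package's Subject.
type broadcaster[T any] struct {
	mu   sync.Mutex
	tail *node[T]
}

func newBroadcaster[T any]() *broadcaster[T] {
	return &broadcaster[T]{tail: &node[T]{ready: make(chan struct{})}}
}

// set links a new node after the tail and wakes every waiting cursor.
func (b *broadcaster[T]) set(val T) {
	n := &node[T]{val: val, ready: make(chan struct{})}
	b.mu.Lock()
	old := b.tail
	old.next = n
	b.tail = n
	b.mu.Unlock()
	close(old.ready) // a closed channel releases all receivers at once
}

// cursor is a reader's position in the list.
type cursor[T any] struct{ n *node[T] }

// watch returns a position just before the next value to be set.
func (b *broadcaster[T]) watch() cursor[T] {
	b.mu.Lock()
	defer b.mu.Unlock()
	return cursor[T]{n: b.tail}
}

// next blocks until a value has been set after the current position.
func (c cursor[T]) next() cursor[T] {
	<-c.n.ready
	return cursor[T]{n: c.n.next}
}

func (c cursor[T]) value() T { return c.n.val }

func main() {
	b := newBroadcaster[string]()
	c := b.watch()
	go func() {
		for _, s := range []string{"a", "b", "c"} {
			b.set(s)
		}
	}()
	for i := 0; i < 3; i++ {
		c = c.next() // blocks until the writer has published another value
		fmt.Println(c.value())
	}
}
```

Every cursor sees every value in order, no matter how far behind it is, which is the same unbounded behaviour the README describes for Subject.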

func (View[T]) Value

func (v View[T]) Value() T
