fastqueue

package
v1.11.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2023 License: Apache-2.0, Apache-2.0 Imports: 8 Imported by: 0

README

Disruptor Overview

This is a port of the LMAX Disruptor into the Go programming language. It retains the essence and spirit of the Disruptor and utilizes a lot of the same abstractions and concepts, but does not maintain the same API.

On a MacBook Pro (Intel Core i9-8950HK CPU @ 2.90GHz) using Go 1.13.1, it can pass many hundreds of millions of messages per second (yes, you read that right) from one goroutine to another goroutine.

Once initialized and running, one of the preeminent design considerations of the Disruptor is to process messages at a constant rate. It does this using two primary techniques. First, it avoids using locks at all costs which cause contention between CPU cores and prevents scaling the number of cores. Secondly, it produces no garbage by allowing the application to preallocate sequential space on a ring buffer. By avoiding garbage, the need for a garbage collection and the stop-the-world application pauses introduced can be almost entirely avoided.

In Go, the current implementation of a channel (chan) maintains a lock around send, receive, and len operations and it maxes out around 25 million messages per second for uncontended access—more than an orders of magnitude slower when compared to the Disruptor. The same channel, when contended between OS threads only pushes about 7 million messages per second.

Example Usage

func main() {
    writer, reader := disruptor.New(
        disruptor.WithCapacity(BufferSize),
        disruptor.WithConsumerGroup(MyConsumer{}))

    // producer
    go func() {
        reservation := writer.Reserve(1)
        ringBuffer[sequence&RingBufferMask] = 42 // example of incoming value from a network operation such as HTTP, TCP, UDP, etc.
        writer.Commit(reservation, reservation)

        _ = reader.Close() // close the Reader once we're done producing messages
    }()

    reader.Read() // blocks until fully closed
}

type MyConsumer struct{}

func (m MyConsumer) Consume(lowerSequence, upperSequence int64) {
    for sequence := lowerSequence; sequence <= upperSequence; sequence++ {
        index := sequence&RingBufferMask
        message := ringBuffer[index]
        fmt.Println(message)
    }
}

var ringBuffer = [BufferSize]int

const (
    BufferSize = 1024 * 64
    BufferMask = BufferSize - 1
)

The above example is more complex than a typical channel implementation. When removing all the comments, wireup, and extra fluff to explain what's happening, the code is very concise. In fact, a given "Publish" only takes three lines—Reserve a slot, update the ring buffer at that slot, and Commit the reserved sequence range. On the consumer side, there's a for-loop to handle all incoming items into your application. Again, this not quite as concise as a channel (nor as flexible), but it's much, much faster.

Benchmarks

Each of the following benchmark tests sends an incrementing sequence message from one goroutine to another. The receiving goroutine asserts that the message is received is the expected incrementing sequence value. Any failures cause a panic.

  • CPU: Intel Core i9-8950HK CPU @ 2.90GHz
  • Operation System: OS X 10.14.6
  • Go Runtime: Go 1.13.1
Scenario Per Operation Time
Channels: Buffered, Blocking 53 ns
Channels: Buffered, Blocking 54 ns
Channels: Buffered, Blocking, Contended Write 121 ns
Channels: Buffered, Non-blocking 60 ns
Channels: Buffered, Non-blocking 68 ns
Channels: Buffered, Non-blocking Contended Write 205 ns
Disruptor: Sequencer, Reserve One 9.73 ns
Disruptor: Sequencer, Reserve Many 1.64 ns
Disruptor: Sequencer, Reserve One, Multiple Readers 10.4 ns
Disruptor: Sequencer, Reserve Many, Multiple Readers 1.59 ns

When In Doubt, Use Channels

Despite Go channels being significantly slower than the Disruptor, channels should still be considered the easiest, best, and most desirable choice for the vast majority of all use cases. The Disruptor's target use case is ultra-low latency environments where application response times are measured in nanoseconds and where stable, consistent latency is paramount and latency spikes cannot be tolerated.

Pre-Alpha

This code is currently experimental and is not recommended for production environments. It does not have any unit tests and is only meant serve as a proof of concept that the Disruptor is possible on the Go runtime despite some of the limits imposed by the Go memory model.

We are very interested to receive feedback on this project and how performance can be improved using subtle techniques such as additional cache line padding, memory alignment, utilizing a pointer vs a struct in a given location, replacing less optimal techniques with more optimal ones, especially in the performance-critical operations Reserve/Commit in the DefaultWriter struct as well as the Read operation of the DefaultReader.

Caveats

In the Java-based Disruptor implementation, a ring buffer is created, preallocated, and prepopulated with instances of the class which serve as the message type to be transferred between threads. Because Go lacks generics, we have opted to not interact with ring buffers at all within the library code. This has the benefit of avoiding an unnecessary boxing/unboxing type conversions during the receipt of a given message. It also means that it is the responsibility of the application developer to create and populate their particular ring buffer during application wireup. Pre-populating the ring buffer at startup should ensure contiguous memory allocation for all items in the various ring buffer slots, whereas on-the-fly creation may introduce gaps in the memory allocation and subsequent CPU cache misses which introduce latency spikes.

The reference to the ring buffer can (but need not be) be scoped as a package-level variable. The reason for this is that any given application should have very few Disruptor instances. The instances are designed to be created at startup and stopped during shutdown. They are not typically meant to be created ad-hoc and passed around like channel instances. It is the responsibility of the application developer to manage references to the ring buffer instances such that the producer can push messages to the buffer and consumers can receive messages from the buffer.

Documentation

Index

Constants

View Source
const (
	BUFFER_PAD = 32 // java版本有这个pad, 作用不详

)
View Source
const (
	SpinMask = 1024*16 - 1 // arbitrary; we'll want to experiment with different values
)

Variables

View Source
var (
	ErrMinimumReservationSize = errors.New("the minimum reservation size is 1 slot")
)

Functions

This section is empty.

Types

type Barrier

type Barrier interface {
	Load() int64
}

func NewCompositeBarrier

func NewCompositeBarrier(sequences ...*Cursor) Barrier

type Consumer

type Consumer interface {
	Consume(lower, upper int64)
}

type Cursor

type Cursor [8]int64 // prevent false sharing of the sequence cursor by padding the CPU cache line with 64 *bytes* of data.

func NewCursor

func NewCursor() *Cursor

func (*Cursor) Load

func (this *Cursor) Load() int64

func (*Cursor) Store

func (this *Cursor) Store(value int64)

type DefaultReader

type DefaultReader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(current, written *Cursor, upstream Barrier, waiter WaitStrategy, consumer Consumer) *DefaultReader

func (*DefaultReader) Close

func (this *DefaultReader) Close() error

func (*DefaultReader) Read

func (this *DefaultReader) Read()

type DefaultWaitStrategy

type DefaultWaitStrategy struct{}

func NewWaitStrategy

func NewWaitStrategy() DefaultWaitStrategy

func (DefaultWaitStrategy) Gate

func (this DefaultWaitStrategy) Gate(count int64)

func (DefaultWaitStrategy) Idle

func (this DefaultWaitStrategy) Idle(count int64)

type DefaultWriter

type DefaultWriter struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(written *Cursor, upstream Barrier, capacity int64) *DefaultWriter

func (*DefaultWriter) Commit

func (this *DefaultWriter) Commit(_, upper int64)

func (*DefaultWriter) Reserve

func (this *DefaultWriter) Reserve(count int64) int64

type Disruptor

type Disruptor struct {
	Writer
	Reader
}

func NewDisruptor

func NewDisruptor(options ...Option) Disruptor

type Option

type Option func(*Wireup)

func WithCapacity

func WithCapacity(value int64) Option

func WithConsumerGroup

func WithConsumerGroup(value ...Consumer) Option

func WithWaitStrategy

func WithWaitStrategy(value WaitStrategy) Option

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue[T any](max ...int) *Queue[T]

func (*Queue[T]) Consume

func (q *Queue[T]) Consume(lower, upper int64)

func (*Queue[T]) Push

func (q *Queue[T]) Push(data ...T)

func (*Queue[T]) SetEvent

func (q *Queue[T]) SetEvent(event func(data []T))

func (*Queue[T]) Wait

func (q *Queue[T]) Wait()

type Reader

type Reader interface {
	Read()
	Close() error
}

type WaitStrategy

type WaitStrategy interface {
	Gate(int64)
	Idle(int64)
}

type Wireup

type Wireup struct {
	// contains filtered or unexported fields
}

func NewWireup

func NewWireup(options ...Option) (*Wireup, error)

func (*Wireup) Build

func (this *Wireup) Build() (Writer, Reader)

type Writer

type Writer interface {
	Reserve(count int64) int64
	Commit(lower, upper int64)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL