sonic

package module
v0.0.0-...-bda8275
Published: Oct 31, 2024 License: MIT Imports: 17 Imported by: 0

README

work-in-progress - expect breaking changes until v1.0.0

Sonic

Sonic is a Go library for network and I/O programming that provides developers with a consistent asynchronous model, with a focus on achieving the lowest possible latency and jitter in Go. Sonic aims to make it easy to write network protocols (websocket, http2, custom binary exchanges) on top of bytestreams, and to then handle many such connections in a single goroutine running on a single thread.

Sonic is an alternative to the net package. It removes the need to use multiple goroutines to handle multiple connections and their reads/writes in the same process: a single goroutine, running on a single thread, reads from and writes to all connections. This brings several benefits:

  • No need to use synchronization primitives (channels, mutexes, etc.) as multiple connections can be handled in the same goroutine.
  • It removes the need for the Go scheduler to do any work which could slow down the program.
  • It allows latency-sensitive programs to run in a hot-loop pinned to a thread on an isolated core in order to achieve low latency and jitter.

Sonic currently supports only Unix-based systems (BSD, macOS, Linux).

package main

import (
    "fmt"

    "github.com/talostrading/sonic"
)

func main() {
    // Create an IO object which can execute asynchronous operations on the
    // current goroutine.
    ioc := sonic.MustIO()
    defer ioc.Close()

    // Create 10 connections. Each connection reads a message into its
    // buffer and then closes.
    for i := 0; i < 10; i++ {
        conn, _ := sonic.Dial(ioc, "tcp", "localhost:8080")
		
        b := make([]byte, 128)
        conn.AsyncRead(b, func(err error, n int) {
            if err != nil {
                fmt.Printf("could not read from %d err=%v\n", i, err)
            } else {
                b = b[:n]
                fmt.Println("got=", string(b))
                conn.Close()
            }
        })
    }

    // Execute all pending reads scheduled in the for-loop, then exit.
    ioc.RunPending()
}

Getting Started

See examples/. A good starting point is examples/timer. All examples can be built by calling make in the root path of sonic. The builds will be put in bin/.
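
For orientation, here is a rough sketch of what a minimal timer-based program might look like, based on the Timer API documented below (it is not the code of examples/timer):

package main

import (
    "fmt"
    "time"

    "github.com/talostrading/sonic"
)

func main() {
    ioc := sonic.MustIO()
    defer ioc.Close()

    t, err := sonic.NewTimer(ioc)
    if err != nil {
        panic(err)
    }

    // The callback runs on the same goroutine that runs the IO loop, roughly
    // 100ms from now (never earlier, possibly a bit later).
    t.ScheduleOnce(100*time.Millisecond, func() {
        fmt.Println("timer fired")
    })

    // Run until there are no more pending operations.
    ioc.RunPending()
}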

UDP Multicast

sonic offers a full-featured UDP Multicast peer for both IPv4 and IPv6. See multicast/peer.go. This peer can read and write data to a multicast group, join a group with source-IP and network interface filtering, and control its group membership by blocking/unblocking source-IPs at runtime.

Moreover, this peer, unlike the websocket client, does not allocate or copy any data in any of its functions. Additionally, the peer gives the programmer the option to change its read buffer after scheduling a read on it, i.e.:

var (
    b1, b2 []byte
)
peer.AsyncRead(b1, func(...) { ... }) // schedule an asynchronous read in b1
// ... some other code here
peer.SetAsyncReadBuffer(b2) // make the previously scheduled asynchronous read use b2 instead of b1

This is very useful when multiple UDP peers share the same read buffer. For example:

b := make([]byte, 1024 * 1024)

// We expect packets to be less than 256 bytes. When either peer reads, it calls the updateAndProcessBuffer function.
peer1.AsyncRead(b[:256], func(...) { updateAndProcessBuffer() })
peer2.AsyncRead(b[:256], func(...) { updateAndProcessBuffer() })

func updateAndProcessBuffer() {
    // One of the peers read something. We instruct the peers to read into the next 256 byte chunk of b such that we can
    // process the previous 256 bytes.
    peer1.AsyncRead(b[256:512], func(...) { updateAndProcessBuffer() })
    peer2.AsyncRead(b[256:512], func(...) { updateAndProcessBuffer() })

    go process(b[:256])
}
Zero-copy FIFO buffers

We provide two types of FIFO buffers with zero-copy semantics. Regardless of the type, a FIFO buffer is essential when writing protocol encoders/decoders over UDP or TCP with Linux's socket API to minimize syscalls. For example, say we have a simple protocol where each message has a fixed-size header and a variable-sized payload - the length of the payload is in the header. Say we read data through TCP. We then have two options:

// buffer in which we read; assume the header size is 1 byte.

b := make([]byte, 1024)

// option 1: read the header first and then the payload from the network
conn.Read(b[:1]) // read the header
payloadSize := int(b[0])
payload := b[1 : 1+payloadSize]
conn.Read(payload) // read the payload
// do something with the payload

// option 2: read as much as you can from the network and then parse the bytes
n, _ := conn.Read(b)
i := 0
for i < n {
    payloadSize := int(b[i])
    if i+1+payloadSize <= n {
        payload := b[i+1 : i+1+payloadSize]
        process(payload)
    }
    i += 1 + payloadSize
}

Option 1 is not efficient: n messages need n * 2 syscalls. Option 2 is efficient since the number of syscalls is minimized - in the limit, we need just 1 syscall to read n messages. Option 2, however, is missing something:

  • what if the last read message is incomplete, i.e. we read the header with its size, say 255, but only had space to read 100 of those bytes into b because we are near the end of b?
  • to read the rest of the 255 - 100 = 155 payload bytes, we need to move the already-read 100 bytes to the beginning of b, overwriting the already-processed payloads.
  • in other words, we need FIFO semantics over b.

The naive way of offering FIFO semantics over b is to simply copy the 100 bytes to the beginning of the slice. But that is a memcpy which takes a lot of time if the message is relatively big, say over 1KB. That is not acceptable, even though that is how things are currently done for websocket (see byte_buffer.go and codec/websocket/codec.go). For such cases, sonic offers two types of FIFO semantics over a byte slice, both exposing the same API (a usage sketch follows the list):

  • Claim(n) []byte - claim at most n bytes from the underlying []byte slice. Callers can now read into the returned slice.
  • Commit(n) int - commit at most n previously claimed bytes i.e. queue at most n bytes
  • Consume(n) int - consume at most n previously committed/queued bytes
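
For instance, the partial-message problem above can be handled roughly as follows. This is only a sketch: buf stands for one of the two FIFO buffers described below, conn is any reader, and m is the size of a fully decoded message.

// Claim contiguous space at the tail of the FIFO and read into it.
b := buf.Claim(256)
n, _ := conn.Read(b)

// Commit queues the n bytes that were actually read. A partially received
// message simply stays queued until the rest of it arrives in a later read.
buf.Commit(n)

// Once a complete message of size m has been decoded from the queued bytes,
// drop it from the head of the buffer. No bytes are copied at any point.
buf.Consume(m)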
Mirrored Buffer

This is a zero-copy FIFO buffer that works for both TCP and UDP protocols. It offers contiguous byte slices in a FIFO manner without the caller having to care about wrapping. See bytes/mirrored_buffer.go. The only limitations are that the buffer size must be a multiple of the system's page size and that the system must expose a shared memory filesystem like /dev/shm. In short, the mirrored buffer provides zero-copy FIFO semantics over a byte slice in the following way (a sketch of the underlying trick follows the list):

  • it creates the underlying byte slice of size n (where n is a multiple of the page size) and maps it twice, contiguously, in the process' virtual address space with mmap.
  • there are n physical bytes backing the underlying byte slice and n * 2 virtual bytes.
  • the buffer is mirrored in the sense that reading/writing the sequence b[n], b[n+1], ..., b[2*n-1] is permitted and, in fact, touches the bytes at b[0], b[1], ..., b[n-1].
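
To illustrate the trick, here is a minimal Linux-only sketch with simplified error handling; it is not sonic's implementation, just the double-mapping idea expressed with raw mmap calls:

package main

import (
    "fmt"
    "os"
    "syscall"
    "unsafe"
)

func main() {
    n := os.Getpagesize() // the buffer size must be a multiple of the page size

    // Back the buffer with a file on a shared-memory filesystem.
    f, err := os.CreateTemp("/dev/shm", "mirrored-*")
    if err != nil {
        panic(err)
    }
    defer os.Remove(f.Name())
    defer f.Close()
    if err := f.Truncate(int64(n)); err != nil {
        panic(err)
    }

    // Reserve 2*n contiguous bytes of virtual address space.
    region, err := syscall.Mmap(
        -1, 0, 2*n, syscall.PROT_NONE, syscall.MAP_PRIVATE|syscall.MAP_ANONYMOUS)
    if err != nil {
        panic(err)
    }
    base := uintptr(unsafe.Pointer(&region[0]))

    // Map the same n physical bytes twice, back to back, over the reservation.
    for i := 0; i < 2; i++ {
        _, _, errno := syscall.Syscall6(
            syscall.SYS_MMAP,
            base+uintptr(i*n), uintptr(n),
            syscall.PROT_READ|syscall.PROT_WRITE,
            syscall.MAP_SHARED|syscall.MAP_FIXED,
            f.Fd(), 0,
        )
        if errno != 0 {
            panic(errno)
        }
    }

    // region has 2*n virtual bytes but only n physical bytes behind it:
    // writing past n wraps around to the start of the physical buffer.
    region[n] = 42
    fmt.Println(region[0]) // prints 42
}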
Bip Buffer

This is a zero-copy FIFO buffer meant solely for writing packet-based (UDP) protocols. Refer to the creator's post for an explanation of how it works.

What is next

The two buffers above are not yet used consistently across sonic. TCP codecs, including websocket, still use the memcpy-based byte buffer abstraction in byte_buffer.go, which is not that performant for large messages. The plan is to port websocket to the mirrored buffer by v1.0.0.

The Bip Buffer is actively used in Talos UDP-based gateways.

Peculiarities

Async preemption

If, for some reason, you have a single goroutine that ends up waiting for more than 10ms for something to happen, sonic will crash on Linux due to epoll_wait being interrupted by the signal SIGURG. This happens because, by default, the Go runtime uses SIGURG to non-cooperatively preempt goroutines that have not yielded for more than 10ms. To turn off this behavior, set GODEBUG=asyncpreemptoff=1 before running your binary.

This issue has been addressed in this commit.

Credits

Documentation

Index

Constants

const (
	WarmDefaultBusyCycles = 10
	WarmDefaultTimeout    = time.Millisecond
)
const (
	// MaxCallbackDispatch is the maximum number of callbacks which can be
	// placed onto the stack for immediate invocation.
	MaxCallbackDispatch int = 32
)

Variables

var ErrNoSpaceLeftForSlot = errors.New(
	"no space left to buffer the given slot",
)

Functions

func GetBoundDevice

func GetBoundDevice(fd int) (string, error)

func NewAsyncAdapter

func NewAsyncAdapter(
	ioc *IO,
	sc syscall.Conn,
	rw io.ReadWriter,
	cb AsyncAdapterHandler,
	opts ...sonicopts.Option,
)

NewAsyncAdapter takes in an IO instance along with a syscall.Conn and an io.ReadWriter pertaining to the same object, and invokes a completion handler which:

  • provides the async adapter on successful completion
  • provides an error if any occurred when async-adapting the provided object

See async_adapter_test.go for examples on how to set up an AsyncAdapter.

Types

type AcceptCallback

type AcceptCallback func(error, Conn)

type AcceptPacketCallback

type AcceptPacketCallback func(error, PacketConn)

type AsyncAdapter

type AsyncAdapter struct {
	// contains filtered or unexported fields
}

AsyncAdapter is a wrapper around syscall.Conn which enables clients to schedule async read and write operations on the underlying file descriptor.

func (*AsyncAdapter) AsyncClose

func (a *AsyncAdapter) AsyncClose(cb func(err error))

func (*AsyncAdapter) AsyncRead

func (a *AsyncAdapter) AsyncRead(b []byte, cb AsyncCallback)

AsyncRead reads data from the underlying file descriptor into b asynchronously.

AsyncRead returns no error on short reads. If you want to ensure that the provided buffer is completely filled, use AsyncReadAll.

func (*AsyncAdapter) AsyncReadAll

func (a *AsyncAdapter) AsyncReadAll(b []byte, cb AsyncCallback)

AsyncReadAll reads data from the underlying file descriptor into b asynchronously.

The provided handler is invoked in the following cases:

  • an error occurred
  • the provided buffer has been fully filled after zero or several underlying read(...) operations.

func (*AsyncAdapter) AsyncWrite

func (a *AsyncAdapter) AsyncWrite(b []byte, cb AsyncCallback)

AsyncWrite writes data from the supplied buffer to the underlying file descriptor asynchronously.

AsyncWrite returns no error on short writes. If you want to ensure that the provided buffer is completely written, use AsyncWriteAll.

func (*AsyncAdapter) AsyncWriteAll

func (a *AsyncAdapter) AsyncWriteAll(b []byte, cb AsyncCallback)

AsyncWriteAll writes data from the supplied buffer to the underlying file descriptor asynchronously.

The provided handler is invoked in the following cases:

  • an error occurred
  • the provided buffer has been fully written after zero or several underlying write(...) operations.

func (*AsyncAdapter) Cancel

func (a *AsyncAdapter) Cancel()

Cancel cancels any asynchronous operations scheduled on the underlying file descriptor.

func (*AsyncAdapter) Close

func (a *AsyncAdapter) Close() error

func (*AsyncAdapter) Closed

func (a *AsyncAdapter) Closed() bool

func (*AsyncAdapter) RawFd

func (a *AsyncAdapter) RawFd() int

func (*AsyncAdapter) Read

func (a *AsyncAdapter) Read(b []byte) (int, error)

Read reads data from the underlying file descriptor into b.

func (*AsyncAdapter) Write

func (a *AsyncAdapter) Write(b []byte) (int, error)

Write writes data from the supplied buffer to the underlying file descriptor.

type AsyncAdapterHandler

type AsyncAdapterHandler func(error, *AsyncAdapter)

type AsyncCallback

type AsyncCallback func(error, int)

type AsyncCanceller

type AsyncCanceller interface {
	// Cancel cancels all asynchronous operations on the next layer.
	Cancel()
}

type AsyncReadCallbackPacket

type AsyncReadCallbackPacket func(error, int, net.Addr)

type AsyncReadStream

type AsyncReadStream interface {
	AsyncReader
	AsyncCanceller
	io.Closer
}

type AsyncReadWriter

type AsyncReadWriter interface {
	AsyncReader
	AsyncWriter
}

type AsyncReader

type AsyncReader interface {
	// AsyncRead reads up to len(b) bytes into b asynchronously.
	//
	// This call should not block. The provided completion handler is called
	// in the following cases:
	//  - a read of n bytes completes
	//  - an error occurs
	//
	// Callers should always process the n > 0 bytes returned before considering
	// the error err.
	//
	// Implementations of AsyncRead are discouraged from invoking the handler
	// with a zero byte count and a nil error, except when len(b) == 0.
	// Callers should treat a return of 0 and nil as indicating that nothing
	// happened; in particular, it does not indicate EOF.
	//
	// Implementations must not retain b. Ownership of b must be retained by the caller,
	// which must guarantee that it remains valid until the handler is called.
	AsyncRead(b []byte, cb AsyncCallback)

	// AsyncReadAll reads len(b) bytes into b asynchronously.
	AsyncReadAll(b []byte, cb AsyncCallback)
}

AsyncReader is the interface that wraps the AsyncRead and AsyncReadAll methods.

type AsyncReaderFrom

type AsyncReaderFrom interface {
	AsyncReadFrom(AsyncReader, AsyncCallback)
}

type AsyncStream

type AsyncStream interface {
	AsyncReadStream
	AsyncWriteStream
	io.Closer
}

type AsyncWriteCallbackPacket

type AsyncWriteCallbackPacket func(error)

type AsyncWriteStream

type AsyncWriteStream interface {
	AsyncWriter
	AsyncCanceller
	io.Closer
}

type AsyncWriter

type AsyncWriter interface {
	// AsyncWrite writes up to len(b) bytes into the underlying data stream asynchronously.
	//
	// This call should not block. The provided completion handler is called in the following cases:
	//  - a write of n bytes completes
	//  - an error occurs
	//
	// AsyncWrite must provide a non-nil error if it writes n < len(b) bytes.
	//
	// Implementations must not retain b. Ownership of b must be retained by the caller,
	// which must guarantee that it remains valid until the handler is called.
	// AsyncWrite must not modify b, even temporarily.
	AsyncWrite(b []byte, cb AsyncCallback)

	// AsyncWriteAll writes len(b) bytes into the underlying data stream asynchronously.
	AsyncWriteAll(b []byte, cb AsyncCallback)
}

AsyncWriter is the interface that wraps the AsyncWrite and AsyncWriteAll methods.

type AsyncWriterTo

type AsyncWriterTo interface {
	AsyncWriteTo(AsyncWriter, AsyncCallback)
}

type BipBuffer

type BipBuffer struct {
	// contains filtered or unexported fields
}

BipBuffer is a circular buffer capable of providing continuous, arbitrarily sized byte chunks in a first-in-first-out manner.

A BipBuffer is useful when writing packet-based network protocols, as it allows one to access the packets in a first-in-first-out manner without allocations or copies. Callers are always provided a full packet and never have to care about wrapped byte chunks.

The usual workflow is as follows:

  • b = buf.Claim(128)
  • n = reader.Read(b)
  • buf.Commit(n)
  • b = buf.Claim(128)
  • n = reader.Read(b)
  • buf.Commit(n)
  • buf.Consume(128) gets rid of the first 128 bytes
  • buf.Consume(128) gets rid of the last 128 bytes

Copyright: Simon Cooke.
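
A hedged usage sketch for a packet-based workflow, where buf is a *BipBuffer and peer/process are placeholders for a packet connection and a handler:

b := buf.Claim(1500)        // claim contiguous space for one datagram
n, _, _ := peer.ReadFrom(b) // read a whole packet into the claimed slice
buf.Commit(n)               // queue the n received bytes

packet := buf.Head()        // committed bytes are contiguous, never wrapped
process(packet[:n])         // here the head holds exactly the packet just read
buf.Consume(n)              // drop the packet once it has been processed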

func NewBipBuffer

func NewBipBuffer(n int) *BipBuffer

func (*BipBuffer) Claim

func (buf *BipBuffer) Claim(n int) []byte

Claim n bytes.

func (*BipBuffer) Claimed

func (buf *BipBuffer) Claimed() int

Claimed returns the number of claimed bytes in the buffer.

func (*BipBuffer) Commit

func (buf *BipBuffer) Commit(n int) []byte

Commit n bytes of the previously claimed slice. Returns the committed chunk at the tail of the buffer.

func (*BipBuffer) Committed

func (buf *BipBuffer) Committed() int

Committed returns the number of used bytes in the buffer.

func (*BipBuffer) Consume

func (buf *BipBuffer) Consume(n int)

Consume n bytes from the head of the buffer. This means that the first n bytes will get overwritten by a Claim + Commit at some point.

func (*BipBuffer) Empty

func (buf *BipBuffer) Empty() bool

Empty ...

func (*BipBuffer) Head

func (buf *BipBuffer) Head() []byte

Head returns the first (and possibly only) contiguous byte slice in the buffer.

func (*BipBuffer) Prefault

func (buf *BipBuffer) Prefault()

Prefault the buffer, forcing physical memory allocation.

func (*BipBuffer) Reset

func (buf *BipBuffer) Reset()

Reset the buffer. All committed state is lost.

func (*BipBuffer) Size

func (buf *BipBuffer) Size() int

Size of the buffer.

func (*BipBuffer) Wrapped

func (buf *BipBuffer) Wrapped() bool

type BlockingCodecConn

type BlockingCodecConn[Enc, Dec any] struct {
	// contains filtered or unexported fields
}

BlockingCodecConn handles the decoding/encoding of bytes funneled through a provided blocking file descriptor.

func NewBlockingCodecConn

func NewBlockingCodecConn[Enc, Dec any](
	stream Stream,
	codec Codec[Enc, Dec],
	src, dst *ByteBuffer,
) (*BlockingCodecConn[Enc, Dec], error)

func (*BlockingCodecConn[Enc, Dec]) AsyncReadNext

func (c *BlockingCodecConn[Enc, Dec]) AsyncReadNext(cb func(error, Dec))

func (*BlockingCodecConn[Enc, Dec]) AsyncWriteNext

func (c *BlockingCodecConn[Enc, Dec]) AsyncWriteNext(item Enc, cb AsyncCallback)

func (*BlockingCodecConn[Enc, Dec]) Close

func (c *BlockingCodecConn[Enc, Dec]) Close() error

func (*BlockingCodecConn[Enc, Dec]) NextLayer

func (c *BlockingCodecConn[Enc, Dec]) NextLayer() Stream

func (*BlockingCodecConn[Enc, Dec]) ReadNext

func (c *BlockingCodecConn[Enc, Dec]) ReadNext() (Dec, error)

func (*BlockingCodecConn[Enc, Dec]) WriteNext

func (c *BlockingCodecConn[Enc, Dec]) WriteNext(item Enc) (n int, err error)

type ByteBuffer

type ByteBuffer struct {
	// contains filtered or unexported fields
}

ByteBuffer provides operations that make it easier to handle byte slices in networking code.

A ByteBuffer has 3 areas, in order:

  • save area: [0, si)
  • read area: [si, ri)
  • write area: [ri, wi)

A usual workflow is as follows:

  • Bytes are written to the write area. These bytes cannot be read yet.
  • Bytes from the write area are made available for reading in the read area, by calling Commit.
  • Bytes from the read area can be either Saved or Consumed. If Saved, the bytes are kept in the save area. If Consumed, the bytes' lifetime ends and they are automatically discarded. Saved bytes must be discarded later.

Invariants:

  • 0 <= si <= ri <= wi <= min(len(data), cap(b.data)) <= cap(b.data)
  • every time wi changes, b.data should grow/shrink accordingly: b.data = b.data[:b.wi]
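
A minimal usage sketch of the workflow above, based on the documented semantics (error handling omitted):

buf := sonic.NewByteBuffer()

// Written bytes land in the write area and are not yet readable.
buf.Write([]byte("hello"))

// Commit moves them into the read area, making them readable.
buf.Commit(5)
fmt.Println(string(buf.Data())) // hello

// Consume ends their lifetime; the read area is empty again.
buf.Consume(5)
fmt.Println(buf.ReadLen()) // 0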

func NewByteBuffer

func NewByteBuffer() *ByteBuffer

func (*ByteBuffer) AsyncReadFrom

func (b *ByteBuffer) AsyncReadFrom(r AsyncReader, cb AsyncCallback)

AsyncReadFrom the supplied asynchronous reader into the write area.

The buffer is not automatically grown to accommodate all data from the reader. The responsibility is left to the caller which can reserve enough space through Reserve.

func (*ByteBuffer) AsyncWriteTo

func (b *ByteBuffer) AsyncWriteTo(w AsyncWriter, cb AsyncCallback)

AsyncWriteTo the provided asynchronous writer bytes from the read area. Consume them if no error occurred.

func (*ByteBuffer) Cap

func (b *ByteBuffer) Cap() int

Cap returns the capacity of the underlying byte slice.

func (*ByteBuffer) Claim

func (b *ByteBuffer) Claim(fn func(b []byte) int)

Claim a byte slice of the write area.

Claim allows callers to write directly into the write area of the buffer.

`fn` implementations should return the number of bytes written into the provided byte slice.

Callers have the option to write less than they claim. The amount actually written is the value returned by `fn`; the unused bytes will be used in future claims.

func (*ByteBuffer) ClaimFixed

func (b *ByteBuffer) ClaimFixed(n int) (claimed []byte)

ClaimFixed claims a fixed byte slice from the write area.

Callers do not have the option to write less than they claim. The write area will grow by `n`.

func (*ByteBuffer) Commit

func (b *ByteBuffer) Commit(n int)

Commit moves `n` bytes from the write area to the read area.

func (*ByteBuffer) Consume

func (b *ByteBuffer) Consume(n int)

Consume removes the first `n` bytes of the read area. The removed bytes cannot be referenced after a call to Consume. If that's desired, use Save.

func (*ByteBuffer) Data

func (b *ByteBuffer) Data() []byte

Data returns the bytes in the read area.

func (*ByteBuffer) Discard

func (b *ByteBuffer) Discard(slot Slot) (discarded int)

Discard a previously saved slot.

This call reduces the save area by slot.Length. Returns slot.Length.

func (*ByteBuffer) DiscardAll

func (b *ByteBuffer) DiscardAll()

DiscardAll saved slots.

The save area's size will be 0 after this call.

func (*ByteBuffer) Len

func (b *ByteBuffer) Len() int

Len returns the length of the underlying byte slice.

func (*ByteBuffer) Prefault

func (b *ByteBuffer) Prefault()

Prefault the buffer, forcing physical memory allocation.

NOTE: this should be used sparingly. Even though an array is contiguous in the process' virtual memory map, it is probably fragmented in main memory. Iterating over the array causes a number of page faults, triggering the virtual-to-physical memory mapping. This means that if you Reserve 1GB initially, nothing is allocated. But if you Prefault after Reserve, the entire 1GB gets allocated, which is maybe not what you want in a resource-constrained application.

func (*ByteBuffer) PrepareRead

func (b *ByteBuffer) PrepareRead(n int) (err error)

PrepareRead prepares n bytes to be read from the read area. If less than n bytes are available, ErrNeedMore is returned and no bytes are committed to the read area, hence made available for reading.

func (*ByteBuffer) Read

func (b *ByteBuffer) Read(dst []byte) (int, error)

Read the bytes from the read area into `dst`. Consume them.

func (*ByteBuffer) ReadByte

func (b *ByteBuffer) ReadByte() (byte, error)

ReadByte returns and consumes one byte from the read area.

func (*ByteBuffer) ReadFrom

func (b *ByteBuffer) ReadFrom(r io.Reader) (int64, error)

ReadFrom the supplied reader into the write area.

The buffer is not automatically grown to accommodate all data from the reader. The responsibility is left to the caller which can reserve enough space through Reserve.

func (*ByteBuffer) ReadLen

func (b *ByteBuffer) ReadLen() int

ReadLen returns the length of the read area.

func (*ByteBuffer) Reserve

func (b *ByteBuffer) Reserve(n int)

Reserve capacity for at least `n` more bytes to be written into the ByteBuffer's write area.

This call grows the write area by at least `n` bytes. This might allocate.

func (*ByteBuffer) Reserved

func (b *ByteBuffer) Reserved() int

Reserved returns the number of bytes that can be written in the write area of the buffer.

func (*ByteBuffer) Reset

func (b *ByteBuffer) Reset()

func (*ByteBuffer) Save

func (b *ByteBuffer) Save(n int) (slot Slot)

Save n bytes from the read area. Save is like Consume, except that the bytes can still be referenced after the read area is updated.

Saved bytes should be discarded at some point with Discard(...).

func (*ByteBuffer) SaveLen

func (b *ByteBuffer) SaveLen() int

SaveLen returns the length of the save area.

func (*ByteBuffer) Saved

func (b *ByteBuffer) Saved() []byte

Saved bytes.

func (*ByteBuffer) SavedSlot

func (b *ByteBuffer) SavedSlot(slot Slot) []byte

SavedSlot ...

func (*ByteBuffer) ShrinkBy

func (b *ByteBuffer) ShrinkBy(n int) int

ShrinkBy shrinks the write area by at most `n` bytes.

func (*ByteBuffer) ShrinkTo

func (b *ByteBuffer) ShrinkTo(n int) (shrunkBy int)

ShrinkTo shrinks the write area to contain min(n, WriteLen()) bytes.

func (*ByteBuffer) UnreadByte

func (b *ByteBuffer) UnreadByte() error

UnreadByte from the write area.

func (*ByteBuffer) Write

func (b *ByteBuffer) Write(bb []byte) (int, error)

Write the supplied slice into the write area. Grow the write area if needed.

func (*ByteBuffer) WriteByte

func (b *ByteBuffer) WriteByte(bb byte) error

WriteByte into the write area. Grow the write area if needed.

func (*ByteBuffer) WriteLen

func (b *ByteBuffer) WriteLen() int

WriteLen returns the length of the write area.

func (*ByteBuffer) WriteString

func (b *ByteBuffer) WriteString(s string) (int, error)

WriteString into the write area. Grow the write area if needed.

func (*ByteBuffer) WriteTo

func (b *ByteBuffer) WriteTo(w io.Writer) (int64, error)

WriteTo the provided writer bytes from the read area. Consume them if no error occurred.

type Codec

type Codec[Enc, Dec any] interface {
	Encoder[Enc]
	Decoder[Dec]
}

Codec defines a generic interface through which one can encode/decode a raw stream of bytes.

Implementations are optionally able to track their state which enables writing both stateful and stateless parsers.

type CodecConn

type CodecConn[Enc, Dec any] interface {
	AsyncReadNext(func(error, Dec))
	ReadNext() (Dec, error)

	AsyncWriteNext(Enc, AsyncCallback)
	WriteNext(Enc) (int, error)

	NextLayer() Stream

	Close() error
}

type Conn

type Conn interface {
	FileDescriptor
	net.Conn
}

Conn is a generic stream-oriented network connection.

func Dial

func Dial(
	ioc *IO,
	network, addr string,
	opts ...sonicopts.Option,
) (Conn, error)

Dial establishes a stream based connection to the specified address.

Data can be sent or received only from the specified address for all networks: tcp, udp and unix domain sockets.

func DialTimeout

func DialTimeout(
	ioc *IO, network, addr string,
	timeout time.Duration,
	opts ...sonicopts.Option,
) (Conn, error)

type Decoder

type Decoder[Item any] interface {
	// Decode decodes the given stream into an `Item`.
	//
	// An implementation of Codec takes a byte stream that has already
	// been buffered in `src` and decodes the data into a stream of
	// `Item` objects.
	//
	// Implementations should return an empty Item and ErrNeedMore if
	// there are not enough bytes to decode into an Item.
	Decode(src *ByteBuffer) (Item, error)
}

type Encoder

type Encoder[Item any] interface {
	// Encode encodes the given item into the `dst` byte stream.
	//
	// Implementations should:
	// - Commit the bytes into the read area of `dst`.
	// - ensure `dst` is big enough to hold the serialized item by
	//   calling dst.Reserve(...)
	Encode(item Item, dst *ByteBuffer) error
}

type File

type File interface {
	FileDescriptor
	io.Seeker
}

func Open

func Open(ioc *IO, path string, flags int, mode os.FileMode) (File, error)

type FileDescriptor

type FileDescriptor interface {
	RawFd() int

	io.Closer
	io.ReadWriter
	AsyncReadWriter
	AsyncCanceller
}

type IO

type IO struct {
	// contains filtered or unexported fields
}

IO is the executor of all asynchronous operations and the way any object can schedule them. It runs fully in the calling goroutine.

A goroutine must not have more than one IO. There might be multiple IOs in the same process, each within its own goroutine.

func MustIO

func MustIO() *IO

func NewIO

func NewIO() (*IO, error)

func (*IO) Close

func (ioc *IO) Close() error

func (*IO) Closed

func (ioc *IO) Closed() bool

func (*IO) Deregister

func (ioc *IO) Deregister(slot *internal.Slot)

func (*IO) Pending

func (ioc *IO) Pending() int64

func (*IO) Poll

func (ioc *IO) Poll() error

Poll runs the event processing loop to execute ready handlers.

This will return immediately in case there is no event to process.

func (*IO) PollOne

func (ioc *IO) PollOne() (n int, err error)

PollOne runs the event processing loop to execute one ready handler.

This will return immediately in case there is no event to process.

func (*IO) Post

func (ioc *IO) Post(handler func()) error

Post schedules the provided handler to be run immediately by the event processing loop in its own thread.

It is safe to call Post concurrently.

func (*IO) Posted

func (ioc *IO) Posted() int

Posted returns the number of handlers registered with Post.

It is safe to call Posted concurrently.

func (*IO) Register

func (ioc *IO) Register(slot *internal.Slot)

func (*IO) Run

func (ioc *IO) Run() error

Run runs the event processing loop.

func (*IO) RunOne

func (ioc *IO) RunOne() (err error)

RunOne runs the event processing loop to execute at most one handler.

This call blocks the calling goroutine until an event occurs.

func (*IO) RunOneFor

func (ioc *IO) RunOneFor(dur time.Duration) (err error)

RunOneFor runs the event processing loop for the given duration. The duration must not be lower than 1ms.

This call blocks the calling goroutine until an event occurs.

func (*IO) RunPending

func (ioc *IO) RunPending() error

RunPending runs the event processing loop to execute all the pending handlers. The function returns (and the event loop stops running) when there are no more operations to complete.

func (*IO) RunWarm

func (ioc *IO) RunWarm(busyCycles int, timeout time.Duration) (err error)

RunWarm runs the event loop in a combined busy-wait and yielding mode. If the current cycle does not process anything, the event loop busy-waits for at most `busyCycles` more cycles; this is the warm state. After `busyCycles` cycles without processing anything, the event loop leaves the warm state and falls back to yielding with the provided timeout. If at any moment an event occurs and something is processed, the event loop transitions back to its warm state.
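
A hedged usage sketch with the package's default tuning constants, for a latency-sensitive setup where the event loop is pinned to one OS thread:

runtime.LockOSThread() // keep the event loop on a single OS thread

// Busy-wait for up to WarmDefaultBusyCycles idle cycles before falling back
// to yielding with a WarmDefaultTimeout timeout.
if err := ioc.RunWarm(sonic.WarmDefaultBusyCycles, sonic.WarmDefaultTimeout); err != nil {
    log.Fatal(err)
}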

func (*IO) SetRead

func (ioc *IO) SetRead(slot *internal.Slot) error

func (*IO) SetWrite

func (ioc *IO) SetWrite(slot *internal.Slot) error

type Listener

type Listener interface {
	// Accept waits for and returns the next connection to the listener synchronously.
	Accept() (Conn, error)

	// AsyncAccept waits for and returns the next connection to the listener asynchronously.
	AsyncAccept(AcceptCallback)

	// Close closes the listener.
	Close() error

	// Addr returns the listener's network address.
	Addr() net.Addr

	RawFd() int
}

Listener is a generic network listener for stream-oriented protocols.

func Listen

func Listen(
	ioc *IO,
	network,
	addr string,
	opts ...sonicopts.Option,
) (Listener, error)

Listen creates a Listener that listens for new connections on the local address.

If the option Nonblocking with value set to false is passed in, you should use Accept() to accept incoming connections. In this case, Accept() will block if no connections are present in the queue.

If the option Nonblocking with value set to true is passed in, you should use AsyncAccept() to accept incoming connections. In this case, AsyncAccept() will not block if no connections are present in the queue.
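
A hedged sketch of asynchronous accepting, assuming the listener is configured so that AsyncAccept can be used, as described above:

ln, err := sonic.Listen(ioc, "tcp", "localhost:8080")
if err != nil {
    log.Fatal(err)
}

// AsyncAccept does not block; the callback runs on the IO goroutine once a
// connection is ready.
ln.AsyncAccept(func(err error, conn sonic.Conn) {
    if err != nil {
        log.Printf("accept: %v", err)
        return
    }
    // schedule reads/writes on conn here
})

ioc.Run()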

type NonblockingCodecConn

type NonblockingCodecConn[Enc, Dec any] struct {
	// contains filtered or unexported fields
}

func NewNonblockingCodecConn

func NewNonblockingCodecConn[Enc, Dec any](
	stream Stream,
	codec Codec[Enc, Dec],
	src, dst *ByteBuffer,
) (*NonblockingCodecConn[Enc, Dec], error)

func (*NonblockingCodecConn[Enc, Dec]) AsyncReadNext

func (c *NonblockingCodecConn[Enc, Dec]) AsyncReadNext(cb func(error, Dec))

func (*NonblockingCodecConn[Enc, Dec]) AsyncWriteNext

func (c *NonblockingCodecConn[Enc, Dec]) AsyncWriteNext(item Enc, cb AsyncCallback)

func (*NonblockingCodecConn[Enc, Dec]) Close

func (c *NonblockingCodecConn[Enc, Dec]) Close() error

func (*NonblockingCodecConn[Enc, Dec]) NextLayer

func (c *NonblockingCodecConn[Enc, Dec]) NextLayer() Stream

func (*NonblockingCodecConn[Enc, Dec]) ReadNext

func (c *NonblockingCodecConn[Enc, Dec]) ReadNext() (Dec, error)

func (*NonblockingCodecConn[Enc, Dec]) WriteNext

func (c *NonblockingCodecConn[Enc, Dec]) WriteNext(item Enc) (n int, err error)

type PacketConn

type PacketConn interface {
	ReadFrom([]byte) (n int, addr net.Addr, err error)
	AsyncReadFrom([]byte, AsyncReadCallbackPacket)
	AsyncReadAllFrom([]byte, AsyncReadCallbackPacket)

	WriteTo([]byte, net.Addr) error
	AsyncWriteTo([]byte, net.Addr, AsyncWriteCallbackPacket)

	Close() error
	Closed() bool

	LocalAddr() net.Addr
	RawFd() int
}

PacketConn is a generic packet-oriented connection.

func ListenPacket

func ListenPacket(
	ioc *IO,
	network, addr string,
	opts ...sonicopts.Option,
) (PacketConn, error)

func NewPacketConn

func NewPacketConn(ioc *IO, network, addr string, opts ...sonicopts.Option) (PacketConn, error)

NewPacketConn establishes a packet based stream-less connection which is optionally bound to the specified addr.

If addr is empty, the connection is bound to a random address which can be obtained by calling LocalAddr().

type Slot

type Slot struct {
	Index  int
	Length int
}

Slot from the save area. See Save and Discard.

func OffsetSlot

func OffsetSlot(offset int, slot Slot) Slot

OffsetSlot ...

Usually used when we reference a few Slots and Discard several of them.

  • Slots that precede a discarded Slot must be offset
  • Slots that follow a discarded Slot must not be offset

type SlotOffsetter

type SlotOffsetter struct {
	// contains filtered or unexported fields
}

SlotOffsetter helps with offsetting a Slot's Index such that the bytes referred to by the slot can be safely removed from a ByteBuffer's save area.

Why do we need to care about offsetting slots? Consider the following example, where [x, y] is a slot with Index x and Length y:

  • say we have the following slots, generated after calling ByteBuffer.Save(...): A:[0, 1] B:[1, 2] C:[3, 3], i.e. bytes: ABBCCC
  • we ByteBuffer.Discard(B) and arrive at: ACCC
  • we want to ByteBuffer.Discard(C); however, B was removed before C, so [3, 3] has now become [1, 3] - C's index moved back by an offset of 2 bytes
  • we offset C from [3, 3] to [1, 3] and then call ByteBuffer.Discard(C), which gives us A

SlotOffsetter does the offsetting for you. The workflow is as follows:

  • slot := ByteBuffer.Save(...)
  • slot = offsetter.Add(slot)
  • ...
  • ByteBuffer.Discard(offsetter.Offset(slot))

func NewSlotOffsetter

func NewSlotOffsetter(maxBytes int) *SlotOffsetter

func (*SlotOffsetter) Add

func (s *SlotOffsetter) Add(slot Slot) (Slot, error)

Add a slot such that it can be Offset accordingly at a later time. Slots are generated by calling ByteBuffer.Save().

func (*SlotOffsetter) Offset

func (s *SlotOffsetter) Offset(slot Slot) Slot

Offset a previously Added slot such that it can be removed from the ByteBuffer's save area through ByteBuffer.Discard.

func (*SlotOffsetter) Reset

func (s *SlotOffsetter) Reset()

type SlotSequencer

type SlotSequencer struct {
	// contains filtered or unexported fields
}

func NewSlotSequencer

func NewSlotSequencer(maxSlots, maxBytes int) *SlotSequencer

func (*SlotSequencer) Bytes

func (s *SlotSequencer) Bytes() int

func (*SlotSequencer) FillPct

func (s *SlotSequencer) FillPct() float64

func (*SlotSequencer) MaxBytes

func (s *SlotSequencer) MaxBytes() int

func (*SlotSequencer) Pop

func (s *SlotSequencer) Pop(seq int) (Slot, bool)

Pop the slot identified by `seq`. The popped Slot must be discarded through ByteBuffer.Discard before Pop is called again.

func (*SlotSequencer) Push

func (s *SlotSequencer) Push(seq int, slot Slot) (ok bool, err error)

Push a Slot that's uniquely identified and ordered by `seq`.

func (*SlotSequencer) Reset

func (s *SlotSequencer) Reset()

func (*SlotSequencer) Size

func (s *SlotSequencer) Size() int

type Socket

type Socket struct {
	// contains filtered or unexported fields
}

func NewSocket

func NewSocket(
	domain SocketDomain,
	socketType SocketType,
	protocol SocketProtocol,
) (*Socket, error)

func (*Socket) Bind

func (s *Socket) Bind(addrPort netip.AddrPort) error

func (*Socket) BindToDevice

func (s *Socket) BindToDevice(name string) (*net.Interface, error)

BindToDevice binds the socket to the device with the given name. The device must be a network interface (`ip link` to see all interfaces).

This makes it such that only packets from the given device will be processed by the socket.

func (*Socket) BoundDevice

func (s *Socket) BoundDevice() *net.Interface

func (*Socket) Close

func (s *Socket) Close() (err error)

func (*Socket) IsNonblocking

func (s *Socket) IsNonblocking() (bool, error)

func (*Socket) RawFd

func (s *Socket) RawFd() int

func (*Socket) RecvFrom

func (s *Socket) RecvFrom(
	b []byte,
	flags SocketIOFlags,
) (n int, peerAddr netip.AddrPort, err error)

func (*Socket) ReuseAddr

func (s *Socket) ReuseAddr(reuse bool) error

func (*Socket) ReusePort

func (s *Socket) ReusePort(reuse bool) error

func (*Socket) SendTo

func (s *Socket) SendTo(
	b []byte,
	flags SocketIOFlags,
	peerAddr netip.AddrPort,
) (int, error)

func (*Socket) SetNoDelay

func (s *Socket) SetNoDelay(delay bool) error

func (*Socket) SetNonblocking

func (s *Socket) SetNonblocking(nonblocking bool) error

func (*Socket) UnbindFromDevice

func (s *Socket) UnbindFromDevice() error

UnbindFromDevice is not working, and honestly I have no clue why.

type SocketDomain

type SocketDomain int
const (
	SocketDomainUnix SocketDomain = iota
	SocketDomainIPv4
	SocketDomainIPv6
)

func SocketDomainFromIP

func SocketDomainFromIP(ip net.IP) SocketDomain

func (SocketDomain) String

func (s SocketDomain) String() string

type SocketIOFlags

type SocketIOFlags int

type SocketProtocol

type SocketProtocol int
const (
	SocketProtocolTCP SocketProtocol = iota
	SocketProtocolUDP
)

func (SocketProtocol) String

func (s SocketProtocol) String() string

type SocketType

type SocketType int
const (
	SocketTypeStream SocketType = iota
	SocketTypeDatagram
	SocketRaw
)

func (SocketType) String

func (s SocketType) String() string

type Stream

type Stream interface {
	RawFd() int

	AsyncStream
	SyncStream
}

Stream represents a full-duplex connection between two processes, where data represented as bytes may be received reliably in the same order they were written.

type SyncReadStream

type SyncReadStream interface {
	io.Reader
	io.Closer
}

type SyncStream

type SyncStream interface {
	SyncReadStream
	SyncWriteStream
	io.Closer
}

type SyncWriteStream

type SyncWriteStream interface {
	io.Writer
	io.Closer
}

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

func NewTimer

func NewTimer(ioc *IO) (*Timer, error)

func (*Timer) Cancel

func (t *Timer) Cancel() error

func (*Timer) Close

func (t *Timer) Close() (err error)

Close closes the timer, rendering it useless for scheduling any more operations on it. A timer cannot be used after Close(). Any operations that have been scheduled but not yet completed are cancelled, and will therefore never complete.

func (*Timer) ScheduleOnce

func (t *Timer) ScheduleOnce(delay time.Duration, cb func()) (err error)

ScheduleOnce schedules a callback for execution after a delay.

The callback is guaranteed to never be called before the delay. However, it is possible that it will be called a little after the delay.

If the delay is negative or 0, the callback is executed as soon as possible.

func (*Timer) ScheduleRepeating

func (t *Timer) ScheduleRepeating(repeat time.Duration, cb func()) error

ScheduleRepeating schedules a callback for execution once per interval.

The callback is guaranteed to never be called before the repeat delay. However, it is possible that it will be called a little after the repeat delay.

If the delay is negative or 0, the operation is cancelled.

func (*Timer) Scheduled

func (t *Timer) Scheduled() bool

type UDPMulticastClient

type UDPMulticastClient interface {
	Join(multicastAddr *net.UDPAddr) error
	JoinSource(multicastAddr, sourceAddr *net.UDPAddr) error

	Leave(multicastAddr *net.UDPAddr) error
	LeaveSource(multicastAddr, sourceAddr *net.UDPAddr) error

	BlockSource(multicastAddr, sourceAddr *net.UDPAddr) error
	UnblockSource(multicastAddr, sourceAddr *net.UDPAddr) error

	// ReadFrom and AsyncReadFrom read a partial or complete datagram into the provided buffer. When the datagram
	// is smaller than the passed buffer, only that much data is returned; when it is bigger, the packet is truncated
	// to fit into the buffer. The truncated bytes are lost.
	//
	// It is the responsibility of the caller to ensure the passed buffer can hold an entire datagram. A rule of thumb
	// is to have the buffer size equal to the network interface's MTU.
	ReadFrom([]byte) (n int, addr net.Addr, err error)
	AsyncReadFrom([]byte, AsyncReadCallbackPacket)

	RawFd() int
	Interface() *net.Interface
	LocalAddr() *net.UDPAddr

	Close() error
	Closed() bool
}

UDPMulticastClient defines a UDP multicast client that can read data from one or multiple multicast groups, optionally filtering packets on the source IP.

Directories

Path Synopsis
Package bytes provides containers and utilities for working with byte arrays and slices.
codec
examples
tcp
udp
net
stress_test
tests
