immuta

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2024 License: Apache-2.0 Imports: 8 Imported by: 2

README

██╗███╗░░░███╗███╗░░░███╗██╗░░░██╗████████╗░█████╗░
██║████╗░████║████╗░████║██║░░░██║╚══██╔══╝██╔══██╗
██║██╔████╔██║██╔████╔██║██║░░░██║░░░██║░░░███████║
██║██║╚██╔╝██║██║╚██╔╝██║██║░░░██║░░░██║░░░██╔══██║
██║██║░╚═╝░██║██║░╚═╝░██║╚██████╔╝░░░██║░░░██║░░██║
╚═╝╚═╝░░░░░╚═╝╚═╝░░░░░╚═╝░╚═════╝░░░░╚═╝░░░╚═╝░░╚═╝

Immuta is a Append Only Log implementation based on single writer, multiple readers concept. It uses filesystem as it's core the format of the each record is as follows and uses solid for io signgling

  • the first 8 bytes provide the number of messages in the log file
  • loop
    • the next 8 bytes define the size of the payload (Header)
    • payload can be any arbitrary size
+----------+----------+---------------+----------+---------------+
|          |          |               |          |               |
| MESSAGES |  PAYLOAD |    PAYLOAD    |  PAYLOAD |    PAYLOAD    | ...
|   COUNT  |   SIZE   |               |   SIZE   |               |
+----------+----------+---------------+----------+---------------+
   8 bytes   8 bytes                    8 bytes

Installation

go get ella.to/immuta

Usgae

package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"ella.to/immuta"
)

func main() {
	filename := "./data.log"
	poolFileDescriptor := 10
	// fastwrite uses the buffer for each append
	// if you need gurrantee on saving on disk, enable set fastWrite to false
	// the Append operation will get the performance hit
	fastWrite := true

	log, err := immuta.New(filename, poolFileDescriptor, fastWrite)
	if err != nil {
		panic(err)
	}
	defer log.Close()

	content := []byte("hello world")

	// write to append only log
	index, size, err := log.Append(context.Background(), bytes.NewReader(content))
	if err != nil {
		panic(err)
	}

	if index != 8 {
		panic("index must be 8")
	}

	if size != 11 {
		panic("size must be 11")
	}

	// 0: start from beginning
	// negative value: start from latest append
	// positive number: skip those message
	var startPos int64 = 0

	// this call doesn't allocate any file descriptor yet
	stream := log.Stream(context.Background(), startPos)
	defer stream.Done()

	for {
		var buffer bytes.Buffer

		err := func() error {
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			defer cancel()

			r, size, err := stream.Next(ctx)
			if err != nil {
				return err
			}
			// important: don't forget to call Done() to release the file descriptor
			defer r.Done()

			buffer.Reset()

			_, err = io.Copy(&buffer, r)
			if err != nil {
				return err
			}

			fmt.Printf("size of the record: %d\n", size)
			fmt.Printf("content: %s\n", buffer.String())

			return nil
		}()

		if errors.Is(err, context.DeadlineExceeded) {
			break
		} else if err != nil {
			panic(err)
		}
	}
}

Performance

  • Appending 100k records of 1kb took around 1 seconds
go test -benchmem -run=^$ -bench ^Benchmark1kbAppend$ ella.to/immuta

goos: darwin
goarch: arm64
pkg: ella.to/immuta
cpu: Apple M2 Pro
Benchmark1kbAppend-12             103730              9882 ns/op              64 B/op          3 allocs/op
  • Reading the 100k record is under 150ms
go test -timeout 30s -run ^TestRead100kMessages$ ella.to/immuta -v
=== RUN   TestRead100kMessages
time taken to write 100000: 895.478792ms
time taken to read 100000: 139.394542ms

Documentation

Index

Constants

View Source
const (
	FileHeaderSize   = 8 + 8 // total number of messages + index of the last message
	RecordHeaderSize = 8     // size of the content of the message
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Appender

type Appender interface {
	// Append writes the content of the reader to the storage medium.
	// and returns the index and size of the content written.
	Append(ctx context.Context, r io.Reader) (index int64, size int64, err error)
}

Appender is a single method interface that writes the content of the reader to the storage medium.

type Reader added in v0.0.3

type Reader struct {
	// contains filtered or unexported fields
}

func (*Reader) Done added in v0.0.3

func (r *Reader) Done() error

func (*Reader) Read added in v0.0.3

func (r *Reader) Read(p []byte) (n int, err error)

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func New

func New(filepath string, readerCount int, fastWrite bool) (*Storage, error)

func (*Storage) Append

func (s *Storage) Append(ctx context.Context, r io.Reader) (index int64, size int64, err error)

Append should only be called by a single goroutine at a time. It is not safe for concurrent use.

func (*Storage) Close

func (s *Storage) Close() error

func (*Storage) Details added in v0.0.3

func (s *Storage) Details() (string, error)

func (*Storage) Stream

func (s *Storage) Stream(ctx context.Context, startPos int64) Stream

Stream(ctx, 0) -> from the beginning Stream(ctx, -1) -> start from latest messages Stream(ctx, 10) -> start after 10nth message

NOTE: Creating a stream does not block the storage from writing new messages can it is concurrent safe.

func (*Storage) Verify added in v0.0.3

func (s *Storage) Verify() error

type Stream

type Stream interface {
	// Creates a io.Reader and provide the size of the content ahead of time.
	// If there is no more content to read, it will blocked until there is more content or the context is done.
	Next(ctx context.Context) (r *Reader, size int64, err error)
	// Done should be called to release the reader.
	// the best practice is once an stream is created successfully, call Done in defer.
	Done()
}

Stream is an interface that deals with reading from the storage medium.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL