seekable

v0.1.0
Published: Feb 3, 2022 License: MIT Imports: 11 Imported by: 4

README


ZSTD Seekable compression format implementation in Go

Seekable ZSTD compression format implemented in Go.

Installation

go get -u github.com/SaveTheRbtz/zstd-seekable-format-go

Using the seekable format

Writing is done through the Writer interface:

import (
	"github.com/klauspost/compress/zstd"
	seekable "github.com/SaveTheRbtz/zstd-seekable-format-go"
)

enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
if err != nil {
	log.Fatal(err)
}
defer enc.Close()

w, err := seekable.NewWriter(f, enc)
if err != nil {
	log.Fatal(err)
}

// Write data in chunks.
for _, b := range [][]byte{[]byte("Hello"), []byte(" World!")} {
	_, err = w.Write(b)
	if err != nil {
		log.Fatal(err)
	}
}

// Close and flush seek table.
err = w.Close()
if err != nil {
	log.Fatal(err)
}

NB! Do not forget to call Close since it is responsible for flushing the seek table.

Reading can be done either through the ReaderAt interface:

dec, err := zstd.NewReader(nil)
if err != nil {
	log.Fatal(err)
}

r, err := seekable.NewReader(f, dec)
if err != nil {
	log.Fatal(err)
}

ello := make([]byte, 4)
// ReaderAt
if _, err := r.ReadAt(ello, 1); err != nil {
	log.Fatal(err)
}
if !bytes.Equal(ello, []byte("ello")) {
	log.Fatalf("%q != ello", ello)
}

Or through the ReadSeeker:

world := make([]byte, 5)
// Seeker
if _, err := r.Seek(-6, io.SeekEnd); err != nil {
	log.Fatal(err)
}
// Reader
if _, err := r.Read(world); err != nil {
	log.Fatal(err)
}
if !bytes.Equal(world, []byte("World")) {
	log.Fatalf("%q != World", world)
}

The seekable format stores its seek table in a ZSTD skippable frame, so the result is still a valid ZSTD stream that a standard decoder can read:

// Standard ZSTD Reader
f.Seek(0, io.SeekStart)
dec, err := zstd.NewReader(f)
if err != nil {
	log.Fatal(err)
}

all, err := io.ReadAll(dec)
if err != nil {
	log.Fatal(err)
}
if !bytes.Equal(all, []byte("Hello World!")) {
	log.Fatalf("%+v != Hello World!", all)
}

Documentation

Overview

Package seekable adds the ability to create ZSTD files in the seekable format and to randomly access them using uncompressed offsets.

Example
package main

import (
	"fmt"
	"io"
	"log"
	"os"

	"github.com/klauspost/compress/zstd"

	seekable "github.com/SaveTheRbtz/zstd-seekable-format-go"
)

func main() {
	f, err := os.CreateTemp("", "example")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())

	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
	if err != nil {
		log.Fatal(err)
	}
	defer enc.Close()

	w, err := seekable.NewWriter(f, enc)
	if err != nil {
		log.Fatal(err)
	}

	// Write data in chunks.
	for _, b := range [][]byte{[]byte("Hello"), []byte(" World!")} {
		_, err = w.Write(b)
		if err != nil {
			log.Fatal(err)
		}
	}

	// Close and flush seek table.
	err = w.Close()
	if err != nil {
		log.Fatal(err)
	}

	dec, err := zstd.NewReader(nil)
	if err != nil {
		log.Fatal(err)
	}

	r, err := seekable.NewReader(f, dec)
	if err != nil {
		log.Fatal(err)
	}

	ello := make([]byte, 4)
	// ReaderAt
	_, err = r.ReadAt(ello, 1)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Offset: 1 from the start: %s\n", string(ello))

	world := make([]byte, 5)
	// Seeker
	_, err = r.Seek(-6, io.SeekEnd)
	if err != nil {
		log.Fatal(err)
	}
	// Reader
	_, err = r.Read(world)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Offset: -6 from the end: %s\n", string(world))

	_, _ = f.Seek(0, io.SeekStart)

	// Standard ZSTD Reader.
	dec, err = zstd.NewReader(f)
	if err != nil {
		log.Fatal(err)
	}

	all, err := io.ReadAll(dec)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Whole string: %s\n", string(all))

}
Output:

Offset: 1 from the start: ello
Offset: -6 from the end: World
Whole string: Hello World!


Constants

const (
	/*
		The format consists of a number of frames (Zstandard compressed frames and skippable frames), followed by a final skippable frame at the end containing the seek table.

		Seek Table Format

		The structure of the seek table frame is as follows:

			|`Skippable_Magic_Number`|`Frame_Size`|`[Seek_Table_Entries]`|`Seek_Table_Footer`|
			|------------------------|------------|----------------------|-------------------|
			| 4 bytes                | 4 bytes    | 8-12 bytes each      | 9 bytes           |

		Skippable_Magic_Number

		Value: 0x184D2A5E.
		This is for compatibility with Zstandard skippable frames: https://github.com/facebook/zstd/blob/release/doc/zstd_compression_format.md#skippable-frames.

		Since it is legal for other Zstandard skippable frames to use the same
		magic number, it is not recommended for a decoder to recognize frames
		solely on this.

		Frame_Size

		The total size of the skippable frame, not including the `Skippable_Magic_Number` or `Frame_Size`.
		This is for compatibility with Zstandard skippable frames: https://github.com/facebook/zstd/blob/release/doc/zstd_compression_format.md#skippable-frames.

		https://github.com/facebook/zstd/blob/dev/contrib/seekable_format/zstd_seekable_compression_format.md
	*/
	SkippableFrameMagic = 0x184D2A50
)

Functions

func CreateSkippableFrame

func CreateSkippableFrame(tag uint32, payload []byte) ([]byte, error)

CreateSkippableFrame returns a payload formatted as a ZSTD skippable frame.

| `Magic_Number` | `Frame_Size` | `User_Data` |
|:--------------:|:------------:|:-----------:|
|   4 bytes      |  4 bytes     |   n bytes   |

Skippable frames allow the insertion of user-defined metadata into a flow of concatenated frames.

Magic_Number

4 Bytes, __little-endian__ format. Value : 0x184D2A5?, which means any value from 0x184D2A50 to 0x184D2A5F. All 16 values are valid to identify a skippable frame. This specification doesn't detail any specific tagging for skippable frames.

Frame_Size

This is the size, in bytes, of the following `User_Data` (without including the magic number nor the size field itself). This field is represented using 4 Bytes, __little-endian__ format, unsigned 32-bits. This means `User_Data` can’t be bigger than (2^32-1) bytes.

User_Data

The `User_Data` can be anything. Data will just be skipped by the decoder.

https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#skippable-frames

Types

type Decoder

type Decoder interface {
	// GetIndexByDecompOffset returns FrameOffsetEntry for an offset in the decompressed stream.
	// Will return nil if offset is greater than or equal to Size().
	GetIndexByDecompOffset(off uint64) *FrameOffsetEntry

	// GetIndexByID returns FrameOffsetEntry for a given frame id.
	// Will return nil if id is greater than or equal to NumFrames() or less than 0.
	GetIndexByID(id int64) *FrameOffsetEntry

	// Size returns the size of the uncompressed stream.
	Size() int64

	// NumFrames returns number of frames in the compressed stream.
	NumFrames() int64
}

Decoder is a byte-oriented API that is useful for cases where wrapping io.ReadSeeker is not desirable.

func NewDecoder

func NewDecoder(seekTable []byte, decoder ZSTDDecoder, opts ...ROption) (Decoder, error)

NewDecoder creates a byte-oriented Decoder interface from a given seek table index. This index can be produced by either Writer's WriteSeekTable or Encoder's EndStream. Decoder can be used concurrently.

type Encoder

type Encoder interface {
	// Encode returns compressed data and appends a frame to in-memory seek table.
	Encode(src []byte) ([]byte, error)
	// EndStream returns in-memory seek table as a ZSTD's skippable frame.
	EndStream() ([]byte, error)
}

Encoder is a byte-oriented API that is useful where wrapping io.Writer is not desirable.

func NewEncoder

func NewEncoder(encoder ZSTDEncoder, opts ...WOption) (Encoder, error)

type FrameOffsetEntry

type FrameOffsetEntry struct {
	// ID is the sequence number of the frame in the index.
	ID int64

	// CompOffset is the offset within compressed stream.
	CompOffset uint64
	// DecompOffset is the offset within decompressed stream.
	DecompOffset uint64
	// CompSize is the size of the compressed frame.
	CompSize uint32
	// DecompSize is the size of the original data.
	DecompSize uint32

	// Checksum is the lower 32 bits of the XXH64 hash of the uncompressed data.
	Checksum uint32
}

FrameOffsetEntry is the post-processed view of the Seek_Table_Entries suitable for indexing.

func (*FrameOffsetEntry) Less

func (o *FrameOffsetEntry) Less(than btree.Item) bool

func (*FrameOffsetEntry) MarshalLogObject

func (o *FrameOffsetEntry) MarshalLogObject(enc zapcore.ObjectEncoder) error

type REnvironment

type REnvironment interface {
	// GetFrameByIndex returns the compressed frame by its index.
	GetFrameByIndex(index FrameOffsetEntry) ([]byte, error)
	// ReadFooter returns buffer whose last 9 bytes are interpreted as a `Seek_Table_Footer`.
	ReadFooter() ([]byte, error)
	// ReadSkipFrame returns the full Seek Table Skippable frame
	// including the `Skippable_Magic_Number` and `Frame_Size`.
	ReadSkipFrame(skippableFrameOffset int64) ([]byte, error)
}

REnvironment can be used to inject a custom file reader that differs from the normal ReadSeeker. This is useful when, for example, there is custom chunking code.

type ROption

type ROption func(*readerOptions) error

func WithREnvironment

func WithREnvironment(env REnvironment) ROption

func WithRLogger

func WithRLogger(l *zap.Logger) ROption

type Reader

type Reader interface {
	io.ReadSeeker
	io.ReaderAt
}

func NewReader

func NewReader(rs io.ReadSeeker, decoder ZSTDDecoder, opts ...ROption) (Reader, error)

NewReader returns a ZSTD stream reader that can be randomly accessed using uncompressed data offsets. Ideally, the passed io.ReadSeeker should also implement the io.ReaderAt interface.

type ReaderImpl

type ReaderImpl struct {
	// contains filtered or unexported fields
}

func (*ReaderImpl) GetIndexByDecompOffset

func (s *ReaderImpl) GetIndexByDecompOffset(off uint64) (found *FrameOffsetEntry)

func (*ReaderImpl) GetIndexByID

func (s *ReaderImpl) GetIndexByID(id int64) (found *FrameOffsetEntry)

func (*ReaderImpl) NumFrames

func (s *ReaderImpl) NumFrames() int64

func (*ReaderImpl) Read

func (s *ReaderImpl) Read(p []byte) (n int, err error)

Read implements io.Reader interface to randomly access data. This method is NOT goroutine-safe and CAN NOT be called concurrently since it modifies the underlying offset.

func (*ReaderImpl) ReadAt

func (s *ReaderImpl) ReadAt(p []byte, off int64) (n int, err error)

ReadAt implements io.ReaderAt interface to randomly access data. This method is goroutine-safe and can be called concurrently ONLY if the underlying reader supports io.ReaderAt interface.

func (*ReaderImpl) Seek

func (s *ReaderImpl) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker interface to randomly access data. This method is NOT goroutine-safe and CAN NOT be called concurrently since it modifies the underlying offset.

func (*ReaderImpl) Size

func (s *ReaderImpl) Size() int64

type SeekTableDescriptor

type SeekTableDescriptor struct {
	// If the checksum flag is set, each of the seek table entries contains a 4 byte checksum
	// of the uncompressed data contained in its frame.
	ChecksumFlag bool
}

SeekTableDescriptor is a Go representation of a bitfield.

A bitfield describing the format of the seek table.

| Bit number | Field name                |
| ---------- | ----------                |
| 7          | `Checksum_Flag`           |
| 6-2        | `Reserved_Bits`           |
| 1-0        | `Unused_Bits`             |

While only `Checksum_Flag` currently exists, there are 7 other bits in this field that can be used for future changes to the format, for example the addition of inline dictionaries.

func (*SeekTableDescriptor) MarshalLogObject

func (d *SeekTableDescriptor) MarshalLogObject(enc zapcore.ObjectEncoder) error

type SeekTableEntry

type SeekTableEntry struct {
	// The compressed size of the frame.
	// The cumulative sum of the `Compressed_Size` fields of frames `0` to `i` gives the offset in the compressed file of frame `i+1`.
	CompressedSize uint32
	// The size of the decompressed data contained in the frame.  For skippable or otherwise empty frames, this value is 0.
	DecompressedSize uint32
	// Only present if `Checksum_Flag` is set in the `Seek_Table_Descriptor`.  Value : the least significant 32 bits of the XXH64 digest of the uncompressed data, stored in little-endian format.
	Checksum uint32
}

SeekTableEntry is an element of the Seek Table describing each of the ZSTD-compressed frames in the stream.

`Seek_Table_Entries` consists of `Number_Of_Frames` (one for each frame in the data, not including the seek table frame) entries of the following form, in sequence:

|`Compressed_Size`|`Decompressed_Size`|`[Checksum]`|
|-----------------|-------------------|------------|
| 4 bytes         | 4 bytes           | 4 bytes    |

https://github.com/facebook/zstd/blob/dev/contrib/seekable_format/zstd_seekable_compression_format.md#seek_table_entries
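The cumulative-sum rule for `Compressed_Size` can be illustrated with a short standalone sketch. The types and functions below (entry, frameOffset, buildOffsets, findFrame) are hypothetical and only mirror the idea behind FrameOffsetEntry; the package's own index is built differently, and the compressed sizes in main are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// entry holds the two mandatory fields of one seek table entry.
type entry struct {
	compSize, decompSize uint32
}

// frameOffset is the starting position of a frame in both streams.
type frameOffset struct {
	compOffset, decompOffset uint64
}

// buildOffsets turns per-frame sizes into absolute offsets via running sums.
func buildOffsets(entries []entry) (offsets []frameOffset, compTotal, decompTotal uint64) {
	offsets = make([]frameOffset, len(entries))
	for i, e := range entries {
		offsets[i] = frameOffset{compOffset: compTotal, decompOffset: decompTotal}
		compTotal += uint64(e.compSize)
		decompTotal += uint64(e.decompSize)
	}
	return offsets, compTotal, decompTotal
}

// findFrame returns the index of the frame that contains the decompressed
// offset off, or -1 when off is past the end of the stream.
func findFrame(entries []entry, offsets []frameOffset, off uint64) int {
	// Binary search for the first frame whose end lies beyond off.
	i := sort.Search(len(offsets), func(i int) bool {
		return offsets[i].decompOffset+uint64(entries[i].decompSize) > off
	})
	if i == len(offsets) {
		return -1
	}
	return i
}

func main() {
	// Two frames holding "Hello" (5 bytes) and " World!" (7 bytes).
	entries := []entry{{compSize: 10, decompSize: 5}, {compSize: 20, decompSize: 7}}
	offsets, _, decompTotal := buildOffsets(entries)
	fmt.Println("decompressed size:", decompTotal)
	for _, off := range []uint64{1, 6, 12} {
		fmt.Printf("offset %d -> frame %d\n", off, findFrame(entries, offsets, off))
	}
}
```

Frames with a decompressed size of 0, such as skippable frames, are never returned by findFrame because their end offset equals their start offset.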

func (*SeekTableEntry) MarshalBinary

func (e *SeekTableEntry) MarshalBinary() ([]byte, error)

func (*SeekTableEntry) MarshalLogObject

func (e *SeekTableEntry) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*SeekTableEntry) UnmarshalBinary

func (e *SeekTableEntry) UnmarshalBinary(p []byte) error

type SeekTableFooter

type SeekTableFooter struct {
	// The number of stored frames in the data.
	NumberOfFrames uint32
	// A bitfield describing the format of the seek table.
	SeekTableDescriptor SeekTableDescriptor
	// Value : 0x8F92EAB1.
	SeekableMagicNumber uint32
}

SeekTableFooter is the footer of a seekable ZSTD stream.

The seek table footer format is as follows:

|`Number_Of_Frames`|`Seek_Table_Descriptor`|`Seekable_Magic_Number`|
|------------------|-----------------------|-----------------------|
| 4 bytes          | 1 byte                | 4 bytes               |

https://github.com/facebook/zstd/blob/dev/contrib/seekable_format/zstd_seekable_compression_format.md#seek_table_footer
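To make the 9-byte layout concrete, here is a minimal parsing sketch that uses only the standard library. It assumes all integer fields are little-endian, as in the upstream format description, and that `Checksum_Flag` is bit 7 of the descriptor byte. The names footer, parseFooter, and seekTableFrameSize are hypothetical; the package exposes SeekTableFooter.UnmarshalBinary for the real thing.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	footerSize           = 9
	seekableMagic uint32 = 0x8F92EAB1
)

// footer is a simplified view of Seek_Table_Footer.
type footer struct {
	numFrames    uint32
	checksumFlag bool
}

// parseFooter interprets the last 9 bytes of buf as a Seek_Table_Footer.
func parseFooter(buf []byte) (footer, error) {
	if len(buf) < footerSize {
		return footer{}, errors.New("buffer is shorter than 9 bytes")
	}
	p := buf[len(buf)-footerSize:]
	if binary.LittleEndian.Uint32(p[5:9]) != seekableMagic {
		return footer{}, errors.New("seekable magic number mismatch")
	}
	return footer{
		numFrames:    binary.LittleEndian.Uint32(p[0:4]),
		checksumFlag: p[4]&0x80 != 0, // bit 7 of the descriptor byte
	}, nil
}

// seekTableFrameSize returns the full size of the seek table frame:
// 8 header bytes, 8 or 12 bytes per entry, and the 9-byte footer.
func seekTableFrameSize(f footer) uint64 {
	entrySize := uint64(8)
	if f.checksumFlag {
		entrySize = 12
	}
	return 8 + uint64(f.numFrames)*entrySize + footerSize
}

func main() {
	// Two frames, checksum flag set, followed by the magic number.
	raw := []byte{0x02, 0x00, 0x00, 0x00, 0x80, 0xB1, 0xEA, 0x92, 0x8F}
	f, err := parseFooter(raw)
	if err != nil {
		panic(err)
	}
	// prints "frames=2 checksum=true tableFrame=41"
	fmt.Printf("frames=%d checksum=%v tableFrame=%d\n", f.numFrames, f.checksumFlag, seekTableFrameSize(f))
}
```

Knowing the full frame size lets a reader locate the start of the seek table by counting back from the end of the file.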

func (*SeekTableFooter) MarshalBinary

func (f *SeekTableFooter) MarshalBinary() ([]byte, error)

func (*SeekTableFooter) MarshalLogObject

func (f *SeekTableFooter) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*SeekTableFooter) UnmarshalBinary

func (f *SeekTableFooter) UnmarshalBinary(p []byte) error

type WEnvironment

type WEnvironment interface {
	// WriteFrame is called each time frame is encoded and needs to be written upstream.
	WriteFrame(p []byte) (n int, err error)
	// WriteSeekTable is called on Close to flush the seek table.
	WriteSeekTable(p []byte) (n int, err error)
}

WEnvironment can be used to inject a custom file writer that differs from the normal io.Writer. This is useful when, for example, there is custom chunking code.
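As an illustration, an in-memory implementation of the two WEnvironment methods could look like the sketch below. The wEnvironment interface is a local copy of the method set shown above so that the example compiles on its own, and memEnv is a hypothetical type; in real code the value would be passed to WithWEnvironment.

```go
package main

import (
	"bytes"
	"fmt"
)

// wEnvironment mirrors the method set of WEnvironment documented above.
type wEnvironment interface {
	WriteFrame(p []byte) (n int, err error)
	WriteSeekTable(p []byte) (n int, err error)
}

// memEnv keeps compressed frames and the seek table in separate buffers.
type memEnv struct {
	frames     bytes.Buffer
	seekTable  bytes.Buffer
	frameCount int
}

// WriteFrame stores one encoded frame and counts it.
func (m *memEnv) WriteFrame(p []byte) (int, error) {
	m.frameCount++
	return m.frames.Write(p)
}

// WriteSeekTable stores the seek table that is flushed on Close.
func (m *memEnv) WriteSeekTable(p []byte) (int, error) {
	return m.seekTable.Write(p)
}

// Compile-time check that memEnv satisfies the interface.
var _ wEnvironment = (*memEnv)(nil)

func main() {
	env := &memEnv{}
	// The byte slices stand in for real encoder output.
	if _, err := env.WriteFrame([]byte("frame-1")); err != nil {
		panic(err)
	}
	if _, err := env.WriteFrame([]byte("frame-2")); err != nil {
		panic(err)
	}
	if _, err := env.WriteSeekTable([]byte("table")); err != nil {
		panic(err)
	}
	fmt.Println(env.frameCount, env.frames.Len(), env.seekTable.Len())
}
```

Keeping the seek table in its own buffer is one possible design when the index is stored separately from the data, for example alongside a chunked upload.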

type WOption

type WOption func(*writerOptions) error

func WithWEnvironment

func WithWEnvironment(env WEnvironment) WOption

func WithWLogger

func WithWLogger(l *zap.Logger) WOption

type Writer

type Writer interface {
	io.WriteCloser
}

func NewWriter

func NewWriter(w io.Writer, encoder ZSTDEncoder, opts ...WOption) (Writer, error)

NewWriter wraps the passed io.Writer and Encoder into an indexed ZSTD stream. The resulting stream can then be randomly accessed through the Reader and Decoder interfaces.

type WriterImpl

type WriterImpl struct {
	// contains filtered or unexported fields
}

func (*WriterImpl) Close

func (s *WriterImpl) Close() (err error)

Close implements the io.Closer interface. It writes the seek table footer and releases occupied memory.

The caller is still responsible for closing the underlying writer.

func (*WriterImpl) Encode

func (s *WriterImpl) Encode(src []byte) ([]byte, error)

func (*WriterImpl) EndStream

func (s *WriterImpl) EndStream() ([]byte, error)

func (*WriterImpl) Write

func (s *WriterImpl) Write(src []byte) (int, error)

Write writes a chunk of data as a separate frame into the datastream.

Note that Write does not do any coalescing nor splitting of data, so each write will map to a separate ZSTD Frame.
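Because every Write becomes its own frame, the caller controls frame granularity. One possible way to keep frames at a fixed uncompressed size is a small buffering wrapper placed in front of the seekable Writer. The sketch below uses only the standard library: chunkWriter, newChunkWriter, and recorder are hypothetical names, and recorder merely stands in for the seekable Writer to show which writes it would receive.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// chunkWriter forwards data to dst in writes of exactly size bytes,
// holding back any remainder until Flush is called.
type chunkWriter struct {
	dst  io.Writer
	size int
	buf  []byte
}

func newChunkWriter(dst io.Writer, size int) (*chunkWriter, error) {
	if size <= 0 {
		return nil, errors.New("chunk size must be positive")
	}
	return &chunkWriter{dst: dst, size: size}, nil
}

// Write buffers p and emits every complete chunk as a single write.
// For brevity it reports 0 bytes written when a forwarded write fails.
func (c *chunkWriter) Write(p []byte) (int, error) {
	c.buf = append(c.buf, p...)
	for len(c.buf) >= c.size {
		if _, err := c.dst.Write(c.buf[:c.size]); err != nil {
			return 0, err
		}
		c.buf = c.buf[c.size:]
	}
	return len(p), nil
}

// Flush emits the remaining partial chunk, if any.
func (c *chunkWriter) Flush() error {
	if len(c.buf) == 0 {
		return nil
	}
	_, err := c.dst.Write(c.buf)
	c.buf = c.buf[:0]
	return err
}

// recorder remembers each write it receives as a separate string.
type recorder struct {
	chunks []string
}

func (r *recorder) Write(p []byte) (int, error) {
	r.chunks = append(r.chunks, string(p))
	return len(p), nil
}

func main() {
	rec := &recorder{}
	cw, err := newChunkWriter(rec, 4)
	if err != nil {
		panic(err)
	}
	for _, b := range [][]byte{[]byte("Hello"), []byte(" World!")} {
		if _, err := cw.Write(b); err != nil {
			panic(err)
		}
	}
	if err := cw.Flush(); err != nil {
		panic(err)
	}
	// prints ["Hell" "o Wo" "rld!"]
	fmt.Printf("%q\n", rec.chunks)
}
```

With such a wrapper, Flush has to be called before the seekable Writer's Close so that the final partial chunk is written ahead of the seek table.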

type ZSTDDecoder

type ZSTDDecoder interface {
	DecodeAll(input, dst []byte) ([]byte, error)
}

type ZSTDEncoder

type ZSTDEncoder interface {
	EncodeAll(src, dst []byte) []byte
}

Directories

Path Synopsis
cmd
