recordio

package
v1.5.0
Published: Jul 28, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

Using RecordIO

RecordIO allows you to write sequential key/value entities into a flat file and is heavily inspired by Hadoop's SequenceFile.

The sections below focus on reading and writing using plain byte slices. If you want to read more about the Protobuf bindings, go to the section Using Proto RecordIO.

Writing

The general flow is:

  • create a new recordio writer (with a string path or file pointer, optionally with compression)
  • open the file
  • write a "record" as often and as much as you want
  • close the file

In Go that looks like this:

import (
   "log"

   "github.com/thomasjungblut/go-sstables/recordio"
)

writer, err := recordio.NewFileWriter(
                     recordio.Path("some/path/records.rio"), 
                     recordio.CompressionType(recordio.CompressionTypeSnappy))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

offset, err := writer.Write([]byte{1,3,3,7})
if err != nil { log.Fatalf("error: %v", err) }
log.Printf("wrote a record at offset %d", offset)

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

After Write, you get the offset in the file returned at which the record was written. This is quite useful for indexing and is used heavily in the sstables package.
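
To illustrate why a returned offset is handy for indexing, here is a minimal sketch that uses only the standard library. The recordLog type and its length-prefixed layout are hypothetical stand-ins and not the recordio file format; the point is only that an offset handed back at write time can later be used to jump straight to a record.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// recordLog is a hypothetical in-memory stand-in for a record file.
// Each record is stored as a uvarint length followed by the payload.
type recordLog struct {
	data []byte
}

// Append stores a record and returns the offset at which it starts.
func (l *recordLog) Append(payload []byte) uint64 {
	offset := uint64(len(l.data))
	l.data = binary.AppendUvarint(l.data, uint64(len(payload)))
	l.data = append(l.data, payload...)
	return offset
}

// ReadAt decodes the record that starts at the given offset.
func (l *recordLog) ReadAt(offset uint64) ([]byte, error) {
	if offset >= uint64(len(l.data)) {
		return nil, errors.New("offset out of range")
	}
	size, n := binary.Uvarint(l.data[offset:])
	if n <= 0 {
		return nil, errors.New("invalid length prefix")
	}
	start := offset + uint64(n)
	if size > uint64(len(l.data))-start {
		return nil, errors.New("truncated record")
	}
	return l.data[start : start+size], nil
}

func main() {
	var l recordLog
	// The index maps a key to the offset returned when the record was appended.
	index := map[string]uint64{}
	index["first"] = l.Append([]byte("hello"))
	index["second"] = l.Append([]byte("world"))

	rec, err := l.ReadAt(index["second"])
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", rec)
}
```

An sstable-style index works along the same lines: keys point to offsets, and a random read at that offset yields the value.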

An alternative method, WriteSync, additionally flushes the disk write cache ("fsync") so that the data is actually persisted. That's a must-have in a write-ahead log to guarantee persistence on disk. Keep in mind that this is drastically slower; consult the benchmark section for more information.
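
As a rough illustration of what forcing a disk sync means at the operating system level, the following sketch uses only the standard library: it writes to a file and then calls Sync on it. The helper names are made up for this example, and this is not how recordio implements WriteSync internally.

```go
package main

import (
	"fmt"
	"os"
)

// writeAndSync writes data to f and then asks the OS to flush the file
// contents to stable storage (fsync on POSIX systems).
func writeAndSync(f *os.File, data []byte) error {
	if _, err := f.Write(data); err != nil {
		return err
	}
	return f.Sync()
}

// syncedTempFileSize writes data to a temporary file with a sync and
// returns the resulting file size. The temp file is removed afterwards.
func syncedTempFileSize(data []byte) (int64, error) {
	f, err := os.CreateTemp("", "wal-*.bin")
	if err != nil {
		return 0, err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	if err := writeAndSync(f, data); err != nil {
		return 0, err
	}
	info, err := f.Stat()
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

func main() {
	size, err := syncedTempFileSize([]byte{1, 3, 3, 7})
	if err != nil {
		panic(err)
	}
	fmt.Println("bytes on disk:", size)
}
```

The extra round trip to the storage device on every write is what makes synced writes so much slower than buffered ones.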

By default, the recordio.NewFileWriter will not use any compression, but if configured there are two compression libs available: Snappy and GZIP. The compression is per record and not for the whole file - so it might not be as efficient as compressing the whole content at once after closing.
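
The cost of per-record compression can be sketched with the standard library's compress/gzip alone. The sample records and helper names below are invented for illustration; the sketch compares the total size of compressing many small records individually against compressing their concatenation once.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipSize returns the number of bytes data occupies after gzip compression.
func gzipSize(data []byte) (int, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return 0, err
	}
	if err := zw.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

// perRecordSize compresses every record on its own and sums the sizes.
func perRecordSize(records [][]byte) (int, error) {
	total := 0
	for _, r := range records {
		n, err := gzipSize(r)
		if err != nil {
			return 0, err
		}
		total += n
	}
	return total, nil
}

// wholeSize compresses all records as one contiguous blob.
func wholeSize(records [][]byte) (int, error) {
	return gzipSize(bytes.Join(records, nil))
}

// sampleRecords builds n small, similar-looking records.
func sampleRecords(n int) [][]byte {
	records := make([][]byte, 0, n)
	for i := 0; i < n; i++ {
		records = append(records, []byte(fmt.Sprintf("user-%04d;status=active", i)))
	}
	return records
}

func main() {
	records := sampleRecords(100)
	perRecord, err := perRecordSize(records)
	if err != nil {
		panic(err)
	}
	whole, err := wholeSize(records)
	if err != nil {
		panic(err)
	}
	fmt.Println("per record:", perRecord, "whole:", whole)
}
```

Each individually compressed record pays the gzip framing overhead and cannot share redundancy with its neighbours, which is why small, similar records compress worse one at a time.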

Reading

Reading follows the general lifecycle as well. The reading works by reading the next byte slices until io.EOF (or a wrapped alternative) is returned - which is a familiar pattern from other "iterables".

import (
   "errors"
   "io"
   "log"

   "github.com/thomasjungblut/go-sstables/recordio"
)

reader, err := recordio.NewFileReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

for {
    record, err := reader.ReadNext()
    // io.EOF signals that no records are left to be read; it could be wrapped, so always check using errors.Is()
    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    // record is a plain byte slice
    log.Printf("%v", record)
}

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }
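
Why the check should go through errors.Is can be shown with the standard library alone. The wrappedEOF helper below is hypothetical and merely simulates a reader that wraps io.EOF with extra context; a plain equality comparison misses such an error, while errors.Is still recognizes it.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// wrappedEOF simulates a reader that adds context to io.EOF.
func wrappedEOF() error {
	return fmt.Errorf("reading next record: %w", io.EOF)
}

// naiveIsEOF compares by identity and therefore misses wrapped errors.
func naiveIsEOF(err error) bool {
	return err == io.EOF
}

// isEOF unwraps the error chain while looking for io.EOF.
func isEOF(err error) bool {
	return errors.Is(err, io.EOF)
}

func main() {
	err := wrappedEOF()
	fmt.Println(naiveIsEOF(err), isEOF(err)) // prints "false true"
}
```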

Using Proto RecordIO

Reading and writing a recordio file using Protobuf and snappy compression can be done quite easily, as shown in the sections below. Here's the simple proto file we use:

syntax = "proto3";
package proto;
option go_package = "github.com/thomasjungblut/go-sstables/examples/proto";

message HelloWorld {
    string message = 1;
}

You can compile it using protoc. Here we are saving the message as part of our examples:

protoc --go_out=. --go_opt=paths=source_relative examples/proto/hello_world.proto

Writing

Writing a recordio file in Go then becomes:

import (
   "log"

   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   "github.com/thomasjungblut/go-sstables/recordio"       // for the CompressionType* constants
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

writer, err := rProto.NewWriter(
                     rProto.Path(path), 
                     rProto.CompressionType(recordio.CompressionTypeSnappy))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

record := &proto.HelloWorld{Message: "Hello World"}
recordOffset, err := writer.Write(record)
if err != nil { log.Fatalf("error: %v", err) }

log.Printf("wrote a record at offset of %d bytes", recordOffset)

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

Reading

Reading the same file we just wrote can be done like this:

import (
   "errors"
   "io"
   "log"

   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

reader, err := rProto.NewProtoReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

for {
    record := &proto.HelloWorld{}
    _, err := reader.ReadNext(record)
    // io.EOF signals that no records are left to be read; it could be wrapped, so check using errors.Is()
    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    log.Printf("%s", record.GetMessage())
}

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

SSTables support random reads of backing values, so recordio supports them as well through its mmap implementation:

import (
   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

reader, err := rProto.NewMMapProtoReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

record := &proto.HelloWorld{}
_, err = reader.ReadNextAt(record, 8)
if err != nil { log.Fatalf("error: %v", err) }

log.Printf("Reading message at offset 8: %s", record.GetMessage())

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

You can get the full example from examples/recordio.go.

DirectIO (experimental)

DirectIO is useful when you want to bypass the operating system memory caches when writing something to disk directly. This can be useful in database applications like bulk-imports, where you don't want to pollute/churn existing memory for pages that were recently written and won't be read anytime soon.

DirectIO can be enabled when creating a new writer by setting:

import "github.com/thomasjungblut/go-sstables/recordio"

writer, err := recordio.NewFileWriter(
	recordio.Path("some/path/records.rio"), 
	recordio.DirectIO(), 
	recordio.BufferSizeBytes(4096))
if err != nil { log.Fatalf("error: %v", err) }

It's highly recommended to check which buffer/block sizes are available on the target system; usually they need to be a power of two. It's also very important to test a full open/write/close cycle, otherwise you might encounter rather strange error messages like "The parameter is incorrect.", which sadly isn't very meaningful and is difficult to debug. Usually this means either that DirectIO wasn't available to begin with, or that the block sizes are not aligned with what the operating system expects to be written.

You can check whether your OS is theoretically capable of enabling DirectIO using:

import "github.com/thomasjungblut/go-sstables/recordio"

// true if yes, otherwise not
available, err := recordio.IsDirectIOAvailable()

In this package the DirectIO support comes through a library called ncw/directio, which has good support across Linux, macOS and Windows under a single interface. The caveats of each platform, for example the buffer/block sizes, still need to be taken into account.

Another caveat is that block alignment causes a certain amount of waste to be written. Let's imagine you have blocks of 1024 bytes and want to write 1025 bytes: with DirectIO enabled you will end up with a file of size 2048 (2 blocks), instead of a file with only 1025 bytes with DirectIO disabled. The DirectIO file will be padded with zeroes towards the end; the in-library readers honor this format and do not assume a corrupted file.
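
The padding arithmetic from the example above can be written down as a small sketch. The function names are illustrative only and not part of this package; the code rounds a payload size up to the next multiple of the block size and checks whether a block size is a power of two.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a power of two (1, 2, 4, 8, ...).
func isPowerOfTwo(n uint64) bool {
	return n != 0 && n&(n-1) == 0
}

// paddedSize rounds payload up to the next multiple of blockSize.
// A blockSize of zero is treated as "no alignment".
func paddedSize(payload, blockSize uint64) uint64 {
	if blockSize == 0 {
		return payload
	}
	blocks := (payload + blockSize - 1) / blockSize
	return blocks * blockSize
}

func main() {
	fmt.Println(isPowerOfTwo(1024))            // prints true
	fmt.Println(paddedSize(1025, 1024))        // prints 2048
	fmt.Println(paddedSize(1025, 1024) - 1025) // prints 1023, the number of padding bytes
}
```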

Documentation

Constants

const (
	CompressionTypeNone   = iota
	CompressionTypeGZIP   = iota
	CompressionTypeSnappy = iota
	CompressionTypeLzw    = iota
)

never reorder, always append
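
A short sketch of why the order matters: the numeric value of each constant comes from its position in the block, and the file header stores a compression code. The constants below mirror the block above under lowercase names, and compressionName is a hypothetical helper; inserting or reordering entries would shift the values and change the meaning of codes already written to existing files.

```go
package main

import "fmt"

// Mirrors the shape of the constant block above: iota increases by one
// per line, so the values are 0, 1, 2 and 3.
const (
	compressionNone   = iota
	compressionGZIP   = iota
	compressionSnappy = iota
	compressionLzw    = iota
)

// compressionName maps a stored numeric code back to a readable name.
func compressionName(code int) (string, error) {
	switch code {
	case compressionNone:
		return "none", nil
	case compressionGZIP:
		return "gzip", nil
	case compressionSnappy:
		return "snappy", nil
	case compressionLzw:
		return "lzw", nil
	default:
		return "", fmt.Errorf("unknown compression code %d", code)
	}
}

func main() {
	fmt.Println(compressionNone, compressionGZIP, compressionSnappy, compressionLzw) // prints "0 1 2 3"
}
```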

const CurrentVersion = Version2
const DefaultBufferSize = 1024 * 1024 * 4

DefaultBufferSize is four mebibytes and can be customized using the option BufferSizeBytes.

const FileHeaderSizeBytes = 8

FileHeaderSizeBytes has a 4 byte version number, 4 byte compression code = 8 bytes
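
As a sketch of such an 8 byte header, the following code encodes two 32 bit fields with encoding/binary. It assumes little-endian byte order (as stated in the FileWriter documentation) and that the version precedes the compression code; the exact on-disk layout is an assumption here, and the function names are illustrative.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// headerSize is 4 bytes of version plus 4 bytes of compression code.
const headerSize = 8

// encodeHeader writes both fields as little-endian uint32 values.
func encodeHeader(version, compression uint32) []byte {
	buf := make([]byte, headerSize)
	binary.LittleEndian.PutUint32(buf[0:4], version)
	binary.LittleEndian.PutUint32(buf[4:8], compression)
	return buf
}

// decodeHeader reads the two fields back from the first 8 bytes.
func decodeHeader(buf []byte) (version, compression uint32, err error) {
	if len(buf) < headerSize {
		return 0, 0, errors.New("header too short")
	}
	version = binary.LittleEndian.Uint32(buf[0:4])
	compression = binary.LittleEndian.Uint32(buf[4:8])
	return version, compression, nil
}

func main() {
	header := encodeHeader(2, 2)
	version, compression, err := decodeHeader(header)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(header), version, compression) // prints "8 2 2"
}
```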

const MagicNumberSeparator uint32 = 0x130691
const MagicNumberSeparatorLong uint64 = 0x130691
const RecordHeaderSizeBytes = 20

RecordHeaderV2MaxSizeBytes is the max buffer sizes to prevent PutUvarint to panic: 10 byte magic number, 10 byte uncompressed size, 10 bytes for compressed size = 30 bytes

const Version1 uint32 = 0x01
const Version2 uint32 = 0x02

Variables

var DirectIOSyncWriteErr = errors.New("currently not supporting directIO with sync writing")
var MagicNumberMismatchErr = fmt.Errorf("magic number mismatch")

Functions

func IsDirectIOAvailable added in v1.4.0

func IsDirectIOAvailable() (available bool, err error)

IsDirectIOAvailable tests whether DirectIO is available (on the OS / filesystem). It will return (true, nil) if that's the case; if it's not available it will return (false, nil). Any other failure is reported through the returned error, in which case the boolean can be either true or false.

func NewCompressorForType

func NewCompressorForType(compType int) (compressor.CompressionI, error)

NewCompressorForType returns an instance of the desired compressor defined by its identifier. An error is returned if the desired compressor is not implemented. Only CompressionTypeNone, CompressionTypeSnappy and CompressionTypeGZIP are available currently.

func SkipNextV1 added in v1.1.0

func SkipNextV1(r *FileReader) error

SkipNextV1 is the legacy support path for non-vint compressed V1 files.

Types

type BufferedIOFactory added in v1.4.0

type BufferedIOFactory struct {
}

func (BufferedIOFactory) CreateNewReader added in v1.4.0

func (d BufferedIOFactory) CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error)

func (BufferedIOFactory) CreateNewWriter added in v1.4.0

func (d BufferedIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error)

type ByteReaderReset added in v1.4.0

type ByteReaderReset interface {
	io.ByteReader
	io.Reader
	Reset
	Size() int
}

type ByteReaderResetCount added in v1.4.0

type ByteReaderResetCount interface {
	ByteReaderReset
	Count() uint64
}

func NewCountingByteReader added in v1.1.0

func NewCountingByteReader(reader ByteReaderReset) ByteReaderResetCount

type CloseableI added in v1.1.0

type CloseableI interface {
	// Close closes the given file. Errors can happen when:
	// File was already closed before or is not yet open.
	// File could not be closed on the filesystem (eg when flushes fail)
	Close() error
}

type CountingBufferedReader added in v1.1.0

type CountingBufferedReader struct {
	// contains filtered or unexported fields
}

func (*CountingBufferedReader) Count added in v1.3.0

func (c *CountingBufferedReader) Count() uint64

func (*CountingBufferedReader) Read added in v1.1.0

func (c *CountingBufferedReader) Read(p []byte) (n int, err error)

Read reads data into p. It returns the number of bytes read into p. The bytes are taken from at most one Read on the underlying Reader, hence n may be less than len(p). To read exactly len(p) bytes, use io.ReadFull(b, p). At EOF, the count will be zero and err will be io.EOF.

func (*CountingBufferedReader) ReadByte added in v1.1.0

func (c *CountingBufferedReader) ReadByte() (byte, error)

ReadByte reads and returns a single byte. If no byte is available, returns an error.

func (*CountingBufferedReader) Reset added in v1.1.0

func (c *CountingBufferedReader) Reset(r io.Reader)

Reset discards any buffered data, resets all state, and switches the buffered reader to read from r.

func (*CountingBufferedReader) Size added in v1.4.0

func (c *CountingBufferedReader) Size() int
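
The idea of a counting reader can be sketched on top of bufio.Reader. This is a simplified, hypothetical stand-in and not the CountingBufferedReader implementation: it wraps a buffered reader and tracks how many bytes the caller has consumed, which is one way to derive the current position while reading sequentially.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// countingReader tracks how many bytes have been handed to the caller.
type countingReader struct {
	r     *bufio.Reader
	count uint64
}

// newCountingReaderFromBytes wraps an in-memory byte slice for the demo.
func newCountingReaderFromBytes(data []byte) *countingReader {
	return &countingReader{r: bufio.NewReader(bytes.NewReader(data))}
}

// Read forwards to the buffered reader and adds the bytes read to the count.
func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.count += uint64(n)
	return n, err
}

// ReadByte reads a single byte and counts it on success.
func (c *countingReader) ReadByte() (byte, error) {
	b, err := c.r.ReadByte()
	if err == nil {
		c.count++
	}
	return b, err
}

// Count returns the number of bytes consumed so far.
func (c *countingReader) Count() uint64 {
	return c.count
}

func main() {
	c := newCountingReaderFromBytes([]byte{1, 2, 3, 4, 5})
	if _, err := c.ReadByte(); err != nil {
		panic(err)
	}
	buf := make([]byte, 2)
	if _, err := c.Read(buf); err != nil {
		panic(err)
	}
	fmt.Println(c.Count()) // prints 3
}
```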

type DirectIOFactory added in v1.4.0

type DirectIOFactory struct {
}

func (DirectIOFactory) CreateNewReader added in v1.4.0

func (d DirectIOFactory) CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error)

func (DirectIOFactory) CreateNewWriter added in v1.4.0

func (d DirectIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error)

type FileReader

type FileReader struct {
	// contains filtered or unexported fields
}

func (*FileReader) Close

func (r *FileReader) Close() error

func (*FileReader) Open

func (r *FileReader) Open() error

func (*FileReader) ReadNext

func (r *FileReader) ReadNext() ([]byte, error)

func (*FileReader) SkipNext

func (r *FileReader) SkipNext() error

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter defines a binary file format (little endian). The file header has a 32 bit version number and a 32 bit compression type enum according to the table above. Each record written in the file follows the following format (sequentially):

  • MagicNumber (encoding/binary/Uvarint) to separate records from each other.
  • Uncompressed data payload size (encoding/binary/Uvarint).
  • Compressed data payload size (encoding/binary/Uvarint), or 0 if the data is not compressed.
  • Payload as plain bytes, possibly compressed
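
The record layout described above can be sketched with encoding/binary. This is an illustration of the described format for uncompressed payloads only, not the package's actual writer or reader; encodeRecord and decodeRecord are hypothetical names, and the magic number value is taken from the MagicNumberSeparatorLong constant.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// magicNumber matches the value of MagicNumberSeparatorLong.
const magicNumber uint64 = 0x130691

// encodeRecord writes magic number, uncompressed size, compressed size
// (0 meaning "not compressed") as uvarints, followed by the payload.
func encodeRecord(payload []byte) []byte {
	buf := make([]byte, 0, 3*binary.MaxVarintLen64+len(payload))
	buf = binary.AppendUvarint(buf, magicNumber)
	buf = binary.AppendUvarint(buf, uint64(len(payload)))
	buf = binary.AppendUvarint(buf, 0)
	return append(buf, payload...)
}

// decodeRecord parses a single uncompressed record from buf.
func decodeRecord(buf []byte) ([]byte, error) {
	r := bytes.NewReader(buf)
	magic, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	if magic != magicNumber {
		return nil, errors.New("magic number mismatch")
	}
	uncompressed, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	compressed, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	if compressed != 0 {
		return nil, errors.New("compressed payloads are not handled in this sketch")
	}
	if uncompressed > uint64(r.Len()) {
		return nil, errors.New("truncated payload")
	}
	payload := make([]byte, uncompressed)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

func main() {
	encoded := encodeRecord([]byte{1, 3, 3, 7})
	decoded, err := decodeRecord(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(encoded), decoded)
}
```

Because every header field is a uvarint of at most binary.MaxVarintLen64 (10) bytes, a header buffer of 30 bytes is always large enough for the three fields.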

func (*FileWriter) Close

func (w *FileWriter) Close() error

func (*FileWriter) Open

func (w *FileWriter) Open() error

func (*FileWriter) Size added in v1.1.0

func (w *FileWriter) Size() uint64

func (*FileWriter) Write

func (w *FileWriter) Write(record []byte) (uint64, error)

Write appends a record of bytes, returns the current offset this item was written to

func (*FileWriter) WriteSync

func (w *FileWriter) WriteSync(record []byte) (uint64, error)

WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to. When DirectIO is enabled, however, misaligned blocks can't be written, so WriteSync immediately returns DirectIOSyncWriteErr.

type FileWriterOption added in v1.2.0

type FileWriterOption func(*FileWriterOptions)

func BufferSizeBytes added in v1.3.0

func BufferSizeBytes(p int) FileWriterOption

BufferSizeBytes sets the write buffer size, by default it uses DefaultBufferSize. This is the internal memory buffer before it's written to disk.

func CompressionType added in v1.2.0

func CompressionType(p int) FileWriterOption

CompressionType sets the record compression for the given file, the types are all prefixed with CompressionType*. Valid values for example are CompressionTypeNone, CompressionTypeSnappy, CompressionTypeGZIP.

func DirectIO added in v1.3.0

func DirectIO() FileWriterOption

DirectIO is experimental: this flag enables DirectIO while writing. This has some limitations when writing headers and disables the ability to use WriteSync.

func File added in v1.2.0

func File(p *os.File) FileWriterOption

File uses the given os.File as the sink to write into. The code manages the given file lifecycle (ie closing). Either this or Path must be supplied

func Path added in v1.2.0

func Path(p string) FileWriterOption

Path defines the file path where to write the recordio file into. Path will create a new file if it doesn't exist yet, it will not create any parent directories. Either this or File must be supplied.
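
The options above follow the functional options pattern: each option is a function that mutates an options struct, and the constructor applies them over a set of defaults. Here is a minimal sketch of that pattern. The struct fields and function names are hypothetical, since the fields of FileWriterOptions are unexported; only the default buffer size of 4 MiB is taken from DefaultBufferSize.

```go
package main

import (
	"errors"
	"fmt"
)

// defaultBufferSize mirrors the documented DefaultBufferSize of 4 MiB.
const defaultBufferSize = 1024 * 1024 * 4

// writerOptions is a hypothetical options struct for this sketch.
type writerOptions struct {
	path       string
	bufferSize int
}

// writerOption mutates the options, analogous to FileWriterOption.
type writerOption func(*writerOptions)

// withPath sets the target path.
func withPath(p string) writerOption {
	return func(o *writerOptions) { o.path = p }
}

// withBufferSize overrides the default buffer size.
func withBufferSize(n int) writerOption {
	return func(o *writerOptions) { o.bufferSize = n }
}

// buildOptions starts from the defaults, applies every option in order
// and validates the result.
func buildOptions(opts ...writerOption) (writerOptions, error) {
	o := writerOptions{bufferSize: defaultBufferSize}
	for _, apply := range opts {
		apply(&o)
	}
	if o.path == "" {
		return o, errors.New("a path must be supplied")
	}
	return o, nil
}

func main() {
	o, err := buildOptions(withPath("some/path/records.rio"), withBufferSize(4096))
	if err != nil {
		panic(err)
	}
	fmt.Println(o.path, o.bufferSize)
}
```

The pattern keeps the constructor signature stable: new options can be added later without breaking existing callers.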

type FileWriterOptions added in v1.2.0

type FileWriterOptions struct {
	// contains filtered or unexported fields
}

type Header

type Header struct {
	// contains filtered or unexported fields
}

type MMapReader

type MMapReader struct {
	// contains filtered or unexported fields
}

func (*MMapReader) Close

func (r *MMapReader) Close() error

func (*MMapReader) Open

func (r *MMapReader) Open() error

func (*MMapReader) ReadNextAt

func (r *MMapReader) ReadNextAt(offset uint64) ([]byte, error)

type OpenClosableI

type OpenClosableI interface {
	CloseableI
	OpenableI
}

type OpenableI added in v1.1.0

type OpenableI interface {
	// Open opens the given file for reading or writing. Errors can happen in multiple circumstances:
	// File or directory doesn't exist or are not accessible.
	// File was already opened or closed before.
	// File is corrupt, header wasn't readable or versions are incompatible.
	Open() error
}

type ReadAtI

type ReadAtI interface {
	OpenClosableI
	// ReadNextAt reads the next record at the given offset. Reaching the end is signalled by returning (nil, io.EOF).
	// The error can be wrapped however, so always check using errors.Is(err, io.EOF). Implementations must be thread-safe.
	ReadNextAt(offset uint64) ([]byte, error)
}

ReadAtI implementors must make their implementation thread-safe

func NewMemoryMappedReaderWithPath

func NewMemoryMappedReaderWithPath(path string) (ReadAtI, error)

NewMemoryMappedReaderWithPath creates a new mmap reader at the given path.

type Reader added in v1.4.0

type Reader struct {
	// contains filtered or unexported fields
}

Reader implements buffering for an io.Reader object. This is the same reader as bufio.Reader, but it allows us to supply the buffer from the outside. Namely, it only has a new constructor in NewReaderBuf and implements Close()

func NewReaderBuf added in v1.4.0

func NewReaderBuf(rd io.Reader, buf []byte) *Reader

func (*Reader) Buffered added in v1.4.0

func (b *Reader) Buffered() int

Buffered returns the number of bytes that can be read from the current buffer.

func (*Reader) Read added in v1.4.0

func (b *Reader) Read(p []byte) (n int, err error)

Read reads data into p. It returns the number of bytes read into p. The bytes are taken from at most one Read on the underlying Reader, hence n may be less than len(p). To read exactly len(p) bytes, use io.ReadFull(b, p). At EOF, the count will be zero and err will be io.EOF.

func (*Reader) ReadByte added in v1.4.0

func (b *Reader) ReadByte() (byte, error)

ReadByte reads and returns a single byte. If no byte is available, returns an error.

func (*Reader) Reset added in v1.4.0

func (b *Reader) Reset(r io.Reader)

Reset discards any buffered data, resets all state, and switches the buffered reader to read from r.

func (*Reader) Size added in v1.4.0

func (b *Reader) Size() int

Size returns the size of the underlying buffer in bytes.

type ReaderI

type ReaderI interface {
	OpenClosableI
	// ReadNext reads the next record. Reaching the end is signalled by returning (nil, io.EOF). The error can be wrapped however, so always check using errors.Is(err, io.EOF).
	ReadNext() ([]byte, error)
	// SkipNext skips the next record. Reaching the end is signalled by returning io.EOF as the error. The error can be wrapped however, so always check using errors.Is(err, io.EOF).
	SkipNext() error
}

func NewFileReaderWithFile

func NewFileReaderWithFile(file *os.File) (ReaderI, error)

NewFileReaderWithFile creates a new recordio file reader that can read RecordIO files with the given file. The file will be managed from here on out (ie closing).

func NewFileReaderWithPath

func NewFileReaderWithPath(path string) (ReaderI, error)

NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path.

type ReaderWriterCloserFactory added in v1.4.0

type ReaderWriterCloserFactory interface {
	CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error)
	CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error)
}

type Reset added in v1.1.0

type Reset interface {
	Reset(r io.Reader)
}

type SizeI added in v1.1.0

type SizeI interface {
	// Size returns the current size of the file in bytes
	Size() uint64
}

type WriteCloserFlusher added in v1.4.0

type WriteCloserFlusher interface {
	io.WriteCloser
	Flush() error
	Size() int
}

func NewAlignedWriterBuf added in v1.4.0

func NewAlignedWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher

func NewWriterBuf added in v1.4.0

func NewWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher

type Writer added in v1.4.0

type Writer struct {
	// contains filtered or unexported fields
}

Writer implements buffering for an io.Writer object. If an error occurs writing to a Writer, no more data will be accepted and all subsequent writes, and Flush, will return the error. After all data has been written, the client should call the Flush method to guarantee all data has been forwarded to the underlying io.Writer. This is the same writer as bufio.Writer, but it allows us to supply the buffer from the outside. Namely, it has a new constructor in NewWriterBuf & NewAlignedWriterBuf, implements Close() and supports block aligned flushes. Additionally, several methods that were not needed are removed to reduce the test surface of the original.

func (*Writer) Available added in v1.4.0

func (b *Writer) Available() int

Available returns how many bytes are unused in the buffer.

func (*Writer) Close added in v1.4.0

func (b *Writer) Close() error

func (*Writer) Flush added in v1.4.0

func (b *Writer) Flush() error

Flush writes any buffered data to the underlying io.Writer.

func (*Writer) Size added in v1.4.0

func (b *Writer) Size() int

Size returns the size of the underlying buffer in bytes.

func (*Writer) Write added in v1.4.0

func (b *Writer) Write(p []byte) (nn int, err error)

Write writes the contents of p into the buffer. It returns the number of bytes written. If nn < len(p), it also returns an error explaining why the write is short.

type WriterI

type WriterI interface {
	OpenClosableI
	SizeI
	// Write appends a record of bytes, returns the current offset this item was written to
	Write(record []byte) (uint64, error)
	// WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to
	WriteSync(record []byte) (uint64, error)
}

func NewFileWriter added in v1.2.0

func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error)

NewFileWriter creates a new writer with the given options, either Path or File must be supplied, compression is optional.
