recordio

package
v1.3.1
Warning

This package is not in the latest version of its module.

Published: Sep 15, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

README

Using RecordIO

RecordIO allows you to write sequential key/value entities into a flat file and is heavily inspired by Hadoop's SequenceFile.

The sections below focus on reading and writing plain byte slices. For the Protobuf bindings, see the section Using Proto RecordIO.

Writing

The general flow is:

  • create a new recordio writer (with a string path or file pointer, optionally with compression)
  • open the file
  • write a "record" as often and as much as you want
  • close the file

In Go, that looks like this:

import (
   "log"

   "github.com/thomasjungblut/go-sstables/recordio"
)

writer, err := recordio.NewFileWriter(
                     recordio.Path("some/path/records.rio"),
                     recordio.CompressionType(recordio.CompressionTypeSnappy))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

// Write returns the offset in the file at which the record starts
offset, err := writer.Write([]byte{1,3,3,7})
if err != nil { log.Fatalf("error: %v", err) }
log.Printf("wrote a record at offset %d", offset)

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

Write returns the offset in the file at which the record was written. This is useful for indexing and is used heavily in the sstables package.

The alternative method WriteSync additionally forces an fsync, flushing the data out of the write caches so that it is actually persisted on disk when the call returns. This is a must-have for a write-ahead log that has to guarantee durability. Keep in mind that it is drastically slower; consult the benchmark section for more information.

By default, recordio.NewFileWriter does not compress records, but two compression libraries can be configured: Snappy and GZIP. Compression is applied per record, not to the whole file, so it can be less efficient than compressing the entire content at once after closing the file.

Reading

Reading follows the same open/read/close lifecycle. ReadNext returns the next record as a byte slice until it returns io.EOF (possibly wrapped), a familiar pattern from other iterators.

import (
   "errors"
   "io"
   "log"

   "github.com/thomasjungblut/go-sstables/recordio"
)

reader, err := recordio.NewFileReaderWithPath("some/path/records.rio")
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

for {
    record, err := reader.ReadNext()
    // io.EOF signals that no records are left to be read; it may be wrapped, so always check with errors.Is()
    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    log.Printf("%v", record)
}

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

Using Proto RecordIO

The sections below show how to read and write a recordio file using Protobuf and Snappy compression. Here's the simple proto file we use:

syntax = "proto3";
package proto;
option go_package = "github.com/thomasjungblut/go-sstables/examples/proto";

message HelloWorld {
    string message = 1;
}

Compile it with protoc; here the generated code is saved as part of our examples:

protoc --go_out=. --go_opt=paths=source_relative examples/proto/hello_world.proto

Writing

Writing a recordio file in Go then becomes:

import (
   "log"

   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   "github.com/thomasjungblut/go-sstables/recordio"
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

writer, err := rProto.NewWriter(
                     rProto.Path(path), 
                     rProto.CompressionType(recordio.CompressionTypeSnappy))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

record := &proto.HelloWorld{Message: "Hello World"}
recordOffset, err := writer.Write(record)
if err != nil { log.Fatalf("error: %v", err) }

log.Printf("wrote a record at offset of %d bytes", recordOffset)

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

Reading

Reading the same file we just wrote can be done like this:

import (
   "errors"
   "io"
   "log"

   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

reader, err := rProto.NewProtoReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

for {
    record := &proto.HelloWorld{}
    _, err := reader.ReadNext(record)
    // io.EOF signals that no records are left to be read; it may be wrapped, so always check with errors.Is()
    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    log.Printf("%s", record.GetMessage())
}

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

SSTables need random reads of their backing values, so recordio also supports random access through its mmap implementation:

import (
   "log"

   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

reader, err := rProto.NewMMapProtoReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

record := &proto.HelloWorld{}
// offset 8 is directly after the 8-byte file header, i.e. where the first record starts
_, err = reader.ReadNextAt(record, 8)
if err != nil { log.Fatalf("error: %v", err) }

log.Printf("Reading message at offset 8: %s", record.GetMessage())

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

You can get the full example from examples/recordio.go.

Documentation


Constants

const (
	// never reorder, always append
	CompressionTypeNone   = iota
	CompressionTypeGZIP   = iota
	CompressionTypeSnappy = iota
)
const CurrentVersion = Version2
const DefaultBufferSize = 1024 * 1024 * 4
const FileHeaderSizeBytes = 8

4 byte version number, 4 byte compression code = 8 bytes
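The 8-byte file header can be sketched with `encoding/binary`. The layout (version first, then compression type, both little-endian uint32) is assumed from this constant's comment and from the FileWriter description; `encodeHeader`, `decodeHeader` and the lowercase constants are hypothetical names for illustration, not the package's API.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	fileHeaderSizeBytes        = 8    // mirrors FileHeaderSizeBytes
	version2            uint32 = 0x02 // mirrors Version2
)

// encodeHeader writes the version followed by the compression type, both as
// little-endian uint32 values (assumed layout).
func encodeHeader(version, compressionType uint32) []byte {
	buf := make([]byte, fileHeaderSizeBytes)
	binary.LittleEndian.PutUint32(buf[0:4], version)
	binary.LittleEndian.PutUint32(buf[4:8], compressionType)
	return buf
}

// decodeHeader reverses encodeHeader and rejects buffers that are too short.
func decodeHeader(buf []byte) (version, compressionType uint32, err error) {
	if len(buf) < fileHeaderSizeBytes {
		return 0, 0, errors.New("header too short")
	}
	return binary.LittleEndian.Uint32(buf[0:4]), binary.LittleEndian.Uint32(buf[4:8]), nil
}

func main() {
	h := encodeHeader(version2, 2) // 2 is CompressionTypeSnappy (third iota value)
	fmt.Printf("% x\n", h)         // 02 00 00 00 02 00 00 00
	v, c, _ := decodeHeader(h)
	fmt.Println(v, c) // 2 2
}
```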

const MagicNumberSeparator uint32 = 0x130691
const MagicNumberSeparatorLong uint64 = 0x130691
const RecordHeaderSizeBytes = 20

These are the maximum buffer sizes needed to prevent PutUvarint from panicking: 10 bytes for the magic number, 10 bytes for the uncompressed size and 10 bytes for the compressed size = 30 bytes.

const Version1 uint32 = 0x01
const Version2 uint32 = 0x02

Variables

This section is empty.

Functions

func NewCompressorForType

func NewCompressorForType(compType int) (compressor.CompressionI, error)

Returns an instance of the desired compressor defined by its identifier. An error is returned if the desired compressor is not implemented. Only CompressionTypeNone, CompressionTypeSnappy and CompressionTypeGZIP are available currently.
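A factory like this is typically a switch over the identifiers. The sketch below is stdlib-only and hypothetical: the `compressor` interface merely stands in for compressor.CompressionI, whose real method set may differ, and the Snappy case returns an error only because Snappy is not in the Go standard library (the actual package does support it).

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// Mirrors the recordio constants (same iota order).
const (
	CompressionTypeNone = iota
	CompressionTypeGZIP
	CompressionTypeSnappy
)

// compressor is a hypothetical stand-in for compressor.CompressionI.
type compressor interface {
	Compress(p []byte) ([]byte, error)
	Decompress(p []byte) ([]byte, error)
}

type noneCompressor struct{}

func (noneCompressor) Compress(p []byte) ([]byte, error)   { return p, nil }
func (noneCompressor) Decompress(p []byte) ([]byte, error) { return p, nil }

type gzipCompressor struct{}

func (gzipCompressor) Compress(p []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(p); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (gzipCompressor) Decompress(p []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(p))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

// newCompressorForType maps an identifier to an implementation and returns
// an error for identifiers it does not implement.
func newCompressorForType(compType int) (compressor, error) {
	switch compType {
	case CompressionTypeNone:
		return noneCompressor{}, nil
	case CompressionTypeGZIP:
		return gzipCompressor{}, nil
	case CompressionTypeSnappy:
		// Omitted in this stdlib-only sketch.
		return nil, fmt.Errorf("snappy not available in this sketch")
	default:
		return nil, fmt.Errorf("unsupported compression type %d", compType)
	}
}

func main() {
	c, err := newCompressorForType(CompressionTypeGZIP)
	if err != nil {
		panic(err)
	}
	packed, _ := c.Compress([]byte("hello recordio"))
	plain, _ := c.Decompress(packed)
	fmt.Println(string(plain)) // hello recordio

	_, err = newCompressorForType(42)
	fmt.Println(err) // unsupported compression type 42
}
```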

func SkipNextV1 added in v1.1.0

func SkipNextV1(r *FileReader) error

Legacy support path for skipping a record in V1 files, whose record headers are not varint-encoded.

Types

type CloseableI added in v1.1.0

type CloseableI interface {
	// Closes the given file. Errors can happen when:
	// File was already closed before or is not yet open.
	// File could not be closed on the filesystem (eg when flushes fail)
	Close() error
}

type CountingBufferedReader added in v1.1.0

type CountingBufferedReader struct {
	// contains filtered or unexported fields
}

func (*CountingBufferedReader) Count added in v1.3.0

func (c *CountingBufferedReader) Count() uint64

func (*CountingBufferedReader) Read added in v1.1.0

func (c *CountingBufferedReader) Read(p []byte) (n int, err error)

Read reads data into p and returns the number of bytes read into p. The bytes are taken from at most one Read on the underlying Reader, hence n may be less than len(p). To read exactly len(p) bytes, use io.ReadFull(c, p). At EOF, n will be zero and err will be io.EOF.

func (*CountingBufferedReader) ReadByte added in v1.1.0

func (c *CountingBufferedReader) ReadByte() (byte, error)

ReadByte reads and returns a single byte. If no byte is available, it returns an error.

func (*CountingBufferedReader) Reset added in v1.1.0

func (c *CountingBufferedReader) Reset(r io.Reader)

Reset discards any buffered data, resets all state, and switches the buffered reader to read from r.

type CountingReaderResetComposite added in v1.1.0

type CountingReaderResetComposite interface {
	ReaderResetComposite
	Count() uint64
}

func NewCountingByteReader added in v1.1.0

func NewCountingByteReader(reader *bufio.Reader) CountingReaderResetComposite

type FileReader

type FileReader struct {
	// contains filtered or unexported fields
}

func (*FileReader) Close

func (r *FileReader) Close() error

func (*FileReader) Open

func (r *FileReader) Open() error

func (*FileReader) ReadNext

func (r *FileReader) ReadNext() ([]byte, error)

func (*FileReader) SkipNext

func (r *FileReader) SkipNext() error

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

This type defines a binary file format (little endian):

  • The file header has a 32-bit version number and a 32-bit compression type enum, whose values are the CompressionType* constants listed above.
  • Each record written to the file has the following format, in order:
      • MagicNumber (encoding/binary Uvarint), to separate records from each other.
      • Uncompressed payload size (encoding/binary Uvarint).
      • Compressed payload size (encoding/binary Uvarint), or 0 if the payload is not compressed.
      • Payload as plain bytes, possibly compressed.

func (*FileWriter) Close

func (w *FileWriter) Close() error

func (*FileWriter) Open

func (w *FileWriter) Open() error

func (*FileWriter) Size added in v1.1.0

func (w *FileWriter) Size() uint64

func (*FileWriter) Write

func (w *FileWriter) Write(record []byte) (uint64, error)

Write appends a record of bytes and returns the offset at which the record was written.

func (*FileWriter) WriteSync

func (w *FileWriter) WriteSync(record []byte) (uint64, error)

WriteSync appends a record of bytes, forces a disk sync, and returns the offset at which the record was written.

type FileWriterOption added in v1.2.0

type FileWriterOption func(*FileWriterOptions)

func BufferSizeBytes added in v1.3.0

func BufferSizeBytes(p int) FileWriterOption

BufferSizeBytes sets the size of the internal in-memory write buffer that holds data before it is written to disk. By default, DefaultBufferSize is used.

func CompressionType added in v1.2.0

func CompressionType(p int) FileWriterOption

CompressionType sets the record compression for the file. Valid values are the constants prefixed with CompressionType, for example CompressionTypeNone, CompressionTypeSnappy and CompressionTypeGZIP.

func DirectIO added in v1.3.0

func DirectIO() FileWriterOption

Experimental: this option enables direct I/O while writing. It might currently not work because of misaligned allocations.

func File added in v1.2.0

func File(p *os.File) FileWriterOption

File uses the given os.File as the sink to write into. The writer takes over the file's lifecycle (i.e., it closes the file). Either this or Path must be supplied.

func Path added in v1.2.0

func Path(p string) FileWriterOption

Path sets the path of the recordio file to write. The file is created if it doesn't exist yet, but no parent directories are created. Either this or File must be supplied.

type FileWriterOptions added in v1.2.0

type FileWriterOptions struct {
	// contains filtered or unexported fields
}

type Header

type Header struct {
	// contains filtered or unexported fields
}

type MMapReader

type MMapReader struct {
	// contains filtered or unexported fields
}

func (*MMapReader) Close

func (r *MMapReader) Close() error

func (*MMapReader) Open

func (r *MMapReader) Open() error

func (*MMapReader) ReadNextAt

func (r *MMapReader) ReadNextAt(offset uint64) ([]byte, error)

type OpenClosableI

type OpenClosableI interface {
	CloseableI
	OpenableI
}

type OpenableI added in v1.1.0

type OpenableI interface {
	// Opens the given file for reading or writing. Errors can happen in multiple circumstances:
	// File or directory doesn't exist or isn't accessible.
	// File was already opened or closed before.
	// File is corrupt, header wasn't readable or versions are incompatible.
	Open() error
}

type ReadAtI

type ReadAtI interface {
	OpenClosableI
	// Reads the next record at the given offset, EOF error when it reaches the end signalled by (nil, io.EOF).
	// It can be wrapped however, so always check using errors.Is(err, io.EOF). Implementation must be thread-safe.
	ReadNextAt(offset uint64) ([]byte, error)
}

This type is thread-safe.

func NewMemoryMappedReaderWithPath

func NewMemoryMappedReaderWithPath(path string) (ReadAtI, error)

NewMemoryMappedReaderWithPath creates a new mmap reader at the given path.

type ReaderI

type ReaderI interface {
	OpenClosableI
	// Reads the next record, EOF error when it reaches the end signalled by (nil, io.EOF). It can be wrapped however, so always check using errors.Is(err, io.EOF).
	ReadNext() ([]byte, error)
	// skips the next record, EOF error when it reaches the end signalled by io.EOF as the error. It can be wrapped however, so always check using errors.Is(err, io.EOF).
	SkipNext() error
}

func NewFileReaderWithFile

func NewFileReaderWithFile(file *os.File) (ReaderI, error)

NewFileReaderWithFile creates a new recordio file reader that reads RecordIO records from the given file. From here on, the reader manages the file's lifecycle (i.e., closing).

func NewFileReaderWithPath

func NewFileReaderWithPath(path string) (ReaderI, error)

NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path.

type ReaderResetComposite added in v1.3.0

type ReaderResetComposite interface {
	io.ByteReader
	io.Reader
	Reset
}

type Reset added in v1.1.0

type Reset interface {
	Reset(r io.Reader)
}

type SizeI added in v1.1.0

type SizeI interface {
	// returns the current size of the file in bytes
	Size() uint64
}

type WriterI

type WriterI interface {
	OpenClosableI
	SizeI
	// Appends a record of bytes, returns the current offset this item was written to
	Write(record []byte) (uint64, error)
	// Appends a record of bytes and forces a disk sync, returns the current offset this item was written to
	WriteSync(record []byte) (uint64, error)
}

func NewFileWriter added in v1.2.0

func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error)

NewFileWriter creates a new writer with the given options. Either Path or File must be supplied; compression is optional.

