recordio

package
v1.2.0
Published: Jun 6, 2021 License: Apache-2.0 Imports: 10 Imported by: 0

README

Using RecordIO

RecordIO allows you to write sequential key/value entities into a flat file and is heavily inspired by Hadoop's SequenceFile.

The sections below focus on reading and writing using plain byte slices. If you want to read more about the Protobuf bindings, go to the section Using Proto RecordIO.

Writing

The general flow is:

  • create a new recordio writer (with a string path or file pointer, optionally with compression)
  • open the file
  • write a "record" as often and as much as you want
  • close the file

In Go that looks like this:

import "github.com/thomasjungblut/go-sstables/recordio"

writer, err := recordio.NewFileWriter(
                     recordio.Path("some/path/records.rio"), 
                     recordio.CompressionType(recordio.CompressionTypeSnappy))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

offset, err := writer.Write([]byte{1,3,3,7})
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

Write returns the offset in the file at which the record was written. This is quite useful for indexing and is used heavily in the sstables package.
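
For illustration, the offsets returned by Write can be kept in a small in-memory index and later used for random access via the memory-mapped reader. This is only a minimal sketch; the keys, values and the records.rio path are made up:

import "github.com/thomasjungblut/go-sstables/recordio"

writer, err := recordio.NewFileWriter(recordio.Path("some/path/records.rio"))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

// remember the offset each record was written to
index := map[string]uint64{}
for _, key := range []string{"a", "b", "c"} {
    offset, err := writer.Write([]byte(key))
    if err != nil { log.Fatalf("error: %v", err) }
    index[key] = offset
}

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

// read a single record back by its offset using the memory-mapped reader
reader, err := recordio.NewMemoryMappedReaderWithPath("some/path/records.rio")
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

record, err := reader.ReadNextAt(index["b"])
if err != nil { log.Fatalf("error: %v", err) }
log.Printf("read back: %s", record)

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }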

There is an alternative method called WriteSync, which issues an fsync to flush the disk write cache and actually persist the data. That's a must-have in a write-ahead log to guarantee persistence on disk. Keep in mind that this is drastically slower; consult the benchmark section for more information.
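
A minimal sketch of what a write-ahead-log style append could look like (the entry content is made up, the writer is set up as above):

// every append is fsync'd before we rely on it being durable
offset, err := writer.WriteSync([]byte("wal entry"))
if err != nil { log.Fatalf("error: %v", err) }
log.Printf("entry durably written at offset %d", offset)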

By default, recordio.NewFileWriter will not use any compression, but if configured there are two compression libraries available: Snappy and GZIP. The compression is applied per record and not to the whole file, so it might not be as efficient as compressing the whole content at once after closing.

Reading

Reading follows the same general lifecycle. You read the next byte slice repeatedly until io.EOF is returned, which is a familiar pattern from other "iterables".

import (
   "github.com/thomasjungblut/go-sstables/recordio"
)

reader, err := recordio.NewFileReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

for {
    record, err := reader.ReadNext()
    // io.EOF signals that no records are left to be read
    if err == io.EOF {
        break
    }

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    log.Printf("%s", record)
}

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

Using Proto RecordIO

Reading and writing a recordio file using Protobuf and Snappy compression can be done quite easily, as shown in the sections below. Here's the simple proto file we use:

syntax = "proto3";
package proto;
option go_package = "github.com/thomasjungblut/go-sstables/examples/proto";

message HelloWorld {
    string message = 1;
}

You can compile it using protoc; here we save the generated code as part of our examples:

protoc --go_out=. --go_opt=paths=source_relative examples/proto/hello_world.proto

Writing

Writing a recordio file in Go then becomes:

import (
   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

writer, err := rProto.NewWriter(
                     rProto.Path(path), 
                     rProto.CompressionType(recordio.CompressionTypeSnappy))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

record := &proto.HelloWorld{Message: "Hello World"}
recordOffset, err := writer.Write(record)
if err != nil { log.Fatalf("error: %v", err) }

log.Printf("wrote a record at offset of %d bytes", recordOffset)

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

Reading

Reading the same file we just wrote can be done like this:

import (
   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

reader, err := rProto.NewProtoReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

for {
    record := &proto.HelloWorld{}
    _, err := reader.ReadNext(record)
    // io.EOF signals that no records are left to be read
    if err == io.EOF {
        break
    }

    if err != nil {
        log.Fatalf("error: %v", err)
    }

    log.Printf("%s", record.GetMessage())
}

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

SSTables need to support random reads of the backing values, so recordio also supports them using its mmap implementation:

import (
   "github.com/thomasjungblut/go-sstables/examples/proto" // generated proto
   rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

reader, err := rProto.NewMMapProtoReaderWithPath(path)
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
if err != nil { log.Fatalf("error: %v", err) }

record := &proto.HelloWorld{}
_, err = reader.ReadNextAt(record, 8)
if err != nil { log.Fatalf("error: %v", err) }

log.Printf("Reading message at offset 8: %s", record.GetMessage())

err = reader.Close()
if err != nil { log.Fatalf("error: %v", err) }

You can get the full example from examples/recordio.go.

Documentation

Index

Constants

View Source
const (
	// never reorder, always append
	CompressionTypeNone   = iota
	CompressionTypeGZIP   = iota
	CompressionTypeSnappy = iota
)
View Source
const CurrentVersion = Version2
View Source
const FileHeaderSizeBytes = 8

4 byte version number, 4 byte compression code = 8 bytes

View Source
const MagicNumberSeparator uint32 = 0x130691
View Source
const MagicNumberSeparatorLong uint64 = 0x130691
View Source
const RecordHeaderSizeBytes = 20

That's the max buffer size to prevent PutUvarint from panicking: 10 byte magic number, 10 byte uncompressed size, 10 bytes for compressed size = 30 bytes

View Source
const Version1 uint32 = 0x01
View Source
const Version2 uint32 = 0x02

Variables

This section is empty.

Functions

func NewCompressorForType

func NewCompressorForType(compType int) (compressor.CompressionI, error)

func SkipNextV1 added in v1.1.0

func SkipNextV1(r *FileReader) error

legacy support path for non-vint compressed V1

Types

type CloseableI added in v1.1.0

type CloseableI interface {
	// Closes the given file. Errors can happen when:
	// File was already closed before or is not yet open.
	// File could not be closed on the filesystem (e.g. when flushes fail)
	Close() error
}

type CountingBufferedReader added in v1.1.0

type CountingBufferedReader struct {
	// contains filtered or unexported fields
}

func NewCountingByteReader added in v1.1.0

func NewCountingByteReader(reader *bufio.Reader) *CountingBufferedReader

func (*CountingBufferedReader) Read added in v1.1.0

func (c *CountingBufferedReader) Read(p []byte) (n int, err error)

Read reads data into p. It returns the number of bytes read into p. The bytes are taken from at most one Read on the underlying Reader, hence n may be less than len(p). To read exactly len(p) bytes, use io.ReadFull(b, p). At EOF, the count will be zero and err will be io.EOF.

func (*CountingBufferedReader) ReadByte added in v1.1.0

func (c *CountingBufferedReader) ReadByte() (byte, error)

ReadByte reads and returns a single byte. If no byte is available, returns an error.

func (*CountingBufferedReader) Reset added in v1.1.0

func (c *CountingBufferedReader) Reset(r io.Reader)

Reset discards any buffered data, resets all state, and switches the buffered reader to read from r.

type CountingReaderResetComposite added in v1.1.0

type CountingReaderResetComposite interface {
	io.ByteReader
	io.Reader
	Reset
}

type FileReader

type FileReader struct {
	// contains filtered or unexported fields
}

func NewFileReaderWithFile

func NewFileReaderWithFile(file *os.File) (*FileReader, error)

func NewFileReaderWithPath

func NewFileReaderWithPath(path string) (*FileReader, error)

func (*FileReader) Close

func (r *FileReader) Close() error

func (*FileReader) Open

func (r *FileReader) Open() error

func (*FileReader) ReadNext

func (r *FileReader) ReadNext() ([]byte, error)

func (*FileReader) SkipNext

func (r *FileReader) SkipNext() error

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

This type defines a binary file format (little endian).
The file header has a 32 bit version number and a 32 bit compression type enum according to the table above.
Each record written in the file follows this format (sequentially):

  • MagicNumber (encoding/binary/Uvarint) to separate records from each other.
  • Uncompressed data payload size (encoding/binary/Uvarint).
  • Compressed data payload size (encoding/binary/Uvarint), or 0 if the data is not compressed.
  • Payload as plain bytes, possibly compressed.
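
As an illustration of that record layout only (the package writes this for you through FileWriter.Write), a record could be assembled manually with encoding/binary; the payload here is made up, left uncompressed, and using the long magic number constant is an assumption:

import (
   "encoding/binary"

   "github.com/thomasjungblut/go-sstables/recordio"
)

payload := []byte("hello")

// vint-encoded record header: magic number, uncompressed size, compressed size (0 = not compressed)
buf := make([]byte, 3*binary.MaxVarintLen64)
off := binary.PutUvarint(buf, recordio.MagicNumberSeparatorLong)
off += binary.PutUvarint(buf[off:], uint64(len(payload)))
off += binary.PutUvarint(buf[off:], 0)
record := append(buf[:off], payload...)
log.Printf("record is %d bytes (%d header + %d payload)", len(record), off, len(payload))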

func NewFileWriter added in v1.2.0

func NewFileWriter(writerOptions ...FileWriterOption) (*FileWriter, error)

Creates a new writer with the given options; either Path or File must be supplied, compression is optional.
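
For example, a sketch of passing an already opened *os.File instead of a path (the file name and the use of os.Create are assumptions for illustration):

f, err := os.Create("some/path/records.rio")
if err != nil { log.Fatalf("error: %v", err) }

writer, err := recordio.NewFileWriter(
                     recordio.File(f),
                     recordio.CompressionType(recordio.CompressionTypeGZIP))
if err != nil { log.Fatalf("error: %v", err) }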

func (*FileWriter) Close

func (w *FileWriter) Close() error

func (*FileWriter) Open

func (w *FileWriter) Open() error

func (*FileWriter) Size added in v1.1.0

func (w *FileWriter) Size() uint64

func (*FileWriter) Write

func (w *FileWriter) Write(record []byte) (uint64, error)

Appends a record of bytes, returns the current offset this item was written to

func (*FileWriter) WriteSync

func (w *FileWriter) WriteSync(record []byte) (uint64, error)

Appends a record of bytes and forces a disk sync, returns the current offset this item was written to

type FileWriterOption added in v1.2.0

type FileWriterOption func(*FileWriterOptions)

func CompressionType added in v1.2.0

func CompressionType(p int) FileWriterOption

func File added in v1.2.0

func File(p *os.File) FileWriterOption

func Path added in v1.2.0

func Path(p string) FileWriterOption

type FileWriterOptions added in v1.2.0

type FileWriterOptions struct {
	// contains filtered or unexported fields
}
type Header struct {
	// contains filtered or unexported fields
}

type MMapReader

type MMapReader struct {
	// contains filtered or unexported fields
}

func NewMemoryMappedReaderWithPath

func NewMemoryMappedReaderWithPath(path string) (*MMapReader, error)

func (*MMapReader) Close

func (r *MMapReader) Close() error

func (*MMapReader) Open

func (r *MMapReader) Open() error

func (*MMapReader) ReadNextAt

func (r *MMapReader) ReadNextAt(offset uint64) ([]byte, error)

type OpenClosableI

type OpenClosableI interface {
	CloseableI
	OpenableI
}

type OpenableI added in v1.1.0

type OpenableI interface {
	// Opens the given file for reading or writing. Errors can happen in multiple circumstances:
	// File or directory doesn't exist or are not accessible.
	// File was already opened or closed before.
	// File is corrupt, header wasn't readable or versions are incompatible.
	Open() error
}

type ReadAtI

type ReadAtI interface {
	OpenClosableI
	// Reads the next record at the given offset, EOF error when it reaches the end signalled by (nil, io.EOF), implementation must be thread-safe
	ReadNextAt(offset uint64) ([]byte, error)
}

this type is thread-safe

type ReaderI

type ReaderI interface {
	OpenClosableI
	// Reads the next record, EOF error when it reaches the end signalled by (nil, io.EOF)
	ReadNext() ([]byte, error)
	// skips the next record, EOF error when it reaches the end signalled by io.EOF as the error
	SkipNext() error
}

type Reset added in v1.1.0

type Reset interface {
	Reset(r io.Reader)
}

type SizeI added in v1.1.0

type SizeI interface {
	// returns the current size of the file in bytes
	Size() uint64
}

type WriterI

type WriterI interface {
	OpenClosableI
	SizeI
	// Appends a record of bytes, returns the current offset this item was written to
	Write(record []byte) (uint64, error)
	// Appends a record of bytes and forces a disk sync, returns the current offset this item was written to
	WriteSync(record []byte) (uint64, error)
}
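
A small sketch of programming against this interface rather than the concrete *FileWriter; the helper function below is made up for illustration and only relies on the WriterI contract:

// appendAll writes every record through any WriterI and returns the offsets.
func appendAll(w recordio.WriterI, records [][]byte) ([]uint64, error) {
	offsets := make([]uint64, 0, len(records))
	for _, r := range records {
		off, err := w.Write(r)
		if err != nil {
			return nil, err
		}
		offsets = append(offsets, off)
	}
	return offsets, nil
}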
