Documentation ¶
Overview ¶
Package recordio implements the recordio file format. A recordio file stores a sequence of items, with optional compression, encryption, and indexing.
See the README.md file (https://github.com/grailbio/base/blob/master/recordio/README.md) for more detailed documentation.
Example (Basic) ¶
Example_basic demonstrates basic reads, writes, and flate compression.
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/grailbio/base/recordio"
	"github.com/grailbio/base/recordio/recordioflate"
)

func init() { recordioflate.Init() }

func doWrite(out io.Writer) {
	wr := recordio.NewWriter(out, recordio.WriterOpts{
		Transformers: []string{"flate"},
		Marshal:      func(scratch []byte, v interface{}) ([]byte, error) { return []byte(v.(string)), nil },
	})
	wr.Append("Item0")
	wr.Append("Item1")
	if err := wr.Finish(); err != nil {
		panic(err)
	}
}

func doRead(in io.ReadSeeker) {
	r := recordio.NewScanner(in, recordio.ScannerOpts{
		Unmarshal: func(data []byte) (interface{}, error) { return string(data), nil },
	})
	for r.Scan() {
		fmt.Printf("Item: %s\n", r.Get().(string))
	}
	if err := r.Err(); err != nil {
		panic(err)
	}
}

// Example_basic demonstrates basic reads, writes, and flate compression.
func main() {
	buf := &bytes.Buffer{}
	doWrite(buf)
	doRead(bytes.NewReader(buf.Bytes()))
}

Output:
Item: Item0
Item: Item1
Example (Indexing) ¶
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"

	"github.com/grailbio/base/recordio"
)

type recordioIndex map[string]recordio.ItemLocation

func doWriteWithIndex(out io.Writer) {
	index := make(recordioIndex)
	wr := recordio.NewWriter(out, recordio.WriterOpts{
		Marshal: func(scratch []byte, v interface{}) ([]byte, error) { return []byte(v.(string)), nil },
		Index: func(loc recordio.ItemLocation, val interface{}) error {
			index[val.(string)] = loc
			return nil
		},
	})

	// To store a trailer block, AddHeader(recordio.KeyTrailer, true) must be
	// called beforehand.
	wr.AddHeader(recordio.KeyTrailer, true)
	wr.Append("Item0")
	wr.Append("Item1")
	wr.Append("Item2")
	wr.Flush()
	// Wait for the index callbacks to run.
	wr.Wait()

	// Write the index in the trailer.
	indexBuf := &bytes.Buffer{}
	encoder := gob.NewEncoder(indexBuf)
	if err := encoder.Encode(index); err != nil {
		panic(err)
	}
	wr.SetTrailer(indexBuf.Bytes())
	if err := wr.Finish(); err != nil {
		panic(err)
	}
}

func doReadWithIndex(in io.ReadSeeker) {
	r := recordio.NewScanner(in, recordio.ScannerOpts{
		Unmarshal: func(data []byte) (interface{}, error) { return string(data), nil },
	})
	// Read the trailer, parse it into the recordioIndex.
	decoder := gob.NewDecoder(bytes.NewReader(r.Trailer()))
	index := make(recordioIndex)
	if err := decoder.Decode(&index); err != nil {
		panic(err)
	}
	// Try reading individual items.
	r.Seek(index["Item1"])
	for r.Scan() {
		fmt.Printf("Item: %s\n", r.Get().(string))
	}
	r.Seek(index["Item0"])
	for r.Scan() {
		fmt.Printf("Item: %s\n", r.Get().(string))
	}
	if err := r.Err(); err != nil {
		panic(err)
	}
}

func main() {
	buf := &bytes.Buffer{}
	doWriteWithIndex(buf)
	doReadWithIndex(bytes.NewReader(buf.Bytes()))
}

Output:
Item: Item1
Item: Item2
Item: Item0
Item: Item1
Item: Item2
Index ¶
- Constants
- Variables
- func RegisterTransformer(name string, transformer TransformerFactory, untransformer TransformerFactory)
- type FormatVersion
- type IndexFunc
- type ItemLocation
- type KeyValue
- type MarshalFunc
- type ParsedHeader
- type Scanner
- type ScannerOpts
- type TransformFunc
- type TransformerFactory
- type Writer
- type WriterOpts
Constants ¶
const (
	// KeyTrailer must be set to true when the recordio file contains a trailer.
	// value type: bool
	KeyTrailer = "trailer"

	// KeyTransformer defines transformer functions used to encode blocks.
	KeyTransformer = "transformer"
)

const (
	// DefaultFlushParallelism is the default value for WriterOpts.MaxFlushParallelism.
	DefaultFlushParallelism = uint32(8)

	// MaxFlushParallelism is the max allowed value for WriterOpts.MaxFlushParallelism.
	MaxFlushParallelism = uint32(128)

	// MaxPackedItems defines the max items that can be
	// packed into a single record by a PackedWriter.
	MaxPackedItems = uint32(10 * 1024 * 1024)

	// DefaultPackedItems defines the default number of items that can
	// be packed into a single record by a PackedWriter.
	DefaultPackedItems = uint32(16 * 1024)
)
Variables ¶
var MagicPacked = internal.MagicPacked
MagicPacked is the chunk header for legacy and v2 data chunks. Not for general use.
var MaxReadRecordSize = internal.MaxReadRecordSize
MaxReadRecordSize defines a max size for a record when reading to avoid crashes for unreasonable requests.
Functions ¶
func RegisterTransformer ¶
func RegisterTransformer(name string, transformer TransformerFactory, untransformer TransformerFactory)
RegisterTransformer registers a block transformer. The transformer factory should produce a transformer function. The factory is run by NewWriter. The transformer function is called by the writer to transform a block just before storing it in storage.
The untransformer factory is the reverse of the transformer factory. It is run by NewScanner. The untransformer function is called by the scanner to transform data read from storage back into a block.
This function is usually called when the process starts.
The transformer and untransformer factories, as well as the functions generated by these factories, must all be thread safe.
REQUIRES: A (un)transformer with the same "name" has not been registered already.
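The registration contract above (one transformer/untransformer factory pair per name, no duplicates, thread safe) can be illustrated with a small standalone sketch. This is not the package's implementation: `registry`, `register`, `transformFunc`, and `identityFactory` are hypothetical names, `transformFunc` is a simplified stand-in for TransformFunc, and returning an error on a duplicate name is a choice made for the sketch only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// transformFunc is a simplified stand-in for recordio.TransformFunc; the real
// signature is not reproduced here.
type transformFunc func(in []byte) ([]byte, error)

// transformerFactory mirrors the shape of recordio.TransformerFactory.
type transformerFactory func(config string) (transformFunc, error)

// entry pairs a transformer factory with its reverse.
type entry struct{ transformer, untransformer transformerFactory }

// registry is a hypothetical name-to-factory table guarded by a mutex.
type registry struct {
	mu      sync.Mutex
	entries map[string]entry
}

// register adds a factory pair and rejects a name that is already present.
func (r *registry) register(name string, t, u transformerFactory) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.entries[name]; dup {
		return errors.New("transformer already registered: " + name)
	}
	if r.entries == nil {
		r.entries = map[string]entry{}
	}
	r.entries[name] = entry{t, u}
	return nil
}

// identityFactory ignores its config and returns a pass-through function.
func identityFactory(config string) (transformFunc, error) {
	return func(in []byte) ([]byte, error) { return in, nil }, nil
}

func main() {
	var r registry
	fmt.Println(r.register("identity", identityFactory, identityFactory))
	fmt.Println(r.register("identity", identityFactory, identityFactory))
}
```

The first call succeeds and the second reports the duplicate, which is the situation the REQUIRES clause rules out.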
Types ¶
type FormatVersion ¶
type FormatVersion int
FormatVersion defines the file-format version. Not for general use. It may be removed without notice.
const (
	// V1 is pre 2018-02 format
	V1 FormatVersion = 1
	// V2 is post 2018-02 format
	V2 FormatVersion = 2
)
type IndexFunc ¶
type IndexFunc func(loc ItemLocation, item interface{}) error
IndexFunc runs after an item is flushed to storage. Parameter "loc" is the location of the item in the file. It can later be passed to the Scanner.Seek method to seek to the item.
type ItemLocation ¶
type ItemLocation struct {
	// Location of the first byte of the block within the file. Unit is bytes.
	Block uint64
	// Index of the item within the block. The Nth item in the block (N=1,2,...)
	// has value N-1.
	Item int
}
ItemLocation identifies the location of an item in a recordio file.
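Because a location is a block offset plus a zero-based item index, a set of locations collected by an index callback can be put into file order by comparing Block first and Item second. The sketch below assumes nothing beyond the two fields shown above; `itemLocation` is a local copy of the struct and `sortByFileOrder` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// itemLocation mirrors the fields of recordio.ItemLocation; it is redeclared
// locally so the sketch compiles without the recordio package.
type itemLocation struct {
	Block uint64 // byte offset of the block within the file
	Item  int    // zero-based index of the item within the block
}

// sortByFileOrder is a hypothetical helper that orders locations by block
// offset, then by item index within the block.
func sortByFileOrder(locs []itemLocation) {
	sort.Slice(locs, func(i, j int) bool {
		if locs[i].Block != locs[j].Block {
			return locs[i].Block < locs[j].Block
		}
		return locs[i].Item < locs[j].Item
	})
}

func main() {
	locs := []itemLocation{{Block: 4096, Item: 0}, {Block: 0, Item: 2}, {Block: 0, Item: 0}}
	sortByFileOrder(locs)
	fmt.Println(locs) // prints [{0 0} {0 2} {4096 0}]
}
```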
type KeyValue ¶
type KeyValue struct {
	// Key is the header key
	Key string
	// Value is the value corresponding to Key. The value must be one of int*,
	// uint*, float*, bool, or string type.
	Value interface{}
}
KeyValue defines one entry stored in a recordio header block.
type MarshalFunc ¶
type MarshalFunc func(scratch []byte, v interface{}) ([]byte, error)
MarshalFunc is called to serialize data. Parameter scratch is passed as a performance hint. If the serialized result fits in scratch, the function should store the result in scratch and return it as the first return value. Otherwise, it should allocate a new []byte and return it.
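The scratch convention can be sketched as follows. This is a minimal illustration that assumes the function shape used by the Marshal callbacks in the examples on this page; `marshalFunc` is a local copy of that shape and `marshalString` is a hypothetical marshaler for string items.

```go
package main

import "fmt"

// marshalFunc mirrors the shape of the Marshal callbacks used in the examples;
// it is redeclared locally so the sketch compiles without the recordio package.
type marshalFunc func(scratch []byte, v interface{}) ([]byte, error)

// marshalString serializes a string, reusing scratch when it is large enough.
func marshalString(scratch []byte, v interface{}) ([]byte, error) {
	s, ok := v.(string)
	if !ok {
		return nil, fmt.Errorf("marshalString: unexpected type %T", v)
	}
	if cap(scratch) >= len(s) {
		// The result fits: write into scratch and return the resliced buffer.
		out := scratch[:len(s)]
		copy(out, s)
		return out, nil
	}
	// The result does not fit: allocate a fresh slice.
	return []byte(s), nil
}

func main() {
	var m marshalFunc = marshalString
	scratch := make([]byte, 0, 8)
	small, _ := m(scratch, "Item0")              // fits in scratch
	large, _ := m(scratch, "a much longer item") // forces an allocation
	fmt.Println(string(small), "/", string(large))
}
```

Returning the resliced scratch buffer avoids one allocation per item when items are small, which is the point of the hint.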
type ParsedHeader ¶
type ParsedHeader []KeyValue
ParsedHeader is the result of parsing the recordio header block contents.
func (*ParsedHeader) HasTrailer ¶
func (h *ParsedHeader) HasTrailer() bool
HasTrailer checks if the header has a "trailer" entry.
type Scanner ¶
type Scanner interface {
	// Header returns the contents of the header block.
	Header() ParsedHeader

	// Scan returns true if a new record was read, false otherwise. It will return
	// false on encountering an error; the error may be retrieved using the Err
	// method. Note that Scan will reuse storage from one invocation to the next.
	Scan() bool

	// Get returns the current item as read by a prior call to Scan.
	//
	// REQUIRES: Preceding Scan calls have returned true. There is no Seek
	// call between the last Scan call and the Get call.
	Get() interface{}

	// Err returns any error encountered by the scanner. Once Err() becomes
	// non-nil, it stays so.
	Err() error

	// Seek arranges for the next Scan() call to move the read pointer to the
	// given location. On any error, Err() will be set.
	//
	// REQUIRES: loc must be one of the values passed to the Index callback
	// during writes.
	Seek(loc ItemLocation)

	// Trailer returns the trailer block contents. If the trailer does not exist,
	// or is corrupt, it returns nil. The caller should examine Err() if Trailer
	// returns nil.
	Trailer() []byte

	// Version returns the file format version. Not for general use.
	Version() FormatVersion

	// Finish should be called exactly once, after the application has finished
	// using the scanner. It returns the value of Err().
	//
	// The Finish method recycles the internal scanner resources for use by other
	// scanners, thereby reducing GC overhead. The application must not touch the
	// scanner object after Finish.
	Finish() error
}
Scanner defines an interface for recordio scanner.
A Scanner implementation must be thread safe. Legal path expression is defined below. Err, Header, and Trailer can be called at any time.
((Scan Get*) | Seek)* Finish
func NewScanner ¶
func NewScanner(in io.ReadSeeker, opts ScannerOpts) Scanner
NewScanner creates a new recordio scanner. The scanner can read both legacy recordio files (packed or unpacked) and new-format files. Any error is reported through the Scanner.Err method.
func NewShardScanner ¶
func NewShardScanner(in io.ReadSeeker, opts ScannerOpts, start, limit, nshard int) Scanner
NewShardScanner creates a new sharded recordio scanner. The returned scanner reads shard [start,limit) (of [0,nshard)) of the recordio file at the ReadSeeker in. Sharding is only supported for v2 recordio files; an error scanner is returned if NewShardScanner is called for a legacy recordio file.
NewShardScanner with start, limit, and nshard set to 0, 1, and 1 respectively (i.e., a single shard) behaves as NewScanner.
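One way to divide a file among several workers is to split [0,nshard) into contiguous [start,limit) ranges and give each worker one range. The sketch below only computes those ranges; `shardRanges` is a hypothetical helper, and how the package maps a shard to file contents is not covered here.

```go
package main

import "fmt"

// shardRanges is a hypothetical helper that splits the shard space [0,nshard)
// into at most `workers` contiguous, non-empty [start,limit) ranges.
func shardRanges(nshard, workers int) [][2]int {
	var ranges [][2]int
	for w := 0; w < workers; w++ {
		start := w * nshard / workers
		limit := (w + 1) * nshard / workers
		if start < limit {
			ranges = append(ranges, [2]int{start, limit})
		}
	}
	return ranges
}

func main() {
	// Each pair would be passed as the start and limit arguments, with
	// nshard=10, by the worker that owns it.
	fmt.Println(shardRanges(10, 3)) // prints [[0 3] [3 6] [6 10]]
}
```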
type ScannerOpts ¶
type ScannerOpts struct { // LegacyTransform is used only to read the legacy recordio files. For the V2 // recordio files, this field is ignored, and transformers are constructed // from the header metadata. LegacyTransform TransformFunc // Unmarshal transforms a byte slice into an application object. It is called // for every item read from storage. If nil, a function that returns []byte // unchanged is used. The return value from Unmarshal can be retrieved using // the Scanner.Get method. Unmarshal func(in []byte) (out interface{}, err error) }
ScannerOpts defines options used when creating a new scanner.
type TransformFunc ¶
TransformFunc is called to (un)compress or (un)encrypt data. Parameter scratch is passed as a performance hint. If the result of the transformation fits in scratch, the function should store the result in scratch and return it as the first return value. Otherwise, it should allocate a new []byte and return it.
type TransformerFactory ¶
type TransformerFactory func(config string) (TransformFunc, error)
TransformerFactory is a function that creates a new TransformFunc given an optional config string.
type Writer ¶
type Writer interface {
	// AddHeader adds arbitrary metadata to the file. This method must be called
	// before any other Append* or Set* functions. If the key has already been
	// added to the header, this method will overwrite it with the value.
	//
	// REQUIRES: Append, SetTrailer, Finish have not been called.
	AddHeader(key string, value interface{})

	// Append writes one item. The marshaler will eventually be called to
	// serialize the item. The type of v must match the input type for
	// the Marshal function passed when the writer is created. Note that
	// since marshalling is performed asynchronously, the object passed
	// to Append should be considered owned by the writer, and must not
	// be reused by the caller.
	//
	// The writer flushes items to the storage in the order of addition.
	//
	// REQUIRES: Finish and SetTrailer have not been called.
	Append(v interface{})

	// Flush schedules a flush of the current block. The next item will be
	// written in a new block. This method just schedules the flush, and returns
	// before the block is actually written to storage. Call Wait to wait for
	// Flush to finish.
	Flush()

	// Wait blocks the caller until all the prior Flush calls finish.
	Wait()

	// SetTrailer adds arbitrary data at the end of the file. After this
	// function, no {Add*,Append*,Set*} functions may be called.
	//
	// REQUIRES: AddHeader(KeyTrailer, true) has been called.
	SetTrailer([]byte)

	// Err returns any error encountered by the writer. Once Err() becomes
	// non-nil, it stays so.
	Err() error

	// Finish must be called at the end of writing. Finish will internally call
	// Flush, then return the value of Err. No method, other than Err, shall be
	// called afterwards.
	Finish() error
}
Writer defines an interface for recordio writer. An implementation must be thread safe.
Legal path expression is defined below. Err can be called at any time, so it is not included in the expression. ? means 0 or 1 call, * means 0 or more calls.
AddHeader* (Append|Flush)* SetTrailer? Finish
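The path expression can be checked mechanically by treating a call sequence as a string of method names. The sketch below encodes the expression exactly as written above as a regular expression; `writerPath` and `legalWriterCalls` are hypothetical names, and only the methods that appear in the expression are modeled (Err is excluded by the text above, and Wait does not appear in the expression).

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// writerPath encodes "AddHeader* (Append|Flush)* SetTrailer? Finish" over
// space-separated method names.
var writerPath = regexp.MustCompile(`^(AddHeader )*((Append|Flush) )*(SetTrailer )?Finish$`)

// legalWriterCalls is a hypothetical checker that reports whether a sequence
// of method names matches the path expression.
func legalWriterCalls(calls []string) bool {
	return writerPath.MatchString(strings.Join(calls, " "))
}

func main() {
	fmt.Println(legalWriterCalls([]string{"AddHeader", "Append", "Flush", "SetTrailer", "Finish"})) // prints true
	fmt.Println(legalWriterCalls([]string{"Append", "AddHeader", "Finish"}))                        // prints false
}
```

The second sequence is rejected because AddHeader follows Append, which the REQUIRES clause on AddHeader also forbids.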
type WriterOpts ¶
type WriterOpts struct {
	// Marshal is called for every item added by Append. It serializes the
	// record. If Marshal is nil, it defaults to a function that casts the value
	// to []byte and returns it. Marshal may be called concurrently.
	Marshal MarshalFunc

	// Index is called for every item added, just before it is written to
	// storage. Index callback may be called concurrently and out of order of
	// locations.
	//
	// After Index is called, the Writer guarantees that it never touches
	// the value again. The application may recycle the value in a freepool, if it
	// desires. Index may be nil.
	Index IndexFunc

	// Transformers specifies a list of functions to compress, encrypt, or modify
	// data in any other way, just before a block is written to storage.
	//
	// Each entry in Transformers must be of form "name" or "name config.." The
	// "name" is matched against the registry (see RegisterTransformer). The
	// "config" part is passed to the transformer factory function. If "name" is
	// not registered, the writer will fail immediately.
	//
	// If Transformers contains multiple strings, Transformers[0] is invoked
	// first, then its results are passed to Transformers[1], and so on.
	//
	// If len(Transformers)==0, then an identity transformer is used. It will
	// return the block as is.
	//
	// Recordio package includes the following standard transformers:
	//
	//  "zstd N" (N is -1 or an integer from 0 to 22): zstd compression level N.
	//  If " N" part is omitted or N=-1, the default compression level is used.
	//  To use zstd, import the 'recordiozstd' package and call
	//  'recordiozstd.Init()' in an init() function.
	//
	//  "flate N" (N is -1 or an integer from 0 to 9): flate compression level N.
	//  If " N" part is omitted or N=-1, the default compression level is used.
	//  To use flate, import the 'recordioflate' package and call
	//  'recordioflate.Init()' in an init() function.
	Transformers []string

	// MaxItems is the maximum number of items to pack into a single record.
	// It defaults to DefaultPackedItems if set to 0.
	// If MaxItems exceeds MaxPackedItems, it is silently set to MaxPackedItems.
	MaxItems uint32

	// MaxFlushParallelism limits the maximum number of block flush operations in
	// flight before blocking the application. It defaults to
	// DefaultFlushParallelism.
	MaxFlushParallelism uint32
}
WriterOpts defines options used when creating a new writer.
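Each Transformers entry has the form "name" or "name config..", so resolving an entry starts by separating the registry name from the config string. The sketch below shows that split using only the standard library; `splitTransformer` is a hypothetical helper and is not claimed to match how the package parses entries.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTransformer is a hypothetical helper that splits a Transformers entry
// of the form "name" or "name config.." into its name and config parts.
func splitTransformer(entry string) (name, config string) {
	parts := strings.SplitN(strings.TrimSpace(entry), " ", 2)
	name = parts[0]
	if len(parts) == 2 {
		config = strings.TrimSpace(parts[1])
	}
	return name, config
}

func main() {
	// Entries are applied in slice order: Transformers[0] first.
	for _, entry := range []string{"flate", "zstd 6", "flate -1"} {
		name, config := splitTransformer(entry)
		fmt.Printf("name=%q config=%q\n", name, config)
	}
}
```

An empty config corresponds to the case where the " N" part is omitted and the transformer's default compression level applies.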
Directories ¶
Path | Synopsis |
---|---|
recordioflate | Package recordioflate provides the "flate" transformer. |
recordioiov | Package recordioiov provides utility functions for dealing with [][]bytes, used by recordio transformers. |
recordiozstd | Package recordiozstd implements zstd compression and decompression. |