Documentation ¶
Overview ¶
Package blockfmt implements routines for reading and writing compressed and aligned ion blocks to/from backing storage.
The APIs in this package are designed with object storage in mind as the primary backing store.
The CompressionWriter type can be used to write aligned ion blocks (see ion.Chunker) to backing storage, and the CompressionReader type can provide positional access to compressed blocks within the backing storage.
Index ¶
- Constants
- Variables
- func ETag(ofs UploadFS, up Uploader, fullpath string) (string, error)
- func IsFatal(err error) bool
- func Sign(key *Key, idx *Index) ([]byte, error)
- func Validate(src io.Reader, t *Trailer, diag io.Writer) int
- func WriteDescriptor(buf *ion.Buffer, st *ion.Symtab, desc *Descriptor)
- func WriteDescriptors(buf *ion.Buffer, st *ion.Symtab, contents []Descriptor)
- type Blockdesc
- type BufferUploader
- type Collector
- type CompressionWriter
- func (w *CompressionWriter) Close() error
- func (w *CompressionWriter) Flush() error
- func (f *CompressionWriter) SetMinMax(path []string, min, max ion.Datum)
- func (w *CompressionWriter) SkipChecks()
- func (w *CompressionWriter) Write(p []byte) (n int, err error)
- func (w *CompressionWriter) WrittenBlocks() int
- type Compressor
- type Converter
- type Decoder
- type Descriptor
- type DirFS
- func (d *DirFS) Create(fullpath string) (Uploader, error)
- func (d *DirFS) ETag(fullpath string, info fs.FileInfo) (string, error)
- func (d *DirFS) Mmap(fullpath string) ([]byte, error)
- func (d *DirFS) Prefix() string
- func (d *DirFS) Remove(fullpath string) error
- func (d *DirFS) Unmap(buf []byte) error
- func (d *DirFS) WriteFile(fullpath string, buf []byte) (string, error)
- type FileTree
- type Filter
- func (f *Filter) Compile(e expr.Node)
- func (f *Filter) Intervals(si *SparseIndex) ints.Intervals
- func (f *Filter) MatchesAny(si *SparseIndex) bool
- func (f *Filter) Overlaps(si *SparseIndex, start, end int) bool
- func (f *Filter) Trivial() bool
- func (f *Filter) Visit(si *SparseIndex, interval func(start, end int))
- type Flag
- type Index
- func (idx *Index) Descs(src InputFS, keep *Filter) ([]Descriptor, []ints.Intervals, int64, error)
- func (idx *Index) HasPartition(x string) bool
- func (idx *Index) Objects() int
- func (idx *Index) SyncInputs(dir string, expiry time.Duration) error
- func (idx *Index) TimeRange(path []string) (min, max date.Time, ok bool)
- type IndexConfig
- type IndirectRef
- type IndirectTree
- type Input
- type InputFS
- type Key
- type MultiWriter
- type ObjectInfo
- type Quarantined
- type Range
- type RowFormat
- type S3FS
- type SparseIndex
- func (s *SparseIndex) Append(next *SparseIndex) bool
- func (s *SparseIndex) AppendBlocks(next *SparseIndex, i, j int) bool
- func (s *SparseIndex) Blocks() int
- func (s *SparseIndex) Clone() SparseIndex
- func (s *SparseIndex) Const(x string) (ion.Datum, bool)
- func (s *SparseIndex) Encode(dst *ion.Buffer, st *ion.Symtab)
- func (s *SparseIndex) FieldNames() []string
- func (s *SparseIndex) Fields() int
- func (s *SparseIndex) Get(path []string) *TimeIndex
- func (s *SparseIndex) MinMax(path []string) (min, max date.Time, ok bool)
- func (s *SparseIndex) Push(rng []Range)
- func (s *SparseIndex) Slice(i, j int) SparseIndex
- func (s *SparseIndex) Trim(j int) SparseIndex
- type Template
- type TimeIndex
- func (t *TimeIndex) Append(next *TimeIndex)
- func (t *TimeIndex) Blocks() int
- func (t *TimeIndex) Clone() TimeIndex
- func (t *TimeIndex) Contains(when date.Time) bool
- func (t *TimeIndex) EditLatest(min, max date.Time)
- func (t *TimeIndex) Encode(dst *ion.Buffer, st *ion.Symtab)
- func (t *TimeIndex) End(when date.Time) int
- func (t *TimeIndex) EndIntervals() int
- func (t *TimeIndex) Max() (date.Time, bool)
- func (t *TimeIndex) Min() (date.Time, bool)
- func (t *TimeIndex) Push(start, end date.Time)
- func (t *TimeIndex) PushEmpty(num int)
- func (t *TimeIndex) Reset()
- func (t *TimeIndex) Start(when date.Time) int
- func (t *TimeIndex) StartIntervals() int
- func (t *TimeIndex) String() string
- type TimeRange
- type Trailer
- func (t *Trailer) BlockRange(i int) (start, end int64)
- func (t *Trailer) BlockSize(i int) int64
- func (t *Trailer) Decode(st *ion.Symtab, body []byte) error
- func (t *Trailer) Decompressed() int64
- func (t *Trailer) DecompressedSize(i int) int64
- func (t *Trailer) Encode(dst *ion.Buffer, st *ion.Symtab)
- func (t *Trailer) ReadFrom(src io.ReaderAt, size int64) error
- type TrailerDecoder
- type UploadFS
- type Uploader
- type ZionWriter
Constants ¶
const (
    // DefaultMaxReadsInFlight is the default maximum
    // number of outstanding read operations in Converter.Run
    DefaultMaxReadsInFlight = 400
    // DefaultMaxBytesInFlight is the default maximum
    // number of bytes in flight in Converter.Run
    DefaultMaxBytesInFlight = 80 * 1024 * 1024
)
const (
    // KeyLength is the length of
    // the key that needs to be provided
    // to Sign and DecodeIndex.
    // (The contents of the key should
    // be from a cryptographically secure
    // source of random bytes.)
    KeyLength = 32
    // SignatureLength is the length
    // of the signature appended
    // to the index objects.
    SignatureLength = KeyLength + 2
)
const IndexVersion = 1
IndexVersion is the current version number of the index format.
const Version = "blockfmt/compressed/v2"
Version is the textual version of the objects produced by this package. If we start producing backwards-incompatible objects, this version number ought to be bumped.
Variables ¶
var (
    // ErrBadMAC is returned when a signature
    // for an object does not match the
    // computed MAC.
    ErrBadMAC = errors.New("bad index signature")
)
var ErrETagChanged = errors.New("FileTree: ETag changed")
ErrETagChanged is returned by FileTree.Append when attempting to perform an insert with a file that has had its ETag change.
var ErrIndexObsolete = errors.New("index version obsolete")
ErrIndexObsolete is returned when OpenIndex detects an index file with a version number lower than IndexVersion.
SuffixToFormat is a list of known filename suffixes that correspond to known constructors for RowFormat objects.
Functions ¶
func ETag ¶
ETag gets the ETag for the provided Uploader. If the Uploader has an ETag() method, that method is used directly; otherwise ofs.ETag is used to determine the ETag.
func IsFatal ¶
IsFatal returns true if the error is an error known to be fatal when returned from blockfmt.Format.Convert. (A fatal error is one that will not disappear on a retry.)
func Sign ¶
Sign encodes an index in a binary format and signs it with the provided HMAC key.
See DecodeIndex for authenticating and decoding a signed index blob.
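A minimal sketch of the sign/decode round trip follows. It assumes Key is a KeyLength-byte array type, that DecodeIndex has the signature DecodeIndex(key *Key, contents []byte, opts Flag) (*Index, error), and that the package lives at the import path shown; none of those details are spelled out in this index, so verify them against the package source.

package blockfmt_test

import (
    "crypto/rand"
    "log"

    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func signAndDecode(idx *blockfmt.Index) *blockfmt.Index {
    // the key must come from a cryptographically secure source
    var key blockfmt.Key // assumed to be a [KeyLength]byte array
    if _, err := rand.Read(key[:]); err != nil {
        log.Fatal(err)
    }
    signed, err := blockfmt.Sign(&key, idx)
    if err != nil {
        log.Fatal(err)
    }
    // later: authenticate and decode; a tampered blob yields ErrBadMAC
    out, err := blockfmt.DecodeIndex(&key, signed, 0) // opts value is illustrative
    if err != nil {
        log.Fatal(err)
    }
    return out
}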
func Validate ¶
Validate validates the blockfmt-formatted object in src. Any errors encountered are written as distinct lines to diag. The caller can assume that if no content is written to diag, then src contained no errors.
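A short sketch of validating an object on disk follows. It assumes ReadTrailer returns (*Trailer, error) and that the import path is as shown; the meaning of Validate's integer return value is not documented here, so it is treated as informational only.

package blockfmt_test

import (
    "bytes"
    "fmt"
    "os"

    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func validateFile(path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()
    info, err := f.Stat()
    if err != nil {
        return err
    }
    // read the trailer from the end of the object (assumed signature)
    t, err := blockfmt.ReadTrailer(f, info.Size())
    if err != nil {
        return err
    }
    var diag bytes.Buffer
    n := blockfmt.Validate(f, t, &diag)
    if diag.Len() > 0 {
        return fmt.Errorf("object failed validation:\n%s", diag.String())
    }
    _ = n // returned count is informational; see the package source for its exact meaning
    return nil
}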
func WriteDescriptor ¶
func WriteDescriptor(buf *ion.Buffer, st *ion.Symtab, desc *Descriptor)
WriteDescriptor writes a single descriptor to buf given the provided symbol table
func WriteDescriptors ¶
func WriteDescriptors(buf *ion.Buffer, st *ion.Symtab, contents []Descriptor)
Types ¶
type Blockdesc ¶
type Blockdesc struct {
    // Offset is the offset of the *compressed*
    // output data.
    Offset int64
    // Chunks is the number of chunks
    // (with decompressed length equal to
    // 1 << Trailer.BlockShift) within
    // this block
    Chunks int
}
Blockdesc is a descriptor that is attached to each block within a Trailer.
type BufferUploader ¶
type BufferUploader struct {
    PartSize int
    // contains filtered or unexported fields
}
BufferUploader is a simple in-memory implementation of Uploader.
func (*BufferUploader) Bytes ¶
func (b *BufferUploader) Bytes() []byte
Bytes returns the final upload result after Close() has been called.
func (*BufferUploader) Close ¶
func (b *BufferUploader) Close(final []byte) error
func (*BufferUploader) MinPartSize ¶
func (b *BufferUploader) MinPartSize() int
MinPartSize implements Uploader.MinPartSize
func (*BufferUploader) Parts ¶
func (b *BufferUploader) Parts() int
func (*BufferUploader) Size ¶
func (b *BufferUploader) Size() int64
type Collector ¶
type Collector struct {
    // Pattern is the glob pattern
    // that input objects should match.
    Pattern string
    // Start is a filename below which
    // all inputs are ignored. (Start can
    // be used to begin a Collect operation
    // where a previous one has left off
    // by using the last returned path as
    // the Start value for the next collection operation.)
    Start string
    // MaxItems, if non-zero, is the maximum number
    // of items to collect.
    MaxItems int
    // MaxSize, if non-zero, is the maximum size
    // of items to collect.
    MaxSize int64
    // Fallback is the function used to
    // determine the format of an input file.
    Fallback func(string) RowFormat
}
Collector is a set of configuration parameters for collecting a list of objects.
type CompressionWriter ¶
type CompressionWriter struct {
    // Output is the destination to which
    // the compressed data should be uploaded.
    Output Uploader
    // Comp is the compression algorithm to use.
    Comp Compressor
    // InputAlign is the expected input alignment
    // of data blocks. CompressionWriter will disallow
    // calls to Write that do not have length
    // equal to InputAlign.
    InputAlign int
    // TargetSize is the target size of flushes
    // to Output. If TargetSize is zero, then
    // the output will be flushed around
    // Output.MinPartSize
    TargetSize int
    // Trailer is the trailer being
    // built by the compression writer
    Trailer
    // MinChunksPerBlock sets the minimum
    // number of chunks per output block.
    // Below this threshold, chunks are merged
    // into adjacent blocks.
    // See also MultiWriter.MinChunksPerBlock
    MinChunksPerBlock int
    // contains filtered or unexported fields
}
CompressionWriter is a single-stream io.Writer that accepts blocks from an ion.Chunker and concatenates and compresses them into an output format that allows for seeking through the decompressed blocks without actually performing any decompression in advance.
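The sketch below shows a typical setup: an Uploader as the destination, a compression algorithm from CompressorByName, and an ion.Chunker feeding aligned blocks into the writer. The ion.Chunker field names (W, Align) and its Flush method are assumptions based on how the Chunker is described elsewhere; check the ion package for the exact API.

package blockfmt_test

import (
    "github.com/SnellerInc/sneller/ion"
    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func writeCompressed() (*blockfmt.BufferUploader, error) {
    up := &blockfmt.BufferUploader{}
    cw := &blockfmt.CompressionWriter{
        Output:     up,
        Comp:       blockfmt.CompressorByName("zstd"),
        InputAlign: 1 << 20, // each Write must be exactly this many bytes
    }
    cn := &ion.Chunker{
        W:     cw,            // the chunker flushes aligned ion blocks into cw (assumed field names)
        Align: cw.InputAlign, // chunk size matches the writer's alignment
    }
    // ... write rows into cn here ...
    if err := cn.Flush(); err != nil { // assumed Flush() error
        return nil, err
    }
    if err := cw.Close(); err != nil {
        return nil, err
    }
    // up.Bytes() now holds the compressed, seekable object
    return up, nil
}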
func (*CompressionWriter) Close ¶
func (w *CompressionWriter) Close() error
Close closes the compression writer and finalizes the output upload.
func (*CompressionWriter) Flush ¶
func (w *CompressionWriter) Flush() error
func (*CompressionWriter) SetMinMax ¶
SetMinMax sets the min and max values for the next ion chunk. This method should only be called once for each path.
func (*CompressionWriter) SkipChecks ¶
func (w *CompressionWriter) SkipChecks()
SkipChecks disables some runtime checks of the input data, which is ordinarily expected to be ion data. Do not use this except for testing.
func (*CompressionWriter) Write ¶
func (w *CompressionWriter) Write(p []byte) (n int, err error)
Write implements io.Writer. Each call to Write must be of w.InputAlign bytes.
func (*CompressionWriter) WrittenBlocks ¶
func (w *CompressionWriter) WrittenBlocks() int
WrittenBlocks returns the number of blocks written to the CompressionWriter (i.e. the number of calls to w.Write)
type Compressor ¶
func CompressorByName ¶
func CompressorByName(algo string) Compressor
CompressorByName produces the Compressor associated with the provided algorithm name, or nil if no such algorithm is known to the library.
Valid values include:
"zstd" "zion" "zion+zstd" (equivalent to "zion") "zion+iguana_v0"
type Converter ¶
type Converter struct {
    // Prepend, if R is not nil,
    // is a blockfmt-formatted stream
    // of data to prepend to the output stream.
    Prepend struct {
        // R should read data from Trailer
        // starting at offset Trailer.Blocks[0].Offset.
        // Converter will read bytes up to Trailer.Offset.
        R       io.ReadCloser
        Trailer *Trailer
    }
    // Constants is the list of templated constants
    // to be inserted into the ingested data.
    Constants []ion.Field
    // Inputs is the list of input
    // streams that need to be converted
    // into the output format.
    Inputs []Input
    // Output is the Uploader to which
    // data will be written. The Uploader
    // will be wrapped in a CompressionWriter
    // or MultiWriter depending on the number
    // of input streams and the parallelism setting.
    Output Uploader
    // Comp is the name of the compression
    // algorithm used for uploaded data blocks.
    Comp string
    // Align is the pre-compression alignment
    // of chunks written to the uploader.
    Align int
    // FlushMeta is the maximum interval
    // at which metadata is flushed.
    // Note that metadata may be flushed
    // below this interval if there is not
    // enough input data to make the intervals this wide.
    FlushMeta int
    // TargetSize is the target size of
    // chunks written to the Uploader.
    TargetSize int
    // Parallel is the maximum parallelism of
    // uploads. If Parallel is <= 0, then
    // GOMAXPROCS is used instead.
    Parallel int
    // MinInputBytesPerCPU is used to determine
    // the level of parallelism used for converting data.
    // If this setting is non-zero, then the converter
    // will try to ensure that there are at least this
    // many bytes of input data for each independent
    // parallel stream used for conversion.
    //
    // Picking a larger setting for MinInputBytesPerCPU
    // will generally increase the efficiency of the
    // conversion (in bytes converted per CPU-second)
    // and also the compactness of the output data.
    MinInputBytesPerCPU int64
    // MaxReadsInFlight is the maximum number of
    // prefetched reads in flight. If this is less
    // than or equal to zero, then DefaultMaxReadsInFlight is used.
    MaxReadsInFlight int
    // DisablePrefetch, if true, disables
    // prefetching of inputs.
    DisablePrefetch bool
    // contains filtered or unexported fields
}
Converter performs single- or multi-stream conversion of a list of inputs in parallel.
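The following sketch converts a single input into an in-memory uploader. UnsafeION is used as the RowFormat only because its constructor is documented below; real inputs would normally pick a format via SuffixToFormat or MustSuffixToFormat. Field values such as Align and TargetSize are illustrative, and the import path is assumed.

package blockfmt_test

import (
    "os"

    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func convertOne(path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    info, err := f.Stat()
    if err != nil {
        return err
    }
    c := &blockfmt.Converter{
        Inputs: []blockfmt.Input{{
            Path: path,
            Size: info.Size(),
            R:    f, // Converter consumes the ReadCloser during Run
            F:    blockfmt.UnsafeION(),
        }},
        Output:     &blockfmt.BufferUploader{},
        Comp:       "zstd",
        Align:      1 << 20,
        TargetSize: 8 << 20,
    }
    if err := c.Run(); err != nil {
        // per Run's documentation, c.Inputs[i].Err may carry the
        // per-input error that caused the failure
        return err
    }
    return nil
}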
func (*Converter) MultiStream ¶
MultiStream returns whether the configuration of Converter would lead to a multi-stream upload.
func (*Converter) Run ¶
Run runs the conversion operation and returns the first error it encounters. Additionally, it will populate c.Inputs[*].Err with any errors associated with the inputs. Note that Run stops at the first encountered error, so if one of the Inputs has Err set, then subsequent items in Inputs may not have been processed at all.
type Decoder ¶
type Decoder struct {
    // BlockShift is the log2 of the block size.
    // BlockShift is set automatically by Decoder.Set.
    BlockShift int
    // Algo is the algorithm to use for decompressing
    // the input data blocks.
    // Algo is set automatically by Decoder.Set.
    Algo string
    // Fields is the dereference push-down hint
    // for the fields that should be decompressed
    // from the input. Note that the zero value (nil)
    // means all fields, but a zero-length slice explicitly
    // means zero fields (i.e. decode empty structures).
    Fields []string
    // Malloc should return a slice with the given size.
    // If Malloc is nil, then make([]byte, size) is used.
    // If Malloc is non-nil, then Free should be set.
    Malloc func(size int) []byte
    // If Malloc is set, then Free should be
    // set to a corresponding function to release
    // data allocated via Malloc.
    Free func([]byte)
    // contains filtered or unexported fields
}
Decoder is used to decode blocks from a Trailer and an associated data stream.
func (*Decoder) Copy ¶
Copy incrementally decompresses data from src and writes it to dst. It returns the number of bytes written to dst and the first error encountered, if any.
Copy always calls dst.Write with memory allocated via d.Malloc, so dst may be an io.Writer returned via a vm.QuerySink provided that d.Malloc is set to vm.Malloc.
func (*Decoder) CopyBytes ¶
CopyBytes incrementally decompresses data from src and writes it to dst. It returns the number of bytes written to dst and the first error encountered, if any. If dst implements ZionWriter and d.Algo is "zion" then compressed data may be passed directly to dst (see ZionWriter for more details).
func (*Decoder) Decompress ¶
Decompress decodes d.Trailer and puts its contents into dst. len(dst) must be equal to d.Trailer.Decompressed(). src must read the data that is referenced by the d.Trailer.
type Descriptor ¶
type Descriptor struct {
    // ObjectInfo describes *this*
    // object's full path, ETag, and format.
    ObjectInfo
    // Trailer is the trailer that is part
    // of the object.
    Trailer Trailer
}
Descriptor describes a single object within an Index.
func ReadDescriptor ¶
func ReadDescriptor(d ion.Datum) (*Descriptor, error)
ReadDescriptor reads a descriptor from an ion datum.
func ReadDescriptors ¶
func ReadDescriptors(d ion.Datum) ([]Descriptor, error)
ReadDescriptors reads a list of descriptors from an ion datum.
func (*Descriptor) Decode ¶
func (d *Descriptor) Decode(td *TrailerDecoder, v ion.Datum, opts Flag) error
type DirFS ¶
DirFS is an InputFS and UploadFS that is rooted in a particular directory.
func (*DirFS) Mmap ¶
Mmap maps the file given by [fullpath]. The caller must ensure that the returned slice, if non-nil, is unmapped with [Unmap]. NOTE: Mmap is not supported on all platforms. The caller should be prepared to handle the error and fall back to ordinary [Open] and [Read] calls.
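A small sketch of the fallback pattern described above: prefer Mmap, and fall back to an ordinary read (DirFS implements fs.FS via InputFS) when mapping fails. The import path is assumed.

package blockfmt_test

import (
    "io/fs"

    "github.com/SnellerInc/sneller/ion/blockfmt"
)

// readWhole returns the file contents plus a cleanup function that
// unmaps the buffer when it came from Mmap.
func readWhole(d *blockfmt.DirFS, fullpath string) ([]byte, func(), error) {
    if buf, err := d.Mmap(fullpath); err == nil {
        return buf, func() { d.Unmap(buf) }, nil
    }
    // mapping unsupported or failed: fall back to a regular read
    buf, err := fs.ReadFile(d, fullpath)
    if err != nil {
        return nil, nil, err
    }
    return buf, func() {}, nil
}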
type FileTree ¶
type FileTree struct {
    // Backing is the backing store for the tree.
    // Backing must be set to perform tree operations.
    Backing UploadFS
    // contains filtered or unexported fields
}
FileTree is a B+-tree that holds a list of (path, etag, id) triples in sorted order by path.
func (*FileTree) Append ¶
Append assigns an ID to a path and etag. Append returns (true, nil) if the (path, etag) tuple is inserted, or (false, nil) if the (path, etag) tuple has already been inserted. A tuple may be inserted under the following conditions:
- The path has never been appended before.
- The (path, etag) pair has been inserted with an id >= 0, and it is being marked as failed by being re-inserted with an id < 0.
- The same path but a different etag has been marked as failed (with id < 0), and Append is overwriting the previous entry with a new etag and id >= 0.
Otherwise, if there exists a (path, etag) tuple with a matching path but non-matching etag, then (false, ErrETagChanged) is returned.
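A minimal sketch of this insert protocol follows. The signature Append(path, etag string, id int) (bool, error) is an assumption (it is not shown in this index), so treat the example as illustrative only.

package blockfmt_test

import (
    "errors"

    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func recordInput(tree *blockfmt.FileTree, path, etag string, id int) error {
    ok, err := tree.Append(path, etag, id)
    if errors.Is(err, blockfmt.ErrETagChanged) {
        // the object at path changed since it was first recorded
        return err
    }
    if err == nil && !ok {
        // the (path, etag) tuple was already inserted; nothing to do
    }
    return err
}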
func (*FileTree) EachFile ¶
EachFile iterates the backing of f and calls fn for each file that is pointed to by the FileTree (i.e. files that contain either inner nodes or leaf nodes that still constitute part of the tree state).
func (*FileTree) Prefetch ¶
Prefetch takes a list of inputs and prefetches inner nodes or leaves that are likely to be associated with an insert operation on f. Currently, Prefetch only fetches "down" one level of the tree structure.
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter represents a compiled condition that can be evaluated against a SparseIndex to produce intervals that match the compiled condition.
func (*Filter) Compile ¶
Compile sets the expression that the filter should evaluate. A call to Compile erases any previously-compiled expression.
func (*Filter) Intervals ¶
func (f *Filter) Intervals(si *SparseIndex) ints.Intervals
Intervals returns the intervals within [si] that match the filter.
func (*Filter) MatchesAny ¶
func (f *Filter) MatchesAny(si *SparseIndex) bool
MatchesAny returns true if f matches any non-empty intervals in si, or false otherwise.
func (*Filter) Overlaps ¶
func (f *Filter) Overlaps(si *SparseIndex, start, end int) bool
Overlaps returns whether or not the sparse index matches the predicate within the half-open interval [start, end).
The behavior of Overlaps when start >= end is unspecified.
func (*Filter) Trivial ¶
Trivial returns true if the compiled filter condition will never select non-trivial subranges of the input slice in Visit. (In other words, a trivial filter will always visit all the blocks in a sparse index.)
func (*Filter) Visit ¶
func (f *Filter) Visit(si *SparseIndex, interval func(start, end int))
Visit visits distinct (non-overlapping) intervals within si that correspond to the compiled filter.
When no range within si matches the filter, interval will be called once with (0, 0).
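The sketch below ties Compile, Trivial, Visit, and Intervals together: the caller supplies an expr.Node built elsewhere and gets back the block intervals of si that may satisfy it. The import paths for the expr and ints packages are assumptions based on the identifiers used in this documentation.

package blockfmt_test

import (
    "github.com/SnellerInc/sneller/expr"
    "github.com/SnellerInc/sneller/ints"
    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func matchingBlocks(si *blockfmt.SparseIndex, e expr.Node) ints.Intervals {
    f := &blockfmt.Filter{}
    f.Compile(e)
    if f.Trivial() {
        // a trivial filter will visit every block in si
    }
    f.Visit(si, func(start, end int) {
        // blocks in [start, end) may contain matching rows;
        // (0, 0) is reported once when nothing matches
    })
    return f.Intervals(si)
}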
type Index ¶
type Index struct {
    // Name is the name of the index.
    Name string
    // Created is the time the index
    // was populated.
    Created date.Time
    // UserData is an arbitrary datum that can be
    // stored with the index and used externally.
    UserData ion.Datum
    // Algo is the compression algorithm used to
    // compress the index contents.
    Algo string
    // Inline is a list of object descriptors
    // that are inlined into the index object.
    //
    // Typically, Inline contains the objects
    // that have been ingested most recently
    // (or are otherwise known to be more likely
    // to be referenced).
    Inline []Descriptor
    // Indirect is the tree that contains
    // all the object descriptors that aren't
    // part of Inline.
    Indirect IndirectTree
    // Inputs is the collection of
    // objects that comprise Inline and Indirect.
    Inputs FileTree
    // ToDelete is a list of items
    // that are no longer referenced
    // by the Index except to indicate
    // that they should be deleted after
    // they have been unreferenced for
    // some period of time.
    ToDelete []Quarantined
    // LastScan is the time at which
    // the last scan operation completed.
    // This may be the zero time if no
    // scan has ever been performed.
    LastScan date.Time
    // Cursors is the list of scanning cursors.
    // These may not be present if no scan
    // has ever been performed.
    Cursors []string
    // Scanning indicates that scanning has
    // not yet completed.
    Scanning bool
}
Index is a collection of formatted objects with a name.
Index objects are stored as MAC'd blobs in order to make it possible to detect tampering of the Contents of the index. (The modtime and ETag of the Contents are part of the signed payload, so we can refuse to operate on those objects if they fail to match the expected modtime and ETag.)
func DecodeIndex ¶
DecodeIndex decodes a signed index (see Sign) and returns the Index, or an error if the index was malformed or the signature doesn't match.
If FlagSkipInputs is passed in opts, this avoids decoding Index.Inputs.
NOTE: the returned Index may contain fields that alias the input slice.
func (*Index) Descs ¶
Descs collects the list of objects from an index and returns them as a list of descriptors against which queries can be run along with the number of (decompressed) bytes that comprise the returned objects.
If [keep] is non-nil, then blocks which are known to not contain any rows matching [keep] will be excluded from the returned slices.
Note that the returned slices may be empty if the index has no contents.
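A short sketch of fetching the queryable objects from an index, optionally pruned by a compiled Filter (pass nil to keep everything); the import path is assumed.

package blockfmt_test

import (
    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func openDescriptors(src blockfmt.InputFS, idx *blockfmt.Index, keep *blockfmt.Filter) error {
    descs, intervals, size, err := idx.Descs(src, keep)
    if err != nil {
        return err
    }
    _ = descs     // descriptors against which queries can be run
    _ = intervals // per-descriptor block intervals that may match keep
    _ = size      // total decompressed bytes referenced by descs
    return nil
}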
func (*Index) HasPartition ¶
HasPartition returns true if the index can partition descriptors on the top-level field x or false otherwise.
func (*Index) Objects ¶
Objects returns the number of packed objects that are pointed to by this Index.
func (*Index) SyncInputs ¶
SyncInputs syncs idx.Inputs to a directory within idx.Inputs.Backing, and queues old input files in idx.ToDelete with the provided expiry relative to the current time. Callers are required to call SyncInputs after updating idx.Inputs.
type IndexConfig ¶
type IndexConfig struct {
    // MaxInlined is the maximum number of bytes
    // to ingest in a single SyncOutputs operation
    // (not including merging). If MaxInlined is
    // less than or equal to zero, it is ignored
    // and no limit is applied.
    MaxInlined int64
    // TargetSize is the target size of packfiles
    // when compacting.
    TargetSize int64
    // TargetRefSize is the target size of stored
    // indirect references. If this is less than
    // or equal to zero, a default value is used.
    TargetRefSize int64
    // Expiry is the minimum time that a
    // quarantined file should be left around
    // after it has been dereferenced.
    Expiry time.Duration
}
An IndexConfig is a set of configurations for synchronizing an Index.
func (*IndexConfig) Compact ¶
func (c *IndexConfig) Compact(fs UploadFS, lst []Descriptor) ([]Descriptor, []Quarantined, error)
Compact compacts a list of descriptors and returns a new (hopefully shorter) list of descriptors containing the same data along with the list of quarantined descriptor paths that should be deleted.
func (*IndexConfig) SyncOutputs ¶
func (c *IndexConfig) SyncOutputs(idx *Index, ofs UploadFS, dir string) error
SyncOutputs synchronizes idx.Indirect to a directory with the provided UploadFS. SyncOutputs uses c.MaxInlined to determine which (if any) of the leading entries in idx.Inline should be moved into the indirect tree by trimming leading entries until the decompressed size of the data referenced by idx.Inline is less than or equal to c.MaxInlined.
type IndirectRef ¶
type IndirectRef struct {
    ObjectInfo
    // Objects is the number of
    // object references inside
    // the packed file pointed to by Path.
    Objects int
    // OrigObjects is the number of objects
    // that were compacted to produce the
    // packfiles pointed to by Path.
    OrigObjects int
    // contains filtered or unexported fields
}
IndirectRef references an object that contains a list of descriptors.
type IndirectTree ¶
type IndirectTree struct {
    // Refs is the list of objects containing
    // lists of descriptors, from oldest to newest.
    Refs []IndirectRef
    // Sparse describes the intervals within refs
    // that correspond to particular time ranges.
    Sparse SparseIndex
}
IndirectTree is an ordered list of IndirectRefs.
See IndirectTree.Append for adding descriptors.
func (*IndirectTree) OrigObjects ¶
func (i *IndirectTree) OrigObjects() int
OrigObjects returns the total number of objects that have been flushed to the indirect tree.
func (*IndirectTree) Purge ¶
func (i *IndirectTree) Purge(ifs InputFS, keep *Filter, expiry time.Duration) ([]Quarantined, error)
Purge purges entries from the tree that do not satisfy the filter condition specified in [keep]. Entries to be deleted are returned as quarantined path entries.
Purge may elect to purge less than the maximum amount of data possible to purge if it would be prohibitively expensive to produce the quarantine list.
func (*IndirectTree) Search ¶
func (i *IndirectTree) Search(ifs InputFS, filt *Filter) ([]Descriptor, error)
Search traverses the IndirectTree through the backing store (ifs) to produce the list of blobs that match the given predicate.
type Input ¶
type Input struct {
// Path and ETag are used to
// populate the ObjectInfo
// in an Index built from a Converter.
Path, ETag string
// Size is the size of the input, in bytes
Size int64
// R is the source of unformatted data
R io.ReadCloser
// F is the formatter that produces output blocks
F RowFormat
// Err is an error specific
// to this input that is populated
// by Converter.Run.
Err error
}
Input is a combination of an input stream and a row-formatting function. Together they produce output blocks.
func CollectGlob ¶
CollectGlob turns a glob pattern into a list of Inputs, using fallback as the constructor for the RowFormat of each input object when the object suffix does not match any of the known format suffixes. If any of the files that match the glob pattern do not have known file suffixes and fallback does not return a non-nil RowFormat for those files, then CollectGlob will return an error indicating that the format for the file could not be determined.
type InputFS ¶
type InputFS interface {
    fs.FS
    fsutil.ETagFS
    // Prefix should return a string
    // that is prepended to filesystem
    // paths to indicate the filesystem "origin."
    //
    // For example, an S3 bucket FS would have
    //   s3://bucket/
    // as its prefix.
    Prefix() string
}
InputFS describes the FS implementation that is required for reading inputs.
type MultiWriter ¶
type MultiWriter struct {
    // Output is the Uploader used to
    // upload parts to backing storage.
    Output Uploader
    // Algo is the compression algorithm
    // used to compress blocks.
    Algo string
    // InputAlign is the expected size
    // of input blocks that are provided
    // to io.Write in each stream.
    InputAlign int
    // TargetSize is the target size of
    // each output part written to backing
    // storage.
    TargetSize int
    // MinChunksPerBlock is the desired
    // number of chunks per block.
    // If it is set, then metadata blocks
    // are coalesced so that they are at
    // least this size (unless the total
    // number of chunks is less than
    // MinChunksPerBlock).
    MinChunksPerBlock int
    // Trailer is the trailer that
    // is appended to the output stream.
    // The fields in Trailer are only
    // valid once MultiWriter.Close has
    // been called.
    Trailer
    // contains filtered or unexported fields
}
MultiWriter is a multi-stream writer that turns multiple streams of input blocks into a single output stream of compressed blocks.
MultiWriter tries to keep blocks written by each stream close together in the output so that sparse indexes that are built on each of the streams end up pointing to contiguous regions of output.
func (*MultiWriter) Close ¶
func (m *MultiWriter) Close() error
Close closes the MultiWriter. Close is only safe to call once Close has been called on each outstanding output stream created by calling Open.
Once Close returns, the Trailer field will be populated with the trailer that was assembled from all the constituent spans of input data.
func (*MultiWriter) Open ¶
func (m *MultiWriter) Open() (io.WriteCloser, error)
Open opens a stream for writing output. Each call to io.Write on the provided io.WriteCloser must be m.InputAlign bytes. The stream must be closed before the MultiWriter can be closed. Blocks written to a single stream are coalesced into large, contiguous "spans" of blocks in order to reduce index fragmentation caused by writing multiple streams of output parts simultaneously.
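The sketch below illustrates the Open/Close protocol: one stream per concurrent producer, every Write exactly InputAlign bytes, every stream closed before the MultiWriter itself. Producing the aligned blocks is elided, and the import path is assumed.

package blockfmt_test

import (
    "sync"

    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func parallelStreams(m *blockfmt.MultiWriter, inputs [][][]byte) error {
    var wg sync.WaitGroup
    errs := make([]error, len(inputs))
    for i := range inputs {
        w, err := m.Open() // one stream per concurrent producer
        if err != nil {
            return err
        }
        wg.Add(1)
        go func(i int, blocks [][]byte) {
            defer wg.Done()
            for _, blk := range blocks { // each blk must be m.InputAlign bytes
                if _, err := w.Write(blk); err != nil {
                    errs[i] = err
                    break
                }
            }
            if err := w.Close(); err != nil && errs[i] == nil {
                errs[i] = err
            }
        }(i, inputs[i])
    }
    wg.Wait()
    for _, err := range errs {
        if err != nil {
            return err
        }
    }
    return m.Close() // m.Trailer is valid only after this returns
}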
func (*MultiWriter) SkipChecks ¶
func (m *MultiWriter) SkipChecks()
SkipChecks disables some runtime checks of the input data, which is ordinarily expected to be ion data. Do not use this except for testing.
type ObjectInfo ¶
type ObjectInfo struct {
// Path is the path to the
// object. See fs.ValidPath
// for a description of what constitutes
// a valid path.
//
// ETag is the ETag of the object.
// The ETag is opaque to the blockfmt
// implementation.
Path, ETag string
// LastModified is the mtime of
// the object. We use both the ETag
// and the mtime to determine whether
// an object has been modified since
// we last looked at it.
LastModified date.Time
// Format specifies the format
// of the object. For output
// objects, the format indicates
// the blockfmt version used to
// write the ion object. For input
// objects, the format describes the
// conversion algorithm suffix used
// to convert the object (see SuffixToFormat).
Format string
// Size, if non-zero, is the size of
// the object. (Output objects are never 0 bytes.)
Size int64
}
ObjectInfo is a collection of information about an object.
type Quarantined ¶
Quarantined is an item that is queued for GC but has not yet been deleted.
type Range ¶
Range describes the (closed) interval that the value of a particular path expression could occupy.
type RowFormat ¶
type RowFormat interface {
    // Convert should read data from r and write
    // rows into dst. For each row written to dst,
    // the provided list of constants should also be inserted.
    Convert(r io.Reader, dst *ion.Chunker, constants []ion.Field) error
    // Name is the name of the format
    // that will be included in an index description.
    Name() string
}
RowFormat is the interface through which input streams are converted into aligned output blocks.
func MustSuffixToFormat ¶
func UnsafeION ¶
func UnsafeION() RowFormat
UnsafeION converts raw ion by decoding and re-encoding it.
NOTE: UnsafeION is called UnsafeION because the ion package has not been hardened against arbitrary user input. FIXME: harden the ion package against malicious input and then rename this to something else.
type S3FS ¶
S3FS implements UploadFS and InputFS.
type SparseIndex ¶
type SparseIndex struct {
// contains filtered or unexported fields
}
func (*SparseIndex) Append ¶
func (s *SparseIndex) Append(next *SparseIndex) bool
Append tries to append next to s and returns true if the append operation was successful, or false otherwise. (Append will fail if the set of indices tracked in each SparseIndex is not the same.) The block positions in next are assumed to start at s.Blocks().
func (*SparseIndex) AppendBlocks ¶
func (s *SparseIndex) AppendBlocks(next *SparseIndex, i, j int) bool
AppendBlocks is like Append, but only appends blocks from next from block i up to block j.
This will panic if i > j or j > next.Blocks().
func (*SparseIndex) Blocks ¶
func (s *SparseIndex) Blocks() int
func (*SparseIndex) Clone ¶
func (s *SparseIndex) Clone() SparseIndex
Clone produces a deep copy of s.
func (*SparseIndex) Const ¶
func (s *SparseIndex) Const(x string) (ion.Datum, bool)
Const extracts the datum associated with the constant x from the sparse index, or returns (ion.Empty, false) if no such datum exists.
func (*SparseIndex) FieldNames ¶
func (s *SparseIndex) FieldNames() []string
FieldNames returns the list of field names using '.' as a separator between the path components. NOTE: FieldNames does not escape the '.' character inside field names themselves, so the textual result of each field name may be ambiguous.
func (*SparseIndex) Fields ¶
func (s *SparseIndex) Fields() int
Fields returns the number of individually indexed fields.
func (*SparseIndex) Get ¶
func (s *SparseIndex) Get(path []string) *TimeIndex
Get gets a TimeIndex associated with a path. The returned TimeIndex may be nil if no such index exists.
func (*SparseIndex) MinMax ¶
func (s *SparseIndex) MinMax(path []string) (min, max date.Time, ok bool)
func (*SparseIndex) Push ¶
func (s *SparseIndex) Push(rng []Range)
func (*SparseIndex) Slice ¶
func (s *SparseIndex) Slice(i, j int) SparseIndex
Slice produces a sparse index for just the blocks in the half-open interval [i:j]. Slice will panic if i is greater than j, i is less than zero, or j is greater than the number of blocks in the index.
func (*SparseIndex) Trim ¶
func (s *SparseIndex) Trim(j int) SparseIndex
Trim produces a copy of s that only includes information up to block j. Trim will panic if j is greater than s.Blocks().
Trim is equivalent to s.Slice(0, j)
type Template ¶
type Template struct {
    Field string // Field is the name of the field to be generated.
    // Eval should generate an ion datum
    // from the input object.
    Eval func(in *Input) (ion.Datum, error)
}
Template is a templated constant field.
type TimeIndex ¶
type TimeIndex struct {
// contains filtered or unexported fields
}
TimeIndex maintains a lossy mapping of time ranges to "blocks", where the time range -> block mapping is preserved precisely if the ranges are monotonic with respect to the block number. TimeIndex does not care about what constitutes a "block"; it merely maintains a linear mapping from timestamps to integers.
TimeIndex can answer leftmost- and rightmost-bound queries for timestamp values with respect to the range of values inserted via Push.
Because TimeIndex stores a monotonic list of time ranges and blocks, its serialized encoding is space-efficient, as the timestamps and block numbers can be delta-encoded.
See TimeIndex.Push, TimeIndex.Start, and TimeIndex.End
func (*TimeIndex) Append ¶
Append concatenates t and next so that the ranges indexed by next occur immediately after the ranges indexed by t.
func (*TimeIndex) Contains ¶
Contains returns true if the value 'when' could appear within this index, or false otherwise. Note that Contains is sensitive to holes in the index.
func (*TimeIndex) EditLatest ¶
EditLatest extends the range associated with the most recent call to Push. (EditLatest has no effect if min is not earlier than the previously recorded minimum and max is not later than the previously recorded maximum.)
func (*TimeIndex) End ¶
End produces the highest offset (exclusive) at which the time 'when' could occur in the input block list. In other words, for a return value N, blocks [0, N) could contain the value "when".
func (*TimeIndex) EndIntervals ¶
EndIntervals returns the number of distinct values that t.End could return.
func (*TimeIndex) Push ¶
Push pushes one new block to the index with the associated start and end times.
If the time range specified in Push overlaps with block ranges that are already part of the index, those ranges will be coalesced into the union of the two ranges. In other words, overlapping ranges (or non-monotonic inserts more generally) will cause the precision of the TimeIndex mapping to relax until it can guarantee that it can maintain a monotonic time-to-block mapping. In the most degenerate case, the TimeIndex will simply map the minimum seen time to block 0 and maximum seen time to block N.
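The sketch below shows the Push/Start/End relationship for a monotonic sequence of block ranges. It assumes date.FromTime converts a time.Time into a date.Time and that the import paths are as shown; both are assumptions based on the identifiers used in this documentation.

package blockfmt_test

import (
    "fmt"
    "time"

    "github.com/SnellerInc/sneller/date"
    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func timeIndexSketch() {
    day := func(d int) date.Time {
        return date.FromTime(time.Date(2023, 1, d, 0, 0, 0, 0, time.UTC)) // assumed helper
    }
    var ti blockfmt.TimeIndex
    ti.Push(day(1), day(2)) // block 0 covers Jan 1-2
    ti.Push(day(2), day(3)) // block 1 covers Jan 2-3
    ti.Push(day(3), day(4)) // block 2 covers Jan 3-4

    when := day(3)
    lo, hi := ti.Start(when), ti.End(when)
    // blocks in [lo, hi) could contain rows with timestamp equal to when
    fmt.Println(lo, hi)
}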
func (*TimeIndex) PushEmpty ¶
PushEmpty pushes num empty blocks to the index. The index should have more than zero entries already present (i.e. Push should have been called at least once).
func (*TimeIndex) Start ¶
Start produces the lowest offset (inclusive) at which the time 'when' could occur in the input block list.
func (*TimeIndex) StartIntervals ¶
StartIntervals returns the number of distinct values that t.Start could return.
type Trailer ¶
type Trailer struct {
    // Version is an indicator
    // of the encoded trailer format version
    Version int
    // Offset is the offset of the trailer
    // within the output stream.
    Offset int64
    // Algo is the name of the compression
    // algorithm used to compress blocks
    Algo string
    // BlockShift is the alignment of each block
    // when it is fully decompressed (in bits)
    //
    // For example, BlockShift of 20 means that
    // blocks are 1MB (1 << 20) bytes each.
    BlockShift int
    // Blocks is the list of descriptors
    // for each block.
    Blocks []Blockdesc
    // Sparse contains a lossy secondary index
    // of timestamp ranges and constant fields
    // within Blocks.
    Sparse SparseIndex
}
Trailer is a collection of block descriptions.
func ReadTrailer ¶
ReadTrailer reads a trailer from an io.ReaderAt that has a backing size of 'size'.
func (*Trailer) BlockRange ¶
BlockRange returns the start and end offsets of block [i] within the object.
func (*Trailer) Decompressed ¶
Decompressed returns the decompressed size of all of the data within the trailer blocks.
func (*Trailer) DecompressedSize ¶
DecompressedSize returns the decompressed size of block [i] within the object.
type TrailerDecoder ¶
type TrailerDecoder struct {
// contains filtered or unexported fields
}
A TrailerDecoder can be used to decode multiple trailers containing related information in a more memory-efficient way than decoding the trailers individually.
type UploadFS ¶
type UploadFS interface {
    InputFS
    // WriteFile should create the
    // file at path with the given contents.
    // If the file already exists, it should
    // be overwritten atomically.
    // WriteFile should return the ETag associated
    // with the written file along with the first encountered error.
    WriteFile(path string, buf []byte) (etag string, err error)
    // Create should create an Uploader
    // for the given path. The file should
    // not be visible at the provided path
    // until the Uploader has been closed
    // successfully.
    Create(path string) (Uploader, error)
}
UploadFS describes the FS implementation that is required for writing outputs.
type Uploader ¶
type Uploader interface {
    // MinPartSize is the minimum supported
    // part size for the Uploader.
    MinPartSize() int
    // Upload should upload contents
    // as the given part number.
    // Part numbers may be sparse, but
    // they will always be positive and non-zero.
    // Upload is not required to handle
    // len(contents) < MinPartSize().
    Upload(part int64, contents []byte) error
    // Close should append final to the
    // object contents and then finalize
    // the object. Close must handle
    // len(final) < MinPartSize().
    Close(final []byte) error
    // Size should return the final size
    // of the uploaded object. It is only
    // required to return a valid value
    // after Close has been called.
    Size() int64
}
Uploader describes what we expect an object store upload API to look like.
(Take a look at aws/s3.Uploader.)
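A small sketch of how a caller is expected to drive the interface: full-size parts go through Upload with positive part numbers, and the (possibly short) final part is passed to Close. The import path is assumed.

package blockfmt_test

import (
    "github.com/SnellerInc/sneller/ion/blockfmt"
)

func uploadParts(up blockfmt.Uploader, parts [][]byte, final []byte) error {
    min := up.MinPartSize()
    for i, p := range parts {
        if len(p) < min {
            // Upload is not required to accept parts below MinPartSize;
            // a real caller would buffer and coalesce such parts instead
            continue
        }
        if err := up.Upload(int64(i+1), p); err != nil { // part numbers start at 1
            return err
        }
    }
    return up.Close(final) // Close must accept a short final part
}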
type ZionWriter ¶
type ZionWriter interface {
    // ConfigureZion is called with the set of
    // top-level path components that the caller
    // expects the callee to handle. ConfigureZion
    // should return true if the callee can handle
    // extracting the provided field list directly
    // from encoded zion data, or false if it cannot.
    ConfigureZion(blocksize int64, fields []string) bool
}
ZionWriter is an optional interface implemented by an io.Writer passed to Decoder.CopyBytes or Decoder.Copy. An io.Writer that implements ZionWriter may receive raw zion-encoded data rather than decompressed ion data.