Documentation ¶
Index ¶
- Constants
- Variables
- func DumpBytes(description string, rs io.ReadSeeker, position int64, count int, ...)
- func WriteTestMultiseriesStream() (raw []byte, footers []*SeriesFooter1, sb *StreamBuilder)
- type FooterType
- type GobSingleObjectDecoderDatasource
- type GobSingleObjectEncoderDatasource
- type Index
- type Iterator
- type MilestoneType
- type ScopeType
- type SeriesDataDatasourceReader
- type SeriesDataDatasourceReaderWrapper
- type SeriesDataDatasourceWriter
- type SeriesDataDatasourceWriterWrapper
- type SeriesFooter
- type SeriesFooter1
- func (sf *SeriesFooter1) BytesLength() uint64
- func (sf *SeriesFooter1) CreatedTime() time.Time
- func (sf *SeriesFooter1) DataFnv1aChecksum() uint32
- func (sf *SeriesFooter1) HeadRecordTime() time.Time
- func (sf *SeriesFooter1) RecordCount() uint64
- func (sf *SeriesFooter1) SetBytesLength(bytesLength uint64)
- func (sf *SeriesFooter1) SourceSha1() []byte
- func (sf *SeriesFooter1) String() string
- func (sf *SeriesFooter1) TailRecordTime() time.Time
- func (sf *SeriesFooter1) TouchUpdatedTime()
- func (sf *SeriesFooter1) UpdatedTime() time.Time
- func (sf *SeriesFooter1) Uuid() string
- func (sf *SeriesFooter1) Version() SeriesFooterVersion
- type SeriesFooterVersion
- type StreamBuilder
- func (sb *StreamBuilder) AddSeries(seriesDataWriter interface{}, sf SeriesFooter) (err error)
- func (sb *StreamBuilder) AddSeriesNoWrite(footerDataPosition int64, totalSeriesSize int, sf SeriesFooter) (err error)
- func (sb *StreamBuilder) Finish() (totalSize int, err error)
- func (sb *StreamBuilder) NextOffset() int64
- func (sb *StreamBuilder) SetStructureLogging(flag bool)
- func (sb *StreamBuilder) StreamWriter() *StreamWriter
- func (sb *StreamBuilder) Structure() *StreamStructure
- type StreamFooter
- type StreamFooter1
- type StreamFooterVersion
- type StreamIndexedSequenceInfo
- type StreamIndexedSequenceInfo1
- func (sisi StreamIndexedSequenceInfo1) AbsolutePosition() int64
- func (sisi StreamIndexedSequenceInfo1) HeadRecordTime() time.Time
- func (sisi StreamIndexedSequenceInfo1) String() string
- func (sisi StreamIndexedSequenceInfo1) TailRecordTime() time.Time
- func (sisi StreamIndexedSequenceInfo1) Uuid() string
- type StreamReader
- func (sr *StreamReader) ReadSeriesInfoWithBoundaryPosition(position int64) (seriesFooter SeriesFooter, dataOffset int64, seriesSize int, err error)
- func (sr *StreamReader) ReadSeriesInfoWithIndexedInfo(sisi StreamIndexedSequenceInfo) (seriesFooter SeriesFooter, dataOffset int64, seriesSize int, err error)
- func (sr *StreamReader) ReadSeriesWithIndexedInfo(sisi StreamIndexedSequenceInfo, seriesDataReader interface{}) (seriesFooter SeriesFooter, seriesSize int, checksumOk bool, err error)
- func (sr *StreamReader) Reset() (err error)
- func (sr *StreamReader) SetStructureLogging(flag bool)
- func (sr *StreamReader) Structure() *StreamStructure
- type StreamStructure
- func (ss *StreamStructure) AllSeriesMilestones() (milestoneIndex map[string][]StreamStructureOffsetInfo)
- func (ss *StreamStructure) Dump()
- func (ss *StreamStructure) Milestones() []StreamStructureOffsetInfo
- func (ss *StreamStructure) MilestonesWithFilter(milestoneType string, scopeType int) []StreamStructureOffsetInfo
- func (ss *StreamStructure) Push(offset int64, milestoneType MilestoneType, scopeType ScopeType, ...)
- func (ss *StreamStructure) SeriesMilestones(uuid string) []StreamStructureOffsetInfo
- func (ss *StreamStructure) StreamMilestones() []StreamStructureOffsetInfo
- func (ss *StreamStructure) String() string
- type StreamStructureOffsetInfo
- type StreamWriter
- type Truncater
- type UpdateStats
- type Updater
Examples ¶
Constants ¶
const ( // SeriesDataCopyBufferSize is the size of the buffer to use for the copy of // the time-series data into the output stream. SeriesDataCopyBufferSize = 1024 * 1024 )
const ( // // version + type + length + boundary marker // ShadowFooterSize = 2 + 1 + 2 + 1 )
Variables ¶
var ( // TestTimeSeriesData is test data. TestTimeSeriesData = []byte("some time series data") // TestTimeSeriesData2 is test data. TestTimeSeriesData2 = []byte("X some time series data 2 X") )
Functions ¶
func WriteTestMultiseriesStream ¶
func WriteTestMultiseriesStream() (raw []byte, footers []*SeriesFooter1, sb *StreamBuilder)
WriteTestMultiseriesStream creates a stream with multiple test-series and validates that it looks okay before returning.
Types ¶
type FooterType ¶
type FooterType byte
FooterType is an enum that represents all footer types.
const ( FooterType = 1 FtStreamFooter FooterType = 2 )FtSeriesFooter
type GobSingleObjectDecoderDatasource ¶
type GobSingleObjectDecoderDatasource struct {
// contains filtered or unexported fields
}
GobSingleObjectDecoderDatasource wraps a `gob.Decoder` as a `SeriesDataDatasourceReader`.
func NewGobSingleObjectDecoderDatasource ¶
func NewGobSingleObjectDecoderDatasource(outputValue interface{}) *GobSingleObjectDecoderDatasource
NewGobSingleObjectDecoderDatasource returns a new `GobSingleObjectDecoderDatasource` struct.
func (*GobSingleObjectDecoderDatasource) ReadData ¶
func (gdd *GobSingleObjectDecoderDatasource) ReadData(r io.Reader, sf SeriesFooter) (n int, err error)
ReadData is called when series data needs to be read and decodes the raw series-data into the struct that we were initialized with.
type GobSingleObjectEncoderDatasource ¶
type GobSingleObjectEncoderDatasource struct {
// contains filtered or unexported fields
}
GobSingleObjectEncoderDatasource wraps a `gob.Encoder` as a `SeriesDataDatasourceWriter`.
func NewGobSingleObjectEncoderDatasource ¶
func NewGobSingleObjectEncoderDatasource(inputValue interface{}) *GobSingleObjectEncoderDatasource
NewGobSingleObjectEncoderDatasource returns a new GobSingleObjectEncoderDatasource struct.
func (GobSingleObjectEncoderDatasource) WriteData ¶
func (ged GobSingleObjectEncoderDatasource) WriteData(w io.Writer, sf SeriesFooter) (n int, err error)
WriteData is called when series data needs to be written and encodes the struct that we were initialized with into the writer we are given.
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index allows you to efficiently identify a recorded series based on criteria that can be found in the info in the stream footer.
func NewIndex ¶
func NewIndex(rs io.ReadSeeker) (index *Index, err error)
NewIndex returns a new `Index` struct.
func (*Index) GetWithTimestamp ¶
func (index *Index) GetWithTimestamp(timestamp time.Time) (matched []StreamIndexedSequenceInfo, err error)
GetWithTimestamp returns all series that contain the given timestamp.
Example ¶
b := rifs.NewSeekableBuffer() // Stage stream. sb := NewStreamBuilder(b) series := AddTestSeries(sb) _, err := sb.Finish() log.PanicIf(err) raw := b.Bytes() for i, series := range series { fmt.Printf("Test series (%d): %s [%v, %v]\n", i, series.Uuid(), series.HeadRecordTime(), series.TailRecordTime()) } fmt.Printf("\n") // Parse stream. r := bytes.NewReader(raw) index, err := NewIndex(r) log.PanicIf(err) queryTimestamp := time.Date(2016, 10, 1, 12, 34, 57, 0, time.UTC) fmt.Printf("Query: %v\n", queryTimestamp) fmt.Printf("\n") matched, err := index.GetWithTimestamp(queryTimestamp) log.PanicIf(err) for _, matchedSeries := range matched { fmt.Printf("MATCHED: %s\n", matchedSeries.Uuid()) }
Output: Test series (0): d095abf5-126e-48a7-8974-885de92bd964 [2016-10-01 12:34:56 +0000 UTC, 2016-10-01 12:35:16 +0000 UTC] Test series (1): 8a4ba0c4-0a0d-442f-8256-1d61adb16abc [2016-10-01 12:35:06 +0000 UTC, 2016-10-01 12:35:26 +0000 UTC] Query: 2016-10-01 12:34:57 +0000 UTC MATCHED: d095abf5-126e-48a7-8974-885de92bd964
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
Iterator efficiently steps backwards through the series in a stream in order.
func NewIterator ¶
func NewIterator(sr *StreamReader) (it *Iterator, err error)
NewIterator returns an `Iterator` struct.
func (*Iterator) Current ¶
Current returns the number of the series that we're currently on. This decrements after each call and returns less than zero on EOF.
func (*Iterator) Iterate ¶
func (it *Iterator) Iterate(dataWriter io.Writer) (seriesFooter SeriesFooter, checksumOk bool, err error)
Iterate reads the next series in the stream, from the back of the stream to the front.
Example ¶
ExampleIterator_Iterate shows how to parse and step through stream data. Remember that we'll start from the end and step backwards.
See ExampleStreamReader_ReadSeriesWithIndexedInfo for an example of how to perform random or ordered reads of series within a stream (instead of having to step backward through all of them, in order).
b := rifs.NewSeekableBuffer() // Stage stream. sb := NewStreamBuilder(b) series := AddTestSeries(sb) for i, seriesFooter := range series { fmt.Printf("Test series (%d): [%s]\n", i, seriesFooter.Uuid()) } fmt.Printf("\n") _, err := sb.Finish() log.PanicIf(err) raw := b.Bytes() // Open the stream. r := bytes.NewReader(raw) sr := NewStreamReader(r) sr.SetStructureLogging(true) it, err := NewIterator(sr) log.PanicIf(err) // Very cheap calls. Keep in mind that we will actually iterate through // these in reverse order, below. fmt.Printf("Number of series recorded in stream footer: (%d)\n", it.Count()) sisi := it.SeriesInfo(0) fmt.Printf("Indexed series 0: %s\n", sisi.Uuid()) sisi = it.SeriesInfo(1) fmt.Printf("Indexed series 1: %s\n", sisi.Uuid()) fmt.Printf("\n") // Read first encountered series. seriesNumber := it.Current() seriesData := new(bytes.Buffer) seriesFooter, checksumOk, err := it.Iterate(seriesData) log.PanicIf(err) if checksumOk != true { log.Panicf("first encountered checksum does not match") } fmt.Printf("Encountered series (%d): %s\n", seriesNumber, seriesFooter.Uuid()) // This is the original time-series' blob. It's the caller's responsibility // to encode it and decode it. fmt.Printf("Series (%d) data: %s\n", seriesNumber, string(seriesData.Bytes())) // Read second encountered series. seriesNumber = it.Current() seriesData = new(bytes.Buffer) seriesFooter, checksumOk, err = it.Iterate(seriesData) log.PanicIf(err) if checksumOk != true { log.Panicf("second encountered checksum does not match") } fmt.Printf("Encountered series (%d): %s\n", seriesNumber, seriesFooter.Uuid()) // This is the original time-series' blob. It's the caller's responsibility // to encode it and decode it. fmt.Printf("Series (%d) data: %s\n", seriesNumber, string(seriesData.Bytes())) // Check EOF. _, _, err = it.Iterate(nil) if err != io.EOF { log.Panicf("expected EOF") } fmt.Printf("\n") // Show that the structure loggingrepresents the offsets in reverse order // (the order that they're visited). Note that certain milestones will // include more than one entry. Some milestones can't be completely // interpreted/applied until more information is read. So, we'll log those // milestones as soon as they're encountered as well as when we have more // information about them. sr.Structure().Dump()
Output: Test series (0): [d095abf5-126e-48a7-8974-885de92bd964] Test series (1): [8a4ba0c4-0a0d-442f-8256-1d61adb16abc] Number of series recorded in stream footer: (2) Indexed series 0: d095abf5-126e-48a7-8974-885de92bd964 Indexed series 1: 8a4ba0c4-0a0d-442f-8256-1d61adb16abc Encountered series (1): 8a4ba0c4-0a0d-442f-8256-1d61adb16abc Series (1) data: X some time series data 2 X Encountered series (0): d095abf5-126e-48a7-8974-885de92bd964 Series (0) data: some time series data ================ Stream Structure ================ OFF 553 MT boundary_marker SCOPE stream UUID COMM MT boundary_marker SCOPE misc UUID COMM OFF 548 MT shadow_footer_head_byte SCOPE misc UUID COMM OFF 348 MT footer_head_byte SCOPE misc UUID COMM MT stream_footer_head_byte SCOPE stream UUID COMM MT stream_footer_decoded SCOPE stream UUID COMM Stream: StreamFooter1<COUNT=(2)> OFF 347 MT boundary_marker SCOPE series UUID COMM MT boundary_marker SCOPE misc UUID COMM OFF 342 MT shadow_footer_head_byte SCOPE misc UUID COMM OFF 198 MT footer_head_byte SCOPE misc UUID COMM MT series_footer_head_byte SCOPE series UUID COMM MT series_footer_decoded SCOPE series UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc COMM OFF 171 MT series_data_head_byte SCOPE series UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc COMM OFF 170 MT boundary_marker SCOPE series UUID COMM MT boundary_marker SCOPE misc UUID COMM OFF 165 MT shadow_footer_head_byte SCOPE misc UUID COMM OFF 21 MT footer_head_byte SCOPE misc UUID COMM MT series_footer_head_byte SCOPE series UUID COMM MT series_footer_decoded SCOPE series UUID d095abf5-126e-48a7-8974-885de92bd964 COMM OFF 0 MT series_data_head_byte SCOPE series UUID d095abf5-126e-48a7-8974-885de92bd964 COMM
func (*Iterator) SeriesInfo ¶
func (it *Iterator) SeriesInfo(i int) StreamIndexedSequenceInfo
SeriesInfo efficiently returns summary information for one of the series in the stream.
type MilestoneType ¶
type MilestoneType string
MilestoneType is the name of the event-type.
const ( // MtSeriesDataHeadByte marks the first byte of a series' time-series data. MtSeriesDataHeadByte MilestoneType = "series_data_head_byte" // type. MtFooterHeadByte MilestoneType = "footer_head_byte" MtSeriesFooterHeadByte MilestoneType = "series_footer_head_byte" MtStreamFooterHeadByte MilestoneType = "stream_footer_head_byte" // MtBoundaryMarker identifies a boundary marker. MtBoundaryMarker MilestoneType = "boundary_marker" // that immediately follows any other type of footer. MtShadowFooterHeadByte MilestoneType = "shadow_footer_head_byte" // been successfully decoded. MtSeriesFooterDecoded MilestoneType = "series_footer_decoded" // been successfully decoded. MtStreamFooterDecoded MilestoneType = "stream_footer_decoded" )
type ScopeType ¶
type ScopeType int
ScopeType is which type of data the event applies to.
const ( // StSeries describes milestones that pertain to series. StSeries ScopeType = iota // StStream describes milestones that pertain to streams. StStream ScopeType = iota // StMisc describes milestones that are either agnostic (not likely) or // could be any other scope type but there's not yet enough information // to tell (likely). StMisc ScopeType = iota )
type SeriesDataDatasourceReader ¶
type SeriesDataDatasourceReader interface {
ReadData(r io.Reader, sf SeriesFooter) (n int, err error)
}
SeriesDataDatasourceReader can be provided by a call to read the data themselves rather than providing an `io.Writer`.
type SeriesDataDatasourceReaderWrapper ¶
type SeriesDataDatasourceReaderWrapper struct {
// contains filtered or unexported fields
}
SeriesDataDatasourceReaderWrapper wraps a simple `io.Writer` and satisfies the `SeriesDataDatasourceReader` interface. It essentially converts a writer to a reader. This may not have a practical use, but we use it for testing.
func NewSeriesDataDatasourceReaderWrapperFromWriter ¶
func NewSeriesDataDatasourceReaderWrapperFromWriter(w io.Writer) SeriesDataDatasourceReaderWrapper
NewSeriesDataDatasourceReaderWrapperFromWriter creates a new `SeriesDataDatasourceWriterWrapper` struct.
func (SeriesDataDatasourceReaderWrapper) ReadData ¶
func (sddww SeriesDataDatasourceReaderWrapper) ReadData(r io.Reader, sf SeriesFooter) (n int, err error)
ReadData copies the reader to the writer.
type SeriesDataDatasourceWriter ¶
type SeriesDataDatasourceWriter interface {
WriteData(w io.Writer, sf SeriesFooter) (n int, err error)
}
SeriesDataDatasourceWriter can be provided by the caller to write the series- data themselves if an `io.Reader` is too simple for them.
type SeriesDataDatasourceWriterWrapper ¶
type SeriesDataDatasourceWriterWrapper struct {
// contains filtered or unexported fields
}
SeriesDataDatasourceWriterWrapper wraps a simple `io.Reader` and satisfies the `SeriesDataDatasourceWriter` interface. It essentially converts a reader to a writer. This may not have a practical use, but we use it for testing.
func NewSeriesDataDatasourceWriterWrapperFromReader ¶
func NewSeriesDataDatasourceWriterWrapperFromReader(r io.Reader) SeriesDataDatasourceWriterWrapper
NewSeriesDataDatasourceWriterWrapperFromReader creates a new `SeriesDataDatasourceWriterWrapper` struct.
func (SeriesDataDatasourceWriterWrapper) WriteData ¶
func (sddww SeriesDataDatasourceWriterWrapper) WriteData(w io.Writer, sf SeriesFooter) (n int, err error)
WriteData copies the reader to the writer.
type SeriesFooter ¶
type SeriesFooter interface { string HeadRecordTime() time.Time TailRecordTime() time.Time BytesLength() uint64 RecordCount() uint64 CreatedTime() time.Time UpdatedTime() time.Time // if the source-data has changed SourceSha1() []byte DataFnv1aChecksum() uint32 Version() SeriesFooterVersion TouchUpdatedTime() // and the count is attained. SetBytesLength(bytesLength uint64) }Uuid()
SeriesFooter describes data derived from a stream footer.
type SeriesFooter1 ¶
type SeriesFooter1 struct {
// contains filtered or unexported fields
}
SeriesFooter1 describes the data in a single series. Version 1.
func AddTestSeries ¶
func AddTestSeries(sb *StreamBuilder) (footers []*SeriesFooter1)
AddTestSeries will append two test series to the given builder.
func NewSeriesFooter1 ¶
func NewSeriesFooter1(headRecordTime time.Time, tailRecordTime time.Time, recordCount uint64, sourceSha1 []byte) *SeriesFooter1
NewSeriesFooter1 returns a series footer structure. Version 1. The checksum will be populated on write.
func NewSeriesFooter1FromEncoded ¶
func NewSeriesFooter1FromEncoded(footerBytes []byte) (sf *SeriesFooter1, err error)
NewSeriesFooter1FromEncoded returns a series footer struct (version 1). The checksum that was recorded during the write will be populated.
func (*SeriesFooter1) BytesLength ¶
func (sf *SeriesFooter1) BytesLength() uint64
BytesLength is the number of bytes of series data.
func (*SeriesFooter1) CreatedTime ¶
func (sf *SeriesFooter1) CreatedTime() time.Time
CreatedTime is the timestamp of the first write of this series
func (*SeriesFooter1) DataFnv1aChecksum ¶
func (sf *SeriesFooter1) DataFnv1aChecksum() uint32
DataFnv1aChecksum is the FNV-1a checksum of the original data. This is set and checked automatically, though the result of the check is returned to the caller rather than being enforced by us.
func (*SeriesFooter1) HeadRecordTime ¶
func (sf *SeriesFooter1) HeadRecordTime() time.Time
HeadRecordTime is the earliest timestamp represented in the series data.
func (*SeriesFooter1) RecordCount ¶
func (sf *SeriesFooter1) RecordCount() uint64
RecordCount is the number of records in the series-data.
func (*SeriesFooter1) SetBytesLength ¶
func (sf *SeriesFooter1) SetBytesLength(bytesLength uint64)
func (*SeriesFooter1) SourceSha1 ¶
func (sf *SeriesFooter1) SourceSha1() []byte
SourceSha1 is the SHA1 of the original data.
func (*SeriesFooter1) String ¶
func (sf *SeriesFooter1) String() string
func (*SeriesFooter1) TailRecordTime ¶
func (sf *SeriesFooter1) TailRecordTime() time.Time
TailRecordTime is the latest timestamp represented in the series data.
func (*SeriesFooter1) TouchUpdatedTime ¶
func (sf *SeriesFooter1) TouchUpdatedTime()
TouchUpdatedTime bumps the updated-time field.
func (*SeriesFooter1) UpdatedTime ¶
func (sf *SeriesFooter1) UpdatedTime() time.Time
UpdatedTime is the timestamp of the last update
func (*SeriesFooter1) Uuid ¶
func (sf *SeriesFooter1) Uuid() string
Uuid returns the UUID of the series.
func (*SeriesFooter1) Version ¶
func (sf *SeriesFooter1) Version() SeriesFooterVersion
Version returns the series-protocol represented by this struct.
type SeriesFooterVersion ¶
type SeriesFooterVersion uint16
SeriesFooterVersion enum
const ( // single series in the stream. SeriesFooterVersion1 SeriesFooterVersion = 1 )
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
StreamBuilder is the high-level interface that owns the stream-building process and wraps `StreamWriter`.
func NewStreamBuilder ¶
func NewStreamBuilder(ws io.WriteSeeker) *StreamBuilder
NewStreamBuilder returns a new `StreamBuilder`.
func (*StreamBuilder) AddSeries ¶
func (sb *StreamBuilder) AddSeries(seriesDataWriter interface{}, sf SeriesFooter) (err error)
AddSeries adds a single series and associated metadata to the stream. The actual series data is provided to us by the caller in serialized (encoded) form from whatever their original format was.
func (*StreamBuilder) AddSeriesNoWrite ¶
func (sb *StreamBuilder) AddSeriesNoWrite(footerDataPosition int64, totalSeriesSize int, sf SeriesFooter) (err error)
AddSeriesNoWrite logs a single series and associated metadata but doesn't actually write. It will be written (or potentially retained) through other means.
func (*StreamBuilder) Finish ¶
func (sb *StreamBuilder) Finish() (totalSize int, err error)
Finish will finalize/complete the stream.
func (*StreamBuilder) NextOffset ¶
func (sb *StreamBuilder) NextOffset() int64
NextOffset returns the position that the head bytes
func (*StreamBuilder) SetStructureLogging ¶
func (sb *StreamBuilder) SetStructureLogging(flag bool)
SetStructureLogging enables/disables structure tracking.
func (*StreamBuilder) StreamWriter ¶
func (sb *StreamBuilder) StreamWriter() *StreamWriter
StreamWriter returns the underlying `StreamWriter` struct.
func (*StreamBuilder) Structure ¶
func (sb *StreamBuilder) Structure() *StreamStructure
Structure returns the `StreamStructure` struct (if enabled).
type StreamFooter ¶
type StreamFooter interface {
}StreamFooter describes a type that can return summary information about the series in a stream. This represents a basic encoded stream type.
func NewStreamFooter1FromEncoded ¶
func NewStreamFooter1FromEncoded(footerBytes []byte) (sf StreamFooter, err error)
NewStreamFooter1FromEncoded decodes the given bytes and returns a `StreamFooter`-compatible struct.
func NewStreamFooter1FromStreamIndexedSequenceInfoSlice ¶
func NewStreamFooter1FromStreamIndexedSequenceInfoSlice(series []StreamIndexedSequenceInfo) StreamFooter
NewStreamFooter1FromStreamIndexedSequenceInfoSlice returns a new `StreamFooter`-compatible struct.
type StreamFooter1 ¶
type StreamFooter1 struct {
// contains filtered or unexported fields
}
StreamFooter1 represents the stream footer (version 1) that's encoded in the stream.
func (*StreamFooter1) Series ¶
func (sf *StreamFooter1) Series() []StreamIndexedSequenceInfo
Series returns a list of all of the summary series information.
func (*StreamFooter1) String ¶
func (sf *StreamFooter1) String() string
type StreamFooterVersion ¶
type StreamFooterVersion uint16
StreamFooterVersion enum
const ( // the whole stream. StreamFooterVersion1 StreamFooterVersion = 1 )
type StreamIndexedSequenceInfo ¶
type StreamIndexedSequenceInfo interface { // Uuid is a unique string that uniquely identifies this series. Uuid() string // HeadRecordTime is the timestamp of the first record HeadRecordTime() time.Time // TailRecordTime is the timestamp of the last record TailRecordTime() time.Time // AbsolutePosition is the absolute position of the boundary marker (NUL) AbsolutePosition() int64 }
StreamIndexedSequenceInfo describes summary information for a single series encoded into the stream footer.
type StreamIndexedSequenceInfo1 ¶
type StreamIndexedSequenceInfo1 struct {
// contains filtered or unexported fields
}
StreamIndexedSequenceInfo1 briefly describes all series.
func NewStreamIndexedSequenceInfo1 ¶
func NewStreamIndexedSequenceInfo1(uuid string, headRecordTime, tailRecordTime time.Time, absolutePosition int64) *StreamIndexedSequenceInfo1
NewStreamIndexedSequenceInfo1 returns a sequence-info structure.
func NewStreamIndexedSequenceInfo1WithSeriesFooter ¶
func NewStreamIndexedSequenceInfo1WithSeriesFooter(seriesFooter SeriesFooter, absolutePosition int64) *StreamIndexedSequenceInfo1
NewStreamIndexedSequenceInfo1WithSeriesFooter returns a summary `StreamIndexedSequenceInfo1` struct representing the given `SeriesFooter`-compatible struct.
func (StreamIndexedSequenceInfo1) AbsolutePosition ¶
func (sisi StreamIndexedSequenceInfo1) AbsolutePosition() int64
AbsolutePosition is the absolute position of the boundary marker (NUL)
func (StreamIndexedSequenceInfo1) HeadRecordTime ¶
func (sisi StreamIndexedSequenceInfo1) HeadRecordTime() time.Time
HeadRecordTime is the timestamp of the first record
func (StreamIndexedSequenceInfo1) String ¶
func (sisi StreamIndexedSequenceInfo1) String() string
func (StreamIndexedSequenceInfo1) TailRecordTime ¶
func (sisi StreamIndexedSequenceInfo1) TailRecordTime() time.Time
TailRecordTime is the timestamp of the last record
func (StreamIndexedSequenceInfo1) Uuid ¶
func (sisi StreamIndexedSequenceInfo1) Uuid() string
Uuid is the timestamp of the first record
type StreamReader ¶
type StreamReader struct {
// contains filtered or unexported fields
}
StreamReader knows how to parse the raw stream.
func NewStreamReader ¶
func NewStreamReader(rs io.ReadSeeker) *StreamReader
NewStreamReader returns a new `StreamReader`.
func (*StreamReader) ReadSeriesInfoWithBoundaryPosition ¶
func (sr *StreamReader) ReadSeriesInfoWithBoundaryPosition(position int64) (seriesFooter SeriesFooter, dataOffset int64, seriesSize int, err error)
ReadSeriesInfoWithBoundaryPosition returns a `SeriesFooter` for the series whose boundary marker is at the given position.
func (*StreamReader) ReadSeriesInfoWithIndexedInfo ¶
func (sr *StreamReader) ReadSeriesInfoWithIndexedInfo(sisi StreamIndexedSequenceInfo) (seriesFooter SeriesFooter, dataOffset int64, seriesSize int, err error)
ReadSeriesInfoWithIndexedInfo returns the `SeriesFooter` struct described by the given `StreamIndexedSequenceInfo` struct.
func (*StreamReader) ReadSeriesWithIndexedInfo ¶
func (sr *StreamReader) ReadSeriesWithIndexedInfo(sisi StreamIndexedSequenceInfo, seriesDataReader interface{}) (seriesFooter SeriesFooter, seriesSize int, checksumOk bool, err error)
ReadSeriesWithIndexedInfo returns the `SeriesFooter` struct described by the given `StreamIndexedSequenceInfo` struct and writes the raw data associated with it to `dataWriter`.
func (*StreamReader) Reset ¶
func (sr *StreamReader) Reset() (err error)
Reset will put us at the end of the file. This is required in order to iterate.
func (*StreamReader) SetStructureLogging ¶
func (sr *StreamReader) SetStructureLogging(flag bool)
SetStructureLogging enables/disables structure tracking.
func (*StreamReader) Structure ¶
func (sr *StreamReader) Structure() *StreamStructure
Structure returns the `StreamStructure` struct (if enabled).
type StreamStructure ¶
type StreamStructure struct {
// contains filtered or unexported fields
}
StreamStructure holds all of the milestones recorded for a given stream.
func NewStreamStructure ¶
func NewStreamStructure() *StreamStructure
NewStreamStructure returns a new `StreamStructure`.
func (*StreamStructure) AllSeriesMilestones ¶
func (ss *StreamStructure) AllSeriesMilestones() (milestoneIndex map[string][]StreamStructureOffsetInfo)
AllSeriesMilestones returns a map of all recorded series.
func (*StreamStructure) Dump ¶
func (ss *StreamStructure) Dump()
Dump prints a table with all of the recorded milestones in the order that they were encountered.
Example ¶
ExampleStreamStructure_Dump shows how to use structure-tracking to print the structure of the stream. The `Structure()` and `SetStructureLogging` methods (to enable and retrieve the `StreamStructure` struct, if enabled) are available on the `StreamBuilder`, “StreamReader`, `StreamWriter`, and `Updater` types.
This table is printed in forward order when writing a stream and reverse order when reading a stream.
Columns: 1) absolute offset in the stream, 2) milestone type (describes the type of data), 3) scope type (describes whether it's related to streams or series, or 'misc' if not enough is known yet during a parse), 4) UUID (only for series; usually present for at least all 'series_data_head_byte' milestone types), 5) milestone comment (not stored in original data).
b := rifs.NewSeekableBuffer() sb := NewStreamBuilder(b) sb.SetStructureLogging(true) AddTestSeries(sb) _, err := sb.Finish() log.PanicIf(err) sb.Structure().Dump()
Output: ================ Stream Structure ================ OFF 0 MT series_data_head_byte SCOPE series UUID d095abf5-126e-48a7-8974-885de92bd964 COMM OFF 21 MT series_footer_head_byte SCOPE series UUID d095abf5-126e-48a7-8974-885de92bd964 COMM OFF 165 MT shadow_footer_head_byte SCOPE series UUID COMM OFF 170 MT boundary_marker SCOPE series UUID COMM OFF 171 MT series_data_head_byte SCOPE series UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc COMM OFF 198 MT series_footer_head_byte SCOPE series UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc COMM OFF 342 MT shadow_footer_head_byte SCOPE series UUID COMM OFF 347 MT boundary_marker SCOPE series UUID COMM OFF 348 MT stream_footer_head_byte SCOPE stream UUID COMM Stream: StreamFooter1<COUNT=(2)> OFF 548 MT shadow_footer_head_byte SCOPE stream UUID COMM OFF 553 MT boundary_marker SCOPE stream UUID COMM
func (*StreamStructure) Milestones ¶
func (ss *StreamStructure) Milestones() []StreamStructureOffsetInfo
Milestones returns all recorded milestones.
func (*StreamStructure) MilestonesWithFilter ¶
func (ss *StreamStructure) MilestonesWithFilter(milestoneType string, scopeType int) []StreamStructureOffsetInfo
MilestonesWithFilter returns all milestones, optionally applying a filter.
func (*StreamStructure) Push ¶
func (ss *StreamStructure) Push(offset int64, milestoneType MilestoneType, scopeType ScopeType, seriesUuid string, comment string)
Push records a single event.
func (*StreamStructure) SeriesMilestones ¶
func (ss *StreamStructure) SeriesMilestones(uuid string) []StreamStructureOffsetInfo
SeriesMilestones returns all series-specific milestones,optionally filtering for a specific one. Returned as a flat list.
func (*StreamStructure) StreamMilestones ¶
func (ss *StreamStructure) StreamMilestones() []StreamStructureOffsetInfo
StreamMilestones returns all stream-specific milestones.
func (*StreamStructure) String ¶
func (ss *StreamStructure) String() string
type StreamStructureOffsetInfo ¶
type StreamStructureOffsetInfo struct { Offset int64 MilestoneType MilestoneType ScopeType ScopeType // SeriesUuid is a UUID of the series, if this offset refers to a series. SeriesUuid string Comment string }
StreamStructureOffsetInfo describes a single recorded milestone.
func (StreamStructureOffsetInfo) String ¶
func (ssoi StreamStructureOffsetInfo) String() string
type StreamWriter ¶
type StreamWriter struct {
// contains filtered or unexported fields
}
StreamWriter owns the semantics of encoding our storage structs to the raw bytes.
func NewStreamWriter ¶
func NewStreamWriter(w io.Writer) *StreamWriter
NewStreamWriter returns a new `StreamWriter` struct.
func (*StreamWriter) SetStructureLogging ¶
func (sw *StreamWriter) SetStructureLogging(flag bool)
SetStructureLogging enabled/disables structure logging.
func (*StreamWriter) Structure ¶
func (sw *StreamWriter) Structure() *StreamStructure
Structure returns the recorded structure (if enabled).
type Truncater ¶
Truncater is a type that knows how to truncate its bytes stream. This will frequently be paired with a `io.ReadWriteSeeker`.
type UpdateStats ¶
UpdateStats keeps a tally of various operations.
func (UpdateStats) String ¶
func (us UpdateStats) String() string
type Updater ¶
type Updater struct {
// contains filtered or unexported fields
}
Updater manages syncing what the caller has with what is stored.
- Copy all unchanged series, in their current sequence, from where they currently are to the front of the file.
- Use the data-writer interface to generate a serialized representation of the changed/new ones. Place them at the end in the order that they were stored before (those that are being updated) or in the order they were added (the new ones).
func NewUpdater ¶
func NewUpdater(rws io.ReadWriteSeeker, seriesDataWriter interface{}) *Updater
NewUpdater returns a new `Updater` struct.
func (*Updater) AddSeries ¶
func (updater *Updater) AddSeries(seriesFooter SeriesFooter)
AddSeries queues a series to be added. It's not actually written until Write() is called.
Example ¶
b := rifs.NewSeekableBuffer() // Stage stream. sb := NewStreamBuilder(b) sb.SetStructureLogging(true) series := AddTestSeries(sb) _, err := sb.Finish() log.PanicIf(err) fmt.Printf("\n") fmt.Printf("Original:\n") fmt.Printf("\n") sb.Structure().Dump() raw := b.Bytes() // Update the stream with a new series. sourceSha13 := []byte{ 77, 88, 99, } now := time.Now() series3 := NewSeriesFooter1( now.Add(time.Second*20), now.Add(time.Second*30), 33, sourceSha13) series3.SetBytesLength(uint64(len(TestTimeSeriesData2))) // Force a specific UUID so we know the exact output in support of the // testable examples. series3.uuid = "9a0e2d13-d14f-4a57-b43c-24bd3de6581e" dataReader3 := bytes.NewBuffer(TestTimeSeriesData2) sdtg := &SeriesDataTestGenerator{ data: map[string]io.Reader{ series3.Uuid(): dataReader3, }, } rws := rifs.NewSeekableBufferWithBytes(raw) updater := NewUpdater(rws, sdtg) updater.AddSeries(series[0]) updater.AddSeries(series[1]) updater.AddSeries(series3) _, _, err = updater.Write() log.PanicIf(err) finalRaw := rws.Bytes() // Read the new stream. r := bytes.NewReader(finalRaw) sr := NewStreamReader(r) sr.SetStructureLogging(true) it, err := NewIterator(sr) log.PanicIf(err) for { _, _, err := it.Iterate(nil) if err == io.EOF { break } } fmt.Printf("Updated:\n") fmt.Printf("\n") sr.Structure().Dump()
Output:
func (*Updater) SetStructureLogging ¶
SetStructureLogging enables/disables structure tracking.
func (*Updater) Structure ¶
func (updater *Updater) Structure() *StreamStructure
Structure returns the `StreamStructure` struct (if enabled).