Documentation ¶
Overview ¶
Example (Print) ¶
event := proio.NewEvent() parent := &model.Particle{Pdg: 443} parentID := event.AddEntry("Particle", parent) event.TagEntry(parentID, "Truth", "Primary") child1 := &model.Particle{Pdg: 11} child2 := &model.Particle{Pdg: -11} childIDs := event.AddEntries("Particle", child1, child2) for _, id := range childIDs { event.TagEntry(id, "Truth", "GenStable") } parent.Child = append(parent.Child, childIDs...) child1.Parent = append(child1.Parent, parentID) child2.Parent = append(child2.Parent, parentID) fmt.Print(event)
Output: ---------- TAG: GenStable ---------- ID: 2 Entry type: proio.model.example.Particle parent: 1 pdg: 11 ID: 3 Entry type: proio.model.example.Particle parent: 1 pdg: -11 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle child: 2 child: 3 pdg: 443 ID: 2 Entry type: proio.model.example.Particle parent: 1 pdg: 11 ID: 3 Entry type: proio.model.example.Particle parent: 1 pdg: -11 ---------- TAG: Primary ---------- ID: 1 Entry type: proio.model.example.Particle child: 2 child: 3 pdg: 443 ---------- TAG: Truth ---------- ID: 1 Entry type: proio.model.example.Particle child: 2 child: 3 pdg: 443 ID: 2 Entry type: proio.model.example.Particle parent: 1 pdg: 11 ID: 3 Entry type: proio.model.example.Particle parent: 1 pdg: -11
Example (PushGetInspect) ¶
buffer := &bytes.Buffer{} writer := proio.NewWriter(buffer) eventOut := proio.NewEvent() // Create entries and hold onto their IDs for referencing parent := &model.Particle{Pdg: 443} parentID := eventOut.AddEntry("Particle", parent) eventOut.TagEntry(parentID, "Truth", "Primary") child1 := &model.Particle{Pdg: 11} child2 := &model.Particle{Pdg: -11} childIDs := eventOut.AddEntries("Particle", child1, child2) for _, id := range childIDs { eventOut.TagEntry(id, "Truth", "GenStable") } parent.Child = append(parent.Child, childIDs...) child1.Parent = append(child1.Parent, parentID) child2.Parent = append(child2.Parent, parentID) writer.Push(eventOut) writer.Flush() // Event created and serialized, now to deserialize and inspect reader := proio.NewReader(buffer) eventIn := reader.Next() mcParts := eventIn.TaggedEntries("Primary") fmt.Print(len(mcParts), " Primary particle(s)...\n") for i, parentID := range mcParts { part := eventIn.GetEntry(parentID).(*model.Particle) fmt.Print(i, ". PDG: ", part.GetPdg(), "\n") fmt.Print(" ", len(part.Child), " children...\n") for j, childID := range part.Child { fmt.Print(" ", j, ". PDG: ", eventIn.GetEntry(childID).(*model.Particle).GetPdg(), "\n") } }
Output: 1 Primary particle(s)... 0. PDG: 443 2 children... 0. PDG: 11 1. PDG: -11
Example (Scan) ¶
buffer := &bytes.Buffer{} writer := proio.NewWriter(buffer) for i := 0; i < 8; i++ { event := proio.NewEvent() p := &model.Particle{ Pdg: int32(11 + i), } event.AddEntry("Particle", p) writer.Push(event) } writer.Flush() reader := proio.NewReader(buffer) for event := range reader.ScanEvents(10) { fmt.Print(event) }
Output: ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 11 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 12 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 13 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 14 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 15 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 16 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 17 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 18
Example (Skip) ¶
buffer := &bytes.Buffer{} writer := proio.NewWriter(buffer) for i := 0; i < 8; i++ { event := proio.NewEvent() p := &model.Particle{ Pdg: int32(11 + i), } event.AddEntry("Particle", p) writer.Push(event) } writer.Flush() bytesReader := bytes.NewReader(buffer.Bytes()) reader := proio.NewReader(bytesReader) reader.Skip(7) event := reader.Next() fmt.Print(event) reader.SeekToStart() event = reader.Next() fmt.Print(event)
Output: ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 18 ---------- TAG: Particle ---------- ID: 1 Entry type: proio.model.example.Particle pdg: 11
Index ¶
- func StoredFileDescriptorProtos() []protobuf.Message
- type Compression
- type Event
- func (evt *Event) AddEntries(tag string, entries ...protobuf.Message) []uint64
- func (evt *Event) AddEntry(tag string, entry protobuf.Message) uint64
- func (evt *Event) AddSerializedEntry(tag string, wireData []byte, pbType string, descriptor []byte) (id uint64, err error)
- func (evt *Event) AllEntries() []uint64
- func (evt *Event) DeleteTag(tag string)
- func (evt *Event) EntryTags(id uint64) []string
- func (evt *Event) FlushCache()
- func (evt *Event) GetEntry(id uint64) protobuf.Message
- func (evt *Event) RemoveEntry(id uint64)
- func (evt *Event) String() string
- func (evt *Event) TagEntry(id uint64, tags ...string)
- func (evt *Event) TaggedEntries(tag string) []uint64
- func (evt *Event) Tags() []string
- func (evt *Event) UntagEntry(id uint64, tag string)
- type Reader
- func (rdr *Reader) Close()
- func (rdr *Reader) DeferUntilClose(thisFunc func())
- func (rdr *Reader) Next() *Event
- func (rdr *Reader) ScanEvents(bufSize int) <-chan *Event
- func (rdr *Reader) SeekToStart() error
- func (rdr *Reader) Skip(nEvents uint64) (nSkipped uint64, err error)
- func (rdr *Reader) StopScan()
- type Writer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StoredFileDescriptorProtos ¶ added in v0.3.0
StoredFileDescriptorProtos returns a slice of protobuf messages that represent all of the entry types collected by reading files or looking up FileDescriptorProtos from memory
Types ¶
type Event ¶
type Event struct { Err error Metadata map[string][]byte // contains filtered or unexported fields }
Event contains all data for an event, and provides methods for adding and retrieving data.
func (*Event) AddEntries ¶
AddEntries is like AddEntry, except that it is variadic, taking an arbitrary number of entries separated by commas. Additionally, the return value is a slice of IDs.
func (*Event) AddEntry ¶
AddEntry takes a single primary tag for an entry and an entry protobuf message, and returns a new ID number for the entry. This ID number can be used to persistently reference the entry. For example, pass the ID TagEntry to add additional tags to the entry.
func (*Event) AddSerializedEntry ¶ added in v0.7.0
func (evt *Event) AddSerializedEntry( tag string, wireData []byte, pbType string, descriptor []byte, ) (id uint64, err error)
AddSerializedEntry takes a single primary tag for an entry, a serialized entry protobuf message, a protobuf type string, and a gzip-compressed protobuf descriptor byte string (obtained from message's Descriptor() receiver), and returns a new ID number for the entry.
func (*Event) AllEntries ¶
AllEntries returns a slice of identifiers for all entries contained in the Event.
func (*Event) DeleteTag ¶
DeleteTag takes a tag name as an argument and deletes that tag from the Event if it exists.
func (*Event) FlushCache ¶ added in v0.3.0
func (evt *Event) FlushCache()
FlushCache forces all event entries to be serialized, among other things. This is useful for putting the main serialization load into parallel routines before aggregating the events into an output stream
func (*Event) GetEntry ¶
GetEntry retrieves and deserializes an entry corresponding to the given ID number. The deserialized entry is returned. The entry type must be one that has been linked (and therefore initialized) with the current executable, otherwise it is an unknown type and nil is returned.
func (*Event) RemoveEntry ¶
RemoveEntry takes an entry id and removes the referenced entry from the Event.
func (*Event) TaggedEntries ¶
TaggedEntries returns a slice of ID numbers that are referenced by the given tag.
func (*Event) UntagEntry ¶
UntagEntry removes the association between a tag and an entry.
type Reader ¶
type Reader struct { BucketHeader *proto.BucketHeader Metadata map[string][]byte Err error sync.Mutex // contains filtered or unexported fields }
Reader serves to read Events from a stream in the proio format. The Reader is not inherently thread safe, but it conveniently embeds sync.Mutex so that it can be locked and unlocked.
func NewReader ¶
NewReader wraps an existing io.Reader for reading proio Events. Either Open or NewReader should be called to construct a new Reader.
func Open ¶
Open opens the given existing file (in read-only mode), returning an error where appropriate. Upon success, a new Reader is created to wrap the file, and returned. Either Open or NewReader should be called to construct a new Reader.
func (*Reader) Close ¶
func (rdr *Reader) Close()
Close closes any file that was opened by the library, and stops any unfinished scans. Close does not close io.Readers passed directly to NewReader.
func (*Reader) DeferUntilClose ¶ added in v0.5.0
func (rdr *Reader) DeferUntilClose(thisFunc func())
func (*Reader) Next ¶
Next retrieves the next event from the stream. The Reader's Err member is assigned the error status of this call.
func (*Reader) ScanEvents ¶
ScanEvents returns a buffered channel of type Event where all of the events in the stream will be pushed. The channel buffer size is defined by the argument. The goroutine responsible for fetching events will not break until there are no more events, Reader.StopScan() is called, or Reader.Close() is called.
func (*Reader) SeekToStart ¶
SeekToStart seeks seekable streams to the beginning, and prepares the stream to read from there.
type Writer ¶
type Writer struct { BucketDumpThres int CompLevel int sync.Mutex // contains filtered or unexported fields }
Writer serves to write Events into a stream in the proio format. The Writer is not inherently thread safe, but it conveniently embeds sync.Mutex so that it can be locked and unlocked.
func Create ¶
Create makes a new file specified by filename, overwriting any existing file, and returns a Writer for the file. Either NewWriter or Create must be used to construct a Writer.
func NewWriter ¶
NewWriter takes an io.Writer and wraps it in a new proio Writer. Either NewWriter or Create must be used to construct a Writer.
func (*Writer) Close ¶
Close calls Flush and closes any file that was created by the library. Close does not close io.Writers passed directly to NewWriter.
func (*Writer) DeferUntilClose ¶ added in v0.5.0
func (*Writer) Push ¶
Serialize the given Event. Once this is performed, changes to the Event in memory are not reflected in the output stream.
func (*Writer) SetCompression ¶
func (wrt *Writer) SetCompression(comp Compression) error
Set compression type, for example to GZIP or UNCOMPRESSED. This can be called even after writing some events.