proio

package module
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2019 License: BSD-3-Clause Imports: 17 Imported by: 7

README

proio for Go

Build Status codecov Codacy Badge

Please see the main proio repository for general information on proio.

API

API documentation is provided by godoc.org

GoDoc

Installation

go-proio and included command-line tools are go get-able. Make sure you have the go compiler installed and set up:

go get github.com/proio-org/go-proio/...

If you do not have the go compiler, you can find pre-compiled binaries for the tools in the releases.

For information on what versions of Go are supported, please see the Travis CI page.

Examples

Documentation

Overview

Example (Print)
event := proio.NewEvent()

parent := &model.Particle{Pdg: 443}
parentID := event.AddEntry("Particle", parent)
event.TagEntry(parentID, "Truth", "Primary")

child1 := &model.Particle{Pdg: 11}
child2 := &model.Particle{Pdg: -11}
childIDs := event.AddEntries("Particle", child1, child2)
for _, id := range childIDs {
	event.TagEntry(id, "Truth", "GenStable")
}

parent.Child = append(parent.Child, childIDs...)
child1.Parent = append(child1.Parent, parentID)
child2.Parent = append(child2.Parent, parentID)

fmt.Print(event)
Output:

---------- TAG: GenStable ----------
ID: 2
Entry type: proio.model.example.Particle
parent: 1
pdg: 11

ID: 3
Entry type: proio.model.example.Particle
parent: 1
pdg: -11

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
child: 2
child: 3
pdg: 443

ID: 2
Entry type: proio.model.example.Particle
parent: 1
pdg: 11

ID: 3
Entry type: proio.model.example.Particle
parent: 1
pdg: -11

---------- TAG: Primary ----------
ID: 1
Entry type: proio.model.example.Particle
child: 2
child: 3
pdg: 443

---------- TAG: Truth ----------
ID: 1
Entry type: proio.model.example.Particle
child: 2
child: 3
pdg: 443

ID: 2
Entry type: proio.model.example.Particle
parent: 1
pdg: 11

ID: 3
Entry type: proio.model.example.Particle
parent: 1
pdg: -11
Example (PushGetInspect)
buffer := &bytes.Buffer{}
writer := proio.NewWriter(buffer)

eventOut := proio.NewEvent()

// Create entries and hold onto their IDs for referencing

parent := &model.Particle{Pdg: 443}
parentID := eventOut.AddEntry("Particle", parent)
eventOut.TagEntry(parentID, "Truth", "Primary")

child1 := &model.Particle{Pdg: 11}
child2 := &model.Particle{Pdg: -11}
childIDs := eventOut.AddEntries("Particle", child1, child2)
for _, id := range childIDs {
	eventOut.TagEntry(id, "Truth", "GenStable")
}

parent.Child = append(parent.Child, childIDs...)
child1.Parent = append(child1.Parent, parentID)
child2.Parent = append(child2.Parent, parentID)

writer.Push(eventOut)

writer.Flush()

// Event created and serialized, now to deserialize and inspect

reader := proio.NewReader(buffer)
eventIn := reader.Next()

mcParts := eventIn.TaggedEntries("Primary")
fmt.Print(len(mcParts), " Primary particle(s)...\n")
for i, parentID := range mcParts {
	part := eventIn.GetEntry(parentID).(*model.Particle)
	fmt.Print(i, ". PDG: ", part.GetPdg(), "\n")
	fmt.Print("  ", len(part.Child), " children...\n")
	for j, childID := range part.Child {
		fmt.Print("  ", j, ". PDG: ", eventIn.GetEntry(childID).(*model.Particle).GetPdg(), "\n")
	}
}
Output:

1 Primary particle(s)...
0. PDG: 443
  2 children...
  0. PDG: 11
  1. PDG: -11
Example (Scan)
buffer := &bytes.Buffer{}
writer := proio.NewWriter(buffer)

for i := 0; i < 8; i++ {
	event := proio.NewEvent()
	p := &model.Particle{
		Pdg: int32(11 + i),
	}
	event.AddEntry("Particle", p)
	writer.Push(event)
}
writer.Flush()

reader := proio.NewReader(buffer)

for event := range reader.ScanEvents(10) {
	fmt.Print(event)
}
Output:

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 11

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 12

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 13

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 14

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 15

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 16

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 17

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 18
Example (Skip)
buffer := &bytes.Buffer{}
writer := proio.NewWriter(buffer)

for i := 0; i < 8; i++ {
	event := proio.NewEvent()
	p := &model.Particle{
		Pdg: int32(11 + i),
	}
	event.AddEntry("Particle", p)
	writer.Push(event)
}
writer.Flush()

bytesReader := bytes.NewReader(buffer.Bytes())
reader := proio.NewReader(bytesReader)

reader.Skip(7)
event := reader.Next()
fmt.Print(event)
reader.SeekToStart()
event = reader.Next()
fmt.Print(event)
Output:

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 18

---------- TAG: Particle ----------
ID: 1
Entry type: proio.model.example.Particle
pdg: 11

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func StoredFileDescriptorProtos added in v0.3.0

func StoredFileDescriptorProtos() []protobuf.Message

StoredFileDescriptorProtos returns a slice of protobuf messages that represent all of the entry types collected by reading files or looking up FileDescriptorProtos from memory.

Types

type Compression

type Compression int
const (
	UNCOMPRESSED Compression = iota
	GZIP
	LZ4
	LZMA
)

type Event

type Event struct {
	Err      error
	Metadata map[string][]byte
	// contains filtered or unexported fields
}

Event contains all data for an event, and provides methods for adding and retrieving data.

func CopyEvent added in v0.5.0

func CopyEvent(event *Event) *Event

CopyEvent copies an Event.

func NewEvent

func NewEvent() *Event

NewEvent is required for constructing an Event.

func (*Event) AddEntries

func (evt *Event) AddEntries(tag string, entries ...protobuf.Message) []uint64

AddEntries is like AddEntry, except that it is variadic, taking an arbitrary number of entries separated by commas. Additionally, the return value is a slice of IDs.

func (*Event) AddEntry

func (evt *Event) AddEntry(tag string, entry protobuf.Message) uint64

AddEntry takes a single primary tag for an entry and an entry protobuf message, and returns a new ID number for the entry. This ID number can be used to persistently reference the entry. For example, pass the ID to TagEntry to add additional tags to the entry.

func (*Event) AddSerializedEntry added in v0.7.0

func (evt *Event) AddSerializedEntry(
	tag string,
	wireData []byte,
	pbType string,
	descriptor []byte,
) (id uint64, err error)

AddSerializedEntry takes a single primary tag for an entry, a serialized entry protobuf message, a protobuf type string, and a gzip-compressed protobuf descriptor byte string (obtained from the message's Descriptor() method), and returns a new ID number for the entry.

func (*Event) AllEntries

func (evt *Event) AllEntries() []uint64

AllEntries returns a slice of identifiers for all entries contained in the Event.

func (*Event) DeleteTag

func (evt *Event) DeleteTag(tag string)

DeleteTag takes a tag name as an argument and deletes that tag from the Event if it exists.

func (*Event) EntryTags

func (evt *Event) EntryTags(id uint64) []string

EntryTags does a reverse lookup of tags that point to a given entry ID.

func (*Event) FlushCache added in v0.3.0

func (evt *Event) FlushCache()

FlushCache forces all event entries to be serialized, among other things. This is useful for putting the main serialization load into parallel routines before aggregating the events into an output stream.

func (*Event) GetEntry

func (evt *Event) GetEntry(id uint64) protobuf.Message

GetEntry retrieves and deserializes an entry corresponding to the given ID number. The deserialized entry is returned. The entry type must be one that has been linked (and therefore initialized) with the current executable, otherwise it is an unknown type and nil is returned.

func (*Event) RemoveEntry

func (evt *Event) RemoveEntry(id uint64)

RemoveEntry takes an entry id and removes the referenced entry from the Event.

func (*Event) String

func (evt *Event) String() string

func (*Event) TagEntry

func (evt *Event) TagEntry(id uint64, tags ...string)

TagEntry adds additional tags to an entry ID returned by AddEntry.

func (*Event) TaggedEntries

func (evt *Event) TaggedEntries(tag string) []uint64

TaggedEntries returns a slice of ID numbers that are referenced by the given tag.

func (*Event) Tags

func (evt *Event) Tags() []string

Tags returns a list of all tags in the Event.

func (*Event) UntagEntry

func (evt *Event) UntagEntry(id uint64, tag string)

UntagEntry removes the association between a tag and an entry.

type Reader

type Reader struct {
	BucketHeader *proto.BucketHeader
	Metadata     map[string][]byte
	Err          error

	sync.Mutex
	// contains filtered or unexported fields
}

Reader serves to read Events from a stream in the proio format. The Reader is not inherently thread safe, but it conveniently embeds sync.Mutex so that it can be locked and unlocked.

func NewReader

func NewReader(streamReader io.Reader) *Reader

NewReader wraps an existing io.Reader for reading proio Events. Either Open or NewReader should be called to construct a new Reader.

func Open

func Open(filename string) (*Reader, error)

Open opens the given existing file (in read-only mode), returning an error where appropriate. Upon success, a new Reader is created to wrap the file, and returned. Either Open or NewReader should be called to construct a new Reader.

func (*Reader) Close

func (rdr *Reader) Close()

Close closes any file that was opened by the library, and stops any unfinished scans. Close does not close io.Readers passed directly to NewReader.

func (*Reader) DeferUntilClose added in v0.5.0

func (rdr *Reader) DeferUntilClose(thisFunc func())

func (*Reader) Next

func (rdr *Reader) Next() *Event

Next retrieves the next event from the stream. The Reader's Err member is assigned the error status of this call.

func (*Reader) ScanEvents

func (rdr *Reader) ScanEvents(bufSize int) <-chan *Event

ScanEvents returns a buffered channel of *Event onto which all of the events in the stream will be pushed. The channel buffer size is defined by the argument. The goroutine responsible for fetching events will not break until there are no more events, Reader.StopScan() is called, or Reader.Close() is called.

func (*Reader) SeekToStart

func (rdr *Reader) SeekToStart() error

SeekToStart seeks seekable streams to the beginning, and prepares the stream to read from there.

func (*Reader) Skip

func (rdr *Reader) Skip(nEvents uint64) (nSkipped uint64, err error)

Skip skips nEvents events. If the return error is nil, nEvents have been skipped.

func (*Reader) StopScan

func (rdr *Reader) StopScan()

StopScan stops all scans initiated by Reader.ScanEvents().

type Writer

type Writer struct {
	BucketDumpThres int
	CompLevel       int

	sync.Mutex
	// contains filtered or unexported fields
}

Writer serves to write Events into a stream in the proio format. The Writer is not inherently thread safe, but it conveniently embeds sync.Mutex so that it can be locked and unlocked.

func Create

func Create(filename string) (*Writer, error)

Create makes a new file specified by filename, overwriting any existing file, and returns a Writer for the file. Either NewWriter or Create must be used to construct a Writer.

func NewWriter

func NewWriter(streamWriter io.Writer) *Writer

NewWriter takes an io.Writer and wraps it in a new proio Writer. Either NewWriter or Create must be used to construct a Writer.

func (*Writer) Close

func (wrt *Writer) Close() error

Close calls Flush and closes any file that was created by the library. Close does not close io.Writers passed directly to NewWriter.

func (*Writer) DeferUntilClose added in v0.5.0

func (wrt *Writer) DeferUntilClose(thisFunc func() error)

func (*Writer) Flush

func (wrt *Writer) Flush() error

Flush flushes any of the Writer's bucket contents.

func (*Writer) Push

func (wrt *Writer) Push(event *Event) error

Push serializes the given Event. Once this is performed, changes to the Event in memory are not reflected in the output stream.

func (*Writer) PushMetadata

func (wrt *Writer) PushMetadata(name string, data []byte) error

func (*Writer) SetCompression

func (wrt *Writer) SetCompression(comp Compression) error

SetCompression sets the compression type, for example to GZIP or UNCOMPRESSED. This can be called even after writing some events.

Directories

Path Synopsis
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL