Documentation
¶
Index ¶
- Constants
- func DualProcessor(ctx context.Context, reqChan chan ElementLookup, load bool, loader LoadData, ...) chan ElementLookup
- func LookupBatcher(req chan ElementLookup, batchSize int, timeout time.Duration) chan []ElementLookup
- func RenderTraveler(traveler Traveler, template interface{}) interface{}
- func TravelerGetDoc(traveler Traveler, ns ...string) map[string]any
- func TravelerGetMarkDoc(traveler Traveler, ns string) map[string]any
- func TravelerPathExists(traveler Traveler, path string) bool
- func TravelerPathLookup(traveler Traveler, path string) interface{}
- func TravelerSetValue(traveler Traveler, path string, val interface{}) error
- type Aggregate
- type BaseTraveler
- func (t *BaseTraveler) AddCurrent(r DataRef) Traveler
- func (t *BaseTraveler) AddMark(label string, r DataRef) Traveler
- func (t *BaseTraveler) Copy() Traveler
- func (t BaseTraveler) GetAggregation() *Aggregate
- func (t *BaseTraveler) GetCount() uint32
- func (t *BaseTraveler) GetCurrent() DataRef
- func (t *BaseTraveler) GetCurrentID() string
- func (t *BaseTraveler) GetMark(label string) DataRef
- func (t *BaseTraveler) GetPath() []DataElementID
- func (t *BaseTraveler) GetRender() interface{}
- func (t *BaseTraveler) GetSelections() map[string]DataRef
- func (tr *BaseTraveler) GetSignal() Signal
- func (t *BaseTraveler) HasMark(label string) bool
- func (tr *BaseTraveler) IsNull() bool
- func (tr *BaseTraveler) IsSignal() bool
- func (t *BaseTraveler) ListMarks() []string
- func (t *BaseTraveler) UpdateMark(label string, r DataRef)
- type CompileOptions
- type Compiler
- type CustomProcGen
- type DataElement
- type DataElementID
- type DataRef
- type DataType
- type DeleteData
- type Deserialize
- type Edge
- type EdgeRef
- type ElementLookup
- type GraphDB
- type GraphElement
- type GraphInterface
- type InPipe
- type LoadData
- type Manager
- type MemQueue
- type OutPipe
- type Pipeline
- type PipelineExtension
- type PipelineState
- type Processor
- type Queue
- type RetrievedData
- type Signal
- type State
- type StatementCompiler
- type Traveler
- type Vertex
- type VertexRef
Constants ¶
const ( // StateCustom The Pipeline will be emitting custom data structures StateCustom = 0 // StateVertexList The Pipeline will be emitting a list of vertices StateVertexList = 1 // StateEdgeList The Pipeline will be emitting a list of edges StateEdgeList = 2 // StateRawVertexList The Pipeline will be emitting a list of all vertices, if there is an index // based filter, you can use skip listening and use that StateRawVertexList = 3 // StateRawEdgeList The Pipeline will be emitting a list of all edges, if there is an index // based filter, you can use skip listening and use that StateRawEdgeList = 4 )
These consts mark the type of a Pipeline traveler chan
Variables ¶
This section is empty.
Functions ¶
func DualProcessor ¶
func DualProcessor(ctx context.Context, reqChan chan ElementLookup, load bool, loader LoadData, deserializer Deserialize) chan ElementLookup
func LookupBatcher ¶
func LookupBatcher(req chan ElementLookup, batchSize int, timeout time.Duration) chan []ElementLookup
func RenderTraveler ¶
func RenderTraveler(traveler Traveler, template interface{}) interface{}
RenderTraveler takes a template and fills in the values using the data structure
func TravelerGetDoc ¶
GetDoc returns the document representing the traveler data
func TravelerGetMarkDoc ¶
TravelerGetMarkDoc returns the document representing the traveler data
func TravelerPathExists ¶
TravelerPathExists returns true if the field exists in the given Traveler
func TravelerPathLookup ¶
TravelerPathLookup gets the value of a field in the given Traveler
Example for a traveler containing:
{ "_current": {...}, "marks": { "gene": { "gid": 1, "label": "gene", "data": { "symbol": { "ensembl": "ENSG00000012048", "hgnc": 1100, "entrez": 672 } } } } } }
TravelerPathLookup(travler, "$gene.symbol.ensembl") returns "ENSG00000012048"
func TravelerSetValue ¶
TravelerSetValue(travler, "$gene.symbol.ensembl", "hi") inserts the value in the location"
Types ¶
type BaseTraveler ¶
type BaseTraveler struct { Current *DataElement Marks map[string]*DataElement Selections map[string]*DataElement Aggregation *Aggregate Count uint32 Render interface{} Path []DataElementID Signal *Signal }
Traveler is a query element that traverse the graph
func (*BaseTraveler) AddCurrent ¶
func (t *BaseTraveler) AddCurrent(r DataRef) Traveler
AddCurrent creates a new copy of the travel with new 'current' value
func (*BaseTraveler) AddMark ¶
func (t *BaseTraveler) AddMark(label string, r DataRef) Traveler
AddMark adds a result to travels state map using `label` as the name
func (*BaseTraveler) Copy ¶
func (t *BaseTraveler) Copy() Traveler
AddCurrent creates a new copy of the travel with new 'current' value
func (BaseTraveler) GetAggregation ¶
func (t BaseTraveler) GetAggregation() *Aggregate
func (*BaseTraveler) GetCount ¶
func (t *BaseTraveler) GetCount() uint32
func (*BaseTraveler) GetCurrent ¶
func (t *BaseTraveler) GetCurrent() DataRef
GetCurrent get current result value attached to the traveler
func (*BaseTraveler) GetCurrentID ¶
func (t *BaseTraveler) GetCurrentID() string
func (*BaseTraveler) GetMark ¶
func (t *BaseTraveler) GetMark(label string) DataRef
GetMark gets stored result in travels state using its label
func (*BaseTraveler) GetPath ¶
func (t *BaseTraveler) GetPath() []DataElementID
func (*BaseTraveler) GetRender ¶
func (t *BaseTraveler) GetRender() interface{}
func (*BaseTraveler) GetSelections ¶
func (t *BaseTraveler) GetSelections() map[string]DataRef
func (*BaseTraveler) GetSignal ¶
func (tr *BaseTraveler) GetSignal() Signal
func (*BaseTraveler) HasMark ¶
func (t *BaseTraveler) HasMark(label string) bool
HasMark checks to see if a results is stored in a travelers statemap
func (*BaseTraveler) IsNull ¶
func (tr *BaseTraveler) IsNull() bool
func (*BaseTraveler) IsSignal ¶
func (tr *BaseTraveler) IsSignal() bool
func (*BaseTraveler) ListMarks ¶
func (t *BaseTraveler) ListMarks() []string
ListMarks returns the list of marks in a travelers statemap
func (*BaseTraveler) UpdateMark ¶
func (t *BaseTraveler) UpdateMark(label string, r DataRef)
type CompileOptions ¶
type CompileOptions struct { //Compile pipeline extension Extends *PipelineExtension StoreMarks bool }
type Compiler ¶
type Compiler interface {
Compile(stmts []*gripql.GraphStatement, opts *CompileOptions) (Pipeline, error)
}
Compiler takes a gripql query and turns it into an executable pipeline
type CustomProcGen ¶
type CustomProcGen interface { GetType() DataType GetProcessor(db GraphInterface, ps PipelineState) (Processor, error) }
type DataElement ¶
type DataElement struct { ID string Label string From, To string Data map[string]interface{} Loaded bool }
DataElement is a single data element
func (*DataElement) Copy ¶
func (d *DataElement) Copy() DataRef
func (*DataElement) FromDict ¶
func (elem *DataElement) FromDict(d map[string]any)
func (*DataElement) Get ¶
func (d *DataElement) Get() *DataElement
func (*DataElement) ToDict ¶
func (elem *DataElement) ToDict() map[string]interface{}
ToDict converts data element to generic map
func (*DataElement) ToEdge ¶
func (elem *DataElement) ToEdge() *gripql.Edge
ToEdge converts data element to edge
func (*DataElement) ToVertex ¶
func (elem *DataElement) ToVertex() *gripql.Vertex
ToVertex converts data element to vertex
type DataElementID ¶
type DataRef ¶
type DataRef interface { Get() *DataElement Copy() DataRef }
DataRef is a handler interface above DataElement, that allows processing pipelines to avoid loading data data required for DataElement until it is actually needed
type DataType ¶
type DataType uint8
DataType is a possible output data type
const ( NoData DataType = iota VertexData EdgeData CountData AggregationData SelectionData RenderData PathData )
DataTypes
func DataTypeString ¶
DataTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.
func DataTypeValues ¶
func DataTypeValues() []DataType
DataTypeValues returns all values of the enum
func (DataType) IsADataType ¶
IsADataType returns "true" if the value is listed in the enum definition. "false" otherwise
type DeleteData ¶
type Deserialize ¶
type Deserialize func(req ElementLookup, data interface{}) ElementLookup
type ElementLookup ¶
ElementLookup request to look up data
func (ElementLookup) IsSignal ¶
func (req ElementLookup) IsSignal() bool
type GraphDB ¶
type GraphDB interface { AddGraph(string) error DeleteGraph(string) error ListGraphs() []string Graph(graphID string) (GraphInterface, error) BuildSchema(ctx context.Context, graphID string, sampleN uint32, random bool) (*gripql.Graph, error) Close() error }
GraphDB is the base interface for graph databases
type GraphElement ¶
func NewGraphElement ¶
func NewGraphElement(g *gripql.GraphElement) *GraphElement
type GraphInterface ¶
type GraphInterface interface { Compiler() Compiler GetTimestamp() string GetVertex(key string, load bool) *Vertex GetEdge(key string, load bool) *Edge AddVertex(vertex []*Vertex) error AddEdge(edge []*Edge) error BulkAdd(<-chan *GraphElement) error BulkDel(*DeleteData) error DelVertex(key string) error DelEdge(key string) error VertexLabelScan(ctx context.Context, label string) chan string // EdgeLabelScan(ctx context.Context, label string) chan string ListVertexLabels() ([]string, error) ListEdgeLabels() ([]string, error) AddVertexIndex(label string, field string) error DeleteVertexIndex(label string, field string) error GetVertexIndexList() <-chan *gripql.IndexID GetVertexList(ctx context.Context, load bool) <-chan *Vertex GetEdgeList(ctx context.Context, load bool) <-chan *Edge GetVertexChannel(ctx context.Context, req chan ElementLookup, load bool) chan ElementLookup GetOutChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup GetInChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup GetOutEdgeChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup GetInEdgeChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup }
GraphInterface is the base Graph data storage interface, the PipeEngine will be able to run queries on a data system backend that implements this interface
type LoadData ¶
type LoadData func(req ElementLookup, load bool) chan interface{}
type Manager ¶
type Manager interface { //Get handle to temporary KeyValue store driver GetTempKV() kvi.KVInterface GetTmpDir() string Cleanup() }
Manager is a resource manager that is passed to processors to allow them ] to make resource requests
type Pipeline ¶
type Pipeline interface { Graph() GraphInterface Processors() []Processor DataType() DataType MarkTypes() map[string]DataType }
Pipeline represents a set of processors
type PipelineExtension ¶
type PipelineState ¶
type Processor ¶
type Processor interface {
Process(ctx context.Context, man Manager, in InPipe, out OutPipe) context.Context
}
Processor is the interface for a step in the pipe engine
func StatementProcessor ¶
func StatementProcessor( sc StatementCompiler, gs *gripql.GraphStatement, db GraphInterface, ps *State) (Processor, error)
type RetrievedData ¶
type RetrievedData struct { Req ElementLookup Data interface{} }
type State ¶
type State struct { LastType DataType MarkTypes map[string]DataType Steps []string StepOutputs map[string][]string CurStep string }
func NewPipelineState ¶
func NewPipelineState(stmts []*gripql.GraphStatement, storeMarks bool) *State
func (*State) GetLastType ¶
func (*State) SetCurStatment ¶
func (*State) SetLastType ¶
func (*State) StepLoadData ¶
type StatementCompiler ¶
type StatementCompiler interface { V(gs *gripql.GraphStatement_V, ps *State) (Processor, error) E(gs *gripql.GraphStatement_E, ps *State) (Processor, error) In(gs *gripql.GraphStatement_In, ps *State) (Processor, error) Out(gs *gripql.GraphStatement_Out, ps *State) (Processor, error) InNull(gs *gripql.GraphStatement_InNull, ps *State) (Processor, error) OutNull(gs *gripql.GraphStatement_OutNull, ps *State) (Processor, error) Both(gs *gripql.GraphStatement_Both, ps *State) (Processor, error) InE(gs *gripql.GraphStatement_InE, ps *State) (Processor, error) InENull(gs *gripql.GraphStatement_InENull, ps *State) (Processor, error) OutE(gs *gripql.GraphStatement_OutE, ps *State) (Processor, error) OutENull(gs *gripql.GraphStatement_OutENull, ps *State) (Processor, error) BothE(gs *gripql.GraphStatement_BothE, ps *State) (Processor, error) Has(gs *gripql.GraphStatement_Has, ps *State) (Processor, error) HasLabel(gs *gripql.GraphStatement_HasLabel, ps *State) (Processor, error) HasKey(gs *gripql.GraphStatement_HasKey, ps *State) (Processor, error) HasID(gs *gripql.GraphStatement_HasId, ps *State) (Processor, error) Limit(gs *gripql.GraphStatement_Limit, ps *State) (Processor, error) Skip(gs *gripql.GraphStatement_Skip, ps *State) (Processor, error) Range(gs *gripql.GraphStatement_Range, ps *State) (Processor, error) Count(gs *gripql.GraphStatement_Count, ps *State) (Processor, error) Distinct(gs *gripql.GraphStatement_Distinct, ps *State) (Processor, error) As(gs *gripql.GraphStatement_As, ps *State) (Processor, error) Set(gs *gripql.GraphStatement_Set, ps *State) (Processor, error) Increment(gs *gripql.GraphStatement_Increment, ps *State) (Processor, error) Mark(gs *gripql.GraphStatement_Mark, ps *State) (Processor, error) Jump(gs *gripql.GraphStatement_Jump, ps *State) (Processor, error) Select(gs *gripql.GraphStatement_Select, ps *State) (Processor, error) Render(gs *gripql.GraphStatement_Render, ps *State) (Processor, error) Pivot(gs *gripql.GraphStatement_Pivot, ps *State) (Processor, error) Path(gs *gripql.GraphStatement_Path, ps *State) (Processor, error) Unwind(gs *gripql.GraphStatement_Unwind, ps *State) (Processor, error) Group(gs *gripql.GraphStatement_Group, ps *State) (Processor, error) ToType(gs *gripql.GraphStatement_Totype, ps *State) (Processor, error) Fields(gs *gripql.GraphStatement_Fields, ps *State) (Processor, error) Aggregate(gs *gripql.GraphStatement_Aggregate, ps *State) (Processor, error) Sort(gs *gripql.GraphStatement_Sort, ps *State) (Processor, error) Custom(gs *gripql.GraphStatement, ps *State) (Processor, error) }
type Traveler ¶
type Traveler interface { IsSignal() bool GetSignal() Signal IsNull() bool GetCurrent() DataRef GetCurrentID() string AddCurrent(r DataRef) Traveler Copy() Traveler HasMark(label string) bool GetMark(label string) DataRef // AddMark adds a new mark to the data and return a duplicated Traveler AddMark(label string, r DataRef) Traveler // UpdateMark changes the data of a mark in the original traveler (vs AddMark which changes a copy of the traveler) UpdateMark(label string, r DataRef) ListMarks() []string GetSelections() map[string]DataRef GetRender() interface{} GetPath() []DataElementID GetAggregation() *Aggregate GetCount() uint32 }
func SelectTravelerFields ¶
SelectTravelerFields returns a new copy of the traveler with only the selected fields