gdbi

package
v0.0.0-...-96023b5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2025 License: MIT Imports: 14 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// StateCustom The Pipeline will be emitting custom data structures
	StateCustom = 0
	// StateVertexList The Pipeline will be emitting a list of vertices
	StateVertexList = 1
	// StateEdgeList The Pipeline will be emitting a list of edges
	StateEdgeList = 2
	// StateRawVertexList The Pipeline will be emitting a list of all vertices, if there is an index
	// based filter, you can use skip listening and use that
	StateRawVertexList = 3
	// StateRawEdgeList The Pipeline will be emitting a list of all edges, if there is an index
	// based filter, you can use skip listening and use that
	StateRawEdgeList = 4
)

These consts mark the type of a Pipeline traveler chan

Variables

This section is empty.

Functions

func DualProcessor

func DualProcessor(ctx context.Context, reqChan chan ElementLookup, load bool, loader LoadData, deserializer Deserialize) chan ElementLookup

func LookupBatcher

func LookupBatcher(req chan ElementLookup, batchSize int, timeout time.Duration) chan []ElementLookup

func RenderTraveler

func RenderTraveler(traveler Traveler, template interface{}) interface{}

RenderTraveler takes a template and fills in the values using the data structure

func TravelerGetDoc

func TravelerGetDoc(traveler Traveler, ns ...string) map[string]any

GetDoc returns the document representing the traveler data

func TravelerGetMarkDoc

func TravelerGetMarkDoc(traveler Traveler, ns string) map[string]any

TravelerGetMarkDoc returns the document representing the traveler data

func TravelerPathExists

func TravelerPathExists(traveler Traveler, path string) bool

TravelerPathExists returns true if the field exists in the given Traveler

func TravelerPathLookup

func TravelerPathLookup(traveler Traveler, path string) interface{}

TravelerPathLookup gets the value of a field in the given Traveler

Example for a traveler containing:

{
    "_current": {...},
    "marks": {
      "gene": {
        "gid": 1,
        "label": "gene",
        "data": {
          "symbol": {
            "ensembl": "ENSG00000012048",
            "hgnc": 1100,
            "entrez": 672
          }
        }
      }
    }
  }
}

TravelerPathLookup(travler, "$gene.symbol.ensembl") returns "ENSG00000012048"

func TravelerSetValue

func TravelerSetValue(traveler Traveler, path string, val interface{}) error

TravelerSetValue(travler, "$gene.symbol.ensembl", "hi") inserts the value in the location"

Types

type Aggregate

type Aggregate struct {
	Name  string
	Key   interface{}
	Value float64
}

type BaseTraveler

type BaseTraveler struct {
	Current     *DataElement
	Marks       map[string]*DataElement
	Selections  map[string]*DataElement
	Aggregation *Aggregate
	Count       uint32
	Render      interface{}
	Path        []DataElementID
	Signal      *Signal
}

Traveler is a query element that traverse the graph

func (*BaseTraveler) AddCurrent

func (t *BaseTraveler) AddCurrent(r DataRef) Traveler

AddCurrent creates a new copy of the travel with new 'current' value

func (*BaseTraveler) AddMark

func (t *BaseTraveler) AddMark(label string, r DataRef) Traveler

AddMark adds a result to travels state map using `label` as the name

func (*BaseTraveler) Copy

func (t *BaseTraveler) Copy() Traveler

AddCurrent creates a new copy of the travel with new 'current' value

func (BaseTraveler) GetAggregation

func (t BaseTraveler) GetAggregation() *Aggregate

func (*BaseTraveler) GetCount

func (t *BaseTraveler) GetCount() uint32

func (*BaseTraveler) GetCurrent

func (t *BaseTraveler) GetCurrent() DataRef

GetCurrent get current result value attached to the traveler

func (*BaseTraveler) GetCurrentID

func (t *BaseTraveler) GetCurrentID() string

func (*BaseTraveler) GetMark

func (t *BaseTraveler) GetMark(label string) DataRef

GetMark gets stored result in travels state using its label

func (*BaseTraveler) GetPath

func (t *BaseTraveler) GetPath() []DataElementID

func (*BaseTraveler) GetRender

func (t *BaseTraveler) GetRender() interface{}

func (*BaseTraveler) GetSelections

func (t *BaseTraveler) GetSelections() map[string]DataRef

func (*BaseTraveler) GetSignal

func (tr *BaseTraveler) GetSignal() Signal

func (*BaseTraveler) HasMark

func (t *BaseTraveler) HasMark(label string) bool

HasMark checks to see if a results is stored in a travelers statemap

func (*BaseTraveler) IsNull

func (tr *BaseTraveler) IsNull() bool

func (*BaseTraveler) IsSignal

func (tr *BaseTraveler) IsSignal() bool

func (*BaseTraveler) ListMarks

func (t *BaseTraveler) ListMarks() []string

ListMarks returns the list of marks in a travelers statemap

func (*BaseTraveler) UpdateMark

func (t *BaseTraveler) UpdateMark(label string, r DataRef)

type CompileOptions

type CompileOptions struct {
	//Compile pipeline extension
	Extends    *PipelineExtension
	StoreMarks bool
}

type Compiler

type Compiler interface {
	Compile(stmts []*gripql.GraphStatement, opts *CompileOptions) (Pipeline, error)
}

Compiler takes a gripql query and turns it into an executable pipeline

type CustomProcGen

type CustomProcGen interface {
	GetType() DataType
	GetProcessor(db GraphInterface, ps PipelineState) (Processor, error)
}

type DataElement

type DataElement struct {
	ID       string
	Label    string
	From, To string
	Data     map[string]interface{}
	Loaded   bool
}

DataElement is a single data element

func (*DataElement) Copy

func (d *DataElement) Copy() DataRef

func (*DataElement) FromDict

func (elem *DataElement) FromDict(d map[string]any)

func (*DataElement) Get

func (d *DataElement) Get() *DataElement

func (*DataElement) ToDict

func (elem *DataElement) ToDict() map[string]interface{}

ToDict converts data element to generic map

func (*DataElement) ToEdge

func (elem *DataElement) ToEdge() *gripql.Edge

ToEdge converts data element to edge

func (*DataElement) ToVertex

func (elem *DataElement) ToVertex() *gripql.Vertex

ToVertex converts data element to vertex

type DataElementID

type DataElementID struct {
	Vertex string
	Edge   string
}

type DataRef

type DataRef interface {
	Get() *DataElement
	Copy() DataRef
}

DataRef is a handler interface above DataElement, that allows processing pipelines to avoid loading data data required for DataElement until it is actually needed

type DataType

type DataType uint8

DataType is a possible output data type

const (
	NoData DataType = iota
	VertexData
	EdgeData
	CountData
	AggregationData
	SelectionData
	RenderData
	PathData
)

DataTypes

func DataTypeString

func DataTypeString(s string) (DataType, error)

DataTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func DataTypeValues

func DataTypeValues() []DataType

DataTypeValues returns all values of the enum

func (DataType) IsADataType

func (i DataType) IsADataType() bool

IsADataType returns "true" if the value is listed in the enum definition. "false" otherwise

func (DataType) String

func (i DataType) String() string

type DeleteData

type DeleteData struct {
	Graph    string
	Vertices []string
	Edges    []string
}

type Deserialize

type Deserialize func(req ElementLookup, data interface{}) ElementLookup

type Edge

type Edge = DataElement

func NewElementFromEdge

func NewElementFromEdge(e *gripql.Edge) *Edge

type EdgeRef

type EdgeRef = DataRef

type ElementLookup

type ElementLookup struct {
	ID     string
	Ref    Traveler
	Vertex VertexRef
	Edge   EdgeRef
}

ElementLookup request to look up data

func (ElementLookup) IsSignal

func (req ElementLookup) IsSignal() bool

type GraphDB

type GraphDB interface {
	AddGraph(string) error
	DeleteGraph(string) error
	ListGraphs() []string
	Graph(graphID string) (GraphInterface, error)
	BuildSchema(ctx context.Context, graphID string, sampleN uint32, random bool) (*gripql.Graph, error)
	Close() error
}

GraphDB is the base interface for graph databases

type GraphElement

type GraphElement struct {
	Vertex *Vertex
	Edge   *Edge
	Graph  string
}

func NewGraphElement

func NewGraphElement(g *gripql.GraphElement) *GraphElement

type GraphInterface

type GraphInterface interface {
	Compiler() Compiler

	GetTimestamp() string

	GetVertex(key string, load bool) *Vertex
	GetEdge(key string, load bool) *Edge

	AddVertex(vertex []*Vertex) error
	AddEdge(edge []*Edge) error

	BulkAdd(<-chan *GraphElement) error
	BulkDel(*DeleteData) error

	DelVertex(key string) error
	DelEdge(key string) error

	VertexLabelScan(ctx context.Context, label string) chan string
	// EdgeLabelScan(ctx context.Context, label string) chan string
	ListVertexLabels() ([]string, error)
	ListEdgeLabels() ([]string, error)

	AddVertexIndex(label string, field string) error
	DeleteVertexIndex(label string, field string) error
	GetVertexIndexList() <-chan *gripql.IndexID

	GetVertexList(ctx context.Context, load bool) <-chan *Vertex
	GetEdgeList(ctx context.Context, load bool) <-chan *Edge

	GetVertexChannel(ctx context.Context, req chan ElementLookup, load bool) chan ElementLookup
	GetOutChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup
	GetInChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup
	GetOutEdgeChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup
	GetInEdgeChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup
}

GraphInterface is the base Graph data storage interface, the PipeEngine will be able to run queries on a data system backend that implements this interface

type InPipe

type InPipe <-chan Traveler

InPipe incoming traveler messages

type LoadData

type LoadData func(req ElementLookup, load bool) chan interface{}

type Manager

type Manager interface {
	//Get handle to temporary KeyValue store driver
	GetTempKV() kvi.KVInterface
	GetTmpDir() string
	Cleanup()
}

Manager is a resource manager that is passed to processors to allow them ] to make resource requests

type MemQueue

type MemQueue struct {
	// contains filtered or unexported fields
}

func (*MemQueue) GetInput

func (q *MemQueue) GetInput() chan Traveler

func (*MemQueue) GetOutput

func (q *MemQueue) GetOutput() chan Traveler

type OutPipe

type OutPipe chan<- Traveler

OutPipe collects output traveler messages

type Pipeline

type Pipeline interface {
	Graph() GraphInterface
	Processors() []Processor
	DataType() DataType
	MarkTypes() map[string]DataType
}

Pipeline represents a set of processors

type PipelineExtension

type PipelineExtension struct {
	StartType  DataType
	MarksTypes map[string]DataType
}

type PipelineState

type PipelineState interface {
	GetLastType() DataType
	SetLastType(DataType)
}

type Processor

type Processor interface {
	Process(ctx context.Context, man Manager, in InPipe, out OutPipe) context.Context
}

Processor is the interface for a step in the pipe engine

func StatementProcessor

func StatementProcessor(
	sc StatementCompiler,
	gs *gripql.GraphStatement,
	db GraphInterface,
	ps *State) (Processor, error)

type Queue

type Queue interface {
	GetInput() chan Traveler
	GetOutput() chan Traveler
}

func NewQueue

func NewQueue() Queue

type RetrievedData

type RetrievedData struct {
	Req  ElementLookup
	Data interface{}
}

type Signal

type Signal struct {
	Dest string
	ID   int
}

type State

type State struct {
	LastType    DataType
	MarkTypes   map[string]DataType
	Steps       []string
	StepOutputs map[string][]string
	CurStep     string
}

func NewPipelineState

func NewPipelineState(stmts []*gripql.GraphStatement, storeMarks bool) *State

func (*State) GetLastType

func (ps *State) GetLastType() DataType

func (*State) SetCurStatment

func (ps *State) SetCurStatment(a int)

func (*State) SetLastType

func (ps *State) SetLastType(a DataType)

func (*State) StepLoadData

func (ps *State) StepLoadData() bool

type StatementCompiler

type StatementCompiler interface {
	V(gs *gripql.GraphStatement_V, ps *State) (Processor, error)
	E(gs *gripql.GraphStatement_E, ps *State) (Processor, error)
	In(gs *gripql.GraphStatement_In, ps *State) (Processor, error)
	Out(gs *gripql.GraphStatement_Out, ps *State) (Processor, error)
	InNull(gs *gripql.GraphStatement_InNull, ps *State) (Processor, error)
	OutNull(gs *gripql.GraphStatement_OutNull, ps *State) (Processor, error)
	Both(gs *gripql.GraphStatement_Both, ps *State) (Processor, error)
	InE(gs *gripql.GraphStatement_InE, ps *State) (Processor, error)
	InENull(gs *gripql.GraphStatement_InENull, ps *State) (Processor, error)
	OutE(gs *gripql.GraphStatement_OutE, ps *State) (Processor, error)
	OutENull(gs *gripql.GraphStatement_OutENull, ps *State) (Processor, error)
	BothE(gs *gripql.GraphStatement_BothE, ps *State) (Processor, error)
	Has(gs *gripql.GraphStatement_Has, ps *State) (Processor, error)
	HasLabel(gs *gripql.GraphStatement_HasLabel, ps *State) (Processor, error)
	HasKey(gs *gripql.GraphStatement_HasKey, ps *State) (Processor, error)
	HasID(gs *gripql.GraphStatement_HasId, ps *State) (Processor, error)

	Limit(gs *gripql.GraphStatement_Limit, ps *State) (Processor, error)
	Skip(gs *gripql.GraphStatement_Skip, ps *State) (Processor, error)
	Range(gs *gripql.GraphStatement_Range, ps *State) (Processor, error)
	Count(gs *gripql.GraphStatement_Count, ps *State) (Processor, error)
	Distinct(gs *gripql.GraphStatement_Distinct, ps *State) (Processor, error)
	As(gs *gripql.GraphStatement_As, ps *State) (Processor, error)
	Set(gs *gripql.GraphStatement_Set, ps *State) (Processor, error)
	Increment(gs *gripql.GraphStatement_Increment, ps *State) (Processor, error)
	Mark(gs *gripql.GraphStatement_Mark, ps *State) (Processor, error)
	Jump(gs *gripql.GraphStatement_Jump, ps *State) (Processor, error)
	Select(gs *gripql.GraphStatement_Select, ps *State) (Processor, error)

	Render(gs *gripql.GraphStatement_Render, ps *State) (Processor, error)
	Pivot(gs *gripql.GraphStatement_Pivot, ps *State) (Processor, error)

	Path(gs *gripql.GraphStatement_Path, ps *State) (Processor, error)
	Unwind(gs *gripql.GraphStatement_Unwind, ps *State) (Processor, error)
	Group(gs *gripql.GraphStatement_Group, ps *State) (Processor, error)
	ToType(gs *gripql.GraphStatement_Totype, ps *State) (Processor, error)

	Fields(gs *gripql.GraphStatement_Fields, ps *State) (Processor, error)
	Aggregate(gs *gripql.GraphStatement_Aggregate, ps *State) (Processor, error)

	Sort(gs *gripql.GraphStatement_Sort, ps *State) (Processor, error)

	Custom(gs *gripql.GraphStatement, ps *State) (Processor, error)
}

type Traveler

type Traveler interface {
	IsSignal() bool
	GetSignal() Signal
	IsNull() bool
	GetCurrent() DataRef
	GetCurrentID() string
	AddCurrent(r DataRef) Traveler
	Copy() Traveler
	HasMark(label string) bool
	GetMark(label string) DataRef
	// AddMark adds a new mark to the data and return a duplicated Traveler
	AddMark(label string, r DataRef) Traveler
	// UpdateMark changes the data of a mark in the original traveler (vs AddMark which changes a copy of the traveler)
	UpdateMark(label string, r DataRef)
	ListMarks() []string
	GetSelections() map[string]DataRef
	GetRender() interface{}
	GetPath() []DataElementID
	GetAggregation() *Aggregate
	GetCount() uint32
}

func SelectTravelerFields

func SelectTravelerFields(t Traveler, keys ...string) Traveler

SelectTravelerFields returns a new copy of the traveler with only the selected fields

type Vertex

type Vertex = DataElement

func NewElementFromVertex

func NewElementFromVertex(v *gripql.Vertex) *Vertex

func (*Vertex) Validate

func (vertex *Vertex) Validate() error

Validate returns an error if the vertex is invalid

type VertexRef

type VertexRef = DataRef

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL