Documentation ¶
Index ¶
- Variables
- func BatchToUpsert(batch ItemNodes) *api.Request
- func InitConfig(cfgFile string)
- func InitConfigFunc(cfgFile string) func()
- func MessageToItem(msg *nats.Msg) (*sdp.Item, error)
- func NewDGraphClient(hostname string, port int, connectTimeout time.Duration) (*dgo.Dgraph, error)
- func NewNATSConnection(urls []string, retries int, sleep int, timeout time.Duration) *nats.Conn
- func SetConfigDefaults()
- func SetupSchemas(dg *dgo.Dgraph) error
- type DGraph
- type Ingestor
- type ItemInsertion
- type ItemNode
- type ItemNodes
- type Queries
- type Query
- type UpsertResult
- type Variable
Constants ¶
This section is empty.
Variables ¶
var RetryError = regexp.MustCompile(`(?i)Please retry`)
RetryError A regex that matches a healthy "transaction aborted" error in dgraph. Because transactions in dgraph don't hold locks, it is part of normal operation for a transaction to be aborted and need to be retried. If we get an error matching this regex then there wasn't anything wrong with the request; it just wants us to try again later
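As a rough illustration of how such a pattern might drive a retry loop, the sketch below retries an operation only while its error matches the regex. The helper `withRetry` and the two sample errors are hypothetical and are not part of this package; the sample error text is only an assumption about what an aborted transaction might report.

```go
package main

import (
	"errors"
	"regexp"
)

// retryError mirrors the RetryError pattern documented above.
var retryError = regexp.MustCompile(`(?i)Please retry`)

// Sample errors used for illustration only (assumed wording).
var (
	errSampleAborted = errors.New("Transaction has been aborted. Please retry")
	errSampleFatal   = errors.New("connection refused")
)

// withRetry is a hypothetical helper. It calls op up to maxAttempts times
// (assumed to be at least 1) and retries only while the returned error
// matches retryError. Any other result is returned immediately.
func withRetry(maxAttempts int, op func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = op()
		if err == nil || !retryError.MatchString(err.Error()) {
			return err
		}
	}
	return err
}
```

A real caller would probably also sleep or back off between attempts; that is left out here to keep the sketch short.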
var Schema = `` /* 1412-byte string literal not displayed */
Schema Stores the overall schema. I'm sure this is a bad way to do things from a future compatibility perspective...
Functions ¶
func BatchToUpsert ¶
func BatchToUpsert(batch ItemNodes) *api.Request
BatchToUpsert converts a set of ItemNodes to an Upsert request
func InitConfig ¶
func InitConfig(cfgFile string)
InitConfig reads in config file and ENV variables if set and initialises all other config
func InitConfigFunc ¶
func InitConfigFunc(cfgFile string) func()
InitConfigFunc Returns a function that calls InitConfig(cfgFile)
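The wrapper pattern described here can be sketched as a closure that captures the file name and defers the call. In this sketch `initConfig` is a stand-in that only records its argument; the real InitConfig reads the file and environment variables, and the `loaded` slice is purely illustrative.

```go
package main

// loaded records which config files the stand-in initConfig was asked to read.
var loaded []string

// initConfig is a stand-in for InitConfig used only in this sketch.
func initConfig(cfgFile string) {
	loaded = append(loaded, cfgFile)
}

// initConfigFunc captures cfgFile and returns a func() that performs the
// call later, which suits APIs that accept argument-less callbacks.
func initConfigFunc(cfgFile string) func() {
	return func() {
		initConfig(cfgFile)
	}
}
```

Nothing is loaded until the returned function is invoked, so the value can be handed to any hook that expects a plain `func()`.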
func MessageToItem ¶
func MessageToItem(msg *nats.Msg) (*sdp.Item, error)
MessageToItem Converts a NATS message to an SDP Item
func NewDGraphClient ¶
func NewDGraphClient(hostname string, port int, connectTimeout time.Duration) (*dgo.Dgraph, error)
NewDGraphClient Create a dgraph client connection
func NewNATSConnection ¶
func NewNATSConnection(urls []string, retries int, sleep int, timeout time.Duration) *nats.Conn
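NewNATSConnection connects to the given NATS servers. It also supports retries. Servers should be supplied as a slice of URLs, followed by the retry count, the sleep value and the timeout (the timeout shown here is illustrative), e.g.
link.NewNATSConnection([]string{"nats://127.0.0.1:1222", "nats://127.0.0.1:1223"}, 5, 5, 10*time.Second)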
func SetConfigDefaults ¶
func SetConfigDefaults()
SetConfigDefaults Registers default values for config with Viper. This should always be called at some point before trying to do anything with this library
func SetupSchemas ¶
func SetupSchemas(dg *dgo.Dgraph) error
SetupSchemas Will create the schemas required for ingest to work. This will need to be run before anything can actually be inserted into the database
Types ¶
type DGraph ¶
type DGraph struct {
	Conn *dgo.Dgraph
}
DGraph Stores details about the DGraph connection
type Ingestor ¶
type Ingestor struct {
	BatchSize     int           // The number of items to batch before inserting
	MaxWait       time.Duration // Max amount of time to wait before inserting
	Dgraph        *dgo.Dgraph   // The DGraph connection to use
	DebugChannel  chan UpsertResult
	IngestRetries int
	// contains filtered or unexported fields
}
Ingestor is capable of ingesting items into the database
func (*Ingestor) AsyncHandle ¶
func (i *Ingestor) AsyncHandle(msg *nats.Msg)
AsyncHandle Creates a NATS message handler that upserts items into the given database
func (*Ingestor) EnsureItemChannel ¶
func (i *Ingestor) EnsureItemChannel()
EnsureItemChannel Ensures that the item channel exists
func (*Ingestor) ProcessBatches ¶
ProcessBatches will start inserting items into the database in batches. This will block forever
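The interaction between BatchSize and MaxWait can be pictured as a loop that collects items until either limit is reached. The sketch below is not the package's implementation; `collectBatch` is a hypothetical helper and plain strings stand in for items.

```go
package main

import "time"

// collectBatch is a hypothetical helper. It reads from items until batchSize
// items have arrived, maxWait has elapsed since the call, or the channel is
// closed, whichever happens first. batchSize is assumed to be positive.
func collectBatch(items <-chan string, batchSize int, maxWait time.Duration) []string {
	batch := make([]string, 0, batchSize)
	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	for len(batch) < batchSize {
		select {
		case item, ok := <-items:
			if !ok {
				return batch // channel closed: flush what we have
			}
			batch = append(batch, item)
		case <-timer.C:
			return batch // waited long enough: flush a partial batch
		}
	}
	return batch
}
```

A long-running processor would call something like this in an endless loop and insert each non-empty batch, which is consistent with the note that ProcessBatches blocks forever.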
func (*Ingestor) RetryUpsert ¶
func (i *Ingestor) RetryUpsert(insertions []ItemInsertion)
RetryUpsert Will do something about retrying upserts. Maybe put them back in the queue using a TTL, maybe just sleep and retry... TODO: Decide on the retry functionality
type ItemInsertion ¶
ItemInsertion Represents an item to be inserted. It includes an item and the TTL, which is reduced each time the insertion is retried
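One possible reading of the TTL behaviour is sketched below: each failed insertion has its TTL reduced by one and is retried only while the TTL stays positive. The `insertion` type and `requeue` function are hypothetical, the real field names are not shown in this documentation, and the retry policy itself is still marked as undecided above.

```go
package main

// insertion is a hypothetical stand-in for ItemInsertion: an item plus the
// number of attempts it has left.
type insertion struct {
	Item string
	TTL  int
}

// requeue reduces the TTL of every failed insertion by one and splits the
// result into insertions that may be retried and insertions whose TTL is
// exhausted. The input slice is not modified.
func requeue(failed []insertion) (retry, dropped []insertion) {
	for _, ins := range failed {
		ins.TTL--
		if ins.TTL > 0 {
			retry = append(retry, ins)
		} else {
			dropped = append(dropped, ins)
		}
	}
	return retry, dropped
}
```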
type ItemNode ¶
type ItemNode struct {
	Type                 string        `json:"Type,omitempty"`
	UniqueAttribute      string        `json:"UniqueAttribute,omitempty"`
	Context              string        `json:"Context,omitempty"`
	Attributes           string        `json:"Attributes,omitempty"`
	UniqueAttributeValue string        `json:"UniqueAttributeValue,omitempty"`
	GloballyUniqueName   string        `json:"GloballyUniqueName,omitempty"`
	Hash                 string        `json:"Hash,omitempty"`
	Metadata             *sdp.Metadata `json:"-"`
	LinkedItems          ItemNodes     `json:"-"`
}
ItemNode Represents an item. It is also able to return a full list of mutations
## Attributes Predicate
Currently attributes are stored as a JSON string. This has made the database queries very easy but will likely cause performance issues in future. This is due to the fact that dgraph does predicate-based sharding, i.e. data is sharded by predicate and not by UID. This means that all values of "attributes" (which will represent the vast majority of the database) will be stored in the same shard, as they all belong to the one predicate. Initially I had tried storing each attribute as its own predicate with links between them. This would be much better from a sharding perspective, but would make life much harder in a number of ways, as detailed below.
### Predicate Expansion
When querying we would need to expand the predicates which would mean that we would either need to know the predicates in advance (difficult due to the fact that attributes are arbitrary) or dynamically generate a named type in dgraph for each and store this in the schema.
### Orphaned Nodes
When updating data we could very easily orphan nodes since we are creating nodes arbitrarily with arbitrary relationships. Think of a Kubernetes pod with many statuses: each of these would require a node, and once the pod is deleted they would need to be deleted too. Also, if a nested hash changed it would probably need to be re-created as opposed to updated, since we don't know what makes it unique. This would mean that the old node would still exist but would now be an orphan. There would probably need to be some regular cleanup of these orphaned nodes.
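To make the single-predicate approach concrete, the sketch below flattens an arbitrary attribute map into one JSON string and decodes it again. The cut-down `node` type and both helpers are hypothetical and only loosely modelled on ItemNode; the package's real conversion code is not shown in this documentation.

```go
package main

import "encoding/json"

// node is a hypothetical, cut-down node whose attributes are stored as a
// single JSON string rather than as one predicate per attribute.
type node struct {
	Type       string `json:"Type,omitempty"`
	Attributes string `json:"Attributes,omitempty"`
}

// newNode serialises arbitrary attributes into one string so that they all
// live under the same predicate.
func newNode(typ string, attrs map[string]interface{}) (node, error) {
	raw, err := json.Marshal(attrs)
	if err != nil {
		return node{}, err
	}
	return node{Type: typ, Attributes: string(raw)}, nil
}

// attributes decodes the stored string back into a map.
func (n node) attributes() (map[string]interface{}, error) {
	var attrs map[string]interface{}
	err := json.Unmarshal([]byte(n.Attributes), &attrs)
	return attrs, err
}
```

Because the whole attribute set is replaced on every write, no child nodes are created and nothing can be orphaned, at the cost of everything sharing one predicate.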
func ItemToItemNode ¶
ItemToItemNode converts an item to an ItemNode
func MessageToItemNode ¶
MessageToItemNode Converts a NATS message to a DGraph ItemNode
func (ItemNode) IsPlaceholder ¶
IsPlaceholder Returns true if the item is just a placeholder
func (ItemNode) MarshalJSON ¶
MarshalJSON Custom marshalling functionality that adds derived fields required for DGraph
func (*ItemNode) Queries ¶
Queries Returns the queries that should match specifically this item. It will also export the following variables:
- `{Hash}.item`: UID of this item
- `{Hash}.item.older`: UID of this item, if it is older than the supplied one
- `{Hash}.linkedItemsCount`: count() of the linked items
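The naming pattern in the list above amounts to prefixing each variable with the item's hash. A minimal sketch of that substitution follows; the `queryVars` type and `varsFor` function are hypothetical and exist only to show the resulting names.

```go
package main

// queryVars is a hypothetical container for the variable names exported for
// one item, following the naming pattern listed above.
type queryVars struct {
	Item             string
	ItemOlder        string
	LinkedItemsCount string
}

// varsFor builds the variable names for the item with the given hash.
func varsFor(hash string) queryVars {
	return queryVars{
		Item:             hash + ".item",
		ItemOlder:        hash + ".item.older",
		LinkedItemsCount: hash + ".linkedItemsCount",
	}
}
```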
func (*ItemNode) UnmarshalJSON ¶
UnmarshalJSON Converts from JSON to ItemNode
type ItemNodes ¶
type ItemNodes []ItemNode
ItemNodes Represents a list of ItemNodes in dgraph
func (ItemNodes) Deduplicate ¶
Deduplicate Removes duplicate items, with clashes being resolved as follows:
- Newer items beat older items
- Complete items beat items that are only references/placeholders (i.e. those that do not have attributes and metadata)
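The two rules above can be sketched as a single pass over the list that keeps the best candidate per item. Everything in this sketch is hypothetical: `node` is a stand-in for ItemNode, `Version` stands in for whatever timestamp the real code compares, and the order in which the rules are applied (completeness first, then recency) is an assumption, since the documentation does not say which rule takes priority.

```go
package main

// node is a hypothetical stand-in for ItemNode.
type node struct {
	Name        string // globally unique name, used as the deduplication key
	Version     int64  // stand-in for the item's timestamp; higher is newer
	Placeholder bool   // true when the node is only a reference
}

// better reports whether a should replace b. Assumption: a complete item
// always beats a placeholder, and recency only breaks ties between items of
// the same kind.
func better(a, b node) bool {
	if a.Placeholder != b.Placeholder {
		return !a.Placeholder
	}
	return a.Version > b.Version
}

// deduplicate keeps one node per name, preserving first-seen order.
func deduplicate(nodes []node) []node {
	index := make(map[string]int) // name -> position in out
	var out []node
	for _, n := range nodes {
		i, seen := index[n.Name]
		if !seen {
			index[n.Name] = len(out)
			out = append(out, n)
			continue
		}
		if better(n, out[i]) {
			out[i] = n
		}
	}
	return out
}
```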
func (ItemNodes) LinkedItems ¶
LinkedItems Returns the linked items as ItemNodes
type Queries ¶
type Queries []Query
Queries Is a list of dgraph queries
func (Queries) Deduplicate ¶
Deduplicate Removes duplicate queries