Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ApplyMerge ¶
func ApplyMerge(doc *models.Document, command *UpdateCommand, secondary []*models.Document) *models.Document
ApplyMerge executes a merge based on a specific UpdateCommand.
Types ¶
type BulkIngestRequest ¶
type BulkIngestRequest struct {
    UUID         string             `json:"uuid"`
    DocumentType string             `json:"documentType"`
    MergeConfig  []*merge.Config    `json:"merge"`
    Docs         []*models.Document `json:"docs"`
}
BulkIngestRequest wraps a collection of ingestion requests (multiple documents with multiple merge configs).
type BulkIngester ¶
type BulkIngester struct {
    EsExecutor     *elasticsearch.EsExecutor
    TypedIngesters map[string]*TypedIngester
    Cache          *ttlcache.Cache
}
BulkIngester is a component that splits each BulkIngestRequest and assigns the resulting IngestRequests to dedicated TypedIngesters. As a chokepoint, it does little processing and acts only as a request router.
func NewBulkIngester ¶
func NewBulkIngester(esExecutor *elasticsearch.EsExecutor) *BulkIngester
NewBulkIngester returns a pointer to a new BulkIngester instance.
func (*BulkIngester) Ingest ¶
func (ingester *BulkIngester) Ingest(bir BulkIngestRequest)
Ingest processes a single BulkIngestRequest. The BulkIngestRequest is split into multiple IngestRequests, each of which is sent to a specific TypedIngester. The target TypedIngester is selected based on which document type must be updated.
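The splitting and routing described above can be sketched as follows. This is a minimal illustration, not the package's implementation: Document and MergeConfig are simplified stand-ins for models.Document and merge.Config, the MergeConfig.Type field and the split function are hypothetical, and the assumption that one IngestRequest is produced per (document, merge config) pair is inferred only from the type descriptions.

```go
package main

import "fmt"

// Document and MergeConfig are simplified stand-ins for models.Document and
// merge.Config; their real fields are not shown in this documentation.
type Document struct{ ID string }

type MergeConfig struct {
	Type string // hypothetical: the document type this merge must update
}

type BulkIngestRequest struct {
	UUID         string
	DocumentType string
	MergeConfig  []*MergeConfig
	Docs         []*Document
}

type IngestRequest struct {
	BulkUUID     string
	DocumentType string
	MergeConfig  *MergeConfig
	Doc          *Document
}

// split (hypothetical) builds one IngestRequest per (document, merge config)
// pair and groups the results by the document type that must be updated.
// Each group would then be handed to the TypedIngester for that type.
func split(bir BulkIngestRequest) map[string][]IngestRequest {
	routed := make(map[string][]IngestRequest)
	for _, cfg := range bir.MergeConfig {
		for _, doc := range bir.Docs {
			routed[cfg.Type] = append(routed[cfg.Type], IngestRequest{
				BulkUUID:     bir.UUID,
				DocumentType: bir.DocumentType,
				MergeConfig:  cfg,
				Doc:          doc,
			})
		}
	}
	return routed
}

func main() {
	bir := BulkIngestRequest{
		UUID:         "bulk-1",
		DocumentType: "order",
		MergeConfig:  []*MergeConfig{{Type: "order"}, {Type: "customer"}},
		Docs:         []*Document{{ID: "1"}, {ID: "2"}},
	}
	for docType, reqs := range split(bir) {
		fmt.Println(docType, len(reqs))
	}
}
```

Keeping the router free of merge logic matches the "chokepoint" remark: the only per-request work here is a map lookup and an append.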
type IndexingWorker ¶
type IndexingWorker struct {
    TypedIngester *TypedIngester
    ID            int
    Data          chan *UpdateCommand
    Client        *elasticsearch.EsExecutor
}
IndexingWorker is the unit of processing that can be started in parallel for Elasticsearch ingestion.
func NewIndexingWorker ¶
func NewIndexingWorker(typedIngester *TypedIngester, id int) *IndexingWorker
NewIndexingWorker returns a new IndexingWorker
func (*IndexingWorker) BulkChainedUpdate ¶
func (worker *IndexingWorker) BulkChainedUpdate(documents [][]*UpdateCommand)
BulkChainedUpdate processes multiple groups of UpdateCommands. For each group, it sequentially executes every UpdateCommand on a specific "source" document.
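A minimal sketch of the chained-update idea, under stated assumptions: Document is reduced to a flat map, mergeInto is a hypothetical stand-in for ApplyMerge (it simply overwrites fields), the in-memory store replaces Elasticsearch, and every command in a group is assumed to target the same document ID.

```go
package main

import "fmt"

// Document is a simplified stand-in for models.Document.
type Document map[string]string

// UpdateCommand keeps only the fields this sketch needs.
type UpdateCommand struct {
	DocumentID string
	NewDoc     Document
}

// mergeInto is a hypothetical stand-in for ApplyMerge: it returns a copy of
// source with every field of update written on top of it.
func mergeInto(source, update Document) Document {
	out := make(Document, len(source)+len(update))
	for k, v := range source {
		out[k] = v
	}
	for k, v := range update {
		out[k] = v
	}
	return out
}

// chainedUpdate (hypothetical) loads the "source" document of each group once,
// applies the group's commands to it in order, and stores the final result.
func chainedUpdate(store map[string]Document, groups [][]*UpdateCommand) {
	for _, group := range groups {
		if len(group) == 0 {
			continue
		}
		id := group[0].DocumentID // assumption: one target document per group
		doc := store[id]
		for _, cmd := range group {
			doc = mergeInto(doc, cmd.NewDoc)
		}
		store[id] = doc
	}
}

func main() {
	store := map[string]Document{"a": {"status": "new"}}
	chainedUpdate(store, [][]*UpdateCommand{{
		{DocumentID: "a", NewDoc: Document{"status": "open"}},
		{DocumentID: "a", NewDoc: Document{"status": "closed", "owner": "bob"}},
	}})
	fmt.Println(store["a"])
}
```

Applying the commands of a group in sequence means that later commands see the effect of earlier ones, so only the final merged document has to be written back.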
type IngestRequest ¶
type IngestRequest struct {
    UUID         string           `json:"uuid"`
    BulkUUID     string           `json:"bulkUuid"`
    DocumentType string           `json:"documentType"`
    MergeConfig  *merge.Config    `json:"merge"`
    Doc          *models.Document `json:"docs"`
}
IngestRequest wraps a single ingestion request (one document with one merge config).
type TypedIngester ¶
type TypedIngester struct {
    DocumentType string
    Data         chan *IngestRequest
    Workers      map[int]*IndexingWorker
    // contains filtered or unexported fields
}
TypedIngester is a component that processes IngestRequests. It generates UpdateCommands, which are processed by the attached IndexingWorkers.
func NewTypedIngester ¶
func NewTypedIngester(bulkIngester *BulkIngester, documentType string) *TypedIngester
NewTypedIngester returns a pointer to a new TypedIngester instance
func (*TypedIngester) Run ¶
func (ingester *TypedIngester) Run()
Run is the main routine of a TypedIngester instance.
In case of Mode == SELF:
* The in-memory cache is filled with the new information
* An update command is sent to the dedicated indexer
In case of Mode == ENRICH_FROM (which might ultimately be the same?):
* An update command is sent to the dedicated indexer
In case of Mode == ENRICH_TO (which might ultimately be the same?):
* A dedicated "relation cache" is queried to find all the objects that must be updated
* One or more update commands are sent to the dedicated indexer
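The per-mode behaviour listed above could be sketched roughly as follows. Everything here is illustrative: the Mode constants, the commandsFor function, and the map-based cache and relation cache are hypothetical stand-ins, and the real Run method presumably reads from the Data channel and forwards commands to its workers rather than returning a slice.

```go
package main

import "fmt"

// Mode mirrors the three modes named in the description above; these
// constants are hypothetical stand-ins for the real merge mode values.
type Mode int

const (
	Self Mode = iota
	EnrichFrom
	EnrichTo
)

// IngestRequest and UpdateCommand keep only the fields this sketch needs.
type IngestRequest struct {
	DocID string
	Mode  Mode
}

type UpdateCommand struct {
	DocumentID string
}

// commandsFor (hypothetical) returns the update commands one request produces.
// cache stands in for the in-memory cache; relations stands in for the
// "relation cache", mapping a document ID to the IDs of the objects to update.
func commandsFor(req IngestRequest, cache map[string]bool, relations map[string][]string) []UpdateCommand {
	switch req.Mode {
	case Self:
		cache[req.DocID] = true // fill the in-memory cache
		return []UpdateCommand{{DocumentID: req.DocID}}
	case EnrichFrom:
		return []UpdateCommand{{DocumentID: req.DocID}}
	case EnrichTo:
		var cmds []UpdateCommand
		for _, target := range relations[req.DocID] {
			cmds = append(cmds, UpdateCommand{DocumentID: target})
		}
		return cmds
	}
	return nil
}

func main() {
	cache := map[string]bool{}
	relations := map[string][]string{"customer-1": {"order-1", "order-2"}}
	req := IngestRequest{DocID: "customer-1", Mode: EnrichTo}
	for _, cmd := range commandsFor(req, cache, relations) {
		fmt.Println("update", cmd.DocumentID)
	}
}
```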
type UpdateCommand ¶
type UpdateCommand struct {
    DocumentID   string           `json:"documentId"`
    DocumentType string           `json:"documentType"`
    NewDoc       *models.Document `json:"doc"`
    MergeConfig  *merge.Config    `json:"merge"`
}
UpdateCommand wraps all the information required to update a document in Elasticsearch.
func NewUpdateCommand ¶
func NewUpdateCommand(documentID string, documentType string, newDoc *models.Document, mergeConfig *merge.Config) *UpdateCommand
NewUpdateCommand returns a new UpdateCommand