Documentation
Constants
This section is empty.
Variables
var (
	ErrChannelOverload   = errors.New("channel overload")
	ErrDocumentTypeEmpty = errors.New("document type is empty")
)
Functions
func ApplyMergeLight (added in v5.0.2)
func ApplyMergeLight(doc models.Document, command UpdateCommand) models.Document
Types
type BulkIngestRequest
type BulkIngestRequest struct {
	UUID         string             `json:"uuid"`
	DocumentType string             `json:"documentType"`
	MergeConfig  []connector.Config `json:"merge"`
	Docs         []models.Document  `json:"docs"`
}
BulkIngestRequest wraps a collection of ingestion requests (multiple documents with multiple merge configs).
type BulkIngester
type BulkIngester struct {
	TypedIngesters map[string]*TypedIngester
	Cache          *ttlcache.Cache
}
BulkIngester is a component that splits each BulkIngestRequest and assigns the resulting IngestRequests to the dedicated TypedIngester. As a chokepoint, it does little processing and only acts as a request router.
func NewBulkIngester
func NewBulkIngester() *BulkIngester
NewBulkIngester returns a pointer to a new BulkIngester instance.
func (*BulkIngester) Ingest
func (ingester *BulkIngester) Ingest(bir BulkIngestRequest) error
Ingest processes a single BulkIngestRequest. The BulkIngestRequest is split into multiple IngestRequests, each of which is sent to a specific TypedIngester. The target TypedIngester is selected based on the document type to update.
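The split-and-route step described above can be sketched as follows. This is an illustration only: Document, Config, and router are simplified stand-ins for models.Document, connector.Config, and BulkIngester, and pairing every document with every merge config is an assumption, since the source does not state exactly how the split is performed.

```go
package main

import "fmt"

// Simplified stand-ins for models.Document and connector.Config (assumptions).
type Document struct{ ID string }
type Config struct{ Mode string }

// Reduced copies of the request types, keeping only the fields used here.
type BulkIngestRequest struct {
	UUID         string
	DocumentType string
	MergeConfig  []Config
	Docs         []Document
}

type IngestRequest struct {
	BulkUUID     string
	DocumentType string
	MergeConfig  Config
	Doc          Document
}

// split produces one IngestRequest per (document, merge config) pair.
// The pairing rule is an assumption made for this sketch.
func split(bir BulkIngestRequest) []IngestRequest {
	out := make([]IngestRequest, 0, len(bir.Docs)*len(bir.MergeConfig))
	for _, doc := range bir.Docs {
		for _, cfg := range bir.MergeConfig {
			out = append(out, IngestRequest{
				BulkUUID:     bir.UUID,
				DocumentType: bir.DocumentType,
				MergeConfig:  cfg,
				Doc:          doc,
			})
		}
	}
	return out
}

// router is a hypothetical stand-in for BulkIngester: it keeps one queue per
// document type, where the real component holds one TypedIngester per type.
type router struct{ queues map[string][]IngestRequest }

func (r *router) ingest(bir BulkIngestRequest) {
	for _, ir := range split(bir) {
		r.queues[ir.DocumentType] = append(r.queues[ir.DocumentType], ir)
	}
}

func main() {
	r := &router{queues: map[string][]IngestRequest{}}
	r.ingest(BulkIngestRequest{
		UUID:         "bulk-1",
		DocumentType: "order",
		MergeConfig:  []Config{{Mode: "SELF"}, {Mode: "ENRICH_TO"}},
		Docs:         []Document{{ID: "a"}, {ID: "b"}},
	})
	fmt.Println(len(r.queues["order"])) // prints 4
}
```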
type IndexingWorker
type IndexingWorker interface {
	Run()
	GetMetricWorkerQueueGauge() metrics.Gauge
	GetData() chan UpdateCommand
}
IndexingWorker is the unit of processing that can be started in parallel for Elasticsearch ingestion.
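To illustrate how a worker of this shape might be driven, here is a minimal sketch. The worker interface below is a reduced, hypothetical version of IndexingWorker (the metrics gauge is left out), and recordingWorker only records document IDs instead of talking to Elasticsearch. That Run loops until its channel is closed is an assumption, not something the source confirms.

```go
package main

import (
	"fmt"
	"sync"
)

// Reduced stand-in for UpdateCommand (assumption: only the ID matters here).
type UpdateCommand struct{ DocumentID string }

// worker mirrors IndexingWorker without GetMetricWorkerQueueGauge.
type worker interface {
	Run()
	GetData() chan UpdateCommand
}

// recordingWorker is a hypothetical implementation used only for this sketch.
type recordingWorker struct {
	data      chan UpdateCommand
	processed []string
}

// Compile-time check that recordingWorker satisfies the reduced interface.
var _ worker = (*recordingWorker)(nil)

func (w *recordingWorker) GetData() chan UpdateCommand { return w.data }

// Run consumes commands until the channel is closed.
func (w *recordingWorker) Run() {
	for cmd := range w.data {
		w.processed = append(w.processed, cmd.DocumentID)
	}
}

// process starts one worker in a goroutine, feeds it the given IDs, and
// waits for it to drain its queue before returning what it processed.
func process(ids []string) []string {
	w := &recordingWorker{data: make(chan UpdateCommand, 2)}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		w.Run()
	}()
	for _, id := range ids {
		w.GetData() <- UpdateCommand{DocumentID: id}
	}
	close(w.GetData())
	wg.Wait()
	return w.processed
}

func main() {
	fmt.Println(process([]string{"a", "b", "c"})) // prints [a b c]
}
```

Starting several such workers, each with its own channel, is what would allow ingestion to proceed in parallel.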
func NewIndexingWorker
func NewIndexingWorker(typedIngester *TypedIngester, id int) (IndexingWorker, error)
type IndexingWorkerV8 (added in v5.1.0)
type IndexingWorkerV8 struct {
	Uuid          uuid.UUID
	TypedIngester *TypedIngester
	ID            int
	Data          chan UpdateCommand
	// contains filtered or unexported fields
}
IndexingWorkerV8 is the unit of processing that can be started in parallel for Elasticsearch ingestion.
func NewIndexingWorkerV8 (added in v5.1.0)
func NewIndexingWorkerV8(typedIngester *TypedIngester, id, mgetBatchSize int) *IndexingWorkerV8
NewIndexingWorkerV8 returns a new IndexingWorkerV8.
func (*IndexingWorkerV8) GetData (added in v5.1.0)
func (worker *IndexingWorkerV8) GetData() chan UpdateCommand
func (*IndexingWorkerV8) GetMetricWorkerQueueGauge (added in v5.1.0)
func (worker *IndexingWorkerV8) GetMetricWorkerQueueGauge() metrics.Gauge
func (*IndexingWorkerV8) Run (added in v5.1.0)
func (worker *IndexingWorkerV8) Run()
Run starts the worker.
type IngestRequest
type IngestRequest struct {
	UUID         string           `json:"uuid"`
	BulkUUID     string           `json:"bulkUuid"`
	DocumentType string           `json:"documentType"`
	MergeConfig  connector.Config `json:"merge"`
	Doc          models.Document  `json:"docs"`
}
IngestRequest wraps a single ingestion request (one document with one merge config).
type TypedIngester
type TypedIngester struct {
	Uuid         uuid.UUID
	DocumentType string
	Data         chan *IngestRequest
	Workers      map[int]IndexingWorker
	// contains filtered or unexported fields
}
TypedIngester is a component that processes IngestRequests. It generates UpdateCommands, which are processed by the attached IndexingWorkers.
func NewTypedIngester
func NewTypedIngester(bulkIngester *BulkIngester, documentType string) *TypedIngester
NewTypedIngester returns a pointer to a new TypedIngester instance.
func (*TypedIngester) Run
func (ingester *TypedIngester) Run()
Run is the main routine of a TypedIngester instance.
In case of Mode == SELF:
* The in-memory cache is filled with the new information
* An update command is sent to the dedicated indexer
In case of Mode == ENRICH_FROM (which might ultimately be the same?):
* An update command is sent to the dedicated indexer
In case of Mode == ENRICH_TO (which might ultimately be the same?):
* A dedicated "relation cache" is queried to find all the objects that must be updated
* One or more update commands are sent to the dedicated indexer
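The three cases above can be sketched as a single dispatch function. Everything in this example is a hypothetical stand-in: the Mode type and its constants, the reduced IngestRequest and UpdateCommand, and the two maps that play the role of the in-memory cache and the relation cache. It shows the branching described in the text, not the package's actual implementation.

```go
package main

import "fmt"

// Mode and its values are stand-ins named after the modes in the Run
// documentation; the real type lives elsewhere and may differ.
type Mode string

const (
	Self       Mode = "SELF"
	EnrichFrom Mode = "ENRICH_FROM"
	EnrichTo   Mode = "ENRICH_TO"
)

// Reduced request and command types (assumptions for this sketch).
type IngestRequest struct {
	DocID string
	Mode  Mode
}

type UpdateCommand struct{ DocumentID string }

// ingester is a hypothetical stand-in for TypedIngester.
type ingester struct {
	cache     map[string]IngestRequest // in-memory cache, filled in SELF mode
	relations map[string][]string      // relation cache: source ID -> IDs to update
}

// commands returns the update commands that one request gives rise to.
func (in *ingester) commands(ir IngestRequest) []UpdateCommand {
	switch ir.Mode {
	case Self:
		in.cache[ir.DocID] = ir
		return []UpdateCommand{{DocumentID: ir.DocID}}
	case EnrichFrom:
		return []UpdateCommand{{DocumentID: ir.DocID}}
	case EnrichTo:
		targets := in.relations[ir.DocID]
		cmds := make([]UpdateCommand, 0, len(targets))
		for _, id := range targets {
			cmds = append(cmds, UpdateCommand{DocumentID: id})
		}
		return cmds
	default:
		return nil // unknown mode: nothing to send
	}
}

func main() {
	in := &ingester{
		cache:     map[string]IngestRequest{},
		relations: map[string][]string{"customer-1": {"order-7", "order-9"}},
	}
	fmt.Println(len(in.commands(IngestRequest{DocID: "order-7", Mode: Self})))        // prints 1
	fmt.Println(len(in.commands(IngestRequest{DocID: "customer-1", Mode: EnrichTo}))) // prints 2
}
```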
type UpdateCommand
type UpdateCommand struct {
	DocumentID   string           `json:"documentId"`
	DocumentType string           `json:"documentType"`
	NewDoc       models.Document  `json:"doc"`
	MergeConfig  connector.Config `json:"merge"`
	Index        string           `json:"index"`
}
UpdateCommand wraps all the information required to update a document in Elasticsearch.
func NewUpdateCommand
func NewUpdateCommand(index string, documentID string, documentType string, newDoc models.Document, mergeConfig connector.Config) UpdateCommand
NewUpdateCommand returns a new UpdateCommand.
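As a rough illustration of the constructor's argument order and of the JSON field names declared on UpdateCommand, consider the sketch below. Document and Config are simplified stand-ins for models.Document and connector.Config, with hypothetical fields and tags. NewUpdateCommand is re-implemented locally as a plain field assignment, which is an assumption about its behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for models.Document and connector.Config; their
// fields and JSON tags are assumptions made for this sketch.
type Document struct {
	ID string `json:"id"`
}

type Config struct {
	Mode string `json:"mode"`
}

// UpdateCommand uses the same field names and JSON tags as the package type.
type UpdateCommand struct {
	DocumentID   string   `json:"documentId"`
	DocumentType string   `json:"documentType"`
	NewDoc       Document `json:"doc"`
	MergeConfig  Config   `json:"merge"`
	Index        string   `json:"index"`
}

// NewUpdateCommand follows the documented parameter order: index first,
// then document ID, document type, new document, and merge config.
func NewUpdateCommand(index string, documentID string, documentType string, newDoc Document, mergeConfig Config) UpdateCommand {
	return UpdateCommand{
		DocumentID:   documentID,
		DocumentType: documentType,
		NewDoc:       newDoc,
		MergeConfig:  mergeConfig,
		Index:        index,
	}
}

// encode marshals a command to its JSON representation.
func encode(cmd UpdateCommand) (string, error) {
	b, err := json.Marshal(cmd)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	cmd := NewUpdateCommand("orders", "42", "order", Document{ID: "42"}, Config{Mode: "SELF"})
	out, err := encode(cmd)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

Note that the index comes first in the parameter list but last in the struct and in the encoded JSON, so positional arguments are easy to mix up.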