ingester

package
v5.1.0 Latest
Published: Apr 4, 2023 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyMergeLight added in v5.0.2

func ApplyMergeLight(doc models.Document, command UpdateCommand) models.Document

Types

type BulkIngestRequest

type BulkIngestRequest struct {
	UUID         string             `json:"uuid"`
	DocumentType string             `json:"documentType"`
	MergeConfig  []connector.Config `json:"merge"`
	Docs         []models.Document  `json:"docs"`
}

BulkIngestRequest wraps a collection of ingestion requests (multiple documents with multiple merge configs).
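As an illustration of the JSON shape implied by the struct tags above, here is a minimal sketch that decodes a payload into a local mirror of BulkIngestRequest. The type bulkIngestRequest and the helper decodeBulk are hypothetical; json.RawMessage stands in for connector.Config and models.Document because their layouts are not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkIngestRequest mirrors the JSON tags of BulkIngestRequest. The element
// types are json.RawMessage stand-ins (an assumption), since the real
// connector.Config and models.Document layouts are not documented here.
type bulkIngestRequest struct {
	UUID         string            `json:"uuid"`
	DocumentType string            `json:"documentType"`
	MergeConfig  []json.RawMessage `json:"merge"`
	Docs         []json.RawMessage `json:"docs"`
}

// decodeBulk is a hypothetical helper that parses one bulk payload.
func decodeBulk(payload []byte) (bulkIngestRequest, error) {
	var bir bulkIngestRequest
	err := json.Unmarshal(payload, &bir)
	return bir, err
}

func main() {
	payload := []byte(`{"uuid":"b-1","documentType":"order","merge":[{}],"docs":[{},{}]}`)
	bir, err := decodeBulk(payload)
	if err != nil {
		panic(err)
	}
	// Report the document type and how many merge configs and docs arrived.
	fmt.Println(bir.DocumentType, len(bir.MergeConfig), len(bir.Docs))
}
```

Note that the merge configs travel under the "merge" key and the documents under "docs", as the struct tags indicate.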

type BulkIngester

type BulkIngester struct {
	TypedIngesters map[string]*TypedIngester
	Cache          *ttlcache.Cache
}

BulkIngester is a component that splits each BulkIngestRequest and assigns the resulting IngestRequests to the dedicated TypedIngester. Because it is a chokepoint, it does little processing and acts only as a request router.

func NewBulkIngester

func NewBulkIngester() *BulkIngester

NewBulkIngester returns a pointer to a new BulkIngester instance

func (*BulkIngester) Ingest

func (ingester *BulkIngester) Ingest(bir BulkIngestRequest) error

Ingest processes a single BulkIngestRequest. The BulkIngestRequest is split into multiple IngestRequests, each of which is sent to a specific TypedIngester. The target TypedIngester is selected according to the document type that must be updated.
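The split-then-route behaviour described above could be sketched as follows. This is not the package's implementation: every type here is a local, hypothetical stand-in, the split is assumed to produce one request per (document, merge config) pair, and routing is assumed to key on the request's DocumentType with a buffered channel playing the role of each TypedIngester's input queue.

```go
package main

import "fmt"

// Hypothetical stand-ins for connector.Config and models.Document.
type mergeConfig struct{ Mode string }
type document struct{ ID string }

type bulkRequest struct {
	UUID         string
	DocumentType string
	MergeConfig  []mergeConfig
	Docs         []document
}

type ingestRequest struct {
	BulkUUID     string
	DocumentType string
	MergeConfig  mergeConfig
	Doc          document
}

// split emits one ingestRequest per (document, merge config) pair.
// The pairing rule is an assumption drawn from the type descriptions.
func split(bir bulkRequest) []ingestRequest {
	out := make([]ingestRequest, 0, len(bir.Docs)*len(bir.MergeConfig))
	for _, doc := range bir.Docs {
		for _, mc := range bir.MergeConfig {
			out = append(out, ingestRequest{
				BulkUUID:     bir.UUID,
				DocumentType: bir.DocumentType,
				MergeConfig:  mc,
				Doc:          doc,
			})
		}
	}
	return out
}

// route pushes each request onto the queue registered for its document type
// and fails if no queue exists for that type.
func route(queues map[string]chan ingestRequest, reqs []ingestRequest) error {
	for _, r := range reqs {
		q, ok := queues[r.DocumentType]
		if !ok {
			return fmt.Errorf("no ingester for document type %q", r.DocumentType)
		}
		q <- r
	}
	return nil
}

func main() {
	bir := bulkRequest{
		UUID:         "b-1",
		DocumentType: "order",
		MergeConfig:  []mergeConfig{{Mode: "SELF"}, {Mode: "ENRICH_TO"}},
		Docs:         []document{{ID: "1"}, {ID: "2"}},
	}
	queues := map[string]chan ingestRequest{"order": make(chan ingestRequest, 8)}
	if err := route(queues, split(bir)); err != nil {
		panic(err)
	}
	fmt.Println("queued requests:", len(queues["order"]))
}
```

Keeping the router this thin matches the stated intent that the bulk ingester does little processing of its own.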

type GetQuery

type GetQuery struct {
	DocumentType string
	ID           string
}

GetQuery ...

type IndexingWorker

type IndexingWorker interface {
	Run()
	GetMetricWorkerQueueGauge() metrics.Gauge
	GetData() chan UpdateCommand
}

IndexingWorker is the unit of processing that can be started in parallel for Elasticsearch ingestion.
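To illustrate what "started in parallel" can look like, the sketch below runs several workers that each drain their own command channel. It is a simplified model, not the package's code: the worker interface drops GetMetricWorkerQueueGauge to stay dependency-free, and updateCommand, countingWorker, and runWorkers are hypothetical names. Round-robin distribution is likewise an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// updateCommand is a hypothetical stand-in for UpdateCommand.
type updateCommand struct{ DocumentID string }

// worker keeps the two channel-related methods of IndexingWorker.
type worker interface {
	Run()
	GetData() chan updateCommand
}

// countingWorker is a hypothetical worker that only counts what it receives.
type countingWorker struct {
	data      chan updateCommand
	processed int
}

// Run drains the channel until it is closed.
func (w *countingWorker) Run() {
	for range w.data {
		w.processed++
	}
}

func (w *countingWorker) GetData() chan updateCommand { return w.data }

// runWorkers starts n workers (n must be > 0), spreads the commands across
// them round-robin, and returns how many commands each worker processed.
func runWorkers(n int, cmds []updateCommand) []int {
	workers := make([]*countingWorker, n)
	var wg sync.WaitGroup
	for i := range workers {
		workers[i] = &countingWorker{data: make(chan updateCommand, 8)}
		wg.Add(1)
		go func(w worker) {
			defer wg.Done()
			w.Run()
		}(workers[i])
	}
	for i, cmd := range cmds {
		workers[i%n].GetData() <- cmd
	}
	for _, w := range workers {
		close(w.data) // signals Run to return
	}
	wg.Wait()
	counts := make([]int, n)
	for i, w := range workers {
		counts[i] = w.processed
	}
	return counts
}

func main() {
	fmt.Println(runWorkers(2, make([]updateCommand, 5)))
}
```

Giving each worker its own channel, as the Data fields and GetData methods on this page suggest, lets the producer choose which worker handles which command.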

func NewIndexingWorker

func NewIndexingWorker(typedIngester *TypedIngester, id int) (IndexingWorker, error)

type IndexingWorkerV6 added in v5.1.0

type IndexingWorkerV6 struct {
	Uuid          uuid.UUID
	TypedIngester *TypedIngester
	ID            int
	Data          chan UpdateCommand
	Client        *goelasticsearch.Client
	// contains filtered or unexported fields
}

IndexingWorkerV6 is the unit of processing that can be started in parallel for Elasticsearch ingestion.

func NewIndexingWorkerV6 added in v5.1.0

func NewIndexingWorkerV6(typedIngester *TypedIngester, id int) *IndexingWorkerV6

NewIndexingWorkerV6 returns a new IndexingWorkerV6

func (*IndexingWorkerV6) GetData added in v5.1.0

func (worker *IndexingWorkerV6) GetData() chan UpdateCommand

func (*IndexingWorkerV6) GetMetricWorkerQueueGauge added in v5.1.0

func (worker *IndexingWorkerV6) GetMetricWorkerQueueGauge() metrics.Gauge

func (*IndexingWorkerV6) Run added in v5.1.0

func (worker *IndexingWorkerV6) Run()

Run starts a worker.

type IndexingWorkerV8 added in v5.1.0

type IndexingWorkerV8 struct {
	Uuid          uuid.UUID
	TypedIngester *TypedIngester
	ID            int
	Data          chan UpdateCommand
	// contains filtered or unexported fields
}

IndexingWorkerV8 is the unit of processing that can be started in parallel for Elasticsearch ingestion.

func NewIndexingWorkerV8 added in v5.1.0

func NewIndexingWorkerV8(typedIngester *TypedIngester, id int) *IndexingWorkerV8

NewIndexingWorkerV8 returns a new IndexingWorkerV8

func (*IndexingWorkerV8) GetData added in v5.1.0

func (worker *IndexingWorkerV8) GetData() chan UpdateCommand

func (*IndexingWorkerV8) GetMetricWorkerQueueGauge added in v5.1.0

func (worker *IndexingWorkerV8) GetMetricWorkerQueueGauge() metrics.Gauge

func (*IndexingWorkerV8) Run added in v5.1.0

func (worker *IndexingWorkerV8) Run()

Run starts a worker.

type IngestRequest

type IngestRequest struct {
	UUID         string           `json:"uuid"`
	BulkUUID     string           `json:"bulkUuid"`
	DocumentType string           `json:"documentType"`
	MergeConfig  connector.Config `json:"merge"`
	Doc          models.Document  `json:"docs"`
}

IngestRequest wraps a single ingestion request (one document with one merge config).

type TypedIngester

type TypedIngester struct {
	Uuid uuid.UUID

	DocumentType string
	Data         chan *IngestRequest
	Workers      map[int]IndexingWorker
	// contains filtered or unexported fields
}

TypedIngester is a component that processes IngestRequests. It generates UpdateCommands, which are processed by the attached IndexingWorkers.

func NewTypedIngester

func NewTypedIngester(bulkIngester *BulkIngester, documentType string) *TypedIngester

NewTypedIngester returns a pointer to a new TypedIngester instance

func (*TypedIngester) Run

func (ingester *TypedIngester) Run()

Run is the main routine of a TypedIngester instance.

In case of Mode == SELF:
* The in-memory cache is filled with the new information
* An update command is sent to the dedicated indexer

In case of Mode == ENRICH_FROM (which might turn out to be the same in the end?):
* An update command is sent to the dedicated indexer

In case of Mode == ENRICH_TO (which might turn out to be the same in the end?):
* A dedicated "relation cache" is queried to find all the objects that must be updated
* One or more update commands are sent to the dedicated indexer
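The three modes described for Run can be modelled as a small dispatch function. This is only a sketch under stated assumptions: the mode values are represented as plain strings, the in-memory cache is a map of seen IDs, the relation cache is a map from a document ID to the IDs of related documents, and the function name targets is hypothetical. It returns the IDs that would each receive an update command.

```go
package main

import "fmt"

// Mode names as they appear in the Run description; representing them as
// strings is an assumption.
const (
	modeSelf       = "SELF"
	modeEnrichFrom = "ENRICH_FROM"
	modeEnrichTo   = "ENRICH_TO"
)

// targets returns the IDs of the documents that should receive an update
// command for the given mode. seen models the in-memory cache and relations
// models the "relation cache"; both shapes are hypothetical.
func targets(mode, docID string, seen map[string]bool, relations map[string][]string) ([]string, error) {
	switch mode {
	case modeSelf:
		seen[docID] = true // fill the in-memory cache
		return []string{docID}, nil
	case modeEnrichFrom:
		return []string{docID}, nil
	case modeEnrichTo:
		// One update per related object found in the relation cache.
		return append([]string(nil), relations[docID]...), nil
	default:
		return nil, fmt.Errorf("unknown mode %q", mode)
	}
}

func main() {
	seen := map[string]bool{}
	relations := map[string][]string{"customer-1": {"order-1", "order-2"}}

	ids, err := targets(modeEnrichTo, "customer-1", seen, relations)
	if err != nil {
		panic(err)
	}
	fmt.Println(ids)
}
```

In this model SELF and ENRICH_FROM differ only in the cache write, which is consistent with the description's remark that the modes might end up being the same.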

type UpdateCommand

type UpdateCommand struct {
	DocumentID   string           `json:"documentId"`
	DocumentType string           `json:"documentType"`
	NewDoc       models.Document  `json:"doc"`
	MergeConfig  connector.Config `json:"merge"`
	Index        string           `json:"index"`
}

UpdateCommand wraps all the information required to update a document in Elasticsearch.

func NewUpdateCommand

func NewUpdateCommand(index string, documentID string, documentType string, newDoc models.Document, mergeConfig connector.Config) UpdateCommand

NewUpdateCommand returns a new UpdateCommand
