uploader

package
v0.0.0-...-c6cd810 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 9, 2021 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Backup

type Backup struct {
	EventDocuments       []models.ESDocument
	ConsumptionDocuments []models.ESDocument

	EventIndexName       string
	ConsumptionIndexName string
}

func (*Backup) FromJSON

func (backup *Backup) FromJSON(bytes []byte)

func (*Backup) ToJSON

func (backup *Backup) ToJSON() []byte
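
The two signatures above suggest that ToJSON serializes a Backup and FromJSON restores one in place, with neither reporting an error to the caller. The following is a minimal sketch of that round trip, assuming encoding/json is used underneath. The names doc, backupSketch, toJSON, fromJSON, and roundTrip are hypothetical; doc stands in for models.ESDocument, whose fields are not shown on this page, and the error handling is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// doc is a hypothetical stand-in for models.ESDocument, whose definition
// is not part of this page.
type doc struct {
	ID   string `json:"id"`
	Body string `json:"body"`
}

// backupSketch mirrors the exported fields of Backup.
type backupSketch struct {
	EventDocuments       []doc
	ConsumptionDocuments []doc

	EventIndexName       string
	ConsumptionIndexName string
}

// toJSON serializes the backup. Like the documented ToJSON it returns only
// bytes; returning nil on failure is an assumption of this sketch.
func (b *backupSketch) toJSON() []byte {
	data, err := json.Marshal(b)
	if err != nil {
		return nil
	}
	return data
}

// fromJSON restores the backup in place. As an assumption, the receiver is
// left untouched when the input is not valid JSON.
func (b *backupSketch) fromJSON(data []byte) {
	var decoded backupSketch
	if err := json.Unmarshal(data, &decoded); err != nil {
		return
	}
	*b = decoded
}

// roundTrip serializes in and decodes the result into a fresh value.
func roundTrip(in backupSketch) backupSketch {
	var out backupSketch
	out.fromJSON(in.toJSON())
	return out
}

func main() {
	in := backupSketch{
		EventDocuments: []doc{{ID: "1", Body: "event"}},
		EventIndexName: "events",
	}
	out := roundTrip(in)
	fmt.Println(out.EventIndexName, len(out.EventDocuments))
}
```

Because the documented methods return no error, a caller of the real package may want to check the result (for example, for empty output) before relying on it.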

type BackupBuffer

type BackupBuffer struct {
	// contains filtered or unexported fields
}

func NewBackupBuffer

func NewBackupBuffer() *BackupBuffer

func (*BackupBuffer) Add

func (b *BackupBuffer) Add(unSavedDoc models.ESDocument, dataType postprocmodels.DataType)

func (*BackupBuffer) Clear

func (b *BackupBuffer) Clear(dataType postprocmodels.DataType)

Clear clears the documents of the given type from the backup buffer.

func (*BackupBuffer) GetBackupIndexNames

func (b *BackupBuffer) GetBackupIndexNames() (string, string)

func (*BackupBuffer) Load

func (b *BackupBuffer) Load() ([]models.ESDocument, []models.ESDocument)

func (*BackupBuffer) Reset

func (b *BackupBuffer) Reset()

Reset resets the backup buffer and clears the backup file contents.

func (*BackupBuffer) SetIndexNames

func (b *BackupBuffer) SetIndexNames(eventIndex string, consumptionIndex string)
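
The BackupBuffer methods above can be read as a small per-type store for documents that have not been saved yet. The sketch below illustrates that reading with an in-memory buffer only. It does not model the backup file that Reset is documented to clear. All names are hypothetical: dataType stands in for postprocmodels.DataType, doc for models.ESDocument, and the mutex, the (events, consumptions) return order of load, and the choice to keep index names on reset are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// dataType is a hypothetical stand-in for postprocmodels.DataType.
type dataType int

const (
	eventData dataType = iota
	consumptionData
)

// doc is a hypothetical stand-in for models.ESDocument.
type doc struct{ ID string }

// backupBufferSketch keeps unsaved documents per data type, in memory only.
type backupBufferSketch struct {
	mu               sync.Mutex // assumption: the page does not say whether the real type locks
	events           []doc
	consumptions     []doc
	eventIndex       string
	consumptionIndex string
}

func (b *backupBufferSketch) setIndexNames(eventIndex, consumptionIndex string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.eventIndex, b.consumptionIndex = eventIndex, consumptionIndex
}

func (b *backupBufferSketch) getBackupIndexNames() (string, string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.eventIndex, b.consumptionIndex
}

// add stores an unsaved document under its data type.
func (b *backupBufferSketch) add(d doc, t dataType) {
	b.mu.Lock()
	defer b.mu.Unlock()
	switch t {
	case eventData:
		b.events = append(b.events, d)
	case consumptionData:
		b.consumptions = append(b.consumptions, d)
	}
}

// clear drops the documents of one data type and keeps the other.
func (b *backupBufferSketch) clear(t dataType) {
	b.mu.Lock()
	defer b.mu.Unlock()
	switch t {
	case eventData:
		b.events = nil
	case consumptionData:
		b.consumptions = nil
	}
}

// load returns copies of the buffered event and consumption documents.
func (b *backupBufferSketch) load() ([]doc, []doc) {
	b.mu.Lock()
	defer b.mu.Unlock()
	return append([]doc(nil), b.events...), append([]doc(nil), b.consumptions...)
}

// reset drops all documents; the index names are kept (an assumption).
func (b *backupBufferSketch) reset() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.events, b.consumptions = nil, nil
}

func main() {
	b := &backupBufferSketch{}
	b.setIndexNames("events", "consumption")
	b.add(doc{ID: "e1"}, eventData)
	b.add(doc{ID: "c1"}, consumptionData)
	b.clear(eventData)
	events, consumptions := b.load()
	eventIndex, consumptionIndex := b.getBackupIndexNames()
	fmt.Println(len(events), len(consumptions), eventIndex, consumptionIndex)
}
```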

type UploadBuffer

type UploadBuffer struct {
	// contains filtered or unexported fields
}

UploadBuffer stores data by index name until the data count reaches a threshold, then uploads the contents. Access to the buffer is guarded by mutual exclusion.

func NewUploadBuffer

func NewUploadBuffer(
	esClient elastic.EsClient,
	size int,
	eventIndexName string,
	consumptionIndexName string,
	indexRecreationTimeSpec string,
) *UploadBuffer

NewUploadBuffer initializes the buffer.

func (*UploadBuffer) AppendAndUploadIfNeeded

func (d *UploadBuffer) AppendAndUploadIfNeeded(m models.ESDocument, dataType postprocmodels.DataType)

AppendAndUploadIfNeeded appends the document m to the buffer for the given data type and, as the name indicates, uploads the buffered contents when needed.

func (*UploadBuffer) GetCurrentMessages

func (d *UploadBuffer) GetCurrentMessages(key string) []models.ESDocument

GetCurrentMessages returns the current messages for a given key.

func (*UploadBuffer) UploadRemaining

func (d *UploadBuffer) UploadRemaining()

UploadRemaining uploads the data left in the buffer and clears the buffer.
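
The behavior described for UploadBuffer (collect documents per index, upload once a size threshold is reached, flush the rest on demand, all under a lock) can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation. The names uploadBufferSketch, uploadFunc, and doc are hypothetical; uploadFunc stands in for whatever upload call elastic.EsClient provides, and the sketch keys directly on the index name instead of mapping a postprocmodels.DataType to one.

```go
package main

import (
	"fmt"
	"sync"
)

// doc is a hypothetical stand-in for models.ESDocument.
type doc struct{ ID string }

// uploadFunc is a hypothetical stand-in for the upload call on elastic.EsClient.
type uploadFunc func(index string, docs []doc)

// uploadBufferSketch buffers documents per index name behind a mutex.
type uploadBufferSketch struct {
	mu     sync.Mutex
	size   int
	data   map[string][]doc
	upload uploadFunc
}

func newUploadBufferSketch(size int, upload uploadFunc) *uploadBufferSketch {
	return &uploadBufferSketch{size: size, data: make(map[string][]doc), upload: upload}
}

// appendAndUploadIfNeeded adds d to the index's batch and uploads the batch
// once it holds at least size documents.
func (b *uploadBufferSketch) appendAndUploadIfNeeded(index string, d doc) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.data[index] = append(b.data[index], d)
	if len(b.data[index]) >= b.size {
		b.flushLocked(index)
	}
}

// getCurrentMessages returns a copy of the documents buffered for index.
func (b *uploadBufferSketch) getCurrentMessages(index string) []doc {
	b.mu.Lock()
	defer b.mu.Unlock()
	return append([]doc(nil), b.data[index]...)
}

// uploadRemaining uploads every non-empty batch and clears the buffer.
func (b *uploadBufferSketch) uploadRemaining() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for index := range b.data {
		b.flushLocked(index)
	}
}

// flushLocked uploads and removes one batch; the caller must hold b.mu.
func (b *uploadBufferSketch) flushLocked(index string) {
	if len(b.data[index]) == 0 {
		return
	}
	b.upload(index, b.data[index])
	delete(b.data, index)
}

func main() {
	uploaded := 0
	buf := newUploadBufferSketch(2, func(index string, docs []doc) {
		uploaded += len(docs)
	})
	for _, id := range []string{"1", "2", "3"} {
		buf.appendAndUploadIfNeeded("events", doc{ID: id})
	}
	fmt.Println(uploaded, len(buf.getCurrentMessages("events")))
	buf.uploadRemaining()
	fmt.Println(uploaded, len(buf.getCurrentMessages("events")))
}
```

The sketch uploads while holding the lock, which keeps it short but blocks other writers during the upload; a production buffer might swap the batch out under the lock and upload it afterwards.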

type UploaderService

type UploaderService struct {
	// contains filtered or unexported fields
}

UploaderService encapsulates the data and logic of the uploading service.

func NewUploaderService

func NewUploaderService(
	messageConsumer rabbitmq.MessageConsumer,
	esClient elastic.EsClient,
	eventIndexName string,
	consumptionIndexName string,
	indexRecreationTimeSpec string,
) *UploaderService

NewUploaderService creates a new uploader service instance. The indexRecreationTimeSpec schedules the creation of new ES indexes; see the documentation of github.com/robfig/cron/v3 for the syntax.
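
For orientation, here are a few example values that could be passed as indexRecreationTimeSpec, assuming the service hands the string to the default parser of github.com/robfig/cron/v3, which accepts five fields (minute, hour, day of month, month, day of week) as well as "@" descriptors. Whether this package uses the default parser is not stated on this page, so treat the values as illustrative. The helpers isDescriptor and fieldCount are hypothetical and only show the shape of each spec; they do not validate it.

```go
package main

import (
	"fmt"
	"strings"
)

// exampleSpecs lists illustrative schedule strings and their meaning under
// the standard five-field cron format (assumed, see the lead-in).
var exampleSpecs = []struct{ spec, meaning string }{
	{"0 0 * * *", "every day at midnight"},
	{"0 0 1 * *", "at midnight on the first day of every month"},
	{"@weekly", "once a week, at midnight between Saturday and Sunday"},
}

// isDescriptor reports whether spec uses the "@name" shorthand instead of
// explicit fields.
func isDescriptor(spec string) bool {
	return strings.HasPrefix(spec, "@")
}

// fieldCount returns the number of whitespace-separated fields in spec.
func fieldCount(spec string) int {
	return len(strings.Fields(spec))
}

func main() {
	for _, e := range exampleSpecs {
		fmt.Printf("%q (fields=%d, descriptor=%t): %s\n",
			e.spec, fieldCount(e.spec), isDescriptor(e.spec), e.meaning)
	}
}
```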

func (*UploaderService) HandleMessages

func (service *UploaderService) HandleMessages()

HandleMessages consumes messages from RabbitMQ and uploads them to ES.
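
A consume-and-upload loop of the kind described above might look like the following sketch. It is not the service's code: a Go channel stands in for rabbitmq.MessageConsumer, the store callback stands in for the upload buffer, and the JSON message format, the doc type, and the skip-on-error policy are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// doc is a hypothetical stand-in for models.ESDocument.
type doc struct {
	ID string `json:"id"`
}

// handleMessages drains msgs until the channel is closed, decodes each
// message as JSON, and passes the result to store. Messages that fail to
// decode are skipped and counted.
func handleMessages(msgs <-chan []byte, store func(doc)) (skipped int) {
	for raw := range msgs {
		var d doc
		if err := json.Unmarshal(raw, &d); err != nil {
			skipped++
			continue
		}
		store(d)
	}
	return skipped
}

func main() {
	msgs := make(chan []byte, 3)
	msgs <- []byte(`{"id":"a"}`)
	msgs <- []byte(`not json`)
	msgs <- []byte(`{"id":"b"}`)
	close(msgs)

	var stored []doc
	skipped := handleMessages(msgs, func(d doc) { stored = append(stored, d) })
	fmt.Println(len(stored), skipped)
}
```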
