Documentation ¶
Index ¶
- type Backup
- type BackupBuffer
- func (b *BackupBuffer) Add(unSavedDoc models.ESDocument, dataType postprocmodels.DataType)
- func (b *BackupBuffer) Clear(dataType postprocmodels.DataType)
- func (b *BackupBuffer) GetBackupIndexNames() (string, string)
- func (b *BackupBuffer) Load() ([]models.ESDocument, []models.ESDocument)
- func (b *BackupBuffer) Reset()
- func (b *BackupBuffer) SetIndexNames(eventIndex string, consumptionIndex string)
- type UploadBuffer
- type UploaderService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Backup ¶
type Backup struct { EventDocuments []models.ESDocument ConsumptionDocuments []models.ESDocument EventIndexName string ConsumptionIndexName string }
type BackupBuffer ¶
type BackupBuffer struct {
// contains filtered or unexported fields
}
func NewBackupBuffer ¶
func NewBackupBuffer() *BackupBuffer
func (*BackupBuffer) Add ¶
func (b *BackupBuffer) Add(unSavedDoc models.ESDocument, dataType postprocmodels.DataType)
func (*BackupBuffer) Clear ¶
func (b *BackupBuffer) Clear(dataType postprocmodels.DataType)
Clear clears the documents of the given type from the backup buffer.
func (*BackupBuffer) GetBackupIndexNames ¶
func (b *BackupBuffer) GetBackupIndexNames() (string, string)
func (*BackupBuffer) Load ¶
func (b *BackupBuffer) Load() ([]models.ESDocument, []models.ESDocument)
func (*BackupBuffer) Reset ¶
func (b *BackupBuffer) Reset()
Reset resets the backup buffer and clears the backup file contents.
func (*BackupBuffer) SetIndexNames ¶
func (b *BackupBuffer) SetIndexNames(eventIndex string, consumptionIndex string)
type UploadBuffer ¶
type UploadBuffer struct {
// contains filtered or unexported fields
}
UploadBuffer stores data by index name until the data count reaches a threshold, then uploads the contents, while implementing mutual exclusion.
func NewUploadBuffer ¶
func NewUploadBuffer( esClient elastic.EsClient, size int, eventIndexName string, consumptionIndexName string, indexRecreationTimeSpec string, ) *UploadBuffer
NewUploadBuffer initializes the buffer.
func (*UploadBuffer) AppendAndUploadIfNeeded ¶
func (d *UploadBuffer) AppendAndUploadIfNeeded(m models.ESDocument, dataType postprocmodels.DataType)
AppendAndUploadIfNeeded appends a message to the buffer for the given data type and uploads the buffered contents if needed.
func (*UploadBuffer) GetCurrentMessages ¶
func (d *UploadBuffer) GetCurrentMessages(key string) []models.ESDocument
GetCurrentMessages returns the current messages for a given key.
func (*UploadBuffer) UploadRemaining ¶
func (d *UploadBuffer) UploadRemaining()
UploadRemaining uploads the data left in the buffer and clears the buffer.
type UploaderService ¶
type UploaderService struct {
// contains filtered or unexported fields
}
UploaderService encapsulates the data and logic of the uploading service.
func NewUploaderService ¶
func NewUploaderService( messageConsumer rabbitmq.MessageConsumer, esClient elastic.EsClient, eventIndexName string, consumptionIndexName string, indexRecreationTimeSpec string, ) *UploaderService
NewUploaderService creates a new uploader service instance. The indexRecreationTimeSpec is used to time the creation of new ES indexes, see the docs of github.com/robfig/cron/v3 for the syntax.
func (*UploaderService) HandleMessages ¶
func (service *UploaderService) HandleMessages()
HandleMessages consumes messages from RabbitMQ and uploads them to ES.