Documentation ¶
Index ¶
- Variables
- func Consume(ctx context.Context, messageConsumer kafka.IConsumerGroup, handler Handler, ...)
- type AvroProducer
- type CSVExported
- type DatasetAPI
- type EventGenerator
- type ExportHandler
- type FileStore
- type FilterStore
- type FilterSubmitted
- type GenerateOutputCreatedEvent
- type Handler
- type KafkaProducer
- type Marshaller
- type ObservationStore
- type Producer
Constants ¶
This section is empty.
Variables ¶
var CreateFilterForAll = func(ctx context.Context, handler *ExportHandler, event *FilterSubmitted, isPublished bool) (*observation.DimensionFilters, error) {
	// Fetch the dimensions of the requested dataset version.
	dimensions, err := handler.datasetAPICli.GetVersionDimensions(ctx, "", handler.serviceAuthToken, "",
		event.DatasetID, event.Edition, event.Version,
	)
	if err != nil {
		return nil, err
	}

	var dbFilterDimensions []*observation.Dimension
	for _, dim := range dimensions.Items {
		// Fetch every option for this dimension, in batches of 100 using up to 10 workers.
		opts, err := handler.datasetAPICli.GetOptionsInBatches(ctx, "", handler.serviceAuthToken, "",
			event.DatasetID, event.Edition, event.Version, dim.Name, 100, 10,
		)
		if err != nil {
			return nil, err
		}

		var options []string
		for _, opt := range opts.Items {
			options = append(options, opt.Option)
		}

		dbFilterDimensions = append(dbFilterDimensions, &observation.Dimension{
			Name:    dim.Name,
			Options: options,
		})
	}

	dbFilter := &observation.DimensionFilters{
		Dimensions: dbFilterDimensions,
		Published:  &isPublished,
	}

	return dbFilter, nil
}
var SortFilter = func(ctx context.Context, handler *ExportHandler, event *FilterSubmitted, dbFilter *observation.DimensionFilters) {
	nofDimensions := len(dbFilter.Dimensions)
	if nofDimensions <= 1 {
		return
	}

	// Create a slice of sorted dimension sizes
	type dim struct {
		index         int
		dimensionSize int
	}
	dimSizes := make([]dim, 0, nofDimensions)
	var dimSizesMutex sync.Mutex

	// get info from mongo
	var getErrorCount int32
	var concurrent = 10 // limit number of go routines to not put too much on heap
	var semaphoreChan = make(chan struct{}, concurrent)
	var wg sync.WaitGroup // number of working goroutines

	for i, dimension := range dbFilter.Dimensions {
		if atomic.LoadInt32(&getErrorCount) != 0 {
			break
		}
		semaphoreChan <- struct{}{}
		wg.Add(1)
		go func(i int, dimension *observation.Dimension) {
			defer func() {
				<-semaphoreChan
			}()
			defer wg.Done()
			options, err := handler.datasetAPICli.GetOptions(ctx, "", handler.serviceAuthToken, "",
				event.DatasetID, event.Edition, event.Version, dimension.Name,
				&dataset.QueryParams{Offset: 0, Limit: 0})
			if err != nil {
				if atomic.AddInt32(&getErrorCount, 1) <= 2 {
					logData := log.Data{"dataset_id": event.DatasetID, "edition": event.Edition, "version": event.Version, "dimension name": dimension.Name}
					log.Info(ctx, "SortFilter: GetOptions failed for dataset and dimension", logData)
				}
			} else {
				d := dim{dimensionSize: options.TotalCount, index: i}
				dimSizesMutex.Lock()
				dimSizes = append(dimSizes, d)
				dimSizesMutex.Unlock()
			}
		}(i, dimension)
	}
	wg.Wait()

	if getErrorCount != 0 {
		logData := log.Data{"dataset_id": event.DatasetID, "edition": event.Edition, "version": event.Version}
		log.Info(ctx, fmt.Sprintf("SortFilter: GetOptions failed for dataset %d times, sorting by default of 'geography' first", getErrorCount), logData)

		dimSizes = dimSizes[:0]
		for i, dimension := range dbFilter.Dimensions {
			if strings.ToLower(dimension.Name) == "geography" {
				d := dim{dimensionSize: 999999, index: i}
				dimSizes = append(dimSizes, d)
			} else {
				d := dim{dimensionSize: nofDimensions - i, index: i}
				dimSizes = append(dimSizes, d)
			}
		}
	}

	sort.Slice(dimSizes, func(i, j int) bool {
		return dimSizes[i].dimensionSize < dimSizes[j].dimensionSize
	})

	sortedDimensions := make([]observation.Dimension, 0, nofDimensions)
	for i := nofDimensions - 1; i >= 0; i-- {
		sortedDimensions = append(sortedDimensions, *dbFilter.Dimensions[dimSizes[i].index])
	}
	for i, dimension := range sortedDimensions {
		*dbFilter.Dimensions[i] = dimension
	}
}
SortFilter sorts a filter's dimensions by dimension size, largest first, to make Neptune searches faster. The sort is done here because the sizes are retrieved from Mongo, and it is best not to have the dp-graph library acquire such coupling to its caller. A usage sketch follows.
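As an illustration only, a minimal sketch of how the two variables above are intended to be used together inside the package; handler, event and isPublished are assumed to already be in scope in the unexported handling path of ExportHandler:

// Build a filter that covers every option of every dimension for the dataset
// version, then sort its dimensions in place, largest first, before streaming
// observations.
dbFilter, err := CreateFilterForAll(ctx, handler, event, isPublished)
if err != nil {
	return err
}
SortFilter(ctx, handler, event, dbFilter)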
Functions ¶
Types ¶
type AvroProducer ¶
type AvroProducer struct {
// contains filtered or unexported fields
}
AvroProducer of output events.
func NewAvroProducer ¶
func NewAvroProducer(messageProducer chan kafka.BytesMessage, marshaller Marshaller) *AvroProducer
NewAvroProducer returns a new instance of AvroProducer.
func (*AvroProducer) CSVExported ¶
func (producer *AvroProducer) CSVExported(ctx context.Context, event *CSVExported) error
CSVExported produces a new CSV exported event for the given filter output ID.
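A hedged usage sketch: the channel below stands in for the real Kafka producer's output channel, and csvExportedSchema is a hypothetical Marshaller for the CSVExported Avro schema.

out := make(chan kafka.BytesMessage, 1) // in the service this would be the Kafka producer's output channel
producer := NewAvroProducer(out, csvExportedSchema) // csvExportedSchema: hypothetical Marshaller

err := producer.CSVExported(ctx, &CSVExported{
	FilterID:   "filter-output-id",
	FileURL:    "https://example.com/datasets/dataset-2021-1.csv",
	InstanceID: "instance-id",
	DatasetID:  "dataset-id",
	Edition:    "2021",
	Version:    "1",
	Filename:   "dataset-2021-1.csv",
	RowCount:   1000,
})
if err != nil {
	log.Error(ctx, "failed to produce csv exported event", err)
}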
type CSVExported ¶
type CSVExported struct {
	FilterID   string `avro:"filter_output_id"`
	FileURL    string `avro:"file_url"`
	InstanceID string `avro:"instance_id"`
	DatasetID  string `avro:"dataset_id"`
	Edition    string `avro:"edition"`
	Version    string `avro:"version"`
	Filename   string `avro:"filename"`
	RowCount   int32  `avro:"row_count"`
}
CSVExported provides event data for a single exported CSV.
type DatasetAPI ¶
type DatasetAPI interface {
	PutVersion(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, datasetID, edition, version string, m dataset.Version) error
	GetVersion(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceAuthToken, collectionID, datasetID, edition, version string) (m dataset.Version, err error)
	GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error)
	GetMetadataURL(id, edition, version string) (url string)
	GetVersionMetadata(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (m dataset.Metadata, err error)
	GetOptions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version, dimension string, q *dataset.QueryParams) (m dataset.Options, err error)
	GetVersionDimensions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (m dataset.VersionDimensions, err error)
	GetOptionsInBatches(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version, dimension string, batchSize, maxWorkers int) (m dataset.Options, err error)
}
DatasetAPI contains functions to call the dataset API.
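In the service this interface is expected to be satisfied by the dataset API client from dp-api-clients-go. A hedged sketch, assuming that library's v2 client and constructor (the URL is illustrative only):

import "github.com/ONSdigital/dp-api-clients-go/v2/dataset"

// datasetAPICli is assumed to satisfy the DatasetAPI interface above.
var datasetAPICli DatasetAPI = dataset.NewAPIClient("http://localhost:22000")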
type EventGenerator ¶ added in v1.25.0
type EventGenerator interface {
GenerateOutput(ctx context.Context, event *CSVExported) error
}
type ExportHandler ¶
type ExportHandler struct {
// contains filtered or unexported fields
}
func NewExportHandler ¶
func NewExportHandler(
	filterStore FilterStore,
	observationStore ObservationStore,
	fileStore FileStore,
	eventProducer Producer,
	datasetAPI DatasetAPI,
	cfg *config.Config,
) *ExportHandler
func (*ExportHandler) Handle ¶
func (handler *ExportHandler) Handle(ctx context.Context, cfg *config.Config, event *FilterSubmitted) (err error)
Handle takes a single event.
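A minimal sketch of handling a single event, assuming handler was built with NewExportHandler and cfg holds the service configuration; in the running service the event would be unmarshalled from a consumed Kafka message rather than constructed by hand.

event := &FilterSubmitted{
	FilterID:  "filter-output-id",
	DatasetID: "dataset-id",
	Edition:   "2021",
	Version:   "1",
}
if err := handler.Handle(ctx, cfg, event); err != nil {
	log.Error(ctx, "failed to handle filter submitted event", err)
}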
type FileStore ¶
type FileStore interface {
PutFile(ctx context.Context, reader io.Reader, filename string, isPublished bool) (url string, err error)
}
FileStore provides storage for filtered output files.
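Any blob store can satisfy this interface. A hypothetical local-disk implementation, shown only to illustrate the expected contract; the real implementation would typically upload to remote storage and use isPublished to decide where the file can be served from.

type diskFileStore struct {
	dir string
}

// PutFile copies the reader's contents to a local file and returns a file:// URL.
// The isPublished flag is ignored in this sketch.
func (s *diskFileStore) PutFile(ctx context.Context, reader io.Reader, filename string, isPublished bool) (string, error) {
	path := filepath.Join(s.dir, filename)
	f, err := os.Create(path)
	if err != nil {
		return "", err
	}
	defer f.Close()
	if _, err := io.Copy(f, reader); err != nil {
		return "", err
	}
	return "file://" + path, nil
}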
type FilterStore ¶
type FilterStore interface {
	GetFilter(ctx context.Context, filterID string) (*filter.Model, error)
	PutCSVData(ctx context.Context, filterID string, downloadItem filter.Download) error
	PutStateAsEmpty(ctx context.Context, filterJobID string) error
	PutStateAsError(ctx context.Context, filterJobID string) error
}
FilterStore provides existing filter data.
type FilterSubmitted ¶
type FilterSubmitted struct {
	FilterID   string `avro:"filter_output_id"`
	InstanceID string `avro:"instance_id"`
	DatasetID  string `avro:"dataset_id"`
	Edition    string `avro:"edition"`
	Version    string `avro:"version"`
}
FilterSubmitted is the structure of each event consumed.
type GenerateOutputCreatedEvent ¶ added in v1.25.0
type Handler ¶
type Handler interface {
Handle(ctx context.Context, cfg *config.Config, filterSubmitted *FilterSubmitted) error
}
Handler represents a handler for processing a single event.
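A hypothetical no-op implementation, shown only to illustrate the shape Consume expects from a handler:

type noopHandler struct{}

// Handle logs the consumed event and does nothing else; useful as a stand-in in tests.
func (h *noopHandler) Handle(ctx context.Context, cfg *config.Config, event *FilterSubmitted) error {
	log.Info(ctx, "filter submitted event received", log.Data{"filter_output_id": event.FilterID})
	return nil
}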
type KafkaProducer ¶ added in v1.25.0
type KafkaProducer interface {
Output() chan kafka.BytesMessage
}
type Marshaller ¶
Marshaller marshals events into messages.
type ObservationStore ¶
type ObservationStore interface {
StreamCSVRows(ctx context.Context, instanceID, filterID string, filters *observation.DimensionFilters, limit *int) (observation.StreamRowReader, error)
}
ObservationStore provides filtered observation data in CSV rows.
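A hedged sketch of consuming the stream, assuming StreamRowReader exposes Read() (string, error) and Close(ctx context.Context) error as in dp-graph's observation package, with Read returning io.EOF when the stream is exhausted; observationStore, instanceID, filterID and dbFilter are assumed to be in scope.

reader, err := observationStore.StreamCSVRows(ctx, instanceID, filterID, dbFilter, nil)
if err != nil {
	return err
}
defer reader.Close(ctx)

for {
	row, err := reader.Read() // each row is one line of CSV
	if err == io.EOF {
		break
	}
	if err != nil {
		return err
	}
	fmt.Print(row)
}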