package event

v1.19.0
Published: Sep 20, 2021 License: MIT

Documentation

Index

Constants

This section is empty.

Variables

var CreateFilterForAll = func(ctx context.Context, handler *ExportHandler, event *FilterSubmitted, isPublished bool) (*observation.DimensionFilters, error) {

	// Fetch every dimension of the dataset version. The empty string
	// arguments are the user auth token and collection ID, which are not
	// needed for service-to-service calls.
	dimensions, err := handler.datasetAPICli.GetVersionDimensions(ctx,
		"",
		handler.serviceAuthToken,
		"",
		event.DatasetID, event.Edition, event.Version,
	)
	if err != nil {
		return nil, err
	}

	var dbFilterDimensions []*observation.Dimension

	for _, dim := range dimensions.Items {

		// Fetch all options for this dimension, in batches of 100, with up
		// to 10 concurrent workers.
		opts, err := handler.datasetAPICli.GetOptionsInBatches(ctx,
			"",
			handler.serviceAuthToken,
			"",
			event.DatasetID, event.Edition, event.Version, dim.Name,
			100,
			10,
		)
		if err != nil {
			return nil, err
		}

		var options []string
		for _, opt := range opts.Items {
			options = append(options, opt.Option)
		}

		dbFilterDimensions = append(dbFilterDimensions, &observation.Dimension{
			Name:    dim.Name,
			Options: options,
		})
	}

	// A filter that includes every option of every dimension selects the
	// whole dataset.
	dbFilter := &observation.DimensionFilters{
		Dimensions: dbFilterDimensions,
		Published:  &isPublished,
	}

	return dbFilter, nil
}
var SortFilter = func(ctx context.Context, handler *ExportHandler, event *FilterSubmitted, dbFilter *observation.DimensionFilters) {
	nofDimensions := len(dbFilter.Dimensions)
	if nofDimensions <= 1 {
		return
	}
	// Create a slice of sorted dimension sizes
	type dim struct {
		index         int
		dimensionSize int
	}

	dimSizes := make([]dim, 0, nofDimensions)
	var dimSizesMutex sync.Mutex

	// Fetch each dimension's size from Mongo (via the dataset API),
	// concurrently but bounded.
	var getErrorCount int32
	var concurrent = 10 // limit the number of goroutines so as not to put too much on the heap
	var semaphoreChan = make(chan struct{}, concurrent)
	var wg sync.WaitGroup // number of working goroutines

	for i, dimension := range dbFilter.Dimensions {
		if atomic.LoadInt32(&getErrorCount) != 0 {
			break
		}
		semaphoreChan <- struct{}{}

		wg.Add(1)

		go func(i int, dimension *observation.Dimension) {
			defer func() {
				<-semaphoreChan
			}()

			defer wg.Done()

			// A Limit of 0 returns no option items but still populates
			// TotalCount, which is all that is needed to size the dimension.
			options, err := handler.datasetAPICli.GetOptions(ctx,
				"",
				handler.serviceAuthToken,
				"",
				event.DatasetID, event.Edition, event.Version, dimension.Name,
				&dataset.QueryParams{Offset: 0, Limit: 0})

			if err != nil {
				if atomic.AddInt32(&getErrorCount, 1) <= 2 {

					logData := log.Data{"dataset_id": event.DatasetID, "edition": event.Edition, "version": event.Version, "dimension name": dimension.Name}
					log.Info(ctx, "SortFilter: GetOptions failed for dataset and dimension", logData)
				}
			} else {
				d := dim{dimensionSize: options.TotalCount, index: i}
				dimSizesMutex.Lock()
				dimSizes = append(dimSizes, d)
				dimSizesMutex.Unlock()
			}
		}(i, dimension)
	}
	wg.Wait()

	if getErrorCount != 0 {
		logData := log.Data{"dataset_id": event.DatasetID, "edition": event.Edition, "version": event.Version}
		log.Info(ctx, fmt.Sprintf("SortFilter: GetOptions failed for dataset %d times, sorting by default of 'geography' first", getErrorCount), logData)

		// Fall back to a fixed ordering: 'geography' is given the largest
		// size so that it sorts first; the remaining dimensions keep their
		// original relative order.
		dimSizes = dimSizes[:0]
		for i, dimension := range dbFilter.Dimensions {
			if strings.ToLower(dimension.Name) == "geography" {
				d := dim{dimensionSize: 999999, index: i}
				dimSizes = append(dimSizes, d)
			} else {
				d := dim{dimensionSize: nofDimensions - i, index: i}
				dimSizes = append(dimSizes, d)
			}
		}
	}

	// Sort ascending by dimension size.
	sort.Slice(dimSizes, func(i, j int) bool {
		return dimSizes[i].dimensionSize < dimSizes[j].dimensionSize
	})

	// Build the new ordering, largest dimension first.
	sortedDimensions := make([]observation.Dimension, 0, nofDimensions)

	for i := nofDimensions - 1; i >= 0; i-- {
		sortedDimensions = append(sortedDimensions, *dbFilter.Dimensions[dimSizes[i].index])
	}

	// Copy the sorted dimensions back into the filter, in place.
	for i, dimension := range sortedDimensions {
		*dbFilter.Dimensions[i] = dimension
	}
}

SortFilter sorts the filter's dimensions by dimension size, largest first, to make Neptune searches faster. The sort is done here because the sizes are retrieved from Mongo, and it is best not to have the dp-graph library acquire such coupling to its caller.
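
The sizes are fetched concurrently using a buffered channel as a counting semaphore together with a sync.WaitGroup, with an atomic error counter stopping new work from being launched after a failure. Below is a minimal, self-contained sketch of that pattern; fetchSize is a hypothetical stand-in for the dataset API call.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fetchSize is a hypothetical stand-in for the dataset API call that
// returns the number of options in a dimension.
func fetchSize(name string) (int, error) {
	return len(name) * 10, nil // dummy value for illustration
}

func main() {
	dimensions := []string{"geography", "time", "age", "sex"}

	const concurrent = 10 // cap on in-flight goroutines
	semaphoreChan := make(chan struct{}, concurrent)

	var (
		wg         sync.WaitGroup
		mu         sync.Mutex
		sizes      = make(map[string]int, len(dimensions))
		errorCount int32
	)

	for _, name := range dimensions {
		if atomic.LoadInt32(&errorCount) != 0 {
			break // a fetch failed; stop launching new work
		}
		semaphoreChan <- struct{}{} // acquire a slot (blocks when full)
		wg.Add(1)

		go func(name string) {
			defer func() { <-semaphoreChan }() // release the slot
			defer wg.Done()

			size, err := fetchSize(name)
			if err != nil {
				atomic.AddInt32(&errorCount, 1)
				return
			}
			mu.Lock()
			sizes[name] = size
			mu.Unlock()
		}(name)
	}
	wg.Wait()

	fmt.Println(sizes)
}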

Functions

This section is empty.

Types

type AvroProducer

type AvroProducer struct {
	// contains filtered or unexported fields
}

AvroProducer of output events.

func NewAvroProducer

func NewAvroProducer(messageProducer chan []byte, marshaller Marshaller) *AvroProducer

NewAvroProducer returns a new instance of AvroProducer.

func (*AvroProducer) CSVExported

func (producer *AvroProducer) CSVExported(event *CSVExported) error

CSVExported produces a new CSV exported event for the given filter output ID.
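
A minimal usage sketch of the producer follows. The module import path and the buffered output channel are assumptions, and jsonMarshaller is a hypothetical stand-in for the service's real Avro marshaller.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/ONSdigital/dp-dataset-exporter/event" // assumed module path
)

// jsonMarshaller is a stand-in Marshaller for illustration only; the
// service normally uses an Avro marshaller for the csv-exported schema.
type jsonMarshaller struct{}

func (jsonMarshaller) Marshal(s interface{}) ([]byte, error) { return json.Marshal(s) }

func main() {
	out := make(chan []byte, 1) // buffered so the send does not block
	producer := event.NewAvroProducer(out, jsonMarshaller{})

	err := producer.CSVExported(&event.CSVExported{
		FilterID: "filter-output-123",
		FileURL:  "s3://bucket/exports/v4.csv",
		RowCount: 1000,
	})
	if err != nil {
		// handle marshal error
	}
	fmt.Printf("produced message: %s\n", <-out)
}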

type CSVExported

type CSVExported struct {
	FilterID   string `avro:"filter_output_id"`
	FileURL    string `avro:"file_url"`
	InstanceID string `avro:"instance_id"`
	DatasetID  string `avro:"dataset_id"`
	Edition    string `avro:"edition"`
	Version    string `avro:"version"`
	Filename   string `avro:"filename"`
	RowCount   int32  `avro:"row_count"`
}

CSVExported provides event data for a single exported CSV.
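
The avro struct tags give the field names used on the wire. A schema consistent with those tags might look like the following sketch; the service's canonical schema definition may differ.

// csvExportedSchema is an illustrative Avro schema matching the struct's
// avro tags; the real schema ships with the service's Kafka setup.
const csvExportedSchema = `{
  "type": "record",
  "name": "csv-exported",
  "fields": [
    {"name": "filter_output_id", "type": "string"},
    {"name": "file_url",         "type": "string"},
    {"name": "instance_id",      "type": "string"},
    {"name": "dataset_id",       "type": "string"},
    {"name": "edition",          "type": "string"},
    {"name": "version",          "type": "string"},
    {"name": "filename",         "type": "string"},
    {"name": "row_count",        "type": "int"}
  ]
}`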

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer consumes event messages.

func NewConsumer

func NewConsumer(numWorkers int) *Consumer

NewConsumer returns a new consumer instance.

func (*Consumer) Close

func (consumer *Consumer) Close(ctx context.Context) (err error)

Close safely closes the consumer and releases all resources.

func (*Consumer) Consume

func (consumer *Consumer) Consume(messageConsumer MessageConsumer, handler Handler, errorHandler errors.Handler)

Consume converts messages to event instances and passes each event to the provided handler.
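
A consumer of this shape typically fans messages out to numWorkers goroutines, each decoding the payload into an event and passing it to the handler. The sketch below shows that pattern in self-contained form, with simplified local types standing in for the Kafka message and the Avro decoding.

package main

import (
	"context"
	"fmt"
	"sync"
)

// message is a simplified stand-in for a Kafka message.
type message struct{ data []byte }

// filterSubmitted mirrors the event consumed by the real service.
type filterSubmitted struct{ FilterID string }

// handler mirrors the package's Handler interface.
type handler interface {
	Handle(ctx context.Context, e *filterSubmitted) error
}

type printHandler struct{}

func (printHandler) Handle(ctx context.Context, e *filterSubmitted) error {
	fmt.Println("handling filter output", e.FilterID)
	return nil
}

func consume(ctx context.Context, msgs <-chan message, h handler, numWorkers int) {
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				// The real consumer unmarshals the Avro payload here.
				e := &filterSubmitted{FilterID: string(m.data)}
				if err := h.Handle(ctx, e); err != nil {
					// The real consumer passes failures to an errors.Handler.
					fmt.Println("handle failed:", err)
				}
			}
		}()
	}
	wg.Wait()
}

func main() {
	msgs := make(chan message, 2)
	msgs <- message{data: []byte("filter-1")}
	msgs <- message{data: []byte("filter-2")}
	close(msgs)
	consume(context.Background(), msgs, printHandler{}, 2)
}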

type DatasetAPI

type DatasetAPI interface {
	PutVersion(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, datasetID, edition, version string, m dataset.Version) error
	GetVersion(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceAuthToken, collectionID, datasetID, edition, version string) (m dataset.Version, err error)
	GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error)
	GetMetadataURL(id, edition, version string) (url string)
	GetVersionMetadata(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (m dataset.Metadata, err error)
	GetOptions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version, dimension string, q *dataset.QueryParams) (m dataset.Options, err error)
	GetVersionDimensions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (m dataset.VersionDimensions, err error)
	GetOptionsInBatches(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version, dimension string, batchSize, maxWorkers int) (m dataset.Options, err error)
}

DatasetAPI contains functions to call the dataset API.
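
This method set mirrors the dataset API client in dp-api-clients-go, so the concrete client should satisfy the interface directly. A sketch, assuming that client's import path and constructor at the version this package was built against:

package main

import (
	"github.com/ONSdigital/dp-api-clients-go/dataset" // assumed import path

	"github.com/ONSdigital/dp-dataset-exporter/event" // assumed module path
)

func main() {
	// The dataset client is assumed to satisfy event.DatasetAPI, since the
	// interface mirrors the client's method set at the pinned version.
	var api event.DatasetAPI = dataset.NewAPIClient("http://localhost:22000")
	_ = api
}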

type ExportHandler

type ExportHandler struct {
	// contains filtered or unexported fields
}

ExportHandler handles a single CSV export of a filtered dataset.

func NewExportHandler

func NewExportHandler(
	filterStore FilterStore,
	observationStore ObservationStore,
	fileStore FileStore,
	eventProducer Producer,
	datasetAPI DatasetAPI,
	cfg *config.Config,
) *ExportHandler

NewExportHandler returns a new instance using the given dependencies.

func (*ExportHandler) Handle

func (handler *ExportHandler) Handle(ctx context.Context, event *FilterSubmitted) error

Handle the export of a single filter output.

type FileStore

type FileStore interface {
	PutFile(ctx context.Context, reader io.Reader, filename string, isPublished bool) (url string, err error)
}

FileStore provides storage for filtered output files.
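
For local testing, a FileStore can be as small as one method. A hypothetical sketch that drains the reader and returns a placeholder URL:

package eventtest

import (
	"context"
	"fmt"
	"io"
)

// NullFileStore is a hypothetical event.FileStore that discards the file
// content and returns a fake URL.
type NullFileStore struct{}

func (NullFileStore) PutFile(ctx context.Context, reader io.Reader, filename string, isPublished bool) (string, error) {
	n, err := io.Copy(io.Discard, reader)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("file://%s (%d bytes, published=%t)", filename, n, isPublished), nil
}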

type FilterStore

type FilterStore interface {
	GetFilter(ctx context.Context, filterID string) (*filter.Model, error)
	PutCSVData(ctx context.Context, filterID string, downloadItem filter.Download) error
	PutStateAsEmpty(ctx context.Context, filterJobID string) error
	PutStateAsError(ctx context.Context, filterJobID string) error
}

FilterStore provides existing filter data.
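
A minimal in-memory test double is straightforward; the sketch below assumes the filter.Model and filter.Download types come from the dp-api-clients-go filter package.

package eventtest

import (
	"context"

	"github.com/ONSdigital/dp-api-clients-go/filter" // assumed import path
)

// InMemoryFilterStore is a hypothetical test double for event.FilterStore.
type InMemoryFilterStore struct {
	Filters   map[string]*filter.Model
	Downloads map[string]filter.Download
	States    map[string]string
}

func (s *InMemoryFilterStore) GetFilter(ctx context.Context, filterID string) (*filter.Model, error) {
	return s.Filters[filterID], nil
}

func (s *InMemoryFilterStore) PutCSVData(ctx context.Context, filterID string, downloadItem filter.Download) error {
	s.Downloads[filterID] = downloadItem
	return nil
}

func (s *InMemoryFilterStore) PutStateAsEmpty(ctx context.Context, filterJobID string) error {
	s.States[filterJobID] = "empty"
	return nil
}

func (s *InMemoryFilterStore) PutStateAsError(ctx context.Context, filterJobID string) error {
	s.States[filterJobID] = "error"
	return nil
}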

type FilterSubmitted

type FilterSubmitted struct {
	FilterID   string `avro:"filter_output_id"`
	InstanceID string `avro:"instance_id"`
	DatasetID  string `avro:"dataset_id"`
	Edition    string `avro:"edition"`
	Version    string `avro:"version"`
}

FilterSubmitted is the structure of each event consumed.

type Handler

type Handler interface {
	Handle(ctx context.Context, filterSubmittedEvent *FilterSubmitted) error
}

Handler represents a handler for processing a single event.
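
Any type with a matching Handle method satisfies the interface, so the real ExportHandler can be wrapped with cross-cutting concerns such as logging. A hypothetical decorator, assuming the package's module path:

package eventwrap

import (
	"context"
	"fmt"

	"github.com/ONSdigital/dp-dataset-exporter/event" // assumed module path
)

// loggingHandler is a hypothetical decorator that wraps another Handler.
type loggingHandler struct {
	next event.Handler
}

func (h *loggingHandler) Handle(ctx context.Context, e *event.FilterSubmitted) error {
	// Log around the wrapped handler; metrics could be recorded here too.
	fmt.Printf("handling filter output %s\n", e.FilterID)
	return h.next.Handle(ctx, e)
}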

type Marshaller

type Marshaller interface {
	Marshal(s interface{}) ([]byte, error)
}

Marshaller marshals events into messages.

type MessageConsumer

type MessageConsumer interface {
	Channels() *kafka.ConsumerGroupChannels
}

MessageConsumer provides a generic interface for consuming []byte messages.

type ObservationStore

type ObservationStore interface {
	StreamCSVRows(ctx context.Context, instanceID, filterID string, filters *observation.DimensionFilters, limit *int) (observation.StreamRowReader, error)
}

ObservationStore provides filtered observation data in CSV rows.

type Producer

type Producer interface {
	CSVExported(e *CSVExported) error
}

Producer handles producing output events.
