package processing
Version: v0.1.7
Warning

This package is not in the latest version of its module.

Published: May 20, 2024 | License: Apache-2.0 | Imports: 29 | Imported by: 0

Documentation

Overview

Package processing provides utilities for building pipelines to process and store FHIR resources.

Constants

This section is empty.

Variables

var ErrUploadFailures = errors.New("non-zero FHIR store upload errors")

ErrUploadFailures is returned (wrapped) when uploads to FHIR Store have failed. It is primarily used to detect this specific failure in tests.
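Because the error is returned wrapped, a test should match it with errors.Is rather than ==. The sketch below is a minimal illustration that uses a local copy of the sentinel and a hypothetical finalizeSink function in place of the real FHIR Store sink; exactly how the real sink wraps the error is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUploadFailures is a local copy of the package's sentinel so this sketch
// compiles on its own; real code would use processing.ErrUploadFailures.
var ErrUploadFailures = errors.New("non-zero FHIR store upload errors")

// finalizeSink is a hypothetical stand-in for a FHIR Store sink's Finalize.
// It wraps the sentinel with %w, adding context without hiding its identity.
func finalizeSink(failed int) error {
	if failed > 0 {
		return fmt.Errorf("%d resources failed to upload: %w", failed, ErrUploadFailures)
	}
	return nil
}

// isUploadFailure reports whether err is, or wraps, ErrUploadFailures.
// A plain == comparison would miss the wrapped form.
func isUploadFailure(err error) bool {
	return errors.Is(err, ErrUploadFailures)
}
```

In a real test, the equivalent check would be `errors.Is(err, processing.ErrUploadFailures)` on the error returned by the sink or pipeline.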

var ErrWorkerError = fmt.Errorf("at least one upload worker encountered errors, check the logs for details")

ErrWorkerError indicates one or more workers had fatal errors.

var ErrorDoNotModifyProto = errors.New("the pipeline is in the Sink stage(s), so the returned proto should not be mutated")

ErrorDoNotModifyProto indicates that the pipeline is in a Sink stage, so the proto returned by ResourceWrapper.Proto() should not be mutated.
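Since Proto() still returns the proto alongside this error in a Sink stage, a sink that only reads fields can tolerate ErrorDoNotModifyProto while still propagating other errors. The following is a minimal sketch; ContainedResource and wrapper are hypothetical stand-ins for rpb.ContainedResource and a ResourceWrapper implementation, not the package's own types.

```go
package main

import "errors"

// Local copy of the sentinel so the sketch is self-contained.
var ErrorDoNotModifyProto = errors.New("the pipeline is in the Sink stage(s), so the returned proto should not be mutated")

// ContainedResource is a hypothetical stand-in for rpb.ContainedResource.
type ContainedResource struct {
	ID string
}

// wrapper is a hypothetical ResourceWrapper-like type that records whether
// the pipeline has reached the sink stage.
type wrapper struct {
	res    *ContainedResource
	inSink bool
}

// Proto mimics the documented behaviour: in a sink stage it still returns
// the proto, together with ErrorDoNotModifyProto.
func (w *wrapper) Proto() (*ContainedResource, error) {
	if w.inSink {
		return w.res, ErrorDoNotModifyProto
	}
	return w.res, nil
}

// readID is what a read-only sink might do: ignore ErrorDoNotModifyProto,
// because it never mutates the proto, but propagate any other error.
func readID(w *wrapper) (string, error) {
	p, err := w.Proto()
	if err != nil && !errors.Is(err, ErrorDoNotModifyProto) {
		return "", err
	}
	return p.ID, nil
}
```

A sink that only needs serialized output can avoid the question entirely by calling JSON() instead of Proto().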

Functions

This section is empty.

Types

type BaseProcessor

type BaseProcessor struct {
	Output OutputFunction
}

BaseProcessor may be embedded into processor implementations to provide a no-op Finalize function and an implementation of SetOutput. Structs which embed BaseProcessor may call .Output(...) to pass on processed resources.

func (*BaseProcessor) Finalize

func (brp *BaseProcessor) Finalize(ctx context.Context) error

Finalize implements Processor.Finalize. This implementation is a no-op.

func (*BaseProcessor) SetOutput

func (brp *BaseProcessor) SetOutput(output OutputFunction)

SetOutput implements Processor.SetOutput. This implementation saves the provided output function so that it can be called by a Process function.

type DocumentsProcessorConfig

type DocumentsProcessorConfig struct {
	// Some APIs may require different authentication to be used for retrieving
	// documents vs the authentication used for the Bulk FHIR export process.
	Authenticator                        bulkfhir.Authenticator
	HTTPClient                           *http.Client
	LocalDirectory                       string
	GCSEndpoint, GCSBucket, GCSDirectory string
}

DocumentsProcessorConfig contains the configuration needed to create a documents Processor with NewDocumentsProcessor.

type FHIRStoreSinkConfig

type FHIRStoreSinkConfig struct {
	FHIRStoreConfig      *fhirstore.Config
	NoFailOnUploadErrors bool

	// If true, the sink will write NDJSON files to GCS, and use the FHIR Store
	// import functionality to read those files into the FHIR Store.
	UseGCSUpload bool

	// Parameters for direct upload
	BatchUpload         bool
	BatchSize           int
	MaxWorkers          int
	ErrorFileOutputPath string

	// Parameters for GCS-based upload
	GCSEndpoint         string
	GCSBucket           string
	GCSImportJobTimeout time.Duration
	GCSImportJobPeriod  time.Duration
	TransactionTime     *bulkfhir.TransactionTime
}

FHIRStoreSinkConfig defines the configuration passed to NewFHIRStoreSink.

type OutputFunction

type OutputFunction func(ctx context.Context, resource ResourceWrapper) error

OutputFunction is the signature of both Processor.Process and Sink.Write.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

A Pipeline consumes FHIR resources (as JSON), applies processing steps, and then writes the resources to zero or more sinks.

func NewPipeline

func NewPipeline(processors []Processor, sinks []Sink) (*Pipeline, error)

NewPipeline constructs a new Pipeline, plumbing together the given Processors and Sinks. Both processors and sinks may be empty if no processing or output is required. Note that processors and sinks should not be shared between pipelines.

func (*Pipeline) Finalize

func (p *Pipeline) Finalize(ctx context.Context) error

Finalize calls Finalize on all of the underlying Processors and Sinks in the pipeline, returning the first error seen.

func (*Pipeline) Process

func (p *Pipeline) Process(ctx context.Context, resourceType cpb.ResourceTypeCode_Value, sourceURL string, json []byte) error

Process processes a single FHIR resource, passing it through the processing steps to the sinks.

Pipelines do not apply any parallel processing. Resources pass through each processing step sequentially and are written to each sink sequentially; this function returns only when the processors and sinks have returned. If a processor or sink needs to perform heavy lifting, it may use parallelism internally. For example, a Sink could place work on an internal queue to be handled concurrently and then return immediately, so that it does not block subsequent pipeline processing. Such a Sink must ensure that all work on its internal queue is complete before returning from Finalize().

It is not safe to call this function from multiple goroutines.

type Processor

type Processor interface {
	// SetOutput sets where resources should be passed to after processing.
	SetOutput(output OutputFunction)
	// Process a resource as required. This should return an error if SetOutput has
	// not yet been called.
	Process(ctx context.Context, resource ResourceWrapper) error
	// Finalize performs any final processing and cleanup. This is called after
	// all resources have been passed to Process(), and so may be used to flush
	// any buffered or batched resources.
	Finalize(ctx context.Context) error
}

Processor defines a pipeline stage which may mutate resources before they are written.

Processors are assumed not to be thread-safe (i.e. it is unsafe to call Process from multiple goroutines). Because processors may be chained in a Pipeline, Processor implementations must call the output function set with SetOutput from exactly one goroutine: either the one from which Process is called, or a single goroutine created when the processor is created.

If a processor does create new goroutines, Finalize must not return until all of those goroutines have terminated and the output function will not be called again.

func NewBCDARectifyProcessor

func NewBCDARectifyProcessor() Processor

NewBCDARectifyProcessor creates a Processor which takes BCDA-derived FHIR resources and attempts to rectify them to fix known issues in source mapping. This is a temporary, non-ideal, and minimalist approach, which aims to make the FHIR resources compatible with base R4 expectations so that otherwise useful data can be easily uploaded to FHIR Store with validation for other areas still intact.

func NewDocumentsProcessor

func NewDocumentsProcessor(ctx context.Context, cfg *DocumentsProcessorConfig) (Processor, error)

NewDocumentsProcessor creates a Processor which downloads documents from the URLs found in DocumentReference resources, and replaces those URLs with URIs for the downloaded files.

type ResourceWrapper

type ResourceWrapper interface {
	// Type returns the type of the resource, for easy filtering by processors.
	Type() cpb.ResourceTypeCode_Value
	// SourceURL returns the URL the resource was obtained from.
	SourceURL() string
	// Proto returns a proto which can be mutated by processors. If you call this in a Sink (where
	// proto mutations should not happen), this will return the proto and the ErrorDoNotModifyProto
	// error.
	Proto() (*rpb.ContainedResource, error)
	// JSON serialises the ContainedResource proto to FHIR JSON. The call to JSON() should be
	// thread-safe.
	JSON() ([]byte, error)
}

ResourceWrapper encapsulates resources to be processed and stored.

type Sink

type Sink interface {
	// Write a resource to storage.
	Write(ctx context.Context, resource ResourceWrapper) error
	// Perform any final writing and cleanup. This is called after all resources
	// have been passed to Write(), and so may be used to flush any buffered or
	// batched resources.
	Finalize(ctx context.Context) error
}

Sink represents a terminal pipeline stage which writes resources to storage.

Sinks are assumed to not be thread-safe (i.e. it is unsafe to call Write from multiple goroutines). Sinks may use parallelism and create goroutines internally; if so, Finalize must not return until all of those goroutines have terminated, and all resources have been written.

func NewFHIRStoreSink

func NewFHIRStoreSink(ctx context.Context, cfg *FHIRStoreSinkConfig) (Sink, error)

NewFHIRStoreSink creates a new Sink which writes resources to FHIR Store, either directly or via GCS.

func NewGCSNDJSONSink

func NewGCSNDJSONSink(ctx context.Context, endpoint, bucket, directory string) (Sink, error)

NewGCSNDJSONSink returns a Sink which writes NDJSON files to GCS. See NewNDJSONSink for additional documentation.

func NewNDJSONSink

func NewNDJSONSink(ctx context.Context, directory string) (Sink, error)

NewNDJSONSink creates a new Sink which writes resources to NDJSON files in the given directory. Resources are grouped by the URL they were retrieved from, with their file name containing the resource type and an incremented index to distinguish them.

It is thread-safe to call Write on this Sink from multiple goroutines.

type TestSink

type TestSink struct {
	WrittenResources []ResourceWrapper
	FinalizeCalled   bool
}

TestSink can be used for testing processors by capturing processed resources.

func (*TestSink) Finalize

func (ts *TestSink) Finalize(ctx context.Context) error

Finalize implements Sink.Finalize.

func (*TestSink) Write

func (ts *TestSink) Write(ctx context.Context, resource ResourceWrapper) error

Write implements Sink.Write.
