Documentation ¶
Overview ¶
Package processing provides utilities for building pipelines to process and store FHIR resources.
Constants ¶
This section is empty.
Variables ¶
var ErrUploadFailures = errors.New("non-zero FHIR store upload errors")
ErrUploadFailures is returned (wrapped) when uploads to FHIR Store have failed. It is primarily used to detect this specific failure in tests.
var ErrWorkerError = fmt.Errorf("at least one upload worker encountered errors, check the logs for details")
ErrWorkerError indicates one or more workers had fatal errors.
var ErrorDoNotModifyProto = errors.New("the pipeline is in the Sink stage(s), so the returned proto should not be mutated")
ErrorDoNotModifyProto indicates that the pipeline is in a Sink stage, so the proto returned by ResourceWrapper.Proto() should not be mutated.
Functions ¶
This section is empty.
Types ¶
type BaseProcessor ¶
type BaseProcessor struct {
Output OutputFunction
}
BaseProcessor may be embedded into processor implementations to provide a no-op Finalize function and an implementation of SetOutput. Structs which embed BaseProcessor may call .Output(...) to pass on processed resources.
func (*BaseProcessor) Finalize ¶
func (brp *BaseProcessor) Finalize(ctx context.Context) error
Finalize is Processor.Finalize. This implementation is a no-op.
func (*BaseProcessor) SetOutput ¶
func (brp *BaseProcessor) SetOutput(output OutputFunction)
SetOutput is Processor.SetOutput. This implementation saves the provided output function so that it can be called by a Process function.
type DocumentsProcessorConfig ¶
type DocumentsProcessorConfig struct {
    // Some APIs may require different authentication to be used for retrieving
    // documents vs the authentication used for the Bulk FHIR export process.
    Authenticator bulkfhir.Authenticator
    HTTPClient *http.Client
    LocalDirectory string
    GCSEndpoint, GCSBucket, GCSDirectory string
}
DocumentsProcessorConfig contains the configuration needed for creating a Documents Processor.
type FHIRStoreSinkConfig ¶
type FHIRStoreSinkConfig struct {
    FHIRStoreConfig *fhirstore.Config
    NoFailOnUploadErrors bool

    // If true, the sink will write NDJSON files to GCS, and use the FHIR Store
    // import functionality to read those files into the FHIR Store.
    UseGCSUpload bool

    // Parameters for direct upload
    BatchUpload bool
    BatchSize int
    MaxWorkers int
    ErrorFileOutputPath string

    // Parameters for GCS-based upload
    GCSEndpoint string
    GCSBucket string
    GCSImportJobTimeout time.Duration
    GCSImportJobPeriod time.Duration
    TransactionTime *bulkfhir.TransactionTime
}
FHIRStoreSinkConfig defines the configuration passed to NewFHIRStoreSink.
type OutputFunction ¶
type OutputFunction func(ctx context.Context, resource ResourceWrapper) error
OutputFunction is the signature of both Processor.Process and Sink.Write.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
A Pipeline consumes FHIR resources (as JSON), applies processing steps, and then writes the resources to zero or more sinks.
func NewPipeline ¶
NewPipeline constructs a new Pipeline, plumbing together the given Processors and Sinks. Both processors and sinks may be empty if no processing or output is required. Note that processors and sinks should not be shared between pipelines.
func (*Pipeline) Finalize ¶
Finalize calls finalize on all of the underlying Processors and Sinks in the pipeline, returning the first error seen.
func (*Pipeline) Process ¶
func (p *Pipeline) Process(ctx context.Context, resourceType cpb.ResourceTypeCode_Value, sourceURL string, json []byte) error
Process a single FHIR resource. The resource is passed through the processing steps to the sinks.
Pipelines do not apply any parallel processing. Resources pass through each processing step sequentially, and are written to each sink sequentially; this function returns only when the processors and sinks return. If a processor or sink needs to perform heavy lifting, it may use parallelism internally. For example, a Sink could place work on an internal queue to be handled concurrently and then return immediately, so that it does not block subsequent pipeline processing. Such a Sink would ensure that all work on its internal queue is complete before returning from Finalize().
It is not safe to call this function from multiple goroutines.
type Processor ¶
type Processor interface {
    // SetOutput sets where resources should be passed to after processing.
    SetOutput(output OutputFunction)

    // Process a resource as required. This should return an error if SetOutput
    // has not yet been called.
    Process(ctx context.Context, resource ResourceWrapper) error

    // Finalize performs any final processing and cleanup. This is called after
    // all resources have been passed to Process(), and so may be used to flush
    // any buffered or batched resources.
    Finalize(ctx context.Context) error
}
Processor defines a pipeline stage which may mutate resources before they are written.
Processors are assumed to not be thread-safe (i.e. it is unsafe to call Process from multiple goroutines). Because processors may be chained in a Pipeline, Processor implementations must call the output function set with SetOutput from exactly one goroutine: either the one from which Process is called, or a single goroutine created when the processor is created.
If a processor does create new goroutines, Finalize must not return until all of those goroutines have terminated and the output function will not be called again.
func NewBCDARectifyProcessor ¶
func NewBCDARectifyProcessor() Processor
NewBCDARectifyProcessor creates a Processor which takes BCDA-derived FHIR resources and attempts to rectify them to fix known issues in source mapping (in the ways described below). This is a temporary, non-ideal, and minimalist approach, which aims to make the FHIR compatible with base R4 expectations so that otherwise useful data can be easily uploaded to FHIR store with validation for other areas still intact.
func NewDocumentsProcessor ¶
func NewDocumentsProcessor(ctx context.Context, cfg *DocumentsProcessorConfig) (Processor, error)
NewDocumentsProcessor creates a Processor which downloads documents from the URLs found in DocumentReference resources, and replaces those URLs with URIs for the downloaded files.
type ResourceWrapper ¶
type ResourceWrapper interface {
    // Type returns the type of the resource, for easy filtering by processors.
    Type() cpb.ResourceTypeCode_Value

    // SourceURL returns the URL the resource was obtained from.
    SourceURL() string

    // Proto returns a proto which can be mutated by processors. If you call
    // this in a Sink (where proto mutations should not happen), this will
    // return the proto and the ErrorDoNotModifyProto error.
    Proto() (*rpb.ContainedResource, error)

    // JSON serialises the ContainedResource proto to FHIR JSON. The call to
    // JSON() should be thread safe.
    JSON() ([]byte, error)
}
ResourceWrapper encapsulates resources to be processed and stored.
type Sink ¶
type Sink interface {
    // Write a resource to storage.
    Write(ctx context.Context, resource ResourceWrapper) error

    // Perform any final writing and cleanup. This is called after all resources
    // have been passed to Write(), and so may be used to flush any buffered or
    // batched resources.
    Finalize(ctx context.Context) error
}
Sink represents a terminal pipeline stage which writes resources to storage.
Sinks are assumed to not be thread-safe (i.e. it is unsafe to call Write from multiple goroutines). Sinks may use parallelism and create goroutines internally; if so, Finalize must not return until all of those goroutines have terminated, and all resources have been written.
func NewFHIRStoreSink ¶
func NewFHIRStoreSink(ctx context.Context, cfg *FHIRStoreSinkConfig) (Sink, error)
NewFHIRStoreSink creates a new Sink which writes resources to FHIR Store, either directly or via GCS.
func NewGCSNDJSONSink ¶
NewGCSNDJSONSink returns a Sink which writes NDJSON files to GCS. See NewNDJSONSink for additional documentation.
func NewNDJSONSink ¶
NewNDJSONSink creates a new Sink which writes resources to NDJSON files in the given directory. Resources are grouped by the URL they were retrieved from, with their file name containing the resource type and an incremented index to distinguish them.
It is threadsafe to call Write on this Sink from multiple goroutines.