Versions in this module Expand all Collapse all v0 v0.1.7 May 20, 2024 v0.1.6 Nov 2, 2023 Changes in this version
+ var ErrUploadFailures = errors.New("non-zero FHIR store upload errors")
+ var ErrWorkerError = fmt.Errorf("at least one upload worker encountered errors, check the logs for details")
+ var ErrorDoNotModifyProto = errors.New(...)
+ type BaseProcessor struct
+ Output OutputFunction
+ func (brp *BaseProcessor) Finalize(ctx context.Context) error
+ func (brp *BaseProcessor) SetOutput(output OutputFunction)
+ type DocumentsProcessorConfig struct
+ Authenticator bulkfhir.Authenticator
+ GCSBucket string
+ GCSDirectory string
+ GCSEndpoint string
+ HTTPClient *http.Client
+ LocalDirectory string
+ type FHIRStoreSinkConfig struct
+ BatchSize int
+ BatchUpload bool
+ ErrorFileOutputPath string
+ FHIRStoreConfig *fhirstore.Config
+ GCSBucket string
+ GCSEndpoint string
+ GCSImportJobPeriod time.Duration
+ GCSImportJobTimeout time.Duration
+ MaxWorkers int
+ NoFailOnUploadErrors bool
+ TransactionTime *bulkfhir.TransactionTime
+ UseGCSUpload bool
+ type OutputFunction func(ctx context.Context, resource ResourceWrapper) error
+ type Pipeline struct
+ func NewPipeline(processors []Processor, sinks []Sink) (*Pipeline, error)
+ func (p *Pipeline) Finalize(ctx context.Context) error
+ func (p *Pipeline) Process(ctx context.Context, resourceType cpb.ResourceTypeCode_Value, sourceURL string, ...) error
+ type Processor interface
+ Finalize func(ctx context.Context) error
+ Process func(ctx context.Context, resource ResourceWrapper) error
+ SetOutput func(output OutputFunction)
+ func NewBCDARectifyProcessor() Processor
+ func NewDocumentsProcessor(ctx context.Context, cfg *DocumentsProcessorConfig) (Processor, error)
+ type ResourceWrapper interface
+ JSON func() ([]byte, error)
+ Proto func() (*rpb.ContainedResource, error)
+ SourceURL func() string
+ Type func() cpb.ResourceTypeCode_Value
+ type Sink interface
+ Finalize func(ctx context.Context) error
+ Write func(ctx context.Context, resource ResourceWrapper) error
+ func NewFHIRStoreSink(ctx context.Context, cfg *FHIRStoreSinkConfig) (Sink, error)
+ func NewGCSNDJSONSink(ctx context.Context, endpoint, bucket, directory string) (Sink, error)
+ func NewNDJSONSink(ctx context.Context, directory string) (Sink, error)
+ type TestSink struct
+ FinalizeCalled bool
+ WrittenResources []ResourceWrapper
+ func (ts *TestSink) Finalize(ctx context.Context) error
+ func (ts *TestSink) Write(ctx context.Context, resource ResourceWrapper) error