Documentation
Overview ¶
Package bulkfhir helps manage communication with bulk FHIR APIs. At the moment, much of this package is still geared around the BCDA API, but it is in the process of being generalized further.
Index ¶
- Variables
- func ResourceTypeCodeFromName(name string) (cpb.ResourceTypeCode_Value, error)
- func ResourceTypeCodeToName(val cpb.ResourceTypeCode_Value) (string, error)
- type Authenticator
- type BearerToken
- type BearerTokenAuthenticator
- type Client
- func (c *Client) Authenticate() error
- func (c *Client) AuthenticateIfNecessary() error
- func (c *Client) Close() error
- func (c *Client) GetData(bcdaURL string) (dataStream io.ReadCloser, err error)
- func (c *Client) JobStatus(jobStatusURL string) (st JobStatus, err error)
- func (c *Client) MonitorJobStatus(jobStatusURL string, checkPeriod, timeout time.Duration) <-chan *MonitorResult
- func (c *Client) StartBulkDataExport(types []cpb.ResourceTypeCode_Value, since time.Time, groupID string) (jobStatusURL string, err error)
- func (c *Client) StartBulkDataExportAll(types []cpb.ResourceTypeCode_Value, since time.Time) (jobStatusURL string, err error)
- type CredentialExchanger
- type HTTPBasicOAuthOptions
- type JWTKeyProvider
- type JWTOAuthOptions
- type JobStatus
- type MonitorResult
- type TransactionTime
- type TransactionTimeStore
Variables ¶
var (
	// ErrorUnimplemented indicates that this method is currently unimplemented.
	ErrorUnimplemented = errors.New("method not implemented yet")
	// ErrorUnauthorized indicates that the server considers this client
	// unauthorized. While authenticators should renew credentials automatically
	// if required, time-of-check-to-time-of-use may mean that this error is still
	// the result of expired credentials. Clients should consider retrying the
	// operation if needed.
	ErrorUnauthorized = errors.New("server indicates this client is unauthorized")
	// ErrorTimeout indicates the operation timed out.
	ErrorTimeout = errors.New("this operation timed out")
	// ErrorExportJobNotFound indicates that the Job URL returned a 404 status.
	ErrorExportJobNotFound = errors.New("job URL returned 404 not found")
	// ErrorUnexpectedStatusCode indicates an unexpected status code was present.
	ErrorUnexpectedStatusCode = errors.New("unexpected non-ok HTTP status code")
	// ErrorGreaterThanOneContentLocation indicates more than 1 Content-Location header was present.
	ErrorGreaterThanOneContentLocation = errors.New("greater than 1 Content-Location header")
	// ErrorUnexpectedNumberOfXProgress indicates an unexpected number of X-Progress headers was present.
	ErrorUnexpectedNumberOfXProgress = errors.New("unexpected number of x-progress headers")
	// ErrorRetryableHTTPStatus may be wrapped into other errors emitted by this package
	// to indicate to the caller that a retryable http error code was returned
	// from the server.
	// TODO(b/239596656): consider adding auto-retry logic within this package.
	ErrorRetryableHTTPStatus = errors.New("this is a retryable but unexpected HTTP status code error")
)
var ErrUnsetTransactionTime = errors.New("TransactionTime.Set has not been called")
ErrUnsetTransactionTime is returned from TransactionTime.Get if it is called before TransactionTime.Set is called.
var ExportGroupAll = "all"
ExportGroupAll is a default group ID of "all" which can be supplied to StartBulkDataExport. Depending on your FHIR server, the all patients group ID may differ, so be sure to consult relevant documentation.
Functions ¶
func ResourceTypeCodeFromName ¶
func ResourceTypeCodeFromName(name string) (cpb.ResourceTypeCode_Value, error)
ResourceTypeCodeFromName returns the ResourceTypeCode for the given FHIR resource type name.
func ResourceTypeCodeToName ¶
func ResourceTypeCodeToName(val cpb.ResourceTypeCode_Value) (string, error)
ResourceTypeCodeToName returns the FHIR resource name corresponding to the ResourceTypeCode.
Types ¶
type Authenticator ¶
type Authenticator interface {
	// Authenticate unconditionally performs any credential exchange required to
	// make requests. It is generally not necessary to call this method, as it
	// will be called automatically by AddAuthenticationToRequest if credentials
	// have not yet been exchanged or have expired.
	Authenticate(hc *http.Client) error

	// AuthenticateIfNecessary performs any credential exchange required to make
	// requests, if the credentials have expired or have not yet been exchanged.
	// This can be used if you need to track authentication errors, but does not
	// need to be called otherwise; authentication will be done automatically when
	// requests are made using AddAuthenticationToRequest.
	AuthenticateIfNecessary(hc *http.Client) error

	// Add authentication credentials to an outbound request. This may perform
	// additional requests to perform credential exchange if required by the
	// authentication mechanism, both before any initial request, and on
	// subsequent requests if any acquired credentials have expired.
	//
	// Implementations should call their own AuthenticateIfNecessary method if
	// credential exchange is necessary.
	AddAuthenticationToRequest(hc *http.Client, req *http.Request) error
}
Authenticator defines a module used for obtaining authentication credentials and attaching them to outbound requests to the Bulk FHIR APIs.
func NewHTTPBasicOAuthAuthenticator ¶
func NewHTTPBasicOAuthAuthenticator(username, password, tokenURL string, opts *HTTPBasicOAuthOptions) (Authenticator, error)
NewHTTPBasicOAuthAuthenticator creates a new Authenticator which uses 2-legged OAuth with HTTP Basic authentication to obtain a bearer token. The username and password are typically a client ID and client secret (respectively) supplied by the Bulk FHIR Provider.
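For orientation, the exchange described here corresponds to the OAuth 2.0 client credentials grant with the client ID and secret sent via HTTP Basic. The sketch below shows that exchange using only the standard library. It is not this package's implementation: fetchToken, tokenResponse, and the fake token server are hypothetical, and it assumes the server returns expires_in as a JSON number.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// tokenResponse holds the two token endpoint fields this sketch cares about.
type tokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int64  `json:"expires_in"` // seconds; 0 if the server omits it
}

// fetchToken (hypothetical) performs a client credentials grant, sending the
// client ID and secret in an HTTP Basic Authorization header.
func fetchToken(hc *http.Client, tokenURL, clientID, clientSecret string, scopes []string) (*tokenResponse, error) {
	form := url.Values{"grant_type": {"client_credentials"}}
	if len(scopes) > 0 {
		form.Set("scope", strings.Join(scopes, " "))
	}
	req, err := http.NewRequest(http.MethodPost, tokenURL, strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth(clientID, clientSecret)
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("Accept", "application/json")

	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("token endpoint returned status %d", resp.StatusCode)
	}
	var tok tokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tok); err != nil {
		return nil, err
	}
	if tok.AccessToken == "" {
		return nil, fmt.Errorf("token response had no access_token")
	}
	return &tok, nil
}

// newFakeTokenServer starts a local test server that accepts one fixed
// client ID and secret, standing in for a real token endpoint.
func newFakeTokenServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user, pass, ok := r.BasicAuth()
		if !ok || user != "client-id" || pass != "client-secret" {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"access_token":"example-token","expires_in":1200}`)
	}))
}

func main() {
	srv := newFakeTokenServer()
	defer srv.Close()

	tok, err := fetchToken(srv.Client(), srv.URL, "client-id", "client-secret", nil)
	if err != nil {
		fmt.Println("token exchange failed:", err)
		return
	}
	fmt.Println(tok.AccessToken, tok.ExpiresIn)
}
```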
func NewJWTOAuthAuthenticator ¶
func NewJWTOAuthAuthenticator(issuer, subject, tokenURL string, keyProvider JWTKeyProvider, opts *JWTOAuthOptions) (Authenticator, error)
NewJWTOAuthAuthenticator creates a new Authenticator which uses 2-legged OAuth with JWT authentication (according to RFC9068) to obtain a bearer token.
type BearerToken ¶
BearerToken encapsulates a bearer token presented as an Authorization header.
type BearerTokenAuthenticator ¶
type BearerTokenAuthenticator struct {
	Exchanger CredentialExchanger
	// contains filtered or unexported fields
}
BearerTokenAuthenticator is an implementation of Authenticator which uses a CredentialExchanger to obtain a bearer token which is presented in an Authorization header.
Note: this implementation is not thread safe.
func (*BearerTokenAuthenticator) AddAuthenticationToRequest ¶
func (bta *BearerTokenAuthenticator) AddAuthenticationToRequest(hc *http.Client, req *http.Request) error
AddAuthenticationToRequest is Authenticator.AddAuthenticationToRequest.
This Authenticator adds an access token as an Authorization: Bearer {token} header, automatically requesting/refreshing the token as necessary.
func (*BearerTokenAuthenticator) Authenticate ¶
func (bta *BearerTokenAuthenticator) Authenticate(hc *http.Client) error
Authenticate is Authenticator.Authenticate.
This Authenticator uses the CredentialExchanger it contains to obtain a bearer token.
func (*BearerTokenAuthenticator) AuthenticateIfNecessary ¶
func (bta *BearerTokenAuthenticator) AuthenticateIfNecessary(hc *http.Client) error
AuthenticateIfNecessary is Authenticator.AuthenticateIfNecessary.
This Authenticator uses the CredentialExchanger it contains to obtain a bearer token.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a Bulk FHIR API client at some API version.
func NewClient ¶
func NewClient(baseURL string, authenticator Authenticator) (*Client, error)
NewClient creates and returns a new bulk FHIR API Client for the input baseURL, using the given authenticator.
func (*Client) Authenticate ¶
func (c *Client) Authenticate() error
Authenticate calls through to the Authenticator the client was built with to unconditionally perform credential exchange.
func (*Client) AuthenticateIfNecessary ¶
func (c *Client) AuthenticateIfNecessary() error
AuthenticateIfNecessary calls through to the Authenticator the client was built with to perform credential exchange if necessary.
func (*Client) Close ¶
func (c *Client) Close() error
Close is a placeholder for any cleanup actions needed for the Client. Please call this when finished with a Client.
func (*Client) GetData ¶
func (c *Client) GetData(bcdaURL string) (dataStream io.ReadCloser, err error)
GetData retrieves the NDJSON data result from the provided BCDA result URL. The caller must close the dataStream io.ReadCloser when finished.
func (*Client) JobStatus ¶
func (c *Client) JobStatus(jobStatusURL string) (st JobStatus, err error)
JobStatus retrieves the current JobStatus via the bulk FHIR API for the provided job status URL.
func (*Client) MonitorJobStatus ¶
func (c *Client) MonitorJobStatus(jobStatusURL string, checkPeriod, timeout time.Duration) <-chan *MonitorResult
MonitorJobStatus will asynchronously check the status of the job at the provided checkPeriod until either the job completes or the timeout is reached. Each time the job status is checked, a MonitorResult will be emitted to the returned channel for the caller to consume. When the timeout is reached or the job is completed, the final completed JobStatus (or the ErrorTimeout error) will be sent to the channel, and the channel will be closed. If an ErrorUnauthorized is encountered, MonitorJobStatus will attempt to reauthenticate and continue trying.
func (*Client) StartBulkDataExport ¶
func (c *Client) StartBulkDataExport(types []cpb.ResourceTypeCode_Value, since time.Time, groupID string) (jobStatusURL string, err error)
StartBulkDataExport starts a job via the bulk FHIR API to begin exporting the requested resource types since the provided timestamp for the provided group, and returns the URL to query the job status (from the response Content-Location header). StartBulkDataExportAll can be used if you wish to export all FHIR resources without a group ID.
func (*Client) StartBulkDataExportAll ¶
func (c *Client) StartBulkDataExportAll(types []cpb.ResourceTypeCode_Value, since time.Time) (jobStatusURL string, err error)
StartBulkDataExportAll starts a job via the bulk FHIR API to begin exporting the requested resource types since the provided timestamp for all patients, and returns the URL to query the job status.
type CredentialExchanger ¶
type CredentialExchanger interface {
Authenticate(hc *http.Client) (*BearerToken, error)
}
CredentialExchanger is used by BearerTokenAuthenticator to exchange long-lived credentials for a short-lived bearer token.
type HTTPBasicOAuthOptions ¶
type HTTPBasicOAuthOptions struct {
	// OAuth scopes used when authenticating.
	Scopes []string

	// Whether the authenticator should always refresh if the authentication
	// server does not provide an "expires_in" duration in the response. The
	// default behaviour is to automatically authenticate upon first use (when
	// AuthenticateIfNecessary or AddAuthenticationToRequest is called), and then
	// to not authenticate again if no expiry time can be determined.
	//
	// Consider using DefaultExpiry instead to provide an expiry duration that is
	// used for determining the expiry time after each credential exchange.
	AlwaysAuthenticateIfNoExpiresIn bool

	// A default expiry duration to use if the authentication server does not
	// provide an "expires_in" duration in the response.
	DefaultExpiry time.Duration
}
HTTPBasicOAuthOptions contains optional parameters used by NewHTTPBasicOAuthAuthenticator.
type JWTKeyProvider ¶
type JWTKeyProvider interface {
	Key() (*rsa.PrivateKey, error)
	KeyID() string
}
A JWTKeyProvider provides the RSA private key used for signing JSON Web Tokens.
func NewPEMFileKeyProvider ¶
func NewPEMFileKeyProvider(filename, keyID string) JWTKeyProvider
NewPEMFileKeyProvider returns a JWTKeyProvider which reads a PEM-encoded key from the given file.
type JWTOAuthOptions ¶
type JWTOAuthOptions struct {
	// How long the generated JWT is valid for (according to its "exp" claim).
	// Defaults to 5 minutes if unset.
	JWTLifetime time.Duration

	// OAuth scopes used when authenticating.
	Scopes []string

	// Whether the authenticator should always refresh if the authentication
	// server does not provide an "expires_in" duration in the response. The
	// default behaviour is to automatically authenticate upon first use (when
	// AuthenticateIfNecessary or AddAuthenticationToRequest is called), and then
	// to not authenticate again if no expiry time can be determined.
	//
	// Consider using DefaultExpiry instead to provide an expiry duration that is
	// used for determining the expiry time after each credential exchange.
	AlwaysAuthenticateIfNoExpiresIn bool

	// A default expiry duration to use if the authentication server does not
	// provide an "expires_in" duration in the response.
	DefaultExpiry time.Duration
}
JWTOAuthOptions contains optional parameters used by NewJWTOAuthAuthenticator.
type JobStatus ¶
type JobStatus struct {
	IsComplete      bool
	PercentComplete int
	RetryAfter      time.Duration
	// ResultURLs holds the final NDJSON URLs for the job by resource type (if the job is complete).
	ResultURLs map[cpb.ResourceTypeCode_Value][]string
	// Indicates the FHIR server time when the bulk data export was processed.
	TransactionTime time.Time
}
JobStatus represents the current status of a bulk FHIR export job, as returned from Client.JobStatus.
type MonitorResult ¶
type MonitorResult struct {
	// Status holds the JobStatus
	Status JobStatus
	// Error holds an error associated with this entry (if any)
	Error error
}
MonitorResult holds either a JobStatus or an error.
type TransactionTime ¶
type TransactionTime struct {
// contains filtered or unexported fields
}
A TransactionTime holds the transaction time for a bulk FHIR export. It is used to allow constructing processing pipelines before the export operation is started; pipeline steps may hold a pointer to the TransactionTime, and call Get once they receive a resource to process or store (by which time the cache should have been populated).
func NewTransactionTime ¶
func NewTransactionTime() *TransactionTime
NewTransactionTime returns a new TransactionTime.
func (*TransactionTime) Get ¶
func (tt *TransactionTime) Get() (time.Time, error)
Get the timestamp from the cache. Returns an error if Set() has not yet been called.
func (*TransactionTime) Set ¶
func (tt *TransactionTime) Set(timestamp time.Time)
Set the timestamp in the cache.
type TransactionTimeStore ¶
type TransactionTimeStore interface {
	// Load a previously stored transaction time. If no transaction time has
	// previously been stored (i.e. if the program has never been successfully run
	// with the current configuration), this should return a zero time with no
	// error.
	Load(ctx context.Context) (time.Time, error)

	// Store() saves the given timestamp to persistent storage so that it can be
	// retrieved by Load() the next time the program is run.
	Store(ctx context.Context, ts time.Time) error
}
TransactionTimeStore manages the transaction time of Bulk FHIR fetches. The transaction timestamp of a successful export is saved so that it can be used as the _since parameter for the subsequent export.
func NewGCSTransactionTimeStore ¶
func NewGCSTransactionTimeStore(ctx context.Context, gcsEndpoint, uri string) (TransactionTimeStore, error)
NewGCSTransactionTimeStore returns an implementation of TransactionTimeStore which persists the since timestamp to a file in GCS at the given URI. A new line is appended to the file on each run, so that the entire history of transaction times may be seen.
func NewInMemoryTransactionTimeStore ¶
func NewInMemoryTransactionTimeStore(timestamp string) (TransactionTimeStore, error)
NewInMemoryTransactionTimeStore returns an implementation of TransactionTimeStore which does not persist the since timestamp anywhere. It is initialised with a string timestamp, which may be blank.
func NewLocalFileTransactionTimeStore ¶
func NewLocalFileTransactionTimeStore(path string) TransactionTimeStore
NewLocalFileTransactionTimeStore returns an implementation of TransactionTimeStore which persists the since timestamp to a local file at the given path. A new line is appended to the file on each run, so that the entire history of transaction times may be seen.