Documentation ¶
Overview ¶
Implements importer triggering based on SNS queues. Incoming SNS messages are decoded and the referenced files are extracted, ready for the importer code to run.
Exposes the interface of the dataset importer (aka converter) and selects one automatically based on what files are in the folder being imported. The converter supports various formats as delivered by GDS or test instruments, and is intended to be extendable to other lab instruments and devices in future.
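The trigger decoding shown in the examples below follows the standard SNS-delivered S3 event pattern. As a rough, self-contained sketch of that decode step (field names follow the AWS S3 event JSON schema; this is an illustration, not the package's actual implementation):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// snsS3Records mirrors the subset of the AWS S3 event schema needed to
// pull the bucket name and object key out of a notification body.
type snsS3Records struct {
	Records []struct {
		S3 struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// decodeS3Trigger is a hypothetical helper (not part of this package)
// showing how a source bucket/file pair can be recovered from an event body.
func decodeS3Trigger(body []byte) (bucket, key string, err error) {
	var ev snsS3Records
	if err = json.Unmarshal(body, &ev); err != nil {
		return "", "", fmt.Errorf("failed to decode body to an S3 event: %v", err)
	}
	if len(ev.Records) == 0 {
		return "", "", fmt.Errorf("no S3 records in event")
	}
	return ev.Records[0].S3.Bucket.Name, ev.Records[0].S3.Object.Key, nil
}

func main() {
	body := []byte(`{"Records":[{"s3":{"bucket":{"name":"raw-data-bucket"},"object":{"key":"189137412-07-09-2022-10-07-57.zip"}}}]}`)
	bucket, key, err := decodeS3Trigger(body)
	fmt.Println(bucket, key, err)
}
```

A truncated or malformed body fails in `json.Unmarshal`, which is the same failure mode shown in the BadEventType example below ("unexpected end of JSON input").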
Example (DecodeImportTrigger_Manual) ¶
Trigger for a manual dataset regeneration (user clicks save button on dataset edit page)
Output: Source Bucket: "" Source file: "" Dataset: "189137412" Job: "dataimport-zmzddoytch2krd7n" Err: "<nil>"
Example (DecodeImportTrigger_ManualBadDatasetID) ¶
Output: Source Bucket: "" Source file: "" Dataset: "" Job: "" Err: "Failed to find dataset ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadLogID) ¶
Output: Source Bucket: "" Source file: "" Dataset: "" Job: "" Err: "Failed to find job ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadMsg) ¶
Output: Source Bucket: "" Source file: "" Dataset: "" Job: "" Err: "Unexpected or no message type embedded in triggering SNS message"
Example (DecodeImportTrigger_OCS) ¶
Trigger for when a new zip arrives from the pipeline
Output: Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj" Source file: "189137412-07-09-2022-10-07-57.zip" Dataset: "189137412" Log Str Len: "43" Err: "<nil>"
Example (DecodeImportTrigger_OCS2) ¶
Trigger for when a new zip arrives from the pipeline
Output: Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj" Source file: "197329413-25-09-2022-14-33-39.zip" Dataset: "197329413" Job Str Len: "43" Err: "<nil>"
Example (DecodeImportTrigger_OCS3) ¶
Trigger for when a new zip arrives from the pipeline, but the pipeline stores it in a subdirectory of the bucket
Output: Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj" Source file: "data/197329413-25-09-2022-14-33-39.zip" Dataset: "197329413" Job Str Len: "43" Err: "<nil>"
Example (DecodeImportTrigger_OCS_BadEventType) ¶
Output: Source Bucket: "" Source file: "" Dataset: "" Job: "" Err: "Failed to decode dataset import trigger: Failed to decode sqs body to an S3 event: unexpected end of JSON input"
Example (DecodeImportTrigger_OCS_Error) ¶
Output: Source Bucket: "" Source file: "" Dataset: "" Job: "" Err: "Unexpected or no message type embedded in triggering SNS message"
Example (GetUpdateType_Drive) ¶
Output: housekeeping|<nil>
Example (GetUpdateType_LessContextImages) ¶
Output: image|<nil>
Example (GetUpdateType_MoreContextImages) ¶
Output: image|<nil>
Example (GetUpdateType_NormalSpectra) ¶
Output: spectra|<nil>
Example (GetUpdateType_RTT) ¶
Output: unknown|<nil>
Example (GetUpdateType_SameContextImages) ¶
Output: unknown|<nil>
Example (GetUpdateType_Title) ¶
Output: housekeeping|<nil>
Example (ImportForTrigger_Manual_JPL) ¶
Import a breadboard dataset from a manually uploaded zip file
Output:
Errors: <nil>, changes: unknown, isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 0 zip files, unzipped 0 files": true
Logged "No zip files found in archive, dataset may have been manually uploaded. Trying to download...": true
Logged "Dataset test1234 downloaded 3 files from manual upload area": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "Reading 1261 files from spectrum directory...": true
Logged "Reading spectrum [1135/1260] 90%": true
Logged "PMC 1261 has 4 MSA/spectrum entries": true
Logged "WARNING: No main context image determined": true
Logged "Diffraction db saved successfully": true
Logged "Warning: No import.json found, defaults will be used": true
Logged "No auto-share destination found, so only importing user will be able to access this dataset.": false
<nil>|{"id":"test1234","title":"test1234","dataTypes":[{"dataType":"SD_XRF","count":2520}],"instrument":"JPL_BREADBOARD","instrumentConfig":"Breadboard","meta":{"DriveId":"0","RTT":"","SCLK":"0","Site":"","SiteId":"0","Sol":"","Target":"","TargetId":"0"},"contentCounts":{"BulkSpectra":2,"DwellSpectra":0,"MaxSpectra":2,"NormalSpectra":2520,"PseudoIntensities":0},"creatorUserId":"JPLImport"}
Example (ImportForTrigger_Manual_SBU) ¶
Import a breadboard dataset from a manually uploaded zip file
Output:
Errors: <nil>, changes: unknown, isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 0 zip files, unzipped 0 files": true
Logged "No zip files found in archive, dataset may have been manually uploaded. Trying to download...": true
Logged "Dataset test1234sbu downloaded 4 files from manual upload area": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "Reading 1261 files from spectrum directory...": true
Logged "Reading spectrum [1135/1260] 90%": true
Logged "PMC 1261 has 4 MSA/spectrum entries": true
Logged "WARNING: No main context image determined": true
Logged "Diffraction db saved successfully": true
Logged "Warning: No import.json found, defaults will be used": false
Logged "No auto-share destination found, so only importing user will be able to access this dataset.": false
<nil>|{"id":"test1234sbu","title":"test1234sbu","dataTypes":[{"dataType":"SD_XRF","count":2520}],"instrument":"SBU_BREADBOARD","instrumentConfig":"StonyBrookBreadboard","meta":{"DriveId":"0","RTT":"","SCLK":"0","Site":"","SiteId":"0","Sol":"","Target":"","TargetId":"0"},"contentCounts":{"BulkSpectra":2,"DwellSpectra":0,"MaxSpectra":2,"NormalSpectra":2520,"PseudoIntensities":0},"creatorUserId":"SBUImport"}
Example (ImportForTrigger_OCS_Archive_BadData) ¶
func startTestWithMockMongo(name string, t *testing.T, testFunc func(mt *mtest.T)) {
	mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock))
	defer mt.Close()
	mt.Run(name, testFunc)
}
Import an unknown dataset (simulating a trigger by the OCS pipeline): the file goes to the archive, then all files are downloaded from the archive, and dataset creation fails due to an unknown data type
Output:
Errors: Failed to determine dataset type to import., changes: , isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 2 zip files, unzipped 6 files": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "SelectDataConverter: Path contains 3 files...": true
Logged "Failed to open detector.json when determining dataset type": true
Example (ImportForTrigger_OCS_Archive_Exists) ¶
Import FM-style (simulating a trigger by the OCS pipeline): the file is already in the archive, so nothing should happen
Output: Errors: <nil>, changes: , isUpdate: false Logged "Archiving source file: "s3://./test-data/Archive_Exists/raw-data-bucket/70000_069-02-09-2021-06-25-13.zip"": true Log shows exists in archive: true
Example (ImportForTrigger_OCS_Archive_OK) ¶
Import FM-style (simulating a trigger by the OCS pipeline): the file goes to the archive, then all files are downloaded from the archive and the dataset is created
Output: Errors: <nil>, changes: unknown, isUpdate: false Logged "Downloading archived zip files...": true Logged "Downloaded 20 zip files, unzipped 364 files": true Logged "Downloading pseudo-intensity ranges...": true Logged "Downloading user customisation files...": true Logged "This dataset's detector config is PIXL": true Logged "PMC 218 has 4 MSA/spectrum entries": true Logged "Main context image: PCW_0125_0678031992_000RCM_N00417120483005510091075J02.png": true Logged "Diffraction db saved successfully": true Logged "Applying custom title: Naltsos": true Logged "Matched aligned image: PCCR0577_0718181212_000MSA_N029000020073728500030LUD01.tif, offset(0, 0), scale(1, 1). Match for aligned index: 0": true <nil>|{"contentCounts": {"BulkSpectra": 2,"DwellSpectra": 0,"MaxSpectra": 2,"NormalSpectra": 242,"PseudoIntensities": 121},"creatorUserId": "PIXLISEImport","dataTypes": [{"count": 5,"dataType": "SD_IMAGE"},{"count": 1,"dataType": "SD_RGBU"},{"count": 242,"dataType": "SD_XRF"}],"id": "048300551","instrument": "PIXL_FM","instrumentConfig": "PIXL","meta": {"DriveId": "1712","RTT": "048300551","SCLK": "678031418","Site": "","SiteId": "4","Sol": "0125","Target": "","TargetId": "?"},"title": "Naltsos"}
Example (ImportForTrigger_OCS_DatasetEdit) ¶
Import FM-style (simulating a trigger by the dataset edit screen): should create the dataset with a custom name and image
Output:
Errors: <nil>, changes: unknown, isUpdate: true
Logged "Downloading archived zip files...": true
Logged "Downloaded 20 zip files, unzipped 364 files": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "This dataset's detector config is PIXL": true
Logged "PMC 218 has 4 MSA/spectrum entries": true
Logged "Main context image: PCW_0125_0678031992_000RCM_N00417120483005510091075J02.png": true
Logged "Diffraction db saved successfully": true
Logged "Applying custom title: Naltsos": true
Logged "Matched aligned image: PCCR0577_0718181212_000MSA_N029000020073728500030LUD01.tif, offset(0, 0), scale(1, 1). Match for aligned index: 0": true
<nil>|{"contentCounts": {"BulkSpectra": 2,"DwellSpectra": 0,"MaxSpectra": 2,"NormalSpectra": 242,"PseudoIntensities": 121},"creatorUserId": "PIXLISEImport","dataTypes": [{"count": 5,"dataType": "SD_IMAGE"},{"count": 1,"dataType": "SD_RGBU"},{"count": 242,"dataType": "SD_XRF"}],"id": "048300551","instrument": "PIXL_FM","instrumentConfig": "PIXL","meta": {"DriveId": "1712","RTT": "048300551","SCLK": "678031418","Site": "","SiteId": "4","Sol": "0125","Target": "","TargetId": "?"},"title": "Naltsos"}
Index ¶
- Variables
- func ImportDataset(localFS fileaccess.FileAccess, remoteFS fileaccess.FileAccess, ...) (string, *protos.ScanItem, string, bool, error)
- func ImportFromLocalFileSystem(localFS fileaccess.FileAccess, remoteFS fileaccess.FileAccess, ...) (string, error)
- func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, jobId string, scanId string, snsTopic string) (*sns.PublishOutput, error)
- type DatasetCustomMeta
- type ImportResult
Examples ¶
- Package (DecodeImportTrigger_Manual)
- Package (DecodeImportTrigger_ManualBadDatasetID)
- Package (DecodeImportTrigger_ManualBadLogID)
- Package (DecodeImportTrigger_ManualBadMsg)
- Package (DecodeImportTrigger_OCS)
- Package (DecodeImportTrigger_OCS2)
- Package (DecodeImportTrigger_OCS3)
- Package (DecodeImportTrigger_OCS_BadEventType)
- Package (DecodeImportTrigger_OCS_Error)
- Package (GetUpdateType_Drive)
- Package (GetUpdateType_LessContextImages)
- Package (GetUpdateType_MoreContextImages)
- Package (GetUpdateType_NormalSpectra)
- Package (GetUpdateType_RTT)
- Package (GetUpdateType_SameContextImages)
- Package (GetUpdateType_Title)
- Package (ImportForTrigger_Manual_JPL)
- Package (ImportForTrigger_Manual_SBU)
- Package (ImportForTrigger_OCS_Archive_BadData)
- Package (ImportForTrigger_OCS_Archive_Exists)
- Package (ImportForTrigger_OCS_Archive_OK)
- Package (ImportForTrigger_OCS_DatasetEdit)
Constants ¶
This section is empty.
Variables ¶
var JobIDAutoImportPrefix = "auto-import-"
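The prefix distinguishes automatically triggered import jobs from user-initiated ones (compare the manual example's job ID "dataimport-zmzddoytch2krd7n" above). A minimal sketch of how such a prefix is typically checked; the `isAutoImportJob` helper is illustrative and not part of this package:

```go
package main

import (
	"fmt"
	"strings"
)

var JobIDAutoImportPrefix = "auto-import-"

// isAutoImportJob reports whether a job ID was generated by an automatic
// import trigger rather than a manual reprocess request.
// (Hypothetical helper, shown for illustration only.)
func isAutoImportJob(jobID string) bool {
	return strings.HasPrefix(jobID, JobIDAutoImportPrefix)
}

func main() {
	fmt.Println(isAutoImportJob("auto-import-189137412"))        // true
	fmt.Println(isAutoImportJob("dataimport-zmzddoytch2krd7n")) // false
}
```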
Functions ¶
func ImportDataset ¶
func ImportDataset(
	localFS fileaccess.FileAccess,
	remoteFS fileaccess.FileAccess,
	configBucket string,
	manualUploadBucket string,
	datasetBucket string,
	db *mongo.Database,
	datasetID string,
	log logger.ILogger,
	justArchived bool,
) (string, *protos.ScanItem, string, bool, error)
ImportDataset - Imports from the dataset archive area. Calls ImportFromLocalFileSystem.
Returns:
- WorkingDir
- Saved dataset summary structure
- What changed (as a string), so the caller knows what kind of notification to send (if any)
- IsUpdate flag
- Error (if any)
func ImportFromLocalFileSystem ¶
func ImportFromLocalFileSystem(
	localFS fileaccess.FileAccess,
	remoteFS fileaccess.FileAccess,
	db *mongo.Database,
	workingDir string,
	localImportPath string,
	localPseudoIntensityRangesPath string,
	datasetBucket string,
	datasetID string,
	log logger.ILogger,
) (string, error)
ImportFromLocalFileSystem - As the name says, imports from a directory on the local file system.
Returns:
- Dataset ID (in case it was modified during conversion)
- Error (if there was one)
func TriggerDatasetReprocessViaSNS ¶
func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, jobId string, scanId string, snsTopic string) (*sns.PublishOutput, error)
Fires a trigger message. Anything calling this triggers a dataset reimport via a lambda function.
Types ¶
type DatasetCustomMeta ¶
type ImportResult ¶
type ImportResult struct {
	WorkingDir   string         // so it can be cleaned up by caller if needed
	WhatChanged  string         // what changed between this import and a previous one, for notification reasons
	IsUpdate     bool           // IsUpdate flag
	DatasetTitle string         // Name of the dataset that was imported
	DatasetID    string         // ID of the dataset that was imported
	Logger       logger.ILogger // Caller must call Close() on it, otherwise we may lose the last few log events
}
Structure returned after importing. NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events.
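The Close() obligation on the returned logger is easy to honour with a deferred call. A self-contained sketch of the pattern, using a stand-in buffered logger (the `bufferedLogger` type here is illustrative, not the package's logger.ILogger implementation):

```go
package main

import "fmt"

// bufferedLogger is a minimal stand-in for a logger that buffers events
// and only guarantees delivery once Close() flushes them.
type bufferedLogger struct {
	pending []string
}

func (l *bufferedLogger) Log(msg string) {
	l.pending = append(l.pending, msg)
}

func (l *bufferedLogger) Close() {
	// A real logger would flush pending events to its sink here.
	l.pending = nil
}

func handleImport(log *bufferedLogger) {
	// Deferring Close ensures buffered events are flushed even on an
	// early return, which is why the NOTE above insists on calling it.
	defer log.Close()
	log.Log("import complete")
	fmt.Println("events pending before close:", len(log.pending))
}

func main() {
	handleImport(&bufferedLogger{})
}
```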
func ImportForTrigger ¶
func ImportForTrigger(
	triggerMessage []byte,
	configBucket string,
	datasetBucket string,
	manualBucket string,
	db *mongo.Database,
	log logger.ILogger,
	remoteFS fileaccess.FileAccess,
) (ImportResult, error)
ImportForTrigger - Parses a trigger message (from SNS) and decides what to import.
Returns:
- Result struct - NOTE: logger must have Close() called on it, otherwise we may lose the last few log events
- Error (or nil)
Directories ¶
Path | Synopsis
---|---
| Implements archiving/retrieval of dataset source zip files as delivered by GDS.
internal |