Documentation ¶
Overview ¶
Implements importer triggering based on SNS queues. Decodes incoming SNS messages and extracts the files so they are ready for the importer code to run.
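As a rough orientation, here is a minimal sketch of how this package might be wired into an SNS-triggered lambda. The handler shape and the envName, bucket, log and remoteFS variables are assumptions for illustration, not part of this package:

    // Hypothetical lambda wiring: each SNS record's message body is handed to
    // ImportForTrigger, which decodes it and runs the appropriate import.
    func handler(ctx context.Context, event events.SNSEvent) error {
        for _, record := range event.Records {
            result, err := ImportForTrigger([]byte(record.SNS.Message),
                envName, configBucket, datasetBucket, manualBucket, log, remoteFS)
            if err != nil {
                return err
            }
            result.Logger.Close() // flush the last few log events (see ImportResult)
        }
        return nil
    }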
Example (DecodeImportTrigger_Manual) ¶
Trigger for a manual dataset regeneration (user clicks the save button on the dataset edit page).
trigger := `{ "datasetID": "189137412", "logID": "dataimport-zmzddoytch2krd7n" }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output: Source Bucket: "" Source file: "" Dataset: "189137412" Log: "dataimport-zmzddoytch2krd7n" Err: "<nil>"
Example (DecodeImportTrigger_ManualBadDatasetID) ¶
trigger := `{ "datasetID": "", "logID": "dataimport-zmzddoytch2krd7n" }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output: Source Bucket: "" Source file: "" Dataset: "" Log: "" Err: "Failed to find dataset ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadLogID) ¶
trigger := `{ "datasetID": "qwerty" }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output: Source Bucket: "" Source file: "" Dataset: "" Log: "" Err: "Failed to find log ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadMsg) ¶
trigger := `{ "weird": "message" }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output: Source Bucket: "" Source file: "" Dataset: "" Log: "" Err: "Unexpected or no message type embedded in triggering SNS message"
Example (DecodeImportTrigger_OCS) ¶
Trigger for when a new zip arrives from the pipeline.
trigger := `{ "Records": [ { "eventVersion": "2.1", "eventSource": "aws:s3", "awsRegion": "us-east-1", "eventTime": "2022-09-16T09:10:28.417Z", "eventName": "ObjectCreated:CompleteMultipartUpload", "userIdentity": { "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS" }, "requestParameters": { "sourceIPAddress": "81.154.57.137" }, "responseElements": { "x-amz-request-id": "G3QWWT0BAYKP81QK", "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+" }, "s3": { "s3SchemaVersion": "1.0", "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz", "bucket": { "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj", "ownerIdentity": { "principalId": "AP902Y0PI20DF" }, "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj" }, "object": { "key": "189137412-07-09-2022-10-07-57.zip", "size": 54237908, "eTag": "b21ebca14f67255be1cd28c01d494508-7", "sequencer": "0063243D6858D568F0" } } } ] }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) // NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output: Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj" Source file: "189137412-07-09-2022-10-07-57.zip" Dataset: "189137412" Log Str Len: "43" Err: "<nil>"
Example (DecodeImportTrigger_OCS2) ¶
Trigger for when a new zip arrives from the pipeline.
trigger := `{ "Records": [ { "eventVersion": "2.1", "eventSource": "aws:s3", "awsRegion": "us-east-1", "eventTime": "2022-09-25T14:33:49.456Z", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS" }, "requestParameters": { "sourceIPAddress": "3.12.95.94" }, "responseElements": { "x-amz-request-id": "K811ZDJ52EYBJ8P2", "x-amz-id-2": "R7bGQ2fOjvSZHkHez700w3wRVpn32nmr6jVPVYhKtNE2c2KYOmgm9hjmOA5WSQFh8faLRe6fHAmANKSTNRhwCq7Xgol0DgX4" }, "s3": { "s3SchemaVersion": "1.0", "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz", "bucket": { "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj", "ownerIdentity": { "principalId": "AP902Y0PI20DF" }, "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj" }, "object": { "key": "197329413-25-09-2022-14-33-39.zip", "size": 1388, "eTag": "932bda7d32c05d90ecc550d061862994", "sequencer": "00633066CD68A4BF43" } } } ] }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) // NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output: Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj" Source file: "197329413-25-09-2022-14-33-39.zip" Dataset: "197329413" Log Str Len: "43" Err: "<nil>"
Example (DecodeImportTrigger_OCS_BadEventType) ¶
trigger := `{ "Records": [ { "eventVersion": "2.1", "eventSource": "aws:sqs", "awsRegion": "us-east-1", "eventTime": "2022-09-16T09:10:28.417Z", "eventName": "ObjectCreated:CompleteMultipartUpload", "userIdentity": { "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS" }, "requestParameters": { "sourceIPAddress": "81.154.57.137" }, "responseElements": { "x-amz-request-id": "G3QWWT0BAYKP81QK", "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+" } } ] }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output: Source Bucket: "" Source file: "" Dataset: "" Log: "" Err: "Failed to decode dataset import trigger: Failed to decode sqs body to an S3 event: unexpected end of JSON input"
Example (DecodeImportTrigger_OCS_Error) ¶
trigger := `{ "Records": [] }` sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger)) fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output: Source Bucket: "" Source file: "" Dataset: "" Log: "" Err: "Unexpected or no message type embedded in triggering SNS message"
Index ¶
Examples ¶
- Package (DecodeImportTrigger_Manual)
- Package (DecodeImportTrigger_ManualBadDatasetID)
- Package (DecodeImportTrigger_ManualBadLogID)
- Package (DecodeImportTrigger_ManualBadMsg)
- Package (DecodeImportTrigger_OCS)
- Package (DecodeImportTrigger_OCS2)
- Package (DecodeImportTrigger_OCS_BadEventType)
- Package (DecodeImportTrigger_OCS_Error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func TriggerDatasetReprocessViaSNS ¶
func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, idGen services.IDGenerator, datasetID string, snsTopic string) (*sns.PublishOutput, string, error)
Fires a trigger message. Anything calling this triggers a dataset reimport via a lambda function.
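A minimal calling sketch, assuming an already-initialized SNS client and ID generator from this codebase's awsutil and services packages; the variable names and topic ARN here are placeholders:

    // Hypothetical call: snsSvc implements awsutil.SNSInterface and idGen
    // implements services.IDGenerator; the topic ARN is a placeholder value.
    pubOutput, logID, err := TriggerDatasetReprocessViaSNS(snsSvc, idGen,
        "189137412", "arn:aws:sns:us-east-1:123456789012:DataSourceSNS")
    if err != nil {
        return err
    }
    fmt.Printf("Published %v, log ID: %v\n", *pubOutput.MessageId, logID)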
Types ¶
type ImportResult ¶
    type ImportResult struct {
        WorkingDir   string         // so it can be cleaned up by caller if needed
        WhatChanged  string         // what changed between this import and a previous one, for notification reasons
        IsUpdate     bool           // IsUpdate flag
        DatasetTitle string         // Name of the dataset that was imported
        DatasetID    string         // ID of the dataset that was imported
        Logger       logger.ILogger // Caller must call Close() on it, otherwise we may lose the last few log events
    }
Structure returned after importing. NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events.
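A short caller-side sketch of that contract, assuming result holds an ImportResult returned by ImportForTrigger (below):

    defer result.Logger.Close()           // flush the last few log events
    defer os.RemoveAll(result.WorkingDir) // caller cleans up the working dir if needed
    if result.IsUpdate {
        fmt.Printf("Updated dataset %v (%v), changes: %v\n",
            result.DatasetID, result.DatasetTitle, result.WhatChanged)
    }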
func ImportForTrigger ¶
    func ImportForTrigger(
        triggerMessage []byte,
        envName string,
        configBucket string,
        datasetBucket string,
        manualBucket string,
        log logger.ILogger,
        remoteFS fileaccess.FileAccess) (ImportResult, error)
ImportForTrigger - Parses a trigger message (from SNS) and decides what to import.

Returns:
- The ImportResult struct. NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events.
- An error (or nil).
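A hedged usage sketch; the environment name and bucket names are placeholders, and log and remoteFS are assumed to be an initialized logger.ILogger and fileaccess.FileAccess respectively:

    // triggerMessage is the raw SNS message body, e.g. one of the trigger
    // JSON strings shown in the examples above.
    result, err := ImportForTrigger(triggerMessage,
        "prod", "config-bucket", "dataset-bucket", "manual-bucket", log, remoteFS)
    if err != nil {
        return err
    }
    defer result.Logger.Close() // otherwise the last few log events may be lost
    fmt.Printf("Imported dataset %v (%v)\n", result.DatasetID, result.DatasetTitle)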