importer

package
v3.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Implements importer triggering based on SNS queues. This decodes incoming SNS messages and extracts the files that are ready for the importer code to run.

Example (DecodeImportTrigger_Manual)

Trigger for a manual dataset regeneration (the user clicks the save button on the dataset edit page).

trigger := `{
	"datasetID": "189137412",
	"logID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: "189137412"
Log: "dataimport-zmzddoytch2krd7n"
Err: "<nil>"
Example (DecodeImportTrigger_ManualBadDatasetID)
trigger := `{
	"datasetID": "",
	"logID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to find dataset ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadLogID)
trigger := `{
		"datasetID": "qwerty"
	}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to find log ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadMsg)
trigger := `{
	"weird": "message"
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Unexpected or no message type embedded in triggering SNS message"
Example (DecodeImportTrigger_OCS)

Trigger sent when a new zip arrives from the pipeline.

trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-16T09:10:28.417Z",
            "eventName": "ObjectCreated:CompleteMultipartUpload",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "81.154.57.137"
            },
            "responseElements": {
                "x-amz-request-id": "G3QWWT0BAYKP81QK",
                "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
                "bucket": {
                    "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
                    "ownerIdentity": {
                        "principalId": "AP902Y0PI20DF"
                    },
                    "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
                },
                "object": {
                    "key": "189137412-07-09-2022-10-07-57.zip",
                    "size": 54237908,
                    "eTag": "b21ebca14f67255be1cd28c01d494508-7",
                    "sequencer": "0063243D6858D568F0"
                }
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output:

Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "189137412-07-09-2022-10-07-57.zip"
Dataset: "189137412"
Log Str Len: "43"
Err: "<nil>"
Example (DecodeImportTrigger_OCS2)

Trigger sent when a new zip arrives from the pipeline.

trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-25T14:33:49.456Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "3.12.95.94"
            },
            "responseElements": {
                "x-amz-request-id": "K811ZDJ52EYBJ8P2",
                "x-amz-id-2": "R7bGQ2fOjvSZHkHez700w3wRVpn32nmr6jVPVYhKtNE2c2KYOmgm9hjmOA5WSQFh8faLRe6fHAmANKSTNRhwCq7Xgol0DgX4"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
                "bucket": {
                    "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
                    "ownerIdentity": {
                        "principalId": "AP902Y0PI20DF"
                    },
                    "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
                },
                "object": {
                    "key": "197329413-25-09-2022-14-33-39.zip",
                    "size": 1388,
                    "eTag": "932bda7d32c05d90ecc550d061862994",
                    "sequencer": "00633066CD68A4BF43"
                }
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output:

Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "197329413-25-09-2022-14-33-39.zip"
Dataset: "197329413"
Log Str Len: "43"
Err: "<nil>"
Example (DecodeImportTrigger_OCS_BadEventType)
trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:sqs",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-16T09:10:28.417Z",
            "eventName": "ObjectCreated:CompleteMultipartUpload",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "81.154.57.137"
            },
            "responseElements": {
                "x-amz-request-id": "G3QWWT0BAYKP81QK",
                "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Failed to decode dataset import trigger: Failed to decode sqs body to an S3 event: unexpected end of JSON input"
Example (DecodeImportTrigger_OCS_Error)
trigger := `{
		"Records": []
}`
sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, logID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Log: ""
Err: "Unexpected or no message type embedded in triggering SNS message"

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func TriggerDatasetReprocessViaSNS

func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, idGen services.IDGenerator, datasetID string, snsTopic string) (*sns.PublishOutput, string, error)

Fires a trigger message. Anything calling this is triggering a dataset reimport via a lambda function.

Types

type ImportResult

type ImportResult struct {
	WorkingDir   string         // so it can be cleaned up by caller if needed
	WhatChanged  string         // what changed between this import and a previous one, for notification reasons
	IsUpdate     bool           // IsUpdate flag
	DatasetTitle string         // Name of the dataset that was imported
	DatasetID    string         // ID of the dataset that was imported
	Logger       logger.ILogger // Caller must call Close() on it, otherwise we may lose the last few log events
}

Structure returned after importing. NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events.

func ImportForTrigger

func ImportForTrigger(
	triggerMessage []byte,
	envName string,
	configBucket string,
	datasetBucket string,
	manualBucket string,
	log logger.ILogger,
	remoteFS fileaccess.FileAccess) (ImportResult, error)

ImportForTrigger parses a trigger message (from SNS) and decides what to import. Returns: the result struct (NOTE: its logger must have Close() called on it, otherwise we may lose the last few log events) and an error (or nil).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL