workers

package workers

Version: v2.1.1+incompatible
Warning

This package is not in the latest version of its module.

Published: Aug 20, 2019 License: Apache-2.0 Imports: 26 Imported by: 0

README

APTrust Workers

APTrust workers perform the work required for ingest, restoration, file deletion and ongoing fixity checks. The common.go file contains shared code. All other files have a corresponding "main" file in the exchange/apps directory. The main file compiles to a standalone binary, sets up the proper context, instantiates a worker, and then runs as a service.

Because the workers run as services and depend on a number of external services, they require integration tests rather than unit tests. The integration tests for workers are in exchange/integration, and they are set up and run by the Ruby test scripts in the exchange/scripts directory.

Documentation

Constants

const (
	ITEMS_PER_REQUEST = 100
	STORED_FILE       = 1
	DPN_STORED_FILE   = 2
)
const DAYS_TO_KEEP_IN_S3 = 5

Keep files in S3 for up to 5 days, in case we're having system problems and need to attempt the restore multiple times.

const DEFAULT_CACHE_HOURS = 24

If Config.BucketReaderCacheHours isn't set to something sensible, cache Ingest WorkItems up to this many hours old.

const FIFTY_GIGABYTES = int64(50000000000)
const FIFTY_MEGABYTES = int64(52428800)
const (
	// GENERIC_FILE_BATCH_SIZE describes how many generic files
	// we should batch into a single HTTP POST when recording a
	// new IntellectualObject.
	GENERIC_FILE_BATCH_SIZE = 100
)
const GLACIER_DEEP_RECHECK_INTERVAL = 8 * time.Hour
const GLACIER_RECHECK_INTERVAL = 2 * time.Hour

After requesting a Glacier restoration, we need to recheck periodically to see if the item has been restored to S3. Restoring from standard Glacier storage typically takes 3-5 hours. Restoring from Glacier Deep Archive typically takes 12+ hours, so we have different recheck intervals for these two.

const MAX_KEYS = 1000

How many S3 keys should we fetch in each batch when we're getting the contents of a bucket?
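S3 returns at most 1,000 keys per list request, so listing a large bucket means requesting one batch at a time and passing a continuation token from each response into the next request. The sketch below shows that loop with MAX_KEYS as the batch size; listPage and fakeBucket are hypothetical stand-ins for an S3 list request, so the example runs without AWS credentials:

```go
package main

import (
	"fmt"
	"strconv"
)

const MAX_KEYS = 1000

// listPage is a hypothetical stand-in for one S3 list request. It returns up
// to maxKeys keys starting at token, plus the token for the next page
// ("" when there are no more keys).
type listPage func(token string, maxKeys int) (keys []string, next string)

// fakeBucket simulates a bucket holding n keys; the token is a numeric offset.
func fakeBucket(n int) listPage {
	return func(token string, maxKeys int) ([]string, string) {
		start := 0
		if token != "" {
			start, _ = strconv.Atoi(token)
		}
		end := start + maxKeys
		if end > n {
			end = n
		}
		keys := make([]string, 0, end-start)
		for i := start; i < end; i++ {
			keys = append(keys, fmt.Sprintf("bag-%04d.tar", i))
		}
		if end >= n {
			return keys, ""
		}
		return keys, strconv.Itoa(end)
	}
}

// listAll fetches every key, MAX_KEYS at a time, and reports how many
// requests it made.
func listAll(list listPage) (all []string, requests int) {
	token := ""
	for {
		keys, next := list(token, MAX_KEYS)
		requests++
		all = append(all, keys...)
		if next == "" {
			return all, requests
		}
		token = next
	}
}

func main() {
	keys, requests := listAll(fakeBucket(2500))
	fmt.Println(len(keys), requests) // 2500 3
}
```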

const MAX_UPLOAD_ATTEMPTS = 15

15 seemed to be the magic number in the first generation of the software. On large uploads, network errors are common.

const PREMIS_EVENTS_FILE = "PremisEvents.json"
const RETRIEVAL_OPTION = "Standard"

Standard retrieval is 3-5 hours for Glacier and 12 hours for Glacier Deep Archive. We may want to consider adding "Bulk" as a retrieval option for Glacier Deep Archive, because it's 8x cheaper. See the Glacier sections of https://aws.amazon.com/s3/pricing/

const UNKNOWN_TOPIC = "unknown_topic"

Variables

var TAR_SUFFIX = regexp.MustCompile("\\.tar$")
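TAR_SUFFIX matches names ending in ".tar". One plausible use, sketched here with a hypothetical bagName helper, is deriving the untarred bag name from a tar file name. The pattern is case-sensitive and anchored at the end, so "bag.tar.gz" does not match:

```go
package main

import (
	"fmt"
	"regexp"
)

var TAR_SUFFIX = regexp.MustCompile("\\.tar$")

// bagName strips a trailing ".tar" from a key or file name and leaves
// any other name unchanged.
func bagName(key string) string {
	return TAR_SUFFIX.ReplaceAllString(key, "")
}

func main() {
	fmt.Println(TAR_SUFFIX.MatchString("test.edu.bag1.tar"))    // true
	fmt.Println(TAR_SUFFIX.MatchString("test.edu.bag1.tar.gz")) // false
	fmt.Println(bagName("test.edu.bag1.tar"))                   // test.edu.bag1
	fmt.Println(bagName("archive.target"))                      // archive.target
}
```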

Functions

func CreateNsqConsumer

func CreateNsqConsumer(config *models.Config, workerConfig *models.WorkerConfig) (*nsq.Consumer, error)

CreateNsqConsumer creates and returns an NSQ consumer for a worker process.

func DeleteFileFromStaging

func DeleteFileFromStaging(pathToFile string, _context *context.Context)

DeleteFileFromStaging deletes the bag from the staging area, and releases the reserved storage from the volume manager. This deletes both the tarred and untarred version of the bag, if they both exist.

func GetIngestState

func GetIngestState(message *nsq.Message, _context *context.Context, initIfEmpty bool) (*models.IngestState, error)

TODO: Remove this.

GetIngestState sets up the basic pieces of data we'll need to process a request. Param initIfEmpty says we should initialize an IntellectualObject if we can't find one in the IngestManifest. That should only happen in apt_fetcher, where we're often fetching new bags that Pharos has never seen before. All other workers should pass in false for initIfEmpty.

func GetWorkItem

func GetWorkItem(message *nsq.Message, _context *context.Context) (*models.WorkItem, error)

GetWorkItem returns the WorkItem from Pharos whose Id is in the NSQ message body, or nil.

func GetWorkItemState

func GetWorkItemState(workItem *models.WorkItem, _context *context.Context, initIfEmpty bool) (*models.WorkItemState, error)

GetWorkItemState returns the WorkItemState associated with the specified WorkItem from Pharos, or nil if none exists. Param initIfEmpty should be true ONLY when calling from apt_fetcher, which is working with objects that are not yet in the system.

func InitWorkItemState

func InitWorkItemState(workItem *models.WorkItem) (*models.WorkItemState, error)

InitWorkItemState returns a new WorkItemState object. This is used only by apt_fetcher, when we're working on a brand new ingest bag that doesn't yet have a WorkItemState record. Param workItem is the workItem to be associated with the WorkItemState.

func LoadAPTrustBagValidationConfig

func LoadAPTrustBagValidationConfig(_context *context.Context) *validation.BagValidationConfig

LoadAPTrustBagValidationConfig loads the bag validation config file specified in the general config options. This will die if the bag validation config cannot be loaded or is invalid.

func LogJson

func LogJson(ingestState *models.IngestState, jsonLog *log.Logger)

LogJson dumps the WorkItemState.State into the JSON log, surrounded by markers that make it easy to find. This log gets big.

func MarkWorkItemFailed

func MarkWorkItemFailed(ingestState *models.IngestState, _context *context.Context) error

MarkWorkItemFailed tells Pharos that this item failed processing due to a fatal error or too many unsuccessful attempts.

func MarkWorkItemRequeued

func MarkWorkItemRequeued(ingestState *models.IngestState, _context *context.Context) error

MarkWorkItemRequeued tells Pharos that this item has been requeued due to transient errors.

func MarkWorkItemStarted

func MarkWorkItemStarted(ingestState *models.IngestState, _context *context.Context, stage, message string) error

MarkWorkItemStarted tells Pharos that we've started work on this item.

func MarkWorkItemSucceeded

func MarkWorkItemSucceeded(ingestState *models.IngestState, _context *context.Context, nextStage string) error

MarkWorkItemSucceeded tells Pharos that this item was processed successfully.

func PushToQueue

func PushToQueue(ingestState *models.IngestState, _context *context.Context, queueTopic string)

PushToQueue pushes the WorkItem in ingestState into the specified NSQ topic.

func RecordWorkItemState

func RecordWorkItemState(ingestState *models.IngestState, _context *context.Context, activeResult *models.WorkSummary)

RecordWorkItemState saves the WorkItemState for this task. We drop a copy into our JSON log as a backup, and update the WorkItemState in Pharos, so the next worker knows what to do with this item.

Param activeResult will change, depending on what stage of processing we're in. It could be the IngestState.FetchResult, IngestState.RecordResult, etc.

func SetupIngestState

func SetupIngestState(message *nsq.Message, _context *context.Context) (*models.IngestState, error)

SetupIngestState sets up the IngestState object that the workers use during the ingest process.

Types

type APTAuditList

type APTAuditList struct {
	// contains filtered or unexported fields
}

APTAuditList lists the contents of S3 and Glacier buckets, for auditing and other purposes. When the bucket is one of the APTrust or DPN long-term preservation buckets, this prints extended metadata information for each item it finds. For other buckets, it prints standard metadata, such as the key name, etag, and size. This prints results to STDOUT, which you can then redirect to a file.

func NewAPTAuditList

func NewAPTAuditList(context *context.Context, region, bucket, keyPrefix, format string, limit, concurrency int) (*APTAuditList, error)

NewAPTAuditList returns a new APTAuditList object. Param context is a context.Context object. Param region is the AWS S3/Glacier region to connect to. Param bucket is the name of the bucket to list. Param keyPrefix is the key, or the prefix of the keys, you want to look for. A keyPrefix of "abc" will return all keys beginning with "abc"; an empty keyPrefix will return all keys. Param format can be "json", "csv" (comma-separated values) or "tsv" (tab-separated values). Param limit is the maximum number of keys to return; set limit to zero to return an unlimited number of keys. Param concurrency is the number of items to fetch simultaneously. It defaults to 4, and the maximum is 32.

func (*APTAuditList) Run

func (list *APTAuditList) Run() (int, error)

Run prints a list of files to STDOUT, and errors to STDERR. It returns the number of items listed, and an error if it encountered any errors during its run. If Run returns an error, check the STDERR log for details.

type APTBucketReader

type APTBucketReader struct {
	Context           *context.Context
	Institutions      map[string]*models.Institution
	RecentIngestItems map[string]*models.WorkItem
	// contains filtered or unexported fields
}

APTBucketReader scans APTrust receiving buckets for items that need to be ingested. It creates a WorkItem record and an NSQ entry for each qualifying bag, and is responsible for knowing whether items in the receiving buckets actually need to be queued.

func NewAPTBucketReader

func NewAPTBucketReader(context *context.Context, enableStats bool) *APTBucketReader

NewAPTBucketReader creates a new bucket reader with the given context. Param enableStats is generally used for integration tests. Enabling stats in production can cause high memory usage, so keep it off unless you're trying to diagnose specific problems.

func (*APTBucketReader) GetStats

func (reader *APTBucketReader) GetStats() *stats.APTBucketReaderStats

func (*APTBucketReader) Run

func (reader *APTBucketReader) Run() error

type APTFetcher

type APTFetcher struct {
	Context             *context.Context
	BagValidationConfig *validation.BagValidationConfig
	FetchChannel        chan *models.IngestState
	ValidationChannel   chan *models.IngestState
	CleanupChannel      chan *models.IngestState
	RecordChannel       chan *models.IngestState
}

APTFetcher fetches bags (tar files) from S3 receiving buckets and validates them.

func NewAPTFetcher

func NewAPTFetcher(_context *context.Context) *APTFetcher

func (*APTFetcher) HandleMessage

func (fetcher *APTFetcher) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

type APTFileDeleter

type APTFileDeleter struct {
	// Context contains basic information required to run,
	// connect to Pharos, S3, etc.
	Context *context.Context
	// DeleteChannel is for the goroutines that delete GenericFiles
	// from primary and secondary storage.
	DeleteChannel chan *models.DeleteState
	// PostProcessChannel is for the goroutines that record
	// the outcome of the deletion in Pharos and finish or
	// requeue the NSQ message.
	PostProcessChannel chan *models.DeleteState
	// RingList keeps track of IntellectualObject identifiers so we don't
	// spam Pharos with requests to api/v2/objects/<id>/finish_delete.
	// Because Rails runs multiple processes, issuing multiple concurrent
	// finish_delete requests (which is proven to happen regularly) results
	// in multiple IntellectualObject 'deletion' PREMIS events. The Rails
	// code includes guards to prevent that, but they don't work across
	// multiple processes, so we have to implement a guard here.
	// This list is for object identifiers only, not generic file identifiers.
	RecentlyDeleted *models.RingList
	// contains filtered or unexported fields
}

APTFileDeleter deletes files from S3 and Glacier long-term storage.
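The RecentlyDeleted guard described in the struct comment can be pictured as a fixed-capacity ring of object identifiers: once it fills up, each new entry overwrites the oldest one. This is a hypothetical ringList, not the models.RingList API, and it is not safe for concurrent use without an added sync.Mutex:

```go
package main

import "fmt"

// ringList holds up to len(items) identifiers; when full, Add overwrites
// the oldest entry. Capacity must be greater than zero.
type ringList struct {
	items []string
	next  int  // index the next Add will write to
	full  bool // true once every slot has been written at least once
}

func newRingList(capacity int) *ringList {
	return &ringList{items: make([]string, capacity)}
}

func (r *ringList) Add(item string) {
	r.items[r.next] = item
	r.next = (r.next + 1) % len(r.items)
	if r.next == 0 {
		r.full = true
	}
}

func (r *ringList) Contains(item string) bool {
	n := r.next // only slots [0, next) are in use until the ring is full
	if r.full {
		n = len(r.items)
	}
	for i := 0; i < n; i++ {
		if r.items[i] == item {
			return true
		}
	}
	return false
}

func main() {
	recent := newRingList(2)
	for _, objID := range []string{"test.edu/bag1", "test.edu/bag1", "test.edu/bag2", "test.edu/bag3", "test.edu/bag1"} {
		if recent.Contains(objID) {
			fmt.Println("skip finish_delete for", objID)
			continue
		}
		recent.Add(objID)
		fmt.Println("call finish_delete for", objID)
	}
}
```

With a capacity of 2, the second "test.edu/bag1" is skipped, but by the time "test.edu/bag1" appears a third time it has been pushed out of the ring, so finish_delete would be called again. The capacity trades memory against how long duplicates are suppressed.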

func NewAPTFileDeleter

func NewAPTFileDeleter(_context *context.Context) *APTFileDeleter

func (*APTFileDeleter) HandleMessage

func (deleter *APTFileDeleter) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

type APTFileRestorer

type APTFileRestorer struct {
	// Context contains basic information required to run,
	// connect to Pharos, S3, etc.
	Context *context.Context
	// RestoreChannel is for the goroutines that restore GenericFiles
	// from primary and secondary storage.
	RestoreChannel chan *models.FileRestoreState
	// PostProcessChannel is for the goroutines that record
	// the outcome of the restoration in Pharos and finish or
	// requeue the NSQ message.
	PostProcessChannel chan *models.FileRestoreState
}

APTFileRestorer restores files from S3 and Glacier long-term storage.

func NewAPTFileRestorer

func NewAPTFileRestorer(_context *context.Context) *APTFileRestorer

func (*APTFileRestorer) HandleMessage

func (restorer *APTFileRestorer) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

type APTFixityChecker

type APTFixityChecker struct {
	// Context contains contextual info, such as a reference to the logger,
	// a Pharos client, basic configuration data and other things the fixity
	// checker needs to do its work.
	Context *context.Context
	// FixityChannel is where we calculate Sha256 digests of files stored in S3.
	FixityChannel chan *models.FixityResult
	// RecordChannel is where we record PremisEvents in Pharos.
	RecordChannel chan *models.FixityResult
	// PostProcessChannel is where we dispose of NSQ messages and log outcomes.
	PostProcessChannel chan *models.FixityResult
	// ItemsInProcess contains a map of items we're currently processing.
	// When there's a backlog, it's common for items to be queued more than
	// once for fixity checking. We don't want to perform the fixity check
	// if it's already underway.
	ItemsInProcess *models.SynchronizedMap
}

APTFixityChecker performs ongoing fixity checks on files stored in S3.
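The ItemsInProcess check can be sketched as a mutex-guarded set with a single test-and-set operation, so two goroutines handling duplicate NSQ messages for the same file cannot both start a fixity check. inProcess, TryStart and Finish are hypothetical names, not the models.SynchronizedMap API:

```go
package main

import (
	"fmt"
	"sync"
)

// inProcess is a set of GenericFile identifiers currently being checked,
// guarded by a mutex so it can be shared by many goroutines.
type inProcess struct {
	mu    sync.Mutex
	items map[string]bool
}

func newInProcess() *inProcess {
	return &inProcess{items: make(map[string]bool)}
}

// TryStart records identifier as in process and returns true, or returns
// false if it is already being checked. Check and insert happen under one lock.
func (p *inProcess) TryStart(identifier string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.items[identifier] {
		return false
	}
	p.items[identifier] = true
	return true
}

// Finish removes identifier once its fixity check is complete.
func (p *inProcess) Finish(identifier string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.items, identifier)
}

func main() {
	p := newInProcess()
	id := "georgetown.edu/georgetown.edu.10822_707412"

	// Five duplicate messages for the same file arrive concurrently.
	var wg sync.WaitGroup
	var mu sync.Mutex
	started := 0
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if p.TryStart(id) {
				mu.Lock()
				started++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println(started) // 1

	p.Finish(id)
	fmt.Println(p.TryStart(id)) // true
}
```

Doing the lookup and the insert under one lock matters: separate "is it present?" and "add it" calls would leave a window in which two goroutines both see the item as absent.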

func NewAPTFixityChecker

func NewAPTFixityChecker(_context *context.Context) *APTFixityChecker

func (*APTFixityChecker) HandleMessage

func (checker *APTFixityChecker) HandleMessage(message *nsq.Message) error

HandleMessage handles a new message from NSQ. Unlike most other NSQ messages, where the message.Body is a WorkItem.Id (int as string), messages in the apt_fixity queue contain a GenericFile.Identifier. So the entire message body will be something like "georgetown.edu/georgetown.edu.10822_707412".
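A sketch of the two message-body conventions. In go-nsq, nsq.Message.Body is a []byte; the helper names below are hypothetical, and trimming surrounding whitespace is an assumption added for robustness:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// workItemIdFromBody parses a body holding a WorkItem.Id written as a
// decimal string, the convention most queues use.
func workItemIdFromBody(body []byte) (int, error) {
	id, err := strconv.Atoi(strings.TrimSpace(string(body)))
	if err != nil {
		return 0, fmt.Errorf("message body %q is not a WorkItem.Id: %w", body, err)
	}
	return id, nil
}

// genericFileIdentifierFromBody returns the GenericFile.Identifier carried
// by apt_fixity messages, which is used as-is.
func genericFileIdentifierFromBody(body []byte) (string, error) {
	identifier := strings.TrimSpace(string(body))
	if identifier == "" {
		return "", fmt.Errorf("empty message body")
	}
	return identifier, nil
}

func main() {
	id, err := workItemIdFromBody([]byte("12345"))
	fmt.Println(id, err) // 12345 <nil>

	identifier, err := genericFileIdentifierFromBody([]byte("georgetown.edu/georgetown.edu.10822_707412"))
	fmt.Println(identifier, err)
}
```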

type APTGlacierRestoreInit

type APTGlacierRestoreInit struct {
	// Context includes logging, config, network connections, and
	// other general resources for the worker.
	Context *context.Context
	// RequestChannel is for requesting an item be moved from Glacier
	// into S3.
	RequestChannel chan *models.GlacierRestoreState
	// CleanupChannel is for housekeeping, like updating NSQ.
	CleanupChannel chan *models.GlacierRestoreState
	// PostTestChannel is for testing only. In production, nothing listens
	// on this channel.
	PostTestChannel chan *models.GlacierRestoreState
	// S3Url is a custom URL that the S3 client should connect to.
	// We use this only in testing, when we want the client to talk
	// to a local test server. This should not be set in demo or
	// production.
	S3Url string
}

APTGlacierRestoreInit requests that an object be restored from Glacier to S3. This is the first step toward restoring a Glacier-only bag.

func NewGlacierRestore

func NewGlacierRestore(_context *context.Context) *APTGlacierRestoreInit

func (*APTGlacierRestoreInit) Cleanup

func (restorer *APTGlacierRestoreInit) Cleanup()

func (*APTGlacierRestoreInit) CreateRestoreWorkItem

func (restorer *APTGlacierRestoreInit) CreateRestoreWorkItem(state *models.GlacierRestoreState)

CreateRestoreWorkItem creates a normal WorkItem with action='Restore' once we know all files have been restored from Glacier to S3. Once all files are in S3, the apt_restore process can follow the normal S3 restoration process, so we close out this WorkItem and open a new one, which will go into the apt_restore queue.

func (*APTGlacierRestoreInit) FinishWithError

func (restorer *APTGlacierRestoreInit) FinishWithError(state *models.GlacierRestoreState)

func (*APTGlacierRestoreInit) FinishWithMaxAttemptsExceeded

func (restorer *APTGlacierRestoreInit) FinishWithMaxAttemptsExceeded(state *models.GlacierRestoreState, report *models.GlacierRequestReport)

func (*APTGlacierRestoreInit) GetGenericFile

func (restorer *APTGlacierRestoreInit) GetGenericFile(state *models.GlacierRestoreState) (*models.GenericFile, error)

func (*APTGlacierRestoreInit) GetGlacierRestoreState

func (restorer *APTGlacierRestoreInit) GetGlacierRestoreState(message *nsq.Message, workItem *models.WorkItem) (*models.GlacierRestoreState, error)

func (*APTGlacierRestoreInit) GetIntellectualObject

func (restorer *APTGlacierRestoreInit) GetIntellectualObject(state *models.GlacierRestoreState) (*models.IntellectualObject, error)

func (*APTGlacierRestoreInit) GetRequestDetails

func (restorer *APTGlacierRestoreInit) GetRequestDetails(gf *models.GenericFile) (map[string]string, error)

GetRequestDetails returns the info we'll need to ask AWS to move the file from Glacier to S3.

func (*APTGlacierRestoreInit) GetRequestRecord

func (restorer *APTGlacierRestoreInit) GetRequestRecord(state *models.GlacierRestoreState, gf *models.GenericFile, details map[string]string) *models.GlacierRestoreRequest

func (*APTGlacierRestoreInit) GetS3HeadClient

func (restorer *APTGlacierRestoreInit) GetS3HeadClient(storageOption string) (*network.S3Head, error)

func (*APTGlacierRestoreInit) HandleMessage

func (restorer *APTGlacierRestoreInit) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

func (*APTGlacierRestoreInit) HasPendingRestoreRequest

func (restorer *APTGlacierRestoreInit) HasPendingRestoreRequest(state *models.GlacierRestoreState) bool

PT #158734805: HasPendingRestoreRequest checks that no restore request already exists before we create a new one. If a pending restore request exists for this same item, we don't want to create a duplicate. This check allows us to safely re-run a Glacier restore without creating unnecessary extra work (the Restore request is a more heavyweight operation). The bigger danger of creating duplicate Restore requests is that if two workers are simultaneously restoring the same item, they will overwrite each other's work and cause errors. This returns true if there's a pending request.
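One way to detect a pending request, assumed here rather than confirmed by this documentation, is the x-amz-restore header that S3 returns from a HEAD request on an archived object: its value contains ongoing-request="true" while a restore is in progress, and ongoing-request="false" plus an expiry-date once the restored copy is available. The parsing helpers below are hypothetical:

```go
package main

import (
	"fmt"
	"strings"
)

type restoreStatus int

const (
	noRestoreRequested restoreStatus = iota // header absent or unrecognized
	restoreInProgress                       // ongoing-request="true"
	restoreCompleted                        // ongoing-request="false", with an expiry-date
)

// parseRestoreHeader classifies the value of an S3 x-amz-restore header.
// Unrecognized values are treated as "no restore requested"; a cautious
// implementation might log them instead.
func parseRestoreHeader(value string) restoreStatus {
	switch {
	case strings.Contains(value, `ongoing-request="true"`):
		return restoreInProgress
	case strings.Contains(value, `ongoing-request="false"`):
		return restoreCompleted
	default:
		return noRestoreRequested
	}
}

// hasPendingRestoreRequest is true while a restore has been requested but
// the object is not yet available, so no duplicate request should be made.
func hasPendingRestoreRequest(restoreHeader string) bool {
	return parseRestoreHeader(restoreHeader) == restoreInProgress
}

func main() {
	headers := []string{
		"",
		`ongoing-request="true"`,
		`ongoing-request="false", expiry-date="Fri, 23 Aug 2019 00:00:00 GMT"`,
	}
	for _, h := range headers {
		fmt.Printf("%q pending=%v\n", h, hasPendingRestoreRequest(h))
	}
}
```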

func (*APTGlacierRestoreInit) InitializeRetrieval

func (restorer *APTGlacierRestoreInit) InitializeRetrieval(state *models.GlacierRestoreState, gf *models.GenericFile, details map[string]string, glacierRestoreRequest *models.GlacierRestoreRequest)

func (*APTGlacierRestoreInit) RequestAllFiles

func (restorer *APTGlacierRestoreInit) RequestAllFiles(state *models.GlacierRestoreState)

func (*APTGlacierRestoreInit) RequestFile

func (restorer *APTGlacierRestoreInit) RequestFile(state *models.GlacierRestoreState, gf *models.GenericFile)

func (*APTGlacierRestoreInit) RequestObject

func (restorer *APTGlacierRestoreInit) RequestObject(state *models.GlacierRestoreState)

func (*APTGlacierRestoreInit) RequestRestore

func (restorer *APTGlacierRestoreInit) RequestRestore()

func (*APTGlacierRestoreInit) RequeueForAdditionalRequests

func (restorer *APTGlacierRestoreInit) RequeueForAdditionalRequests(state *models.GlacierRestoreState)

RequeueForAdditionalRequests: We call this when we know we didn't issue Glacier restore requests for some of the files we'll need to restore (or we issued the requests, but AWS/Glacier didn't accept them). In this case, we put the item back in the current queue and reprocess it, requesting Glacier-to-S3 restoration for any files still needing to be restored. We can requeue with a one-minute timeout.

func (*APTGlacierRestoreInit) RequeueToCheckState

func (restorer *APTGlacierRestoreInit) RequeueToCheckState(state *models.GlacierRestoreState)

RequeueToCheckState: We call this when we know we've requested Glacier-to-S3 restoration of all required files, and those requests have all been accepted. It typically takes 3-5 hours to get all the files into S3 from standard Glacier storage, and 12+ hours from Glacier Deep Archive.

func (*APTGlacierRestoreInit) RestoreRequestNeeded

func (restorer *APTGlacierRestoreInit) RestoreRequestNeeded(state *models.GlacierRestoreState, gf *models.GenericFile) (bool, error)

func (*APTGlacierRestoreInit) SaveWorkItemState

func (restorer *APTGlacierRestoreInit) SaveWorkItemState(state *models.GlacierRestoreState)

SaveWorkItemState saves a JSON representation of the GlacierRestoreState in Pharos' WorkItemState table. We do this primarily so an admin can review this info and trace evidence on problem cases. The WorkItemState JSON is visible on the WorkItem detail page of the Pharos UI.

func (*APTGlacierRestoreInit) UpdateWorkItem

func (restorer *APTGlacierRestoreInit) UpdateWorkItem(state *models.GlacierRestoreState)

UpdateWorkItem saves the updated WorkItem in Pharos.

type APTQueue

type APTQueue struct {
	Context   *context.Context
	NSQClient *network.NSQClient
	// contains filtered or unexported fields
}

func NewAPTQueue

func NewAPTQueue(_context *context.Context, topic string, enableStats, dryRun bool) *APTQueue

NewAPTQueue creates a new queue worker that pushes WorkItems from Pharos into NSQ and marks them as queued. If param topic is specified, this will queue items destined for the specified topic; otherwise, it will queue items for all topics. If param enableStats is true, it will dump stats about what was queued to a JSON file. If param dryRun is true, it will log all the items it would have queued, without actually pushing anything to NSQ.

func (*APTQueue) GetStats

func (aptQueue *APTQueue) GetStats() *stats.APTQueueStats

func (*APTQueue) Run

func (aptQueue *APTQueue) Run()

Run retrieves all unqueued work items from Pharos and pushes them into the appropriate NSQ topic.

type APTQueueFixity

type APTQueueFixity struct {
	Context   *context.Context
	NSQClient *network.NSQClient
	// contains filtered or unexported fields
}

func NewAPTQueueFixity

func NewAPTQueueFixity(_context *context.Context, identifierLike string, maxFiles int) *APTQueueFixity

NewAPTQueueFixity creates a new worker to push files needing a fixity check into the NSQ apt_fixity_topic. Param _context is a Context object and maxFiles is the maximum number of files to queue. Param identifierLike is used in integration testing to select files we know exist.

func (*APTQueueFixity) Run

func (aptQueue *APTQueueFixity) Run()

Run retrieves a list of GenericFiles needing fixity checks and adds the Identifier of each file to the NSQ apt_fixity_check topic. It stops after queuing maxFiles.

type APTRecorder

type APTRecorder struct {
	Context        *context.Context
	RecordChannel  chan *models.IngestState
	CleanupChannel chan *models.IngestState
}

APTRecorder records ingest data (objects, files and events) in Pharos.

func NewAPTRecorder

func NewAPTRecorder(_context *context.Context) *APTRecorder

func (*APTRecorder) HandleMessage

func (recorder *APTRecorder) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

type APTRestorer

type APTRestorer struct {
	// Context contains basic information required to run,
	// connect to Pharos, S3, etc.
	Context *context.Context
	// PackageChannel is for the goroutines that reassemble
	// the S3 files into a new bag.
	PackageChannel chan *models.RestoreState
	// ValidateChannel is for goroutines that validate the
	// newly assembled bag before sending it off to the restoration bucket.
	ValidateChannel chan *models.RestoreState
	// CopyChannel is for the goroutines that copy the newly
	// packaged bag to the depositor's restoration bucket in S3.
	CopyChannel chan *models.RestoreState
	// PostProcessChannel is for the goroutines that record
	// the outcome of the restoration in Pharos and NSQ, and
	// do any other required cleanup.
	PostProcessChannel chan *models.RestoreState
	// BagValidationConfig is loaded from a JSON file in the
	// config directory. It describes what constitutes a valid
	// APTrust bag.
	BagValidationConfig *validation.BagValidationConfig
}

APTRestorer restores bags by reassembling their contents and pushing them into the depositor's restoration bucket.

func NewAPTRestorer

func NewAPTRestorer(_context *context.Context) *APTRestorer

func (*APTRestorer) HandleMessage

func (restorer *APTRestorer) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.

func (*APTRestorer) WritePremisEventFile

func (restorer *APTRestorer) WritePremisEventFile(restoreState *models.RestoreState)

WritePremisEventFile dumps all PREMIS events to a file inside the restored bag, so users can see which files have been deleted or overwritten during the bag's time in APTrust.

type APTSpotTestRestore

type APTSpotTestRestore struct {
	Context          *context.Context
	CreatedBefore    time.Time
	NotRestoredSince time.Time
	MaxSize          int64
	DryRun           bool
}

func NewAPTSpotTestRestore

func NewAPTSpotTestRestore(_context *context.Context, maxSize int64, createdBefore, notRestoredSince time.Time) *APTSpotTestRestore

NewAPTSpotTestRestore creates a new restore spot test worker. This is meant to run as a cron job.

Param maxSize tells the worker to choose bags no larger than maxSize for restoration. Param createdBefore means choose bags created before this date. Param notRestoredSince means choose bags that have not been restored since this date (which helps prevent us restoring the same bag again and again).

func (*APTSpotTestRestore) CreateWorkItem

func (restoreTest *APTSpotTestRestore) CreateWorkItem(obj *models.IntellectualObject) (*models.WorkItem, error)

CreateWorkItem creates the Restore WorkItem for the specified IntellectualObject.

func (*APTSpotTestRestore) GetInstitutions

func (restoreTest *APTSpotTestRestore) GetInstitutions() ([]*models.Institution, error)

GetInstitutions returns a list of all depositing institutions from Pharos.

func (*APTSpotTestRestore) GetLastIngestWorkItem

func (restoreTest *APTSpotTestRestore) GetLastIngestWorkItem(objIdentifier string) (*models.WorkItem, error)

func (*APTSpotTestRestore) GetObjectFor

func (restoreTest *APTSpotTestRestore) GetObjectFor(institution string) (*models.IntellectualObject, error)

GetObjectFor returns the IntellectualObject we should restore for the specified institution. This object will be the first one we find in Pharos that matches all of the criteria, which include:

institution - it belongs to the specified institution

createdBefore - it was created before the specified date

notRestoredSince - it has not been restored since the specified date

maxSize - its total size is less than or equal to this many bytes

state - it is "A" (active)

access - it is not "restricted"
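These criteria can be sketched as a single predicate plus a "first match wins" scan. The candidate and criteria types are hypothetical flattenings of what GetObjectFor would gather from Pharos; an object that has never been restored is represented by a zero LastRestored time, which always passes the notRestoredSince check:

```go
package main

import (
	"fmt"
	"time"
)

// candidate summarizes an IntellectualObject and its most recent completed
// restore (the zero time if it has never been restored).
type candidate struct {
	Identifier   string
	Institution  string
	CreatedAt    time.Time
	LastRestored time.Time
	Size         int64
	State        string
	Access       string
}

// criteria holds the spot-test selection parameters.
type criteria struct {
	Institution      string
	CreatedBefore    time.Time
	NotRestoredSince time.Time
	MaxSize          int64
}

// matches applies all six GetObjectFor criteria.
func (c criteria) matches(obj candidate) bool {
	return obj.Institution == c.Institution &&
		obj.CreatedAt.Before(c.CreatedBefore) &&
		!obj.LastRestored.After(c.NotRestoredSince) &&
		obj.Size <= c.MaxSize &&
		obj.State == "A" &&
		obj.Access != "restricted"
}

// firstMatch returns the first candidate that satisfies c.
func firstMatch(objs []candidate, c criteria) (candidate, bool) {
	for _, obj := range objs {
		if c.matches(obj) {
			return obj, true
		}
	}
	return candidate{}, false
}

// day parses a YYYY-MM-DD date in UTC.
func day(s string) time.Time {
	t, err := time.Parse("2006-01-02", s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	c := criteria{
		Institution:      "test.edu",
		CreatedBefore:    day("2019-01-01"),
		NotRestoredSince: day("2019-06-01"),
		MaxSize:          1 << 30, // 1 GiB
	}
	created := day("2018-03-01")
	objs := []candidate{
		{"test.edu/too-big", "test.edu", created, time.Time{}, 5 << 30, "A", "institution"},
		{"test.edu/restricted", "test.edu", created, time.Time{}, 1024, "A", "restricted"},
		{"test.edu/recently-restored", "test.edu", created, day("2019-07-15"), 1024, "A", "institution"},
		{"test.edu/eligible", "test.edu", created, time.Time{}, 1024, "A", "institution"},
	}
	if obj, ok := firstMatch(objs, c); ok {
		fmt.Println(obj.Identifier) // test.edu/eligible
	}
}
```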

func (*APTSpotTestRestore) HasCompletedRestore

func (restoreTest *APTSpotTestRestore) HasCompletedRestore(objIdentifier string) (bool, error)

HasCompletedRestore returns true if the object with the specified identifier has been successfully restored since NotRestoredSince.

func (*APTSpotTestRestore) Run

func (restoreTest *APTSpotTestRestore) Run() ([]*models.WorkItem, error)

Run runs the spot test by choosing ONE bag from each institution that matches the specified criteria (smaller than maxSize, createdBefore the specified date and notRestoredSince the specified date). It creates a Restore WorkItem for each bag, and returns the WorkItems it created. The caller can get the WorkItem.Id and object identifier from there.

type APTStorer

type APTStorer struct {
	Context        *context.Context
	StorageChannel chan *models.IngestState
	CleanupChannel chan *models.IngestState
	RecordChannel  chan *models.IngestState
}

APTStorer stores GenericFiles in long-term storage (S3 and Glacier).

func NewAPTStorer

func NewAPTStorer(_context *context.Context) *APTStorer

func (*APTStorer) HandleMessage

func (storer *APTStorer) HandleMessage(message *nsq.Message) error

This is the callback that NSQ workers use to handle messages from NSQ.
