Documentation ¶
Index ¶
- Constants
- Variables
- func CreateNsqConsumer(config *models.Config, workerConfig *models.WorkerConfig) (*nsq.Consumer, error)
- func DeleteFileFromStaging(pathToFile string, _context *context.Context)
- func GetIngestState(message *nsq.Message, _context *context.Context, initIfEmpty bool) (*models.IngestState, error)
- func GetWorkItem(message *nsq.Message, _context *context.Context) (*models.WorkItem, error)
- func GetWorkItemState(workItem *models.WorkItem, _context *context.Context, initIfEmpty bool) (*models.WorkItemState, error)
- func InitWorkItemState(workItem *models.WorkItem) (*models.WorkItemState, error)
- func LoadAPTrustBagValidationConfig(_context *context.Context) *validation.BagValidationConfig
- func LogJson(ingestState *models.IngestState, jsonLog *log.Logger)
- func MarkWorkItemFailed(ingestState *models.IngestState, _context *context.Context) error
- func MarkWorkItemRequeued(ingestState *models.IngestState, _context *context.Context) error
- func MarkWorkItemStarted(ingestState *models.IngestState, _context *context.Context, ...) error
- func MarkWorkItemSucceeded(ingestState *models.IngestState, _context *context.Context, nextStage string) error
- func PushToQueue(ingestState *models.IngestState, _context *context.Context, queueTopic string)
- func RecordWorkItemState(ingestState *models.IngestState, _context *context.Context, ...)
- func SetupIngestState(message *nsq.Message, _context *context.Context) (*models.IngestState, error)
- type APTAuditList
- type APTBucketReader
- type APTFetcher
- type APTFileDeleter
- type APTFileRestorer
- type APTFixityChecker
- type APTGlacierRestoreInit
- func (restorer *APTGlacierRestoreInit) Cleanup()
- func (restorer *APTGlacierRestoreInit) CreateRestoreWorkItem(state *models.GlacierRestoreState)
- func (restorer *APTGlacierRestoreInit) FinishWithError(state *models.GlacierRestoreState)
- func (restorer *APTGlacierRestoreInit) FinishWithMaxAttemptsExceeded(state *models.GlacierRestoreState, report *models.GlacierRequestReport)
- func (restorer *APTGlacierRestoreInit) GetGenericFile(state *models.GlacierRestoreState) (*models.GenericFile, error)
- func (restorer *APTGlacierRestoreInit) GetGlacierRestoreState(message *nsq.Message, workItem *models.WorkItem) (*models.GlacierRestoreState, error)
- func (restorer *APTGlacierRestoreInit) GetIntellectualObject(state *models.GlacierRestoreState) (*models.IntellectualObject, error)
- func (restorer *APTGlacierRestoreInit) GetRequestDetails(gf *models.GenericFile) (map[string]string, error)
- func (restorer *APTGlacierRestoreInit) GetRequestRecord(state *models.GlacierRestoreState, gf *models.GenericFile, ...) *models.GlacierRestoreRequest
- func (restorer *APTGlacierRestoreInit) GetS3HeadClient(storageOption string) (*network.S3Head, error)
- func (restorer *APTGlacierRestoreInit) HandleMessage(message *nsq.Message) error
- func (restorer *APTGlacierRestoreInit) HasPendingRestoreRequest(state *models.GlacierRestoreState) bool
- func (restorer *APTGlacierRestoreInit) InitializeRetrieval(state *models.GlacierRestoreState, gf *models.GenericFile, ...)
- func (restorer *APTGlacierRestoreInit) RequestAllFiles(state *models.GlacierRestoreState)
- func (restorer *APTGlacierRestoreInit) RequestFile(state *models.GlacierRestoreState, gf *models.GenericFile)
- func (restorer *APTGlacierRestoreInit) RequestObject(state *models.GlacierRestoreState)
- func (restorer *APTGlacierRestoreInit) RequestRestore()
- func (restorer *APTGlacierRestoreInit) RequeueForAdditionalRequests(state *models.GlacierRestoreState)
- func (restorer *APTGlacierRestoreInit) RequeueToCheckState(state *models.GlacierRestoreState)
- func (restorer *APTGlacierRestoreInit) RestoreRequestNeeded(state *models.GlacierRestoreState, gf *models.GenericFile) (bool, error)
- func (restorer *APTGlacierRestoreInit) SaveWorkItemState(state *models.GlacierRestoreState)
- func (restorer *APTGlacierRestoreInit) UpdateWorkItem(state *models.GlacierRestoreState)
- type APTQueue
- type APTQueueFixity
- type APTRecorder
- type APTRestorer
- type APTSpotTestRestore
- func (restoreTest *APTSpotTestRestore) CreateWorkItem(obj *models.IntellectualObject) (*models.WorkItem, error)
- func (restoreTest *APTSpotTestRestore) GetInstitutions() ([]*models.Institution, error)
- func (restoreTest *APTSpotTestRestore) GetLastIngestWorkItem(objIdentifier string) (*models.WorkItem, error)
- func (restoreTest *APTSpotTestRestore) GetObjectFor(institution string) (*models.IntellectualObject, error)
- func (restoreTest *APTSpotTestRestore) HasCompletedRestore(objIdentifier string) (bool, error)
- func (restoreTest *APTSpotTestRestore) Run() ([]*models.WorkItem, error)
- type APTStorer
Constants ¶
const ( ITEMS_PER_REQUEST = 100 STORED_FILE = 1 DPN_STORED_FILE = 2 )
const DAYS_TO_KEEP_IN_S3 = 5
Keep the files in S3 up to 5 days, in case we're having system problems and we need to attempt the restore multiple times.
const DEFAULT_CACHE_HOURS = 24
If Config.BucketReaderCacheHours isn't set to something sensible, cache Ingest WorkItems up to this many hours old.
const FIFTY_GIGABYTES = int64(50000000000)
const FIFTY_MEGABYTES = int64(52428800)
const ( // GENERIC_FILE_BATCH_SIZE describes how many generic files // we should batch into a single HTTP POST when recording a // new IntellectualObject. GENERIC_FILE_BATCH_SIZE = 100 )
const GLACIER_DEEP_RECHECK_INTERVAL = 8 * time.Hour
const GLACIER_RECHECK_INTERVAL = 2 * time.Hour
After requesting a Glacier restoration, we need to recheck periodically to see if the item has been restored to S3. Restoring from standard Glacier storage typically takes 3-5 hours. Restoring from Glacier Deep Archive typically takes 12+ hours, so we have different recheck intervals for these two.
const MAX_KEYS = 1000
How many S3 keys should we fetch in each batch when we're getting the contents of a bucket?
const MAX_UPLOAD_ATTEMPTS = 15
15 seemed to be the magic number in the first generation of the software. On large uploads, network errors are common.
const PREMIS_EVENTS_FILE = "PremisEvents.json"
const RETRIEVAL_OPTION = "Standard"
Standard retrieval is 3-5 hours for Glacier and 12 hours for Glacier Deep Archive. We may want to consider adding "Bulk" as a retrieval option for Glacier Deep Archive, because it's 8x cheaper. See the Glacier sections of https://aws.amazon.com/s3/pricing/
const UNKNOWN_TOPIC = "unknown_topic"
Variables ¶
var TAR_SUFFIX = regexp.MustCompile("\\.tar$")
Functions ¶
func CreateNsqConsumer ¶
func CreateNsqConsumer(config *models.Config, workerConfig *models.WorkerConfig) (*nsq.Consumer, error)
CreateNsqConsumer creates and returns an NSQ consumer for a worker process.
func DeleteFileFromStaging ¶
DeleteFileFromStaging deletes the bag from the staging area, and releases the reserved storage from the volume manager. This deletes both the tarred and untarred version of the bag, if they both exist.
func GetIngestState ¶
func GetIngestState(message *nsq.Message, _context *context.Context, initIfEmpty bool) (*models.IngestState, error)
TODO: Remove this. GetIngestState sets up the basic pieces of data we'll need to process a request. Param initIfEmpty says we should initialize an IntellectualObject if we can't find one in the IngestManifest. That should only happen in apt_fetcher, where we're often fetching new bags that Pharos has never seen before. All other workers should pass in false for initIfEmpty.
func GetWorkItem ¶
GetWorkItem returns the WorkItem with the specified Id from Pharos, or nil.
func GetWorkItemState ¶
func GetWorkItemState(workItem *models.WorkItem, _context *context.Context, initIfEmpty bool) (*models.WorkItemState, error)
GetWorkItemState returns the WorkItemState associated with the specified WorkItem from Pharos, or nil if none exists. Param initIfEmpty should be true ONLY when calling from apt_fetcher, which is working with objects that are not yet in the system.
func InitWorkItemState ¶
func InitWorkItemState(workItem *models.WorkItem) (*models.WorkItemState, error)
InitWorkItemState returns a new WorkItemState object. This is used only by apt_fetcher, when we're working on a brand new ingest bag that doesn't yet have a WorkItemState record. Param workItem is the workItem to be associated with the WorkItemState.
func LoadAPTrustBagValidationConfig ¶
func LoadAPTrustBagValidationConfig(_context *context.Context) *validation.BagValidationConfig
Loads the bag validation config file specified in the general config options. This will die if the bag validation config cannot be loaded or is invalid.
func LogJson ¶
func LogJson(ingestState *models.IngestState, jsonLog *log.Logger)
LogJson dumps the WorkItemState.State into the JSON log, surrounded by markers that make it easy to find. This log gets big.
func MarkWorkItemFailed ¶
func MarkWorkItemFailed(ingestState *models.IngestState, _context *context.Context) error
MarkWorkItemFailed tells Pharos that this item failed processing due to a fatal error or too many unsuccessful attempts.
func MarkWorkItemRequeued ¶
func MarkWorkItemRequeued(ingestState *models.IngestState, _context *context.Context) error
MarkWorkItemRequeued tells Pharos that this item has been requeued due to transient errors.
func MarkWorkItemStarted ¶
func MarkWorkItemStarted(ingestState *models.IngestState, _context *context.Context, stage, message string) error
MarkWorkItemStarted tells Pharos that we've started work on this item.
func MarkWorkItemSucceeded ¶
func MarkWorkItemSucceeded(ingestState *models.IngestState, _context *context.Context, nextStage string) error
MarkWorkItemSucceeded tells Pharos that this item was processed successfully.
func PushToQueue ¶
func PushToQueue(ingestState *models.IngestState, _context *context.Context, queueTopic string)
PushToQueue pushes the WorkItem in ingestState into the specified NSQ topic.
func RecordWorkItemState ¶
func RecordWorkItemState(ingestState *models.IngestState, _context *context.Context, activeResult *models.WorkSummary)
RecordWorkItemState saves the WorkItemState for this task. We drop a copy into our JSON log as a backup, and update the WorkItemState in Pharos, so the next worker knows what to do with this item.
Param activeResult will change, depending on what stage of processing we're in. It could be the IngestState.FetchResult, IngestState.RecordResult, etc.
func SetupIngestState ¶
func SetupIngestState(message *nsq.Message, _context *context.Context) (*models.IngestState, error)
SetupIngestState sets up the IngestState object that the workers use during the ingest process.
Types ¶
type APTAuditList ¶
type APTAuditList struct {
// contains filtered or unexported fields
}
APTAuditList lists the contents of S3 and Glacier buckets, for auditing and other purposes. When the bucket is one of the APTrust or DPN long-term preservation buckets, this prints extended metadata information for each item it finds. For other buckets, it prints standard metadata, such as the key name, etag, and size. This prints results to STDOUT, which you can then redirect to a file.
func NewAPTAuditList ¶
func NewAPTAuditList(context *context.Context, region, bucket, keyPrefix, format string, limit, concurrency int) (*APTAuditList, error)
NewAPTAuditList returns a new APTAuditList object. Param context is a context.Context object. Param region is the AWS S3/Glacier region to connect to. Param bucket is the name of the bucket to list. Param keyPrefix is the key, or the prefix of the keys, you want to look for. A keyPrefix of "abc" will return all keys beginning with "abc". An empty keyPrefix will return all keys. Param format can be "json", "csv" (comma-separated values) or "tsv" (tab-separated values). Param limit is the maximum number of keys to return. Set limit to zero to return an unlimited number of keys. Param concurrency is the number of items to fetch simultaneously. It defaults to 4. Max is 32.
func (*APTAuditList) Run ¶
func (list *APTAuditList) Run() (int, error)
Run prints a list of files to STDOUT, and errors to STDERR. It returns the number of items listed, and an error if it encountered any errors during its run. Check the STDERR log for errors if Run returns an error.
type APTBucketReader ¶
type APTBucketReader struct { Context *context.Context Institutions map[string]*models.Institution RecentIngestItems map[string]*models.WorkItem // contains filtered or unexported fields }
APTBucketReader scans APTrust receiving buckets for items that need to be ingested. It creates a WorkItem record and an NSQ entry for each qualifying bag, and is responsible for knowing whether items in the receiving buckets actually need to be queued.
func NewAPTBucketReader ¶
func NewAPTBucketReader(context *context.Context, enableStats bool) *APTBucketReader
Creates a new bucket reader with the given context. Param enableStats is generally used for integration tests. Enabling stats in production can cause high memory usage, so keep that off unless you're trying to diagnose specific problems.
func (*APTBucketReader) GetStats ¶
func (reader *APTBucketReader) GetStats() *stats.APTBucketReaderStats
func (*APTBucketReader) Run ¶
func (reader *APTBucketReader) Run() error
type APTFetcher ¶
type APTFetcher struct { Context *context.Context BagValidationConfig *validation.BagValidationConfig FetchChannel chan *models.IngestState ValidationChannel chan *models.IngestState CleanupChannel chan *models.IngestState RecordChannel chan *models.IngestState }
Fetches bags (tar files) from S3 receiving buckets and validates them.
func NewAPTFetcher ¶
func NewAPTFetcher(_context *context.Context) *APTFetcher
func (*APTFetcher) HandleMessage ¶
func (fetcher *APTFetcher) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
func (*APTFetcher) StillIngestingOlderVersion ¶
func (fetcher *APTFetcher) StillIngestingOlderVersion(state *models.IngestState) bool
StillIngestingOlderVersion returns true if another version of this bag is being ingested by this or another worker. See https://trello.com/c/GLURkoKW.
type APTFileDeleter ¶
type APTFileDeleter struct { // Context contains basic information required to run, // connect to Pharos, S3, etc. Context *context.Context // DeleteChannel is for the go routines that delete GenericFiles // from primary and secondary storage. DeleteChannel chan *models.DeleteState // PostProcess channel is for the goroutines that record // the outcome of the deletion in Pharos and finish or // requeue the NSQ message. PostProcessChannel chan *models.DeleteState // RingList keeps track of IntellectualObject identifiers so we don't // spam Pharos with requests to api/v2/objects/<id>/finish_delete. // Because Rails runs multiple processes, issuing multiple concurrent // finish_delete requests (which is proven to happen regularly) results // in multiple IntellectualObject 'deletion' PREMIS events. The Rails // code includes guards to prevent that, but they don't work across // multiple processes, so we have to implement a guard here. // This list is for object identifiers only, not generic file identifiers. RecentlyDeleted *models.RingList // contains filtered or unexported fields }
APTFileDeleter deletes files from S3 and Glacier long-term storage.
func NewAPTFileDeleter ¶
func NewAPTFileDeleter(_context *context.Context) *APTFileDeleter
func (*APTFileDeleter) HandleMessage ¶
func (deleter *APTFileDeleter) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
type APTFileRestorer ¶
type APTFileRestorer struct { // Context contains basic information required to run, // connect to Pharos, S3, etc. Context *context.Context // RestoreChannel is for the go routines that restore GenericFiles // from primary and secondary storage. RestoreChannel chan *models.FileRestoreState // PostProcess channel is for the goroutines that record // the outcome of the restoration in Pharos and finish or // requeue the NSQ message. PostProcessChannel chan *models.FileRestoreState }
APTFileRestorer restores files from S3 and Glacier long-term storage.
func NewAPTFileRestorer ¶
func NewAPTFileRestorer(_context *context.Context) *APTFileRestorer
func (*APTFileRestorer) HandleMessage ¶
func (restorer *APTFileRestorer) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
type APTFixityChecker ¶
type APTFixityChecker struct { // Context contains contextual info, such as a reference to the logger, // a Pharos client, basic configuration data and other things the fixity // checker needs to do its work. Context *context.Context // FixityChannel is where we calculate Sha256 digests of files stored in S3. FixityChannel chan *models.FixityResult // RecordChannel is where we record PremisEvents in Pharos. RecordChannel chan *models.FixityResult // PostProcessChannel is where we dispose of NSQ messages and log outcomes. PostProcessChannel chan *models.FixityResult // ItemsInProcess contains a map of items we're currently processing. // When there's a backlog, it's common for items to be queued more than // once for fixity checking. We don't want to perform the fixity check // if it's already underway. ItemsInProcess *models.SynchronizedMap }
APTFixityChecker performs ongoing fixity checks on files stored in S3.
func NewAPTFixityChecker ¶
func NewAPTFixityChecker(_context *context.Context) *APTFixityChecker
func (*APTFixityChecker) HandleMessage ¶
func (checker *APTFixityChecker) HandleMessage(message *nsq.Message) error
HandleMessage handles a new message from NSQ. Unlike most other NSQ messages, where the message.Body is a WorkItem.Id (int as string), messages in the apt_fixity queue contain a GenericFile.Identifier. So the entire message body will be something like "georgetown.edu/georgetown.edu.10822_707412".
type APTGlacierRestoreInit ¶
type APTGlacierRestoreInit struct { // Context includes logging, config, network connections, and // other general resources for the worker. Context *context.Context // RequestChannel is for requesting an item be moved from Glacier // into S3. RequestChannel chan *models.GlacierRestoreState // CleanupChannel is for housekeeping, like updating NSQ. CleanupChannel chan *models.GlacierRestoreState // PostTestChannel is for testing only. In production, nothing listens // on this channel. PostTestChannel chan *models.GlacierRestoreState // S3Url is a custom URL that the S3 client should connect to. // We use this only in testing, when we want the client to talk // to a local test server. This should not be set in demo or // production. S3Url string }
Requests that an object be restored from Glacier to S3. This is the first step toward restoring a Glacier-only bag.
func NewGlacierRestore ¶
func NewGlacierRestore(_context *context.Context) *APTGlacierRestoreInit
func (*APTGlacierRestoreInit) Cleanup ¶
func (restorer *APTGlacierRestoreInit) Cleanup()
func (*APTGlacierRestoreInit) CreateRestoreWorkItem ¶
func (restorer *APTGlacierRestoreInit) CreateRestoreWorkItem(state *models.GlacierRestoreState)
CreateRestoreWorkItem: We call this to create a normal WorkItem with action='Restore' when we know all files have been restored from Glacier to S3. Once all files are in S3, the apt_restore process can follow the normal S3 restoration process. So we'll close out this WorkItem and open a new one, which will go into the apt_restore queue.
func (*APTGlacierRestoreInit) FinishWithError ¶
func (restorer *APTGlacierRestoreInit) FinishWithError(state *models.GlacierRestoreState)
func (*APTGlacierRestoreInit) FinishWithMaxAttemptsExceeded ¶
func (restorer *APTGlacierRestoreInit) FinishWithMaxAttemptsExceeded(state *models.GlacierRestoreState, report *models.GlacierRequestReport)
func (*APTGlacierRestoreInit) GetGenericFile ¶
func (restorer *APTGlacierRestoreInit) GetGenericFile(state *models.GlacierRestoreState) (*models.GenericFile, error)
func (*APTGlacierRestoreInit) GetGlacierRestoreState ¶
func (restorer *APTGlacierRestoreInit) GetGlacierRestoreState(message *nsq.Message, workItem *models.WorkItem) (*models.GlacierRestoreState, error)
func (*APTGlacierRestoreInit) GetIntellectualObject ¶
func (restorer *APTGlacierRestoreInit) GetIntellectualObject(state *models.GlacierRestoreState) (*models.IntellectualObject, error)
func (*APTGlacierRestoreInit) GetRequestDetails ¶
func (restorer *APTGlacierRestoreInit) GetRequestDetails(gf *models.GenericFile) (map[string]string, error)
This returns the info we'll need to ask AWS to move the file from Glacier to S3.
func (*APTGlacierRestoreInit) GetRequestRecord ¶
func (restorer *APTGlacierRestoreInit) GetRequestRecord(state *models.GlacierRestoreState, gf *models.GenericFile, details map[string]string) *models.GlacierRestoreRequest
func (*APTGlacierRestoreInit) GetS3HeadClient ¶
func (restorer *APTGlacierRestoreInit) GetS3HeadClient(storageOption string) (*network.S3Head, error)
func (*APTGlacierRestoreInit) HandleMessage ¶
func (restorer *APTGlacierRestoreInit) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
func (*APTGlacierRestoreInit) HasPendingRestoreRequest ¶
func (restorer *APTGlacierRestoreInit) HasPendingRestoreRequest(state *models.GlacierRestoreState) bool
PT #158734805: Check to make sure no existing restore request exists before we create a new one. If a pending restore request exists for this same item, we don't want to create a duplicate. This check allows us to safely re-run a Glacier restore without creating unnecessary extra work (the Restore request is a more heavyweight operation). The bigger danger of creating duplicate Restore requests is that if two workers are simultaneously restoring the same item, they will overwrite each other's work and cause errors. This returns true if there's a pending request.
func (*APTGlacierRestoreInit) InitializeRetrieval ¶
func (restorer *APTGlacierRestoreInit) InitializeRetrieval(state *models.GlacierRestoreState, gf *models.GenericFile, details map[string]string, glacierRestoreRequest *models.GlacierRestoreRequest)
func (*APTGlacierRestoreInit) RequestAllFiles ¶
func (restorer *APTGlacierRestoreInit) RequestAllFiles(state *models.GlacierRestoreState)
func (*APTGlacierRestoreInit) RequestFile ¶
func (restorer *APTGlacierRestoreInit) RequestFile(state *models.GlacierRestoreState, gf *models.GenericFile)
func (*APTGlacierRestoreInit) RequestObject ¶
func (restorer *APTGlacierRestoreInit) RequestObject(state *models.GlacierRestoreState)
func (*APTGlacierRestoreInit) RequestRestore ¶
func (restorer *APTGlacierRestoreInit) RequestRestore()
func (*APTGlacierRestoreInit) RequeueForAdditionalRequests ¶
func (restorer *APTGlacierRestoreInit) RequeueForAdditionalRequests(state *models.GlacierRestoreState)
RequeueForAdditionalRequests: We call this when we know we didn't issue Glacier restore requests for some of the files we'll need to restore (or maybe we issued the requests, but AWS/Glacier didn't accept them). In this case, we put the item back in the current queue and reprocess it, requesting Glacier-to-S3 restoration for any files still needing to be restored. We can requeue with a one-minute timeout.
func (*APTGlacierRestoreInit) RequeueToCheckState ¶
func (restorer *APTGlacierRestoreInit) RequeueToCheckState(state *models.GlacierRestoreState)
RequeueToCheckState: We call this when we know we've requested Glacier-to-S3 restoration of all required files, and those requests have all been accepted. It typically takes 3-5 hours to get all the files into S3.
func (*APTGlacierRestoreInit) RestoreRequestNeeded ¶
func (restorer *APTGlacierRestoreInit) RestoreRequestNeeded(state *models.GlacierRestoreState, gf *models.GenericFile) (bool, error)
func (*APTGlacierRestoreInit) SaveWorkItemState ¶
func (restorer *APTGlacierRestoreInit) SaveWorkItemState(state *models.GlacierRestoreState)
SaveWorkItemState saves a JSON representation of the GlacierRestoreState in Pharos' WorkItemState table. We do this primarily so an admin can review this info and trace evidence on problem cases. The WorkItemState JSON is visible on the WorkItem detail page of the Pharos UI.
func (*APTGlacierRestoreInit) UpdateWorkItem ¶
func (restorer *APTGlacierRestoreInit) UpdateWorkItem(state *models.GlacierRestoreState)
UpdateWorkItem saves the updated WorkItem in Pharos.
type APTQueue ¶
type APTQueue struct { Context *context.Context NSQClient *network.NSQClient // contains filtered or unexported fields }
func NewAPTQueue ¶
NewAPTQueue creates a new queue worker to push WorkItems from Pharos into NSQ, and mark them as queued. If param topic is specified, this will queue items destined for the specified topic; otherwise, it will queue items for all topics. If param enableStats is true, it will dump stats about what was queued to a JSON file. If param dryRun is true, it will log all the items it would have queued, without actually pushing anything to NSQ.
func (*APTQueue) GetStats ¶
func (aptQueue *APTQueue) GetStats() *stats.APTQueueStats
type APTQueueFixity ¶
type APTQueueFixity struct { Context *context.Context NSQClient *network.NSQClient // contains filtered or unexported fields }
func NewAPTQueueFixity ¶
func NewAPTQueueFixity(_context *context.Context, identifierLike string, maxFiles int) *APTQueueFixity
NewAPTQueueFixity creates a new worker to push files needing a fixity check into the NSQ apt_fixity_topic. Param _context is a Context object and maxFiles is the maximum number of files to queue. Param identifierLike is used in integration testing to select files we know exist.
func (*APTQueueFixity) Run ¶
func (aptQueue *APTQueueFixity) Run()
Run retrieves a list of GenericFiles needing fixity checks and adds the Identifier of each file to the NSQ apt_fixity_check topic. It stops after queuing maxFiles.
type APTRecorder ¶
type APTRecorder struct { Context *context.Context RecordChannel chan *models.IngestState CleanupChannel chan *models.IngestState }
Records ingest data (objects, files and events) in Pharos
func NewAPTRecorder ¶
func NewAPTRecorder(_context *context.Context) *APTRecorder
func (*APTRecorder) HandleMessage ¶
func (recorder *APTRecorder) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
type APTRestorer ¶
type APTRestorer struct { // Context contains basic information required to run, // connect to Pharos, S3, etc. Context *context.Context // PackageChannel is for the go routines that reassemble // the S3 files into a new bag. PackageChannel chan *models.RestoreState // ValidateChannel is for go routines that validate the // newly assembled bag before sending it off to the restoration bucket. ValidateChannel chan *models.RestoreState // CopyChannel is for the goroutines that copy the newly // packaged bag to the depositor's restoration bucket in S3. CopyChannel chan *models.RestoreState // PostProcess channel is for the goroutines that record // the outcome of the restoration in Pharos and NSQ, and // do any other required cleanup. PostProcessChannel chan *models.RestoreState // BagValidationConfig is loaded from a JSON file in the // config directory. It describes what constitutes a valid // APTrust bag. BagValidationConfig *validation.BagValidationConfig }
APTRestorer restores bags by reassembling their contents and pushing them into the depositor's restoration bucket.
func NewAPTRestorer ¶
func NewAPTRestorer(_context *context.Context) *APTRestorer
func (*APTRestorer) HandleMessage ¶
func (restorer *APTRestorer) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
func (*APTRestorer) WritePremisEventFile ¶
func (restorer *APTRestorer) WritePremisEventFile(restoreState *models.RestoreState)
WritePremisEventFile: dump all PREMIS events to a file inside the restored bag, so users can see which files have been deleted or overwritten during the bag's time in APTrust.
type APTSpotTestRestore ¶
type APTSpotTestRestore struct { Context *context.Context CreatedBefore time.Time NotRestoredSince time.Time MaxSize int64 DryRun bool }
func NewAPTSpotTestRestore ¶
func NewAPTSpotTestRestore(_context *context.Context, maxSize int64, createdBefore, notRestoredSince time.Time) *APTSpotTestRestore
NewAPTSpotTestRestore creates a new restore spot test worker. This is meant to run as a cron job.
Param maxSize tells the worker to choose bags no larger than maxSize for restoration. Param createdBefore means choose bags created before this date. Param notRestoredSince means choose bags that have not been restored since this date (which helps prevent us restoring the same bag again and again).
func (*APTSpotTestRestore) CreateWorkItem ¶
func (restoreTest *APTSpotTestRestore) CreateWorkItem(obj *models.IntellectualObject) (*models.WorkItem, error)
CreateWorkItem creates the Restore WorkItem for the specified object identifier.
func (*APTSpotTestRestore) GetInstitutions ¶
func (restoreTest *APTSpotTestRestore) GetInstitutions() ([]*models.Institution, error)
GetInstitutions returns a list of all depositing institutions from Pharos.
func (*APTSpotTestRestore) GetLastIngestWorkItem ¶
func (restoreTest *APTSpotTestRestore) GetLastIngestWorkItem(objIdentifier string) (*models.WorkItem, error)
func (*APTSpotTestRestore) GetObjectFor ¶
func (restoreTest *APTSpotTestRestore) GetObjectFor(institution string) (*models.IntellectualObject, error)
GetObjectFor returns the IntellectualObject we should restore for the specified institution. This object will be the first one we find in Pharos that matches all of the criteria, which include:
institution - it belongs to the specified institution
createdBefore - it was created before the specified date
notRestoredSince - it has not been restored since the specified date
maxSize - its total size is less than or equal to this many bytes
state - is "A" for active
access - is not "restricted"
func (*APTSpotTestRestore) HasCompletedRestore ¶
func (restoreTest *APTSpotTestRestore) HasCompletedRestore(objIdentifier string) (bool, error)
HasCompletedRestore returns true if the object with the specified identifier has been successfully restored since NotRestoredSince.
func (*APTSpotTestRestore) Run ¶
func (restoreTest *APTSpotTestRestore) Run() ([]*models.WorkItem, error)
Run runs the spot test by choosing ONE bag from each institution that matches the specified criteria (smaller than maxSize, createdBefore the specified date and notRestoredSince the specified date). It creates a Restore WorkItem for each bag, and returns the WorkItems it created. The caller can get the WorkItem.Id and object identifier from there.
type APTStorer ¶
type APTStorer struct { Context *context.Context StorageChannel chan *models.IngestState CleanupChannel chan *models.IngestState RecordChannel chan *models.IngestState }
Stores GenericFiles in long-term storage (S3 and Glacier).
func NewAPTStorer ¶
func (*APTStorer) HandleMessage ¶
This is the callback that NSQ workers use to handle messages from NSQ.