Documentation ¶
Overview ¶
bagrecorder records bag metadata in Fluctus. That includes metadata for Intellectual Objects, Generic Files and Premis Events.
bagrestorer restores bags to partners' restoration buckets.
bagstorer stores bags that have been unpacked and validated by apt_prepare. Each item/message follows this flow:
- Storage channel: copies files to S3 permanent storage.
- Results channel: tells the queue whether processing succeeded, and if not, whether the item should be requeued. Also logs results to json and message logs.
- Cleanup channel: cleans up the files after processing completes.
If a failure occurs anywhere in the first step, processing goes directly to the Results Channel, which records the error and the disposition (retry/give up).
As long as the message from nsq contains valid JSON, steps 2 and 3 ALWAYS run.
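The three-channel flow above can be pictured as a small Go pipeline. Below is a minimal sketch, not the actual bagman code: the item type, channel sizes, and worker bodies are hypothetical stand-ins for *bagman.IngestHelper and the real workers.

package main

import (
    "fmt"
    "sync"
)

// item is a hypothetical stand-in for bagman's IngestHelper.
type item struct {
    ID           string
    ErrorMessage string
}

func main() {
    storageChannel := make(chan *item, 10)
    resultsChannel := make(chan *item, 10)
    cleanupChannel := make(chan *item, 10)

    var wg sync.WaitGroup

    // Storage worker: copy files to permanent storage. On failure it
    // still forwards the item; the results step decides retry/give up.
    go func() {
        for i := range storageChannel {
            // ... copy files to S3; on error, set i.ErrorMessage ...
            resultsChannel <- i
        }
    }()

    // Results worker: record success/failure, then hand off to cleanup.
    go func() {
        for i := range resultsChannel {
            fmt.Println("processed:", i.ID, i.ErrorMessage)
            cleanupChannel <- i
        }
    }()

    // Cleanup worker: remove local files once processing completes.
    go func() {
        for range cleanupChannel {
            wg.Done()
        }
    }()

    wg.Add(1)
    storageChannel <- &item{ID: "bag-001"}
    wg.Wait()
}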
fixityworker.go receives GenericFile records from the fixity queue. It downloads the generic files from S3 preservation storage, calculates the files' SHA256 checksums and writes the results back to Fluctus. None of the data downloaded from S3 is saved to disk; it's simply streamed through the SHA256 hash writer and then discarded.
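A minimal sketch of that streaming approach, assuming the file body arrives as an io.Reader (in the real worker it would be the response body of an S3 GET): the bytes flow through the hash and are never written to disk.

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "strings"
)

// streamSHA256 consumes the reader and returns the hex-encoded
// SHA256 digest. io.Copy streams in chunks, so the file is never
// buffered in full, let alone saved to disk.
func streamSHA256(body io.Reader) (string, error) {
    hash := sha256.New()
    if _, err := io.Copy(hash, body); err != nil {
        return "", err
    }
    return hex.EncodeToString(hash.Sum(nil)), nil
}

func main() {
    // Stand-in for an S3 response body.
    body := strings.NewReader("example file contents")
    digest, err := streamSHA256(body)
    if err != nil {
        panic(err)
    }
    fmt.Println(digest)
}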
Index ¶
- Constants
- func CreateNsqConsumer(config *bagman.Config, workerConfig *bagman.WorkerConfig) (*nsq.Consumer, error)
- func CreateProcUtil(serviceGroup string) (procUtil *bagman.ProcessUtil)
- func InitializeReader() (*bagman.WorkReader, error)
- type BagPreparer
- func NewBagPreparer(procUtil *bagman.ProcessUtil) *BagPreparer
- func (bagPreparer *BagPreparer) HandleMessage(message *nsq.Message) error
- func (bagPreparer *BagPreparer) SendToStorageQueue(helper *bagman.IngestHelper)
- type BagRecorder
- func NewBagRecorder(procUtil *bagman.ProcessUtil) *BagRecorder
- func (bagRecorder *BagRecorder) DeleteS3File(result *bagman.ProcessResult)
- func (bagRecorder *BagRecorder) HandleMessage(message *nsq.Message) error
- func (bagRecorder *BagRecorder) QueueItemsForReplication(result *bagman.ProcessResult)
- func (bagRecorder *BagRecorder) RunWithoutNsq(result *bagman.ProcessResult)
- type BagRestorer
- func NewBagRestorer(procUtil *bagman.ProcessUtil) *BagRestorer
- func (bagRestorer *BagRestorer) HandleMessage(message *nsq.Message) error
- type BagStorer
- func NewBagStorer(procUtil *bagman.ProcessUtil) *BagStorer
- func (bagStorer *BagStorer) HandleMessage(message *nsq.Message) error
- func (bagStorer *BagStorer) SendToMetadataQueue(helper *bagman.IngestHelper)
- func (bagStorer *BagStorer) SendToTroubleQueue(helper *bagman.IngestHelper)
- type DeleteObject
- type FailedFixityProcessor
- func NewFailedFixityProcessor(procUtil *bagman.ProcessUtil) *FailedFixityProcessor
- func (processor *FailedFixityProcessor) HandleMessage(message *nsq.Message) error
- type FailedReplicationProcessor
- func NewFailedReplicationProcessor(procUtil *bagman.ProcessUtil) *FailedReplicationProcessor
- func (processor *FailedReplicationProcessor) HandleMessage(message *nsq.Message) error
- type FileDeleter
- func NewFileDeleter(procUtil *bagman.ProcessUtil) *FileDeleter
- func (fileDeleter *FileDeleter) HandleMessage(message *nsq.Message) error
- type FixityChecker
- func NewFixityChecker(procUtil *bagman.ProcessUtil) *FixityChecker
- func (fixityChecker *FixityChecker) HandleMessage(message *nsq.Message) error
- type ReplicationObject
- type Replicator
- func NewReplicator(procUtil *bagman.ProcessUtil) *Replicator
- func (replicator *Replicator) CopyAndSaveEvent(replicationObject *ReplicationObject) (string, error)
- func (replicator *Replicator) CopyFile(replicationObject *ReplicationObject) (string, error)
- func (replicator *Replicator) DownloadFromPreservation(file *bagman.File) (string, error)
- func (replicator *Replicator) GetCopyOptions(file *bagman.File) (s3.Options, error)
- func (replicator *Replicator) HandleMessage(message *nsq.Message) error
- func (replicator *Replicator) ReplicatedFileExists(fileUUID string) bool
- func (replicator *Replicator) SaveReplicationEvent(file *bagman.File, url string) (*bagman.PremisEvent, error)
- func (replicator *Replicator) SendToTroubleQueue(file *bagman.File)
- type RestoreObject
- func (object *RestoreObject) Key() string
- func (object *RestoreObject) RestoredBagUrls() string
- type TroubleProcessor
- func NewTroubleProcessor(procUtil *bagman.ProcessUtil) *TroubleProcessor
- func (troubleProcessor *TroubleProcessor) HandleMessage(message *nsq.Message) error
Constants ¶
const LARGE_FILE_SIZE = int64(50000000000)
Large file is ~50GB
Variables ¶
This section is empty.
Functions ¶
func CreateNsqConsumer ¶
func CreateNsqConsumer(config *bagman.Config, workerConfig *bagman.WorkerConfig) (*nsq.Consumer, error)
Creates and returns an NSQ consumer for a worker process.
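For context, this is roughly the standard go-nsq wiring such a consumer needs. The sketch below uses go-nsq directly rather than this package's config types, and the topic, channel, and lookupd address are made up; each worker type in this package satisfies nsq.Handler via its HandleMessage method.

package main

import (
    "log"

    nsq "github.com/nsqio/go-nsq"
)

// noopHandler satisfies nsq.Handler, just as the workers' HandleMessage
// methods do. Real workers decode the message body and push the item
// into their first channel here.
type noopHandler struct{}

func (h *noopHandler) HandleMessage(m *nsq.Message) error { return nil }

func main() {
    cfg := nsq.NewConfig()
    consumer, err := nsq.NewConsumer("bag_store_topic", "bag_store_channel", cfg)
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(&noopHandler{})
    if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Fatal(err)
    }
    <-consumer.StopChan // block until the consumer stops
}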
func CreateProcUtil ¶
func CreateProcUtil(serviceGroup string) (procUtil *bagman.ProcessUtil)
Creates and returns a ProcessUtil object for a worker process. Param serviceGroup should be either "aptrust" or "dpn" and defaults to "aptrust". This is a late hack to fix a problem where DPN services are checking space on the wrong volume. This code will be replaced soon by Exchange.
func InitializeReader ¶
func InitializeReader() (*bagman.WorkReader, error)
Initializes basic services for a reader that fills the queues. Readers such as the bucket_reader and request_reader run as cron jobs. They read from external sources (Fluctus, S3 buckets, etc.), then add messages to the appropriate NSQ topic when they find work to be done.
The returned WorkReader provides a MessageLog for the reader to log messages and a FluctusClient for the reader to read from Fluctus.
Will die if it cannot find the requested config file, or if essential config options (such as where to find Fluctus) are missing.
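A minimal sketch of that reader pattern: find work, publish one NSQ message per item. The topic name and workItem struct are hypothetical, and the real readers pull their work lists from Fluctus or S3 rather than hard-coding them.

package main

import (
    "encoding/json"
    "log"

    nsq "github.com/nsqio/go-nsq"
)

// workItem is a hypothetical message body describing one piece of work.
type workItem struct {
    Bucket string `json:"bucket"`
    Key    string `json:"key"`
}

func main() {
    producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Stop()

    // In the real readers, this list comes from Fluctus or S3 buckets.
    items := []workItem{{Bucket: "aptrust.receiving.example.edu", Key: "bag.tar"}}
    for _, item := range items {
        body, err := json.Marshal(item)
        if err != nil {
            log.Fatal(err)
        }
        // One NSQ message per item of work found.
        if err := producer.Publish("prepare_topic", body); err != nil {
            log.Fatal(err)
        }
    }
}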
Types ¶
type BagPreparer ¶
type BagPreparer struct {
    FetchChannel   chan *bagman.IngestHelper
    UnpackChannel  chan *bagman.IngestHelper
    CleanUpChannel chan *bagman.IngestHelper
    ResultsChannel chan *bagman.IngestHelper
    ProcUtil       *bagman.ProcessUtil
    // contains filtered or unexported fields
}
apt_prepare receives messages from nsqd describing items in the S3 receiving buckets. It fetches, untars, and validates tar files, then queues them for storage if they untar and validate successfully. Each item/message follows this flow:
- Fetch channel: fetches the file from S3.
- Unpack channel: untars the bag files, parses and validates the bag, reads tags, generates checksums and generic file UUIDs.
- Results channel: tells the queue whether processing succeeded, and if not, whether the item should be requeued. Also logs results to json and message logs.
- Cleanup channel: cleans up the files after processing completes.
If a failure occurs anywhere in the first two steps, processing goes directly to the Results Channel, which records the error and the disposition (retry/give up).
As long as the message from nsq contains valid JSON, steps 3 and 4 ALWAYS run.
Some notes... It's essential that the fetchChannel be limited to a relatively low number. If we are downloading 1GB tar files, we need space to store the tar file AND the untarred version: about 2 x 1GB per bag. We do not want to pull down 1000 files at once, or we'll run out of disk space! If config sets fetchers to 10, we can pull down 10 files at a time. The fetch queue could hold 10 * 4 = 40 items, so we'd have at most 40 tar files plus their untarred directories on disk at once. The number of workers should be close to the number of CPU cores.
We do NOT want one go routine per S3 file. If we do that, the system will run out of file handles, as we'll have tens of thousands of open connections to S3 trying to write data into tens of thousands of local files.
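The sizing rule from the notes above, expressed as code. The figures and the channel element type are illustrative, not the actual config values:

package main

import "fmt"

func main() {
    // Hypothetical figure matching the note above: 10 concurrent fetches.
    fetchers := 10
    // The buffered channel bounds in-flight work: at most fetchers*4
    // items can be queued, so at most 40 tar files (plus their untarred
    // copies, roughly 2GB per pair) ever sit on disk at once.
    fetchChannel := make(chan string, fetchers*4)
    fmt.Println("fetch queue capacity:", cap(fetchChannel)) // 40
}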
func NewBagPreparer ¶
func NewBagPreparer(procUtil *bagman.ProcessUtil) *BagPreparer
func (*BagPreparer) HandleMessage ¶
func (bagPreparer *BagPreparer) HandleMessage(message *nsq.Message) error
HandleMessage handles messages from the queue, putting each item into the pipeline.
func (*BagPreparer) SendToStorageQueue ¶
func (bagPreparer *BagPreparer) SendToStorageQueue(helper *bagman.IngestHelper)
Puts an item into the storage queue, where bagstorer will pick it up.
type BagRecorder ¶
type BagRecorder struct {
    FedoraChannel  chan *bagman.ProcessResult
    CleanupChannel chan *bagman.ProcessResult
    ResultsChannel chan *bagman.ProcessResult
    ProcUtil       *bagman.ProcessUtil
    UsingNsq       bool
    WaitGroup      sync.WaitGroup
}
func NewBagRecorder ¶
func NewBagRecorder(procUtil *bagman.ProcessUtil) *BagRecorder
func (*BagRecorder) DeleteS3File ¶
func (bagRecorder *BagRecorder) DeleteS3File(result *bagman.ProcessResult)
Deletes the original tar file from the depositor's S3 receiving bucket.
func (*BagRecorder) HandleMessage ¶
func (bagRecorder *BagRecorder) HandleMessage(message *nsq.Message) error
HandleMessage handles messages from the queue, putting each item into the pipeline.
func (*BagRecorder) QueueItemsForReplication ¶
func (bagRecorder *BagRecorder) QueueItemsForReplication(result *bagman.ProcessResult)
func (*BagRecorder) RunWithoutNsq ¶
func (bagRecorder *BagRecorder) RunWithoutNsq(result *bagman.ProcessResult)
type BagRestorer ¶
type BagRestorer struct {
    RestoreChannel chan *RestoreObject
    ResultsChannel chan *RestoreObject
    ProcUtil       *bagman.ProcessUtil
}
func NewBagRestorer ¶
func NewBagRestorer(procUtil *bagman.ProcessUtil) *BagRestorer
func (*BagRestorer) HandleMessage ¶
func (bagRestorer *BagRestorer) HandleMessage(message *nsq.Message) error
HandleMessage handles messages from the queue, putting each item into the pipeline.
type BagStorer ¶
type BagStorer struct {
    StorageChannel chan *bagman.IngestHelper
    CleanUpChannel chan *bagman.IngestHelper
    ResultsChannel chan *bagman.IngestHelper
    ProcUtil       *bagman.ProcessUtil
}
func NewBagStorer ¶
func NewBagStorer(procUtil *bagman.ProcessUtil) *BagStorer
func (*BagStorer) HandleMessage ¶
func (bagStorer *BagStorer) HandleMessage(message *nsq.Message) error
HandleMessage handles messages from the queue, putting each item into the pipeline.
func (*BagStorer) SendToMetadataQueue ¶
func (bagStorer *BagStorer) SendToMetadataQueue(helper *bagman.IngestHelper)
Puts an item into the queue for Fluctus/Fedora metadata processing.
func (*BagStorer) SendToTroubleQueue ¶
func (bagStorer *BagStorer) SendToTroubleQueue(helper *bagman.IngestHelper)
Puts an item into the trouble queue.
type DeleteObject ¶
type DeleteObject struct {
    GenericFile   *bagman.GenericFile
    ProcessStatus *bagman.ProcessStatus `json:"-"`
    NsqMessage    *nsq.Message          `json:"-"`
    ErrorMessage  string
    Retry         bool
}
DeleteObject holds information about the state of a single delete operation.
type FailedFixityProcessor ¶
type FailedFixityProcessor struct {
ProcUtil *bagman.ProcessUtil
}
FailedFixityProcessor dumps the FixityResult structure of items whose fixity checks could not be completed into readable JSON files for review.
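A minimal sketch of that dump-to-readable-JSON pattern, using a hypothetical stand-in for the FixityResult structure:

package main

import (
    "encoding/json"
    "log"
    "os"
)

// fixityResult is a hypothetical stand-in for bagman's FixityResult.
type fixityResult struct {
    GenericFileIdentifier string `json:"generic_file_identifier"`
    ErrorMessage          string `json:"error_message"`
}

func main() {
    result := fixityResult{
        GenericFileIdentifier: "example.edu/bag/data/file.txt",
        ErrorMessage:          "fixity check could not be completed",
    }
    // MarshalIndent produces the formatted, human-readable JSON
    // described above.
    data, err := json.MarshalIndent(result, "", "  ")
    if err != nil {
        log.Fatal(err)
    }
    if err := os.WriteFile("failed_fixity.json", data, 0644); err != nil {
        log.Fatal(err)
    }
}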
func NewFailedFixityProcessor ¶
func NewFailedFixityProcessor(procUtil *bagman.ProcessUtil) *FailedFixityProcessor
func (*FailedFixityProcessor) HandleMessage ¶
func (processor *FailedFixityProcessor) HandleMessage(message *nsq.Message) error
type FailedReplicationProcessor ¶
type FailedReplicationProcessor struct {
ProcUtil *bagman.ProcessUtil
}
FailedReplicationProcessor dumps the ReplicationResult structure of items whose replication could not be completed into readable JSON files for review.
func NewFailedReplicationProcessor ¶
func NewFailedReplicationProcessor(procUtil *bagman.ProcessUtil) *FailedReplicationProcessor
func (*FailedReplicationProcessor) HandleMessage ¶
func (processor *FailedReplicationProcessor) HandleMessage(message *nsq.Message) error
type FileDeleter ¶
type FileDeleter struct {
    DeleteChannel  chan *DeleteObject
    ResultsChannel chan *DeleteObject
    ProcUtil       *bagman.ProcessUtil
    // Replication client connects to the
    // replication bucket in Oregon.
    S3ReplicationClient *bagman.S3Client
}
func NewFileDeleter ¶
func NewFileDeleter(procUtil *bagman.ProcessUtil) *FileDeleter
func (*FileDeleter) HandleMessage ¶
func (fileDeleter *FileDeleter) HandleMessage(message *nsq.Message) error
HandleMessage handles messages from the queue, putting each item into the pipeline.
type FixityChecker ¶
type FixityChecker struct {
    FixityChannel  chan *bagman.FixityResult
    ResultsChannel chan *bagman.FixityResult
    ProcUtil       *bagman.ProcessUtil
}
func NewFixityChecker ¶
func NewFixityChecker(procUtil *bagman.ProcessUtil) *FixityChecker
func (*FixityChecker) HandleMessage ¶
func (fixityChecker *FixityChecker) HandleMessage(message *nsq.Message) error
HandleMessage handles messages from the queue, putting each item into the pipeline.
type ReplicationObject ¶
type Replicator ¶
type Replicator struct {
    ReplicationChannel  chan *ReplicationObject
    S3ReplicationClient *bagman.S3Client
    ProcUtil            *bagman.ProcessUtil
}
func NewReplicator ¶
func NewReplicator(procUtil *bagman.ProcessUtil) *Replicator
func (*Replicator) CopyAndSaveEvent ¶
func (replicator *Replicator) CopyAndSaveEvent(replicationObject *ReplicationObject) (string, error)
Copy the file to S3 in Oregon and save the replication PremisEvent to Fluctus. Returns the S3 URL of the newly-saved file or an error.
func (*Replicator) CopyFile ¶
func (replicator *Replicator) CopyFile(replicationObject *ReplicationObject) (string, error)
Copies a file from one bucket to another, across regions, including all of APTrust's custom metadata. Returns the URL of the destination file (that should be in the replication bucket in Oregon), or an error.
This does NOT use PUT COPY internally because PUT COPY is limited to files of 5GB or less, and we'll have many files over 5GB. The copy operation downloads data from the S3 preservation bucket and uploads it to the replication bucket.
As long as we're running in the same region as our S3 preservation bucket (USEast), the download should be fast and free. Running this code outside of USEast will be slow and expensive, since we'll have to pay for the bandwidth of BOTH download and upload.
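The GET-then-PUT shape described above can be sketched as follows. The downloader/uploader interfaces and fakes are hypothetical stand-ins for bagman's S3Client, and the real code stages the download in a local file (see DownloadFromPreservation below) rather than streaming:

package main

import (
    "fmt"
    "io"
    "strings"
)

// downloader and uploader are hypothetical stand-ins for the GET and
// PUT sides of an S3 client.
type downloader interface {
    Get(key string) (io.ReadCloser, error)
}
type uploader interface {
    Put(key string, body io.Reader) error
}

// replicate performs the GET-then-PUT copy described above,
// sidestepping the 5GB ceiling on S3's server-side PUT COPY.
func replicate(src downloader, dst uploader, key string) error {
    body, err := src.Get(key)
    if err != nil {
        return err
    }
    defer body.Close()
    return dst.Put(key, body)
}

// In-memory fakes so the sketch runs end to end.
type fakeSrc struct{}

func (fakeSrc) Get(key string) (io.ReadCloser, error) {
    return io.NopCloser(strings.NewReader("file bytes for " + key)), nil
}

type fakeDst struct{}

func (fakeDst) Put(key string, body io.Reader) error {
    n, err := io.Copy(io.Discard, body) // pretend to upload
    fmt.Printf("uploaded %s (%d bytes)\n", key, n)
    return err
}

func main() {
    if err := replicate(fakeSrc{}, fakeDst{}, "some-uuid"); err != nil {
        fmt.Println("replication failed:", err)
    }
}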
func (*Replicator) DownloadFromPreservation ¶
func (replicator *Replicator) DownloadFromPreservation(file *bagman.File) (string, error)
Copies a file from the preservation bucket to a local file and returns the path to the local file.
func (*Replicator) GetCopyOptions ¶
func (replicator *Replicator) GetCopyOptions(file *bagman.File) (s3.Options, error)
Returns S3 options, including the md5 checksum and APTrust's custom metadata. These options must accompany the file copy.
func (*Replicator) HandleMessage ¶
func (replicator *Replicator) HandleMessage(message *nsq.Message) error
HandleMessage handles messages from the queue, putting each item into the replication channel.
func (*Replicator) ReplicatedFileExists ¶
func (replicator *Replicator) ReplicatedFileExists(fileUUID string) bool
func (*Replicator) SaveReplicationEvent ¶
func (replicator *Replicator) SaveReplicationEvent(file *bagman.File, url string) (*bagman.PremisEvent, error)
Saves the replication PremisEvent to Fluctus. Param url is the S3 URL to which the file was just copied; it should point to the replication bucket in Oregon.
func (*Replicator) SendToTroubleQueue ¶
func (replicator *Replicator) SendToTroubleQueue(file *bagman.File)
Puts an item into the trouble queue.
type RestoreObject ¶
type RestoreObject struct {
    BagRestorer     *bagman.BagRestorer
    ProcessStatus   *bagman.ProcessStatus
    NsqMessage      *nsq.Message
    ErrorMessage    string
    Retry           bool
    RestorationUrls []string
    // contains filtered or unexported fields
}
func (*RestoreObject) Key ¶
func (object *RestoreObject) Key() string
func (*RestoreObject) RestoredBagUrls ¶
func (object *RestoreObject) RestoredBagUrls() string
type TroubleProcessor ¶
type TroubleProcessor struct {
ProcUtil *bagman.ProcessUtil
}
TroubleProcessor dumps the ProcessResult structure of items that failed the ingest process into JSON files. The JSON is formatted and human-readable, and may be deserialized and loaded into other processes in the future. The ProcessResult structure contains fairly detailed information about every stage of the ingest process.
func NewTroubleProcessor ¶
func NewTroubleProcessor(procUtil *bagman.ProcessUtil) *TroubleProcessor
func (*TroubleProcessor) HandleMessage ¶
func (troubleProcessor *TroubleProcessor) HandleMessage(message *nsq.Message) error