workers

package
v0.0.0-...-9369c8b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2016 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

bagrecorder records bag metadata in Fluctus. That includes metadata for Intellectual Objects, Generic Files and Premis Events.

bagrestorer restores bags to partners' restoration buckets

bagstorer stores bags that have been unpacked and validated by apt_prepare. Each item/message follows this flow:

  1. Storage channel: copies files to S3 permanent storage.
  2. Results channel: tells the queue whether processing succeeded, and if not, whether the item should be requeued. Also logs results to json and message logs.
  3. Cleanup channel: cleans up the files after processing completes.

If a failure occurs anywhere in the first step, processing goes directly to the Results Channel, which records the error and the disposition (retry/give up).

As long as the message from nsq contains valid JSON, steps 2 and 3 ALWAYS run.

fixityworker.go receives GenericFile records from the fixity queue. It downloads the generic files from S3 preservation storage, calculates the files' SHA256 checksums and writes the results back to Fluctus. None of the data downloaded from S3 is saved to disk; it's simply streamed through the SHA256 hash writer and then discarded.

Index

Constants

View Source
const LARGE_FILE_SIZE = int64(50000000000)

Large file is ~50GB

Variables

This section is empty.

Functions

func CreateNsqConsumer

func CreateNsqConsumer(config *bagman.Config, workerConfig *bagman.WorkerConfig) (*nsq.Consumer, error)

Creates and returns an NSQ consumer for a worker process.

func CreateProcUtil

func CreateProcUtil(serviceGroup string) (procUtil *bagman.ProcessUtil)

Creates and returns a ProcessUtil object for a worker process. Param serviceGroup should be either "aptrust" or "dpn" and defaults to "aptrust". This is a late hack to fix a problem where DPN services are checking space on the wrong volume. This code will be replaced soon by Exchange.

func InitializeReader

func InitializeReader() (*bagman.WorkReader, error)

Initializes basic services for a reader fills the queues. Readers such as the bucket_reader and request_reader run as cron jobs. They read from external sources (Fluctus, S3 buckets, etc.) then add messages to the appropriate NSQ topic when they find work to be done.

Returns a MessageLog for the reader to log messages and a FluctusClient for the reader to read from Fluctus.

Will die if it cannot find the requested config file, or if essential config options (such as where to find Fluctus) are missing.

Types

type BagPreparer

type BagPreparer struct {
	FetchChannel   chan *bagman.IngestHelper
	UnpackChannel  chan *bagman.IngestHelper
	CleanUpChannel chan *bagman.IngestHelper
	ResultsChannel chan *bagman.IngestHelper
	ProcUtil       *bagman.ProcessUtil
	// contains filtered or unexported fields
}

apt_prepare receives messages from nsqd describing items in the S3 receiving buckets. It fetches, untars, and validates tar files, then queues them for storage, if they untar and validate successfully. Each item/message follows this flow:

  1. Fetch channel: fetches the file from S3.
  2. Unpack channel: untars the bag files, parses and validates the bag, reads tags, generates checksums and generic file UUIDs.
  3. Results channel: tells the queue whether processing succeeded, and if not, whether the item should be requeued. Also logs results to json and message logs.
  4. Cleanup channel: cleans up the files after processing completes.

If a failure occurs anywhere in the first three steps, processing goes directly to the Results Channel, which records the error and the disposition (retry/give up).

As long as the message from nsq contains valid JSON, steps 4 and 5 ALWAYS run.

Some notes... It's essential that that the fetchChannel be limited to a relatively low number. If we are downloading 1GB tar files, we need space to store the tar file AND the untarred version. That's about 2 x 1GB. We do not want to pull down 1000 files at once, or we'll run out of disk space! If config sets fetchers to 10, we can pull down 10 files at a time. The fetch queue could hold 10 * 4 = 40 items, so we'd have max 40 tar files + untarred directories on disk at once. The number of workers should be close to the number of CPU cores.

We do NOT want one go routine per S3 file. If we do that, the system will run out of file handles, as we'll have tens of thousands of open connections to S3 trying to write data into tens of thousands of local files.

func NewBagPreparer

func NewBagPreparer(procUtil *bagman.ProcessUtil) *BagPreparer

func (*BagPreparer) HandleMessage

func (bagPreparer *BagPreparer) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from the queue, putting each item into the pipleline.

func (*BagPreparer) SendToStorageQueue

func (bagPreparer *BagPreparer) SendToStorageQueue(helper *bagman.IngestHelper)

Puts an item into the queue for Fluctus/Fedora metadata processing.

type BagRecorder

type BagRecorder struct {
	FedoraChannel  chan *bagman.ProcessResult
	CleanupChannel chan *bagman.ProcessResult
	ResultsChannel chan *bagman.ProcessResult
	ProcUtil       *bagman.ProcessUtil
	UsingNsq       bool
	WaitGroup      sync.WaitGroup
}

func NewBagRecorder

func NewBagRecorder(procUtil *bagman.ProcessUtil) *BagRecorder

func (*BagRecorder) DeleteS3File

func (bagRecorder *BagRecorder) DeleteS3File(result *bagman.ProcessResult)

Delete the original tar file from the depositor's S3 receiving bucket.

func (*BagRecorder) HandleMessage

func (bagRecorder *BagRecorder) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from the queue, putting each item into the pipleline.

func (*BagRecorder) QueueItemsForReplication

func (bagRecorder *BagRecorder) QueueItemsForReplication(result *bagman.ProcessResult)

func (*BagRecorder) RunWithoutNsq

func (bagRecorder *BagRecorder) RunWithoutNsq(result *bagman.ProcessResult)

type BagRestorer

type BagRestorer struct {
	RestoreChannel chan *RestoreObject
	ResultsChannel chan *RestoreObject
	ProcUtil       *bagman.ProcessUtil
}

func NewBagRestorer

func NewBagRestorer(procUtil *bagman.ProcessUtil) *BagRestorer

func (*BagRestorer) HandleMessage

func (bagRestorer *BagRestorer) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from the queue, putting each item into the pipleline.

type BagStorer

type BagStorer struct {
	StorageChannel chan *bagman.IngestHelper
	CleanUpChannel chan *bagman.IngestHelper
	ResultsChannel chan *bagman.IngestHelper
	ProcUtil       *bagman.ProcessUtil
}

func NewBagStorer

func NewBagStorer(procUtil *bagman.ProcessUtil) *BagStorer

func (*BagStorer) HandleMessage

func (bagStorer *BagStorer) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from the queue, putting each item into the pipleline.

func (*BagStorer) SendToMetadataQueue

func (bagStorer *BagStorer) SendToMetadataQueue(helper *bagman.IngestHelper)

Puts an item into the queue for Fluctus/Fedora metadata processing.

func (*BagStorer) SendToTroubleQueue

func (bagStorer *BagStorer) SendToTroubleQueue(helper *bagman.IngestHelper)

Puts an item into the trouble queue.

type DeleteObject

type DeleteObject struct {
	GenericFile   *bagman.GenericFile
	ProcessStatus *bagman.ProcessStatus `json:"-"`
	NsqMessage    *nsq.Message          `json:"-"`
	ErrorMessage  string
	Retry         bool
}

DeleteObject holds information about the state of a single delete operation.

type FailedFixityProcessor

type FailedFixityProcessor struct {
	ProcUtil *bagman.ProcessUtil
}

FailedFixityProcessor dumps the FixityResult structure of items where fixity check could not be completed into readable JSON files for review.

func NewFailedFixityProcessor

func NewFailedFixityProcessor(procUtil *bagman.ProcessUtil) *FailedFixityProcessor

func (*FailedFixityProcessor) HandleMessage

func (processor *FailedFixityProcessor) HandleMessage(message *nsq.Message) error

type FailedReplicationProcessor

type FailedReplicationProcessor struct {
	ProcUtil *bagman.ProcessUtil
}

FailedReplicationProcessor dumps the ReplicationResult structure of items where replication check could not be completed into readable JSON files for review.

func NewFailedReplicationProcessor

func NewFailedReplicationProcessor(procUtil *bagman.ProcessUtil) *FailedReplicationProcessor

func (*FailedReplicationProcessor) HandleMessage

func (processor *FailedReplicationProcessor) HandleMessage(message *nsq.Message) error

type FileDeleter

type FileDeleter struct {
	DeleteChannel  chan *DeleteObject
	ResultsChannel chan *DeleteObject
	ProcUtil       *bagman.ProcessUtil
	// Replication client connects to the
	// replication bucket in Oregon.
	S3ReplicationClient *bagman.S3Client
}

func NewFileDeleter

func NewFileDeleter(procUtil *bagman.ProcessUtil) *FileDeleter

func (*FileDeleter) HandleMessage

func (fileDeleter *FileDeleter) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from the queue, putting each item into the pipleline.

type FixityChecker

type FixityChecker struct {
	FixityChannel  chan *bagman.FixityResult
	ResultsChannel chan *bagman.FixityResult
	ProcUtil       *bagman.ProcessUtil
}

func NewFixityChecker

func NewFixityChecker(procUtil *bagman.ProcessUtil) *FixityChecker

func (*FixityChecker) HandleMessage

func (fixityChecker *FixityChecker) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from the queue, putting each item into the pipleline.

type ReplicationObject

type ReplicationObject struct {
	File       *bagman.File
	NsqMessage *nsq.Message
}

type Replicator

type Replicator struct {
	ReplicationChannel  chan *ReplicationObject
	S3ReplicationClient *bagman.S3Client
	ProcUtil            *bagman.ProcessUtil
}

func NewReplicator

func NewReplicator(procUtil *bagman.ProcessUtil) *Replicator

func (*Replicator) CopyAndSaveEvent

func (replicator *Replicator) CopyAndSaveEvent(replicationObject *ReplicationObject) (string, error)

Copy the file to S3 in Oregon and save the replication PremisEvent to Fluctus. Returns the S3 URL of the newly-saved file or an error.

func (*Replicator) CopyFile

func (replicator *Replicator) CopyFile(replicationObject *ReplicationObject) (string, error)

Copies a file from one bucket to another, across regions, including all of APTrust's custom metadata. Returns the URL of the destination file (that should be in the replication bucket in Oregon), or an error.

This does NOT use PUT COPY internally because PUT COPY is limited to files of 5GB or less, and we'll have many files over 5GB. The copy operation downloads data from the S3 preservation bucket and uploads it to the replication bucket.

As long as we're running in the same region as our S3 preservation bucket (USEast), the download should be fast and free. Running this code outside of USEast will be slow and expensive, since we'll have to pay for the bandwidth of BOTH download and upload.

func (*Replicator) DownloadFromPreservation

func (replicator *Replicator) DownloadFromPreservation(file *bagman.File) (string, error)

Copies a file from the preservation bucket to a local file and returns the path to the local file.

func (*Replicator) GetCopyOptions

func (replicator *Replicator) GetCopyOptions(file *bagman.File) (s3.Options, error)

Returns S3 options, including the md5 checksum and APTrust's custom metadata. These options must accompany the file copy.

func (*Replicator) HandleMessage

func (replicator *Replicator) HandleMessage(message *nsq.Message) error

MessageHandler handles messages from the queue, putting each item into the replication channel.

func (*Replicator) ReplicatedFileExists

func (replicator *Replicator) ReplicatedFileExists(fileUUID string) bool

func (*Replicator) SaveReplicationEvent

func (replicator *Replicator) SaveReplicationEvent(file *bagman.File, url string) (*bagman.PremisEvent, error)

Saves the replication PremisEvent to Fluctus. Param url is the S3 URL we just saved that file to. That should be in Oregon.

func (*Replicator) SendToTroubleQueue

func (replicator *Replicator) SendToTroubleQueue(file *bagman.File)

Puts an item into the trouble queue.

type RestoreObject

type RestoreObject struct {
	BagRestorer     *bagman.BagRestorer
	ProcessStatus   *bagman.ProcessStatus
	NsqMessage      *nsq.Message
	ErrorMessage    string
	Retry           bool
	RestorationUrls []string
	// contains filtered or unexported fields
}

func (*RestoreObject) Key

func (object *RestoreObject) Key() string

func (*RestoreObject) RestoredBagUrls

func (object *RestoreObject) RestoredBagUrls() string

type TroubleProcessor

type TroubleProcessor struct {
	ProcUtil *bagman.ProcessUtil
}

TroubleProcessor dumps the ProcessResult structure of items that failed the ingest process into JSON files. The JSON is formatted and human-readable, and may be deserialized and loaded into other processes in the future. The ProcessResult structure contains fairly detailed information about every stage of the ingest process.

func NewTroubleProcessor

func NewTroubleProcessor(procUtil *bagman.ProcessUtil) *TroubleProcessor

func (*TroubleProcessor) HandleMessage

func (troubleProcessor *TroubleProcessor) HandleMessage(message *nsq.Message) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL