Documentation ¶
Overview ¶
Don't include this in the partners build: it's not needed in the partner apps, and the syscall.Stat* functions cause the build to fail on Windows.
Index ¶
- func HasPendingDeleteRequest(workItems []*WorkItem) bool
- func HasPendingIngestRequest(workItems []*WorkItem) bool
- func HasPendingRestoreRequest(workItems []*WorkItem) bool
- type Checksum
- type ChecksumForPharos
- type CleanupResult
- type Config
- func (config *Config) AbsLogDirectory() string
- func (config *Config) EnsureLogDirectory() (string, error)
- func (config *Config) EnsurePharosConfig() error
- func (config *Config) ExpandFilePaths()
- func (config *Config) GetAWSAccessKeyId() string
- func (config *Config) GetAWSSecretAccessKey() string
- func (config *Config) StorageRegionAndBucketFor(storageOption string) (region string, bucket string, err error)
- func (config *Config) TestsAreRunning() bool
- type DPNConfig
- type DPNWorkItem
- func (item *DPNWorkItem) ClearNodeAndPid()
- func (item *DPNWorkItem) IsBeingProcessed() bool
- func (item *DPNWorkItem) IsBeingProcessedByMe(hostname string, pid int) bool
- func (item *DPNWorkItem) IsCompletedOrCancelled() bool
- func (item *DPNWorkItem) SerializeForPharos() ([]byte, error)
- func (item *DPNWorkItem) SetNodeAndPid()
- type DPNWorkItemForPharos
- type DefaultMetadata
- type DeleteState
- type FileRestoreState
- type FixityResult
- type GenericFile
- func (gf *GenericFile) BuildIngestChecksums() error
- func (gf *GenericFile) BuildIngestEvents() error
- func (gf *GenericFile) FindEventByIdentifier(identifier string) *PremisEvent
- func (gf *GenericFile) FindEventsByType(eventType string) []*PremisEvent
- func (gf *GenericFile) GetChecksumByAlgorithm(algorithm string) *Checksum
- func (gf *GenericFile) GetChecksumByDigest(digest string) *Checksum
- func (gf *GenericFile) InstitutionIdentifier() (string, error)
- func (gf *GenericFile) MergeAttributes(savedFile *GenericFile) []error
- func (gf *GenericFile) OriginalPath() string
- func (gf *GenericFile) OriginalPathWithBagName() (string, error)
- func (gf *GenericFile) PreservationStorageFileName() (string, error)
- func (gf *GenericFile) PropagateIdsToChildren()
- func (gf *GenericFile) SerializeForPharos() ([]byte, error)
- type GenericFileForPharos
- type GlacierRequestReport
- type GlacierRestoreRequest
- type GlacierRestoreState
- func (state *GlacierRestoreState) FindRequest(gfIdentifier string) *GlacierRestoreRequest
- func (state *GlacierRestoreState) GetFileIdentifiers() []string
- func (state *GlacierRestoreState) GetReport(gfIdentifiers []string) *GlacierRequestReport
- func (state *GlacierRestoreState) GetStorageOption() (string, error)
- type IngestManifest
- func (manifest *IngestManifest) AllErrorsAsString() string
- func (manifest *IngestManifest) BagHasBeenValidated() bool
- func (manifest *IngestManifest) BagIsOnDisk() bool
- func (manifest *IngestManifest) ClearAllErrors()
- func (manifest *IngestManifest) DBExists() bool
- func (manifest *IngestManifest) HasErrors() bool
- func (manifest *IngestManifest) HasFatalErrors() bool
- func (manifest *IngestManifest) ObjectIdentifier() (string, error)
- func (manifest *IngestManifest) SizeOfBagOnDisk() (int64, error)
- type IngestState
- type Institution
- type IntellectualObject
- func (obj *IntellectualObject) AccessValid() bool
- func (obj *IntellectualObject) AllFilesSaved() bool
- func (obj *IntellectualObject) BuildIngestChecksums() error
- func (obj *IntellectualObject) BuildIngestEvents(numberOfFiles int) error
- func (obj *IntellectualObject) FindEventsByType(eventType string) []PremisEvent
- func (obj *IntellectualObject) FindGenericFile(filePath string) *GenericFile
- func (obj *IntellectualObject) FindTag(tagName string) []*Tag
- func (obj *IntellectualObject) PayloadBytesAndFiles() (byteCount int64, fileCount int)
- func (obj *IntellectualObject) PropagateIdsToChildren()
- func (obj *IntellectualObject) SerializeForPharos() ([]byte, error)
- func (obj *IntellectualObject) TotalFileSize() int64
- type IntellectualObjectForPharos
- type MissingFile
- type PharosDPNBag
- type PremisEvent
- func NewEventFileDeletion(fileUUID, requestedBy, instApprover, aptrustApprover string, ...) *PremisEvent
- func NewEventGenericFileDigestCalculation(checksumGeneratedAt time.Time, fixityAlg, digest string) (*PremisEvent, error)
- func NewEventGenericFileFixityCheck(checksumVerifiedAt time.Time, fixityAlg, digest string, fixityMatched bool) (*PremisEvent, error)
- func NewEventGenericFileIdentifierAssignment(identifierGeneratedAt time.Time, identifierType, identifier string) (*PremisEvent, error)
- func NewEventGenericFileIngest(storedAt time.Time, md5Digest, _uuid string) (*PremisEvent, error)
- func NewEventGenericFileReplication(replicatedAt time.Time, replicationUrl string) (*PremisEvent, error)
- func NewEventObjectCreation() *PremisEvent
- func NewEventObjectIdentifierAssignment(objectIdentifier string) (*PremisEvent, error)
- func NewEventObjectIngest(numberOfFilesIngested int) (*PremisEvent, error)
- func NewEventObjectRights(accessSetting string) (*PremisEvent, error)
- type PremisEventForPharos
- type RestClientConfig
- type RestoreState
- func (restoreState *RestoreState) AllErrorsAsString() string
- func (restoreState *RestoreState) FinishNSQ()
- func (restoreState *RestoreState) HasErrors() bool
- func (restoreState *RestoreState) HasFatalErrors() bool
- func (restoreState *RestoreState) MostRecentSummary() *WorkSummary
- func (restoreState *RestoreState) RequeueNSQ(milliseconds int)
- func (restoreState *RestoreState) TouchNSQ()
- type RingList
- type S3File
- type StorageSummary
- type StoredFile
- type SynchronizedMap
- func (syncMap *SynchronizedMap) Add(key, value string)
- func (syncMap *SynchronizedMap) Delete(key string)
- func (syncMap *SynchronizedMap) Get(key string) string
- func (syncMap *SynchronizedMap) HasKey(key string) bool
- func (syncMap *SynchronizedMap) Keys() []string
- func (syncMap *SynchronizedMap) Values() []string
- type Tag
- type Volume
- func (volume *Volume) AvailableSpace() (uint64, error)
- func (volume *Volume) ClaimedSpace() uint64
- func (volume *Volume) MountPoint() string
- func (volume *Volume) Release(path string)
- func (volume *Volume) Reservations() map[string]uint64
- func (volume *Volume) Reserve(path string, numBytes uint64) error
- type VolumeResponse
- type WorkItem
- func (item *WorkItem) BelongsToAnotherWorker() bool
- func (item *WorkItem) HasBeenStored() bool
- func (item *WorkItem) IsInProgress() bool
- func (item *WorkItem) IsPastIngest() bool
- func (item *WorkItem) IsStoring() bool
- func (item *WorkItem) MsgAlreadyOnDisk() string
- func (item *WorkItem) MsgAlreadyValidated() string
- func (item *WorkItem) MsgGoingToFetch() string
- func (item *WorkItem) MsgGoingToValidation() string
- func (item *WorkItem) MsgPastIngest() string
- func (item *WorkItem) MsgSkippingInProgress() string
- func (item *WorkItem) SerializeForPharos() ([]byte, error)
- func (item *WorkItem) SetNodeAndPid()
- func (item *WorkItem) ShouldTryIngest() bool
- type WorkItemState
- type WorkItemStateForPharos
- type WorkSummary
- func (summary *WorkSummary) AddError(format string, a ...interface{})
- func (summary *WorkSummary) AllErrorsAsString() string
- func (summary *WorkSummary) ClearErrors()
- func (summary *WorkSummary) Finish()
- func (summary *WorkSummary) Finished() bool
- func (summary *WorkSummary) FirstError() string
- func (summary *WorkSummary) HasErrors() bool
- func (summary *WorkSummary) RunTime() time.Duration
- func (summary *WorkSummary) Start()
- func (summary *WorkSummary) Started() bool
- func (summary *WorkSummary) Succeeded() bool
- type WorkerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HasPendingDeleteRequest ¶
Returns true if the WorkItem records include a delete request that has not been completed.
func HasPendingIngestRequest ¶
Returns true if the WorkItem records include an ingest request that has not been completed.
func HasPendingRestoreRequest ¶
Returns true if the WorkItem records include a restore request that has not been completed.
Types ¶
type Checksum ¶
type Checksum struct {
    Id            int       `json:"id,omitempty"` // Do not serialize zero to JSON!
    GenericFileId int       `json:"generic_file_id"`
    Algorithm     string    `json:"algorithm"`
    DateTime      time.Time `json:"datetime"`
    Digest        string    `json:"digest"`
    CreatedAt     time.Time `json:"created_at,omitempty"`
    UpdatedAt     time.Time `json:"updated_at,omitempty"`
}
Checksum contains information about a checksum that can be used to validate the integrity of a GenericFile. DateTime should be in ISO8601 format for local time or UTC when we serialize an object to JSON for Pharos.
For example: 1994-11-05T08:15:30-05:00 (Local Time) 1994-11-05T08:15:30Z (UTC)
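As a point of reference, Go's time.RFC3339 layout produces exactly these forms, and encoding/json uses it by default when marshaling time.Time values. A minimal, standard-library-only sketch:

package main

import (
    "fmt"
    "time"
)

func main() {
    loc, _ := time.LoadLocation("America/New_York")
    local := time.Date(1994, 11, 5, 8, 15, 30, 0, loc)
    // RFC3339 is Go's ISO8601-compatible layout, used by encoding/json.
    fmt.Println(local.Format(time.RFC3339))       // 1994-11-05T08:15:30-05:00
    fmt.Println(local.UTC().Format(time.RFC3339)) // 1994-11-05T13:15:30Z
}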
func (*Checksum) MergeAttributes ¶
MergeAttributes sets the Id, CreatedAt and UpdatedAt properties of this checksum to match those of savedChecksum. We call this after saving a record to Pharos, which sets all of those properties. Generally, savedChecksum is a temporary record returned from Pharos, while this checksum is the one we want to keep.
func (*Checksum) SerializeForPharos ¶
SerializeForPharos serializes a Checksum into a JSON format that the Pharos server will accept for PUT and POST calls.
type ChecksumForPharos ¶
type ChecksumForPharos struct {
    Id            int       `json:"id,omitempty"` // Do not serialize zero to JSON!
    GenericFileId int       `json:"generic_file_id"`
    Algorithm     string    `json:"algorithm"`
    DateTime      time.Time `json:"datetime"`
    Digest        string    `json:"digest"`
}
Same as Checksum, but without CreatedAt and UpdatedAt
func NewChecksumForPharos ¶
func NewChecksumForPharos(cs *Checksum) *ChecksumForPharos
type CleanupResult ¶
type CleanupResult struct {
    // The S3 key of the original bag file. This will
    // be in one of the receiving buckets. This is not
    // necessarily the file we'll be deleting, but all
    // files to be deleted are related to this bag.
    BagName string
    // The ETag of the original uploaded bag (minus the
    // quotes). This is the bag's md5 sum for bags under
    // about 2GB.
    ETag string
    // The modified date of the original bag.
    BagDate time.Time
    // The identifier of the intellectual object to which
    // the Files belong. This may be an empty string in
    // cases where we're cleaning up files from a bag that
    // failed ingest. If it's not null, the bag was successfully
    // ingested, and the identifier will look something like
    // virginia.edu/bag_name
    ObjectIdentifier string
    // Files contains a list of files/keys to be deleted
    // from S3.
    Files []*S3File
    WorkSummary *WorkSummary
}
CleanupResult describes one or more files to be deleted from S3, and the result of the attempts to delete those files. The combination of BagName + ETag + BagDate maps to a unique entry in Fluctus' ProcessedItems table.
func (*CleanupResult) Succeeded ¶
func (result *CleanupResult) Succeeded() bool
Returns true if all files were successfully deleted.
type Config ¶
type Config struct {
    // ActiveConfig is the configuration currently in use.
    ActiveConfig string
    // The name of the AWS region that hosts APTrust's Glacier files.
    APTrustGlacierRegion string
    // The name of the AWS region that hosts APTrust's S3 files.
    APTrustS3Region string
    // Config options specific to DPN services.
    DPN DPNConfig
    // Configuration options for apt_bag_delete
    BagDeleteWorker WorkerConfig
    // BagItVersion is the version number we write into the
    // bagit.txt file when we restore a bag.
    BagItVersion string
    // BagItEncoding is the encoding value we write into the
    // bagit.txt file when we restore a bag.
    BagItEncoding string
    // Location of the config file for bag validation.
    // Config will differ for APTrust and DPN. This is
    // for the APTrust config file.
    BagValidationConfigFile string
    // The bucket reader checks for new items in the receiving
    // buckets and queues them for ingest if they're not already
    // queued. During periods of heavy ingest, we may have
    // 10,000+ items in the receiving buckets. To avoid doing
    // 10,000+ REST calls to Pharos to check for existing WorkItems,
    // the bucket reader can do a handful of calls to cache
    // all new ingest records from the past X hours. We usually set
    // BucketReaderCacheHours to 24, to cache items that have appeared
    // in the past day. The bucket reader WILL look up items that aren't
    // in the cache, but during peak hours when Pharos is under heavy
    // load, this will save the server a lot of work.
    BucketReaderCacheHours int
    // Should we delete the uploaded tar file from the receiving
    // bucket after successfully processing this bag?
    DeleteOnSuccess bool
    // Configuration options for apt_fetch
    FetchWorker WorkerConfig
    // Configuration options for apt_file_delete
    FileDeleteWorker WorkerConfig
    // Configuration options for apt_file_restore
    FileRestoreWorker WorkerConfig
    // Configuration options for apt_fixity, which
    // handles ongoing fixity checks.
    FixityWorker WorkerConfig
    // GlacierBucketVA is the name of the Glacier-only storage bucket in Virginia.
    GlacierBucketVA string
    // GlacierBucketOH is the name of the Glacier-only storage bucket in Ohio.
    GlacierBucketOH string
    // GlacierBucketOR is the name of the Glacier-only storage bucket in Oregon.
    // This bucket is distinct from the regular APTrust Glacier preservation
    // bucket in Oregon, since this one is for the Glacier-only storage class.
    GlacierBucketOR string
    // GlacierDeepBucketVA is the name of the Glacier Deep Archive storage
    // bucket in Virginia.
    GlacierDeepBucketVA string
    // GlacierDeepBucketOH is the name of the Glacier Deep Archive storage
    // bucket in Ohio.
    GlacierDeepBucketOH string
    // GlacierDeepBucketOR is the name of the Glacier Deep Archive storage
    // bucket in Oregon.
    GlacierDeepBucketOR string
    // GlacierRegionVA is the name of the AWS region in which the Virginia
    // Glacier-only storage bucket is located.
    GlacierRegionVA string
    // GlacierRegionOH is the name of the AWS region in which the Ohio
    // Glacier-only storage bucket is located.
    GlacierRegionOH string
    // GlacierRegionOR is the name of the AWS region in which the Oregon
    // Glacier-only storage bucket is located.
    GlacierRegionOR string
    // Configuration options for apt_glacier_restore
    GlacierRestoreWorker WorkerConfig
    // LogDirectory is where we'll write our log files.
    LogDirectory string
    // LogLevel is defined in github.com/op/go-logging
    // and should be one of the following:
    // 1 - CRITICAL
    // 2 - ERROR
    // 3 - WARNING
    // 4 - NOTICE
    // 5 - INFO
    // 6 - DEBUG
    LogLevel logging.Level
    // If true, processes will log to STDERR in addition
    // to their standard log files. You really only want
    // to do this in development.
    LogToStderr bool
    // Maximum number of days allowed between scheduled
    // fixity checks. The fixity_reader periodically
    // queries Pharos for GenericFiles whose last
    // fixity check was greater than or equal to this
    // number of days ago. Those items are put into the
    // fixity_check queue.
    MaxDaysSinceFixityCheck int
    // MaxFileSize is the size in bytes of the largest
    // tar file we're willing to process. Set to zero
    // to process all files, regardless of size.
    // Set to some reasonably small size (100000 - 500000)
    // when you're running locally, or else you'll wind
    // up pulling down a huge amount of data from the
    // receiving buckets.
    MaxFileSize int64
    // NsqdHttpAddress tells us where to find the NSQ server
    // where we can read from and write to topics and channels.
    // It's typically something like "http://localhost:4151"
    NsqdHttpAddress string
    // NsqLookupd is the full HTTP(S) address of the NSQ Lookup
    // daemon, which is where our worker processes look first to
    // discover where they can find topics and channels. This is
    // typically something like "localhost:4161"
    NsqLookupd string
    // The version of the Pharos API we're using. This should
    // start with a v, like v1, v2.2, etc.
    PharosAPIVersion string
    // PharosURL is the URL of the Pharos server where
    // we will be recording results and metadata. This should
    // start with http:// or https://
    PharosURL string
    // The name of the preservation bucket to which we should
    // copy files for long-term storage.
    PreservationBucket string
    // ReceivingBuckets is a list of S3 receiving buckets to check
    // for incoming tar files.
    ReceivingBuckets []string
    // Configuration options for apt_record
    RecordWorker WorkerConfig
    // The bucket that stores a second copy of our preservation
    // files. This should be in a different region than the
    // preservation bucket. As of November 2014, the preservation
    // bucket is in Virginia, and the replication bucket is in
    // Oregon.
    ReplicationBucket string
    // The path to the local directory that will temporarily
    // hold files being copied from the preservation bucket
    // in US East to the replication bucket in USWest2.
    ReplicationDirectory string
    // RestoreDirectory is the directory in which we will
    // rebuild IntellectualObjects before sending them
    // off to the S3 restoration bucket.
    RestoreDirectory string
    // If true, we should restore bags to our partners' test
    // restoration buckets instead of the usual restoration
    // buckets. This should be true only in the demo config,
    // which is what we run on test.aptrust.org.
    RestoreToTestBuckets bool
    // Configuration options for apt_restore
    RestoreWorker WorkerConfig
    // SkipAlreadyProcessed indicates whether or not the
    // bucket_reader should put successfully-processed items into
    // NSQ for re-processing. This is almost always set to false.
    // The exception is when we deliberately want to reprocess
    // items to test code changes.
    SkipAlreadyProcessed bool
    // Configuration options for apt_store
    StoreWorker WorkerConfig
    // TarDirectory is the directory in which we will
    // untar files from S3. This should be on a volume
    // with lots of free disk space.
    TarDirectory string
    // UseVolumeService describes whether to use volume_service or
    // to try to reserve disk space before downloading and processing
    // bags. You'll want to use this service on systems with a fixed
    // amount of disk space, so that APTrust and DPN services don't
    // try to download bags that won't fit in the remaining disk space.
    // When this is on, and the volume_service is running, APTrust and
    // DPN services will simply requeue items that require more disk space
    // than is currently available. UseVolumeService should be false
    // (off) when using Amazon's EFS volumes because querying EFS volumes
    // for available space often returns an error, and that causes items
    // to be requeued when they should be processed, and EFS volumes
    // are virtually guaranteed to have more space than we need to process
    // bags.
    UseVolumeService bool
    // The port number, on localhost, where the HTTP
    // VolumeService should run. This is always on
    // 127.0.0.1, because it has to access the same
    // volumes and mounts as the locally running
    // services.
    VolumeServicePort int
}
func LoadConfigFile ¶
LoadConfigFile returns the configuration that the user requested, as specified in the -config flag when running a program from the command line.
func (*Config) AbsLogDirectory ¶
func (*Config) EnsureLogDirectory ¶
Ensures that the logging directory exists, creating it if necessary. Returns the absolute path of the logging directory.
TODO: Rename this, since it's ensuring more than just the existence of the log directory.
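A minimal usage sketch, assuming the package import path github.com/APTrust/exchange/models (the path is an assumption, not confirmed by this page):

package main

import (
    "log"

    "github.com/APTrust/exchange/models" // import path assumed
)

func main() {
    // A hand-built Config; normally this comes from LoadConfigFile.
    config := &models.Config{LogDirectory: "~/tmp/logs"}
    config.ExpandFilePaths() // expand ~ to an absolute path
    absPath, err := config.EnsureLogDirectory()
    if err != nil {
        log.Fatalf("cannot create log directory: %v", err)
    }
    log.Printf("logging to %s", absPath)
}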
func (*Config) EnsurePharosConfig ¶
func (*Config) ExpandFilePaths ¶
func (config *Config) ExpandFilePaths()
Expands ~ file paths and bag validation config file relative paths to absolute paths.
func (*Config) GetAWSAccessKeyId ¶
GetAWSAccessKeyId returns the AWS Access Key ID from the environment, or an empty string if the ENV var isn't set. In test context, this returns a dummy key id so we don't get an error in the Travis CI environment.
func (*Config) GetAWSSecretAccessKey ¶
GetAWSSecretAccessKey returns the AWS Secret Access Key from the environment, or an empty string if the ENV var isn't set. In test context, this returns a dummy key so we don't get an error in the Travis CI environment.
func (*Config) StorageRegionAndBucketFor ¶
func (*Config) TestsAreRunning ¶
TestsAreRunning returns true if we're running unit or integration tests; false otherwise.
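A sketch of how a worker might resolve a storage target and check credentials with the methods above (import path assumed; "Glacier-VA" is one of the storage option values listed in the GenericFile and IntellectualObject docs below):

package example

import (
    "fmt"

    "github.com/APTrust/exchange/models" // import path assumed
)

// describeTarget resolves the region and bucket for a storage option
// and verifies that AWS credentials are available in the environment.
func describeTarget(config *models.Config) error {
    region, bucket, err := config.StorageRegionAndBucketFor("Glacier-VA")
    if err != nil {
        return err
    }
    fmt.Printf("Files will go to bucket %s in region %s\n", bucket, region)
    if config.GetAWSAccessKeyId() == "" || config.GetAWSSecretAccessKey() == "" {
        return fmt.Errorf("AWS credentials are not set in the environment")
    }
    return nil
}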
type DPNConfig ¶
type DPNConfig struct {
    // Should we accept self-signed and otherwise invalid SSL
    // certificates? We need to do this in testing, but it
    // should not be allowed in production. Bools in Go default
    // to false, so if this is not set in config, we should be
    // safe.
    AcceptInvalidSSLCerts bool
    // Location of the config file for bag validation.
    // Config will differ for APTrust and DPN. This is
    // for the DPN config file.
    BagValidationConfigFile string
    // Default metadata that goes into bags produced at our node.
    DefaultMetadata DefaultMetadata
    // DPNAPIVersion is the current version of the DPN REST API.
    // This should be a string in the format api-v1, api-v2, etc.
    DPNAPIVersion string
    // The name of the AWS region that hosts DPN's Glacier files.
    DPNGlacierRegion string
    // DPNCopyWorker copies tarred bags from other nodes into our
    // DPN staging area, so we can replicate them. Currently,
    // copying is done by rsync over ssh.
    DPNCopyWorker WorkerConfig
    // DPNFixityWorker processes requests to run fixity checks on
    // bags that have been copied from Glacier through S3 into
    // local storage.
    DPNFixityWorker WorkerConfig
    // DPNGlacierRestoreWorker processes requests to move files
    // from Glacier storage to S3 storage.
    DPNGlacierRestoreWorker WorkerConfig
    // DPNIngestStoreWorker copies DPN bags ingested from APTrust
    // to AWS Glacier.
    DPNIngestStoreWorker WorkerConfig
    // DPNPackageWorker packages APTrust IntellectualObjects into
    // DPN bags, so they can be ingested into DPN.
    DPNPackageWorker WorkerConfig
    // The name of the long-term storage bucket for DPN.
    // This is, in effect, a Glacier bucket. (S3 with a
    // move-to-Glacier policy.)
    DPNPreservationBucket string
    // The name of the bucket into which we restore DPN items
    // for retrieval and fixity checking. This is an S3 bucket.
    DPNRestorationBucket string
    // DPNIngestRecordWorker records DPN ingest events in Pharos
    // and in the DPN REST server.
    DPNIngestRecordWorker WorkerConfig
    // DPNReplicationStoreWorker copies DPN bags replicated from
    // other nodes to AWS Glacier.
    DPNReplicationStoreWorker WorkerConfig
    // DPNRestoreWorker processes RestoreTransfer requests.
    DPNRestoreWorker WorkerConfig
    // DPNS3DownloadWorker processes requests to move files
    // from S3 to local storage. These files have previously
    // been moved from Glacier to S3 by the DPNGlacierRestoreWorker.
    // We do the download from S3 to local before checking fixity,
    // which in DPN requires us to parse and validate the entire
    // tarred bag, then calculate the sha256 checksum of the bag's
    // tag manifest.
    DPNS3DownloadWorker WorkerConfig
    // DPNValidationWorker validates DPN bags that we are replicating
    // from other nodes.
    DPNValidationWorker WorkerConfig
    // LocalNode is the namespace of the node this code is running on.
    // E.g. "aptrust", "chron", "hathi", "tdr", "sdr"
    LocalNode string
    // Where should DPN service logs go?
    LogDirectory string
    // Log level (4 = debug)
    LogLevel logging.Level
    // Should we log to Stderr in addition to writing to
    // the log file?
    LogToStderr bool
    // RemoteNodeHomeDirectory is the prefix to the home directory
    // for remote DPN nodes that connect to our node via rsync/ssh.
    // On demo and production, this should be "/home". The full home
    // directory for a user like tdr would be "/home/dpn.tdr".
    // On a local dev or test machine, this can be any path the user
    // has full read/write access to.
    RemoteNodeHomeDirectory string
    // Number of nodes we should replicate bags to.
    ReplicateToNumNodes int
    // Settings for connecting to our own REST service
    RestClient RestClientConfig
    // RemoteNodeAdminTokensForTesting are used in integration
    // tests only, when we want to perform admin-only operations,
    // such as creating bags and replication requests on a remote
    // node in the test cluster.
    RemoteNodeAdminTokensForTesting map[string]string
    // API Tokens for connecting to remote nodes
    RemoteNodeTokens map[string]string
    // URLs for remote nodes. Set these only if you want to
    // override the node URLs we get back from our local
    // DPN REST server.
    RemoteNodeURLs map[string]string
    // The local directory for DPN staging. We store DPN bags
    // here while they await transfer to the DPN preservation
    // bucket and while they await replication to other nodes.
    StagingDirectory string
    // The local directory for bag restoration. We download bags
    // into this directory for DPN fixity checking, which requires
    // full parsing and validation of the entire bag.
    DPNRestorationDirectory string
    // When copying bags from remote nodes, should we use rsync
    // over SSH (true) or just plain rsync (false)? For local
    // integration testing, this should be false.
    UseSSHWithRsync bool
}
type DPNWorkItem ¶
type DPNWorkItem struct {
    Id             int        `json:"id"`
    RemoteNode     string     `json:"remote_node"`
    Task           string     `json:"task"`
    Identifier     string     `json:"identifier"`
    QueuedAt       *time.Time `json:"queued_at"`
    CompletedAt    *time.Time `json:"completed_at"`
    ProcessingNode *string    `json:"processing_node"`
    Pid            int        `json:"pid"`
    Stage          string     `json:"stage"`
    Status         string     `json:"status"`
    Retry          bool       `json:"retry"`
    Note           *string    `json:"note"`
    State          *string    `json:"state"`
    CreatedAt      time.Time  `json:"created_at"`
    UpdatedAt      time.Time  `json:"updated_at"`
}
DPNWorkItem contains some basic information about a DPN-related task. Valid task values are enumerated in constants/constants.go.
func (*DPNWorkItem) ClearNodeAndPid ¶
func (item *DPNWorkItem) ClearNodeAndPid()
Clear ProcessingNode and Pid on this DPNWorkItem.
func (*DPNWorkItem) IsBeingProcessed ¶
func (item *DPNWorkItem) IsBeingProcessed() bool
IsBeingProcessed returns true if this item is currently being processed by any node.
func (*DPNWorkItem) IsBeingProcessedByMe ¶
func (item *DPNWorkItem) IsBeingProcessedByMe(hostname string, pid int) bool
IsBeingProcessedByMe returns true if this item is currently being processed by the specified hostname under the specified pid.
func (*DPNWorkItem) IsCompletedOrCancelled ¶
func (item *DPNWorkItem) IsCompletedOrCancelled() bool
Returns true if this item has been completed or cancelled. The worker processes check this to see if they should actually perform their work.
func (*DPNWorkItem) SerializeForPharos ¶
func (item *DPNWorkItem) SerializeForPharos() ([]byte, error)
SerializeForPharos serializes a version of DPNWorkItem that Pharos will accept as post/put input. The Pharos post/put serialization omits some fields that are not allowed by Rails strong params.
func (*DPNWorkItem) SetNodeAndPid ¶
func (item *DPNWorkItem) SetNodeAndPid()
Set ProcessingNode and Pid on this DPNWorkItem.
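A sketch of how a worker might use these methods together to decide whether to claim an item (import path assumed; os.Hostname and os.Getpid supply the values the methods compare against):

package example

import (
    "os"

    "github.com/APTrust/exchange/models" // import path assumed
)

// shouldProcess reports whether this worker should claim the item.
func shouldProcess(item *models.DPNWorkItem) bool {
    if item.IsCompletedOrCancelled() {
        return false // nothing left to do
    }
    hostname, _ := os.Hostname()
    if item.IsBeingProcessed() && !item.IsBeingProcessedByMe(hostname, os.Getpid()) {
        return false // another worker owns this item
    }
    item.SetNodeAndPid() // claim it for this host and pid
    return true
}

// When processing ends, release the claim with item.ClearNodeAndPid()
// and push the update to Pharos using SerializeForPharos.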
type DPNWorkItemForPharos ¶
type DPNWorkItemForPharos struct {
    RemoteNode     string     `json:"remote_node"`
    Task           string     `json:"task"`
    Identifier     string     `json:"identifier"`
    QueuedAt       *time.Time `json:"queued_at"`
    CompletedAt    *time.Time `json:"completed_at"`
    ProcessingNode *string    `json:"processing_node"`
    Pid            int        `json:"pid"`
    Note           *string    `json:"note"`
    State          *string    `json:"state"`
    Retry          bool       `json:"retry"`
    Stage          string     `json:"stage"`
    Status         string     `json:"status"`
}
func NewDPNWorkItemForPharos ¶
func NewDPNWorkItemForPharos(item *DPNWorkItem) *DPNWorkItemForPharos
type DefaultMetadata ¶
type DefaultMetadata struct {
    Comment                string
    BagItVersion           string
    BagItEncoding          string
    IngestNodeName         string
    IngestNodeAddress      string
    IngestNodeContactName  string
    IngestNodeContactEmail string
}
DefaultMetadata includes mostly static information about bags that APTrust packages for DPN.
type DeleteState ¶
type DeleteState struct {
    // NSQMessage is the NSQ message being processed in this delete
    // request. Not serialized because it will change each time we
    // try to process a request.
    NSQMessage *nsq.Message `json:"-"`
    // WorkItem is the Pharos WorkItem we're processing.
    // Not serialized because the Pharos WorkItem record will be
    // more up-to-date and authoritative.
    WorkItem *WorkItem `json:"-"`
    // GenericFile is the file to be deleted.
    GenericFile *GenericFile `json:"-"`
    // DeleteSummary contains information about the outcome of the
    // attempt to delete the file.
    DeleteSummary *WorkSummary
    // DeletedFromPrimaryAt is a timestamp describing when the file
    // was deleted from primary storage (S3).
    DeletedFromPrimaryAt time.Time
    // DeletedFromSecondaryAt is a timestamp describing when the file
    // was deleted from secondary storage (Glacier).
    DeletedFromSecondaryAt time.Time
}
DeleteState stores information about the state of a file deletion operation.
func NewDeleteState ¶
func NewDeleteState(message *nsq.Message) *DeleteState
NewDeleteState creates a new DeleteState object with an empty DeleteSummary.
type FileRestoreState ¶
type FileRestoreState struct {
    // NSQMessage is the NSQ message being processed in this restore
    // request. Not serialized because it will change each time we
    // try to process a request.
    NSQMessage *nsq.Message `json:"-"`
    // WorkItem is the Pharos WorkItem we're processing.
    // Not serialized because the Pharos WorkItem record will be
    // more up-to-date and authoritative.
    WorkItem *WorkItem `json:"-"`
    // GenericFile is the file we're going to restore. We don't
    // serialize this. We fetch it fresh from Pharos each time.
    GenericFile *GenericFile `json:"-"`
    // IntellectualObject is the object to which the file belongs.
    IntellectualObject *IntellectualObject `json:"-"`
    // RestoreSummary contains information about the restore operation,
    // such as when it started and completed, whether there were errors,
    // etc.
    RestoreSummary *WorkSummary
    // RestoredToURL is a URL that points to the copy of this bag
    // in the depositor's S3 restoration bucket.
    RestoredToURL string
    // CopiedToRestorationAt is a timestamp describing when the
    // reassembled bag was copied to the depositor's S3 restoration
    // bucket.
    CopiedToRestorationAt time.Time
}
FileRestoreState stores information about the state of a file restoration operation. This entire structure will be converted to JSON and saved as a WorkItemState object in Pharos.
func NewFileRestoreState ¶
func NewFileRestoreState(message *nsq.Message) *FileRestoreState
NewFileRestoreState creates a new FileRestoreState object with empty RestoreSummary.
type FixityResult ¶
type FixityResult struct {
    // NSQMessage is the NSQ message being processed in this fixity
    // request. Not serialized because it will change each time we
    // try to process a request.
    NSQMessage *nsq.Message `json:"-"`
    // GenericFile is the generic file whose fixity we're going to check.
    // This file is sitting somewhere on S3.
    GenericFile *GenericFile
    // S3FileExists describes whether the GenericFile exists in S3.
    S3FileExists bool
    // Sha256 contains the sha256 digest we calculated after downloading
    // the file. This will be empty initially.
    Sha256 string
    // Error records the error (if any) that occurred while trying to
    // check fixity.
    Error error
    // ErrorIsFatal indicates whether the error will prevent us from
    // ever checking fixity on this item.
    ErrorIsFatal bool
}
FixityResult describes the result of fetching a file from S3 and verifying the file's sha256 checksum.
func NewFixityResult ¶
func NewFixityResult(message *nsq.Message) *FixityResult
NewFixityResult returns a new empty FixityResult object for the specified GenericFile.
func (*FixityResult) BucketAndKey ¶
func (result *FixityResult) BucketAndKey() (string, string, error)
BucketAndKey returns the name of the S3 bucket and key for the GenericFile.
func (*FixityResult) PharosSha256 ¶
func (result *FixityResult) PharosSha256() string
PharosSha256 returns the SHA256 checksum that Pharos has on record.
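A sketch of comparing the freshly computed digest against the checksum Pharos has on record (import path assumed):

package example

import (
    "fmt"

    "github.com/APTrust/exchange/models" // import path assumed
)

// fixityMatches reports whether the digest we computed matches Pharos' record.
func fixityMatches(result *models.FixityResult) (bool, error) {
    if result.Error != nil {
        return false, result.Error
    }
    expected := result.PharosSha256()
    if expected == "" {
        return false, fmt.Errorf("Pharos has no sha256 checksum for %s",
            result.GenericFile.Identifier)
    }
    return result.Sha256 == expected, nil
}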
type GenericFile ¶
type GenericFile struct { // The Rails/Database id for this generic file. // If the Id is non-zero, it's been recorded in Pharos. Id int `json:"id,omitempty"` // The human-readable identifier for this file. It consists of // the object name, followed by a slash, followed by the path // of the file within the bag. E.g. "virginia.edu/bag001/data/file1.pdf" Identifier string `json:"identifier,omitempty"` // The id of the IntellectualObject to which this file belongs. IntellectualObjectId int `json:"intellectual_object_id,omitempty"` // The identifier of the intellectual object to which this file belongs. IntellectualObjectIdentifier string `json:"intellectual_object_identifier,omitempty"` // The file's mime type. E.g. "application/xml" FileFormat string `json:"file_format,omitempty"` // The location of this file in our primary s3 long-term storage bucket. URI string `json:"uri,omitempty"` // The size of the file, in bytes. Size int64 `json:"size,omitempty"` // The date this file was created by the depositor. This date comes from // the file record in the tarred bag. FileCreated time.Time `json:"file_created,omitempty"` // The date this file was last modified by the depository. This date comes // from the file record in the tarred bag. FileModified time.Time `json:"file_modified,omitempty"` // A timestamp indicating when this GenericFile record was created in // our repository. CreatedAt time.Time `json:"created_at,omitempty"` // UpdatedAt indicates when this GenericFile record was last updated in // our repository. UpdatedAt time.Time `json:"updated_at,omitempty"` // Checksums is a list of checksums for this file. Checksums []*Checksum `json:"checksums,omitempty"` // PremisEvents is a list of PREMIS events for this file. PremisEvents []*PremisEvent `json:"premis_events,omitempty"` // LastFixityCheck is the date and time we last verified // the fixity digest for this file. LastFixityCheck time.Time `json:"last_fixity_check,omitempty"` // State will be "A" for active files, "D" for deleted files. State string `json:"state,omitempty"` // Storage option: Standard, Glacier-OH, Glacier-OR, Glacier-VA, // "Glacier-Deep-OH", "Glacier-Deep-OR", "Glacier-Deep-VA". StorageOption string `json:"storage_option"` // IngestFileType can be one of the types defined in constants. // PAYLOAD_FILE, PAYLOAD_MANIFEST, TAG_MANIFEST, TAG_FILE IngestFileType string `json:"ingest_file_type,omitempty"` // IngestLocalPath is the absolute path to this file on local disk. // It may be empty if we're working with a tar file. IngestLocalPath string `json:"ingest_local_path,omitempty"` // IngestManifestMd5 is the md5 checksum of this file, as reported // in the bag's manifest-md5.txt file. This may be empty if there // was no md5 checksum file, or if this generic file wasn't listed // in the md5 manifest. IngestManifestMd5 string `json:"ingest_manifest_md5,omitempty"` // The md5 checksum we calculated at ingest from the actual file. IngestMd5 string `json:"ingest_md5,omitempty"` // DateTime we calculated the md5 digest from local file. IngestMd5GeneratedAt time.Time `json:"ingest_md5_generated_at,omitempty"` // DateTime we verified that our md5 checksum matches what's in the manifest. IngestMd5VerifiedAt time.Time `json:"ingest_md5_verified_at,omitempty"` // The sha256 checksum for this file, as reported in the payload manifest. // This may be empty if the bag had no sha256 manifest, or if this file // was not listed in the manifest. 
IngestManifestSha256 string `json:"ingest_manifest_sha256,omitempty"` // The sha256 checksum we calculated when we read the actual file. IngestSha256 string `json:"ingest_sha_256,omitempty"` // Timestamp of when we calculated the sha256 checksum. IngestSha256GeneratedAt time.Time `json:"ingest_sha_256_generated_at,omitempty"` // Timestamp of when we verified that the sha256 checksum we calculated // matches what's in the manifest. IngestSha256VerifiedAt time.Time `json:"ingest_sha_256_verified_at,omitempty"` // The UUID assigned to this file. This will be its S3 key when we store it. IngestUUID string `json:"ingest_uuid,omitempty"` // Timestamp of when we generated the UUID for this file. Needed to create // the identifier assignment PREMIS event. IngestUUIDGeneratedAt time.Time `json:"ingest_uuid_generated_at,omitempty"` // Where this file is stored in S3. IngestStorageURL string `json:"ingest_storage_url,omitempty"` // Timestamp indicating when this file was stored in S3. IngestStoredAt time.Time `json:"ingest_stored_at,omitempty"` // Where this file is stored in Glacier. IngestReplicationURL string `json:"ingest_replication_url,omitempty"` // Timestamp indicating when this file was stored in Glacier. IngestReplicatedAt time.Time `json:"ingest_replicated_at,omitempty"` // If true, a previous version of this same file exists in S3/Glacier. IngestPreviousVersionExists bool `json:"ingest_previous_version_exists,omitempty"` // If true, this file needs to be saved to S3. // We'll set this to false if a copy of the file already // exists in long-term storage with the same sha-256 digest. IngestNeedsSave bool `json:"ingest_needs_save,omitempty"` // Error that occurred during ingest. If empty, there was no error. IngestErrorMessage string `json:"ingesterror_message,omitempty"` // File User Id (unreliable) IngestFileUid int `json:"ingest_file_uid,omitempty"` // File Group Id (unreliable) IngestFileGid int `json:"ingest_file_gid,omitempty"` // File User Name (unreliable) IngestFileUname string `json:"ingest_file_uname,omitempty"` // File Group Name (unreliable) IngestFileGname string `json:"ingest_file_gname,omitempty"` // File Mode/Permissions (unreliable) IngestFileMode int64 `json:"ingest_file_mode,omitempty"` // FetchLocalPath is the path on the local file system where we // saved this file after retrieving it from S3 long-term storage. // We only set this when we fetch files to be restored or to be // packaged into a DPN bag. When we fetch files for fixity checking, // we stream them to /dev/null, because we're only interested in // computing a checksum. FetchLocalPath string `json:"fetch_local_path,omitempty"` // FetchMd5Value is the md5 digest we computed on the file we pulled // down from S3. This is supposed to match the file's known md5 fixity // value. FetchMd5Value string `json:"fetch_md5_value,omitempty"` // FetchSha256Value is the sha256 digest we computed on the file we // pulled down from S3. This should match the known sha256 digest. FetchSha256Value string `json:"fetch_sha256_value,omitempty"` // FetchErrorMessage describes any error that occurred during the // fetch process, including network errors, object not found, no disk // space, fixity mismatches, etc. FetchErrorMessage string `json:"fetch_error_message,omitempty"` }
GenericFile contains information about a file that makes up part (or all) of an IntellectualObject.
IntellectualObject is the object to which the file belongs.
Format is typically a mime-type, such as "application/xml", that describes the file format.
URI describes the location of the object (in APTrust?).
Size is the size of the object, in bytes.
FileCreated is the date and time at which the file was created by the depositor.
FileModified is the date and time at which the object was last modified (in APTrust, or at the institution that owns it?).
CreatedAt and UpdatedAt are Rails timestamps describing when this GenericFile record was created and last updated.
FileCreated and FileModified should be ISO8601 DateTime strings, such as: 1994-11-05T08:15:30-05:00 (Local Time) 1994-11-05T08:15:30Z (UTC)
func NewGenericFile ¶
func NewGenericFile() *GenericFile
func (*GenericFile) BuildIngestChecksums ¶
func (gf *GenericFile) BuildIngestChecksums() error
BuildIngestChecksums creates all of the ingest checksums for this GenericFile. See the notes for IntellectualObject.BuildIngestEvents, as they all apply here. This call is idempotent, so calling it multiple times will not mess up our data.
func (*GenericFile) BuildIngestEvents ¶
func (gf *GenericFile) BuildIngestEvents() error
BuildIngestEvents creates all of the ingest events for this GenericFile. See the notes for IntellectualObject.BuildIngestEvents, as they all apply here. This call is idempotent, so calling it multiple times will not mess up our data.
func (*GenericFile) FindEventByIdentifier ¶
func (gf *GenericFile) FindEventByIdentifier(identifier string) *PremisEvent
Returns the event with the matching identifier (UUID)
func (*GenericFile) FindEventsByType ¶
func (gf *GenericFile) FindEventsByType(eventType string) []*PremisEvent
Returns events of the specified type
func (*GenericFile) GetChecksumByAlgorithm ¶
func (gf *GenericFile) GetChecksumByAlgorithm(algorithm string) *Checksum
Returns the LAST checksum digest for the given algorithm for this file.
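For example, a sketch that looks up a file's most recent sha256 digest (import path assumed; the literal algorithm name "sha256" is an assumption about the values this package stores):

package example

import "github.com/APTrust/exchange/models" // import path assumed

// latestSha256 returns the most recent sha256 digest recorded for the file,
// or an empty string if none is present.
func latestSha256(gf *models.GenericFile) string {
    cs := gf.GetChecksumByAlgorithm("sha256") // algorithm name assumed
    if cs == nil {
        return ""
    }
    return cs.Digest
}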
func (*GenericFile) GetChecksumByDigest ¶
func (gf *GenericFile) GetChecksumByDigest(digest string) *Checksum
Returns the LAST checksum with the given digest for this file.
func (*GenericFile) InstitutionIdentifier ¶
func (gf *GenericFile) InstitutionIdentifier() (string, error)
Returns the name of the institution that owns this file.
func (*GenericFile) MergeAttributes ¶
func (gf *GenericFile) MergeAttributes(savedFile *GenericFile) []error
Merge attributes from a recently-saved GenericFile into this one. When we save a GenericFile in Pharos, it assigns the Id, CreatedAt, and UpdatedAt attributes. This function copies those attributes into the current object, and also calls MergeAttributes on this file's child objects (PremisEvents and Checksums), if savedFile has PremisEvents and Checksums. This also propagates the new Id attribute to the GenericFile's children. Generally, savedFile is a disposable data structure that we throw away after merging its attributes into this object.
func (*GenericFile) OriginalPath ¶
func (gf *GenericFile) OriginalPath() string
Returns the original path of the file within the original bag. This is just the identifier minus the institution id and bag name. For example, if the identifier is "uc.edu/cin.675812/data/object.properties", this returns "data/object.properties"
func (*GenericFile) OriginalPathWithBagName ¶
func (gf *GenericFile) OriginalPathWithBagName() (string, error)
Returns the original path of the file within the original bag, including the bag name. This is just the identifier minus the institution id. For example, if the identifier is "uc.edu/cin.675812/data/object.properties", this returns "cin.675812/data/object.properties"
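A small sketch illustrating both path helpers on the identifier from the examples above (import path assumed):

package main

import (
    "fmt"

    "github.com/APTrust/exchange/models" // import path assumed
)

func main() {
    gf := &models.GenericFile{
        Identifier:                   "uc.edu/cin.675812/data/object.properties",
        IntellectualObjectIdentifier: "uc.edu/cin.675812",
    }
    fmt.Println(gf.OriginalPath()) // data/object.properties
    if p, err := gf.OriginalPathWithBagName(); err == nil {
        fmt.Println(p) // cin.675812/data/object.properties
    }
}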
func (*GenericFile) PreservationStorageFileName ¶
func (gf *GenericFile) PreservationStorageFileName() (string, error)
Returns the name of this file in the preservation storage bucket (that should be a UUID), or an error if the GenericFile does not have a valid preservation storage URL.
func (*GenericFile) PropagateIdsToChildren ¶
func (gf *GenericFile) PropagateIdsToChildren()
Copy this GenericFile's Id and Identifier to the GenericFileId and GenericFileIdentifier properties of all child objects, including Checksums and Premis Events. This call exists because GenericFiles don't have Ids until after they're saved in Pharos.
func (*GenericFile) SerializeForPharos ¶
func (gf *GenericFile) SerializeForPharos() ([]byte, error)
Serializes a version of GenericFile that Pharos will accept as post/put input. Note that we don't serialize the id or any of our internal housekeeping info.
type GenericFileForPharos ¶
type GenericFileForPharos struct {
    Identifier           string `json:"identifier"`
    IntellectualObjectId int    `json:"intellectual_object_id"`
    FileFormat           string `json:"file_format"`
    URI                  string `json:"uri"`
    Size                 int64  `json:"size"`
    StorageOption        string `json:"storage_option"`
    // TODO: Next two items are not part of Pharos model, but they should be.
    // We need to add these to the Rails schema.
    // FileCreated  time.Time `json:"file_created"`
    // FileModified time.Time `json:"file_modified"`
    Checksums    []*ChecksumForPharos    `json:"checksums_attributes"`
    PremisEvents []*PremisEventForPharos `json:"premis_events_attributes"`
}
This struct is a special subset of GenericFile, with JSON serialization rules that conform to Rails 4 nested strong parameters naming conventions. When we create GenericFiles in batches, we need to send them in this format.
func NewGenericFileForPharos ¶
func NewGenericFileForPharos(gf *GenericFile) *GenericFileForPharos
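A sketch of the per-file conversion and JSON marshaling (import path assumed; the exact envelope Pharos expects around a batch is not shown on this page, so the bare slice below is illustrative only):

package example

import (
    "encoding/json"

    "github.com/APTrust/exchange/models" // import path assumed
)

// pharosPayload converts GenericFiles into the nested-attributes form
// described above and marshals them to JSON.
func pharosPayload(files []*models.GenericFile) ([]byte, error) {
    batch := make([]*models.GenericFileForPharos, len(files))
    for i, gf := range files {
        batch[i] = models.NewGenericFileForPharos(gf)
    }
    return json.Marshal(batch)
}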
type GlacierRequestReport ¶
type GlacierRequestReport struct {
    // FilesRequired is the number of files we need to request
    // from Glacier. When restoring a single file, this will be
    // set to one. When restoring a full IntellectualObject, this
    // will be set to the number of saved, active (non-deleted) files
    // that make up the object.
    FilesRequired int
    // FilesRequested is the number of file retrieval requests
    // we've made to Glacier. Glacier may have rejected some of
    // these requests. See RequestsNotAccepted.
    FilesRequested int
    // FilesNotRequested is a list of GenericFile identifiers that
    // we were supposed to request from Glacier but have not yet
    // requested.
    FilesNotRequested []string
    // RequestsNotAccepted is a list of GenericFile identifiers that
    // we requested from Glacier that were denied (or errored).
    // We should retry these.
    RequestsNotAccepted []string
    // FilesNotYetInS3 contains a list of files which are not yet
    // available in S3, either because we haven't requested their
    // restoration, the request wasn't accepted, or the request
    // hasn't completed.
    FilesNotYetInS3 []string
    // EarliestRequest is the timestamp on the earliest Glacier retrieval
    // request for this job.
    EarliestRequest time.Time
    // LatestRequest is the timestamp on the latest Glacier retrieval
    // request for this job.
    LatestRequest time.Time
    // EarliestExpiry is the approximate earliest date-time at which
    // a restored file will be deleted from S3. Once restored from
    // Glacier, files only stay in S3 for a few days.
    // See APTGlacierRestoreInit.DAYS_TO_KEEP_IN_S3
    EarliestExpiry time.Time
    // LatestExpiry is the approximate latest date-time at which
    // a restored file will be deleted from S3. Once restored from
    // Glacier, files only stay in S3 for a few days.
    // See APTGlacierRestoreInit.DAYS_TO_KEEP_IN_S3
    LatestExpiry time.Time
}
GlacierRequestReport provides information on whether all Glacier files have been requested, which ones still need to be requested, and how long the files should remain available in S3.
func NewGlacierRequestReport ¶
func NewGlacierRequestReport() *GlacierRequestReport
NewGlacierRequestReport creates a new GlacierRequestReport
func (*GlacierRequestReport) AllItemsInS3 ¶
func (report *GlacierRequestReport) AllItemsInS3() bool
AllItemsInS3 returns true if all items have been moved from Glacier into S3.
func (*GlacierRequestReport) AllRetrievalsInitiated ¶
func (report *GlacierRequestReport) AllRetrievalsInitiated() bool
AllRetrievalsInitiated returns true if we have initiated the retrieval process for all of the files we were supposed to retrieve.
type GlacierRestoreRequest ¶
type GlacierRestoreRequest struct {
    // GenericFileIdentifier is the identifier of the generic
    // file we want to restore.
    GenericFileIdentifier string
    // GlacierBucket is the bucket that contains the item
    // we want to restore.
    GlacierBucket string
    // GlacierKey is the key we want to restore
    // (usually a UUID, for APTrust).
    GlacierKey string
    // RequestAccepted indicates whether Glacier accepted
    // our request to restore this object. This does not mean
    // the request is complete. It can take several hours for
    // AWS to push the file from Glacier to S3. Check the
    // property IsAvailableInS3 to see if AWS has actually
    // completed the request.
    RequestAccepted bool
    // RequestedAt is the timestamp of the last request to
    // restore this object.
    RequestedAt time.Time
    // EstimatedDeletionFromS3 describes approximately when
    // this item should be available at the RestorationURL.
    // This time can vary, depending on what level of Glacier
    // retrieval service we're using. Using the standard service
    // level, this should be about four hours after RequestedAt,
    // if the requests succeeded.
    EstimatedDeletionFromS3 time.Time
    // SomeoneElseRequested will be true if apt_glacier_restore
    // thinks someone else requested retrieval of the object.
    // If this is true, EstimatedDeletionFromS3 may not be
    // reliable, because we don't know when the retrieval
    // request occurred, or with what parameters.
    //
    // We should be able to get rid of this, because headers
    // on either the S3 head request or the Glacier restore
    // request tell us when the S3 file expires.
    SomeoneElseRequested bool
    // IsAvailableInS3 describes whether the file has been
    // made available in S3 for download, a process which typically
    // takes 3-5 hours. If RequestAccepted is true and IsAvailableInS3
    // is false, then the request is still in process.
    IsAvailableInS3 bool
    // LastChecked is the date/time we last checked to see whether
    // this file had been retrieved from Glacier in to S3.
    LastChecked time.Time
}
GlacierRestoreRequest describes a request to restore a file from Glacier to S3.
type GlacierRestoreState ¶
type GlacierRestoreState struct {
    // NSQMessage is the NSQ message being processed in this restore
    // request. Not serialized because it will change each time we
    // try to process a request.
    NSQMessage *nsq.Message `json:"-"`
    // WorkItem is the Pharos WorkItem we're processing.
    // Not serialized because the Pharos WorkItem record will be
    // more up-to-date and authoritative.
    WorkItem *WorkItem `json:"-"`
    // WorkSummary contains information about whether/when
    // we requested this object(s) be restored from Glacier.
    WorkSummary *WorkSummary
    // GenericFile is the file to be restored. This will
    // be nil if we're restoring an entire object. This item
    // is not serialized to JSON.
    GenericFile *GenericFile `json:"-"`
    // IntellectualObject is the object to be restored. This will
    // be nil if we're only restoring a single file. This item
    // is not serialized to JSON.
    IntellectualObject *IntellectualObject `json:"-"`
    // DPNWorkItem is the DPNWorkItem being processed. This will
    // be nil, except for DPN fixity checks and DPN restore jobs.
    DPNWorkItem *DPNWorkItem `json:"-"`
    // Requests are the requests we've made (or need to make)
    // to Glacier to retrieve the objects we need to retrieve.
    Requests []*GlacierRestoreRequest
}
GlacierRestoreState holds information about the state of the Glacier restore process. This is serialized to JSON and stored in the Pharos WorkItemState table, so any worker picking up this task can know what's been done and what work remains. The worker apt_glacier_restore_init uses this object to keep track of its work.
Restoring a full APTrust bag from Glacier requires one Glacier retrieval initialization request and (later) one S3 GET request for each file in the bag. Large bags may contain tens of thousands of files, so workers may have to attempt retrieval initialization several times before all requests succeed.
func NewGlacierRestoreState ¶
func NewGlacierRestoreState(message *nsq.Message, workItem *WorkItem) *GlacierRestoreState
NewGlacierRestoreState creates a new GlacierRestoreState object.
func (*GlacierRestoreState) FindRequest ¶
func (state *GlacierRestoreState) FindRequest(gfIdentifier string) *GlacierRestoreRequest
FindRequest returns the GlacierRestoreRequest for the specified GenericFile identifier. If it returns nil, we have not yet submitted a retrieval request to Glacier for that file. Be sure to check the returned GlacierRestoreRequest to see whether RequestAccepted is true.
func (*GlacierRestoreState) GetFileIdentifiers ¶
func (state *GlacierRestoreState) GetFileIdentifiers() []string
func (*GlacierRestoreState) GetReport ¶
func (state *GlacierRestoreState) GetReport(gfIdentifiers []string) *GlacierRequestReport
GetReport returns a GlacierRequestReport describing what work remains to be done, and how long we can expect the items to remain in the S3 buckets. Param gfIdentifiers is a slice of GenericFile Identifiers.
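A sketch of how a worker might interpret the report to decide what to do next (import path assumed):

package example

import (
    "fmt"

    "github.com/APTrust/exchange/models" // import path assumed
)

// checkProgress prints a summary of where this Glacier restoration stands.
func checkProgress(state *models.GlacierRestoreState) {
    report := state.GetReport(state.GetFileIdentifiers())
    if !report.AllRetrievalsInitiated() {
        fmt.Printf("Still need to request %d file(s) from Glacier\n",
            len(report.FilesNotRequested)+len(report.RequestsNotAccepted))
        return
    }
    if report.AllItemsInS3() {
        fmt.Println("All files are in S3; restoration can proceed")
        return
    }
    fmt.Printf("%d file(s) not yet in S3; check again later\n",
        len(report.FilesNotYetInS3))
}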
func (*GlacierRestoreState) GetStorageOption ¶
func (state *GlacierRestoreState) GetStorageOption() (string, error)
type IngestManifest ¶
type IngestManifest struct {
    WorkItemId int
    // TODO: Get rid of bucket, key, and etag, since they're in WorkItem
    S3Bucket       string
    S3Key          string
    ETag           string
    BagPath        string
    DBPath         string
    FetchResult    *WorkSummary
    UntarResult    *WorkSummary
    ValidateResult *WorkSummary
    StoreResult    *WorkSummary
    RecordResult   *WorkSummary
    CleanupResult  *WorkSummary
    Object         *IntellectualObject
}
func NewIngestManifest ¶
func NewIngestManifest() *IngestManifest
func (*IngestManifest) AllErrorsAsString ¶
func (manifest *IngestManifest) AllErrorsAsString() string
func (*IngestManifest) BagHasBeenValidated ¶
func (manifest *IngestManifest) BagHasBeenValidated() bool
BagHasBeenValidated returns true if the bag has already been validated.
func (*IngestManifest) BagIsOnDisk ¶
func (manifest *IngestManifest) BagIsOnDisk() bool
BagIsOnDisk returns true if the bag (tar file) exists on disk.
func (*IngestManifest) ClearAllErrors ¶
func (manifest *IngestManifest) ClearAllErrors()
ClearAllErrors clears all of the errors on all of the WorkSummaries.
func (*IngestManifest) DBExists ¶
func (manifest *IngestManifest) DBExists() bool
DBExists returns true if the Bolt DB (.valdb file) exists on disk.
func (*IngestManifest) HasErrors ¶
func (manifest *IngestManifest) HasErrors() bool
func (*IngestManifest) HasFatalErrors ¶
func (manifest *IngestManifest) HasFatalErrors() bool
func (*IngestManifest) ObjectIdentifier ¶
func (manifest *IngestManifest) ObjectIdentifier() (string, error)
ObjectIdentifier returns the IntellectualObject.Identifier for the object being ingested. If this is a new ingest, the identifier will not yet exist in Pharos. If it's a re-ingest, the object will exist.
func (*IngestManifest) SizeOfBagOnDisk ¶
func (manifest *IngestManifest) SizeOfBagOnDisk() (int64, error)
SizeOfBagOnDisk returns the size, in bytes, of the bag on disk. This will return an error if the bag does not exist, or if it is a directory or is inaccessible.
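A sketch that uses these checks before attempting to process a bag (import path assumed):

package example

import (
    "fmt"

    "github.com/APTrust/exchange/models" // import path assumed
)

// ensureBagFits verifies that the bag is on disk and small enough to process.
func ensureBagFits(manifest *models.IngestManifest, availableBytes int64) error {
    if !manifest.BagIsOnDisk() {
        return fmt.Errorf("bag %s has not been downloaded yet", manifest.S3Key)
    }
    size, err := manifest.SizeOfBagOnDisk()
    if err != nil {
        return err
    }
    if size > availableBytes {
        return fmt.Errorf("bag needs %d bytes but only %d are available",
            size, availableBytes)
    }
    return nil
}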
type IngestState ¶
type IngestState struct {
    NSQMessage     *nsq.Message `json:"-"`
    WorkItem       *WorkItem
    WorkItemState  *WorkItemState
    IngestManifest *IngestManifest
}
IngestState stores information about the state of ingest operations for a single bag being ingested into APTrust. The ingest process involves a number of steps and worker processes. This state object is passed from one worker to the next, and accompanies the bag through every step of the process. If ingest fails, this object contains enough information to tell us why the ingest failed, where it failed, at which step it should be resumed, and whether there's anything (like partial files) that need to be cleaned up.
func (*IngestState) FinishNSQ ¶
func (ingestState *IngestState) FinishNSQ()
FinishNSQ tells NSQ we're done with this message.
func (*IngestState) RequeueNSQ ¶
func (ingestState *IngestState) RequeueNSQ(milliseconds int)
RequeueNSQ tells NSQ to give this item to another worker (or perhaps the same worker) after a delay of at least the specified number of milliseconds.
func (*IngestState) TouchNSQ ¶
func (ingestState *IngestState) TouchNSQ()
TouchNSQ tells NSQ we're still working on this item.
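A sketch of the touch/requeue/finish pattern a worker might follow with these methods (import path assumed; the one-minute delay is illustrative):

package example

import "github.com/APTrust/exchange/models" // import path assumed

// finishOrRequeue tells NSQ what to do with the message after one
// pass through a worker: retry transient failures, otherwise finish.
func finishOrRequeue(state *models.IngestState, transientErr error) {
    state.TouchNSQ() // let NSQ know we're still alive
    if transientErr != nil {
        state.RequeueNSQ(60000) // try again in at least one minute
        return
    }
    state.FinishNSQ() // we're done with this message
}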
type Institution ¶
type Institution struct {
    // Id is the Pharos id for this object.
    Id int `json:"id"`
    // Name is the institution's full name.
    Name string `json:"name"`
    // BriefName is a shortened name.
    // E.g. "uva" for University of Virginia.
    BriefName string `json:"brief_name"`
    // Identifier is the institution's domain name.
    Identifier string `json:"identifier"`
    // The institution's DPN member id. This will be
    // empty if the depositing institution is not a
    // DPN member.
    DPNUUID string `json:"dpn_uuid"`
}
type IntellectualObject ¶
type IntellectualObject struct {
    // Id is the primary key id of this bag in Pharos.
    // If Id is non-zero, this has been recorded in Pharos.
    Id int `json:"id,omitempty"`
    // Identifier is the unique bag identifier, which is a
    // string in the format "institution_identifier/bag_name".
    // Example: "virginia.edu/bag1234"
    Identifier string `json:"identifier,omitempty"`
    // BagName is the name of the bag, without the institution
    // identifier prefix. Example: "bag1234"
    BagName string `json:"bag_name,omitempty"`
    // BagGroupIdentifier (from the BagIt spec) describes a logical
    // collection or group to which a bag belongs. This will be empty
    // in many cases.
    BagGroupIdentifier string `json:"bag_group_identifier,omitempty"`
    // Institution is the institution identifier (the domain name)
    // of the institution that owns this bag.
    Institution string `json:"institution,omitempty"`
    // InstitutionId is the Id (in Pharos) of the institution
    // that owns this bag.
    InstitutionId int `json:"institution_id,omitempty"`
    // Title is the title of the IntellectualObject. For example,
    // "Architectural Plans for Alderman Library, 1933"
    Title string `json:"title,omitempty"`
    // Description is a description of the IntellectualObject.
    // This comes from the Internal-Sender-Description field of the
    // bag-info.txt file.
    Description string `json:"description,omitempty"`
    // Access describes who can see this intellectual object.
    // This is specified in the aptrust-info.txt file. See
    // https://sites.google.com/a/aptrust.org/member-wiki/basic-operations/bagging
    // for a description of access policies.
    Access string `json:"access,omitempty"`
    // AltIdentifier is an alternate identifier for this bag. It comes from
    // the Internal-Sender-Identifier field in the bag-info.txt file.
    AltIdentifier string `json:"alt_identifier,omitempty"`
    // DPNUUID is the DPN identifier for this bag, which is a UUID.
    // This field will be empty if the bag has not been pushed to DPN.
    DPNUUID string `json:"dpn_uuid,omitempty"`
    // ETag is the AWS S3 etag from the depositor's receiving bucket
    // for the bag that became this IntellectualObject.
    ETag string `json:"etag,omitempty"`
    // GenericFiles is a list of the files that make up this bag.
    GenericFiles []*GenericFile `json:"generic_files,omitempty"`
    // PremisEvents is a list of PREMIS events associated with this
    // IntellectualObject. That includes events such as ingest and
    // identifier assignment. Note that most PREMIS events are associated
    // with GenericFiles, so see GenericFile.PremisEvents as well.
    PremisEvents []*PremisEvent `json:"premis_events,omitempty"`
    // State of the object. A = active, D = deleted.
    State string `json:"state"`
    // Storage option: Standard, Glacier-OH, Glacier-OR, Glacier-VA,
    // Glacier-Deep-OH, Glacier-Deep-OR, Glacier-Deep-VA
    StorageOption string `json:"storage_option"`
    // CreatedAt is the Pharos timestamp describing when this
    // IntellectualObject was first recorded in our database.
    // This is usually within minutes of the ingest event, after
    // all files have been copied to long-term storage.
    CreatedAt time.Time `json:"created_at,omitempty"`
    // UpdatedAt describes when this object was last updated in Pharos.
    // If this timestamp differs from CreatedAt, it usually means the
    // bag (or some part of it) was ingested a second time.
    UpdatedAt time.Time `json:"updated_at,omitempty"`
    // FileCount is the number of files this object includes. We do not
    // set this ever, and it will always be zero during the ingest process.
    // This is a calculated field returned by Pharos. Consider it read-only.
    // It will only ever be populated by Pharos, when retrieving records
    // through the Admin REST API. Added for Restoration spot tests.
    FileCount int `json:"file_count,omitempty"`
    // FileSize is the total size (in bytes) of the files that make up this
    // object. We do not set this ever, and it will always be zero during
    // the ingest process. This is a calculated field returned by Pharos.
    // Consider it read-only. It will only ever be populated by Pharos,
    // when retrieving records through the Admin REST API. Added for
    // Restoration spot tests.
    FileSize int64 `json:"file_size,omitempty"`
    // IngestS3Bucket is the bucket to which the depositor uploaded
    // this bag. We fetch it from there to a local staging area for
    // processing.
    IngestS3Bucket string `json:"ingest_s3_bucket,omitempty"`
    // IngestS3Key is the file name in the S3 receiving bucket. It
    // should be the bag name plus a ".tar" extension.
    IngestS3Key string `json:"ingest_s3_key,omitempty"`
    // IngestTarFilePath is the absolute path to the tarred bag in
    // our local staging area. We download the bag (as a tar file) from
    // the receiving bucket to this local file.
    IngestTarFilePath string `json:"ingest_tar_file_path,omitempty"`
    // IngestUntarredPath is the path to the untarred bag in our local
    // staging area. This may be an empty string if we did not untar the bag.
    // As of APTrust 2.0, we generally validate bags and send their files
    // to long-term storage without ever untarring them.
    IngestUntarredPath string `json:"ingest_untarred_path,omitempty"`
    // IngestSize is the size of the tarred bag we're trying to ingest.
    IngestSize int64 `json:"ingest_size,omitempty"`
    // IngestRemoteMd5 is the etag of this bag as reported by the
    // depositor's S3 receiving bucket. We use this to verify the download,
    // if possible. For smaller bags (< 5GB), the etag is an md5 checksum.
    // Large bags that the depositor sent to S3 via multipart
    // upload have an etag that is calculated differently from a normal
    // md5 checksum and includes a dash, followed by the number of parts
    // in the original multipart upload. We cannot use those multipart
    // etags for md5 validation.
    IngestRemoteMd5 string `json:"ingest_remote_md5,omitempty"`
    // IngestLocalMd5 is the md5 digest of the tarred bag that we calculated
    // locally upon downloading the file.
    IngestLocalMd5 string `json:"ingest_local_md5,omitempty"`
    // IngestMd5Verified indicates whether or not we were able to verify
    // the md5 digest of the entire bag upon download to our staging area.
    IngestMd5Verified bool `json:"ingest_md5_verified,omitempty"`
    // IngestMd5Verifiable indicates whether we can verify our local md5
    // digest against the S3 etag for this tarred bag. We cannot verify
    // the checksum of large bags. See the comments on IngestRemoteMd5
    // above.
    IngestMd5Verifiable bool `json:"ingest_md5_verifiable,omitempty"`
    // IngestManifests is a list of manifest files found inside this bag
    // when we downloaded it.
    IngestManifests []string `json:"ingest_manifests,omitempty"`
    // IngestTagManifests is a list of tag manifests found inside this
    // bag when we downloaded it.
    IngestTagManifests []string `json:"ingest_tag_manifests,omitempty"`
    // IngestFilesIgnored is a list of files found in the bag that are
    // neither manifests, tag files, nor data files. This includes files
    // beginning with a dot (.) or dash (-). We do not save these files
    // to long-term storage.
    IngestFilesIgnored []string `json:"ingest_files_ignored,omitempty"`
    // IngestTags is a list of tags found in all of the tag files that
    // we parsed when ingesting this bag. We parse only those tag files
    // listed with the ParseAsTagFile option in
    // config/aptrust_bag_validation_config.json. While Pharos itself
    // only keeps and exposes the Title, Description, and Access tags,
    // the JSON ingest data preserved in the WorkItemState record for
    // each ingest includes a record of all tags parsed on ingest for
    // items ingested in APTrust 2.0.
    IngestTags []*Tag `json:"ingest_tags,omitempty"`
    // IngestMissingFiles is a list of files that appear in the bag's
    // manifest(s) but were not found inside the tarred bag. This list
    // should be empty for valid bags. This field is for reporting
    // bag validation errors.
    IngestMissingFiles []*MissingFile `json:"ingest_missing_files,omitempty"`
    // IngestTopLevelDirNames is a list of directory names found at
    // the top of the directory hierarchy inside the tarred bag. The
    // APTrust spec says there should be only one directory at the top
    // level of the tar file contents, and that directory should have
    // the same name as the bag, minus the ".tar" extension. This field
    // is used for reporting bag validation errors.
    IngestTopLevelDirNames []string `json:"ingest_top_level_dir_names,omitempty"`
    // IngestErrorMessage contains information about why ingest failed
    // for this bag. On successful ingest, this field will be empty.
    IngestErrorMessage string `json:"ingest_error_message,omitempty"`
    // IngestDeletedFromReceivingAt is a timestamp describing when the
    // original tar file was deleted from the receiving bucket. After
    // successful ingest, the workers/apt_recorder process should
    // delete the tar file. If this timestamp is empty, it means the
    // cleanup didn't happen, and we may be accumulating unneeded bags
    // and incurring unnecessary costs in the receiving buckets.
    IngestDeletedFromReceivingAt time.Time `json:"ingest_deleted_from_receiving_at,omitempty"`
    // contains filtered or unexported fields
}
IntellectualObject represents a single object (ingested bag) in APTrust. An object can include any number of files and events.
Properties are described below, but note that all of the "Ingest-" fields are populated and used by the Exchange ingest code only during ingest. These fields are not stored in Pharos and will not be populated on any IntellectualObject retrieved from Pharos.
Ingest services do save a JSON representation of IntellectualObjects, including all of the "Ingest-" fields in the WorkItemState record associated with the ingest WorkItem. That JSON record can be useful for forensics, debugging, troubleshooting, and data reconstruction.
func NewIntellectualObject ¶
func NewIntellectualObject() *IntellectualObject
func (*IntellectualObject) AccessValid ¶
func (obj *IntellectualObject) AccessValid() bool
AccessValid returns true or false to indicate whether the structure's Access property contains a valid value.
func (*IntellectualObject) AllFilesSaved ¶
func (obj *IntellectualObject) AllFilesSaved() bool
Returns true if all the GenericFiles that needed to be saved were successfully saved to both primary and secondary storage. Note that GenericFiles marked as IngestNeedsSave = false do not need to be saved.
func (*IntellectualObject) BuildIngestChecksums ¶
func (obj *IntellectualObject) BuildIngestChecksums() error
BuildIngestChecksums creates all of the ingest checksums for this object's GenericFiles. See the notes for BuildIngestEvents above, as they all apply here. This call is idempotent, so calling it multiple times will not mess up our data.
func (*IntellectualObject) BuildIngestEvents ¶
func (obj *IntellectualObject) BuildIngestEvents(numberOfFiles int) error
BuildIngestEvents creates all of the PremisEvents required for ingest for this IntellectualObject and all of its GenericFiles. This call works only when the Ingest data fields on the IntellectualObject are populated, which means it will not work on the barebones IntellectualObject we get back from Pharos. It will work on the IntellectualObject we build during the ingest process, the one that apt_fetch builds and passes along to apt_store and apt_record. That fully-fleshed object is preserved in JSON format in WorkItemState.State.
Param numberOfFiles is the number of GenericFiles received in the ingested tar file. Not all of these will be saved to long-term storage (e.g. bagit.txt and any files that have not changed since the last ingest).
We want to build all of the ingest PremisEvents before saving them to avoid a problem that showed up in the old system. In that system, we created PremisEvents when we were ready to save them. Ingest often took 2 or 3 attempts in the old system due to problems with Fluctus/Fedora. That resulted in 2 or 3 ingest events for each object and file. Generating the events beforehand, with uuids, allows us to check with Pharos first to see if the event with the specific uuid is already in the system. We can add it if it's not, and we won't duplicate it if it is. This takes care of PT #113562325.
This call is idempotent, so calling it multiple times will not mess up our data.
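A minimal sketch of how an ingest worker might invoke these builders on a fully populated object. The function wrapper and error handling are illustrative only, and the package is assumed to be imported as models:

    func buildIngestMetadata(obj *models.IntellectualObject) error {
        // Both calls are idempotent, so a retried ingest will not
        // create duplicate events or checksums.
        if err := obj.BuildIngestEvents(len(obj.GenericFiles)); err != nil {
            return err
        }
        return obj.BuildIngestChecksums()
    }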
func (*IntellectualObject) FindEventsByType ¶
func (obj *IntellectualObject) FindEventsByType(eventType string) []PremisEvent
Returns events of the specified type
func (*IntellectualObject) FindGenericFile ¶
func (obj *IntellectualObject) FindGenericFile(filePath string) *GenericFile
Returns the GenericFile record for the specified path, or nil. Param filePath should be the relative path of the file within the bag. E.g. "data/images/myphoto.jpg"
func (*IntellectualObject) FindTag ¶
func (obj *IntellectualObject) FindTag(tagName string) []*Tag
Returns the tag with the specified name, or nil. The bag spec at https://tools.ietf.org/html/draft-kunze-bagit-13#section-2.2.2 says tags may be repeated, and their order must be preserved, so this returns a slice of tags if the tag is found. In most cases, the slice will contain one element.
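A short usage sketch. The Value field on Tag is an assumption here, since the Tag struct's fields are not listed in this documentation:

    // Tags may repeat, so FindTag returns a slice.
    tags := obj.FindTag("Internal-Sender-Identifier")
    if len(tags) > 0 {
        // Assumes the Tag struct exposes a Value field; see the Tag type below.
        fmt.Println("Alt identifier:", tags[0].Value)
    }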
func (*IntellectualObject) PayloadBytesAndFiles ¶
func (obj *IntellectualObject) PayloadBytesAndFiles() (byteCount int64, fileCount int)
PayloadBytesAndFiles returns the total number of bytes and files in the object's payload directory. Use this method to calculate the Payload-Oxum value of a BagIt bag.
Return values byteCount and fileCount correspond to the BagIt spec's _OctetCount_ and _StreamCount_. See the Payload-Oxum tag documentation under https://tools.ietf.org/html/draft-kunze-bagit-17#section-2.2.2
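For example, a caller could build the Payload-Oxum tag value from the two return values. This is a sketch; the formatting below is ours, not a method of this package:

    byteCount, fileCount := obj.PayloadBytesAndFiles()
    // Payload-Oxum is OctetCount.StreamCount, e.g. "279164409.1198"
    fmt.Printf("Payload-Oxum: %d.%d\n", byteCount, fileCount)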
func (*IntellectualObject) PropagateIdsToChildren ¶
func (obj *IntellectualObject) PropagateIdsToChildren()
Copies this IntellectualObject's Id and Identifier to the IntellectualObjectId and IntellectualObjectIdentifier properties of all child objects. Also propagates GenericFile Ids and Identifiers to sub-objects, if they are available. This call exists because objects don't have Ids until after they're saved in Pharos.
func (*IntellectualObject) SerializeForPharos ¶
func (obj *IntellectualObject) SerializeForPharos() ([]byte, error)
Serialize the subset of IntellectualObject data that Pharos will accept. This is for post/put, where essential info, such as institution id and/or object id will be in the URL. This sets obj.Access to all lower case when it serializes, because Pharos requires access values to be normalized that way.
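A minimal sketch of how a caller might use this before a POST or PUT to Pharos. Error handling is illustrative, and the actual Pharos client call is not shown:

    data, err := obj.SerializeForPharos()
    if err != nil {
        log.Fatal(err) // error handling is illustrative
    }
    // data is the JSON payload for the Pharos object endpoint; Access has
    // been normalized to lower case. Institution id and/or object id belong
    // in the request URL, not in this payload.
    fmt.Println(string(data))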
func (*IntellectualObject) TotalFileSize ¶
func (obj *IntellectualObject) TotalFileSize() int64
Returns the total number of bytes of all of the generic files in this object, including tag files. The object's bag size will be slightly larger than this, because it will include manifests, and a tar header.
If you're looking for the number of bytes in the payload only, see PayloadBytesAndFiles.
type IntellectualObjectForPharos ¶
type IntellectualObjectForPharos struct {
    Identifier         string `json:"identifier"`
    BagName            string `json:"bag_name"`
    BagGroupIdentifier string `json:"bag_group_identifier"`
    InstitutionId      int    `json:"institution_id"`
    Title              string `json:"title"`
    Description        string `json:"description"`
    AltIdentifier      string `json:"alt_identifier"`
    Access             string `json:"access"`
    DPNUUID            string `json:"dpn_uuid"`
    ETag               string `json:"etag"`
    State              string `json:"state"`
    StorageOption      string `json:"storage_option"`
}
IntellectualObject in the format that Pharos accepts for POST/create.
func NewIntellectualObjectForPharos ¶
func NewIntellectualObjectForPharos(obj *IntellectualObject) *IntellectualObjectForPharos
type MissingFile ¶
MissingFile defines a file that is not in the bag, despite the fact that its checksum was found in a manifest. We keep track of these during bag validation, so we can report which files were not found.
func NewMissingFile ¶
func NewMissingFile(manifest string, lineNumber int, filePath, digest string) *MissingFile
NewMissingFile creates a new missing file record.
type PharosDPNBag ¶
type PharosDPNBag struct {
    Id               int       `json:"id"`
    InstitutionId    int       `json:"institution_id"`
    ObjectIdentifier string    `json:"object_identifier"`
    DPNIdentifier    string    `json:"dpn_identifier"`
    DPNSize          uint64    `json:"dpn_size"`
    Node1            string    `json:"node_1"`
    Node2            string    `json:"node_2"`
    Node3            string    `json:"node_3"`
    DPNCreatedAt     time.Time `json:"dpn_created_at"`
    DPNUpdatedAt     time.Time `json:"dpn_updated_at"`
    CreatedAt        time.Time `json:"created_at"`
    UpdatedAt        time.Time `json:"updated_at"`
}
PharosDPNBag represents a lightweight DPN bag record stored in Pharos.
func (*PharosDPNBag) SerializeForPharos ¶
func (bag *PharosDPNBag) SerializeForPharos() ([]byte, error)
SerializeForPharos serializes a version of this record that Pharos will accept as post/put input. The Pharos post/put serialization requires the data to be wrapped in a hash with key 'dpn_bag'.
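A sketch of the resulting payload shape; the field values shown in the comment are illustrative only:

    data, err := bag.SerializeForPharos()
    if err != nil {
        log.Fatal(err) // error handling is illustrative
    }
    // The payload is wrapped under the "dpn_bag" key, roughly:
    // {"dpn_bag":{"id":1,"institution_id":2,"object_identifier":"virginia.edu/bag1234",...}}
    fmt.Println(string(data))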
type PremisEvent ¶
type PremisEvent struct {
    // The Pharos id for this event. Will be zero if the event
    // is not yet in Pharos. If non-zero, it's been recorded
    // in Pharos. Do not serialize zero values to JSON, or
    // Pharos complains.
    Id int `json:"id,omitempty"`
    // Identifier is a UUID string assigned by Pharos.
    Identifier string `json:"identifier"`
    // EventType is the type of Premis event we want to register.
    // See constants.EventTypes.
    EventType string `json:"event_type"`
    // DateTime is when this event occurred in our system.
    DateTime time.Time `json:"date_time"`
    // Detail is a brief description of the event.
    Detail string `json:"detail"`
    // Outcome is either success or failure.
    Outcome string `json:"outcome"`
    // OutcomeDetail is the checksum for checksum generation,
    // the id for id generation.
    OutcomeDetail string `json:"outcome_detail"`
    // Object is a description of the object that generated
    // the checksum or id.
    Object string `json:"object"`
    // Agent is a URL describing where to find more info about Object.
    Agent string `json:"agent"`
    // OutcomeInformation contains the text of an error message, if
    // Outcome was failure.
    OutcomeInformation string `json:"outcome_information"`
    // Id of the IntellectualObject with which this event is
    // associated.
    IntellectualObjectId int `json:"intellectual_object_id"`
    // Identifier of the IntellectualObject with which this
    // event is associated.
    IntellectualObjectIdentifier string `json:"intellectual_object_identifier"`
    // Id of the GenericFile with which this event is
    // associated. This will be zero for object-level events.
    GenericFileId int `json:"generic_file_id"`
    // Identifier of the GenericFile with which this
    // event is associated. This will be an empty string
    // for object-level events.
    GenericFileIdentifier string `json:"generic_file_identifier"`
    // Rails timestamp for when this object was created.
    CreatedAt time.Time `json:"created_at,omitempty"`
    // Rails timestamp for when this object was last updated.
    UpdatedAt time.Time `json:"updated_at,omitempty"`
}
PremisEvent contains information about events that occur during the processing of a file or intellectual object, such as the verification of checksums, generation of unique identifiers, etc. We use this struct to exchange data in JSON format with the Pharos API.
func NewEventFileDeletion ¶
func NewEventFileDeletion(fileUUID, requestedBy, instApprover, aptrustApprover string, timestamp time.Time) *PremisEvent
NewEventFileDeletion creates a new file deletion event.
func NewEventGenericFileDigestCalculation ¶
func NewEventGenericFileDigestCalculation(checksumGeneratedAt time.Time, fixityAlg, digest string) (*PremisEvent, error)
We generated a sha256 checksum.
func NewEventGenericFileFixityCheck ¶
func NewEventGenericFileFixityCheck(checksumVerifiedAt time.Time, fixityAlg, digest string, fixityMatched bool) (*PremisEvent, error)
We checked fixity against the manifest. If fixity didn't match, we wouldn't be ingesting this.
func NewEventGenericFileIdentifierAssignment ¶
func NewEventGenericFileIdentifierAssignment(identifierGeneratedAt time.Time, identifierType, identifier string) (*PremisEvent, error)
We assigned an identifier: either a generic file identifier or a new storage URL. Note that when identifierType is constants.IdTypeStorageURL, identifierGeneratedAt is the timestamp at which the file was stored in S3.
func NewEventGenericFileIngest ¶
func NewEventGenericFileIngest(storedAt time.Time, md5Digest, _uuid string) (*PremisEvent, error)
We ingested a generic file into primary long-term storage.
func NewEventGenericFileReplication ¶
func NewEventGenericFileReplication(replicatedAt time.Time, replicationUrl string) (*PremisEvent, error)
We saved the file to replication storage.
func NewEventObjectCreation ¶
func NewEventObjectCreation() *PremisEvent
func NewEventObjectIdentifierAssignment ¶
func NewEventObjectIdentifierAssignment(objectIdentifier string) (*PremisEvent, error)
func NewEventObjectIngest ¶
func NewEventObjectIngest(numberOfFilesIngested int) (*PremisEvent, error)
func NewEventObjectRights ¶
func NewEventObjectRights(accessSetting string) (*PremisEvent, error)
func (*PremisEvent) EventTypeValid ¶
func (premisEvent *PremisEvent) EventTypeValid() bool
EventTypeValid returns true/false, indicating whether the structure's EventType property contains the name of a valid premis event.
func (*PremisEvent) IsUrlAssignment ¶
func (event *PremisEvent) IsUrlAssignment() bool
Returns true if this event is a URL assignment event.
func (*PremisEvent) MergeAttributes ¶
func (event *PremisEvent) MergeAttributes(savedEvent *PremisEvent) error
Sets the Id, CreatedAt and UpdatedAt properties of this event to match those of savedEvent. We call this after saving a record to Pharos, which sets all of those properties. Generally, savedEvent is a temporary event record returned from Pharos, while this event is one we want to keep.
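A minimal sketch, assuming savedEvent is the temporary record a Pharos client call returned after saving this event. Error handling is illustrative:

    if err := event.MergeAttributes(savedEvent); err != nil {
        log.Fatal(err) // error handling is illustrative
    }
    // event now carries the Id, CreatedAt and UpdatedAt that Pharos
    // assigned; savedEvent can be discarded.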
type PremisEventForPharos ¶
type PremisEventForPharos struct {
    Id                           int       `json:"id,omitempty"`
    Identifier                   string    `json:"identifier"`
    EventType                    string    `json:"event_type"`
    DateTime                     time.Time `json:"date_time"`
    Detail                       string    `json:"detail"`
    Outcome                      string    `json:"outcome"`
    OutcomeDetail                string    `json:"outcome_detail"`
    Object                       string    `json:"object"`
    Agent                        string    `json:"agent"`
    OutcomeInformation           string    `json:"outcome_information"`
    IntellectualObjectId         int       `json:"intellectual_object_id"`
    IntellectualObjectIdentifier string    `json:"intellectual_object_identifier"`
    GenericFileId                int       `json:"generic_file_id"`
    GenericFileIdentifier        string    `json:"generic_file_identifier"`
}
Same as PremisEvent, but omits CreatedAt and UpdatedAt
func NewPremisEventForPharos ¶
func NewPremisEventForPharos(event *PremisEvent) *PremisEventForPharos
type RestClientConfig ¶
type RestClientConfig struct {
    Comment         string
    LocalServiceURL string
    LocalAPIRoot    string
    LocalAuthToken  string
}
Config options for our DPN REST client.
type RestoreState ¶
type RestoreState struct {
    // NSQMessage is the NSQ message being processed in this restore
    // request. Not serialized because it will change each time we
    // try to process a request.
    NSQMessage *nsq.Message `json:"-"`
    // WorkItem is the Pharos WorkItem we're processing.
    // Not serialized because the Pharos WorkItem record will be
    // more up-to-date and authoritative.
    WorkItem *WorkItem `json:"-"`
    // IntellectualObject is the object we're restoring. Not serialized
    // because if the object has thousands of files, the serialization is
    // huge.
    IntellectualObject *IntellectualObject `json:"-"`
    // PackageSummary contains information about the outcome of the
    // attempt to reassemble this bag for restoration.
    PackageSummary *WorkSummary
    // ValidateSummary contains information about the outcome
    // of validating this newly reassembled bag. We must validate
    // it before sending it to the restoration bucket.
    ValidateSummary *WorkSummary
    // CopySummary contains information about the outcome of the
    // attempt to copy the tarred bag to the depositor's restoration
    // bucket.
    CopySummary *WorkSummary
    // RecordSummary contains information about the outcome of
    // attempts to record the restoration event and the completion
    // of the WorkItem in Pharos.
    RecordSummary *WorkSummary
    // LocalBagDir is the absolute path to the untarred bag. We'll be
    // assembling the bag contents in this directory.
    LocalBagDir string
    // LocalTarFile is the absolute path to the tarred version of this
    // bag. The local tar file will not exist until the bag has been
    // fully assembled and tarred.
    LocalTarFile string
    // RestoredToUrl is a URL that points to the copy of this bag
    // in the depositor's S3 restoration bucket.
    RestoredToUrl string
    // CopiedToRestorationAt is a timestamp describing when the
    // reassembled bag was copied to the depositor's S3 restoration
    // bucket.
    CopiedToRestorationAt time.Time
    // BagDirDeletedAt is a timestamp saying when the bag directory
    // was deleted. Check this to ensure apt_restorer cleaned up
    // after itself. If LocalBagDir is an empty string, there was
    // nothing to delete.
    BagDirDeletedAt time.Time
    // TarFileDeletedAt is a timestamp saying when the local tar file
    // was deleted. Check this to ensure apt_restorer cleaned up
    // after itself. If LocalTarFile is an empty string, there was
    // nothing to delete.
    TarFileDeletedAt time.Time
    // If this restoration was cancelled, the reason goes here.
    CancelReason string
}
RestoreState stores information about the state of a bag restoration operation. This entire structure will be converted to JSON and saved as a WorkItemState object in Pharos.
func NewRestoreState ¶
func NewRestoreState(message *nsq.Message) *RestoreState
NewRestoreState creates a new RestoreState object with empty PackageSummary, ValidateSummary, CopySummary, and RecordSummary objects.
func (*RestoreState) AllErrorsAsString ¶
func (restoreState *RestoreState) AllErrorsAsString() string
AllErrorsAsString returns all error messages from all work summaries as a single string.
func (*RestoreState) FinishNSQ ¶
func (restoreState *RestoreState) FinishNSQ()
FinishNSQ tells NSQ we're done with this message.
func (*RestoreState) HasErrors ¶
func (restoreState *RestoreState) HasErrors() bool
HasErrors returns true if any of the work summaries have errors.
func (*RestoreState) HasFatalErrors ¶
func (restoreState *RestoreState) HasFatalErrors() bool
HasFatalErrors returns true if any of the work summaries have a fatal error.
func (*RestoreState) MostRecentSummary ¶
func (restoreState *RestoreState) MostRecentSummary() *WorkSummary
MostRecentSummary returns the WorkSummary for the most recent operation.
func (*RestoreState) RequeueNSQ ¶
func (restoreState *RestoreState) RequeueNSQ(milliseconds int)
RequeueNSQ tells NSQ to give this item to another worker (or perhaps the same worker) after a delay of at least the specified number of milliseconds.
func (*RestoreState) TouchNSQ ¶
func (restoreState *RestoreState) TouchNSQ()
TouchNSQ tells NSQ we're still working on this item.
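A sketch of how a restore worker might use these NSQ helpers at the end of a processing attempt. The requeue delay and the branching logic are illustrative only:

    restoreState.TouchNSQ() // still alive; reset NSQ's message timer
    if restoreState.HasFatalErrors() {
        restoreState.FinishNSQ() // fatal errors are not worth retrying
    } else if restoreState.HasErrors() {
        restoreState.RequeueNSQ(60000) // transient errors: retry in at least a minute
    } else {
        restoreState.FinishNSQ() // success
    }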
type RingList ¶
type RingList struct {
// contains filtered or unexported fields
}
RingList is a circular list of strings with a set capacity. This structure uses mutexes for adding and searching, so it should be safe to share across goroutines.
func NewRingList ¶
NewRingList creates a new RingList with the specified capacity.
type S3File ¶
type S3File struct {
    // The name of the S3 bucket that holds this key.
    BucketName string
    // The S3 Key, with object name, size, etag, last modified, etc.
    //
    // TODO: On delete jobs, you'll need to fetch the key object from S3.
    Key s3.Key
    // If we attempted to delete this file from S3 and got an error
    // message, that message will be stored here. This field is only
    // relevant in the context of the file delete worker.
    ErrorMessage string
    // The date and time at which the key/file was successfully deleted
    // from S3. If this is zero time, file was not deleted. If it's
    // any other time, delete succeeded. This field is only relevant
    // in the context of the delete worker.
    DeletedAt time.Time
    // Flag to indicate whether we skipped the delete
    // operation because config.DeleteOnSuccess == false.
    // This field is only relevant in the context of the delete worker.
    DeleteSkippedPerConfig bool
}
S3File contains information about the S3 file we're trying to process from an intake bucket. BucketName and Key are the S3 bucket name and key. AttemptNumber describes whether this is the 1st, 2nd, 3rd, etc. attempt to process this file.
func NewS3FileWithName ¶
func (*S3File) BagName ¶
The name of the owning institution, followed by a slash, followed by the name of the tar file. This differs from the ObjectName, because it will have the .tar or bag.001.of030.tar suffix.
func (*S3File) DeleteAttempted ¶
Returns true if we attempted to delete this file.
func (*S3File) KeyIsComplete ¶
Returns true if the S3 key is complete. In some cases, we only have the Key.Key and we have to fetch the rest from S3.
func (*S3File) ObjectName ¶
Returns the object identifier that will identify this bag in fedora. That's the institution identifier, followed by a slash and the tar file name, minus the .tar extension and the ".bag1of12" multipart extension. So for BucketName "aptrust.receiving.unc.edu" and Key.Key "nc_bag.b001.of030.tar", this would return "unc.edu/nc_bag"
type StorageSummary ¶
type StorageSummary struct {
    // StoreResult is a WorkSummary object that will hold information
    // about the attempt to store a file in S3 or Glacier. The goroutines
    // that save files primarily record errors here, using the AddError()
    // method and the ErrorIsFatal property.
    StoreResult *WorkSummary
    // TarFilePath is the path to the tar file containing the bag being
    // processed. This should never be empty.
    TarFilePath string
    // UntarredPath is the absolute path to the untarred version of the
    // bag being processed. This will usually be empty, since we
    // process bags while they're still tarred.
    UntarredPath string
    // GenericFile is the file to be saved in S3/Glacier. The storage
    // goroutine will update this object directly.
    GenericFile *GenericFile
}
StorageSummary is a lightweight object built from IngestState to be passed into a goroutine that saves a file to S3 or Glacier. The goroutine fills in information about where and when a file was stored before returning.
This allows multiple goroutines to save files concurrently to S3/Glacier without having to share data.
func NewStorageSummary ¶
func NewStorageSummary(gf *GenericFile, tarPath, untarredPath string) (*StorageSummary, error)
NewStorageSummary creates a new StorageSummary object. Param gf is the GenericFile to be saved. It cannot be nil. Param tarPath is the absolute path to the tar file containing the bag, and cannot be empty. Param untarredPath is the absolute path to the untarred bag, and may be empty, since most bags are processed without untarring.
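A minimal sketch of building a summary for a storage goroutine. The tar file path and the saveToS3 function are hypothetical, and the package is assumed to be imported as models:

    summary, err := models.NewStorageSummary(gf, "/mnt/apt/staging/bag1234.tar", "")
    if err != nil {
        log.Fatal(err) // error handling is illustrative
    }
    // Each goroutine gets its own StorageSummary, so nothing is shared.
    go saveToS3(summary) // saveToS3 is a hypothetical storage worker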
type StoredFile ¶
type StoredFile struct {
    // Id is a unique identifier for this StoredFile,
    // if it happens to be stored in a SQL database.
    // This can be zero for items not stored in a DB.
    Id int64 `json:"id"`
    // Key is the s3/glacier name of the file. A file
    // may be stored under more than one UUID if multiple
    // versions of it exist. Typically, we should retain
    // only the most recent version.
    Key string `json:"key"`
    // Bucket is the name of the bucket where the item is stored.
    Bucket string `json:"bucket"`
    // Size is the size, in bytes, of the object in
    // long-term storage. This should match the size
    // of the file in the GenericFiles table in Pharos.
    Size int64 `json:"size"`
    // ContentType is the object's mime type. E.g. image/jpeg.
    ContentType string `json:"content_type"`
    // Institution is the domain name of the institution
    // that owns the file. E.g. virginia.edu
    Institution string `json:"institution"`
    // BagName is the name of the bag this file belongs to.
    // It's the same as the intellectual object identifier.
    BagName string `json:"bag_name"`
    // PathInBag is the file's path in the original bag.
    // For example, data/subdir/image.jpeg. Combine this
    // with BagName to get the GenericFile.Identifier.
    PathInBag string `json:"path_in_bag"`
    // Md5 is the md5 sum we calculated for this file at ingest.
    Md5 string `json:"md5"`
    // Sha256 is the sha256 checksum we calculated for this
    // file on ingest.
    Sha256 string `json:"sha256"`
    // ETag is Amazon's etag for this item. For multipart
    // uploads, the etag will contain a dash.
    ETag string `json:"etag"`
    // LastModified is when this file was last modified in
    // the long-term storage bucket.
    LastModified time.Time `json:"last_modified"`
    // LastSeenAt is when our system last saw this item in
    // the long-term storage bucket.
    LastSeenAt time.Time `json:"last_seen_at"`
    // CreatedAt is when this record was created.
    CreatedAt time.Time `json:"created_at"`
    // UpdatedAt is when this record was updated.
    UpdatedAt time.Time `json:"updated_at"`
    // DeletedAt is when this file was deleted from long-term
    // storage. This will almost always be an empty timestamp.
    DeletedAt time.Time `json:"deleted_at,omitempty"`
}
StoredFile represents a file stored in a long-term storage bucket on S3 or Glacier. This object is used primarily for audit purposes, when we occasionally scan through our S3 and Glacier buckets to get a list of what is actually stored there.
func (*StoredFile) ToCSV ¶
func (f *StoredFile) ToCSV(delimiter rune) (string, error)
ToCSV converts this object to a CSV record. When listing thousands of files, we dump records to a CSV file that we can import later to a SQL DB. Param delimiter is the field delimiter (comma, tab, pipe, etc).
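A short sketch of dumping records to a tab-delimited file. The bufio writer setup is assumed to exist elsewhere, and error handling is illustrative:

    line, err := storedFile.ToCSV('\t')
    if err != nil {
        log.Fatal(err)
    }
    // writer is a *bufio.Writer opened on the output file.
    if _, err := writer.WriteString(line); err != nil {
        log.Fatal(err)
    }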
func (*StoredFile) ToJson ¶
func (f *StoredFile) ToJson() (string, error)
ToJson converts this object to JSON.
func (*StoredFile) ToStringArray ¶
func (f *StoredFile) ToStringArray() []string
ToStringArray converts this record to a string array, usually so it can be serialized to CSV format.
type SynchronizedMap ¶
type SynchronizedMap struct {
// contains filtered or unexported fields
}
SynchronizedMap is a map structure that can be shared across goroutines and threads. Both keys and values are strings.
func NewSynchronizedMap ¶
func NewSynchronizedMap() *SynchronizedMap
NewSynchronizedMap creates a new empty SynchronizedMap
func (*SynchronizedMap) Add ¶
func (syncMap *SynchronizedMap) Add(key, value string)
Add adds a key/value pair to the map.
func (*SynchronizedMap) Delete ¶
func (syncMap *SynchronizedMap) Delete(key string)
Delete deletes the specified key from the map.
func (*SynchronizedMap) Get ¶
func (syncMap *SynchronizedMap) Get(key string) string
Get returns the value of key from the map.
func (*SynchronizedMap) HasKey ¶
func (syncMap *SynchronizedMap) HasKey(key string) bool
HasKey returns true if the key exists in the map.
func (*SynchronizedMap) Keys ¶
func (syncMap *SynchronizedMap) Keys() []string
Keys returns a slice of all keys in the map.
func (*SynchronizedMap) Values ¶
func (syncMap *SynchronizedMap) Values() []string
Values returns a slice of all values in the map.
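A minimal usage sketch, assuming the package is imported as models:

    syncMap := models.NewSynchronizedMap()
    syncMap.Add("virginia.edu/bag1234", "ingested")
    if syncMap.HasKey("virginia.edu/bag1234") {
        fmt.Println(syncMap.Get("virginia.edu/bag1234"))
    }
    // Keys() and Values() return snapshots that are safe to range over
    // while other goroutines continue to Add or Delete entries.
    for _, key := range syncMap.Keys() {
        fmt.Println(key)
    }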
type Tag ¶
This Tag struct is essentially the same as the bagins TagField struct, but its properties are public and can be easily serialized to / deserialized from JSON.
type Volume ¶
type Volume struct {
// contains filtered or unexported fields
}
Volume tracks the amount of available space on a volume (disk), as well as the amount of space claimed for pending operations. The purpose is to allow the bag processor to try to determine ahead of time whether the underlying disk has enough space to accommodate the file it just pulled off the queue. We want to avoid downloading 100GB files when we know ahead of time that we don't have enough space to process them.
func NewVolume ¶
Creates a new Volume object to track free and used space on a volume (disk). Param mountPoint is the point at which the volume is mounted. The volume itself can be a physical disk or a logical partition.
On Mac and *nix systems, use posix.GetMountPointFromPath to get the mountpoint. If you're on Windows, Mr. T pities you, fool! This volume manager won't work for you. Upgrade to a more sensible OS.
func (*Volume) AvailableSpace ¶
AvailableSpace returns an approximate number of free bytes currently available to unprivileged users on the underlying volume, minus the number of bytes reserved for pending processes. The value returned will never be 100% accurate, because other processes may be writing to the volume.
func (*Volume) ClaimedSpace ¶
Returns the number of bytes claimed but not yet written to disk.
func (*Volume) MountPoint ¶
Returns the mountPoint to the volume.
func (*Volume) Release ¶
Release tells the Volume that the bytes no longer need to be reserved. This could be because they have already been written (and hence will show up in volume.currentFreeSpace()) or because the bytes will not be written at all.
func (*Volume) Reservations ¶
This is for reporting and debugging.
func (*Volume) Reserve ¶
Reserve requests that a number of bytes on disk be reserved for an upcoming operation, such as downloading and untarring a file. Reserving space does not have any effect on the file system. It simply allows the Volume struct to maintain some internal bookkeeping. Reserve will return an error if there is not enough free disk space to accommodate the requested number of bytes.
type VolumeResponse ¶
VolumeResponse contains response data returned by the VolumeService.
type WorkItem ¶
type WorkItem struct {
    // Id is the unique identifier for this work item.
    Id int `json:"id"`
    // ObjectIdentifier is the identifier of the IntellectualObject
    // that is the subject of this WorkItem. E.g. "virginia.edu/bag1234".
    // ObjectIdentifier will be empty until an item is ingested.
    // ObjectIdentifier should be calculated as institution identifier,
    // plus a slash, plus Name (see below) without the ".tar" extension.
    // E.g. "virginia.edu/bag1234"
    ObjectIdentifier string `json:"object_identifier"`
    // GenericFileIdentifier is the identifier of the GenericFile to
    // which this WorkItem pertains. This will be empty for WorkItems
    // that only make sense at the object level, such as Ingest
    // and Send-to-DPN. If GenericFileIdentifier is non-empty, it
    // means the work for this WorkItem is to be performed on the
    // GenericFile and not the object. For example, file deletion
    // and fixity checking are performed on GenericFiles, not
    // IntellectualObjects.
    GenericFileIdentifier string `json:"generic_file_identifier"`
    // Name is the name of the S3 key in the receiving bucket where
    // this object first appeared. It should match the bag name, minus
    // the institution prefix, with a ".tar" extension at the end.
    // For example, if the IntellectualObject is "virginia.edu/bag1234",
    // Name should be "bag1234.tar".
    Name string `json:"name"`
    // Bucket is the S3 receiving bucket to which this item was uploaded.
    Bucket string `json:"bucket"`
    // ETag is the S3 etag associated with this item. When a depositor
    // uploads a new version of an existing bag, you'll find a new
    // WorkItem record with the same Name and Bucket, but a different
    // ETag.
    ETag string `json:"etag"`
    // Size is the size, in bytes, of the tar file in the S3 receiving
    // bucket. This may not match the size of the restored bag because:
    // 1) on restoration, we add a sha256 manifest if it didn't exist
    // in the original bag, and 2) the depositor may have deleted some
    // files from the IntellectualObject before restoring the bag.
    Size int64 `json:"size"`
    // BagDate is the creation timestamp on the bag, as reported by S3.
    // This should be the date and time that the depositor created the
    // bag (not the date/time when S3 received it).
    BagDate time.Time `json:"bag_date"`
    // InstitutionId is the unique identifier of the institution to
    // whom this bag belongs.
    InstitutionId int `json:"institution_id"`
    // WorkItemStateId is the unique id of the WorkItemState record that
    // contains JSON data describing the last known state of processing
    // on this WorkItem. If processing has not started yet for this item,
    // it will have no WorkItemState.
    WorkItemStateId *int `json:"work_item_state_id"`
    // User is the email address of the user who requested this WorkItem.
    // For Ingest, this will always be the system user. For restoration,
    // deletion and send-to-DPN requests, it will be the email address of
    // the user who clicked the button or submitted the API request to
    // start the process.
    User string `json:"user"`
    // InstitutionalApprover is for deletions only and will be null for all
    // actions other than deletion. This is the email
    // address of the institutional admin who approved the deletion.
    // Exchange services should not process deletions where this field
    // is nil.
    InstitutionalApprover *string `json:"inst_approver"`
    // APTrustApprover is for bulk deletions only and will be null for
    // all actions other than deletion. This is the email
    // address of the APTrust admin who approved the deletion.
    APTrustApprover *string `json:"aptrust_approver"`
    // Date is the timestamp describing when some worker process last
    // touched this item.
    Date time.Time `json:"date"`
    // Note is a human-readable note about the status of this WorkItem.
    // This note is intended for users checking on the state of their
    // work items, so it should be descriptive.
    Note string `json:"note"`
    // Action is the action to be performed in this WorkItem. See
    // constants.ActionTypes.
    Action string `json:"action"`
    // Stage is the current stage of processing for this item. See
    // constants.StageTypes.
    Stage string `json:"stage"`
    // StageStartedAt describes when processing started for the current
    // stage. If it's empty, processing for the current stage has not
    // begun.
    StageStartedAt *time.Time `json:"stage_started_at"`
    // Status is the status of this WorkItem. See the values in
    // constants.StatusTypes.
    Status string `json:"status"`
    // Outcome describes the outcome of a completed WorkItem. For example,
    // Success, Failure, Cancelled.
    Outcome string `json:"outcome"`
    // Retry indicates whether or not a failed or uncompleted WorkItem
    // should be retried. This will be set to false if processing resulted
    // in a fatal error (such as when trying to ingest an invalid bag) or
    // when processing has encountered too many transient errors (such as
    // network connection problems, lack of disk space, etc.). The threshold
    // for transient errors is defined by the MaxAttempts config setting for
    // each WorkerConfig. WorkItems that fail due to repeated transient errors
    // should be requeued by an administrator when the cause of the transient
    // error resolves. Part of the requeuing process involves setting Retry
    // back to true.
    Retry bool `json:"retry"`
    // Node is the hostname or IP address of the machine that is currently
    // processing this request. The worker process sets this field when it
    // begins processing and clears it when it's done. If a worker crashes,
    // you can identify orphaned WorkItems by the combination of non-empty
    // Node and StageStartedAt. You can check Pid as well, to see if it's
    // still running on Node.
    Node string `json:"node"`
    // Pid is the process id of the worker currently handling this WorkItem.
    // Workers set and clear Pid in the same way they set and clear Node.
    Pid int `json:"pid"`
    // NeedsAdminReview indicates whether an administrator needs to look into
    // this WorkItem. The worker process that attempts to fulfill this WorkItem
    // will set this to true when it encounters unexpected errors.
    NeedsAdminReview bool `json:"needs_admin_review"`
    // QueuedAt describes when this item was copied into NSQ. This is a nullable
    // DateTime in the Rails app, so it has to be a pointer here. When requeuing
    // an item, set this to nil and set Retry to true. Otherwise, apt_queue will
    // ignore it. QueuedAt exists to prevent apt_queue from adding items more than
    // once to an NSQ topic.
    QueuedAt *time.Time `json:"queued_at"`
    // CreatedAt is the Rails timestamp describing when this item was created.
    CreatedAt time.Time `json:"created_at"`
    // UpdatedAt is the Rails timestamp describing when this item was updated.
    UpdatedAt time.Time `json:"updated_at"`
}
WorkItem contains summary information describing the status of a bag in process. This data goes to Fluctus, so that APTrust partners can see which of their bags have been processed successfully, and why failed bags failed. See http://bit.ly/1pf7qxD for details.
Type may have one of the following values: Ingest, Delete, Restore
Stage may have one of the following values: Receive (bag was uploaded by partner into receiving bucket), Fetch (fetch tarred bag file from S3 receiving bucket), Unpack (unpack the tarred bag), Validate (make sure all data files are present, checksums are correct, required tags are present), Store (copy generic files to permanent S3 bucket for archiving), Record (save record of intellectual object, generic files and events to Fedora).
Status may have one of the following values: Pending, Success, Failed.
func (*WorkItem) BelongsToAnotherWorker ¶
Returns true if this item is currently being processed by another worker.
func (*WorkItem) HasBeenStored ¶
Returns true if an object's files have been stored in S3 preservation bucket.
func (*WorkItem) IsInProgress ¶
IsInProgress returns true if any worker is currently working on this item.
func (*WorkItem) IsPastIngest ¶
IsPastIngest returns true if this item has already passed the ingest stage.
func (*WorkItem) MsgAlreadyOnDisk ¶
MsgAlreadyOnDisk returns a message saying the bag has already been downloaded to the local disk.
func (*WorkItem) MsgAlreadyValidated ¶
MsgAlreadyValidated returns a message saying the bag has already been validated.
func (*WorkItem) MsgGoingToFetch ¶
MsgGoingToFetch returns a message saying this item is being put into the fetch channel.
func (*WorkItem) MsgGoingToValidation ¶
MsgGoingToValidation returns a message saying this item is being put into the validation channel.
func (*WorkItem) MsgPastIngest ¶
MsgPastIngest returns a message saying that a worker is skipping this item because it's past the ingest stage.
func (*WorkItem) MsgSkippingInProgress ¶
MsgSkippingInProgress returns a message saying that a worker is skipping this item because it's already being processed.
func (*WorkItem) SerializeForPharos ¶
Convert WorkItem to JSON, omitting id and other attributes that Rails won't permit. For internal use, json.Marshal() works fine.
func (*WorkItem) SetNodeAndPid ¶
func (item *WorkItem) SetNodeAndPid()
Set state, node and pid on WorkItem.
func (*WorkItem) ShouldTryIngest ¶
Returns true if we should try to ingest this item.
type WorkItemState ¶
type WorkItemState struct {
    // Id is the unique identifier for this WorkItemState object.
    Id int `json:"id"`
    // WorkItemId is the unique identifier of the WorkItem whose state
    // this object describes.
    WorkItemId int `json:"work_item_id"`
    // Action is the WorkItem action to be performed. See constants.ActionTypes.
    Action string `json:"action"`
    // State is a JSON string describing the state of processing, what work has
    // been completed, and what work remains to be done. This JSON string
    // deserializes to different types, based on the Action. For example, if
    // Action is "Ingest", the JSON deserializes to an IngestState object.
    // The workers in the /workers directory retrieve this state when they begin
    // work on a WorkItem and update it when they stop work on that item.
    //
    // It's common for long-running tasks to fail due to network errors. For
    // example, copying 10,000 files from a large bag to S3 and/or Glacier
    // may fail half-way through due to connectivity problems. When that
    // happens, the worker will stop work on the item and preserve its state
    // in JSON format in this field. The next worker to pick up the task will
    // deserialize the JSON state info, see that the first 5000 files were
    // already successfully stored, and then resume the storage work at file
    // #5001.
    //
    // This state information is essential for intelligently resuming work
    // after failures, and for forensics on failed items. Admin users can
    // see the state JSON in the WorkItem detail view in Pharos.
    State string `json:"state"`
    // CreatedAt is the Rails timestamp describing when this item was created.
    CreatedAt time.Time `json:"created_at"`
    // UpdatedAt is the Rails timestamp describing when this item was updated.
    UpdatedAt time.Time `json:"updated_at"`
}
WorkItemState contains information about what work has been completed, and what work remains to be done, for the associated WorkItem. WorkItems that have not yet started processing will have no associated WorkItemState.
func NewWorkItemState ¶
func NewWorkItemState(workItemId int, action, state string) *WorkItemState
func (*WorkItemState) GlacierRestoreState ¶
func (state *WorkItemState) GlacierRestoreState() (*GlacierRestoreState, error)
func (*WorkItemState) HasData ¶
func (state *WorkItemState) HasData() bool
func (*WorkItemState) IngestManifest ¶
func (state *WorkItemState) IngestManifest() (*IngestManifest, error)
IngestManifest converts the State string (JSON) to an IngestManifest object. This works only if there's data in the State string, and the Action is constants.ActionIngest. Other actions will have different types of data in the State string.
func (*WorkItemState) SetStateFromIngestManifest ¶
func (state *WorkItemState) SetStateFromIngestManifest(manifest *IngestManifest) error
Converts an IngestManifest into a JSON string and stores it in the State attribute.
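A sketch of the typical round trip through an ingest worker: deserialize the stored state, do some work, then serialize the updated manifest back into the State string before saving the record to Pharos. Error handling is illustrative:

    manifest, err := workItemState.IngestManifest()
    if err != nil {
        log.Fatal(err)
    }
    // ... perform a stage of ingest work, updating the manifest ...
    if err := workItemState.SetStateFromIngestManifest(manifest); err != nil {
        log.Fatal(err)
    }
    // workItemState.State now holds the updated JSON, ready to be saved
    // back to Pharos along with the WorkItem.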
type WorkItemStateForPharos ¶
type WorkItemStateForPharos struct {
    Id         int    `json:"id"`
    WorkItemId int    `json:"work_item_id"`
    Action     string `json:"action"`
    State      string `json:"state"`
}
func NewWorkItemStateForPharos ¶
func NewWorkItemStateForPharos(workItemState *WorkItemState) *WorkItemStateForPharos
type WorkSummary ¶
type WorkSummary struct {
    // This is set to true when the process that produces
    // this result starts.
    Attempted bool
    // AttemptNumber is the number of the read attempt.
    // This starts at one. This is uint16 to match the datatype
    // of NsqMessage.Attempt.
    AttemptNumber uint16
    // This will be set to true if an error is fatal. In that
    // case, we should not try to reprocess the item.
    ErrorIsFatal bool
    // Errors is a list of strings describing errors that occurred
    // during bag validation. Don't write to this. It's public so
    // we can serialize it to/from JSON, but access is locked internally
    // with a mutex. Hmm...
    Errors []string
    // StartedAt describes when the attempt to read the bag started.
    // If StartedAt.IsZero(), we have not yet attempted to read the
    // bag.
    StartedAt time.Time
    // FinishedAt describes when the attempt to read the bag completed.
    // If FinishedAt.IsZero(), we have not yet attempted to read the
    // bag. Note that the attempt may have completed without succeeding.
    // Check the Succeeded() method to see if the process actually
    // completed successfully.
    FinishedAt time.Time
    // Retry indicates whether we should retry a failed process.
    // After non-fatal errors, such as network timeout, this will
    // generally be set to true. For fatal errors, such as invalid
    // data, this will generally be set to false. This defaults to
    // true, because fatal errors are rare, and we don't want to
    // give up on transient errors. Just requeue and try again.
    Retry bool
    // contains filtered or unexported fields
}
func NewWorkSummary ¶
func NewWorkSummary() *WorkSummary
func (*WorkSummary) AddError ¶
func (summary *WorkSummary) AddError(format string, a ...interface{})
func (*WorkSummary) AllErrorsAsString ¶
func (summary *WorkSummary) AllErrorsAsString() string
func (*WorkSummary) ClearErrors ¶
func (summary *WorkSummary) ClearErrors()
func (*WorkSummary) Finish ¶
func (summary *WorkSummary) Finish()
func (*WorkSummary) Finished ¶
func (summary *WorkSummary) Finished() bool
func (*WorkSummary) FirstError ¶
func (summary *WorkSummary) FirstError() string
func (*WorkSummary) HasErrors ¶
func (summary *WorkSummary) HasErrors() bool
func (*WorkSummary) RunTime ¶
func (summary *WorkSummary) RunTime() time.Duration
func (*WorkSummary) Start ¶
func (summary *WorkSummary) Start()
func (*WorkSummary) Started ¶
func (summary *WorkSummary) Started() bool
func (*WorkSummary) Succeeded ¶
func (summary *WorkSummary) Succeeded() bool
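A sketch of the typical WorkSummary lifecycle inside a worker. doSomeWork is a hypothetical stand-in for the actual task, and the package is assumed to be imported as models:

    summary := models.NewWorkSummary()
    summary.Start()
    if err := doSomeWork(); err != nil { // doSomeWork is hypothetical
        summary.AddError("work failed: %v", err)
        summary.ErrorIsFatal = true // e.g. invalid bag: don't retry
    }
    summary.Finish()
    if !summary.Succeeded() {
        log.Println(summary.AllErrorsAsString())
    }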
type WorkerConfig ¶
type WorkerConfig struct {
    // This describes how often the NSQ client should ping
    // the NSQ server to let it know it's still there. The
    // setting must be formatted like so:
    //
    // "800ms" for 800 milliseconds
    // "10s" for ten seconds
    // "1m" for one minute
    HeartbeatInterval string
    // The maximum number of times the worker should try to
    // process a job. If non-fatal errors cause a job to
    // fail, it will be requeued this number of times.
    // Fatal errors, such as invalid bags or attempts to
    // restore or delete non-existent files, will not be
    // retried.
    MaxAttempts uint16
    // Maximum number of jobs a worker will accept from the
    // queue at one time. Workers that may have to process
    // very long-running tasks, such as apt_prepare,
    // apt_store and apt_restore, should set this number
    // fairly low (20 or so) to prevent messages from
    // timing out.
    MaxInFlight int
    // If the NSQ server does not hear from a client that a
    // job is complete in this amount of time, the server
    // considers the job to have timed out and re-queues it.
    // Long-running jobs such as apt_prepare, apt_store,
    // apt_record and apt_restore will "touch" the NSQ message
    // as it moves through each channel in the processing pipeline.
    // The touch message tells NSQ that it's still working on
    // the job, and effectively resets NSQ's timer on that
    // message to zero. Still, very large bags in any of the
    // long-running processes will need a timeout of "180m" or
    // so to ensure completion.
    MessageTimeout string
    // Number of go routines used to perform network I/O,
    // such as fetching files from S3, storing files to S3,
    // and fetching/storing Fluctus data. If a worker does
    // no network I/O, this setting is ignored.
    NetworkConnections int
    // The name of the NSQ Channel the worker should read from.
    NsqChannel string
    // The name of the NSQ Topic the worker should listen to.
    NsqTopic string
    // This describes how long the NSQ client will wait for
    // a read from the NSQ server before timing out. The format
    // is the same as for HeartbeatInterval.
    ReadTimeout string
    // Number of go routines to start in the worker to
    // handle all work other than network I/O. Typically,
    // this should be close to the number of CPUs.
    Workers int
    // This describes how long the NSQ client will wait for
    // a write to the NSQ server to complete before timing out.
    // The format is the same as for HeartbeatInterval.
    WriteTimeout string
}
Source Files ¶
- checksum.go
- cleanup_result.go
- config.go
- delete_state.go
- dpn_work_item.go
- file_restore_state.go
- fixity_result.go
- generic_file.go
- glacier_restore_state.go
- ingest_manifest.go
- ingest_state.go
- institution.go
- intellectual_object.go
- pharos_dpn_bag.go
- pharos_special_models.go
- premis_event.go
- restore_state.go
- ring_list.go
- s3_file.go
- storage_summary.go
- stored_file.go
- synchronized_map.go
- volume.go
- volume_response.go
- work_item.go
- work_item_state.go
- work_summary.go