v0.0.0-...-9369c8b
Published: Dec 12, 2016 License: Apache-2.0 Imports: 36 Imported by: 0



Common vars and constants, shared by many parts of the bagman library.

Package client provides a client for the fluctus REST API.

This requires an external C library that our partners won't have, so this file is not compiled when building with the flag -tags=partners.

PartnerS3Client is used by apt_upload, which APTrust partners use to upload bags to the S3 receiving buckets.

Don't include this in the partners build: it's not needed in the partner apps, and the syscall.Stat* functions cause the build to fail on Windows.

validator.go contains functions to allow partner institutions to validate bags before they send them. This code is run by users at our partner institutions, on their desktops and laptops. It's not intended to run on APTrust servers.




const (
	APTrustNamespace        = ""
	ReceiveBucketPrefix     = "aptrust.receiving."
	ReceiveTestBucketPrefix = "aptrust.receiving.test."
	RestoreBucketPrefix     = "aptrust.restore."
	S3DateFormat            = "2006-01-02T15:04:05.000Z"
	// All S3 urls begin with this.
	S3UriPrefix = ""
)
const (
	StatusStarted   StatusType = "Started"
	StatusPending              = "Pending"
	StatusSuccess              = "Success"
	StatusFailed               = "Failed"
	StatusCancelled            = "Cancelled"
)
const (
	StageRequested StageType = "Requested"
	StageReceive             = "Receive"
	StageFetch               = "Fetch"
	StageUnpack              = "Unpack"
	StageValidate            = "Validate"
	StageStore               = "Store"
	StageRecord              = "Record"
	StageCleanup             = "Cleanup"
	StageResolve             = "Resolve"
)
const (
	ActionIngest      ActionType = "Ingest"
	ActionFixityCheck            = "Fixity Check"
	ActionRestore                = "Restore"
	ActionDelete                 = "Delete"
	ActionDPN                    = "DPN"
)
const (
	// DefaultBagSizeLimit is 250GB.
	DefaultBagSizeLimit = int64(268435456000)

	// Allow approx 1MB padding for tag files,
	// manifest, and tar file headers.
	DefaultBagPadding = int64(1000000)

	// The default restoration bucket prefix.
	RestorationBucketPrefix = "aptrust.restore"
)
const (
	// A Gigabyte!
	GIGABYTE int64 = int64(1024 * 1024 * 1024)

	// Files over 5GB in size must be uploaded via multi-part put.
	S3_LARGE_FILE int64 = int64(5 * GIGABYTE)

	// Chunk size for multipart puts to S3: ~500 MB
	S3_CHUNK_SIZE = int64(500000000)
)
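As an illustration of these thresholds, here's a small sketch that decides whether a file needs a multipart put and how many ~500 MB parts it would take. `chunkCount` is a hypothetical helper, not part of bagman; only the constant values come from the declarations above.

```go
package main

import "fmt"

// Values mirrored from the constants above.
const (
	GIGABYTE      int64 = 1024 * 1024 * 1024
	S3_LARGE_FILE int64 = 5 * GIGABYTE
	S3_CHUNK_SIZE int64 = 500000000
)

// chunkCount is an illustrative helper (not part of bagman): it
// computes how many ~500 MB parts a multipart put of fileSize
// bytes needs, using ceiling division.
func chunkCount(fileSize int64) int64 {
	return (fileSize + S3_CHUNK_SIZE - 1) / S3_CHUNK_SIZE
}

func main() {
	fileSize := 6 * GIGABYTE // over the 5GB threshold
	if fileSize > S3_LARGE_FILE {
		fmt.Println("multipart put required, parts:", chunkCount(fileSize))
	}
}
```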


const (
	// Item to validate is a tar file
	// Item to validate is a directory
	// Item is something we can't validate
)

Maximum number of generic files we can create in a single call to IntellectualObjectCreate. New objects with more than this number of files need special handling.


Log fluctus error responses up to this number of bytes. We DO want to log concise error messages. We DO NOT want to log huge HTML error responses.


var AccessRights []string = []string{ ... }

List of valid APTrust IntellectualObject AccessRights.

var ErrStopIteration = errors.New("iteration completed")
var EventTypes []string = []string{ ... }

List of valid Premis Event types.

var MultipartSuffix = regexp.MustCompile("\\.b\\d+\\.of\\d+$")

The tar files that make up multipart bags include a suffix that follows this pattern. For example, after stripping off the .tar suffix, you'll have a name like "my_bag.b04.of12"
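A quick sketch of how this pattern behaves, using the same regexp as above:

```go
package main

import (
	"fmt"
	"regexp"
)

// The same pattern as bagman.MultipartSuffix.
var MultipartSuffix = regexp.MustCompile("\\.b\\d+\\.of\\d+$")

func main() {
	// After stripping the .tar extension, a multipart bag name
	// carries a suffix like ".b04.of12".
	fmt.Println(MultipartSuffix.MatchString("my_bag.b04.of12")) // true

	// Removing the suffix recovers the base bag name.
	fmt.Println(MultipartSuffix.ReplaceAllString("my_bag.b04.of12", "")) // my_bag

	// Single-part bag names don't match.
	fmt.Println(MultipartSuffix.MatchString("my_bag")) // false
}
```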


func AddToArchive

func AddToArchive(tarWriter *tar.Writer, filePath, pathWithinArchive string) error

Adds a file to a tar archive.

func BagNeedsProcessing

func BagNeedsProcessing(s3File *S3File, procUtil *ProcessUtil) bool

Returns true if the file needs processing. We check this because the bucket reader may add duplicate items to the queue when the queue is long and the reader refills it hourly. If we get rid of NSQ and read directly from the database, we can get rid of this.

func BagmanHome

func BagmanHome() (bagmanHome string, err error)

BagmanHome returns the absolute path to the bagman root directory, which contains source, config and test files. This will usually be something like /home/xxx/go/src/. You can set this explicitly by defining an environment variable called BAGMAN_HOME. Otherwise, this function will try to infer the value by appending to the environment variable GOPATH. If neither of those variables is set, this returns an error.

func Base64EncodeMd5

func Base64EncodeMd5(md5Digest string) (string, error)

Returns a base64-encoded md5 digest. This is the format S3 wants.
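A minimal sketch of the conversion this likely performs: S3's Content-MD5 header wants the base64 encoding of the raw 16 md5 bytes, not of the hex string. `base64EncodeMd5` here is an illustrative stand-in, not the library's code.

```go
package main

import (
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// base64EncodeMd5 (illustrative stand-in): decode the hex digest
// to raw bytes, then base64-encode those bytes.
func base64EncodeMd5(md5Digest string) (string, error) {
	raw, err := hex.DecodeString(md5Digest)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}

func main() {
	// The well-known md5 of the empty string, as a hex digest.
	s, err := base64EncodeMd5("d41d8cd98f00b204e9800998ecf8427e")
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // 1B2M2Y8AsgTpgAmY7PhCfg==
}
```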

func BucketNameAndKey

func BucketNameAndKey(uri string) (string, string)

Given an S3 URI, returns the bucket name and key.

func CleanBagName

func CleanBagName(bagName string) (string, error)

Given the name of a tar file, returns the clean bag name. That's the tar file name minus the tar extension and any ".bagN.ofN" suffix.

func CleanString

func CleanString(str string) string

Cleans a string we might find in a config file, trimming leading and trailing spaces, single quotes and double quotes. Note that leading and trailing spaces inside the quotes are not trimmed.

func DiscardLogger

func DiscardLogger(module string) *logging.Logger

DiscardLogger returns a logger that writes to /dev/null. Suitable for use in testing.

func Enqueue

func Enqueue(nsqdHttpAddress, topic string, object interface{}) error

Sends the JSON of a result object to the specified queue.

func ExpandTilde

func ExpandTilde(filePath string) (string, error)

Expands the tilde in a directory path to the current user's home directory. For example, on Linux, ~/data would expand to something like /home/josie/data

func FileExists

func FileExists(path string) bool

Returns true if the file at path exists, false if not.

func GenericFilesToBulkSaveMaps

func GenericFilesToBulkSaveMaps(files []*GenericFile) []map[string]interface{}

Converts generic files to maps, so we can serialize to JSON. These map structures work with the save_batch endpoint of the generic_files controller, which takes in a list of generic files along with all of their related checksums and premis events.

func GetInstitutionFromBagIdentifier

func GetInstitutionFromBagIdentifier(bagIdentifier string) (string, error)

Given the identifier of an already ingested bag, this returns the institution identifier.

func GetInstitutionFromBagName

func GetInstitutionFromBagName(bagName string) (string, error)

Returns the institution name from the bag name, or an error if the bag name does not contain the institution name. For example, "" returns "" and no errors. "" returns the same thing. But "bag_of_videos.tar" or "virginia.bag_of_videos.tar" returns an error because the institution identifier is missing from the bag name.

func GetOwnerAndGroup

func GetOwnerAndGroup(finfo os.FileInfo, header *tar.Header)

We have a dummy version of this call in nix.go. Windows does not implement the syscall.Stat_t type we need, but the *nixes do. We use this in util.AddToArchive to set owner/group on files being added to a tar archive.

func GuessMimeType

func GuessMimeType(absPath string) (mimeType string, err error)

func HasPendingDeleteRequest

func HasPendingDeleteRequest(statusRecords []*ProcessStatus) bool

Returns true if the ProcessStatus records include a delete request that has not been completed.

func HasPendingIngestRequest

func HasPendingIngestRequest(statusRecords []*ProcessStatus) bool

Returns true if the ProcessStatus records include an ingest request that has not been completed.

func HasPendingRestoreRequest

func HasPendingRestoreRequest(statusRecords []*ProcessStatus) bool

Returns true if the ProcessStatus records include a restore request that has not been completed.

func HasSavableName

func HasSavableName(filename string) bool

Returns true if the file name indicates this is something we should save to long-term storage. As of late March, 2016, we save everything in the bag except bagit.txt, manifest-<algo>.txt and tagmanifest-<algo>.txt. Those files we don't save will be reconstructed when the bag is restored.

Param filename should be the relative path of the file within the bag. For example, "tagmanifest-sha256.txt" or "data/images/photo_01.jpg". This is important, because a file called "manifest-md5.txt" will return false (indicating it should not be saved), while a file called "data/manifest-md5.txt" will return true, because its file path indicates it's part of the payload.

We reconstruct bagit.txt because we may have moved to a newer version by the time the file is restored. We reconstruct manifests and tag manifests because payload files and tag files may be deleted or overwritten by the depositor between initial ingest and restoration.

And did you know both savable and saveable are correct? I chose the former to reduce the size of our compiled binary by one byte. That could save us pennies over the next 10,000 years.
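The rules above can be sketched as a predicate. This is an illustrative reimplementation of the described behavior, not bagman's actual code: only top-level bagit.txt, manifest-&lt;algo&gt;.txt and tagmanifest-&lt;algo&gt;.txt are excluded, so the same names under data/ are still saved.

```go
package main

import (
	"fmt"
	"regexp"
)

// excluded matches only the top-level files that are reconstructed
// on restore. The ^...$ anchors mean a path like
// "data/manifest-md5.txt" does not match, because of the "data/"
// prefix, and is therefore saved.
var excluded = regexp.MustCompile(`^(bagit\.txt|(tag)?manifest-[A-Za-z0-9]+\.txt)$`)

// hasSavableName (illustrative): true if the file should go to
// long-term storage.
func hasSavableName(filename string) bool {
	return !excluded.MatchString(filename)
}

func main() {
	fmt.Println(hasSavableName("manifest-md5.txt"))      // false: top-level manifest
	fmt.Println(hasSavableName("data/manifest-md5.txt")) // true: payload file
	fmt.Println(hasSavableName("data/images/photo.jpg")) // true
	fmt.Println(hasSavableName("bagit.txt"))             // false
}
```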

func InitJsonLogger

func InitJsonLogger(config Config) *stdlog.Logger

InitJsonLogger creates and returns a logger suitable for logging JSON data. Bagman JSON logs consist of a single JSON object per line, with no extraneous data. Because all of the data in the file is pure JSON, with one record per line, these files are easy to parse.

func InitLogger

func InitLogger(config Config) *logging.Logger

InitLogger creates and returns a logger suitable for logging human-readable messages.

func IsValidFileName

func IsValidFileName(filename string) bool

Returns true if the filename follows APTrust's file naming requirements. Names may contain upper or lower case letters, numbers, dots, underscores and dashes (A–Z a–z 0–9 . _ -), MUST NOT begin with a dash (-), and MUST be between 1 and 255 characters in length (inclusive).

Also note that the BagIt spec says the path separator must be a forward slash.
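A sketch of the stated rules as a regular expression. `isValidFileName` is illustrative, not the library's implementation: the first character class omits the dash, and the total length is capped at 255.

```go
package main

import (
	"fmt"
	"regexp"
)

// validName: 1-255 chars of A-Z a-z 0-9 . _ -, not beginning
// with a dash.
var validName = regexp.MustCompile(`^[A-Za-z0-9._][A-Za-z0-9._-]{0,254}$`)

// isValidFileName (illustrative stand-in for the described rules).
func isValidFileName(name string) bool {
	return validName.MatchString(name)
}

func main() {
	fmt.Println(isValidFileName("photo_01.jpg")) // true
	fmt.Println(isValidFileName("-bad-start"))   // false: leads with a dash
	fmt.Println(isValidFileName(""))             // false: empty
}
```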

func LoadCustomEnvOrDie

func LoadCustomEnvOrDie(customEnvFile *string, logger *logging.Logger)

Loads environment vars from a custom file or dies. If param customEnvFile is nil or points to an empty string, this loads nothing and proceeds without error. If customEnvFile specifies a file that does not exist or cannot be read, this causes the program to exit. Param logger is optional. Pass nil if you don't have a logger.

func LoadEnv

func LoadEnv(path string) (vars map[string]string, err error)

Loads environment variables from the file at the specified absolute path. The variables are expected to be in the format typically seen in .bashrc and .bash_profile files:

	VARNAME="value"

with optional quotes. This function is here because supervisord doesn't provide an easy way of loading environment vars from an external file, and we have some sensitive environment vars that we want to keep in only one file on the system.

Returns a map of the vars that were loaded from the file, and sets them in the program's environment.

func LoadRelativeFile

func LoadRelativeFile(relativePath string) ([]byte, error)

LoadRelativeFile reads the file at the specified path relative to BAGMAN_HOME and returns the contents as a byte array.

func LooksLikeURL

func LooksLikeURL(url string) bool

Returns true if url looks like a URL.

func LooksLikeUUID

func LooksLikeUUID(uuid string) bool

func Min

func Min(x, y int) int

Min returns the minimum of x or y. The math package has this function, but you have to cast to floats.

func NamePartIsValid

func NamePartIsValid(namePart string) bool

func OwnerOf

func OwnerOf(bucketName string) (institution string)

Returns the domain name of the institution that owns the specified bucket. For example, if bucketName is '' the return value will be ''.

func QueueToNSQ

func QueueToNSQ(url string, data []interface{}) error

QueueToNSQ sends data to NSQ. The URL param must be a valid NSQ URL. The data will be converted to JSON, with each object/record in a single line, then posted to url. This requires an NSQ server, so it's covered in the integration tests in the scripts directory.

func RecursiveFileList

func RecursiveFileList(dir string) ([]string, error)

RecursiveFileList returns a list of all files in path dir and its subfolders. It does not return directories.

func RelativeToAbsPath

func RelativeToAbsPath(relativePath string) (string, error)

Converts a relative path within the bagman directory tree to an absolute path.

func RestorationBucketFor

func RestorationBucketFor(institution string) (bucketName string)

Returns the name of the specified institution's restoration bucket. E.g. institution '' returns bucketName ''


type ActionType

type ActionType string

Action enumerations match values defined in

type BagReadResult

type BagReadResult struct {
	Path           string
	Files          []string
	ErrorMessage   string
	Tags           []Tag
	ChecksumErrors []error
}

BagReadResult contains data describing the result of processing a single bag. If there were any processing errors, this structure should tell us exactly what happened and where.

func ReadBag

func ReadBag(tarFilePath string) (result *BagReadResult)

Reads an untarred bag. The tarFilePath parameter should be a path to a directory that contains the bag, info and manifest files. The bag content should be in the data directory under tarFilePath. Check result.Error to ensure there were no errors.

func (*BagReadResult) TagValue

func (result *BagReadResult) TagValue(tagLabel string) (tagValue string)

TagValue returns the value of the tag with the specified label.

type BagRestorer

type BagRestorer struct {
	// The intellectual object we'll be restoring.
	IntellectualObject *IntellectualObject
	// contains filtered or unexported fields
}

BagRestorer exposes methods for restoring bags and publishing them to S3 restoration buckets. There are separate methods below for restoring a bag locally, copying the restored files to S3, and cleaning up.

Generally, you'll want to do all that in one step, which you can do like this:

restorer, err := bagman.NewBagRestorer(intellectualObject, outputDir, false)
if err != nil {
    return err
}
urls, err := restorer.RestoreAndPublish(nil)

Here's a fuller example:

restorer, err := bagman.NewBagRestorer(intellectualObject, outputDir, false)
if err != nil {
    return err
}

// Optional, if you want to log debug statements.
// Default is no logging.
restorer.SetLogger(logger)

// Optional, if you want to constrain bag size to 50000000 bytes.
// The following aims for total bag sizes of 50000000 bytes
// that include 100k or so of non-payload data (manifests, tag
// files, tar file headers). Default is <= 250GB bag size.
restorer.SetBagSizeLimit(int64(50000000))
restorer.SetBagPadding(int64(100000))

// Optional, if you want to restore to a non-standard bucket.
// Default is the institution's restoration bucket.
restorer.SetCustomRestoreBucket("my.custom.bucket")

// This creates the bags, copies them to S3, and cleans up.
// Return value urls is a slice of strings, each of which
// is a URL pointing to a restored bag on S3.
urls, err := restorer.RestoreAndPublish(nil)

func NewBagRestorer

func NewBagRestorer(intelObj *IntellectualObject, workingDir string, restoreToTestBuckets bool) (*BagRestorer, error)

Creates a new bag restorer from the intellectual object. Param workingDir is the path to the directory into which files should be downloaded and the bag should be built.

func (*BagRestorer) Cleanup

func (restorer *BagRestorer) Cleanup()

Deletes all of the bags created by Restore()

func (*BagRestorer) CopyToS3

func (restorer *BagRestorer) CopyToS3(setNumber int) (string, error)

Copies a tarred bag file to S3. In most cases, you'll want RestoreAndPublish() to do this for you. But if you want to do it manually, do something like this (but don't ignore the errors):

paths, _ := restorer.Restore()
for i := range paths {
    pathToTarFile, _ := restorer.TarBag(i)
    s3Url, _ := restorer.CopyToS3(i)
}

func (*BagRestorer) GetBagPadding

func (restorer *BagRestorer) GetBagPadding() int64

func (*BagRestorer) GetBagSizeLimit

func (restorer *BagRestorer) GetBagSizeLimit() int64

func (*BagRestorer) GetFileSetSizeLimit

func (restorer *BagRestorer) GetFileSetSizeLimit() int64

Returns the total number of bytes the files in the data directory may occupy for this bag, which is calculated as bagSizeLimit - bagPadding.

func (*BagRestorer) PathWithinBag

func (restorer *BagRestorer) PathWithinBag(fileName, bagName string) string

Notes for fix to PivotalTracker #93237220: data files were being put into the wrong directory.

We've already pulled the files down into the proper directory structure. The fileName here is an absolute path. We need to extract from that the file's path within the bag. The call to bag.AddFile says "add the file at absolute path x into the bag at relative path y." If the two paths wind up being the same (and they will be the same here), AddFile does not perform a copy, but it does calculate the md5 checksum for the manifest.

The vars below look something like this:

	fileName:      /Users/apd4n/tmp/restore/
	bagName:
	workingDir:    /Users/apd4n/tmp/restore
	pathWithinBag: data/metadata.xml

func (*BagRestorer) RestorationBucketName

func (restorer *BagRestorer) RestorationBucketName() string

func (*BagRestorer) Restore

func (restorer *BagRestorer) Restore() ([]string, error)

Restores an IntellectualObject by downloading all of its files and assembling them into one or more bags. Returns a slice of strings, each of which is the path to a bag.

This function restores the entire bag at once, and will use about 2 * bag_size bytes of disk space. To avoid using so much disk space, you can use RestoreAndPublish below.

func (*BagRestorer) RestoreAndPublish

func (restorer *BagRestorer) RestoreAndPublish(message *nsq.Message) (urls []string, err error)

Restores a bag (including multi-part bags), publishes them to the restoration bucket, and returns the URLs to access them. Param message is an NSQ message and may be nil. In production, we want this param, because we need to remind NSQ frequently that we're still working on the message. Otherwise, NSQ thinks the message timed out.

func (*BagRestorer) SetBagPadding

func (restorer *BagRestorer) SetBagPadding(limit int64)

Set the padding for the bag. This is the amount of space you think tag files, manifests and tar file headers may occupy.

func (*BagRestorer) SetBagSizeLimit

func (restorer *BagRestorer) SetBagSizeLimit(limit int64)

Sets the size limit for a bag. Default is 250GB. This is used primarily for testing.

func (*BagRestorer) SetCustomRestoreBucket

func (restorer *BagRestorer) SetCustomRestoreBucket(bucketName string)

func (*BagRestorer) SetLogger

func (restorer *BagRestorer) SetLogger(logger *logging.Logger)

Sets a logger to which the BagRestorer will print debug messages. This is optional.

func (*BagRestorer) TarBag

func (restorer *BagRestorer) TarBag(setNumber int) (string, error)

Tars the bag specified by setNumber, which is zero-based. Returns the path to the tar file it just created.

Restore() returns a slice of strings, each of which is the path to a bag. To tar all the bags, you'd do this:

paths, _ := restorer.Restore()
for i := range paths {
    pathToTarFile, _ := restorer.TarBag(i)
}

type BucketSummary

type BucketSummary struct {
	BucketName string
	Keys       []s3.Key // TODO: Change to slice of pointers!
}

BucketSummary contains information about an S3 bucket and its contents.

type ChecksumAttribute

type ChecksumAttribute struct {
	Algorithm string    `json:"algorithm"`
	DateTime  time.Time `json:"datetime"`
	Digest    string    `json:"digest"`
}

ChecksumAttribute contains information about a checksum that can be used to validate the integrity of a GenericFile.

DateTime should be in ISO8601 format for local time or UTC. For example:

1994-11-05T08:15:30-05:00 (Local Time)
1994-11-05T08:15:30Z (UTC)

type Config

type Config struct {
	// ActiveConfig is the configuration currently
	// in use.
	ActiveConfig string

	// Configuration options for apt_bag_delete
	BagDeleteWorker WorkerConfig

	// Set this in non-production environments to restore
	// intellectual objects to a custom bucket. If this is set,
	// all intellectual objects from all institutions will be
	// restored to this bucket.
	CustomRestoreBucket string

	// Should we delete the uploaded tar file from the receiving
	// bucket after successfully processing this bag?
	DeleteOnSuccess bool

	// DPNCopyWorker copies tarred bags from other nodes into our
	// DPN staging area, so we can replicate them. Currently,
	// copying is done by rsync over ssh.
	DPNCopyWorker WorkerConfig

	// DPNHomeDirectory is the prefix to the home directory
	// for all DPN users. On demo and production, this should
	// be "/home". The full home directory for a user like tdr
	// would be "/home/dpn.tdr". On a local dev or test machine,
	// DPNHomeDirectory can be any path the user has full read/write
	// access to.
	DPNHomeDirectory string

	// DPNPackageWorker packages bags for DPN.
	DPNPackageWorker WorkerConfig

	// The name of the long-term storage bucket for DPN
	DPNPreservationBucket string

	// DPNRecordWorker records DPN storage events in Fluctus
	// and through the DPN REST API.
	DPNRecordWorker WorkerConfig

	// The local directory for DPN staging. We store DPN bags
	// here while they await transfer to the DPN preservation
	// bucket and while they await replication to other nodes.
	DPNStagingDirectory string

	// DPNStoreWorker copies DPN bags to AWS Glacier.
	DPNStoreWorker WorkerConfig

	// DPNTroubleWorker records failed DPN tasks in the DPN
	// trouble queue.
	DPNTroubleWorker WorkerConfig

	// DPNValidationWorker validates DPN bags.
	DPNValidationWorker WorkerConfig

	// FailedFixityWorker records details about fixity checks
	// that could not be completed.
	FailedFixityWorker WorkerConfig

	// FailedReplicationWorker records details about failed
	// attempts to copy generic files to the S3 replication
	// bucket in Oregon.
	FailedReplicationWorker WorkerConfig

	// Configuration options for apt_file_delete
	FileDeleteWorker WorkerConfig

	// Configuration options for apt_fixity, which
	// handles ongoing fixity checks.
	FixityWorker WorkerConfig

	// The version of the Fluctus API we're using. This should
	// start with a v, like v1, v2.2, etc.
	FluctusAPIVersion string

	// FluctusURL is the URL of the Fluctus server where
	// we will be recording results and metadata. This should
	// start with http:// or https://
	FluctusURL string

	// LogDirectory is where we'll write our log files.
	LogDirectory string

	// LogLevel is defined in
	// and should be one of the following:
	// 1 - CRITICAL
	// 2 - ERROR
	// 3 - WARNING
	// 4 - NOTICE
	// 5 - INFO
	// 6 - DEBUG
	LogLevel logging.Level

	// If true, processes will log to STDERR in addition
	// to their standard log files. You really only want
	// to do this in development.
	LogToStderr bool

	// Maximum number of days allowed between scheduled
	// fixity checks. The fixity_reader periodically
	// queries Fluctus for GenericFiles whose last
	// fixity check was greater than or equal to this
	// number of days ago. Those items are put into the
	// fixity_check queue.
	MaxDaysSinceFixityCheck int

	// MaxFileSize is the size in bytes of the largest
	// tar file we're willing to process. Set to zero
	// to process all files, regardless of size.
	// Set to some reasonably small size (100000 - 500000)
	// when you're running locally, or else you'll wind
	// up pulling down a huge amount of data from the
	// receiving buckets.
	MaxFileSize int64

	// NsqdHttpAddress is the address of the NSQ server.
	// We can put items into queues by issuing PUT requests
	// to this URL. This should start with http:// or https://
	NsqdHttpAddress string

	// NsqLookupd is the hostname and port number of the NSQ
	// lookup daemon. It should not include a protocol.
	// E.g. localhost:4161. Queue consumers use this to
	// discover available queues.
	NsqLookupd string

	// Configuration options for apt_prepare
	PrepareWorker WorkerConfig

	// The name of the preservation bucket to which we should
	// copy files for long-term storage.
	PreservationBucket string

	// ReceivingBuckets is a list of S3 receiving buckets to check
	// for incoming tar files.
	ReceivingBuckets []string

	// Configuration options for apt_record
	RecordWorker WorkerConfig

	// The bucket that stores a second copy of our preservation
	// files. This should be in a different region than the
	// preservation bucket. As of November 2014, the preservation
	// bucket is in Virginia, and the replication bucket is in
	// Oregon.
	ReplicationBucket string

	// The path to the local directory that will temporarily
	// hold files being copied from the preservation bucket
	// in US East to the replication bucket in USWest2.
	ReplicationDirectory string

	// Configuration options for apt_replicate
	ReplicationWorker WorkerConfig

	// RestoreDirectory is the directory in which we will
	// rebuild IntellectualObjects before sending them
	// off to the S3 restoration bucket.
	RestoreDirectory string

	// If true, we should restore bags to our partners' test
	// restoration buckets instead of the usual restoration
	// buckets. This should be true only in the demo config,
	// which is what we run on the demo system. Also note
	// that CustomRestoreBucket overrides this.
	RestoreToTestBuckets bool

	// Configuration options for apt_restore
	RestoreWorker WorkerConfig

	// SkipAlreadyProcessed indicates whether or not the
	// bucket_reader should put successfully-processed items into
	// NSQ for re-processing. This is almost always set to false.
	// The exception is when we deliberately want to reprocess
	// items to test code changes.
	SkipAlreadyProcessed bool

	// Configuration options for apt_store
	StoreWorker WorkerConfig

	// TarDirectory is the directory in which we will
	// untar files from S3. This should be on a volume
	// with lots of free disk space.
	TarDirectory string

	// Configuration options for apt_trouble
	TroubleWorker WorkerConfig
}

func LoadRequestedConfig

func LoadRequestedConfig(requestedConfig *string) (config Config)

This returns the configuration that the user requested. If the user did not specify any configuration (using the -config flag), or if the specified configuration cannot be found, this prints a help message and terminates the program.

TODO: Use string param, not pointer!

func (*Config) AbsLogDirectory

func (config *Config) AbsLogDirectory() string

func (*Config) EnsureFluctusConfig

func (config *Config) EnsureFluctusConfig() error

func (*Config) ExpandFilePaths

func (config *Config) ExpandFilePaths()

Expands ~ file paths

type FedoraResult

type FedoraResult struct {
	ObjectIdentifier string
	GenericFilePaths []string
	MetadataRecords  []*MetadataRecord
	IsNewObject      bool
	ErrorMessage     string
}

FedoraResult is a collection of MetadataRecords, each indicating whether or not some bit of metadata has been recorded in Fluctus/Fedora. The bag processor needs to keep track of this information to ensure it successfully records all metadata in Fedora.

func NewFedoraResult

func NewFedoraResult(objectIdentifier string, genericFilePaths []string) *FedoraResult

Creates a new FedoraResult object with the specified IntellectualObject identifier and list of GenericFile paths.

func (*FedoraResult) AddRecord

func (result *FedoraResult) AddRecord(recordType, action, eventObject, errorMessage string) error

AddRecord adds a new MetadataRecord to the Fedora result.

func (*FedoraResult) AllRecordsSucceeded

func (result *FedoraResult) AllRecordsSucceeded() bool

Returns true if all metadata was recorded successfully in Fluctus/Fedora. A true result means that all of the following were successfully recorded:

1) Registration of the IntellectualObject. This may mean creating a new IntellectualObject or updating an existing one.

2) Recording the ingest PremisEvent for the IntellectualObject.

3) Registration of EACH of the object's GenericFiles. This may mean creating a new GenericFile or updating an existing one.

4) Recording the identifier_assignment for EACH GenericFile. The identifier is typically a UUID.

5) Recording the fixity_generation for EACH GenericFile. Although most files already come with md5 checksums from S3, we always generate a sha256 as well.

A successful FedoraResult will have (2 + (3 * len(GenericFilePaths))) successful MetadataRecords.
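The expected record count is easy to compute; this trivial helper (not part of bagman) just restates the formula above:

```go
package main

import "fmt"

// expectedRecordCount applies the formula above: 2 object-level
// records (object registration plus the ingest PremisEvent) and
// 3 records per generic file (registration, identifier_assignment,
// fixity_generation).
func expectedRecordCount(numFiles int) int {
	return 2 + 3*numFiles
}

func main() {
	fmt.Println(expectedRecordCount(10)) // 32
}
```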

func (*FedoraResult) FindRecord

func (result *FedoraResult) FindRecord(recordType, action, eventObject string) *MetadataRecord

FindRecord returns the MetadataRecord with the specified type, action and event object.

func (*FedoraResult) RecordSucceeded

func (result *FedoraResult) RecordSucceeded(recordType, action, eventObject string) bool

Returns true/false to indicate whether the specified bit of metadata was recorded successfully in Fluctus/Fedora.

type FetchResult

type FetchResult struct {
	BucketName    string
	Key           string
	LocalFile     string
	RemoteMd5     string
	LocalMd5      string
	Md5Verified   bool
	Md5Verifiable bool
	ErrorMessage  string
	Warning       string
	Retry         bool
}

FetchResult describes the results of fetching a bag from S3 and verification of that bag.

type File

type File struct {
	// Path is the path to the file within the bag. Payload files
	// always begin with "data/".
	Path string
	// The size of the file, in bytes.
	Size int64
	// The time the file was created. This is here because
	// it's part of the Fedora object model, but we do not
	// actually have access to this data. Created will usually
	// be set to empty time or mod time.
	Created time.Time
	// The time the file was last modified.
	Modified time.Time
	// The md5 checksum for the file & when we verified it.
	Md5         string
	Md5Verified time.Time
	// The sha256 checksum for the file.
	Sha256 string
	// The time the sha256 checksum was generated. The bag processor
	// generates this checksum when it unpacks the file from the
	// tar archive.
	Sha256Generated time.Time
	// The unique identifier for this file. This is generated by the
	// bag processor when it unpacks the file from the tar archive.
	Uuid string
	// The time when the bag processor generated the UUID for this file.
	UuidGenerated time.Time
	// The mime type of the file. This should be suitable for use in an
	// HTTP Content-Type header.
	MimeType string
	// A message describing any errors that occurred during the processing
	// of this file. E.g. I/O error, bad checksum, etc. If this is empty,
	// there were no processing errors.
	ErrorMessage string
	// The file's URL in the S3 preservation bucket. This is assigned by
	// the bag processor after it stores the file in the preservation
	// bucket. If this is blank, the file has not yet been sent to
	// preservation.
	StorageURL string
	StoredAt   time.Time
	StorageMd5 string
	// The unique id of this GenericFile. Institution domain name +
	// "/" + bag name.
	Identifier         string
	IdentifierAssigned time.Time

	// If true, some version of this file already exists in the S3
	// preservation bucket and its metadata is in Fedora.
	ExistingFile bool

	// If true, this file needs to be saved to the S3 preservation
	// bucket, and its metadata and events must be saved to Fedora.
	// This will be true if the file is new, or if it's an existing
	// file whose contents have changed since it was last ingested.
	NeedsSave bool

	// ReplicationError describes the last error that occurred while
	// trying to send this file to the replication bucket in Oregon.
	// Replication is the last step in the ingest process, and before
	// that step, this property will contain an empty string.
	ReplicationError string
}

File contains information about a generic data file within the data directory of a bag or tar archive.

func NewFile

func NewFile() *File

func (*File) PremisEvents

func (file *File) PremisEvents() (events []*PremisEvent)

PremisEvents returns a list of Premis events generated during bag processing: ingest, fixity generation (sha256), and identifier assignment.

func (*File) ReplicationEvent

func (file *File) ReplicationEvent(replicationUrl string) (*PremisEvent, error)

Returns a replication event, saying the file was saved to the S3 replication bucket in Oregon. Param replicationUrl is the URL of the file in the replication bucket.

func (*File) S3UUID

func (file *File) S3UUID() string

This function shouldn't exist. It's related to Pivotal Tracker #105112644, which pertains to reingested files that we should not try to re-save to Glacier.

func (*File) ToGenericFile

func (file *File) ToGenericFile() (*GenericFile, error)

Converts bagman.File to GenericFile, which is what Fluctus understands.

type FileBatchIterator

type FileBatchIterator struct {
	// contains filtered or unexported fields
}

FileBatchIterator returns batches of files whose metadata needs to be saved in Fluctus.

func NewFileBatchIterator

func NewFileBatchIterator(files []*File, batchSize int) *FileBatchIterator

Returns a new FileBatchIterator that will iterate over files. Each call to NextBatch() will return up to batchSize files that need to be saved.

func (*FileBatchIterator) NextBatch

func (iter *FileBatchIterator) NextBatch() ([]*GenericFile, error)

NextBatch returns the next N files that need to be saved, where N is the batchSize that was passed into NewFileBatchIterator. This may return fewer than batchSize files if only a few need saving. When no remaining files need to be saved, this returns an error.
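The batching pattern can be sketched with a minimal, self-contained stand-in (not the actual bagman implementation; this File type carries only the two fields the sketch needs):

```go
package main

import (
	"errors"
	"fmt"
)

// File is a pared-down stand-in for bagman.File.
type File struct {
	Identifier string
	NeedsSave  bool
}

// FileBatchIterator yields successive batches of files that
// still need saving.
type FileBatchIterator struct {
	files     []*File
	batchSize int
	pos       int
}

func NewFileBatchIterator(files []*File, batchSize int) *FileBatchIterator {
	return &FileBatchIterator{files: files, batchSize: batchSize}
}

// NextBatch returns up to batchSize files with NeedsSave == true,
// or an error once none remain.
func (iter *FileBatchIterator) NextBatch() ([]*File, error) {
	batch := make([]*File, 0, iter.batchSize)
	for iter.pos < len(iter.files) && len(batch) < iter.batchSize {
		f := iter.files[iter.pos]
		iter.pos++
		if f.NeedsSave {
			batch = append(batch, f)
		}
	}
	if len(batch) == 0 {
		return nil, errors.New("no more files need saving")
	}
	return batch, nil
}

func main() {
	files := []*File{
		{"inst.edu/bag/data/a.txt", true},
		{"inst.edu/bag/data/b.txt", false},
		{"inst.edu/bag/data/c.txt", true},
	}
	iter := NewFileBatchIterator(files, 2)
	for {
		batch, err := iter.NextBatch()
		if err != nil {
			break
		}
		fmt.Println("batch of", len(batch))
	}
}
```

Callers typically loop until the returned error signals exhaustion, as in main above.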

type FileDigest

type FileDigest struct {
	PathToFile   string
	Md5Digest    string
	Sha256Digest string
	Size         int64
}

func CalculateDigests

func CalculateDigests(pathToFile string) (*FileDigest, error)

Returns a FileDigest structure with the md5 and sha256 digests of the specified file as hex-encoded strings, along with the file's size.

type FileSet

type FileSet struct {
	Files []*GenericFile
}

FileSet is a set of files that will be put into a single bag upon restoration. Some large intellectual objects will have to be split into multiple bags during restoration to accommodate the 250GB bag size limit.
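One way to picture the split is a greedy packing pass over the files (a sketch of the idea under a simple size-limit assumption, not the actual restoration code):

```go
package main

import "fmt"

// GenericFile is pared down to the fields the sketch needs.
type GenericFile struct {
	Identifier string
	Size       int64
}

// FileSet mirrors the struct above.
type FileSet struct {
	Files []*GenericFile
}

// SplitIntoFileSets packs files greedily into sets whose total
// size stays at or under maxBytes. A single file larger than
// maxBytes gets a set of its own.
func SplitIntoFileSets(files []*GenericFile, maxBytes int64) []*FileSet {
	var sets []*FileSet
	current := &FileSet{}
	var currentSize int64
	for _, f := range files {
		if len(current.Files) > 0 && currentSize+f.Size > maxBytes {
			sets = append(sets, current)
			current = &FileSet{}
			currentSize = 0
		}
		current.Files = append(current.Files, f)
		currentSize += f.Size
	}
	if len(current.Files) > 0 {
		sets = append(sets, current)
	}
	return sets
}

func main() {
	files := []*GenericFile{
		{"a", 100}, {"b", 150}, {"c", 60}, {"d", 200},
	}
	for i, set := range SplitIntoFileSets(files, 250) {
		fmt.Println("bag", i, "files:", len(set.Files))
	}
}
```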

type FixityResult

type FixityResult struct {

	// The generic file we're going to look at.
	// This file is sitting somewhere on S3.
	GenericFile *GenericFile

	// The NSQ message being worked on. This is only relevant
	// if we are using this struct in the context of an NSQ worker.
	// The struct is still valid if this member is nil.
	NsqMessage *nsq.Message `json:"-"` // Don't serialize

	// Does the file exist in S3?
	S3FileExists bool

	// The sha256 sum we calculated after downloading
	// the file.
	Sha256 string

	// The date and time at which we finished calculating
	// the md5 and sha256 checksums.
	CalculatedAt time.Time

	// A string describing any error that might have
	// occurred during the fetch and/or checksum calculation.
	ErrorMessage string

	// Should we retry the fixity check if the last attempt
	// failed? Typically, this will be true, because most
	// failures are transient network errors. It will be
	// false on fatal errors, such as if the remote file
	// does not exist.
	Retry bool
}

FixityResult describes the result of fetching a file from S3 and verifying the file's sha256 checksum.

func NewFixityResult

func NewFixityResult(gf *GenericFile) *FixityResult

func (*FixityResult) BucketAndKey

func (result *FixityResult) BucketAndKey() (string, string, error)

Returns the name of the S3 bucket and key for the GenericFile.

func (*FixityResult) BuildPremisEvent

func (result *FixityResult) BuildPremisEvent() (*PremisEvent, error)

Returns a PremisEvent describing the result of this fixity check.

func (*FixityResult) FedoraSha256

func (result *FixityResult) FedoraSha256() string

Returns the SHA256 checksum that Fedora has on record.

func (*FixityResult) FixityCheckPossible

func (result *FixityResult) FixityCheckPossible() bool

Returns true if we have all the data we need to compare the existing checksum with the checksum of the S3 file.

func (*FixityResult) GenericFileHasDigest

func (result *FixityResult) GenericFileHasDigest() bool

Returns true if the underlying GenericFile includes a SHA256 checksum.

func (*FixityResult) GotDigestFromPreservationFile

func (result *FixityResult) GotDigestFromPreservationFile() bool

Returns true if result.Sha256 was set.

func (*FixityResult) Sha256Matches

func (result *FixityResult) Sha256Matches() (bool, error)

Returns true if the sha256 sum we calculated for this file matches the sha256 sum recorded in Fedora.
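A minimal stand-in for the comparison logic (FedoraSha256 is modeled as a plain field here for illustration, though in bagman it comes from the GenericFile's checksum records):

```go
package main

import (
	"errors"
	"fmt"
)

// FixityResult is pared down to the two digests being compared.
type FixityResult struct {
	Sha256       string // sha256 we calculated from the S3 copy
	FedoraSha256 string // sha256 Fedora has on record
}

// Sha256Matches reports whether the digests agree. It returns an
// error if either digest is missing, since no comparison is
// possible in that case.
func (r *FixityResult) Sha256Matches() (bool, error) {
	if r.Sha256 == "" || r.FedoraSha256 == "" {
		return false, errors.New("missing a digest: fixity check not possible")
	}
	return r.Sha256 == r.FedoraSha256, nil
}

func main() {
	r := &FixityResult{Sha256: "abc123", FedoraSha256: "abc123"}
	match, err := r.Sha256Matches()
	fmt.Println(match, err)
}
```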

type FluctusClient

type FluctusClient struct {
	// contains filtered or unexported fields
}

func NewFluctusClient

func NewFluctusClient(hostUrl, apiVersion, apiUser, apiKey string, logger *logging.Logger) (*FluctusClient, error)

Creates a new fluctus client. Param hostUrl should come from the config.json file.

func (*FluctusClient) BuildUrl

func (client *FluctusClient) BuildUrl(relativeUrl string) string

BuildUrl combines the host and protocol in client.hostUrl with relativeUrl to create an absolute URL. For example, if client.hostUrl is "http://localhost:3456", then client.BuildUrl("/path/to/action.json") would return "http://localhost:3456/path/to/action.json".
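The described behavior amounts to joining the two strings while normalizing the slash at the seam; a hedged stand-alone sketch (not the client's actual code):

```go
package main

import (
	"fmt"
	"strings"
)

// buildUrl joins a host URL and a relative URL, trimming any
// duplicate slash where they meet.
func buildUrl(hostUrl, relativeUrl string) string {
	return strings.TrimSuffix(hostUrl, "/") + "/" + strings.TrimPrefix(relativeUrl, "/")
}

func main() {
	fmt.Println(buildUrl("http://localhost:3456", "/path/to/action.json"))
}
```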

func (*FluctusClient) BulkStatusGet

func (client *FluctusClient) BulkStatusGet(since time.Time) (statusRecords []*ProcessStatus, err error)

func (*FluctusClient) CacheInstitutions

func (client *FluctusClient) CacheInstitutions() error

Caches a map of institutions in which institution domain name is the key and institution id is the value.

func (*FluctusClient) DeleteFluctusTestData

func (client *FluctusClient) DeleteFluctusTestData() error

Deletes the data we created with our integration tests.

func (*FluctusClient) DeletionItemsGet

func (client *FluctusClient) DeletionItemsGet(genericFileIdentifier string) (statusRecords []*ProcessStatus, err error)

Returns a list of items that need to be deleted. If param genericFileIdentifier is not an empty string, this will return all ProcessedItem records for the generic file that have Action == "Delete".

If no genericFileIdentifier is supplied, this returns all ProcessedItem records in action "Delete" with stage "Requested" and status "Pending".

This will return zero items in either of the following cases:

1. No genericFileIdentifier is supplied and there are no pending deletion requests in Fluctus' ProcessedItems table.

2. A genericFileIdentifier is supplied, and there are no ProcessedItem records for that file with Action Delete.

func (*FluctusClient) GenericFileGet

func (client *FluctusClient) GenericFileGet(genericFileIdentifier string, includeRelations bool) (*GenericFile, error)

Returns the generic file with the specified identifier.

func (*FluctusClient) GenericFileSave

func (client *FluctusClient) GenericFileSave(objId string, gf *GenericFile) (newGf *GenericFile, err error)

Saves a GenericFile to fluctus. This function figures out whether the save is a create or an update. Param objId is the Id of the IntellectualObject to which the file belongs. This returns the GenericFile.

func (*FluctusClient) GenericFileSaveBatch

func (client *FluctusClient) GenericFileSaveBatch(objId string, files []*GenericFile) (err error)

Saves a batch of GenericFiles to fluctus. This is for create only.

func (*FluctusClient) GetBagStatus

func (client *FluctusClient) GetBagStatus(etag, name string, bag_date time.Time) (status *ProcessStatus, err error)

GetBagStatus returns the status of a bag from a prior round of processing. This function will return nil if Fluctus has no record of this bag.

func (*FluctusClient) GetBagStatusById

func (client *FluctusClient) GetBagStatusById(id int) (status *ProcessStatus, err error)

GetBagStatusById returns the processed item with the specified ID, or nil.

func (*FluctusClient) GetFilesNotCheckedSince

func (client *FluctusClient) GetFilesNotCheckedSince(daysAgo time.Time, offset, limit int) (files []*GenericFile, err error)

Returns a list of GenericFiles that have not had a fixity check since the specified datetime.

func (*FluctusClient) GetGenericFileSummaries

func (client *FluctusClient) GetGenericFileSummaries(intelObjIdentifier string) (files []*GenericFile, err error)

Returns a lightweight version of the generic files belonging to an intellectual object. See the comments on IntellectualObjectGetForRestore.

func (*FluctusClient) InstitutionGet

func (client *FluctusClient) InstitutionGet(identifier string) (*Institution, error)

func (*FluctusClient) IntellectualObjectCreate

func (client *FluctusClient) IntellectualObjectCreate(obj *IntellectualObject, maxGenericFiles int) (newObj *IntellectualObject, err error)

func (*FluctusClient) IntellectualObjectGet

func (client *FluctusClient) IntellectualObjectGet(identifier string, includeRelations bool) (*IntellectualObject, error)

Returns the IntellectualObject with the specified id, or nil if no such object exists. If includeRelations is false, this returns only the IntellectualObject. If includeRelations is true, this returns the IntellectualObject with all of its GenericFiles and Events. Param identifier must have slashes replaced with %2F or you'll get a 404!

func (*FluctusClient) IntellectualObjectGetForRestore

func (client *FluctusClient) IntellectualObjectGetForRestore(identifier string) (*IntellectualObject, error)

Returns an IntellectualObject with GenericFiles that have just enough info filled in to restore the object. Each GenericFile will have Size, Identifier and URI, and no other data. This special call works around some problems in Fedora, where getting a list of 10k GenericFiles can take over an hour at 100% CPU. This specially optimized call can get 10k files in about 2 seconds. Just keep in mind that you will not get fully-formed GenericFile objects.

func (*FluctusClient) IntellectualObjectUpdate

func (client *FluctusClient) IntellectualObjectUpdate(obj *IntellectualObject) (newObj *IntellectualObject, err error)

Updates an existing IntellectualObject in fluctus. Returns the IntellectualObject.

func (*FluctusClient) NewJsonRequest

func (client *FluctusClient) NewJsonRequest(method, targetUrl string, body io.Reader) (*http.Request, error)

NewJsonRequest returns a new request with headers indicating JSON request and response formats.

func (*FluctusClient) PremisEventSave

func (client *FluctusClient) PremisEventSave(objId, objType string, event *PremisEvent) (newEvent *PremisEvent, err error)

Saves a PremisEvent to Fedora. Param objId should be the IntellectualObject id if you're recording an object-related event, such as ingest; or a GenericFile id if you're recording a file-related event, such as fixity generation. Param objType must be either "IntellectualObject" or "GenericFile". Param event is the event you wish to save. This returns the event that comes back from Fluctus. Note that you can create events, but you cannot update them. All saves will create new events!

func (*FluctusClient) ProcessStatusSearch

func (client *FluctusClient) ProcessStatusSearch(ps *ProcessStatus, retrySpecified, reviewedSpecified bool) (statusRecords []*ProcessStatus, err error)

ProcessStatusSearch returns any ProcessedItem/ProcessStatus records from fluctus matching the specified criteria. Fill a ProcessStatus with as many attributes as you like and pass it in. This will return all ProcessStatus records from Fluctus whose attributes match the attributes of the object you passed in.

Because booleans in Go default to false, the params retrySpecified and reviewSpecified indicate whether you want ps.Retry and ps.Reviewed to be added in to the search criteria.

func (*FluctusClient) RestorationItemsGet

func (client *FluctusClient) RestorationItemsGet(objectIdentifier string) (statusRecords []*ProcessStatus, err error)

Returns a list of items that need to be restored. If param objectIdentifier is not an empty string, this will return all ProcessedItem records for the intellectual object that are in action "Restore".

If no objectIdentifier is supplied, this returns all ProcessedItem records in action "Restore" with stage "Requested" and status "Pending".

This will return zero items in either of the following cases:

1. No objectIdentifier is supplied and there are no pending restoration requests in Fluctus' ProcessedItems table.

2. An objectIdentifier is supplied, and there are no ProcessedItem records for that object in stage Restore.

func (*FluctusClient) RestorationStatusSet

func (client *FluctusClient) RestorationStatusSet(processStatus *ProcessStatus) error

This sets the status of the bag restore operation on all ProcessedItem records for all bag parts that make up the current object. If an object was uploaded as a series of 100 bags, this sets the status on the processed item records for the latest ingested version of each of those 100 bags.

func (*FluctusClient) SendProcessedItem

func (client *FluctusClient) SendProcessedItem(localStatus *ProcessStatus) (err error)

SendProcessedItem sends information about the status of processing this item to Fluctus. Param localStatus should come from ProcessResult.ProcessStatus(), which gives information about the current state of processing.

func (*FluctusClient) UpdateProcessedItem

func (client *FluctusClient) UpdateProcessedItem(status *ProcessStatus) (err error)

UpdateProcessedItem sends a message to Fluctus describing whether bag processing succeeded or failed. If it failed, the ProcessStatus object includes some details of what went wrong.

type GenericFile

type GenericFile struct {
	Id                 string               `json:"id"`
	Identifier         string               `json:"identifier"`
	Format             string               `json:"file_format"`
	URI                string               `json:"uri"`
	Size               int64                `json:"size"`
	Created            time.Time            `json:"created"`
	Modified           time.Time            `json:"modified"`
	ChecksumAttributes []*ChecksumAttribute `json:"checksum"`
	Events             []*PremisEvent       `json:"premisEvents"`
}

GenericFile contains information about a file that makes up part (or all) of an IntellectualObject.

IntellectualObject is the object to which the file belongs.

Format is typically a mime-type, such as "application/xml", that describes the file format.

URI describes the location of the object (in APTrust?).

Size is the size of the object, in bytes.

Created is the date and time at which the object was created (in APTrust, or at the institution that owns it?).

Modified is the date and time at which the object was last modified (in APTrust, or at the institution that owns it?).

Created and Modified should be ISO8601 DateTime strings, such as:

1994-11-05T08:15:30-05:00 (Local Time) 1994-11-05T08:15:30Z (UTC)
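In Go, these ISO8601 forms correspond to the time package's RFC3339 layout; a small illustration of parsing one and normalizing it to UTC:

```go
package main

import (
	"fmt"
	"time"
)

// toUTCString parses an ISO8601/RFC3339 timestamp and renders the
// same instant in its UTC (Z) form.
func toUTCString(iso string) (string, error) {
	t, err := time.Parse(time.RFC3339, iso)
	if err != nil {
		return "", err
	}
	return t.UTC().Format(time.RFC3339), nil
}

func main() {
	s, err := toUTCString("1994-11-05T08:15:30-05:00")
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // the -05:00 local time rendered as UTC
}
```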

func (*GenericFile) BagName

func (gf *GenericFile) BagName() (string, error)

Returns the name of the original bag.

func (*GenericFile) FindEventsByType

func (gf *GenericFile) FindEventsByType(eventType string) []PremisEvent

Returns events of the specified type

func (*GenericFile) GetChecksum

func (gf *GenericFile) GetChecksum(algorithm string) *ChecksumAttribute

Returns the most recent checksum digest for the given algorithm for this file. We use the most recent checksum to check fixity when doing fixity checks, when restoring bags, and when building bags for DPN. See PT #126734205.

func (*GenericFile) InstitutionId

func (gf *GenericFile) InstitutionId() (string, error)

Returns the name of the institution that owns this file.

func (*GenericFile) OriginalPath

func (gf *GenericFile) OriginalPath() (string, error)

Returns the original path of the file within the original bag. This is just the identifier minus the institution id and bag name. For example, if the identifier is "", this returns "data/"

func (*GenericFile) PreservationStorageFileName

func (gf *GenericFile) PreservationStorageFileName() (string, error)

Returns the name of this file in the preservation storage bucket (that should be a UUID), or an error if the GenericFile does not have a valid preservation storage URL.
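Since the preservation URL ends in the UUID, extracting the name amounts to taking the last path component. A stand-alone sketch (the URL in main is hypothetical, and the real method reads the GenericFile's URI rather than taking a parameter):

```go
package main

import (
	"fmt"
	"strings"
)

// preservationFileName returns the last path component of a
// preservation storage URL, which should be a UUID.
func preservationFileName(storageURL string) (string, error) {
	if !strings.Contains(storageURL, "/") {
		return "", fmt.Errorf("%q does not look like a storage URL", storageURL)
	}
	parts := strings.Split(storageURL, "/")
	return parts[len(parts)-1], nil
}

func main() {
	// Hypothetical bucket and key, for illustration only.
	name, err := preservationFileName(
		"https://s3.amazonaws.com/example-preservation/b21fdb34-1f79-4101-ad2a-1fb9a4c5dd8e")
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```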

func (*GenericFile) SerializeForFluctus

func (gf *GenericFile) SerializeForFluctus() ([]byte, error)

Serializes a version of GenericFile that Fluctus will accept as post/put input.

func (*GenericFile) ToMapForBulkSave

func (gf *GenericFile) ToMapForBulkSave() map[string]interface{}

Converts a generic file to a map structure which can then be serialized to JSON. The resulting structure includes both checksums and premis events, and is intended for the save_batch action of Fluctus' generic_files controller.

type IngestHelper

type IngestHelper struct {
	ProcUtil *ProcessUtil
	Result   *ProcessResult
	// contains filtered or unexported fields
}

func NewIngestHelper

func NewIngestHelper(procUtil *ProcessUtil, message *nsq.Message, s3File *S3File) *IngestHelper

Returns a new IngestHelper

func (*IngestHelper) CopyToPreservationBucket

func (helper *IngestHelper) CopyToPreservationBucket(file *File, reader *os.File, options *s3.Options) (string, error)

Returns the S3 URL of the file that was copied to the preservation bucket, or an error.

func (*IngestHelper) DeleteLocalFiles

func (helper *IngestHelper) DeleteLocalFiles() (errors []error)

This deletes the tar file and all of the files that were unpacked from it.

func (*IngestHelper) FailedAndNoMoreRetries

func (helper *IngestHelper) FailedAndNoMoreRetries() bool

func (*IngestHelper) FetchTarFile

func (helper *IngestHelper) FetchTarFile()

This fetches a file from S3 and stores it locally.

func (*IngestHelper) GetFileReader

func (helper *IngestHelper) GetFileReader(file *File) (*os.File, string, error)

Returns an OPEN reader for the specified File (reading it from the local disk). Caller is responsible for closing the reader.

func (*IngestHelper) GetS3Options

func (helper *IngestHelper) GetS3Options(file *File) (*s3.Options, error)

func (*IngestHelper) IncompleteCopyToS3

func (helper *IngestHelper) IncompleteCopyToS3() bool

func (*IngestHelper) LogResult

func (helper *IngestHelper) LogResult()

func (*IngestHelper) MergeFedoraRecord

func (helper *IngestHelper) MergeFedoraRecord() error

Our result object contains information about the bag we just unpacked. Fedora may have information about a previous version of this bag, or about the same version of the same bag from an earlier round of processing. This function merges data from Fedora into our result, so we can know whether any of the generic files have been updated.

func (*IngestHelper) ProcessBagFile

func (helper *IngestHelper) ProcessBagFile()

Unpacks the bag file at path, extracts tag info and returns information about whether it was successfully unpacked, valid and complete.

func (*IngestHelper) SaveFile

func (helper *IngestHelper) SaveFile(file *File) (string, error)

Saves a file to the preservation bucket. Returns the url of the file that was saved. Returns an error if there was a problem.

func (*IngestHelper) SaveGenericFiles

func (helper *IngestHelper) SaveGenericFiles() error

func (*IngestHelper) UpdateFluctusStatus

func (helper *IngestHelper) UpdateFluctusStatus(stage StageType, status StatusType)

type Institution

type Institution struct {
	Pid        string `json:"pid"`
	Name       string `json:"name"`
	BriefName  string `json:"brief_name"`
	Identifier string `json:"identifier"`
	DpnUuid    string `json:"dpn_uuid"`
}

Institution represents an institution in fluctus. Name is the institution's full name. BriefName is a shortened name. Identifier is the institution's domain name.

type IntellectualObject

type IntellectualObject struct {
	Id            string         `json:"id"`
	Identifier    string         `json:"identifier"`
	InstitutionId string         `json:"institution_id"`
	Title         string         `json:"title"`
	Description   string         `json:"description"`
	Access        string         `json:"access"`
	AltIdentifier []string       `json:"alt_identifier"`
	GenericFiles  []*GenericFile `json:"generic_files"`
	Events        []*PremisEvent `json:"events"`
}

IntellectualObject is Fluctus' version of an IntellectualObject. It belongs to an Institution and consists of one or more GenericFiles and a number of events.

Institution is the owner of the intellectual object.

Title is the title.

Description is a free-text description of the object.

Identifier is the object's unique identifier. (Who assigned this id? APTrust or the owner?)

Access indicates who can access the object. Valid values are consortial, institution and restricted.

func LoadIntelObjFixture

func LoadIntelObjFixture(filename string) (*IntellectualObject, error)

Loads an IntellectualObject fixture (a JSON file) from the testdata directory for testing.

func (*IntellectualObject) AccessValid

func (obj *IntellectualObject) AccessValid() bool

AccessValid returns true or false to indicate whether the structure's Access property contains a valid value.

func (*IntellectualObject) CreateIdEvent

func (obj *IntellectualObject) CreateIdEvent() *PremisEvent

func (*IntellectualObject) CreateIngestEvent

func (obj *IntellectualObject) CreateIngestEvent() *PremisEvent

func (*IntellectualObject) CreateRightsEvent

func (obj *IntellectualObject) CreateRightsEvent() *PremisEvent

func (*IntellectualObject) OriginalBagName

func (obj *IntellectualObject) OriginalBagName() string

Returns the original bag name of this object. That's the intellectual object id, minus the institution name and the slash. So "" returns ""

func (*IntellectualObject) SerializeForCreate

func (obj *IntellectualObject) SerializeForCreate(maxGenericFiles int) ([]byte, error)

SerializeForCreate serializes a fluctus intellectual object along with all of its generic files and events in a single shot. The output is a byte array of JSON data.

If maxGenericFiles is greater than zero, the JSON data will include only that number of generic files. Otherwise, it will include all of the generic files.

Fluctus is somewhat efficient at creating new intellectual objects when all of the files and events are included in the JSON for the initial create request. But some Intellectual Objects contain more than 10,000 files, and if we send all of this data at once to Fluctus, it crashes.

func (*IntellectualObject) SerializeForFluctus

func (obj *IntellectualObject) SerializeForFluctus() ([]byte, error)

Serialize the subset of IntellectualObject data that fluctus will accept. This is for post/put, where essential info, such as institution id and/or object id will be in the URL.

func (*IntellectualObject) TotalFileSize

func (obj *IntellectualObject) TotalFileSize() int64

Returns the total number of bytes of all of the generic files in this object. The object's bag size will be slightly larger than this, because it will include a manifest, tag files and tar header.

type MetadataRecord

type MetadataRecord struct {
	// Type describes what we're trying to record in Fedora. It can
	// be "IntellectualObject", "GenericFile", or "PremisEvent"
	Type string
	// Action contains information about what was in Fedora.
	// For Type IntellectualObject, this will be "object_registered".
	// For Type GenericFile, this will be "file_registered".
	// For Type PremisEvent, this will be the name of the event:
	// "ingest", "identifier_assignment", or "fixity_generation".
	Action string
	// For actions or events pertaining to a GenericFile this will be the path
	// of the file the action pertains to. For example, for fixity_generation
	// on the file "data/images/aerial.jpg", the EventObject would be
	// "data/images/aerial.jpg". For actions or events pertaining to the
	// IntellectualObject, this will be the IntellectualObject identifier.
	EventObject string
	// ErrorMessage contains a description of the error that occurred
	// when we tried to save this bit of metadata in Fluctus/Fedora.
	// It will be empty if there was no error, or if we have not yet
	// attempted to save the item.
	ErrorMessage string
}

MetadataRecord describes the result of an attempt to record metadata in Fluctus/Fedora.

func (*MetadataRecord) Succeeded

func (record *MetadataRecord) Succeeded() bool

Returns true if this bit of metadata was successfully saved to Fluctus/Fedora.

type PartnerConfig

type PartnerConfig struct {
	AwsAccessKeyId     string
	AwsSecretAccessKey string
	ReceivingBucket    string
	RestorationBucket  string
	DownloadDir        string
	// contains filtered or unexported fields
}

func LoadPartnerConfig

func LoadPartnerConfig(configFile string) (*PartnerConfig, error)

func (*PartnerConfig) ExpandFilePaths

func (partnerConfig *PartnerConfig) ExpandFilePaths()

func (*PartnerConfig) LoadAwsFromEnv

func (partnerConfig *PartnerConfig) LoadAwsFromEnv()

Fills in AWS values if they're missing from the config file but present in the environment.

func (*PartnerConfig) Validate

func (partnerConfig *PartnerConfig) Validate() error

func (*PartnerConfig) Warnings

func (partnerConfig *PartnerConfig) Warnings() []string

type PartnerS3Client

type PartnerS3Client struct {
	PartnerConfig *PartnerConfig
	S3Client      *S3Client
	LogVerbose    bool
	Test          bool // used only in testing to suppress output
}

func NewPartnerS3ClientFromConfigFile

func NewPartnerS3ClientFromConfigFile(configFile string, logVerbose bool) (*PartnerS3Client, error)

Returns a new PartnerS3Client object. Will return an error if the config file is missing, unreadable or incomplete.

func NewPartnerS3ClientWithConfig

func NewPartnerS3ClientWithConfig(partnerConfig *PartnerConfig, logVerbose bool) (*PartnerS3Client, error)

Returns a new PartnerS3Client object with the specified configuration.

func (*PartnerS3Client) Delete

func (client *PartnerS3Client) Delete(bucketName, fileName string) error

Deletes the specified file from the specified bucket.

func (*PartnerS3Client) DownloadFile

func (client *PartnerS3Client) DownloadFile(bucketName, key, checksum string) (string, error)

Downloads a file from the S3 restoration bucket and saves it in the directory specified by PartnerConfig.DownloadDir. Param checksum may be "md5", "sha256" or "none". This returns the md5 or sha256 checksum of the downloaded file, or an empty string if the checksum param was "none". Returns an error if any occurred.

func (*PartnerS3Client) List

func (client *PartnerS3Client) List(bucketName string, limit int) (keys []s3.Key, err error)

Lists the contents of a bucket, returning up to limit entries.

func (*PartnerS3Client) LoadConfig

func (client *PartnerS3Client) LoadConfig(configFile string) error

Loads configuration from the specified file path.

func (*PartnerS3Client) UploadFile

func (client *PartnerS3Client) UploadFile(file *os.File) error

Uploads a single file to the S3 receiving bucket, returning an error if the upload fails. Note that S3 checksums for large multi-part uploads will not be normal md5 checksums.

func (*PartnerS3Client) UploadFiles

func (client *PartnerS3Client) UploadFiles(filePaths []string) (succeeded, failed int)

Uploads all files in filePaths to the S3 receiving bucket. Returns the number of uploads that succeeded and the number that failed.

type PremisEvent

type PremisEvent struct {
	Identifier         string    `json:"identifier"`
	EventType          string    `json:"type"`
	DateTime           time.Time `json:"date_time"`
	Detail             string    `json:"detail"`
	Outcome            string    `json:"outcome"`
	OutcomeDetail      string    `json:"outcome_detail"`
	Object             string    `json:"object"`
	Agent              string    `json:"agent"`
	OutcomeInformation string    `json:"outcome_information"`
}

PremisEvent contains information about events that occur during the processing of a file or intellectual object, such as the verification of checksums, generation of unique identifiers, etc. We use this struct to exchange data in JSON format with the fluctus API. Fluctus, in turn, is responsible for managing all of this data in Fedora.

This structure has the following fields:

EventType is the type of Premis event we want to register: ingest, validation, fixity_generation, fixity_check or identifier_assignment.

DateTime is when this event occurred in our system.

Detail is a brief description of the event.

Outcome is either success or failure.

OutcomeDetail is the checksum for checksum generation, or the id for identifier assignment.

Object is a description of the object that generated the checksum or id.

Agent is a URL describing where to find more info about Object.

OutcomeInformation contains the text of an error message, if Outcome was failure.

func (*PremisEvent) EventTypeValid

func (premisEvent *PremisEvent) EventTypeValid() bool

EventTypeValid returns true/false, indicating whether the structure's EventType property contains the name of a valid premis event.

type ProcessResult

type ProcessResult struct {
	NsqMessage    *nsq.Message `json:"-"` // Don't serialize
	S3File        *S3File
	ErrorMessage  string
	FetchResult   *FetchResult
	TarResult     *TarResult
	BagReadResult *BagReadResult
	FedoraResult  *FedoraResult
	BagDeletedAt  time.Time
	Stage         StageType
	Retry         bool
}

Retry will be set to true if the attempt to process the file failed and should be tried again. This would be the case, for example, if the failure was due to a network error. Retry is set to false if processing failed for some reason that will not change: for example, if the file cannot be untarred, checksums were bad, or data files were missing. If processing succeeded, Retry is irrelevant.

func LoadResult

func LoadResult(filename string) (result *ProcessResult, err error)

Loads a result from the test data directory. This is used primarily for tests.

func (*ProcessResult) GenericFiles

func (result *ProcessResult) GenericFiles() (files []*GenericFile, err error)

GenericFiles returns a list of GenericFile objects that were found in the bag.

func (*ProcessResult) IngestStatus

func (result *ProcessResult) IngestStatus(logger *logging.Logger) (status *ProcessStatus)

IngestStatus returns a lightweight Status object suitable for reporting to the Fluctus results table, so that APTrust partners can view the status of their submitted bags.

TODO: Refactor. We shouldn't have to pass in a logger. <Sigh>

func (*ProcessResult) IntellectualObject

func (result *ProcessResult) IntellectualObject() (obj *IntellectualObject, err error)

IntellectualObject returns an instance of IntellectualObject which describes what was unpacked from the bag. The IntellectualObject structure matches Fluctus' IntellectualObject model, and can be sent directly to Fluctus for recording.

type ProcessStatus

type ProcessStatus struct {
	Id                    int        `json:"id"`
	ObjectIdentifier      string     `json:"object_identifier"`
	GenericFileIdentifier string     `json:"generic_file_identifier"`
	Name                  string     `json:"name"`
	Bucket                string     `json:"bucket"`
	ETag                  string     `json:"etag"`
	BagDate               time.Time  `json:"bag_date"`
	Institution           string     `json:"institution"`
	User                  string     `json:"user"`
	Date                  time.Time  `json:"date"`
	Note                  string     `json:"note"`
	Action                ActionType `json:"action"`
	Stage                 StageType  `json:"stage"`
	Status                StatusType `json:"status"`
	Outcome               string     `json:"outcome"`
	Retry                 bool       `json:"retry"`
	Reviewed              bool       `json:"reviewed"`
	State                 string     `json:"state"`
	Node                  string     `json:"node"`
	Pid                   int        `json:"pid"`
	NeedsAdminReview      bool       `json:"needs_admin_review"`
}

ProcessStatus contains summary information describing the status of a bag in process. This data goes to Fluctus, so that APTrust partners can see which of their bags have been processed successfully, and why failed bags failed.

Action may have one of the following values: Ingest, Delete, Restore.

Stage may have one of the following values: Receive (bag was uploaded by partner into receiving bucket), Fetch (fetch tarred bag file from S3 receiving bucket), Unpack (unpack the tarred bag), Validate (make sure all data files are present, checksums are correct, required tags are present), Store (copy generic files to permanent S3 bucket for archiving), Record (save record of intellectual object, generic files and events to Fedora).

Status may have one of the following values: Started, Pending, Success, Failed, Cancelled.

func (*ProcessStatus) HasBeenStored

func (status *ProcessStatus) HasBeenStored() bool

Returns true if an object's files have been stored in S3 preservation bucket.

func (*ProcessStatus) IsStoring

func (status *ProcessStatus) IsStoring() bool

func (*ProcessStatus) SerializeForFluctus

func (status *ProcessStatus) SerializeForFluctus() ([]byte, error)

Convert ProcessStatus to JSON, omitting id, which Rails won't permit. For internal use, json.Marshal() works fine.
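One way to marshal while dropping the id is to round-trip through a generic map and delete the key before re-marshaling. The sketch below uses a trimmed-down, hypothetical ProcessStatus for illustration; it is not the bagman implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical, trimmed-down ProcessStatus for illustration only.
type ProcessStatus struct {
	Id     int    `json:"id"`
	Name   string `json:"name"`
	Status string `json:"status"`
}

// serializeForFluctus drops the id before sending to Rails by
// marshaling to a map, deleting the key, and re-marshaling.
func serializeForFluctus(status *ProcessStatus) ([]byte, error) {
	full, err := json.Marshal(status)
	if err != nil {
		return nil, err
	}
	var m map[string]interface{}
	if err := json.Unmarshal(full, &m); err != nil {
		return nil, err
	}
	delete(m, "id")
	return json.Marshal(m)
}

func main() {
	data, err := serializeForFluctus(&ProcessStatus{Id: 42, Name: "bag.tar", Status: "Pending"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```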

func (*ProcessStatus) SetNodePidState

func (status *ProcessStatus) SetNodePidState(object interface{}, logger *logging.Logger)

Set state, node and pid on ProcessStatus.

func (*ProcessStatus) ShouldTryIngest

func (status *ProcessStatus) ShouldTryIngest() bool

Returns true if we should try to ingest this item.

type ProcessUtil

type ProcessUtil struct {
	ConfigName    string
	Config        Config
	JsonLog       *log.Logger
	MessageLog    *logging.Logger
	Volume        *Volume
	S3Client      *S3Client
	FluctusClient *FluctusClient
	// contains filtered or unexported fields

ProcessUtil sets up the items common to many of the bag processing services (bag_processor, bag_restorer, cleanup, etc.). It also encapsulates some functions common to all of those services.

func NewProcessUtil

func NewProcessUtil(requestedConfig *string, serviceGroup string) (procUtil *ProcessUtil)

Creates and returns a new ProcessUtil object. Because some items are absolutely required by this object and the processes that use it, this method will panic if it gets an invalid config param from the command line, or if it cannot set up some essential services, such as logging.

This object is meant to be used as a singleton with any of the stand-alone processing services (bag_processor, bag_restorer, cleanup, etc.).

Param requestedConfig should be the name of a valid configuration in the config.json file ("dev", "test", etc.).

Param serviceGroup must be either "aptrust" or "dpn".

func (*ProcessUtil) BagAlreadyInProgress

func (procUtil *ProcessUtil) BagAlreadyInProgress(s3File *S3File, currentMessageId string) bool

Returns true if the bag is currently being processed. This handles a special case where a very large bag is in process for a long time, the NSQ message times out, then NSQ re-sends the same message with the same ID to this worker. Without these checks, the worker will accept the message and end up processing it twice. This causes problems because the first worker will be deleting files while the second worker is trying to run checksums on them.

func (*ProcessUtil) Failed

func (procUtil *ProcessUtil) Failed() int64

Returns the number of processed items that failed.

func (*ProcessUtil) IncrementFailed

func (procUtil *ProcessUtil) IncrementFailed() int64

Increases the count of unsuccessfully processed items by one.

func (*ProcessUtil) IncrementSucceeded

func (procUtil *ProcessUtil) IncrementSucceeded() int64

Increases the count of successfully processed items by one.

func (*ProcessUtil) LogStats

func (procUtil *ProcessUtil) LogStats()

Logs info about the number of items that have succeeded and failed.

func (*ProcessUtil) MessageIdFor

func (procUtil *ProcessUtil) MessageIdFor(key string) string

Returns the NSQ MessageId under which the current item is being processed, or an empty string if no item with that key is currently being processed.

func (*ProcessUtil) MessageIdString

func (procUtil *ProcessUtil) MessageIdString(messageId nsq.MessageID) string

Converts an NSQ MessageID to a string.

func (*ProcessUtil) RegisterItem

func (procUtil *ProcessUtil) RegisterItem(key string, messageId nsq.MessageID) error

Registers an item currently being processed so we can keep track of duplicates. Many requests for ingest, restoration, etc. may be queued more than once. Register an item here to note that it is being processed under a specific message id. If the item comes in again before we're done processing and you try to register it here, you'll get an error saying the item is already in process.

The key should be a unique identifier. For intellectual objects, this can be the IntellectualObject.Identifier. For S3 files, it can be bucket_name/file_name.

func (*ProcessUtil) Succeeded

func (procUtil *ProcessUtil) Succeeded() int64

Returns the number of processed items that succeeded.

func (*ProcessUtil) UnregisterItem

func (procUtil *ProcessUtil) UnregisterItem(key string)

UnregisterItem removes the item with specified key from the list of items we are currently processing. Be sure to call this when you're done processing any item you've registered so we know we're finished with it and we can reprocess it later, under a different message id.

type S3Client

type S3Client struct {
	S3 *s3.S3

func NewS3Client

func NewS3Client(region aws.Region) (*S3Client, error)

Returns an S3Client for the specified region, using AWS credentials from the environment. Please keep your AWS keys out of the source code repos! Store them somewhere else and load them into environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

func NewS3ClientExplicitAuth

func NewS3ClientExplicitAuth(region aws.Region, accessKey, secretKey string) (*S3Client, error)

Returns an S3 client from specific auth credentials, instead of reading credentials from the environment.

func (*S3Client) CheckAllBuckets

func (client *S3Client) CheckAllBuckets(buckets []string) (bucketSummaries []*BucketSummary, errors []error)

Collects info about all of the buckets listed in buckets. TODO: Write unit test

func (*S3Client) CheckBucket

func (client *S3Client) CheckBucket(bucketName string) (bucketSummary *BucketSummary, err error)

Returns info about the contents of the bucket named bucketName. BucketSummary contains the bucket name, a list of keys, and the size of the largest file in the bucket. TODO: Write unit test

func (*S3Client) Delete

func (client *S3Client) Delete(bucketName, fileName string) error

Deletes an item from S3

func (*S3Client) Exists

func (client *S3Client) Exists(bucketName, key string) (bool, error)

Returns true/false indicating whether the specified key exists in the bucket.

func (*S3Client) FetchAndCalculateSha256

func (client *S3Client) FetchAndCalculateSha256(fixityResult *FixityResult, localPath string) error

This fetches the file from S3, but does **not** save it unless you specify a localPath, in which case the file will be saved into the file specified by localPath.

Without localPath, this simply calculates the sha256 digest of the stream that S3 returns. Keep in mind that the remote file may be up to 250GB, so this call can run for several hours and use a lot of CPU.

Returns a FixityResult object that includes not only the checksum, but also some information about what went wrong and whether the operation should be retried.

func (*S3Client) FetchToFile

func (client *S3Client) FetchToFile(bucketName string, key s3.Key, path string) (fetchResult *FetchResult)

Fetches key from bucket and saves it to path. This validates the md5 sum of the byte stream before saving to disk. If the md5 sum of the downloaded bytes does not match the md5 sum in the key, this will not save the file. It will just return an error.

This method is primarily intended for fetching tar files from the receiving buckets. It calculates the file's Md5 checksum as it writes it to disk.

func (*S3Client) FetchToFileWithoutChecksum

func (client *S3Client) FetchToFileWithoutChecksum(bucketName, key, localPath string) error

Fetches file key from bucketName and saves it to localPath. The file will be at localPath when this is done, unless it returns an error. This is a simple fetch method that does none of the accounting required for ingest.

func (*S3Client) FetchURLToFile

func (client *S3Client) FetchURLToFile(url, localPath string) *FetchResult

Fetches the specified S3 URL and saves it in the specified localPath. Ensures that the directory containing localPath exists, and calculates an md5 checksum on download. The FetchResult will tell you whether the md5 matched what AWS said it should be. You'll get an error if url is not an S3 url, or if it doesn't exist. Check FetchResult.ErrorMessage.

func (*S3Client) GetKey

func (client *S3Client) GetKey(bucketName, fileName string) (*s3.Key, error)

Returns an S3 key object for the specified file in the specified bucket. The key object has the ETag, last mod date, size and other useful info.

func (*S3Client) GetReader

func (client *S3Client) GetReader(bucketName, key string) (io.ReadCloser, error)

Returns a reader that lets you read data from bucket/key.

func (*S3Client) Head

func (client *S3Client) Head(bucketName, key string) (*http.Response, error)

Performs a HEAD request on an S3 object and returns the response. Check the response status code. You may get a 401 or 403 for files that don't exist, and the body will be an XML error message.

func (*S3Client) ListBucket

func (client *S3Client) ListBucket(bucketName string, limit int) (keys []s3.Key, err error)

Returns a list of keys in the specified bucket. If limit is zero, this will return all the keys in the bucket; otherwise, it will return only the number of keys specified. Note that listing all keys may result in the underlying client issuing multiple requests.

func (*S3Client) MakeOptions

func (client *S3Client) MakeOptions(md5sum string, metadata map[string][]string) s3.Options

Creates an options struct that adds metadata headers to the S3 put.

func (*S3Client) SaveLargeFileToS3

func (client *S3Client) SaveLargeFileToS3(bucketName, fileName, contentType string,
	reader s3.ReaderAtSeeker, byteCount int64, options s3.Options, chunkSize int64) (url string, err error)

Sends a large file (>= 5GB) to S3 in 200MB chunks. This operation may take several minutes to complete. Note that os.File satisfies the s3.ReaderAtSeeker interface.

func (*S3Client) SaveToS3

func (client *S3Client) SaveToS3(bucketName, fileName, contentType string, reader io.Reader, byteCount int64, options s3.Options) (url string, err error)

Saves a file to S3 with default access of Private. The underlying S3 client does not return the md5 checksum from S3, but we already have this info elsewhere. If the PUT produces no error, we assume the copy worked and the file's md5 sum is the same on S3 as here.

type S3File

type S3File struct {
	BucketName string
	Key        s3.Key

S3File contains information about the S3 file we're trying to process from an intake bucket. BucketName and Key are the S3 bucket name and key. AttemptNumber describes whether this is the 1st, 2nd, 3rd, etc. attempt to process this file.

func (*S3File) BagName

func (s3File *S3File) BagName() string

The name of the owning institution, followed by a slash, followed by the name of the tar file. This differs from the ObjectName, because it will have the .tar or bag.001.of030.tar suffix.

func (*S3File) ObjectName

func (s3File *S3File) ObjectName() (string, error)

Returns the object identifier that will identify this bag in fedora. That's the institution identifier, followed by a slash and the tar file name, minus the .tar extension and the ".bag1of12" multipart extension. So for BucketName "" and Key.Key "nc_bag.b001.of030.tar", this would return ""
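The trimming described above can be sketched with a suffix strip plus a regular expression for the multipart marker. The institution identifier, file name, and exact pattern below are hypothetical; the real bagman rules may differ.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// multipartSuffix matches multipart markers like ".b001.of030" or
// ".bag1of12" at the end of a bag name, after the .tar extension has
// been removed. The pattern is a sketch, not the bagman rule.
var multipartSuffix = regexp.MustCompile(`\.(b\d+\.of\d+|bag\d+of\d+)$`)

func objectName(instIdentifier, tarFileName string) string {
	name := strings.TrimSuffix(tarFileName, ".tar")
	name = multipartSuffix.ReplaceAllString(name, "")
	return instIdentifier + "/" + name
}

func main() {
	// "example.edu" and the file name are hypothetical.
	fmt.Println(objectName("example.edu", "photos.b001.of030.tar"))
}
```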

type StageType

type StageType string

Stage enumerations match values defined in

type StatusType

type StatusType string

Status enumerations match values defined in

type SynchronizedMap

type SynchronizedMap struct {
	// contains filtered or unexported fields

SynchronizedMap is a map structure that can be shared across go routines and threads. Both keys and values are strings.

func NewSynchronizedMap

func NewSynchronizedMap() *SynchronizedMap

Creates a new empty SynchronizedMap

func (*SynchronizedMap) Add

func (syncMap *SynchronizedMap) Add(key, value string)

Adds a key/value pair to the map.

func (*SynchronizedMap) Delete

func (syncMap *SynchronizedMap) Delete(key string)

Deletes the specified key from the map.

func (*SynchronizedMap) Get

func (syncMap *SynchronizedMap) Get(key string) string

Returns the value of key from the map.

func (*SynchronizedMap) HasKey

func (syncMap *SynchronizedMap) HasKey(key string) bool

Returns true if the key exists in the map.

func (*SynchronizedMap) Keys

func (syncMap *SynchronizedMap) Keys() []string

Returns a slice of all keys in the map.

func (*SynchronizedMap) Values

func (syncMap *SynchronizedMap) Values() []string

Returns a slice of all values in the map.
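The SynchronizedMap interface above can be sketched as a string-to-string map guarded by a sync.RWMutex, so readers don't block each other. This is an independent sketch of the technique, not the bagman source.

```go
package main

import (
	"fmt"
	"sync"
)

// syncMap is a string-to-string map safe for concurrent use.
type syncMap struct {
	mu sync.RWMutex
	m  map[string]string
}

func newSyncMap() *syncMap { return &syncMap{m: make(map[string]string)} }

func (s *syncMap) Add(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
}

func (s *syncMap) Get(key string) string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.m[key]
}

func (s *syncMap) HasKey(key string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.m[key]
	return ok
}

func (s *syncMap) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.m, key)
}

func (s *syncMap) Keys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.m))
	for k := range s.m {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	m := newSyncMap()
	m.Add("bucket/key", "msg-1")
	fmt.Println(m.Get("bucket/key"), m.HasKey("missing"))
}
```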

type Tag

type Tag struct {
	Label string
	Value string

This Tag struct is essentially the same as the bagins TagField struct, but its properties are public and can be easily serialized to / deserialized from JSON.

type TarResult

type TarResult struct {
	InputFile     string
	OutputDir     string
	ErrorMessage  string
	Warnings      []string
	FilesUnpacked []string
	Files         []*File

TarResult contains information about the attempt to untar a bag.

func Untar

func Untar(tarFilePath, instDomain, bagName string, buildIngestData bool) (result *TarResult)

Untars the file at the specified tarFilePath and returns a list of files that were untarred from the archive. Check result.ErrorMessage to ensure there were no errors. tarFilePath is the path to the tar file that you want to unpack. instDomain is the domain name of the institution that owns the bag. bagName is the name of the tar file, minus the ".tar" extension.

If param buildIngestData is true, the untar operation will run md5 and sha256 checksums on all of the data files in the archive, and it will use MimeMagic to figure out each file's mime type. We must do this during ingest, so buildIngestData must be true when we're running this on our servers.

If we're running in a client environment--that is, an APTrust partner is running the bag validation application on their laptop or desktop-- buildIngestData should be false. There's no need to calculate sha256 checksums in that case, and we cannot assume that the client would have the external MimeMagic C libraries installed. The lack of those libraries would cause the application to crash. So buildIngestData == false on the client!!

func (*TarResult) AllFilesCopiedToPreservation

func (result *TarResult) AllFilesCopiedToPreservation() bool

Returns true if all generic files were successfully copied to S3 long term storage.

func (*TarResult) AnyFilesCopiedToPreservation

func (result *TarResult) AnyFilesCopiedToPreservation() bool

Returns true if any generic files were successfully copied to S3 long term storage.

func (*TarResult) AnyFilesNeedSaving

func (result *TarResult) AnyFilesNeedSaving() bool

Returns true if any of the untarred files are new or updated.

func (*TarResult) FilePaths

func (result *TarResult) FilePaths() []string

FilePaths returns a list of all the File paths that were untarred from the bag. The list will look something like "data/file1.gif", "data/file2.pdf", etc.

func (*TarResult) GetFileByPath

func (result *TarResult) GetFileByPath(filePath string) *File

Returns the File with the specified path, if it exists.

func (*TarResult) MergeExistingFiles

func (result *TarResult) MergeExistingFiles(genericFiles []*GenericFile)

MergeExistingFiles merges data from generic files that already exist in Fedora. This is necessary when an existing bag is reprocessed or re-uploaded.

type User

type User struct {
	Email     string `json:"email"`
	Password  string `json:"password,omitempty"`
	ApiKey    string `json:"api_secret_key,omitempty"`
	AuthToken string `json:"authenticity_token,omitempty"`

User struct is used for logging in to Fluctus.

type Validator

type Validator struct {
	PathToFile    string
	TarResult     *TarResult
	BagReadResult *BagReadResult
	ErrorMessage  string

func NewValidator

func NewValidator(pathToFile string) (*Validator, error)

Returns a new Validator suitable for partners to validate bags before sending. For server-side use, use IngestHelper.

func (*Validator) FileType

func (validator *Validator) FileType() (int, error)

Returns either VAL_TYPE_TAR, VAL_TYPE_DIR or VAL_TYPE_ERR to describe what type of item the user wants to validate.

func (*Validator) InstitutionDomain

func (validator *Validator) InstitutionDomain() (string, error)

Gets the institution name from the file/bag name, or returns a descriptive error if the file doesn't include the institution name. There's a little problem here that came along late in development. According to the docs, bag names are supposed to start with the institution identifier, minus the ".edu" or ".org" extension. So "ncsu" instead of "", or "miami" instead of "". So this returns the institution identifier without the TLD extension.

func (*Validator) IsValid

func (validator *Validator) IsValid() bool

func (*Validator) IsValidMultipartName

func (validator *Validator) IsValidMultipartName() bool

Returns true if the bag has a valid multipart bag name.

func (*Validator) LooksLikeMultipart

func (validator *Validator) LooksLikeMultipart() bool

Returns true if the bag name looks like a multipart bag. This catches both correct multipart bag names and some common incorrect variants, such as "bag1of2"
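The check described above might be a single regular expression that matches both the correct ".bNNN.ofNNN" style and the common incorrect "bag1of2" variant. The exact patterns bagman accepts may differ from this sketch.

```go
package main

import (
	"fmt"
	"regexp"
)

// multipartPattern matches correct multipart markers (".b001.of030")
// and the common incorrect "bag1of2" variant.
var multipartPattern = regexp.MustCompile(`(\.b\d+\.of\d+|bag\d+of\d+)`)

func looksLikeMultipart(bagName string) bool {
	return multipartPattern.MatchString(bagName)
}

func main() {
	fmt.Println(looksLikeMultipart("photos.b001.of030")) // true
	fmt.Println(looksLikeMultipart("photos.bag1of2"))    // true
	fmt.Println(looksLikeMultipart("photos"))            // false
}
```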

func (*Validator) TarFileName

func (validator *Validator) TarFileName() string

Returns the name of the tar file that the user wants to validate. If this is a directory, returns the name of the directory with a .tar suffix.

func (*Validator) UntarredDir

func (validator *Validator) UntarredDir() string

Returns the path to the directory that holds the untarred contents of the bag.

type Volume

type Volume struct {
	// contains filtered or unexported fields

Volume tracks the amount of available space on a volume (disk), as well as the amount of space claimed for pending operations. The purpose is to allow the bag processor to try to determine ahead of time whether the underlying disk has enough space to accommodate the file it just pulled off the queue. We want to avoid downloading 100GB files when we know ahead of time that we don't have enough space to process them.

(BUG) If the config file specifies a TarDirectory and a RestoreDirectory that are on the same physical or logical volume, this volume manager may not give accurate information about the amount of available space.

func NewVolume

func NewVolume(path string, messageLog *logging.Logger) (*Volume, error)

NewVolume creates a new Volume structure to track the amount of available space and claimed space on a volume (disk).

func (*Volume) AvailableSpace

func (volume *Volume) AvailableSpace() (numBytes uint64)

AvailableSpace returns an approximate number of free bytes currently available to unprivileged users on the underlying volume, minus the number of bytes reserved for pending processes. The value returned will never be 100% accurate, because other processes may be writing to the volume.

func (*Volume) ClaimedSpace

func (volume *Volume) ClaimedSpace() (numBytes uint64)

ClaimedSpace returns the number of bytes reserved for pending operations, including downloading and untarring bag archives.

func (*Volume) InitialFreeSpace

func (volume *Volume) InitialFreeSpace() (numBytes uint64)

InitialFreeSpace returns the number of bytes available to an unprivileged user on the volume at the time the Volume struct was initialized.

func (*Volume) Release

func (volume *Volume) Release(numBytes uint64)

Release tells the Volume struct that numBytes have been deleted from the underlying volume and are free to be reused later.

func (*Volume) Reserve

func (volume *Volume) Reserve(numBytes uint64) (err error)

Reserve requests that a number of bytes on disk be reserved for an upcoming operation, such as downloading and untarring a file. Reserving space does not have any effect on the file system. It simply allows the Volume struct to maintain some internal bookkeeping. Reserve will return an error if there is not enough free disk space to accommodate the requested number of bytes.
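The Reserve/Release bookkeeping can be sketched as a claimed-bytes counter checked against the initial free space. The initial free space below is a hypothetical constant rather than a syscall.Statfs result, and nothing here touches the file system.

```go
package main

import (
	"fmt"
	"sync"
)

// volume tracks space claimed by pending operations in memory only.
type volume struct {
	mu          sync.Mutex
	initialFree uint64
	claimed     uint64
}

// Reserve fails if the request would exceed the unclaimed space.
func (v *volume) Reserve(numBytes uint64) error {
	v.mu.Lock()
	defer v.mu.Unlock()
	if v.claimed+numBytes > v.initialFree {
		return fmt.Errorf("not enough free space: %d bytes requested, %d available",
			numBytes, v.initialFree-v.claimed)
	}
	v.claimed += numBytes
	return nil
}

// Release returns previously claimed bytes to the pool.
func (v *volume) Release(numBytes uint64) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if numBytes > v.claimed {
		numBytes = v.claimed
	}
	v.claimed -= numBytes
}

func (v *volume) AvailableSpace() uint64 {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.initialFree - v.claimed
}

func main() {
	v := &volume{initialFree: 100 << 30} // pretend 100GB free
	if err := v.Reserve(60 << 30); err != nil {
		panic(err)
	}
	fmt.Println(v.AvailableSpace() >> 30) // 40
}
```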

type WorkReader

type WorkReader struct {
	Config        Config
	MessageLog    *logging.Logger
	FluctusClient *FluctusClient

WorkReader includes some basic components used by processes that read information from external sources to queue up work in NSQ.

type WorkerConfig

type WorkerConfig struct {
	// This describes how often the NSQ client should ping
	// the NSQ server to let it know it's still there. The
	// setting must be formatted like so:
	// "800ms" for 800 milliseconds
	// "10s" for ten seconds
	// "1m" for one minute
	HeartbeatInterval string

	// The maximum number of times the worker should try to
	// process a job. If non-fatal errors cause a job to
	// fail, it will be requeued this number of times.
	// Fatal errors, such as invalid bags or attempts to
	// restore or delete non-existent files, will not be
	// retried.
	MaxAttempts uint16

	// Maximum number of jobs a worker will accept from the
	// queue at one time. Workers that may have to process
	// very long-running tasks, such as apt_prepare,
	// apt_store and apt_restore, should set this number
	// fairly low (20 or so) to prevent messages from
	// timing out.
	MaxInFlight int

	// If the NSQ server does not hear from a client that a
	// job is complete in this amount of time, the server
	// considers the job to have timed out and re-queues it.
	// Long-running jobs such as apt_prepare, apt_store,
	// apt_record and apt_restore will "touch" the NSQ message
	// as it moves through each channel in the processing pipeline.
	// The touch message tells NSQ that it's still working on
	// the job, and effectively resets NSQ's timer on that
	// message to zero. Still, very large bags in any of the
	// long-running processes will need a timeout of "180m" or
	// so to ensure completion.
	MessageTimeout string

	// Number of go routines used to perform network I/O,
	// such as fetching files from S3, storing files to S3,
	// and fetching/storing Fluctus data. If a worker does
	// no network I/O (such as the TroubleWorker), this
	// setting is ignored.
	NetworkConnections int

	// The name of the NSQ Channel the worker should read from.
	NsqChannel string

	// The name of the NSQ Topic the worker should listen to.
	NsqTopic string

	// This describes how long the NSQ client will wait for
	// a read from the NSQ server before timing out. The format
	// is the same as for HeartbeatInterval.
	ReadTimeout string

	// Number of go routines to start in the worker to
	// handle all work other than network I/O. Typically,
	// this should be close to the number of CPUs.
	Workers int

	// This describes how long the NSQ client will wait for
	// a write to the NSQ server to complete before timing out.
	// The format is the same as for HeartbeatInterval.
	WriteTimeout string
