Documentation ¶
Index ¶
- Constants
- Variables
- func EnsureItemIsMarkedCancelled(_context *context.Context, manifest *models.ReplicationManifest)
- func EnsureItemIsMarkedComplete(_context *context.Context, manifest *models.ReplicationManifest)
- func GetDPNBag(dpnClient *network.DPNRestClient, manifest *models.ReplicationManifest, ...)
- func GetDPNWorkItem(_context *context.Context, manifest *models.ReplicationManifest, ...)
- func GetRsyncCommand(copyFrom, copyTo string, useSSH bool) *exec.Cmd
- func GetWorkItem(_context *context.Context, manifest *models.DPNIngestManifest, ...)
- func GetWorkItemState(_context *context.Context, manifest *models.DPNIngestManifest, ...)
- func GetXferRequest(dpnClient *network.DPNRestClient, manifest *models.ReplicationManifest, ...)
- func LoadDPNBagValidationConfig(_context *context.Context) *validation.BagValidationConfig
- func LogIngestJson(manifest *models.DPNIngestManifest, jsonLog *log.Logger)
- func LogReplicationJson(manifest *models.ReplicationManifest, jsonLog *log.Logger)
- func PushToQueue(_context *context.Context, manifest *models.DPNIngestManifest, ...)
- func ReserveSpaceOnVolume(_context *context.Context, manifest *models.ReplicationManifest) bool
- func SaveDPNWorkItemState(_context *context.Context, manifest *models.ReplicationManifest, ...)
- func SaveWorkItem(_context *context.Context, manifest *models.DPNIngestManifest, ...)
- func SaveWorkItemState(_context *context.Context, manifest *models.DPNIngestManifest, ...)
- func SetupIngestManifest(message *nsq.Message, stage string, _context *context.Context, ...) *models.DPNIngestManifest
- func SetupReplicationManifest(message *nsq.Message, stage string, _context *context.Context, ...) *models.ReplicationManifest
- func UpdateReplicationTransfer(_context *context.Context, remoteClient *network.DPNRestClient, ...)
- type DPNCopier
- type DPNFixityChecker
- func (checker *DPNFixityChecker) FinishWithError(helper *DPNRestoreHelper)
- func (checker *DPNFixityChecker) FinishWithSuccess(helper *DPNRestoreHelper)
- func (checker *DPNFixityChecker) HandleMessage(message *nsq.Message) error
- func (checker *DPNFixityChecker) SaveFixityRecord(helper *DPNRestoreHelper)
- func (checker *DPNFixityChecker) ValidateBag(helper *DPNRestoreHelper)
- type DPNGlacierRestoreInit
- func (restorer *DPNGlacierRestoreInit) Cleanup()
- func (restorer *DPNGlacierRestoreInit) FinishWithError(helper *DPNRestoreHelper)
- func (restorer *DPNGlacierRestoreInit) FinishWithSuccess(helper *DPNRestoreHelper)
- func (restorer *DPNGlacierRestoreInit) HandleMessage(message *nsq.Message) error
- func (restorer *DPNGlacierRestoreInit) InitializeRetrieval(helper *DPNRestoreHelper)
- func (restorer *DPNGlacierRestoreInit) RequestRestore()
- func (restorer *DPNGlacierRestoreInit) RestoreRequestNeeded(helper *DPNRestoreHelper) (bool, error)
- func (restorer *DPNGlacierRestoreInit) SendToDownloadQueue(helper *DPNRestoreHelper)
- type DPNIngestRecorder
- type DPNIngestStorer
- type DPNPackager
- type DPNQueue
- type DPNReplicationStorer
- type DPNRestoreHelper
- type DPNS3Retriever
- func (fetcher *DPNS3Retriever) DownloadFile(helper *DPNRestoreHelper)
- func (fetcher *DPNS3Retriever) FinishWithError(helper *DPNRestoreHelper)
- func (fetcher *DPNS3Retriever) FinishWithSuccess(helper *DPNRestoreHelper)
- func (retriever *DPNS3Retriever) HandleMessage(message *nsq.Message) error
- func (fetcher *DPNS3Retriever) SendToFixityQueue(helper *DPNRestoreHelper)
- type DPNSync
- func (dpnSync *DPNSync) GetAllNodes() ([]*models.Node, error)
- func (dpnSync *DPNSync) LocalNodeName() string
- func (dpnSync *DPNSync) RemoteNodeNames() []string
- func (dpnSync *DPNSync) Run() bool
- func (dpnSync *DPNSync) SyncBags(node *models.Node)
- func (dpnSync *DPNSync) SyncDigests(remoteNode *models.Node)
- func (dpnSync *DPNSync) SyncEverythingFromNode(remoteNode *models.Node)
- func (dpnSync *DPNSync) SyncFixities(remoteNode *models.Node)
- func (dpnSync *DPNSync) SyncIngests(bag *models.DPNBag)
- func (dpnSync *DPNSync) SyncMembers(remoteNode *models.Node)
- func (dpnSync *DPNSync) SyncNode(remoteNode *models.Node)
- func (dpnSync *DPNSync) SyncReplicationRequests(remoteNode *models.Node)
- func (dpnSync *DPNSync) SyncRestoreRequests(remoteNode *models.Node)
- type DPNValidator
Constants ¶
const DAYS_TO_KEEP_IN_S3 = 3
Keep the files in S3 up to X days, in case we're having system problems and we need to attempt the restore multiple times. We'll have other processes clean out the S3 bucket when necessary.
const HOURS_BETWEEN_CHECKS = 2
After a Glacier restore request has been accepted, we will check S3 periodically to see if the item has been restored. This is the interval between checks.
const MAX_FIXITY_CHECKS_PER_RUN = 20
As we're still in the probationary period, limit the number of fixity checks queued on each run.
const MINUTES_BETWEEN_RETRIES = 3
If S3 download fails with a non-fatal error, how many minutes should we wait before trying again?
const RETRIEVAL_OPTION = "Bulk"
Standard retrieval is 3-5 hours. Bulk is 5-12 hours, and is cheaper. There's no rush on DPN fixity checking, so use the cheaper option. https://docs.aws.amazon.com/amazonglacier/latest/dev/downloading-an-archive-two-steps.html#api-downloading-an-archive-two-steps-retrieval-options For retrieval pricing, see https://aws.amazon.com/glacier/pricing/
const SYNC_BATCH_SIZE = 50
SYNC_BATCH_SIZE describes how many records we should request per page from remote nodes when we're synching bags, replication requests, etc.
Variables ¶
var DO_NOT_REPLICATE_TO = map[string]string{
"7277cbab-d539-4a81-ac1e-70cefc28fb2e": "hathi",
}
DO_NOT_REPLICATE_TO describes which nodes we should NOT replicate member content to. The key is the member uuid, and the value is the node to avoid.
Functions ¶
func EnsureItemIsMarkedCancelled ¶
func EnsureItemIsMarkedCancelled(_context *context.Context, manifest *models.ReplicationManifest)
EnsureItemIsMarkedCancelled makes sure a stored replication is marked as cancelled in the DPNWorkItems table.
func EnsureItemIsMarkedComplete ¶
func EnsureItemIsMarkedComplete(_context *context.Context, manifest *models.ReplicationManifest)
EnsureItemIsMarkedComplete makes sure a stored replication is marked as complete in the DPNWorkItems table. Normally, the workers handle this, but in some cases, a previous store attempt may have succeeded and then failed to update the DPNWorkItems table. That's usually the result of a message timeout or restarting a service.
func GetDPNBag ¶
func GetDPNBag(dpnClient *network.DPNRestClient, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)
GetDPNBag gets the bag record from the DPN REST server that describes the bag we are being asked to copy. Param dpnClient is a DPN REST client, manifest is a ReplicationManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on copy, workSummary should be manifest.CopySummary; on validation, it should be manifest.ValidationSummary; and on store it should be manifest.StoreSummary.
func GetDPNWorkItem ¶
func GetDPNWorkItem(_context *context.Context, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)
GetDPNWorkItem fetches the DPNWorkItem associated with this message and attaches it to the manifest.
Param _context is a context object, manifest is a ReplicationManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on copy, workSummary should be manifest.CopySummary; on validation, it should be manifest.ValidationSummary; and on store it should be manifest.StoreSummary.
func GetRsyncCommand ¶
GetRsyncCommand returns a command object for copying from the remote location to the local filesystem. The copy is done via rsync over ssh, and the command will capture stdout and stderr. The copyFrom param should be a valid scp target in this format:
remoteuser@remotehost:/remote/dir/bag.tar
The copyTo param should be an absolute path on a locally-accessible file system, such as:
/mnt/dpn/data/bag.tar
Using this assumes a few things:
- You have rsync installed.
- You have an ssh client installed.
- You have an entry in your ~/.ssh/config file specifying connection and key information for the remote host.
Usage:
command := GetRsyncCommand("aptrust@tdr:bag.tar", "/mnt/dpn/bag.tar", true)
err := command.Run()
if err != nil { ... do something ... }
-- OR --
output, err := command.CombinedOutput()
if err != nil { fmt.Println(err.Error()) fmt.Println(string(output)) }
func GetWorkItem ¶
func GetWorkItem(_context *context.Context, manifest *models.DPNIngestManifest, workSummary *apt_models.WorkSummary)
GetWorkItem fetches the WorkItem associated with this message and attaches it to the manifest.
Param _context is a context object, manifest is an IngestManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on package, workSummary should be manifest.PackageSummary; on store it should be manifest.StoreSummary, and on record, it should be manifest.RecordSummary.
func GetWorkItemState ¶
func GetWorkItemState(_context *context.Context, manifest *models.DPNIngestManifest, workSummary *apt_models.WorkSummary)
GetWorkItemState fetches the WorkItemState associated with this message and attaches it to the manifest.
Param _context is a context object, manifest is an IngestManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on package, workSummary should be manifest.PackageSummary; on store it should be manifest.StoreSummary, and on record, it should be manifest.RecordSummary.
func GetXferRequest ¶
func GetXferRequest(dpnClient *network.DPNRestClient, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)
GetXferRequest gets the ReplicationTransfer request from the DPN REST server that describes the replication we're about to perform. Param dpnClient is a DPN REST client, manifest is a ReplicationManifest, and workSummary should be the WorkSummary pertinent to the current operation. So, on copy, workSummary should be manifest.CopySummary; on validation, it should be manifest.ValidationSummary; and on store it should be manifest.StoreSummary.
func LoadDPNBagValidationConfig ¶
func LoadDPNBagValidationConfig(_context *context.Context) *validation.BagValidationConfig
LoadDPNBagValidationConfig loads the bag validation config file specified in the general config options. This will die if the bag validation config cannot be loaded or is invalid.
func LogIngestJson ¶
func LogIngestJson(manifest *models.DPNIngestManifest, jsonLog *log.Logger)
LogIngestJson dumps the WorkItemState.State into the JSON log, surrounded by markers that make it easy to find. This log gets big.
func LogReplicationJson ¶
func LogReplicationJson(manifest *models.ReplicationManifest, jsonLog *log.Logger)
LogReplicationJson dumps the WorkItemState.State into the JSON log, surrounded by markers that make it easy to find. This log gets big.
func PushToQueue ¶
func PushToQueue(_context *context.Context, manifest *models.DPNIngestManifest, activeSummary *apt_models.WorkSummary, queueTopic string)
PushToQueue pushes a WorkItem into the specified NSQ topic.
func ReserveSpaceOnVolume ¶
func ReserveSpaceOnVolume(_context *context.Context, manifest *models.ReplicationManifest) bool
ReserveSpaceOnVolume does just what it says. Make sure we have space to copy this item from the remote node. We will be validating this bag in a later step without untarring it, so we just have to reserve enough room for the tar file.
func SaveDPNWorkItemState ¶
func SaveDPNWorkItemState(_context *context.Context, manifest *models.ReplicationManifest, workSummary *apt_models.WorkSummary)
SaveDPNWorkItemState saves the manifest.DPNWorkItem to Pharos, after setting its State property to a JSON serialization of the manifest.
func SaveWorkItem ¶
func SaveWorkItem(_context *context.Context, manifest *models.DPNIngestManifest, workSummary *apt_models.WorkSummary)
SaveWorkItem saves the WorkItem in the manifest to Pharos. Param workSummary should be the WorkSummary from the manifest for the current stage of processing.
func SaveWorkItemState ¶
func SaveWorkItemState(_context *context.Context, manifest *models.DPNIngestManifest, activeSummary *apt_models.WorkSummary)
SaveWorkItemState sends a copy of this process's WorkItemState back to Pharos. It also dumps the ingest manifest to the JSON log.
Param activeSummary will change, depending on what stage of processing we're in. It could be the DPNIngestState.PackageSummary, DPNIngestState.StoreSummary, etc.
func SetupIngestManifest ¶
func SetupIngestManifest(message *nsq.Message, stage string, _context *context.Context, includeIntelObj bool) *models.DPNIngestManifest
SetupIngestManifest loads the existing DPNIngestManifest associated with the NSQ message, or creates a new one if necessary. Param message should be the NSQ message we're working on. Param stage should be one of "package", "store" or "record". Param _context is the context of the worker calling this function. The caller should check for errors in the manifest's Package, Store or Record summary (whichever is the current stage) before proceeding. If param includeIntelObj is true, this will load the IntellectualObject record from Pharos, which can be an expensive operation. The IntellectualObject is required for dpn_packager, and is not used at all in dpn_ingest_store or dpn_record.
func SetupReplicationManifest ¶
func SetupReplicationManifest(message *nsq.Message, stage string, _context *context.Context, localClient *network.DPNRestClient, remoteClients map[string]*network.DPNRestClient) *models.ReplicationManifest
SetupReplicationManifest loads the existing ReplicationManifest associated with the NSQ message, or creates a new one if necessary. Param message should be the NSQ message we're working on. Param stage should be one of "copy", "validate" or "store". Param _context is the context of the worker calling this function.
func UpdateReplicationTransfer ¶
func UpdateReplicationTransfer(_context *context.Context, remoteClient *network.DPNRestClient, manifest *models.ReplicationManifest)
UpdateReplicationTransfer updates manifest.ReplicationTransfer at the remote DPN node that remoteClient is connected to. That must be the FromNode of the ReplicationTransfer.
Types ¶
type DPNCopier ¶
type DPNCopier struct { CopyChannel chan *models.ReplicationManifest ChecksumChannel chan *models.ReplicationManifest PostProcessChannel chan *models.ReplicationManifest Context *context.Context LocalClient *network.DPNRestClient RemoteClients map[string]*network.DPNRestClient }
DPNCopier copies tarred bags from other nodes via rsync. This is used when replicating content from other nodes. For putting together DPN bags from APTrust files, see fetcher.go.
func NewDPNCopier ¶
NewDPNCopier returns a new DPNCopier object.
func (*DPNCopier) HandleMessage ¶
HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
type DPNFixityChecker ¶
type DPNFixityChecker struct { Context *context.Context LocalDPNRestClient *network.DPNRestClient ValidationChannel chan *DPNRestoreHelper RecordChannel chan *DPNRestoreHelper CleanupChannel chan *DPNRestoreHelper // PreTestChannel is used in testing only to set some properties // on the helper/manifest. PreTestChannel should push directly into // the ValidationChannel. PreTestChannel chan *DPNRestoreHelper // PostTestChannel is for testing only. It allows us to inspect the // state of our helper and manifest when processing completes. PostTestChannel chan *DPNRestoreHelper BagValidationConfig *validation.BagValidationConfig }
func NewDPNFixityChecker ¶
func NewDPNFixityChecker(_context *context.Context) (*DPNFixityChecker, error)
NewDPNFixityChecker creates a new DPNFixityChecker.
func (*DPNFixityChecker) FinishWithError ¶
func (checker *DPNFixityChecker) FinishWithError(helper *DPNRestoreHelper)
func (*DPNFixityChecker) FinishWithSuccess ¶
func (checker *DPNFixityChecker) FinishWithSuccess(helper *DPNRestoreHelper)
func (*DPNFixityChecker) HandleMessage ¶
func (checker *DPNFixityChecker) HandleMessage(message *nsq.Message) error
HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
func (*DPNFixityChecker) SaveFixityRecord ¶
func (checker *DPNFixityChecker) SaveFixityRecord(helper *DPNRestoreHelper)
Save the fixity record to the local DPN REST server.
func (*DPNFixityChecker) ValidateBag ¶
func (checker *DPNFixityChecker) ValidateBag(helper *DPNRestoreHelper)
DPN fixity check requires us to validate the entire bag and extract the sha256 checksum of tagmanifest-sha256.txt file. If the bag is valid, and the fixity value of the tag manifest matches what's in the DPN registry, the fixity check passes. DPN currently has no way of recording that a fixity check has failed, other than a human looking at the records. We can record the DPNWorkItem as failed in Pharos.
type DPNGlacierRestoreInit ¶
type DPNGlacierRestoreInit struct { // Context includes logging, config, network connections, and // other general resources for the worker. Context *context.Context // LocalDPNRestClient lets us talk to our local DPN server. LocalDPNRestClient *dpn_network.DPNRestClient // RequestChannel is for requesting an item be moved from Glacier // into S3. RequestChannel chan *DPNRestoreHelper // CleanupChannel is for housekeeping, like updating NSQ. CleanupChannel chan *DPNRestoreHelper // PostTestChannel is for testing only. In production, nothing listens // on this channel. PostTestChannel chan *DPNRestoreHelper // S3Url is a custom URL that the S3 client should connect to. // We use this only in testing, when we want the client to talk // to a local test server. This should not be set in demo or // production. S3Url string }
Requests that an object be restored from Glacier to S3. This is the first step toward performing fixity checks on DPN bags, and restoring DPN bags, all of which are stored in Glacier.
func DPNNewGlacierRestoreInit ¶
func DPNNewGlacierRestoreInit(_context *context.Context) (*DPNGlacierRestoreInit, error)
func (*DPNGlacierRestoreInit) Cleanup ¶
func (restorer *DPNGlacierRestoreInit) Cleanup()
func (*DPNGlacierRestoreInit) FinishWithError ¶
func (restorer *DPNGlacierRestoreInit) FinishWithError(helper *DPNRestoreHelper)
func (*DPNGlacierRestoreInit) FinishWithSuccess ¶
func (restorer *DPNGlacierRestoreInit) FinishWithSuccess(helper *DPNRestoreHelper)
func (*DPNGlacierRestoreInit) HandleMessage ¶
func (restorer *DPNGlacierRestoreInit) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
func (*DPNGlacierRestoreInit) InitializeRetrieval ¶
func (restorer *DPNGlacierRestoreInit) InitializeRetrieval(helper *DPNRestoreHelper)
func (*DPNGlacierRestoreInit) RequestRestore ¶
func (restorer *DPNGlacierRestoreInit) RequestRestore()
func (*DPNGlacierRestoreInit) RestoreRequestNeeded ¶
func (restorer *DPNGlacierRestoreInit) RestoreRequestNeeded(helper *DPNRestoreHelper) (bool, error)
func (*DPNGlacierRestoreInit) SendToDownloadQueue ¶
func (restorer *DPNGlacierRestoreInit) SendToDownloadQueue(helper *DPNRestoreHelper)
type DPNIngestRecorder ¶
type DPNIngestRecorder struct { RecordChannel chan *models.DPNIngestManifest PostProcessChannel chan *models.DPNIngestManifest Context *context.Context LocalClient *network.DPNRestClient RemoteClients map[string]*network.DPNRestClient }
DPNIngestRecorder records information about locally-ingested DPN bags in both APTrust and DPN.
func NewDPNIngestRecorder ¶
func NewDPNIngestRecorder(_context *context.Context) (*DPNIngestRecorder, error)
NewDPNIngestRecorder returns a new DPNIngestRecorder.
func (*DPNIngestRecorder) HandleMessage ¶
func (recorder *DPNIngestRecorder) HandleMessage(message *nsq.Message) error
HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
type DPNIngestStorer ¶
type DPNIngestStorer struct { StoreChannel chan *models.DPNIngestManifest PostProcessChannel chan *models.DPNIngestManifest Context *context.Context LocalClient *network.DPNRestClient RemoteClients map[string]*network.DPNRestClient }
DPNIngestStorer copies bags ingested from APTrust into Glacier long-term storage.
func NewDPNIngestStorer ¶
func NewDPNIngestStorer(_context *context.Context) (*DPNIngestStorer, error)
NewDPNIngestStorer returns a new DPNIngestStorer object.
func (*DPNIngestStorer) HandleMessage ¶
func (storer *DPNIngestStorer) HandleMessage(message *nsq.Message) error
HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
type DPNPackager ¶
type DPNPackager struct { PackageChannel chan *models.DPNIngestManifest TarChannel chan *models.DPNIngestManifest ValidationChannel chan *models.DPNIngestManifest PostProcessChannel chan *models.DPNIngestManifest BagValidationConfig *validation.BagValidationConfig Context *context.Context LocalClient *network.DPNRestClient RemoteClients map[string]*network.DPNRestClient }
DPNPackager repackages APTrust bags as DPN bags so they can be copied into DPN.
func NewDPNPackager ¶
func NewDPNPackager(_context *context.Context) (*DPNPackager, error)
NewDPNPackager creates a new DPNPackager.
func (*DPNPackager) HandleMessage ¶
func (packager *DPNPackager) HandleMessage(message *nsq.Message) error
HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
type DPNQueue ¶
type DPNQueue struct { // LocalClient is the DPN REST client that talks to our own // local DPN REST server. LocalClient *network.DPNRestClient // RemoteNodes is a map of remote nodes. Key is the namespace // and value is the node. RemoteNodes map[string]*models.Node // RemoteClients is a collection of clients that talk to the // DPN REST servers on other nodes. The key is the namespace // of the remote node, and the value is the client that talks // to that node. RemoteClients map[string]*network.DPNRestClient // Context provides access to information about our environment // and config settings, and access to basic services like // logging and a Pharos client. Context *context.Context // ExamineItemsSince is a timestamp. We will examine any items // updated since this timestamp to see if they need to be queued. ExamineItemsSince time.Time // QueueResult contains information about which items were // queued during this run of the program. QueueResult *models.QueueResult }
DPNQueue queues DPN ingest requests (found in the Pharos WorkItems table) and DPN replication requests (found in the Pharos DPNWorkItems table). These items will go into the proper NSQ topics for DPN ingest or replication.
func NewDPNQueue ¶
NewDPNQueue creates a new DPNQueue object. Param _context is a Context object, and param hours tells the code to examine all Replication, Restore and DPN Ingest requests from the past N hours.
type DPNReplicationStorer ¶
type DPNReplicationStorer struct { StoreChannel chan *models.ReplicationManifest PostProcessChannel chan *models.ReplicationManifest Context *context.Context LocalClient *network.DPNRestClient RemoteClients map[string]*network.DPNRestClient }
DPNReplicationStorer copies replicated bags from our staging area to Glacier long-term storage. We only copy bags that have been validated.
func NewDPNReplicationStorer ¶
func NewDPNReplicationStorer(_context *context.Context) (*DPNReplicationStorer, error)
NewDPNReplicationStorer creates a new DPNReplicationStorer object.
func (*DPNReplicationStorer) HandleMessage ¶
func (storer *DPNReplicationStorer) HandleMessage(message *nsq.Message) error
HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.
type DPNRestoreHelper ¶
type DPNRestoreHelper struct { Manifest *models.DPNRetrievalManifest WorkSummary *apt_models.WorkSummary // contains filtered or unexported fields }
func NewDPNRestoreHelper ¶
func NewDPNRestoreHelper(message *nsq.Message, _context *context.Context, dpnRestClient *network.DPNRestClient, action, summaryName string) (*DPNRestoreHelper, error)
func (*DPNRestoreHelper) FileExistsAndIsComplete ¶
func (helper *DPNRestoreHelper) FileExistsAndIsComplete() bool
func (*DPNRestoreHelper) SaveDPNWorkItem ¶
func (helper *DPNRestoreHelper) SaveDPNWorkItem()
type DPNS3Retriever ¶
type DPNS3Retriever struct { // Context includes logging, config, network connections, and // other general resources for the worker. Context *context.Context // LocalDPNRestClient lets us talk to our local DPN server. LocalDPNRestClient *dpn_network.DPNRestClient // FetchChannel is for fetching files from S3. FetchChannel chan *DPNRestoreHelper // CleanupChannel is for post-fetch processing. CleanupChannel chan *DPNRestoreHelper // PostTestChannel is for testing only. In production, nothing listens // on this channel. PostTestChannel chan *DPNRestoreHelper }
Fetches from S3 to local storage.
func NewDPNS3Retriever ¶
func NewDPNS3Retriever(_context *context.Context) (*DPNS3Retriever, error)
func (*DPNS3Retriever) DownloadFile ¶
func (fetcher *DPNS3Retriever) DownloadFile(helper *DPNRestoreHelper)
func (*DPNS3Retriever) FinishWithError ¶
func (fetcher *DPNS3Retriever) FinishWithError(helper *DPNRestoreHelper)
func (*DPNS3Retriever) FinishWithSuccess ¶
func (fetcher *DPNS3Retriever) FinishWithSuccess(helper *DPNRestoreHelper)
func (*DPNS3Retriever) HandleMessage ¶
func (retriever *DPNS3Retriever) HandleMessage(message *nsq.Message) error
This is the callback that NSQ workers use to handle messages from NSQ.
func (*DPNS3Retriever) SendToFixityQueue ¶
func (fetcher *DPNS3Retriever) SendToFixityQueue(helper *DPNRestoreHelper)
type DPNSync ¶
type DPNSync struct { // LocalClient is the DPN REST client that talks to our own // local DPN REST server. LocalClient *network.DPNRestClient // RemoteNodes is a map of remote nodes. Key is the namespace // and value is the node. RemoteNodes map[string]*models.Node // RemoteClients is a collection of clients that talk to the // DPN REST servers on other nodes. The key is the namespace // of the remote node, and the value is the client that talks // to that node. RemoteClients map[string]*network.DPNRestClient // Context provides access to information about our environment // and config settings, and access to basic services like // logging and a Pharos client. Context *context.Context // Results contains information about the results of the sync // operations with each node. Key is the node namespace, // value is the SyncResult object for that node. Results map[string]*models.SyncResult }
DPNSync copies data from remote DPN nodes to our local DPN node. Data includes information about bags, replication transfers, etc. Each node is the authority on bags where they are listed as the admin node, so when synching from DPN node X, we ask for all bags where X is the admin node, as well as all ReplicationTransfers, RestoreTransfers, FixityChecks, etc. We do NOT ask node X for info about or related to bags whose admin node is Y or Z.
func NewDPNSync ¶
NewDPNSync creates a new DPNSync object.
func (*DPNSync) GetAllNodes ¶
GetAllNodes returns a list of all the nodes that our node knows about.
func (*DPNSync) LocalNodeName ¶
LocalNodeName returns the namespace of our local DPN node.
func (*DPNSync) RemoteNodeNames ¶
RemoteNodeNames returns the namespaces of all known remote DPN nodes.
func (*DPNSync) Run ¶
Run runs all sync operations against all nodes. This is the only function your cron job needs to call. The boolean return value will be true if all sync operations completed without error, false otherwise. For errors, check the log.
func (*DPNSync) SyncBags ¶
SyncBags syncs bags from the specified node to our own local DPN registry if the bags match these criteria:
1. The node we are querying is the admin node for the bag. 2. The bag was updated since the last time we queried the node.
Returns a list of the bags that were successfully updated. Even on error, this may still return a list with whatever bags were updated before the error occurred.
func (*DPNSync) SyncDigests ¶
func (*DPNSync) SyncEverythingFromNode ¶
SyncEverythingFromNode syncs all bags, replication requests and restore requests from the specified remote node. Note that this is a pull-only sync. We are not writing any data to other nodes, just reading what they have and updating our own registry with their info.
func (*DPNSync) SyncFixities ¶
func (*DPNSync) SyncIngests ¶
func (*DPNSync) SyncMembers ¶
SyncMembers copies remote member records to our own node. This does not update existing records, it only creates new ones.
func (*DPNSync) SyncNode ¶
SyncNode copies the latest node record from the node itself to our DPN registry. E.g. It copies the SDR record from SDR to us, but only if the remote record is newer.
func (*DPNSync) SyncReplicationRequests ¶
SyncReplicationRequests copies ReplicationTransfer records from remote nodes to our own local node.
func (*DPNSync) SyncRestoreRequests ¶
SyncRestoreRequests copies RestoreTransfer records from remote nodes to our local node.
type DPNValidator ¶
type DPNValidator struct { ValidationChannel chan *dpn_models.ReplicationManifest PostProcessChannel chan *dpn_models.ReplicationManifest BagValidationConfig *validation.BagValidationConfig Context *context.Context LocalClient *network.DPNRestClient RemoteClients map[string]*network.DPNRestClient }
DPNValidator validates DPN bags (tar files) before we send them off to long-term storage.
func NewDPNValidator ¶
func NewDPNValidator(_context *context.Context) (*DPNValidator, error)
NewDPNValidator returns a new DPNValidator object.
func (*DPNValidator) HandleMessage ¶
func (validator *DPNValidator) HandleMessage(message *nsq.Message) error
HandleMessage is the NSQ message handler. The NSQ consumer will pass each message in the subscribed channel to this function.