Documentation ¶
Overview ¶
Package workflow contains an experimental workflow for Archivematica transfers.
It's not generalized since it contains client-specific activities. However, the long-term goal is to build a system where workflows and activities are dynamically set up based on user input.
Index ¶
- Constants
- func HeartbeatTimeoutError(err error, details interface{}) bool
- type BundleActivity
- func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, ...) (string, string, error)
- func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityParams) (*BundleActivityResult, error)
- func (a *BundleActivity) NameKind(key string) (name, kind string)
- func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, key, tempFile string) (string, error)
- func (a *BundleActivity) Unarchiver(key, filename string) archiver.Unarchiver
- type BundleActivityParams
- type BundleActivityResult
- type BundleInfo
- type CleanUpActivity
- type CleanUpActivityParams
- type DeleteOriginalActivity
- type DownloadActivity
- type HidePackageActivity
- type Manager
- type PollIngestActivity
- type PollIngestActivityParams
- type PollTransferActivity
- type PollTransferActivityParams
- type ProcessingWorkflow
- type TransferActivity
- type TransferActivityParams
- type TransferActivityResponse
- type TransferInfo
- type UpdateHARIActivity
- type UpdateHARIActivityParams
- type UpdateProductionSystemActivity
- type UpdateProductionSystemActivityParams
Constants ¶
const ( DownloadActivityName = "download-activity" BundleActivityName = "bundle-activity" TransferActivityName = "transfer-activity" PollTransferActivityName = "poll-transfer-activity" PollIngestActivityName = "poll-ingest-activity" UpdateHARIActivityName = "update-hari-activity" UpdateProductionSystemActivityName = "update-production-system-activity" CleanUpActivityName = "clean-up-activity" HidePackageActivityName = "hide-package-activity" DeleteOriginalActivityName = "delete-original-activity" )
const NRE = "non retryable error"
Variables ¶
This section is empty.
Functions ¶
func HeartbeatTimeoutError ¶
Types ¶
type BundleActivity ¶ added in v0.5.1
type BundleActivity struct{}
func NewBundleActivity ¶ added in v0.5.1
func NewBundleActivity() *BundleActivity
func (*BundleActivity) Bundle ¶ added in v0.5.1
func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, transferDir, key, tempFile string, stripTopLevelDir bool) (string, string, error)
Bundle a transfer with the contents found in the archive.
func (*BundleActivity) Execute ¶ added in v0.5.1
func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityParams) (*BundleActivityResult, error)
func (*BundleActivity) NameKind ¶ added in v0.5.1
func (a *BundleActivity) NameKind(key string) (name, kind string)
Name the transfer.
func (*BundleActivity) SingleFile ¶ added in v0.5.1
func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, key, tempFile string) (string, error)
SingleFile bundles a transfer with the downloaded blob in it.
func (*BundleActivity) Unarchiver ¶ added in v0.5.1
func (a *BundleActivity) Unarchiver(key, filename string) archiver.Unarchiver
Unarchiver returns the unarchiver suited for the archival format.
type BundleActivityParams ¶ added in v0.5.1
type BundleActivityResult ¶ added in v0.5.1
type BundleActivityResult struct { Name string // Name of the transfer. Kind string // Client specific, obtained from name, e.g. "DPJ-SIP". RelPath string // Path of the transfer relative to the transfer directory. FullPath string // Full path to the transfer in the worker running the session. FullPathBeforeStrip string // Same as FullPath but includes the top-level dir even when stripped. }
type BundleInfo ¶ added in v0.5.1
type BundleInfo struct { }
type CleanUpActivity ¶
type CleanUpActivity struct {
// contains filtered or unexported fields
}
CleanUpActivity removes the contents that we've created in the TS location.
func NewCleanUpActivity ¶
func NewCleanUpActivity(m *Manager) *CleanUpActivity
func (*CleanUpActivity) Execute ¶
func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error
type CleanUpActivityParams ¶ added in v0.5.1
type CleanUpActivityParams struct {
FullPath string
}
type DeleteOriginalActivity ¶ added in v0.4.0
type DeleteOriginalActivity struct {
// contains filtered or unexported fields
}
func NewDeleteOriginalActivity ¶ added in v0.4.0
func NewDeleteOriginalActivity(m *Manager) *DeleteOriginalActivity
type DownloadActivity ¶
type DownloadActivity struct {
// contains filtered or unexported fields
}
DownloadActivity downloads the blob into the pipeline processing directory.
func NewDownloadActivity ¶
func NewDownloadActivity(m *Manager) *DownloadActivity
type HidePackageActivity ¶ added in v0.2.0
type HidePackageActivity struct {
// contains filtered or unexported fields
}
func NewHidePackageActivity ¶ added in v0.2.0
func NewHidePackageActivity(m *Manager) *HidePackageActivity
type Manager ¶
type Manager struct { Logger logr.Logger Collection collection.Service Watcher watcher.Service Pipelines *pipeline.Registry Hooks map[string]map[string]interface{} }
Manager carries workflow and activity dependencies.
type PollIngestActivity ¶
type PollIngestActivity struct {
// contains filtered or unexported fields
}
func NewPollIngestActivity ¶
func NewPollIngestActivity(m *Manager) *PollIngestActivity
func (*PollIngestActivity) Execute ¶
func (a *PollIngestActivity) Execute(ctx context.Context, params *PollIngestActivityParams) (time.Time, error)
type PollIngestActivityParams ¶ added in v0.5.2
type PollTransferActivity ¶
type PollTransferActivity struct {
// contains filtered or unexported fields
}
func NewPollTransferActivity ¶
func NewPollTransferActivity(m *Manager) *PollTransferActivity
func (*PollTransferActivity) Execute ¶
func (a *PollTransferActivity) Execute(ctx context.Context, params *PollTransferActivityParams) (string, error)
type PollTransferActivityParams ¶ added in v0.5.2
type ProcessingWorkflow ¶
type ProcessingWorkflow struct {
// contains filtered or unexported fields
}
func NewProcessingWorkflow ¶
func NewProcessingWorkflow(m *Manager) *ProcessingWorkflow
func (*ProcessingWorkflow) Execute ¶
func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.ProcessingWorkflowRequest) error
ProcessingWorkflow orchestrates all the activities related to the processing of a SIP in Archivematica, including its retrieval, creation of transfer, etc.
Retrying this workflow would result in a new Archivematica transfer. We do not have a retry policy in place. The user could trigger a new instance via the API.
func (*ProcessingWorkflow) SessionHandler ¶
func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workflow.Context, tinfo *TransferInfo) error
type TransferActivity ¶
type TransferActivity struct {
// contains filtered or unexported fields
}
TransferActivity submits the transfer to Archivematica and returns its ID.
This is our first interaction with Archivematica. The workflow ends here after authentication errors.
func NewTransferActivity ¶
func NewTransferActivity(m *Manager) *TransferActivity
func (*TransferActivity) Execute ¶
func (a *TransferActivity) Execute(ctx context.Context, params *TransferActivityParams) (*TransferActivityResponse, error)
type TransferActivityParams ¶ added in v0.5.2
type TransferActivityResponse ¶ added in v0.7.0
type TransferInfo ¶
type TransferInfo struct { // TempFile is the temporary location where the blob is downloaded. // // It is populated by the workflow with the result of DownloadActivity. TempFile string // TransferID given by Archivematica. // // It is populated by TransferActivity. TransferID string // SIPID given by Archivematica. // // It is populated by PollTransferActivity. SIPID string // Enduro internal collection ID. // // It is populated via the workflow request or createPackageLocalActivity. CollectionID uint // Original watcher event. // // It is populated via the workflow request. Event *watcher.BlobEvent // OriginalID is the UUID found in the key of the blob. It can be empty. // // It is populated from the workflow (deterministically). OriginalID string // Status of the collection. // // It is populated from the workflow (deterministically) Status collection.Status // StoredAt is the time when the AIP is stored. // // It is populated by PollIngestActivity as long as Ingest completes. StoredAt time.Time // PipelineConfig is the configuration of the pipeline that this workflow // uses to provide access to its activities. // // It is populated by loadConfigLocalActivity. PipelineConfig *pipeline.Config // PipelineID is the UUID of the Archivematica pipeline. Extracted from // the API response header when the transfer is submitted. // // It is populated by transferActivity. PipelineID string // Hooks is the hook config store. // // It is populated by loadConfigLocalActivity. Hooks map[string]map[string]interface{} // Information about the bundle (transfer) that we submit to Archivematica. // Full path, relative path, name, kind... // // It is populated by BundleActivity. Bundle BundleActivityResult }
TransferInfo is shared state that is passed down to activities. It can be useful for hooks that may require quick access to processing state. TODO: clean this up, e.g., it could embed a collection.Collection.
type UpdateHARIActivity ¶
type UpdateHARIActivity struct {
// contains filtered or unexported fields
}
func NewUpdateHARIActivity ¶
func NewUpdateHARIActivity(m *Manager) *UpdateHARIActivity
func (UpdateHARIActivity) Execute ¶
func (a UpdateHARIActivity) Execute(ctx context.Context, params *UpdateHARIActivityParams) error
type UpdateHARIActivityParams ¶ added in v0.14.2
type UpdateProductionSystemActivity ¶
type UpdateProductionSystemActivity struct {
// contains filtered or unexported fields
}
func NewUpdateProductionSystemActivity ¶
func NewUpdateProductionSystemActivity(m *Manager) *UpdateProductionSystemActivity
func (*UpdateProductionSystemActivity) Execute ¶
func (a *UpdateProductionSystemActivity) Execute(ctx context.Context, params *UpdateProductionSystemActivityParams) error