Documentation ¶
Overview ¶
Package workflow contains an experimental workflow for Archivematica transfers.
It's not generalized since it contains client-specific activities. However, the long-term goal is to build a system where workflows and activities are dynamically set up based on user input.
Index ¶
- Constants
- func HeartbeatTimeoutError(err error, details interface{}) bool
- type CleanUpActivity
- type DownloadActivity
- type Manager
- type PollIngestActivity
- type PollTransferActivity
- type ProcessingWorkflow
- type TransferActivity
- type TransferInfo
- type UpdateHARIActivity
- type UpdateProductionSystemActivity
Constants ¶
const (
	DownloadActivityName               = "download-activity"
	TransferActivityName               = "transfer-activity"
	PollTransferActivityName           = "poll-transfer-activity"
	PollIngestActivityName             = "poll-ingest-activity"
	UpdateHARIActivityName             = "update-hari-activity"
	UpdateProductionSystemActivityName = "update-production-system-activity"
	CleanUpActivityName                = "clean-up-activity"
)
const NRE = "non retryable error"
Variables ¶
This section is empty.
Functions ¶
func HeartbeatTimeoutError ¶
func HeartbeatTimeoutError(err error, details interface{}) bool
Types ¶
type CleanUpActivity ¶
type CleanUpActivity struct {
// contains filtered or unexported fields
}
func NewCleanUpActivity ¶
func NewCleanUpActivity(m *Manager) *CleanUpActivity
func (*CleanUpActivity) Execute ¶
func (a *CleanUpActivity) Execute(ctx context.Context, tinfo *TransferInfo) error
type DownloadActivity ¶
type DownloadActivity struct {
// contains filtered or unexported fields
}
func NewDownloadActivity ¶
func NewDownloadActivity(m *Manager) *DownloadActivity
func (*DownloadActivity) Execute ¶
func (a *DownloadActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)
Execute downloads the submitted package.
This implementation is client-specific for now. It needs to be cleaned up and generalized.
We're making the following assumptions:
* The blob is a file using the tar or zip archival formats.
* Expected blob key: `DPJ-SIP-<uuid>.tar`.
* AVLXML file found at: `DPJ-SIP-<uuid>.tar:/<uuid>/DPJ/journal/<uuid>.xml`.
* The contents are submitted to Archivematica as-is.
A broader implementation could try to identify the format of the package and submit it to Archivematica without extracting the contents.
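The naming assumptions above can be illustrated with a minimal sketch. It is not the package's implementation: `parseBlobKey`, `journalPath`, and `keyPattern` are hypothetical names, the sample UUID is made up, and the sketch assumes that every `<uuid>` placeholder in the documented paths refers to the same identifier.

```go
package main

import (
	"fmt"
	"path"
	"regexp"
)

// keyPattern is an assumed pattern for blob keys shaped like
// `DPJ-SIP-<uuid>.tar` (a ".zip" suffix is accepted as well).
var keyPattern = regexp.MustCompile(
	`^(.+)-([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})\.(?:tar|zip)$`)

// parseBlobKey (hypothetical helper) splits a blob key into the kind
// prefix, e.g. "DPJ-SIP", and the UUID that follows it.
func parseBlobKey(key string) (kind, id string, err error) {
	m := keyPattern.FindStringSubmatch(key)
	if m == nil {
		return "", "", fmt.Errorf("unexpected blob key: %q", key)
	}
	return m[1], m[2], nil
}

// journalPath (hypothetical helper) builds the location of the XML file
// inside the archive, following the documented `<uuid>/DPJ/journal/<uuid>.xml`
// layout and assuming both placeholders hold the same UUID.
func journalPath(id string) string {
	return path.Join(id, "DPJ", "journal", id+".xml")
}

func main() {
	kind, id, err := parseBlobKey("DPJ-SIP-aa1df25d-1477-4085-8be3-a17fed20f843.tar")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(kind, id, journalPath(id))
}
```

Values obtained this way correspond loosely to the `Kind` and `OriginalID` fields of `TransferInfo`, which are described as being derived from the name.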
type Manager ¶
type Manager struct {
	Logger     logr.Logger
	Collection collection.Service
	Watcher    watcher.Service
	Pipelines  *pipeline.PipelineRegistry
	Hooks      map[string]map[string]interface{}
}
Manager carries workflow and activity dependencies.
func NewManager ¶
func NewManager(logger logr.Logger, colsvc collection.Service, wsvc watcher.Service, pipelines *pipeline.PipelineRegistry, hooks map[string]map[string]interface{}) *Manager
NewManager returns a pointer to a new Manager.
type PollIngestActivity ¶
type PollIngestActivity struct {
// contains filtered or unexported fields
}
func NewPollIngestActivity ¶
func NewPollIngestActivity(m *Manager) *PollIngestActivity
func (*PollIngestActivity) Execute ¶
func (a *PollIngestActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)
type PollTransferActivity ¶
type PollTransferActivity struct {
// contains filtered or unexported fields
}
func NewPollTransferActivity ¶
func NewPollTransferActivity(m *Manager) *PollTransferActivity
func (*PollTransferActivity) Execute ¶
func (a *PollTransferActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)
type ProcessingWorkflow ¶
type ProcessingWorkflow struct {
// contains filtered or unexported fields
}
func NewProcessingWorkflow ¶
func NewProcessingWorkflow(m *Manager) *ProcessingWorkflow
func (*ProcessingWorkflow) Execute ¶
func (w *ProcessingWorkflow) Execute(ctx workflow.Context, event *watcher.BlobEvent, collectionID uint) error
ProcessingWorkflow orchestrates all the activities related to the processing of a SIP in Archivematica, including its retrieval, the creation of the transfer, etc.
Retrying this workflow would result in a new Archivematica transfer. We do not have a retry policy in place. The user could trigger a new instance via the API.
func (*ProcessingWorkflow) SessionHandler ¶
func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workflow.Context, tinfo *TransferInfo) error
type TransferActivity ¶
type TransferActivity struct {
// contains filtered or unexported fields
}
func NewTransferActivity ¶
func NewTransferActivity(m *Manager) *TransferActivity
func (*TransferActivity) Execute ¶
func (a *TransferActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)
type TransferInfo ¶
type TransferInfo struct {
	CollectionID     uint               // Enduro internal collection ID.
	Event            *watcher.BlobEvent // Original watcher event.
	Name             string             // Name of the transfer.
	FullPath         string             // Path to the transfer directory in the pipeline.
	RelPath          string             // Path relative to transfer directory in the pipeline.
	ProcessingConfig string             // Archivematica processing configuration.
	AutoApprove      bool               // Archivematica auto-approval setting.
	TransferID       string             // Transfer ID given by Archivematica.
	SIPID            string             // SIP ID given by Archivematica.
	StoredAt         time.Time
	Status           collection.Status
	OriginalID       string // Client specific, obtained from name.
	Kind             string // Client specific, obtained from name, e.g. "DPJ-SIP".
}
TransferInfo is shared state that is passed down to activities. It can be useful for hooks that may require quick access to processing state.
TODO: clean this up, e.g. it could embed a collection.Collection.
type UpdateHARIActivity ¶
type UpdateHARIActivity struct {
// contains filtered or unexported fields
}
func NewUpdateHARIActivity ¶
func NewUpdateHARIActivity(m *Manager) *UpdateHARIActivity
func (UpdateHARIActivity) Execute ¶
func (a UpdateHARIActivity) Execute(ctx context.Context, tinfo *TransferInfo) error
type UpdateProductionSystemActivity ¶
type UpdateProductionSystemActivity struct {
// contains filtered or unexported fields
}
func NewUpdateProductionSystemActivity ¶
func NewUpdateProductionSystemActivity(m *Manager) *UpdateProductionSystemActivity
func (*UpdateProductionSystemActivity) Execute ¶
func (a *UpdateProductionSystemActivity) Execute(ctx context.Context, tinfo *TransferInfo) error