workflow

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.
Published: Oct 29, 2019 | License: Apache-2.0 | Imports: 31 | Imported by: 0

Documentation

Overview

Package workflow contains an experimental workflow for Archivematica transfers.

It's not generalized since it contains client-specific activities. However, the long-term goal is to build a system where workflows and activities are dynamically set up based on user input.

Constants

const (
	DownloadActivityName               = "download-activity"
	TransferActivityName               = "transfer-activity"
	PollTransferActivityName           = "poll-transfer-activity"
	PollIngestActivityName             = "poll-ingest-activity"
	UpdateHARIActivityName             = "update-hari-activity"
	UpdateProductionSystemActivityName = "update-production-system-activity"
	CleanUpActivityName                = "clean-up-activity"
)
const NRE = "non retryable error"

Variables

This section is empty.

Functions

func HeartbeatTimeoutError

func HeartbeatTimeoutError(err error, details interface{}) bool

Types

type CleanUpActivity

type CleanUpActivity struct {
	// contains filtered or unexported fields
}

func NewCleanUpActivity

func NewCleanUpActivity(m *Manager) *CleanUpActivity

func (*CleanUpActivity) Execute

func (a *CleanUpActivity) Execute(ctx context.Context, tinfo *TransferInfo) error

type DownloadActivity

type DownloadActivity struct {
	// contains filtered or unexported fields
}

func NewDownloadActivity

func NewDownloadActivity(m *Manager) *DownloadActivity

func (*DownloadActivity) Execute

func (a *DownloadActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)

Execute downloads the submitted package.

This implementation is client-specific for now. It needs to be cleaned up and generalized.

We're making the following assumptions:

* The blob is a file using the tar or zip archival formats.
* Expected blob key: `DPJ-SIP-<uuid>.tar`.
* AVLXML file found at: `DPJ-SIP-<uuid>.tar:/<uuid>/DPJ/journal/<uuid>.xml`.
* The contents are submitted to Archivematica as-is.

A broader implementation could try to identify the format of the package and submit it to Archivematica without extracting the contents.
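
The naming assumptions above can be illustrated with a small, self-contained sketch. It is not the package's implementation: `parseBlobKey`, `parsedKey`, `avlxmlPath` and the regular expression are hypothetical names and choices made for this example. Only the key layout `DPJ-SIP-<uuid>.tar` and the AVLXML location come from the documentation above.

```go
package main

import (
	"fmt"
	"path"
	"regexp"
)

// keyPattern matches blob keys shaped like "DPJ-SIP-<uuid>.tar" (or ".zip").
// The exact pattern is an assumption made for this sketch.
var keyPattern = regexp.MustCompile(`^([A-Z]+-[A-Z]+)-([0-9a-fA-F-]{36})\.(tar|zip)$`)

// parsedKey is a hypothetical helper type; it is not part of this package.
type parsedKey struct {
	Kind       string // e.g. "DPJ-SIP"
	OriginalID string // the UUID embedded in the key
	Format     string // "tar" or "zip"
}

// parseBlobKey splits a blob key into its kind, identifier and archive format.
func parseBlobKey(key string) (parsedKey, error) {
	m := keyPattern.FindStringSubmatch(key)
	if m == nil {
		return parsedKey{}, fmt.Errorf("unexpected blob key: %q", key)
	}
	return parsedKey{Kind: m[1], OriginalID: m[2], Format: m[3]}, nil
}

// avlxmlPath builds the expected location of the AVLXML file inside the
// archive, following the layout "/<uuid>/DPJ/journal/<uuid>.xml".
func avlxmlPath(id string) string {
	return path.Join("/", id, "DPJ", "journal", id+".xml")
}

func main() {
	k, err := parseBlobKey("DPJ-SIP-2f9b6e0a-7c1d-4f7e-9a53-0d8c1f3b5a21.tar")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(k.Kind, k.OriginalID, k.Format)
	fmt.Println(avlxmlPath(k.OriginalID))
}
```

Rejecting keys that do not match the pattern keeps the client-specific assumptions explicit. A generalized implementation would likely replace this step with format detection.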

type Manager

type Manager struct {
	Logger     logr.Logger
	Collection collection.Service
	Watcher    watcher.Service
	Pipelines  *pipeline.PipelineRegistry
	Hooks      map[string]map[string]interface{}
}

Manager carries workflow and activity dependencies.

func NewManager

func NewManager(logger logr.Logger, colsvc collection.Service, wsvc watcher.Service, pipelines *pipeline.PipelineRegistry, hooks map[string]map[string]interface{}) *Manager

NewManager returns a pointer to a new Manager.

type PollIngestActivity

type PollIngestActivity struct {
	// contains filtered or unexported fields
}

func NewPollIngestActivity

func NewPollIngestActivity(m *Manager) *PollIngestActivity

func (*PollIngestActivity) Execute

func (a *PollIngestActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)

type PollTransferActivity

type PollTransferActivity struct {
	// contains filtered or unexported fields
}

func NewPollTransferActivity

func NewPollTransferActivity(m *Manager) *PollTransferActivity

func (*PollTransferActivity) Execute

func (a *PollTransferActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)

type ProcessingWorkflow

type ProcessingWorkflow struct {
	// contains filtered or unexported fields
}

func NewProcessingWorkflow

func NewProcessingWorkflow(m *Manager) *ProcessingWorkflow

func (*ProcessingWorkflow) Execute

func (w *ProcessingWorkflow) Execute(ctx workflow.Context, event *watcher.BlobEvent, collectionID uint) error

ProcessingWorkflow orchestrates all the activities related to the processing of a SIP in Archivematica, including its retrieval, the creation of the transfer, etc.

Retrying this workflow would result in a new Archivematica transfer, and no retry policy is in place. The user can trigger a new instance via the API.

func (*ProcessingWorkflow) SessionHandler

func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workflow.Context, tinfo *TransferInfo) error

type TransferActivity

type TransferActivity struct {
	// contains filtered or unexported fields
}

func NewTransferActivity

func NewTransferActivity(m *Manager) *TransferActivity

func (*TransferActivity) Execute

func (a *TransferActivity) Execute(ctx context.Context, tinfo *TransferInfo) (*TransferInfo, error)

type TransferInfo

type TransferInfo struct {
	CollectionID     uint               // Enduro internal collection ID.
	Event            *watcher.BlobEvent // Original watcher event.
	Name             string             // Name of the transfer.
	FullPath         string             // Path to the transfer directory in the pipeline.
	RelPath          string             // Path relative to transfer directory in the pipeline.
	ProcessingConfig string             // Archivematica processing configuration.
	AutoApprove      bool               // Archivematica auto-approval setting.
	TransferID       string             // Transfer ID given by Archivematica.
	SIPID            string             // SIP ID given by Archivematica.
	StoredAt         time.Time
	Status           collection.Status

	OriginalID string // Client specific, obtained from name.
	Kind       string // Client specific, obtained from name, e.g. "DPJ-SIP".
}

TransferInfo is shared state that is passed down to activities. It can be useful for hooks that may require quick access to processing state.

TODO: clean this up, e.g. it could embed a collection.Collection.

type UpdateHARIActivity

type UpdateHARIActivity struct {
	// contains filtered or unexported fields
}

func NewUpdateHARIActivity

func NewUpdateHARIActivity(m *Manager) *UpdateHARIActivity

func (UpdateHARIActivity) Execute

func (a UpdateHARIActivity) Execute(ctx context.Context, tinfo *TransferInfo) error

type UpdateProductionSystemActivity

type UpdateProductionSystemActivity struct {
	// contains filtered or unexported fields
}

func NewUpdateProductionSystemActivity

func NewUpdateProductionSystemActivity(m *Manager) *UpdateProductionSystemActivity

func (*UpdateProductionSystemActivity) Execute
