workflow

package
v0.13.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2019 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Overview

Package workflow contains an experimental workflow for Archivematica transfers.

It's not generalized since it contains client-specific activities. However, the long-term goal is to build a system where workflows and activities are dynamically set up based on user input.

Index

Constants

View Source
const (
	DownloadActivityName               = "download-activity"
	BundleActivityName                 = "bundle-activity"
	TransferActivityName               = "transfer-activity"
	PollTransferActivityName           = "poll-transfer-activity"
	PollIngestActivityName             = "poll-ingest-activity"
	UpdateHARIActivityName             = "update-hari-activity"
	UpdateProductionSystemActivityName = "update-production-system-activity"
	CleanUpActivityName                = "clean-up-activity"
	HidePackageActivityName            = "hide-package-activity"
	DeleteOriginalActivityName         = "delete-original-activity"
)
View Source
const NRE = "non retryable error"

Variables

This section is empty.

Functions

func HeartbeatTimeoutError

func HeartbeatTimeoutError(err error, details interface{}) bool

Types

type BundleActivity added in v0.5.1

type BundleActivity struct{}

func NewBundleActivity added in v0.5.1

func NewBundleActivity() *BundleActivity

func (*BundleActivity) Bundle added in v0.5.1

func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, transferDir, key, tempFile string, stripTopLevelDir bool) (string, string, error)

Bundle a transfer with the contents found in the archive.

func (*BundleActivity) Execute added in v0.5.1

func (*BundleActivity) NameKind added in v0.5.1

func (a *BundleActivity) NameKind(key string) (name, kind string)

Name the transfer.

func (*BundleActivity) SingleFile added in v0.5.1

func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, key, tempFile string) (string, error)

SingleFile bundles a transfer with the downloaded blob in it.

func (*BundleActivity) Unarchiver added in v0.5.1

func (a *BundleActivity) Unarchiver(key, filename string) archiver.Unarchiver

Unarchiver returns the unarchiver suited for the archival format.

type BundleActivityParams added in v0.5.1

type BundleActivityParams struct {
	TransferDir      string
	Key              string
	TempFile         string
	StripTopLevelDir bool
}

type BundleActivityResult added in v0.5.1

type BundleActivityResult struct {
	Name                string // Name of the transfer.
	Kind                string // Client specific, obtained from name, e.g. "DPJ-SIP".
	RelPath             string // Path of the transfer relative to the transfer directory.
	FullPath            string // Full path to the transfer in the worker running the session.
	FullPathBeforeStrip string // Same as FullPath but includes the top-level dir even when stripped.
}

type BundleInfo added in v0.5.1

type BundleInfo struct {
}

type CleanUpActivity

type CleanUpActivity struct {
	// contains filtered or unexported fields
}

CleanUpActivity removes the contents that we've created in the TS location.

func NewCleanUpActivity

func NewCleanUpActivity(m *Manager) *CleanUpActivity

func (*CleanUpActivity) Execute

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error

type CleanUpActivityParams added in v0.5.1

type CleanUpActivityParams struct {
	FullPath string
}

type DeleteOriginalActivity added in v0.4.0

type DeleteOriginalActivity struct {
	// contains filtered or unexported fields
}

func NewDeleteOriginalActivity added in v0.4.0

func NewDeleteOriginalActivity(m *Manager) *DeleteOriginalActivity

func (*DeleteOriginalActivity) Execute added in v0.4.0

func (a *DeleteOriginalActivity) Execute(ctx context.Context, event *watcher.BlobEvent) error

type DownloadActivity

type DownloadActivity struct {
	// contains filtered or unexported fields
}

DownloadActivity downloads the blob into the pipeline processing directory.

func NewDownloadActivity

func NewDownloadActivity(m *Manager) *DownloadActivity

func (*DownloadActivity) Execute

func (a *DownloadActivity) Execute(ctx context.Context, event *watcher.BlobEvent) (string, error)

type HidePackageActivity added in v0.2.0

type HidePackageActivity struct {
	// contains filtered or unexported fields
}

func NewHidePackageActivity added in v0.2.0

func NewHidePackageActivity(m *Manager) *HidePackageActivity

func (*HidePackageActivity) Execute added in v0.2.0

func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error

type Manager

type Manager struct {
	Logger     logr.Logger
	Collection collection.Service
	Watcher    watcher.Service
	Pipelines  *pipeline.Registry
	Hooks      map[string]map[string]interface{}
}

Manager carries workflow and activity dependencies.

func NewManager

func NewManager(logger logr.Logger, colsvc collection.Service, wsvc watcher.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager

NewManager returns a pointer to a new Manager.

type PollIngestActivity

type PollIngestActivity struct {
	// contains filtered or unexported fields
}

func NewPollIngestActivity

func NewPollIngestActivity(m *Manager) *PollIngestActivity

func (*PollIngestActivity) Execute

type PollIngestActivityParams added in v0.5.2

type PollIngestActivityParams struct {
	PipelineName string
	SIPID        string
}

type PollTransferActivity

type PollTransferActivity struct {
	// contains filtered or unexported fields
}

func NewPollTransferActivity

func NewPollTransferActivity(m *Manager) *PollTransferActivity

func (*PollTransferActivity) Execute

type PollTransferActivityParams added in v0.5.2

type PollTransferActivityParams struct {
	PipelineName string
	TransferID   string
}

type ProcessingWorkflow

type ProcessingWorkflow struct {
	// contains filtered or unexported fields
}

func NewProcessingWorkflow

func NewProcessingWorkflow(m *Manager) *ProcessingWorkflow

func (*ProcessingWorkflow) Execute

ProcessingWorkflow orchestrates all the activities related to the processing of a SIP in Archivematica, including its retrieval, the creation of the transfer, etc.

Retrying this workflow would result in a new Archivematica transfer. We do not have a retry policy in place. The user could trigger a new instance via the API.

func (*ProcessingWorkflow) SessionHandler

func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workflow.Context, tinfo *TransferInfo) error

type TransferActivity

type TransferActivity struct {
	// contains filtered or unexported fields
}

TransferActivity submits the transfer to Archivematica and returns its ID.

This is our first interaction with Archivematica. The workflow ends here after authentication errors.

func NewTransferActivity

func NewTransferActivity(m *Manager) *TransferActivity

func (*TransferActivity) Execute

type TransferActivityParams added in v0.5.2

type TransferActivityParams struct {
	PipelineName       string
	TransferLocationID string
	RelPath            string
	Name               string
	ProcessingConfig   string
	AutoApprove        bool
}

type TransferActivityResponse added in v0.7.0

type TransferActivityResponse struct {
	TransferID      string
	PipelineVersion string
	PipelineID      string
}

type TransferInfo

type TransferInfo struct {

	// TempFile is the temporary location where the blob is downloaded.
	//
	// It is populated by the workflow with the result of DownloadActivity.
	TempFile string

	// TransferID given by Archivematica.
	//
	// It is populated by TransferActivity.
	TransferID string

	// SIPID given by Archivematica.
	//
	// It is populated by PollTransferActivity.
	SIPID string

	// Enduro internal collection ID.
	//
	// It is populated via the workflow request or createPackageLocalActivity.
	CollectionID uint

	// Original watcher event.
	//
	// It is populated via the workflow request.
	Event *watcher.BlobEvent

	// OriginalID is the UUID found in the key of the blob. It can be empty.
	//
	// It is populated from the workflow (deterministically).
	OriginalID string

	// Status of the collection.
	//
	// It is populated from the workflow (deterministically).
	Status collection.Status

	// StoredAt is the time when the AIP is stored.
	//
	// It is populated by PollIngestActivity as long as Ingest completes.
	StoredAt time.Time

	// PipelineConfig is the configuration of the pipeline that this workflow
	// uses to provide access to its activities.
	//
	// It is populated by loadConfigLocalActivity.
	PipelineConfig *pipeline.Config

	// PipelineID is the UUID of the Archivematica pipeline. Extracted from
	// the API response header when the transfer is submitted.
	//
	// It is populated by TransferActivity.
	PipelineID string

	// Hooks is the hook config store.
	//
	// It is populated by loadConfigLocalActivity.
	Hooks map[string]map[string]interface{}

	// Information about the bundle (transfer) that we submit to Archivematica.
	// Full path, relative path, name, kind...
	//
	// It is populated by BundleActivity.
	Bundle BundleActivityResult
}

TransferInfo is shared state that is passed down to activities. It can be useful for hooks that may require quick access to processing state. TODO: clean this up, e.g. it could embed a collection.Collection.

type UpdateHARIActivity

type UpdateHARIActivity struct {
	// contains filtered or unexported fields
}

func NewUpdateHARIActivity

func NewUpdateHARIActivity(m *Manager) *UpdateHARIActivity

func (UpdateHARIActivity) Execute

func (a UpdateHARIActivity) Execute(ctx context.Context, tinfo *TransferInfo) error

type UpdateProductionSystemActivity

type UpdateProductionSystemActivity struct {
	// contains filtered or unexported fields
}

func NewUpdateProductionSystemActivity

func NewUpdateProductionSystemActivity(m *Manager) *UpdateProductionSystemActivity

func (*UpdateProductionSystemActivity) Execute

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL