am

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const DeleteTransferActivityName = "DeleteTransferActivity"
View Source
const PollIngestActivityName = "poll-ingest-activity"
View Source
const PollTransferActivityName = "poll-transfer-activity"
View Source
const StartTransferActivityName = "start-transfer-activity"
View Source
const UploadTransferActivityName = "UploadTransferActivity"

Variables

View Source
var (
	// ErrWorkOngoing indicates work is ongoing and polling should continue.
	ErrWorkOngoing = errors.New("work ongoing")

	// ErrBadRequest respresents an AM "400 Bad request" response, which can
	// occur while a transfer or ingest is still processing and may require
	// special handling.
	ErrBadRequest = errors.New("Archivematica response: 400 Bad request")
)

Functions

func ConvertJobToPreservationTask

func ConvertJobToPreservationTask(job amclient.Job) datatypes.PreservationTask

ConvertJobToPreservationTask converts an amclient.Job to a datatypes.PreservationTask.

Types

type Config

type Config struct {
	// Archivematica server address.
	Address string

	// Archivematica API user.
	User string

	// Archivematica API key.
	APIKey string

	// Archivematica processing configuration to use (default: "automated").
	ProcessingConfig string

	// SFTP configuration for uploading transfers to Archivematica.
	SFTP sftp.Config

	// TransferSourcePath is the path to an Archivematica transfer source
	// directory. It is used in the POST /api/v2beta/package "path" parameter
	// to start a transfer via the API. TransferSourcePath must be prefixed with
	// the UUID of an AMSS transfer source directory, optionally followed by a
	// relative path from the source dir (e.g.
	// "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload").
	TransferSourcePath string

	// ZipPIP specifies whether or not a PIP should be zipped before being sent
	// from Enduro to Archivematica.
	ZipPIP bool

	// Capacity sets the maximum number of worker sessions the worker can
	// handle at one time (default: 1).
	Capacity int

	// PollInterval is the time to wait between poll requests to the AM API.
	PollInterval time.Duration

	// TransferDeadline is the maximum time to wait for a transfer to complete.
	// Set to zero for no deadline.
	TransferDeadline time.Duration
}

type DeleteTransferActivity

type DeleteTransferActivity struct {
	// contains filtered or unexported fields
}

func NewDeleteTransferActivity

func NewDeleteTransferActivity(client sftp.Client) *DeleteTransferActivity

func (*DeleteTransferActivity) Execute

type DeleteTransferActivityParams

type DeleteTransferActivityParams struct {
	Destination string
}

type DeleteTransferActivityResult

type DeleteTransferActivityResult struct{}

type JobTracker

type JobTracker struct {
	// contains filtered or unexported fields
}

func NewJobTracker

func NewJobTracker(
	clock clockwork.Clock,
	jobSvc amclient.JobsService,
	ingestsvc ingest.Service,
	presActionID int,
) *JobTracker

func (*JobTracker) SavePreservationTasks

func (jt *JobTracker) SavePreservationTasks(ctx context.Context, unitID string) (int, error)

SavePreservationTasks queries the Archivematica jobs list endpoint to get a list of completed jobs related to the transfer or ingest identified by unitID, then saves any new jobs as preservation tasks.

type PollIngestActivity

type PollIngestActivity struct {
	// contains filtered or unexported fields
}

func NewPollIngestActivity

func NewPollIngestActivity(
	cfg *Config,
	clock clockwork.Clock,
	ingSvc amclient.IngestService,
	jobSvc amclient.JobsService,
	ingestsvc ingest.Service,
) *PollIngestActivity

func (*PollIngestActivity) Execute

Execute polls Archivematica for the status of an ingest and returns when ingest is complete or returns an error status. Execute sends an activity heartbeat after each poll.

A response status of "REJECTED", "FAILED", "USER_INPUT", or "BACKLOG" returns a temporal.NonRetryableApplicationError to indicate that processing can not continue.

type PollIngestActivityParams

type PollIngestActivityParams struct {
	PresActionID int
	SIPID        string
}

type PollIngestActivityResult

type PollIngestActivityResult struct {
	Status        string
	PresTaskCount int
}

type PollTransferActivity

type PollTransferActivity struct {
	// contains filtered or unexported fields
}

func NewPollTransferActivity

func NewPollTransferActivity(
	cfg *Config,
	clock clockwork.Clock,
	tfrSvc amclient.TransferService,
	jobSvc amclient.JobsService,
	ingestsvc ingest.Service,
) *PollTransferActivity

func (*PollTransferActivity) Execute

Execute polls Archivematica for the status of a transfer and returns when the transfer is complete or returns an error status. Execute sends an activity heartbeat after each poll.

On each poll, Execute requests an updated list of AM jobs performed and saves the job data to the ingest service as preservation tasks.

A transfer status of "REJECTED", "FAILED", "USER_INPUT", or "BACKLOG" returns a temporal.NonRetryableApplicationError to indicate that processing can not continue.

type PollTransferActivityParams

type PollTransferActivityParams struct {
	PresActionID int
	TransferID   string
}

type PollTransferActivityResult

type PollTransferActivityResult struct {
	SIPID         string
	Path          string
	PresTaskCount int
}

type StartTransferActivity

type StartTransferActivity struct {
	// contains filtered or unexported fields
}

func NewStartTransferActivity

func NewStartTransferActivity(cfg *Config, amps amclient.PackageService) *StartTransferActivity

func (*StartTransferActivity) Execute

Execute sends a request to the Archivematica API to start a new "auto-approved" transfer. If the request is successful a transfer UUID is returned. An error response will return a retryable or non-retryable temporal.ApplicationError, depending on the nature of the error.

type StartTransferActivityParams

type StartTransferActivityParams struct {
	// Name of the transfer.
	Name string

	ZipPIP bool

	// RelativePath is the PIP path relative to the Archivematica transfer
	// source directory.
	RelativePath string
}

type StartTransferActivityResult

type StartTransferActivityResult struct {
	TransferID string
}

type UploadTransferActivity

type UploadTransferActivity struct {
	// contains filtered or unexported fields
}

UploadTransferActivity uploads a transfer via the SFTP client, and sends a periodic Temporal Heartbeat at the given heartRate.

func NewUploadTransferActivity

func NewUploadTransferActivity(client sftp.Client, heartRate time.Duration) *UploadTransferActivity

NewUploadTransferActivity initializes and returns a new UploadTransferActivity.

func (*UploadTransferActivity) Execute

Execute copies the source transfer to the destination via SFTP.

func (*UploadTransferActivity) Heartbeat

func (a *UploadTransferActivity) Heartbeat(ctx context.Context, upload sftp.AsyncUpload, fileSize int64) error

Heartbeat sends a periodic Temporal heartbeat, which includes the number of bytes uploaded, until the upload is complete, cancelled or returns an error.

type UploadTransferActivityParams

type UploadTransferActivityParams struct {
	// Local path of the source file.
	SourcePath string
}

type UploadTransferActivityResult

type UploadTransferActivityResult struct {
	// Bytes copied to the remote file over the SFTP connection.
	BytesCopied int64
	// Full path of the destination file including `remoteDir` config path.
	RemoteFullPath string
	// Path of the destination file relative to the `remoteDir` config path.
	RemoteRelativePath string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL