client

package
v0.0.0-...-b29c24b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2025 License: Apache-2.0 Imports: 55 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SyncNone  = iota // When synchronizing, always re-transfer, regardless of existence at destination.
	SyncExist        // Skip synchronization transfer if the destination exists
	SyncSize         // Skip synchronization transfer if the destination exists and matches the current source size

)

Variables

View Source
var ObjectServersToTry int = 3

Number of caches to attempt to use in any invocation

View Source
var (
	PelicanError error_codes.PelicanError
)

Functions

func AcquireToken

func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse, opts config.TokenGenerationOpts) (string, error)

Given a URL and a director Response, attempt to acquire a valid token for that URL.

func ByteCountSI

func ByteCountSI(b int64) string

Convert b bytes to a human-friendly string with SI units

For example, ByteCountSI(2000) returns "2 KB"

func CanDisableProxy

func CanDisableProxy() bool

Determine whether we are allowed to skip the proxy as a fallback

func CreateSharingUrl

func CreateSharingUrl(ctx context.Context, objectUrl *url.URL, isWrite bool) (string, error)

func DoCacheInfo

func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error)

Check the cache information of a remote cache

func DoDelete

func DoDelete(ctx context.Context, remoteDestination string, recursive bool, options ...TransferOption) (err error)

DoDelete queries the director using the DELETE HTTP method, retrieves the token, and initializes the delete operation.

func DoShadowIngest

func DoShadowIngest(ctx context.Context, sourceFile string, originPrefix string, shadowOriginPrefix string, options ...TransferOption) (int64, string, error)

func GetBehavior

func GetBehavior(behaviorName string) (packerBehavior, error)

func GetDirectorInfoForPath

func GetDirectorInfoForPath(ctx context.Context, pUrl *pelican_url.PelicanURL, httpMethod string, token string) (parsedResponse server_structs.DirectorResponse, err error)

Retrieve federation namespace information for a given URL.

func GetObjectServerHostnames

func GetObjectServerHostnames(ctx context.Context, testFile string) (urls []string, err error)

func IsRetryable

func IsRetryable(err error) bool

IsRetryable will return true if the error is retryable

func ParseDirectorInfo

func ParseDirectorInfo(dirResp *http.Response) (server_structs.DirectorResponse, error)

Given the Director response, parse the headers and construct the ordered list of object servers.

func ParseRemoteAsPUrl

func ParseRemoteAsPUrl(ctx context.Context, rp string) (*pelican_url.PelicanURL, error)

Given a remote path, use the client's wisdom to parse it as a Pelican URL, including metadata discovery.

This will handle setting up the URL cache, passing along contexts to discovery, and passing the client context/user agent. Calling this should return a fully populated PelicanURL object, including any metadata that was discovered.

func ShouldRetry

func ShouldRetry(err error) bool

Types

type ConnectionSetupError

type ConnectionSetupError struct {
	URL string
	Err error
}

ConnectionSetupError is an error that is returned when a connection to the remote server fails

func (*ConnectionSetupError) Error

func (e *ConnectionSetupError) Error() string

func (*ConnectionSetupError) Is

func (e *ConnectionSetupError) Is(target error) bool

func (*ConnectionSetupError) Unwrap

func (e *ConnectionSetupError) Unwrap() error

type ConstantSizer

type ConstantSizer struct {
	// contains filtered or unexported fields
}

func (*ConstantSizer) BytesComplete

func (cs *ConstantSizer) BytesComplete() int64

func (*ConstantSizer) Size

func (cs *ConstantSizer) Size() int64

type FileInfo

type FileInfo struct {
	Name         string
	Size         int64
	ModTime      time.Time
	IsCollection bool
}

Our own FileInfo structure to hold information about a file. NOTE: this was created to provide more flexibility to information on a file. The fs.FileInfo interface was causing some issues, like not always returning a Name attribute. ALSO NOTE: the fields are exported so they can be marshalled into JSON; it does not work otherwise.

func DoList

func DoList(ctx context.Context, remoteObject string, options ...TransferOption) (fileInfos []FileInfo, err error)

Function for the object ls command: we get target information for our remote object and eventually print out the contents of the specified object.

func DoStat

func DoStat(ctx context.Context, destination string, options ...TransferOption) (fileInfo *FileInfo, err error)

Check the size of a remote file in an origin

type HeaderTimeoutError

type HeaderTimeoutError struct{}

func (*HeaderTimeoutError) Error

func (e *HeaderTimeoutError) Error() string

func (*HeaderTimeoutError) Is

func (e *HeaderTimeoutError) Is(target error) bool

type HttpErrResp

type HttpErrResp struct {
	Code int
	Err  string
}

func (*HttpErrResp) Error

func (e *HttpErrResp) Error() string

type NetworkResetError

type NetworkResetError struct{}

func (*NetworkResetError) Error

func (e *NetworkResetError) Error() string

type ProgressReader

type ProgressReader struct {
	// contains filtered or unexported fields
}

ProgressReader wraps the io.Reader to get progress. Adapted from https://stackoverflow.com/questions/26050380/go-tracking-post-request-progress

func (*ProgressReader) BytesComplete

func (pr *ProgressReader) BytesComplete() int64

func (*ProgressReader) Close

func (pr *ProgressReader) Close() error

Close implements the close function of io.Closer

func (*ProgressReader) Read

func (pr *ProgressReader) Read(p []byte) (n int, err error)

Read implements the common read function for io.Reader

func (*ProgressReader) Size

func (pr *ProgressReader) Size() int64

type ServerPriority

type ServerPriority struct {
	URL      *url.URL
	Priority int
}

type Sizer

type Sizer interface {
	Size() int64
	BytesComplete() int64
}

type SlowTransferError

type SlowTransferError struct {
	BytesTransferred int64
	BytesPerSecond   int64
	BytesTotal       int64
	Duration         time.Duration
	CacheAge         time.Duration
}

SlowTransferError is an error that is returned when a transfer takes longer than the configured timeout

func (*SlowTransferError) Error

func (e *SlowTransferError) Error() (errMsg string)

func (*SlowTransferError) Is

func (e *SlowTransferError) Is(target error) bool

type StatusCodeError

type StatusCodeError grab.StatusCodeError

StatusCodeError is a wrapper around grab.StatusCodeError that indicates the server returned a non-200 code.

The wrapper is done to provide a Pelican-based error hierarchy in case we ever decide to have a different underlying download package.

func (*StatusCodeError) Error

func (e *StatusCodeError) Error() string

func (*StatusCodeError) Is

func (e *StatusCodeError) Is(target error) bool

type StoppedTransferError

type StoppedTransferError struct {
	BytesTransferred int64
	StoppedTime      time.Duration
	CacheHit         bool
	Upload           bool
}

Error type for when the transfer started to return data then completely stopped

func (*StoppedTransferError) Error

func (e *StoppedTransferError) Error() (errMsg string)

func (*StoppedTransferError) Is

func (e *StoppedTransferError) Is(target error) bool

type SyncLevel

type SyncLevel int

Different types of synchronization for recursive transfers

type TimestampedError

type TimestampedError struct {
	// contains filtered or unexported fields
}

func (*TimestampedError) Error

func (te *TimestampedError) Error() string

func (*TimestampedError) Unwrap

func (te *TimestampedError) Unwrap() error

type TransferAttemptError

type TransferAttemptError struct {
	// contains filtered or unexported fields
}

Transfer attempt error wraps an error with information about the service/proxy used

func (*TransferAttemptError) Error

func (tae *TransferAttemptError) Error() (errMsg string)

func (*TransferAttemptError) Is

func (tae *TransferAttemptError) Is(target error) bool

func (*TransferAttemptError) Unwrap

func (tae *TransferAttemptError) Unwrap() error

type TransferCallbackFunc

type TransferCallbackFunc = func(path string, downloaded int64, totalSize int64, completed bool)

type TransferClient

type TransferClient struct {
	// contains filtered or unexported fields
}

A client to the transfer engine.

func (*TransferClient) CacheInfo

func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error)

CacheInfo retrieves and returns the age and size of the specified object.

func (*TransferClient) Cancel

func (tc *TransferClient) Cancel()

Cancel a client

When cancelled, all channels and goroutines associated with the client will close/exit immediately.

func (*TransferClient) Close

func (tc *TransferClient) Close()

Close the transfer client object

Any subsequent job submissions will cause a panic

func (*TransferClient) NewPrestageJob

func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error)

Create a new prestage job for the client

The returned object can be further customized as desired. This function does not "submit" the job for execution.

func (*TransferClient) NewTransferJob

func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error)

Create a new transfer job for the client

The returned object can be further customized as desired. This function does not "submit" the job for execution.

func (*TransferClient) Results

func (tc *TransferClient) Results() chan TransferResults

Return a channel containing the results from the client

func (*TransferClient) Shutdown

func (tc *TransferClient) Shutdown() (results []TransferResults, err error)

Shutdown the transfer client

Closes the client and waits for all jobs to exit cleanly. Returns any results that were pending when Shutdown was called

func (*TransferClient) Submit

func (tc *TransferClient) Submit(tj *TransferJob) error

Submit the transfer job to the client for processing

type TransferEngine

type TransferEngine struct {
	// contains filtered or unexported fields
}

An object able to process transfer jobs.

func NewTransferEngine

func NewTransferEngine(ctx context.Context) (te *TransferEngine, err error)

Returns a new transfer engine object whose lifetime is tied to the provided context. Will launch worker goroutines to handle the underlying transfers.

func (*TransferEngine) Close

func (te *TransferEngine) Close()

Closes the TransferEngine. No new work may be submitted. Any ongoing work will continue

func (*TransferEngine) NewClient

func (te *TransferEngine) NewClient(options ...TransferOption) (client *TransferClient, err error)

Create a new client to work with an engine

func (*TransferEngine) Shutdown

func (te *TransferEngine) Shutdown() error

Initiates a shutdown of the transfer engine. Waits until all workers have finished

type TransferErrors

type TransferErrors struct {
	// contains filtered or unexported fields
}

A container object for multiple sub-errors representing transfer failures.

func NewTransferErrors

func NewTransferErrors() *TransferErrors

Create a new transfer error object

func (*TransferErrors) AddError

func (te *TransferErrors) AddError(err error)

func (*TransferErrors) AddPastError

func (te *TransferErrors) AddPastError(err error, timestamp time.Time)

func (*TransferErrors) AllErrorsRetryable

func (te *TransferErrors) AllErrorsRetryable() bool

Returns true if all errors are retryable. If no errors are present, then returns true

func (*TransferErrors) Error

func (te *TransferErrors) Error() string

func (*TransferErrors) Unwrap

func (te *TransferErrors) Unwrap() []error

func (*TransferErrors) UserError

func (te *TransferErrors) UserError() string

Return a more refined, user-friendly error string

type TransferJob

type TransferJob struct {
	// contains filtered or unexported fields
}

A representation of a "transfer job". The job can be submitted to the client library, resulting in one or more transfers (if recursive is true). We assume the transfer job is potentially queued for a long time and all the transfers generated by this job will use the same namespace and token.

func (*TransferJob) Cancel

func (tj *TransferJob) Cancel()

Cancel the transfer job

func (*TransferJob) GetLookupStatus

func (tj *TransferJob) GetLookupStatus() (ok bool, err error)

Returns the status of the transfer job-to-file(s) lookup

ok is true if the lookup has completed.

func (*TransferJob) ID

func (tj *TransferJob) ID() string

Get the transfer's ID

type TransferOption

type TransferOption = option.Interface

func WithAcquireToken

func WithAcquireToken(enable bool) TransferOption

Create an option to specify the token acquisition logic

Token acquisition (e.g., using OAuth2 to get a token when one isn't found in the environment) defaults to `true` but can be disabled with this option.

func WithCaches

func WithCaches(caches ...*url.URL) TransferOption

Create an option to override the cache list

func WithCallback

func WithCallback(callback TransferCallbackFunc) TransferOption

Create an option that provides a callback for a TransferClient

The callback is invoked periodically by one of the transfer workers, with inputs of the local path (e.g., source on upload), the current bytes transferred, and the total object size

func WithCollectionsUrl

func WithCollectionsUrl(url string) TransferOption

Override collections URL to be used by the TransferClient

func WithSynchronize

func WithSynchronize(level SyncLevel) TransferOption

Create an option to specify the object synchronization level

The synchronization level specifies what to do if the destination object already exists.

func WithToken

func WithToken(token string) TransferOption

Create an option to provide a specific token to the transfer

The contents of the token will be used as part of the HTTP request

func WithTokenLocation

func WithTokenLocation(location string) TransferOption

Create an option to override the token locating logic

This will force the transfer to use a specific file for the token contents instead of doing any sort of auto-detection

type TransferResult

type TransferResult struct {
	Number            int           // indicates which attempt this is
	TransferFileBytes int64         // how much each attempt downloaded
	TimeToFirstByte   time.Duration // how long it took to download the first byte
	TransferEndTime   time.Time     // when the transfer ends
	TransferTime      time.Duration // amount of time we were transferring per attempt (in seconds)
	CacheAge          time.Duration // age of the data reported by the cache
	Endpoint          string        // which origin did it use
	ServerVersion     string        // version of the server
	Error             error         // what error the attempt returned (if any)
}

type TransferResults

type TransferResults struct {
	Error             error
	TransferredBytes  int64
	TransferStartTime time.Time
	Scheme            string
	Attempts          []TransferResult
	// contains filtered or unexported fields
}

Represents the results of a single object transfer, potentially across multiple attempts / retries.

func DoCopy

func DoCopy(ctx context.Context, sourceFile string, destination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)

Start the transfer, whether read or write back. Primarily used for backwards compatibility

func DoGet

func DoGet(ctx context.Context, remoteObject string, localDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start of transfer for pelican object get, gets information from the target source before doing our HTTP GET request

remoteObject: the source file/directory you would like to download; localDestination: the end location of the download; recursive: a boolean indicating if the source is a directory or not

func DoPrestage

func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error)

Single-shot call to prestage a single prefix

func DoPut

func DoPut(ctx context.Context, localObject string, remoteDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start of transfer for pelican object put, gets information from the target destination before doing our HTTP PUT request

localObject: the source file/directory you would like to upload; remoteDestination: the end location of the upload; recursive: a boolean indicating if the source is a directory or not

func (TransferResults) ID

func (tr TransferResults) ID() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL