Documentation
¶
Index ¶
- Constants
- Variables
- func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse, ...) (string, error)
- func ByteCountSI(b int64) string
- func CanDisableProxy() bool
- func CreateSharingUrl(ctx context.Context, objectUrl *url.URL, isWrite bool) (string, error)
- func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error)
- func DoDelete(ctx context.Context, remoteDestination string, recursive bool, ...) (err error)
- func DoShadowIngest(ctx context.Context, sourceFile string, originPrefix string, ...) (int64, string, error)
- func GetBehavior(behaviorName string) (packerBehavior, error)
- func GetDirectorInfoForPath(ctx context.Context, pUrl *pelican_url.PelicanURL, httpMethod string, ...) (parsedResponse server_structs.DirectorResponse, err error)
- func GetObjectServerHostnames(ctx context.Context, testFile string) (urls []string, err error)
- func IsRetryable(err error) bool
- func ParseDirectorInfo(dirResp *http.Response) (server_structs.DirectorResponse, error)
- func ParseRemoteAsPUrl(ctx context.Context, rp string) (*pelican_url.PelicanURL, error)
- func ShouldRetry(err error) bool
- type ConnectionSetupError
- type ConstantSizer
- type FileInfo
- type HeaderTimeoutError
- type HttpErrResp
- type NetworkResetError
- type ProgressReader
- type ServerPriority
- type Sizer
- type SlowTransferError
- type StatusCodeError
- type StoppedTransferError
- type SyncLevel
- type TimestampedError
- type TransferAttemptError
- type TransferCallbackFunc
- type TransferClient
- func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error)
- func (tc *TransferClient) Cancel()
- func (tc *TransferClient) Close()
- func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error)
- func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, ...) (tj *TransferJob, err error)
- func (tc *TransferClient) Results() chan TransferResults
- func (tc *TransferClient) Shutdown() (results []TransferResults, err error)
- func (tc *TransferClient) Submit(tj *TransferJob) error
- type TransferEngine
- type TransferErrors
- type TransferJob
- type TransferOption
- func WithAcquireToken(enable bool) TransferOption
- func WithCaches(caches ...*url.URL) TransferOption
- func WithCallback(callback TransferCallbackFunc) TransferOption
- func WithCollectionsUrl(url string) TransferOption
- func WithSynchronize(level SyncLevel) TransferOption
- func WithToken(token string) TransferOption
- func WithTokenLocation(location string) TransferOption
- type TransferResult
- type TransferResults
- func DoCopy(ctx context.Context, sourceFile string, destination string, recursive bool, ...) (transferResults []TransferResults, err error)
- func DoGet(ctx context.Context, remoteObject string, localDestination string, ...) (transferResults []TransferResults, err error)
- func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error)
- func DoPut(ctx context.Context, localObject string, remoteDestination string, ...) (transferResults []TransferResults, err error)
Constants ¶
const ( SyncNone = iota // When synchronizing, always re-transfer, regardless of existence at destination. SyncExist // Skip synchronization transfer if the destination exists SyncSize // Skip synchronization transfer if the destination exists and matches the current source size )
Variables ¶
var ObjectServersToTry int = 3
Number of caches to attempt to use in any invocation
var (
PelicanError error_codes.PelicanError
)
Functions ¶
func AcquireToken ¶
func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse, opts config.TokenGenerationOpts) (string, error)
Given a URL and a director Response, attempt to acquire a valid token for that URL.
func ByteCountSI ¶
Convert b bytes to a human-friendly string with SI units
For example, ByteCountSI(2000) returns "2 KB"
func CanDisableProxy ¶
func CanDisableProxy() bool
Determine whether we are allowed to skip the proxy as a fallback
func CreateSharingUrl ¶
func DoCacheInfo ¶
func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error)
Check the cache information of a remote cache
func DoDelete ¶
func DoDelete(ctx context.Context, remoteDestination string, recursive bool, options ...TransferOption) (err error)
DoDelete queries the director using the DELETE HTTP method, retrieves the token, and initializes the delete operation.
func DoShadowIngest ¶
func GetBehavior ¶
func GetDirectorInfoForPath ¶
func GetDirectorInfoForPath(ctx context.Context, pUrl *pelican_url.PelicanURL, httpMethod string, token string) (parsedResponse server_structs.DirectorResponse, err error)
Retrieve federation namespace information for a given URL.
func IsRetryable ¶
IsRetryable will return true if the error is retryable
func ParseDirectorInfo ¶
func ParseDirectorInfo(dirResp *http.Response) (server_structs.DirectorResponse, error)
Given the Director response, parse the headers and construct the ordered list of object servers.
func ParseRemoteAsPUrl ¶
func ParseRemoteAsPUrl(ctx context.Context, rp string) (*pelican_url.PelicanURL, error)
Given a remote path, use the client's wisdom to parse it as a Pelican URL, including metadata discovery.
This will handle setting up the URL cache, passing along contexts to discovery, and passing the client context/user agent. Calling this should return a fully populated PelicanURL object, including any metadata that was discovered.
func ShouldRetry ¶
Types ¶
type ConnectionSetupError ¶
ConnectionSetupError is an error that is returned when a connection to the remote server fails
func (*ConnectionSetupError) Error ¶
func (e *ConnectionSetupError) Error() string
func (*ConnectionSetupError) Is ¶
func (e *ConnectionSetupError) Is(target error) bool
func (*ConnectionSetupError) Unwrap ¶
func (e *ConnectionSetupError) Unwrap() error
type ConstantSizer ¶
type ConstantSizer struct {
// contains filtered or unexported fields
}
func (*ConstantSizer) BytesComplete ¶
func (cs *ConstantSizer) BytesComplete() int64
func (*ConstantSizer) Size ¶
func (cs *ConstantSizer) Size() int64
type FileInfo ¶
Our own FileInfo structure to hold information about a file NOTE: this was created to provide more flexibility to information on a file. The fs.FileInfo interface was causing some issues like not always returning a Name attribute ALSO NOTE: the fields are exported so they can be marshalled into JSON, it does not work otherwise
type HeaderTimeoutError ¶
type HeaderTimeoutError struct{}
func (*HeaderTimeoutError) Error ¶
func (e *HeaderTimeoutError) Error() string
func (*HeaderTimeoutError) Is ¶
func (e *HeaderTimeoutError) Is(target error) bool
type HttpErrResp ¶
func (*HttpErrResp) Error ¶
func (e *HttpErrResp) Error() string
type NetworkResetError ¶
type NetworkResetError struct{}
func (*NetworkResetError) Error ¶
func (e *NetworkResetError) Error() string
type ProgressReader ¶
type ProgressReader struct {
// contains filtered or unexported fields
}
ProgressReader wraps the io.Reader to get progress Adapted from https://stackoverflow.com/questions/26050380/go-tracking-post-request-progress
func (*ProgressReader) BytesComplete ¶
func (pr *ProgressReader) BytesComplete() int64
func (*ProgressReader) Close ¶
func (pr *ProgressReader) Close() error
Close implements the close function of io.Closer
func (*ProgressReader) Read ¶
func (pr *ProgressReader) Read(p []byte) (n int, err error)
Read implements the common read function for io.Reader
func (*ProgressReader) Size ¶
func (pr *ProgressReader) Size() int64
type ServerPriority ¶
type SlowTransferError ¶
type SlowTransferError struct { BytesTransferred int64 BytesPerSecond int64 BytesTotal int64 Duration time.Duration CacheAge time.Duration }
SlowTransferError is an error that is returned when a transfer takes longer than the configured timeout
func (*SlowTransferError) Error ¶
func (e *SlowTransferError) Error() (errMsg string)
func (*SlowTransferError) Is ¶
func (e *SlowTransferError) Is(target error) bool
type StatusCodeError ¶
type StatusCodeError grab.StatusCodeError
StatusCodeError is a wrapper around grab.StatusCodeError that indicates the server returned a non-200 code.
The wrapper is done to provide a Pelican-based error hierarchy in case we ever decide to have a different underlying download package.
func (*StatusCodeError) Error ¶
func (e *StatusCodeError) Error() string
func (*StatusCodeError) Is ¶
func (e *StatusCodeError) Is(target error) bool
type StoppedTransferError ¶
type StoppedTransferError struct { BytesTransferred int64 StoppedTime time.Duration CacheHit bool Upload bool }
Error type for when the transfer started to return data then completely stopped
func (*StoppedTransferError) Error ¶
func (e *StoppedTransferError) Error() (errMsg string)
func (*StoppedTransferError) Is ¶
func (e *StoppedTransferError) Is(target error) bool
type TimestampedError ¶
type TimestampedError struct {
// contains filtered or unexported fields
}
func (*TimestampedError) Error ¶
func (te *TimestampedError) Error() string
func (*TimestampedError) Unwrap ¶
func (te *TimestampedError) Unwrap() error
type TransferAttemptError ¶
type TransferAttemptError struct {
// contains filtered or unexported fields
}
Transfer attempt error wraps an error with information about the service/proxy used
func (*TransferAttemptError) Error ¶
func (tae *TransferAttemptError) Error() (errMsg string)
func (*TransferAttemptError) Is ¶
func (tae *TransferAttemptError) Is(target error) bool
func (*TransferAttemptError) Unwrap ¶
func (tae *TransferAttemptError) Unwrap() error
type TransferCallbackFunc ¶
type TransferClient ¶
type TransferClient struct {
// contains filtered or unexported fields
}
A client to the transfer engine.
func (*TransferClient) CacheInfo ¶
func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error)
cacheInfo retrieves and returns the age and size of the specified object.
func (*TransferClient) Cancel ¶
func (tc *TransferClient) Cancel()
Cancel a client
When cancelled, all channels and goroutines associated with the client will close/exit immediately.
func (*TransferClient) Close ¶
func (tc *TransferClient) Close()
Close the transfer client object
Any subsequent job submissions will cause a panic
func (*TransferClient) NewPrestageJob ¶
func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error)
Create a new prestage job for the client
The returned object can be further customized as desired. This function does not "submit" the job for execution.
func (*TransferClient) NewTransferJob ¶
func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error)
Create a new transfer job for the client
The returned object can be further customized as desired. This function does not "submit" the job for execution.
func (*TransferClient) Results ¶
func (tc *TransferClient) Results() chan TransferResults
Return a channel containing the results from the client
func (*TransferClient) Shutdown ¶
func (tc *TransferClient) Shutdown() (results []TransferResults, err error)
Shutdown the transfer client
Closes the client and waits for all jobs to exit cleanly. Returns any results that were pending when Shutdown was called
func (*TransferClient) Submit ¶
func (tc *TransferClient) Submit(tj *TransferJob) error
Submit the transfer job to the client for processing
type TransferEngine ¶
type TransferEngine struct {
// contains filtered or unexported fields
}
An object able to process transfer jobs.
func NewTransferEngine ¶
func NewTransferEngine(ctx context.Context) (te *TransferEngine, err error)
Returns a new transfer engine object whose lifetime is tied to the provided context. Will launcher worker goroutines to handle the underlying transfers
func (*TransferEngine) Close ¶
func (te *TransferEngine) Close()
Closes the TransferEngine. No new work may be submitted. Any ongoing work will continue
func (*TransferEngine) NewClient ¶
func (te *TransferEngine) NewClient(options ...TransferOption) (client *TransferClient, err error)
Create a new client to work with an engine
func (*TransferEngine) Shutdown ¶
func (te *TransferEngine) Shutdown() error
Initiates a shutdown of the transfer engine. Waits until all workers have finished
type TransferErrors ¶
type TransferErrors struct {
// contains filtered or unexported fields
}
A container object for multiple sub-errors representing transfer failures.
func NewTransferErrors ¶
func NewTransferErrors() *TransferErrors
Create a new transfer error object
func (*TransferErrors) AddError ¶
func (te *TransferErrors) AddError(err error)
func (*TransferErrors) AddPastError ¶
func (te *TransferErrors) AddPastError(err error, timestamp time.Time)
func (*TransferErrors) AllErrorsRetryable ¶
func (te *TransferErrors) AllErrorsRetryable() bool
Returns true if all errors are retryable. If no errors are present, then returns true
func (*TransferErrors) Error ¶
func (te *TransferErrors) Error() string
func (*TransferErrors) Unwrap ¶
func (te *TransferErrors) Unwrap() []error
func (*TransferErrors) UserError ¶
func (te *TransferErrors) UserError() string
Return a more refined, user-friendly error string
type TransferJob ¶
type TransferJob struct {
// contains filtered or unexported fields
}
A representation of a "transfer job". The job can be submitted to the client library, resulting in one or more transfers (if recursive is true). We assume the transfer job is potentially queued for a long time and all the transfers generated by this job will use the same namespace and token.
func (*TransferJob) GetLookupStatus ¶
func (tj *TransferJob) GetLookupStatus() (ok bool, err error)
Returns the status of the transfer job-to-file(s) lookup
ok is true if the lookup has completed.
type TransferOption ¶
func WithAcquireToken ¶
func WithAcquireToken(enable bool) TransferOption
Create an option to specify the token acquisition logic
Token acquisition (e.g., using OAuth2 to get a token when one isn't found in the environment) defaults to `true` but can be disabled with this options
func WithCaches ¶
func WithCaches(caches ...*url.URL) TransferOption
Create an option to override the cache list
func WithCallback ¶
func WithCallback(callback TransferCallbackFunc) TransferOption
Create an option that provides a callback for a TransferClient
The callback is invoked periodically by one of the transfer workers, with inputs of the local path (e.g., source on upload), the current bytes transferred, and the total object size
func WithCollectionsUrl ¶
func WithCollectionsUrl(url string) TransferOption
Override collections URL to be used by the TransferClient
func WithSynchronize ¶
func WithSynchronize(level SyncLevel) TransferOption
Create an option to specify the object synchronization level
The synchronization level specifies what to do if the destination object already exists.
func WithToken ¶
func WithToken(token string) TransferOption
Create an option to provide a specific token to the transfer
The contents of the token will be used as part of the HTTP request
func WithTokenLocation ¶
func WithTokenLocation(location string) TransferOption
Create an option to override the token locating logic
This will force the transfer to use a specific file for the token contents instead of doing any sort of auto-detection
type TransferResult ¶
type TransferResult struct { Number int // indicates which attempt this is TransferFileBytes int64 // how much each attempt downloaded TimeToFirstByte time.Duration // how long it took to download the first byte TransferEndTime time.Time // when the transfer ends TransferTime time.Duration // amount of time we were transferring per attempt (in seconds) CacheAge time.Duration // age of the data reported by the cache Endpoint string // which origin did it use ServerVersion string // version of the server Error error // what error the attempt returned (if any) }
type TransferResults ¶
type TransferResults struct { Error error TransferredBytes int64 TransferStartTime time.Time Scheme string Attempts []TransferResult // contains filtered or unexported fields }
Represents the results of a single object transfer, potentially across multiple attempts / retries.
func DoCopy ¶
func DoCopy(ctx context.Context, sourceFile string, destination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start the transfer, whether read or write back. Primarily used for backwards compatibility
func DoGet ¶
func DoGet(ctx context.Context, remoteObject string, localDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start of transfer for pelican object get, gets information from the target source before doing our HTTP GET request
remoteObject: the source file/directory you would like to upload localDestination: the end location of the upload recursive: a boolean indicating if the source is a directory or not
func DoPrestage ¶
func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error)
Single-shot call to prestage a single prefix
func DoPut ¶
func DoPut(ctx context.Context, localObject string, remoteDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start of transfer for pelican object put, gets information from the target destination before doing our HTTP PUT request
localObject: the source file/directory you would like to upload remoteDestination: the end location of the upload recursive: a boolean indicating if the source is a directory or not
func (TransferResults) ID ¶
func (tr TransferResults) ID() string