storage

package
v0.29.1
Published: Feb 1, 2019 License: Apache-2.0 Imports: 17 Imported by: 0

README

Storage Service

The storage service represents local or remote storage and provides unified storage operations. Remote storage can be any cloud storage, e.g. Google Cloud Storage or Amazon S3, or plain SCP or HTTP.

Endly inline workflow

endly -r=copy

@copy.yaml


pipeline:
  transfer:
    action: storage:copy  
    source:
      URL: s3://mybucket/dir
      credentials: aws-west
    dest:
      URL: scp://dest/dir2
      credentials: dest
    assets:
      file1.txt:
      file2.txt: renamedFile2      

Endly workflow service action

Run the following command for storage service operation details:


endly -s=storage

endly -s=storage -a=copy
endly -s=storage -a=remove
endly -s=storage -a=upload
endly -s=storage -a=download

| Service Id | Action | Description | Request | Response |
| --- | --- | --- | --- | --- |
| storage | copy | copy one or more assets from the source to the destination | CopyRequest | CopyResponse |
| storage | remove | remove one or more resources if they exist | RemoveRequest | RemoveResponse |
| storage | upload | upload content pointed to by a context state key to the target destination | UploadRequest | UploadResponse |
| storage | download | copy source content into a context state key | DownloadRequest | DownloadResponse |

The storage service is built on the underlying toolbox storage service (github.com/viant/toolbox/storage).

Asset copy

A copy request provides a flexible way of transferring assets from a source to a destination.

When using a cloud or other scheme-specific URI, make sure the corresponding storage package is imported.

The copy operation provides two ways to substitute asset content:

  1. Replace - a simple brute-force key/value replacement mechanism
  2. $ expression substitution with values from context.State()


    import (
    	"log"

    	"github.com/viant/endly"
    	"github.com/viant/endly/storage"
    	_ "github.com/viant/toolbox/storage/aws" // needed for s3:// URLs
    	"github.com/viant/toolbox/url"
    )

    func copy() {
    	manager := endly.New()
    	context := manager.NewContext(nil)
    	s3CredentialLocation := "" // location of the S3 credentials
    	// Copy a single S3 asset to the local file system (compress=false, expand=false).
    	request := storage.NewCopyRequest(nil, storage.NewTransfer(url.NewResource("s3://mybucket/asset1", s3CredentialLocation), url.NewResource("/tmp/asset1"), false, false, nil))
    	if err := endly.Run(context, request, nil); err != nil {
    		log.Fatal(err)
    	}
    }


Loading request from URL

A CopyRequest can be loaded from a URL pointing to either a JSON or a YAML resource.

    copyReq1, err := storage.NewCopyRequestFromuRL("copy.yaml")
    copyReq2, err := storage.NewCopyRequestFromuRL("copy.json")

Single asset transfer

@copy.yaml

source:
  URL: mem://yaml1/dir
dest:
  URL: mem://dest/dir2

Multi asset transfer with assets

In this scenario, source and dest act as base URLs; each asset is joined with them to build the full source and destination URL.

@copy.yaml

source:
  URL: mem://yaml1/dir
dest:
  URL: mem://dest/dir2
assets:
  file1.txt:
  file2.txt: renamedFile2  
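
To illustrate how an assets map could be combined with the base source and dest URLs, here is a minimal standard-library sketch. The `transfer` type and the `joinURL` and `expandAssets` helpers are hypothetical stand-ins, not the package's API, and the rule that an empty value keeps the source name is an assumption drawn from the example above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// transfer is a simplified, hypothetical stand-in for storage.Transfer.
type transfer struct {
	SourceURL string
	DestURL   string
}

// joinURL appends name to base with exactly one slash between them.
func joinURL(base, name string) string {
	return strings.TrimRight(base, "/") + "/" + strings.TrimLeft(name, "/")
}

// expandAssets builds one transfer per asset. An empty destination name is
// assumed to mean "keep the source name".
func expandAssets(sourceBase, destBase string, assets map[string]string) []transfer {
	names := make([]string, 0, len(assets))
	for name := range assets {
		names = append(names, name)
	}
	sort.Strings(names) // deterministic order for the example
	transfers := make([]transfer, 0, len(names))
	for _, name := range names {
		destName := assets[name]
		if destName == "" {
			destName = name
		}
		transfers = append(transfers, transfer{
			SourceURL: joinURL(sourceBase, name),
			DestURL:   joinURL(destBase, destName),
		})
	}
	return transfers
}

func main() {
	assets := map[string]string{"file1.txt": "", "file2.txt": "renamedFile2"}
	for _, t := range expandAssets("mem://yaml1/dir", "mem://dest/dir2", assets) {
		fmt.Printf("%s -> %s\n", t.SourceURL, t.DestURL)
	}
}
```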

Multi asset transfer with transfers and compression

@copy.yaml

transfers:
- expand: true
  source:
    url: file1.txt
  dest:
    url: file101.txt
- source:
    url: largefile.txt
  dest:
    url: file201.txt
  compress: true

Single asset transfer with replace data substitution

@copy.json

{
  "Source": {
    "URL": "mem://yaml1/dir"
  },
  "Dest": {
    "URL": "mem://dest/dir2"
  },
  "Compress":true,
  "Expand": true,
  "Replace": {
    "k1": "1",
    "k2": "2"
  }
}
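
As a rough illustration of what such a JSON request carries, the sketch below decodes it with the standard library. The `resource` and `copyRequest` types are simplified local stand-ins that model only the fields shown above, and `parseCopyRequest` is a hypothetical helper; the package's own loader may behave differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// resource and copyRequest are simplified local stand-ins for url.Resource
// and storage.CopyRequest; only the fields used in the example are modelled.
type resource struct {
	URL string
}

type copyRequest struct {
	Source   *resource
	Dest     *resource
	Compress bool
	Expand   bool
	Replace  map[string]string
}

// parseCopyRequest decodes a JSON document into a copyRequest and checks
// that both ends of the transfer are present.
func parseCopyRequest(document string) (*copyRequest, error) {
	request := &copyRequest{}
	if err := json.Unmarshal([]byte(document), request); err != nil {
		return nil, err
	}
	if request.Source == nil || request.Dest == nil {
		return nil, fmt.Errorf("source and dest are required")
	}
	return request, nil
}

func main() {
	request, err := parseCopyRequest(`{
	  "Source": {"URL": "mem://yaml1/dir"},
	  "Dest": {"URL": "mem://dest/dir2"},
	  "Compress": true,
	  "Expand": true,
	  "Replace": {"k1": "1", "k2": "2"}
	}`)
	if err != nil {
		fmt.Println("invalid request:", err)
		return
	}
	fmt.Println(request.Source.URL, request.Dest.URL, request.Replace)
}
```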

Single asset transfer with $expression data substitution


    func copy() {
    	manager := endly.New()
    	context := manager.NewContext(nil)
    	state := context.State()

    	// Values referenced from myconfig.json as $settings.port and $settings.host.
    	state.PutValue("settings.port", "8080")
    	state.PutValue("settings.host", "abc.com")

    	// expand=true substitutes $ expressions in the asset content while copying.
    	request := storage.NewCopyRequest(nil, storage.NewTransfer(url.NewResource("myconfig.json"), url.NewResource("/app/config/"), false, true, nil))
    	if err := endly.Run(context, request, nil); err != nil {
    		log.Fatal(err)
    	}
    }


@myconfig.json

{
    "Port":"$settings.port",
    "Host":"$settings.host"
}
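
The effect of $ expression substitution on a file like this can be sketched with the standard library alone. The `expand` function below is a hypothetical helper, the state is modelled as a flat map keyed by dotted paths (a simplification of the real context state), and leaving unknown keys untouched is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
)

// expr matches ${name} or $name, where name may contain dot-separated parts.
var expr = regexp.MustCompile(`\$\{([A-Za-z_][A-Za-z0-9_.]*)\}|\$([A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)*)`)

// expand replaces every $ expression in content with its value from state.
// Expressions without a matching key are left unchanged (an assumption).
func expand(content string, state map[string]string) string {
	return expr.ReplaceAllStringFunc(content, func(match string) string {
		groups := expr.FindStringSubmatch(match)
		key := groups[1] // ${name} form
		if key == "" {
			key = groups[2] // $name form
		}
		if value, ok := state[key]; ok {
			return value
		}
		return match
	})
}

func main() {
	state := map[string]string{
		"settings.port": "8080",
		"settings.host": "abc.com",
	}
	config := `{"Port":"$settings.port","Host":"$settings.host"}`
	fmt.Println(expand(config, state))
}
```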

Single asset transfer with a custom copy handler using a UDF (user-defined function)

The example below uses the custom UDF CopyWithCompression, which gzips the target file.

{
  "Transfers": [
    {
      "Source": {
        "URL": "s3://mybucket1/project1/Transfers/",
        "Credentials": "${env.HOME}/.secret/s3.json"
      },
      "Dest": {
        "URL": "gs://mybucket2/project1/Transfers/",
        "Credentials": "${env.HOME}/.secret/gs.gz"
      }
    }
  ],
  "CopyHandlerUdf": "CopyWithCompression"
}
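
The signature of a copy handler is not shown here, so the following is only a sketch of the gzip step such a UDF might perform, written against plain io.Reader and io.Writer. The names `copyWithGzip`, `compressString`, and `gunzip` are hypothetical.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// copyWithGzip copies src to dst, gzip-compressing the data on the way.
// It returns the number of uncompressed bytes read from src.
func copyWithGzip(dst io.Writer, src io.Reader) (int64, error) {
	writer := gzip.NewWriter(dst)
	n, err := io.Copy(writer, src)
	if err != nil {
		writer.Close()
		return n, err
	}
	// Close flushes buffered data and writes the gzip footer.
	return n, writer.Close()
}

// compressString is a convenience wrapper around copyWithGzip.
func compressString(content string) ([]byte, error) {
	var buffer bytes.Buffer
	if _, err := copyWithGzip(&buffer, strings.NewReader(content)); err != nil {
		return nil, err
	}
	return buffer.Bytes(), nil
}

// gunzip decompresses data; it is used here only to check the round trip.
func gunzip(data []byte) (string, error) {
	reader, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return "", err
	}
	defer reader.Close()
	plain, err := io.ReadAll(reader)
	return string(plain), err
}

func main() {
	compressed, err := compressString("hello storage")
	if err != nil {
		fmt.Println("compression failed:", err)
		return
	}
	plain, err := gunzip(compressed)
	fmt.Println(plain, err)
}
```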

See also how to register common codec UDFs (avro, protobuf) with a custom schema.

Documentation

Constants

const ServiceID = "storage"

ServiceID represents transfer service id

Variables

var CompressionTimeout = 120000

CompressionTimeout compression/decompression timeout

var MaxContentSize = 1024 * 64

MaxContentSize represents the maximum allowed size of expandable content

Functions

func Copy

func Copy(context *endly.Context, transfers ...*Transfer) (interface{}, error)

Copy transfers data for provided transfer definition.

func GetStorageService

func GetStorageService(context *endly.Context, resource *url.Resource) (storage.Service, error)

GetStorageService returns the toolbox storage service for the supplied resource.

func IsShellCompressable

func IsShellCompressable(protScheme string) bool

IsShellCompressable returns true if the resource can be compressed via a shell command.
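
The Transfer type notes that compression is only supported on the scp and file protocols. Assuming that is the whole rule, a scheme check could look like the sketch below. The lowercase `isShellCompressable` and `schemeOf` are local illustrations, not the package's implementation, and treating a URL without a scheme as a local file is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// isShellCompressable reports whether the scheme is one of the two
// protocols named in the Transfer documentation (scp or file).
func isShellCompressable(scheme string) bool {
	switch strings.ToLower(scheme) {
	case "scp", "file":
		return true
	}
	return false
}

// schemeOf extracts the scheme from a URL. A URL without a scheme, such as
// a plain path, is treated as a local file (an assumption).
func schemeOf(rawURL string) string {
	parsed, err := url.Parse(rawURL)
	if err != nil || parsed.Scheme == "" {
		return "file"
	}
	return strings.ToLower(parsed.Scheme)
}

func main() {
	for _, candidate := range []string{"scp://dest/dir2", "/tmp/asset1", "s3://mybucket/dir"} {
		fmt.Println(candidate, isShellCompressable(schemeOf(candidate)))
	}
}
```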

func New

func New() endly.Service

New creates a new storage service

func NewExpandedContentHandler

func NewExpandedContentHandler(context *endly.Context, replaceMap map[string]string, expand bool) func(reader io.ReadCloser) (io.ReadCloser, error)

NewExpandedContentHandler returns a handler that wraps a reader and substitutes its content using the context state map (when expand is set) and the key/value pairs in the replacement map.
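
A handler with this shape can be sketched with the standard library. In the example below, `newReplacingHandler` and `applyHandler` are hypothetical helpers; only the replacement map is applied, and passing content larger than the limit through unchanged is an assumed policy suggested by MaxContentSize, not confirmed behaviour.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// contentHandler has the same shape as the function returned by
// NewExpandedContentHandler.
type contentHandler func(reader io.ReadCloser) (io.ReadCloser, error)

// newReplacingHandler returns a handler that applies the replace map to
// content of at most limit bytes and passes larger content through as-is.
// Keys are assumed not to overlap, so map iteration order does not matter.
func newReplacingHandler(replace map[string]string, limit int) contentHandler {
	pairs := make([]string, 0, 2*len(replace))
	for key, value := range replace {
		pairs = append(pairs, key, value)
	}
	replacer := strings.NewReplacer(pairs...)
	return func(reader io.ReadCloser) (io.ReadCloser, error) {
		defer reader.Close()
		data, err := io.ReadAll(reader)
		if err != nil {
			return nil, err
		}
		if len(data) > limit {
			return io.NopCloser(bytes.NewReader(data)), nil
		}
		return io.NopCloser(strings.NewReader(replacer.Replace(string(data)))), nil
	}
}

// applyHandler runs a handler over a string and returns the resulting content.
func applyHandler(handler contentHandler, content string) (string, error) {
	reader, err := handler(io.NopCloser(strings.NewReader(content)))
	if err != nil {
		return "", err
	}
	defer reader.Close()
	data, err := io.ReadAll(reader)
	return string(data), err
}

func main() {
	handler := newReplacingHandler(map[string]string{"k1": "1", "k2": "2"}, 1024*64)
	fmt.Println(applyHandler(handler, "k1-k2"))
}
```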

func UseMemoryService

func UseMemoryService(context *endly.Context) storage.Service

UseMemoryService sets flag on context to always use memory service (testing only)

Types

type AssetTransfer

type AssetTransfer map[string]interface{}

AssetTransfer represents asset transfer

func (*AssetTransfer) AsTransfer

func (t *AssetTransfer) AsTransfer(base *Transfer) []*Transfer

AsTransfer converts map to transfer or transfers

type CopyRequest

type CopyRequest struct {
	*Transfer `description:"if asset uses relative path it will be joined with this URL"`
	Assets    AssetTransfer `` // transfers
	/* 130-byte string literal not displayed */
	Transfers []*Transfer `description:"actual transfer assets, if empty it derives from assets or source/desc "`
	Udf       string      `description:"custom user defined function to returns a CopyHandler type func which performs the copy"`
}

CopyRequest represents a resources copy request

func NewCopyRequest

func NewCopyRequest(assets AssetTransfer, transfers ...*Transfer) *CopyRequest

NewCopyRequest creates a new copy request

func NewCopyRequestFromuRL

func NewCopyRequestFromuRL(URL string) (*CopyRequest, error)

NewCopyRequestFromuRL creates a new request from URL (JSON or YAML format are supported)

func (*CopyRequest) Init

func (r *CopyRequest) Init() error

Init initialises request

func (*CopyRequest) Messages

func (r *CopyRequest) Messages() []*msg.Message

Messages returns event messages

func (*CopyRequest) Validate

func (r *CopyRequest) Validate() error

Validate checks if request is valid

type CopyResponse

type CopyResponse struct {
	TransferredURL []string //transferred URLs
}

CopyResponse represents a resources copy response

type DownloadRequest

type DownloadRequest struct {
	Source  *url.Resource `required:"true" description:"source asset or directory"`
	DestKey string        `required:"true" description:"state map key destination"`
	Udf     string        `description:"name of udf to transform payload before placing into state map"` //name of udf function that will be used to transform payload
	Expect  interface{}   `description:"if specified expected file content used for validation"`
}

DownloadRequest represents a resource download request; it downloads the source into the context state under the key given by DestKey.

func (*DownloadRequest) Messages

func (r *DownloadRequest) Messages() []*msg.Message

Messages returns event messages

func (*DownloadRequest) Validate

func (r *DownloadRequest) Validate() error

Validate checks if request is valid

type DownloadResponse

type DownloadResponse struct {
	Info        toolbox.FileInfo
	Payload     string //source content, if binary then is will be prefixed base64: followed by based 64 encoded content.
	Transformed interface{}
	Assert      *validator.AssertResponse
}

DownloadResponse represents a download response
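
The Payload comment says binary content is prefixed with base64: followed by the encoded bytes. A consumer could recover the raw bytes along these lines; `decodePayload` is a hypothetical helper, and the use of standard (padded) base64 encoding is an assumption.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// payloadPrefix marks binary content in DownloadResponse.Payload.
const payloadPrefix = "base64:"

// decodePayload returns the raw bytes of a payload. Content carrying the
// base64: prefix is decoded; anything else is returned as plain text bytes.
func decodePayload(payload string) ([]byte, error) {
	if strings.HasPrefix(payload, payloadPrefix) {
		return base64.StdEncoding.DecodeString(strings.TrimPrefix(payload, payloadPrefix))
	}
	return []byte(payload), nil
}

func main() {
	for _, payload := range []string{"plain text", "base64:aGVsbG8="} {
		data, err := decodePayload(payload)
		if err != nil {
			fmt.Println("decode failed:", err)
			continue
		}
		fmt.Printf("%q\n", data)
	}
}
```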

type RemoveRequest

type RemoveRequest struct {
	Assets []*url.Resource `required:"true" description:"resources to remove"`
}

RemoveRequest represents a resources Remove request

func NewRemoveRequest

func NewRemoveRequest(assets ...*url.Resource) *RemoveRequest

NewRemoveRequest creates a new remove request

func (*RemoveRequest) Messages

func (r *RemoveRequest) Messages() []*msg.Message

Messages returns tag messages

func (*RemoveRequest) Validate

func (r *RemoveRequest) Validate() error

Validate checks if request is valid

type RemoveResponse

type RemoveResponse struct {
	Removed []string
}

RemoveResponse represents a resource remove response; it returns the URLs of all resources that have been removed.

type Transfer

type Transfer struct {
	Expand   bool `description:"flag to substitute asset content with state keys"`
	Compress bool `` //flag to compress asset before sending over wire and to decompress (this option is only supported on scp or file proto)
	/* 133-byte string literal not displayed */
	Replace map[string]string `description:"replacements map, if key if found in the conent it wil be replaced with corresponding value."`
	Source  *url.Resource     `required:"true" description:"source asset or directory"`
	Dest    *url.Resource     `required:"true" description:"destination asset or directory"`
}

Transfer represents copy instruction

func NewTransfer

func NewTransfer(source, dest *url.Resource, compress, expand bool, replace map[string]string) *Transfer

NewTransfer creates a new transfer

func (*Transfer) Init

func (t *Transfer) Init() error

Init initialises transfer

func (*Transfer) Validate

func (t *Transfer) Validate() error

Validate checks if request is valid

type UploadRequest

type UploadRequest struct {
	SourceKey string        `required:"true" description:"state key with asset content"`
	Dest      *url.Resource `required:"true" description:"destination asset or directory"` //target URL with credentials
}

UploadRequest represents a resource upload request; it uploads the content stored under the context state key SourceKey to the target destination.

func (*UploadRequest) Messages

func (r *UploadRequest) Messages() []*msg.Message

Messages returns event messages

func (*UploadRequest) Validate

func (r *UploadRequest) Validate() error

Validate checks if request is valid

type UploadResponse

type UploadResponse struct {
	UploadSize int
	UploadURL  string
}

UploadResponse represents an upload response
