storage

package

v0.77.5

This package is not in the latest version of its module.
Published: Nov 13, 2024 License: Apache-2.0 Imports: 35 Imported by: 0

README

Storage Service - storage automation and testing

This service uses Abstract File Storage.

Introduction

This service provides local, remote, and cloud storage utilities for simple build automation, test preparation, and verification.

To list all storage service methods, run:

endly -s=storage

To check an individual method's contract, run:

endly -s=storage:method

For example, to check all copy method contract options, run:

endly -s=storage:copy

Usage

You can integrate the storage service with unit, integration, and end-to-end tests. For example, to copy assets from the local file system to Amazon S3 using an automation workflow, you can use the following:

endly cp

@cp.yaml

pipeline:
  copy:
    action: storage:copy
    source:
      URL: /tmp/folder
    dest:
      URL: s3://mybucket/data
      credentials: aws-e2e
    

To use the Go API, you can use the following snippet:


package main

import (
	"log"

	"github.com/viant/endly"
	"github.com/viant/endly/model/location"
	"github.com/viant/endly/service/system/storage"
	"github.com/viant/endly/service/system/storage/copy"
)

func main() {
	// copy /tmp/folder to s3://mybucket/data using the aws-e2e credentials
	// (compress=false, expand=true, no replacement map)
	rule := copy.New(location.NewResource("/tmp/folder"), location.NewResource("s3://mybucket/data", "aws-e2e"), false, true, nil)
	request := storage.NewCopyRequest(nil, rule)
	response := &storage.CopyResponse{}
	if err := endly.Run(nil, request, response); err != nil {
		log.Fatal(err)
	}
	log.Printf("transferred: %v", response.URLs)
}

Data copy

To copy data from a source to a destination, you can use the following workflow. You can optionally specify a prefix, suffix, or filter expression that matches assets in the source location.

@copy.yaml

init:
  bucket: e2etst

pipeline:
  copy:
    action: storage:copy
    suffix: .txt
    source:
      URL: data
    dest:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/data
  list:
    action: storage:list
    source:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/data

Multi asset copy

To copy only selected assets from a source location to a destination, you can use the following workflow, where each assets entry maps a source asset name to its destination name:

@multi_cp.yaml

pipeline:
  copy:
    action: storage:copy
    source:
      URL: data/
    dest:
      URL: /tmp/data/multi/
    assets:
      'lorem1.txt': 'lorem1.txt'
      'lorem2.txt': renamedLorem2.txt

Expanding transferred data

When data is transferred between a source and a destination, you can set the expand flag to dynamically evaluate any workflow state variable, or you can provide a replacement map.

For example, to substitute the $expandMe expression and the Lorem fragment when copying data from @data/lorem2.txt to the destination, you can use the following workflow:

@expanded_cp.yaml

init:
  bucket: e2etst
  expandMe: '#dynamicly expanded#'

pipeline:
  copy:
    action: storage:copy
    expand: true
    replace:
      Lorem: blah
    source:
      URL: data
    dest:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/modified

  list:
    action: storage:list
    content: true
    source:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/modified

The expand attribute instructs the runner to expand any state variable matching a '$' expression; replace defines key/value pairs for basic text replacement.
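To illustrate what expand and replace do to an asset's content, here is a minimal Go sketch built on the standard library's os.Expand and strings.ReplaceAll. It is not endly's implementation: the helper name expandAndReplace is hypothetical, endly's expression syntax supports more than plain $name and ${name} variables, and the order (expand first, then replace) is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"sort"
	"strings"
)

// expandAndReplace applies the two copy options to one asset's content:
// expand substitutes $name / ${name} expressions from a state map, and
// replace applies literal key/value substitutions (assumed to run after expand).
func expandAndReplace(content string, state map[string]string, expand bool, replace map[string]string) string {
	if expand {
		content = os.Expand(content, func(key string) string {
			if value, ok := state[key]; ok {
				return value
			}
			return "$" + key // unknown variables are re-emitted as $key
		})
	}
	keys := make([]string, 0, len(replace))
	for key := range replace {
		keys = append(keys, key)
	}
	sort.Strings(keys) // apply replacements in a deterministic order
	for _, key := range keys {
		content = strings.ReplaceAll(content, key, replace[key])
	}
	return content
}

func main() {
	state := map[string]string{"expandMe": "#dynamically expanded#"}
	replace := map[string]string{"Lorem": "blah"}
	fmt.Println(expandAndReplace("Lorem ipsum $expandMe", state, true, replace))
	// prints: blah ipsum #dynamically expanded#
}
```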

Expanding conditionally transferred data

When transferring and expanding data, you can also provide a matcher expression to expand only specific assets.

For example, to apply substitution only to files with the suffix lorem2.txt, you can use the expandIf node with a suffix attribute:

@expandedif_cp.yaml

init:
  bucket: e2etst
  expandMe: '#dynamicly expanded#'

pipeline:
  copy:
    action: storage:copy
    expandIf:
      suffix: lorem2.txt
    expand: true
    replace:
      Lorem: blah
    source:
      URL: data
    dest:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/filter_modified

  list:
    action: storage:list
    content: true
    source:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/filter_modified

Compressing transferred data

When dealing with a large number of files, you can set the compress flag to compress them at the source location, transfer the archive, and uncompress it at the destination location.

@compressed_cp.yaml

pipeline:
  copy:
    action: storage:copy
    compress: true
    source:
      URL: data/
    dest:
      URL: /tmp/compressed/data

Currently, this option is supported only with the local or scp transfer types.
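Conceptually, the compress option packs the source files into one archive, transfers it, and unpacks it at the destination. The following Go sketch shows that round trip with archive/tar and compress/gzip, with an in-memory byte slice standing in for the transfer. It is illustrative only: IsCompressable indicates that endly compresses via shell commands, so the real mechanism differs, and pack/unpack are hypothetical names.

```go
package main

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// pack compresses a set of named files into a single tar.gz archive.
func pack(files map[string][]byte) ([]byte, error) {
	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	tw := tar.NewWriter(gz)
	for name, data := range files {
		hdr := &tar.Header{Name: name, Mode: 0644, Size: int64(len(data))}
		if err := tw.WriteHeader(hdr); err != nil {
			return nil, err
		}
		if _, err := tw.Write(data); err != nil {
			return nil, err
		}
	}
	if err := tw.Close(); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// unpack reverses pack and returns the archived files keyed by name.
func unpack(archive []byte) (map[string][]byte, error) {
	gz, err := gzip.NewReader(bytes.NewReader(archive))
	if err != nil {
		return nil, err
	}
	defer gz.Close()
	tr := tar.NewReader(gz)
	files := map[string][]byte{}
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		data, err := io.ReadAll(tr)
		if err != nil {
			return nil, err
		}
		files[hdr.Name] = data
	}
	return files, nil
}

func main() {
	archive, err := pack(map[string][]byte{
		"lorem1.txt": []byte("Lorem ipsum"),
		"lorem2.txt": []byte("dolor sit amet"),
	})
	if err != nil {
		panic(err)
	}
	// the "transfer" is modeled as handing the archive bytes to unpack
	files, err := unpack(archive)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(files), string(files["lorem2.txt"]))
}
```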

Archive transfer

When transferring data, the destination can be any URL supported by Abstract File Storage.

For example, to copy a local folder to a zip archive on Google Storage, you can run the following workflow:

@archive.yaml

init:
  bucket: e2etst

pipeline:
  copy:
    action: storage:copy
    source:
      URL: data
    dest:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/archive/data.zip/zip:///data
  listStorage:
    action: storage:list
    source:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/archive

  listArchive:
    action: storage:list
    source:
      credentials: gcp-e2e
      URL: gs://$bucket/copy/archive/data.zip/zip:///

If the gs://$bucket/copy/archive/data.zip archive does not exist, it is created on the fly; if it already exists, the source assets are appended to it or replace existing entries.
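The create-or-update behaviour can be sketched with Go's archive/zip package: read the existing entries (none if the archive is missing), overlay the new assets by name, and rewrite the archive. This is a minimal sketch under that assumption, not how Abstract File Storage implements zip URLs; upsertZip and readZip are hypothetical helpers.

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
	"sort"
)

// readZip returns the entries of a zip archive keyed by name.
func readZip(archive []byte) (map[string][]byte, error) {
	entries := map[string][]byte{}
	if len(archive) == 0 {
		return entries, nil // a missing archive is treated as empty
	}
	zr, err := zip.NewReader(bytes.NewReader(archive), int64(len(archive)))
	if err != nil {
		return nil, err
	}
	for _, f := range zr.File {
		rc, err := f.Open()
		if err != nil {
			return nil, err
		}
		data, err := io.ReadAll(rc)
		rc.Close()
		if err != nil {
			return nil, err
		}
		entries[f.Name] = data
	}
	return entries, nil
}

// upsertZip creates the archive when existing is empty; otherwise it appends
// new assets and replaces entries whose names already exist.
func upsertZip(existing []byte, assets map[string][]byte) ([]byte, error) {
	entries, err := readZip(existing)
	if err != nil {
		return nil, err
	}
	for name, data := range assets {
		entries[name] = data
	}
	names := make([]string, 0, len(entries))
	for name := range entries {
		names = append(names, name)
	}
	sort.Strings(names)
	var buf bytes.Buffer
	zw := zip.NewWriter(&buf)
	for _, name := range names {
		w, err := zw.Create(name)
		if err != nil {
			return nil, err
		}
		if _, err := w.Write(entries[name]); err != nil {
			return nil, err
		}
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	archive, err := upsertZip(nil, map[string][]byte{"data/lorem1.txt": []byte("v1")})
	if err != nil {
		panic(err)
	}
	archive, err = upsertZip(archive, map[string][]byte{
		"data/lorem1.txt": []byte("v2"),  // replaces the existing entry
		"data/lorem2.txt": []byte("new"), // appended
	})
	if err != nil {
		panic(err)
	}
	entries, err := readZip(archive)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(entries), string(entries["data/lorem1.txt"]))
}
```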

Archive substitution transfer

To append or replace an asset in an archive with dynamic data substitution, you can use the following workflow:

@archive_update.yaml

init:
  changeMe: this is my secret

pipeline:
  copy:
    action: storage:copy
    source:
      URL: app/app.war
    dest:
      URL: /tmp/app.war

  updateArchive:
    action: storage:copy
    expand: true
    source:
      URL: app/config.properties
    dest:
      URL: file:/tmp/app.war/zip://localhost/WEB-INF/classes/

  checkUpdate:
    action: storage:download
    source:
      URL: file:/tmp/app.war/zip://localhost/WEB-INF/classes/config.properties
    destKey: config

  info:
    action: print
    message: $checkUpdate.Payload

Asset UDF transformation

When transferring data, you can apply a transformation to each transferred asset using a predefined UDF. For example, to apply the GZipper UDF to a copied file, you can use the following:

@udf.yaml

init:
pipeline:
  upload:
    action: storage:copy
    source:
      URL: data/lorem1.txt
    udf: GZipper
    dest:
      URL: /tmp/lorem.txt.gz
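A UDF can be thought of as a named function that transforms asset content. The sketch below models that with a hypothetical UDF type and registry; the GZipper entry gzips content, and UnzipText (used in the download.yaml example) is assumed to reverse it. The actual endly UDF signatures and registration mechanism are not shown here and may differ.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// UDF is a hypothetical signature for a named content transformation.
type UDF func(data []byte) ([]byte, error)

// registry maps names, as written in the udf: attribute, to implementations.
var registry = map[string]UDF{
	// GZipper compresses the asset content with gzip.
	"GZipper": func(data []byte) ([]byte, error) {
		var buf bytes.Buffer
		zw := gzip.NewWriter(&buf)
		if _, err := zw.Write(data); err != nil {
			return nil, err
		}
		if err := zw.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	},
	// UnzipText is assumed to reverse GZipper and return the plain text.
	"UnzipText": func(data []byte) ([]byte, error) {
		zr, err := gzip.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
		defer zr.Close()
		return io.ReadAll(zr)
	},
}

// applyUDF looks up a UDF by name and applies it to data.
func applyUDF(name string, data []byte) ([]byte, error) {
	udf, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("unknown udf: %s", name)
	}
	return udf(data)
}

func main() {
	compressed, err := applyUDF("GZipper", []byte("Lorem ipsum dolor sit amet"))
	if err != nil {
		panic(err)
	}
	restored, err := applyUDF("UnzipText", compressed)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(restored))
}
```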

Listing location content

To list location content, you can use the storage service list method.

To recursively list the content of gs://$bucket/somepath, you can use the following:

@list.yaml

init:
  bucket: myBucket
pipeline:

  list:
    action: storage:list
    recursive: true
    content: false
    source:
      credentials: gcp-e2e
      URL: gs://$bucket/somepath

When the content attribute is set, the list operation also downloads asset content.

Applying browsing basic criteria

When listing content, you can specify basic matcher criteria:

@filter.yaml

init:
  bucket: e2etst
pipeline:

  list:
    action: storage:list
    recursive: true
    match:
      suffix: .txt
    source:
      credentials: gcp-e2e
      URL: gs://$bucket/
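A basic matcher combines a prefix, a suffix, and a filter expression. The following Go sketch assumes that all non-empty criteria must hold and that the filter is a regular expression matched against the asset path; basicMatcher is a hypothetical type, not endly's copy.Matcher.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// basicMatcher is a hypothetical stand-in for the match: criteria;
// empty fields match everything.
type basicMatcher struct {
	Prefix string
	Suffix string
	Filter string // regular expression applied to the asset path
}

// Match reports whether path satisfies every non-empty criterion.
func (m basicMatcher) Match(path string) (bool, error) {
	if m.Prefix != "" && !strings.HasPrefix(path, m.Prefix) {
		return false, nil
	}
	if m.Suffix != "" && !strings.HasSuffix(path, m.Suffix) {
		return false, nil
	}
	if m.Filter != "" {
		re, err := regexp.Compile(m.Filter)
		if err != nil {
			return false, err
		}
		return re.MatchString(path), nil
	}
	return true, nil
}

func main() {
	matcher := basicMatcher{Suffix: ".txt"}
	for _, path := range []string{"copy/data/lorem1.txt", "copy/data/events.json.gz"} {
		ok, err := matcher.Match(path)
		if err != nil {
			panic(err)
		}
		fmt.Println(path, ok)
	}
}
```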

Applying browsing time criteria

In some situations, the exact file name may be generated dynamically, for example with a UUID generator. In that case, you can use an updatedAfter or updatedBefore TimeAt expression to match the desired assets.

For example, the following workflow creates assets in Google Storage and then lists them by a time expression:

@time_filter.yaml

init:
  i: 0
  bucket: e2etst
  baseURL: gs://$bucket/timefilter
  data: test
pipeline:

  batchUpload:
    upload:
      init:
        _: $i++
      action: storage:upload
      sleepTimeMs: 1200
      sourceKey: data
      dest:
        credentials: gcp-e2e
        URL: ${baseURL}/subdir/file_${i}.txt
    goto:
      when: $i < 3
      action: goto
      task: batchUpload

  list:
    action: storage:list
    recursive: true
    logging: false
    content: true
    match:
      suffix: .txt
      updatedAfter: 2secAgo

    source:
      credentials: gcp-e2e
      URL: $baseURL

  info:
    action: print
    message: $AsString($list.Assets)

Data upload

Data upload lets you upload workflow state directly to the desired storage location.

Customer key data encryption

@custom_key.yaml

init:
  data: $Cat('lorem.txt')
  bucket: e2etst
  customerKey:
    key: this is secret :3rd party phrase

pipeline:
  upload:
    action: storage:upload
    sourceKey: data
    dest:
      URL: gs://$bucket/secured/lorem.txt
      credentials: gcp-e2e
      customKey: $customerKey
  list:
    action: storage:list
    source:
      URL: gs://$bucket/secured/
      credentials: gcp-e2e
  download:
    action: storage:download
    source:
      URL: gs://$bucket/secured/lorem.txt
      credentials: gcp-e2e
      customKey: $customerKey
  info:
    action: print
    message: 'Downloaded: $AsString(${download.Payload})'

Dynamic config/state upload

@dynamic.yaml

init:
  settings: $Cat('settings.json')
  settingsMap: $AsMap('$settings')
  config:
    key1: val1
    key2: val2
    featureX: ${settingsMap.featureX}


pipeline:
  info:
    action: print
    message: $AsString('$config')

  dynamic:
    init:
      cfg: $AsJSON('$config')
    action: storage:upload
    sourceKey: cfg
    dest:
      URL: /tmp/app.json
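For comparison, the same transformation performed by dynamic.yaml (parse settings.json, build a config map with featureX taken from the settings, and serialize it as JSON) can be sketched in Go with encoding/json. buildConfig is a hypothetical name, and the settings content in main is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildConfig mirrors dynamic.yaml: decode the settings JSON, combine static
// keys with featureX from the settings, and encode the result as JSON.
func buildConfig(settingsJSON []byte) ([]byte, error) {
	var settings map[string]interface{}
	if err := json.Unmarshal(settingsJSON, &settings); err != nil {
		return nil, err
	}
	config := map[string]interface{}{
		"key1":     "val1",
		"key2":     "val2",
		"featureX": settings["featureX"], // null if the setting is absent
	}
	// encoding/json writes map keys in sorted order
	return json.Marshal(config)
}

func main() {
	cfg, err := buildConfig([]byte(`{"featureX": "enabled"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(cfg))
}
```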

Data validation

The following service operations support validation through the 'expect' attribute:

  • storage:list
  • storage:exists
  • storage:download

When defining the expect attribute, you can use rule-based assertly validation expressions.

For example, to dynamically uncompress data/events.json.gz and perform structured data validation, you can use the following workflow:

@download.yaml

init:
  expect: $Cat('data/expect.json')

pipeline:
  check:
    action: storage:download
    udf: UnzipText
    source:
      URL: data/events.json.gz
    expect: $expect
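The structured validation step can be approximated as: gunzip the payload, decode it as JSON, and check that every value in the expected document is present in the actual one. The Go sketch below assumes that subset semantics (extra keys in the actual data are ignored); assertly supports much richer rule expressions, and validateGzippedJSON, matchesExpected and gzipBytes are hypothetical helpers.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"reflect"
)

// matchesExpected compares objects key by key (extra actual keys are ignored),
// arrays element by element, and scalars by equality.
func matchesExpected(expected, actual interface{}) bool {
	switch exp := expected.(type) {
	case map[string]interface{}:
		act, ok := actual.(map[string]interface{})
		if !ok {
			return false
		}
		for key, value := range exp {
			actualValue, found := act[key]
			if !found || !matchesExpected(value, actualValue) {
				return false
			}
		}
		return true
	case []interface{}:
		act, ok := actual.([]interface{})
		if !ok || len(act) != len(exp) {
			return false
		}
		for i := range exp {
			if !matchesExpected(exp[i], act[i]) {
				return false
			}
		}
		return true
	default:
		return reflect.DeepEqual(expected, actual)
	}
}

// validateGzippedJSON gunzips payload, decodes it as JSON and checks it against expectJSON.
func validateGzippedJSON(payload, expectJSON []byte) (bool, error) {
	zr, err := gzip.NewReader(bytes.NewReader(payload))
	if err != nil {
		return false, err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return false, err
	}
	var actual, expected interface{}
	if err := json.Unmarshal(raw, &actual); err != nil {
		return false, err
	}
	if err := json.Unmarshal(expectJSON, &expected); err != nil {
		return false, err
	}
	return matchesExpected(expected, actual), nil
}

// gzipBytes compresses data to build a sample payload; writes to a bytes.Buffer do not fail.
func gzipBytes(data []byte) []byte {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write(data)
	zw.Close()
	return buf.Bytes()
}

func main() {
	events := gzipBytes([]byte(`[{"id": 1, "type": "click", "ts": "2024-11-13T10:00:00Z"}]`))
	valid, err := validateGzippedJSON(events, []byte(`[{"id": 1, "type": "click"}]`))
	if err != nil {
		panic(err)
	}
	fmt.Println("valid:", valid)
}
```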

To validate whether files exist, you can use the following workflow:

@exists.yaml

pipeline:
  check:
    action: storage:exists
    assets:
      - URL: data/f1.txt
        credentials: localhost
      - URL: data/f2.txt
      - URL: data/f3.txt
      - URL: gs://blach/resource/assset1.txt
        credentials: gcp-e2e

    expect:
      'data/f1.txt': true
      'data/f2.txt': false
      'data/f3.txt': true
      'gs://blach/resource/assset1.txt': false

Generating files

To dynamically create a file of a specified size, you can use the following workflow:

@generate.yaml

pipeline:

  createTestAsset1:
    action: storage:generate
    sizeInMb: 20
    lineTemplate: '$i,name $i,address $i'
    dest:
      URL: /tmp/myasset.csv

  createTestAsset2:
    action: storage:generate
    sizeInMb: 300
    dest:
      URL: /tmp/myasset.txt


To generate a 100-line JSON file from a line template, use the following:

@json.yaml

pipeline:
  generate:
    action: storage:generate
    indexVariable: id
    lines: 100
    index: 55
    lineTemplate: '{"id": ${id}, "name": "dummy ${id}", "type_id": ${id % 3} } '
    dest:
      URL: dummy.json

To generate 12 files in the background, use the following:

@stress.yaml

init:
  'self.i': 1
  'self.cnt': 0

pipeline:
  trigger:
    generate:
      action: storage:generate
      fileCount: 4
      inBackground: true
      indexVariable: id
      lines: 1
      index: ${self.i}
      lineTemplate: '{"id": ${id}, "name": "dummy ${id}", "type_id": ${id %4}}'
      dest:
        URL: gs://mybucket/test/data${self.cnt++}_$fileNo.json
    inc:
      action: nop
      sleepTimeMs: 500
      logging: false
      init:
        'self.i': ${self.i + 4}

    goto:
      when: ${self.i} < 12
      action: goto
      task: trigger
  • TODO: add UDF support (e.g., to compress)

Documentation

Overview

Package storage implements storage operations.


Constants

const (
	//ServiceID represents transfer service id
	ServiceID = "storage"
)

Variables

This section is empty.

Functions

func Copy

func Copy(context *endly.Context, transfers ...*copy.Rule) (interface{}, error)

Copy transfers data for the provided transfer definitions.

Example
package main

import (
	"log"

	"github.com/viant/endly"
	"github.com/viant/endly/model/location"
	"github.com/viant/endly/service/system/storage"
	"github.com/viant/endly/service/system/storage/copy"
)

func main() {
	// copy /tmp/folder to s3://mybucket/data using the aws-e2e credentials
	// (compress=false, expand=true, no replacement map)
	request := storage.NewCopyRequest(nil, copy.New(location.NewResource("/tmp/folder"), location.NewResource("s3://mybucket/data", "aws-e2e"), false, true, nil))
	response := &storage.CopyResponse{}
	err := endly.Run(nil, request, response)
	if err != nil {
		log.Fatal(err)
	}
}

func GetResourceWithOptions

func GetResourceWithOptions(context *endly.Context, resource *location.Resource, options ...storage.Option) (*location.Resource, []storage.Option, error)

GetResourceWithOptions returns the resource together with its afs storage options.

func IsCompressable

func IsCompressable(protScheme string) bool

IsCompressable returns true if the resource can be compressed via a shell command.

func New

func New() endly.Service

New creates a new service

func StorageOptions

func StorageOptions(ctx *endly.Context, resource *location.Resource, options ...storage.Option) ([]storage.Option, error)

StorageOptions returns storage options for the supplied resource.

func StorageService

func StorageService(ctx *endly.Context, resources ...*location.Resource) (afs.Service, error)

StorageService returns the afs storage service.

func UseMemoryService

func UseMemoryService(context *endly.Context) afs.Service

UseMemoryService sets a flag on the context to always use the memory service (for testing only).

Types

type CopyRequest

type CopyRequest struct {
	*copy.Rule `description:"if asset uses relative path it will be joined with this URL" json:",inline"`
	Assets     copy.Assets `` // transfers
	/* 130-byte string literal not displayed */
	Transfers []*copy.Rule `description:"actual transfer assets, if empty it derives from assets or source/desc "`
	Udf       string       `description:"custom user defined function to return github.com/viant/afs/option.Modifier type to modify copied content"`
}

CopyRequest represents a resource Copy request.

func NewCopyRequest

func NewCopyRequest(assets copy.Assets, transfers ...*copy.Rule) *CopyRequest

NewCopyRequest creates a new Copy request.

func NewCopyRequestFromURL

func NewCopyRequestFromURL(URL string) (*CopyRequest, error)

NewCopyRequestFromURL creates a new request from URL (JSON or YAML format are supported)

func (*CopyRequest) Init

func (r *CopyRequest) Init() error

Init initialises the request.

func (*CopyRequest) Messages

func (r *CopyRequest) Messages() []*msg.Message

Messages returns event messages.

func (*CopyRequest) Validate

func (r *CopyRequest) Validate() error

Validate checks if the request is valid.

type CopyResponse

type CopyResponse struct {
	URLs []string //transferred URLs
}

CopyResponse represents a resource Copy response.

type CreateRequest

type CreateRequest struct {
	SourceKey string             `required:"true" description:"state key with asset content"`
	Region    string             `description:"cloud storage region"`
	Mode      int                `description:"os.FileMode"`
	IsDir     bool               `description:"is directory flag"`
	Dest      *location.Resource `required:"true" description:"destination asset or directory"` //target URL with credentials
}

CreateRequest represents a resource Upload request; it takes a context state key whose content is uploaded to the target destination.

func (*CreateRequest) Init

func (r *CreateRequest) Init() error

Init initialises the Upload request.

func (*CreateRequest) Validate

func (r *CreateRequest) Validate() error

Validate checks if the request is valid.

type CreateResponse

type CreateResponse struct {
	Size int
	URL  string
}

CreateResponse represents an Upload response.

type DownloadRequest

type DownloadRequest struct {
	Source  *location.Resource `required:"true" description:"source asset or directory"`
	DestKey string             `required:"true" description:"state map key destination"`
	Udf     string             `description:"name of udf to transform payload before placing into state map"` //name of udf function that will be used to transform payload
	Expect  interface{}        `description:"if specified expected file content used for validation"`
}

DownloadRequest represents a resource Download request; it downloads the source into the context state target key.

func (*DownloadRequest) Messages

func (r *DownloadRequest) Messages() []*msg.Message

Messages returns event messages.

func (*DownloadRequest) Validate

func (r *DownloadRequest) Validate() error

Validate checks if the request is valid.

type DownloadResponse

type DownloadResponse struct {
	Info        toolbox.FileInfo
	Payload     string //source content; if binary, it is prefixed with base64: followed by the base64-encoded content.
	Transformed interface{}
	Assert      *validator.AssertResponse
}

DownloadResponse represents a Download response

type ExistsRequest

type ExistsRequest struct {
	Assets []*location.Resource `required:"true" description:"source asset or directory"`
	Expect map[string]bool      `description:"map of asset and exists flag"`
}

ExistsRequest represents an exists request.

func (*ExistsRequest) Validate

func (r *ExistsRequest) Validate() error

Validate checks if the request is valid.

type ExistsResponse

type ExistsResponse struct {
	Exists map[string]bool `description:"locations with exists flag, when assets uses only asset location otherwise source URL"`
	Assert *validator.AssertResponse
}

ExistsResponse represents an exists response.

func (*ExistsResponse) Messages

func (r *ExistsResponse) Messages() []*msg.Message

Messages returns event messages.

type GenerateRequest

type GenerateRequest struct {
	Template      string
	LineTemplate  string
	Lines         int
	Size          int
	SizeInMb      int
	Index         int
	IndexVariable string
	Mode          int                `description:"os.FileMode"`
	Dest          *location.Resource `required:"true" description:"destination asset or directory"` //target URL with credentials
	FileCount     int
	InBackground  bool
}

GenerateRequest represents a request to generate file content at the target destination.

func (*GenerateRequest) Init

func (r *GenerateRequest) Init() error

Init initialises the Generate request.

func (*GenerateRequest) Messages

func (r *GenerateRequest) Messages() []*msg.Message

Messages returns event messages.

func (*GenerateRequest) Validate

func (r *GenerateRequest) Validate() error

Validate checks if the request is valid.

type GenerateResponse

type GenerateResponse struct {
	Size int
	URLs []string
}

GenerateResponse represents a Generate response.

func (*GenerateResponse) Messages

func (r *GenerateResponse) Messages() []*msg.Message

Messages returns event messages.

type ListRequest

type ListRequest struct {
	Source    *location.Resource `required:"true" description:"source asset or directory"`
	Match     *copy.Matcher
	Content   bool
	Recursive bool
	Expect    interface{}
}

ListRequest represents a resource List request.

func (*ListRequest) Validate

func (r *ListRequest) Validate() error

Validate checks if the request is valid.

type ListResponse

type ListResponse struct {
	URL    string
	Assets []*asset.Resource
	Assert *validator.AssertResponse
}

ListResponse represents a List response.

func (*ListResponse) Messages

func (r *ListResponse) Messages() []*msg.Message

Messages returns event messages.

type RemoveRequest

type RemoveRequest struct {
	Assets []*location.Resource `required:"true" description:"resources to Remove"`
}

RemoveRequest represents a resource Remove request.

func NewRemoveRequest

func NewRemoveRequest(assets ...*location.Resource) *RemoveRequest

NewRemoveRequest creates a new Remove request

func (*RemoveRequest) Messages

func (r *RemoveRequest) Messages() []*msg.Message

Messages returns tag messages.

func (*RemoveRequest) Validate

func (r *RemoveRequest) Validate() error

Validate checks if the request is valid.

type RemoveResponse

type RemoveResponse struct {
	Removed []string
}

RemoveResponse represents a resource Remove response; it returns the URLs of all resources that have been removed.

type UploadRequest

type UploadRequest struct {
	SourceKey string             `required:"true" description:"state key with asset content"`
	Region    string             `description:"cloud storage region"`
	Mode      int                `description:"os.FileMode"`
	Udf       string             `description:"name of udf to transform payload before placing into state map"` //name of udf function that will be used to transform payload
	Dest      *location.Resource `required:"true" description:"destination asset or directory"`                 //target URL with credentials
}

UploadRequest represents a resource Upload request; it takes a context state key whose content is uploaded to the target destination.

func (*UploadRequest) Init

func (r *UploadRequest) Init() error

Init initialises the Upload request.

func (*UploadRequest) Messages

func (r *UploadRequest) Messages() []*msg.Message

Messages returns event messages.

func (*UploadRequest) Validate

func (r *UploadRequest) Validate() error

Validate checks if the request is valid.

type UploadResponse

type UploadResponse struct {
	Size        int
	Transformed string
	URL         string
}

UploadResponse represents an Upload response.
