nodes

package
v4.0.0-alpha1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2022 License: AGPL-3.0 Imports: 39 Imported by: 1

Documentation

Overview

Package nodes provides high-level clients for talking to the main data tree in certain context.

It follows the "wrapper" pattern of http handlers to filter all requests inputs and outputs. The "Router" is object is used by all services or gateways when accessing to data as a given user. Between others, it will - Load ACLs and perform checks to make sure user is allowed to read/write the data - Perform other meta-related or acl-related checks like Quota management, locks, etc. - Perform encryption/decryption of actual data on the fly - Compress / Decompress archives, - Add metadata collected from any services on the nodes outputted by the responses, - etc...

Index

Constants

View Source
const (
	ViewsLibraryName     = "pydio.lib.views"
	MetaAclCheckDownload = "acl-check-download"
	MetaAclCheckSyncable = "acl-check-syncable"
)

Variables

View Source
var (
	// IsUnitTestEnv flag prevents among others the ClientPool to look for declared
	// datasources in the registry. As none is present while running unit tests, it
	// otherwise times out.
	IsUnitTestEnv = false
)

Functions

func AncestorsListFromContext

func AncestorsListFromContext(ctx context.Context, node *tree.Node, identifier string, p SourcesPool, orParents bool) (updatedContext context.Context, parentsList []*tree.Node, e error)

func BuildAncestorsList

func BuildAncestorsList(ctx context.Context, treeClient tree.NodeProviderClient, node *tree.Node) (parentUuids []*tree.Node, err error)

BuildAncestorsList uses ListNodes with "Ancestors" flag to build the list of parent nodes. It uses an internal short-lived cache to throttle calls to the TreeService

func BuildAncestorsListOrParent

func BuildAncestorsListOrParent(ctx context.Context, treeClient tree.NodeProviderClient, node *tree.Node) (parentUuids []*tree.Node, err error)

BuildAncestorsListOrParent builds ancestors list when the node does not exist yet, by trying to find all existing parents.

func CopyMoveNodes

func CopyMoveNodes(ctx context.Context, router Handler, sourceNode *tree.Node, targetNode *tree.Node, move bool, isTask bool, statusChan chan string, progressChan chan float32, tFunc ...i18n.TranslateFunc) (oErr error)

CopyMoveNodes performs a recursive copy or move operation of a node to a new location. It can be inter- or intra-datasources. It will eventually pass contextual metadata like X-Pydio-Session (to batch event inside the SYNC) or X-Pydio-Move (to reconcile creates and deletes when move is done between two differing datasources).

func ErrBranchInfoMissing

func ErrBranchInfoMissing(identifier string) error

ErrBranchInfoMissing returns a 500 error

func ErrBranchInfoRootMissing

func ErrBranchInfoRootMissing(identifier string) error

ErrBranchInfoRootMissing returns a 500 error

func ErrCannotFindACL

func ErrCannotFindACL() error

ErrCannotFindACL returns a 500 error

func ErrCannotReadStore

func ErrCannotReadStore(store string) error

ErrCannotReadStore returns a 403 error

func ErrCannotWriteStore

func ErrCannotWriteStore(store string) error

ErrCannotWriteStore returns a 403 error

func ErrFileNotFound

func ErrFileNotFound(format string, a ...interface{}) error

ErrFileNotFound returns a 404 error

func GetGenericStoreClientConfig

func GetGenericStoreClientConfig(storeNamespace string) (dataSource string, bucket string, e error)

GetGenericStoreClientConfig finds datasource/bucket for a given store.

func GetSessionID

func GetSessionID(ctx context.Context) (string, bool)

GetSessionID returns the session ID in context

func HandlerListNodesWithCallback

func HandlerListNodesWithCallback(v Handler, ctx context.Context, request *tree.ListNodesRequest, callback WalkFunc, ignoreCbError bool, filters ...WalkFilterFunc) error

HandlerListNodesWithCallback is a generic implementation of ListNodesWithCallback for any Handler. Used by Client, Handler and HandlerMock

func Is403

func Is403(e error) bool

Is403 checks if error is not nil and has code 403

func IsFlatStorage

func IsFlatStorage(ctx context.Context, identifier string) bool

func RegisterStorageClient

func RegisterStorageClient(name string, provider StorageClientProvider)

func UseMockStorageClientType

func UseMockStorageClientType()

func WalkFilterSkipPydioHiddenFile

func WalkFilterSkipPydioHiddenFile(_ context.Context, node *tree.Node) bool

WalkFilterSkipPydioHiddenFile is a preset filter ignoring PydioSyncHiddenFile entries

func WithBranchInfo

func WithBranchInfo(ctx context.Context, identifier string, branchInfo BranchInfo, reset ...bool) context.Context

func WithSessionID

func WithSessionID(ctx context.Context, session string) context.Context

WithSessionID returns a context which knows its service assigned color

func WrapReaderForMime

func WrapReaderForMime(ctx context.Context, clone *tree.Node, reader io.Reader) io.Reader

Types

type Adapter

type Adapter interface {
	Adapt(h Handler, options RouterOptions) Handler
}

type BranchInfo

type BranchInfo struct {
	LoadedSource
	*idm.Workspace
	Root              *tree.Node
	Binary            bool
	TransparentBinary bool
	AncestorsList     map[string][]*tree.Node
}

BranchInfo contains information about a given identifier

func GetBranchInfo

func GetBranchInfo(ctx context.Context, identifier string) (BranchInfo, bool)

func (BranchInfo) IsInternal

func (b BranchInfo) IsInternal() bool

IsInternal check if either datasource is internal or branch has Binary flag

type CallbackFunc

type CallbackFunc func(inputFilter FilterFunc, outputFilter FilterFunc) error

type ChangesWrappingStreamer

type ChangesWrappingStreamer struct {
	// contains filtered or unexported fields
}

func NewChangesWrappingStreamer

func NewChangesWrappingStreamer() *ChangesWrappingStreamer

func (ChangesWrappingStreamer) CloseSend

func (l ChangesWrappingStreamer) CloseSend() error

func (ChangesWrappingStreamer) Context

func (l ChangesWrappingStreamer) Context() context.Context

TODO v4

func (ChangesWrappingStreamer) Header

func (l ChangesWrappingStreamer) Header() (metadata.MD, error)

TODO v4

func (*ChangesWrappingStreamer) Recv

func (ChangesWrappingStreamer) RecvMsg

func (l ChangesWrappingStreamer) RecvMsg(m interface{}) error

func (ChangesWrappingStreamer) Send

func (l ChangesWrappingStreamer) Send(in interface{}) error

func (ChangesWrappingStreamer) SendError

func (l ChangesWrappingStreamer) SendError(err error) error

func (ChangesWrappingStreamer) SendMsg

func (l ChangesWrappingStreamer) SendMsg(msg interface{}) error

func (ChangesWrappingStreamer) Trailer

func (l ChangesWrappingStreamer) Trailer() metadata.MD

TODO v4

type Client

type Client interface {
	Handler
	WrapCallback(provider CallbackFunc) error
	BranchInfoForNode(ctx context.Context, node *tree.Node) (branch BranchInfo, err error)
	CanApply(ctx context.Context, operation *tree.NodeChangeEvent) (*tree.NodeChangeEvent, error)
	GetClientsPool() SourcesPool
}

Client the main accessor to access nodes while going through a preset stack of Handler. Actual implementations are composed of a Core Handler (executor), a Pool and ordered middlewares. It implements all methods of a Handler

type ClientsPool

type ClientsPool struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ClientsPool is responsible for discovering available datasources and keeping an up to date registry that is used by the routers.

func NewPool added in v4.0.1

func NewPool(ctx context.Context, reg registry.Registry) *ClientsPool

func NewTestPool added in v4.0.1

func NewTestPool(ctx context.Context) *ClientsPool

NewTestPool creates a client Pool and initialises it by calling the registry.

func (*ClientsPool) Close

func (p *ClientsPool) Close()

Close stops the underlying watcher if defined.

func (*ClientsPool) CreateClientsForDataSource

func (p *ClientsPool) CreateClientsForDataSource(dataSourceName string, dataSource *object.DataSource) error

func (*ClientsPool) GetDataSourceInfo

func (p *ClientsPool) GetDataSourceInfo(dsName string, retries ...int) (LoadedSource, error)

GetDataSourceInfo tries to find information about a DataSource, eventually retrying as DataSource could be currently starting and not yet registered in the ClientsPool.

func (*ClientsPool) GetDataSources

func (p *ClientsPool) GetDataSources() (out map[string]LoadedSource)

GetDataSources returns currently loaded datasources

func (*ClientsPool) GetTreeClient

func (p *ClientsPool) GetTreeClient() tree.NodeProviderClient

GetTreeClient returns the internal NodeProviderClient pointing to the TreeService.

func (*ClientsPool) GetTreeClientWrite

func (p *ClientsPool) GetTreeClientWrite() tree.NodeReceiverClient

GetTreeClientWrite returns the internal NodeReceiverClient pointing to the TreeService.

func (*ClientsPool) LoadDataSources

func (p *ClientsPool) LoadDataSources()

LoadDataSources queries the registry to reload available datasources

type FilterFunc

type FilterFunc func(ctx context.Context, inputNode *tree.Node, identifier string) (context.Context, *tree.Node, error)

type Handler

type Handler interface {
	tree.NodeProviderClient
	tree.NodeReceiverClient
	tree.NodeChangesStreamerClient
	GetObject(ctx context.Context, node *tree.Node, requestData *models.GetRequestData) (io.ReadCloser, error)
	PutObject(ctx context.Context, node *tree.Node, reader io.Reader, requestData *models.PutRequestData) (int64, error)
	CopyObject(ctx context.Context, from *tree.Node, to *tree.Node, requestData *models.CopyRequestData) (int64, error)

	MultipartCreate(ctx context.Context, target *tree.Node, requestData *models.MultipartRequestData) (string, error)
	MultipartPutObjectPart(ctx context.Context, target *tree.Node, uploadID string, partNumberMarker int, reader io.Reader, requestData *models.PutRequestData) (models.MultipartObjectPart, error)
	MultipartAbort(ctx context.Context, target *tree.Node, uploadID string, requestData *models.MultipartRequestData) error
	MultipartComplete(ctx context.Context, target *tree.Node, uploadID string, uploadedParts []models.MultipartObjectPart) (models.ObjectInfo, error)

	MultipartList(ctx context.Context, prefix string, requestData *models.MultipartRequestData) (models.ListMultipartUploadsResult, error)
	MultipartListObjectParts(ctx context.Context, target *tree.Node, uploadID string, partNumberMarker int, maxParts int) (models.ListObjectPartsResult, error)

	ExecuteWrapped(inputFilter FilterFunc, outputFilter FilterFunc, provider CallbackFunc) error
	WrappedCanApply(srcCtx context.Context, targetCtx context.Context, operation *tree.NodeChangeEvent) error
	ListNodesWithCallback(ctx context.Context, request *tree.ListNodesRequest, callback WalkFunc, ignoreCbError bool, filters ...WalkFilterFunc) error

	SetClientsPool(p SourcesPool)
}

Handler is a composed interface providing abilities to CRUD nodes to datasources

type HandlerMock

type HandlerMock struct {
	RootDir string
	Nodes   map[string]*tree.Node
	Context context.Context
}

func NewHandlerMock

func NewHandlerMock() *HandlerMock

func (*HandlerMock) CopyObject

func (h *HandlerMock) CopyObject(ctx context.Context, from *tree.Node, to *tree.Node, requestData *models.CopyRequestData) (int64, error)

func (*HandlerMock) CreateNode

func (*HandlerMock) DeleteNode

func (*HandlerMock) ExecuteWrapped

func (h *HandlerMock) ExecuteWrapped(inputFilter FilterFunc, outputFilter FilterFunc, provider CallbackFunc) error

func (*HandlerMock) GetObject

func (h *HandlerMock) GetObject(ctx context.Context, node *tree.Node, requestData *models.GetRequestData) (io.ReadCloser, error)

func (*HandlerMock) ListNodes

func (*HandlerMock) ListNodesWithCallback

func (h *HandlerMock) ListNodesWithCallback(ctx context.Context, request *tree.ListNodesRequest, callback WalkFunc, ignoreCbError bool, filters ...WalkFilterFunc) error

func (*HandlerMock) MultipartAbort

func (h *HandlerMock) MultipartAbort(ctx context.Context, target *tree.Node, uploadID string, requestData *models.MultipartRequestData) error

func (*HandlerMock) MultipartComplete

func (h *HandlerMock) MultipartComplete(ctx context.Context, target *tree.Node, uploadID string, uploadedParts []models.MultipartObjectPart) (models.ObjectInfo, error)

func (*HandlerMock) MultipartCreate

func (h *HandlerMock) MultipartCreate(ctx context.Context, target *tree.Node, requestData *models.MultipartRequestData) (string, error)

func (*HandlerMock) MultipartList

func (h *HandlerMock) MultipartList(ctx context.Context, prefix string, requestData *models.MultipartRequestData) (models.ListMultipartUploadsResult, error)

func (*HandlerMock) MultipartListObjectParts

func (h *HandlerMock) MultipartListObjectParts(ctx context.Context, target *tree.Node, uploadID string, partNumberMarker int, maxParts int) (models.ListObjectPartsResult, error)

func (*HandlerMock) MultipartPutObjectPart

func (h *HandlerMock) MultipartPutObjectPart(ctx context.Context, target *tree.Node, uploadID string, partNumberMarker int, reader io.Reader, requestData *models.PutRequestData) (op models.MultipartObjectPart, e error)

func (*HandlerMock) PutObject

func (h *HandlerMock) PutObject(ctx context.Context, node *tree.Node, reader io.Reader, requestData *models.PutRequestData) (int64, error)

func (*HandlerMock) ReadNode

func (*HandlerMock) SetClientsPool

func (h *HandlerMock) SetClientsPool(p SourcesPool)

func (*HandlerMock) UpdateNode

func (*HandlerMock) WrappedCanApply

func (h *HandlerMock) WrappedCanApply(srcCtx context.Context, targetCtx context.Context, operation *tree.NodeChangeEvent) error

type LoadedSource

type LoadedSource struct {
	*object.DataSource
	Client StorageClient
}

func WithBucketName

func WithBucketName(s LoadedSource, bucket string) LoadedSource

WithBucketName creates a copy of a LoadedSource with a bucket name

func (LoadedSource) MarshalLogObject

func (s LoadedSource) MarshalLogObject(encoder zapcore.ObjectEncoder) error

type MimeResult

type MimeResult struct {
	// contains filtered or unexported fields
}

func (*MimeResult) GetError

func (m *MimeResult) GetError() error

func (*MimeResult) GetMime

func (m *MimeResult) GetMime() string

type MockReadCloser

type MockReadCloser struct {
	io.Reader
}

func (MockReadCloser) Close

func (r MockReadCloser) Close() error

type NodeWrappingStreamer

type NodeWrappingStreamer struct {
	// contains filtered or unexported fields
}

NodeWrappingStreamer wraps an existing Node Streamer.

func NewWrappingStreamer

func NewWrappingStreamer() *NodeWrappingStreamer

func (NodeWrappingStreamer) CloseSend

func (l NodeWrappingStreamer) CloseSend() error

func (NodeWrappingStreamer) Context

func (l NodeWrappingStreamer) Context() context.Context

TODO v4

func (NodeWrappingStreamer) Header

func (l NodeWrappingStreamer) Header() (metadata.MD, error)

TODO v4

func (*NodeWrappingStreamer) Recv

func (NodeWrappingStreamer) RecvMsg

func (l NodeWrappingStreamer) RecvMsg(m interface{}) error

func (NodeWrappingStreamer) Send

func (l NodeWrappingStreamer) Send(in interface{}) error

func (NodeWrappingStreamer) SendError

func (l NodeWrappingStreamer) SendError(err error) error

func (NodeWrappingStreamer) SendMsg

func (l NodeWrappingStreamer) SendMsg(msg interface{}) error

func (NodeWrappingStreamer) Trailer

func (l NodeWrappingStreamer) Trailer() metadata.MD

TODO v4

type Option

type Option func(options *RouterOptions)

func AsAdmin

func AsAdmin() Option

func WithAuditEventsLogging

func WithAuditEventsLogging() Option

func WithContext

func WithContext(ctx context.Context) Option

func WithCore

func WithCore(init func(pool SourcesPool) Handler) Option

func WithReadEventsLogging

func WithReadEventsLogging() Option

func WithSynchronousCaching

func WithSynchronousCaching() Option

func WithSynchronousTasks

func WithSynchronousTasks() Option

func WithVirtualNodesBrowsing

func WithVirtualNodesBrowsing() Option

func WithWrapper

func WithWrapper(adapter Adapter) Option

type RouterOptions

type RouterOptions struct {
	Context context.Context

	CoreClient func(pool SourcesPool) Handler

	AdminView     bool
	WatchRegistry bool
	Registry      registry.Registry

	LogReadEvents      bool
	BrowseVirtualNodes bool
	// AuditEvent flag turns audit logger ON for the corresponding router.
	AuditEvent       bool
	SynchronousCache bool
	SynchronousTasks bool

	Wrappers []Adapter
	Pool     SourcesPool
}

RouterOptions holds configuration flags to pass to a router constructor easily.

type SourcesPool

type SourcesPool interface {
	Close()
	GetTreeClient() tree.NodeProviderClient
	GetTreeClientWrite() tree.NodeReceiverClient
	GetDataSourceInfo(dsName string, retries ...int) (LoadedSource, error)
	GetDataSources() map[string]LoadedSource
	LoadDataSources()
}

type StorageClient

type StorageClient interface {
	ListBuckets(ctx context.Context) ([]models.BucketInfo, error)
	MakeBucket(ctx context.Context, bucketName string, location string) (err error)
	RemoveBucket(ctx context.Context, bucketName string) error

	ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result models.ListBucketResult, err error)
	GetObject(ctx context.Context, bucketName, objectName string, opts models.ReadMeta) (io.ReadCloser, models.ObjectInfo, error)
	StatObject(ctx context.Context, bucketName, objectName string, opts models.ReadMeta) (models.ObjectInfo, error)
	PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, opts models.PutMeta) (n int64, err error)
	RemoveObject(ctx context.Context, bucketName, objectName string) error
	CopyObject(ctx context.Context, sourceBucket, sourceObject, destBucket, destObject string, srcMeta, metadata map[string]string, progress io.Reader) (models.ObjectInfo, error)
	CopyObjectMultipart(ctx context.Context, srcObject models.ObjectInfo, srcBucket, srcPath, destBucket, destPath string, meta map[string]string, progress io.Reader) error
	CopyObjectMultipartThreshold() int64

	NewMultipartUpload(ctx context.Context, bucket, object string, opts models.PutMeta) (uploadID string, err error)
	ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result models.ListMultipartUploadsResult, err error)
	ListObjectParts(ctx context.Context, bucketName, objectName, uploadID string, partNumberMarker, maxParts int) (models.ListObjectPartsResult, error)
	CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, parts []models.MultipartObjectPart) (string, error)
	PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data io.Reader, size int64, md5Base64, sha256Hex string) (models.MultipartObjectPart, error)
	AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error
}

func GetGenericStoreClient

func GetGenericStoreClient(ctx context.Context, storeNamespace string) (client StorageClient, bucket string, e error)

GetGenericStoreClient creates a *minio.Core client for a given binary store.

func NewStorageClient

func NewStorageClient(cfg configx.Values) (StorageClient, error)

type StorageClientProvider

type StorageClientProvider func(cfg configx.Values) (StorageClient, error)

type TeeMimeReader

type TeeMimeReader struct {
	// contains filtered or unexported fields
}

func NewTeeMimeReader

func NewTeeMimeReader(reader io.Reader, callbackRoutine func(result *MimeResult)) *TeeMimeReader

func (*TeeMimeReader) Read

func (m *TeeMimeReader) Read(p []byte) (n int, err error)

func (*TeeMimeReader) SetLimit

func (m *TeeMimeReader) SetLimit(size int)

func (*TeeMimeReader) Wait

func (m *TeeMimeReader) Wait() chan *MimeResult

type WalkFilterFunc

type WalkFilterFunc func(ctx context.Context, node *tree.Node) bool

type WalkFunc

type WalkFunc func(ctx context.Context, node *tree.Node, err error) error

Directories

Path Synopsis
Package meta provides tool for reading metadata from services declaring "MetaProvider" support
Package meta provides tool for reading metadata from services declaring "MetaProvider" support
Package mocks should provide utils used by tests to mock various layers.
Package mocks should provide utils used by tests to mock various layers.
objects
mc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL