pipeline

package
v0.7.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2024 License: MIT Imports: 38 Imported by: 3

Documentation

Index

Constants

View Source
// Constants for the compress RW: its RWType value, the declared CompressType
// values, and the declared CompressLevel values.
const (
	// RWTypeCompress is the RWType value "compress".
	RWTypeCompress RWType = "compress"

	// Declared CompressType values.
	CompressTypeNop    CompressType = "nop"
	CompressTypeGzip   CompressType = "gzip"
	CompressTypeSnappy CompressType = "snappy"
	CompressTypeZstd   CompressType = "zstd"

	// Declared CompressLevel values.
	CompressLevelFast   CompressLevel = "fast"
	CompressLevelBetter CompressLevel = "better"
	CompressLevelBest   CompressLevel = "best"
)
View Source
// Constants for the store RW: its RWType value and the store backend names.
const (
	// RWTypeStore is the RWType value "store".
	RWTypeStore RWType = "store"

	// Store backend names.
	// NOTE(review): these three are untyped string constants even though a
	// StoreType type is declared in this package — confirm this is intentional.
	StoreTypeOss = "oss"
	StoreTypeFtp = "ftp"
	StoreTypeSSH = "ssh"
)
View Source
// PluginTypePipeline is the plugin.Type value "pipeline".
const PluginTypePipeline plugin.Type = "pipeline"
View Source
// PluginTypeRWGroup is the plugin.Type value "rwgroup".
const PluginTypeRWGroup plugin.Type = "rwgroup"

Variables

View Source
// Package-level error values and the base RW constructor hook.
var (
	// ErrStoppedManually is an error value with the message "RW stopped manually".
	ErrStoppedManually  = errors.New("RW stopped manually")
	// ErrChecksumNotMatch is an error value with the message "checksum not match".
	ErrChecksumNotMatch = errors.New("checksum not match")

	// CreateBaseRW is initialised to the unexported newBaseRW. Because it is a
	// variable rather than a function declaration, it can be reassigned.
	CreateBaseRW = newBaseRW
)

Functions

func ApplyCommonCfgToRW

func ApplyCommonCfgToRW(rw RW, cfg *RWCommonCfg)

func FileExtFromCompressCfg

func FileExtFromCompressCfg(c *CompressRWCfg) string

func NewCmdRWCfg

func NewCmdRWCfg() *cmd.Cfg

func NewGzipReader

func NewGzipReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr

func NewSnappyWriter

func NewSnappyWriter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr

func NewZstdReader

func NewZstdReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr

Types

type Cfg

// Cfg is the configuration embedded by Pipeline: a list of RW configurations.
type Cfg struct {
	// RWs holds one *RWCfg per RW; tagged validate:"min=1".
	RWs []*RWCfg `json:"rws" validate:"min=1" yaml:"rws"`
}

func NewCfg

func NewCfg() *Cfg

func (*Cfg) Add added in v0.0.8

func (c *Cfg) Add(role RWRole, typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *Cfg

type CmdRW

// CmdRW embeds the RW interface together with a *cmd.Cfg, so the methods of
// RW and the fields of cmd.Cfg are promoted onto it.
type CmdRW struct {
	RW
	*cmd.Cfg
}

func NewCmdRW

func NewCmdRW() *CmdRW

func (*CmdRW) GetCfg

func (c *CmdRW) GetCfg() interface{}

func (*CmdRW) Init

func (c *CmdRW) Init() error

func (*CmdRW) Start

func (c *CmdRW) Start() error

func (*CmdRW) Stop

func (c *CmdRW) Stop() error

func (*CmdRW) Type

func (c *CmdRW) Type() interface{}

type CompressLevel

// CompressLevel is a string type used for CompressRWCfg.Level. The values
// declared in this package are CompressLevelFast, CompressLevelBetter and
// CompressLevelBest.
type CompressLevel string

type CompressRW

// CompressRW embeds the RW interface together with a *CompressRWCfg, so the
// configuration fields (Type, Level, Concurrency) are promoted onto it.
type CompressRW struct {
	RW
	*CompressRWCfg
}

func NewCompressRW

func NewCompressRW() *CompressRW

func (*CompressRW) GetCfg

func (c *CompressRW) GetCfg() interface{}

func (*CompressRW) Init

func (c *CompressRW) Init() error

func (*CompressRW) NestReader

func (c *CompressRW) NestReader(r io.ReadCloser) error

func (*CompressRW) NestWriter

func (c *CompressRW) NestWriter(w io.WriteCloser) error

func (*CompressRW) Nwrite

func (c *CompressRW) Nwrite() uint64

func (*CompressRW) Type

func (c *CompressRW) Type() interface{}

type CompressRWCfg

// CompressRWCfg is the configuration embedded by CompressRW.
type CompressRWCfg struct {
	// Type is the CompressType to use; tagged validate:"required".
	Type        CompressType  `json:"type"        validate:"required" yaml:"type"`
	// Level is the CompressLevel to use; tagged validate:"required".
	Level       CompressLevel `json:"level"       validate:"required" yaml:"level"`
	// Concurrency is an integer option with no validate tag.
	// NOTE(review): how it is applied per compress type is not visible here.
	Concurrency int           `json:"concurrency" yaml:"concurrency"`
}

func NewCompressRWCfg

func NewCompressRWCfg() *CompressRWCfg

type CompressType

// CompressType is a string type used for CompressRWCfg.Type. The values
// declared in this package are CompressTypeNop, CompressTypeGzip,
// CompressTypeSnappy and CompressTypeZstd.
type CompressType string

type CopyRW

// CopyRW embeds the RW interface together with a *CopyRWCfg, so the BufSize
// field is promoted onto it.
type CopyRW struct {
	RW
	*CopyRWCfg
}

func NewCopyRW

func NewCopyRW() *CopyRW

func (*CopyRW) GetCfg

func (cp *CopyRW) GetCfg() interface{}

func (*CopyRW) Init

func (cp *CopyRW) Init() error

func (*CopyRW) Start

func (cp *CopyRW) Start() error

func (*CopyRW) Type

func (cp *CopyRW) Type() interface{}

type CopyRWCfg

// CopyRWCfg is the configuration embedded by CopyRW.
type CopyRWCfg struct {
	// BufSize is an integer buffer size.
	// NOTE(review): the unit (presumably bytes) is not visible here — confirm.
	BufSize int `json:"bufSize" yaml:"bufSize"`
}

func NewCopyRWCfg

func NewCopyRWCfg() *CopyRWCfg

type FileRW

// FileRW embeds the RW interface together with a *FileRWCfg, so the Path and
// Perm fields are promoted onto it. It also has unexported fields.
type FileRW struct {
	RW
	*FileRWCfg
	// contains filtered or unexported fields
}

func NewFileRW

func NewFileRW() *FileRW

func (*FileRW) GetCfg

func (f *FileRW) GetCfg() interface{}

func (*FileRW) Init

func (f *FileRW) Init() error

func (*FileRW) Type

func (f *FileRW) Type() interface{}

type FileRWCfg

// FileRWCfg is the configuration embedded by FileRW.
type FileRWCfg struct {
	// Path is the file path as a string.
	Path string `json:"path" yaml:"path"`
	// Perm is a uint32.
	// NOTE(review): presumably the file permission bits — confirm against FileRW.Init.
	Perm uint32 `json:"perm" yaml:"perm"`
}

func NewFileRWCfg

func NewFileRWCfg() *FileRWCfg

type FtpCfg

// FtpCfg holds FTP connection settings. All three fields are tagged
// validate:"min=1".
type FtpCfg struct {
	// Addr is the server address string.
	Addr string `json:"addr" validate:"min=1" yaml:"addr"`
	// User is the login user name.
	User string `json:"user" validate:"min=1" yaml:"user"`
	// Pwd is the login password.
	Pwd  string `json:"pwd"  validate:"min=1" yaml:"pwd"`
}

type NopRW

// NopRW embeds the RW interface together with a *NopRWCfg (an empty struct).
type NopRW struct {
	RW
	*NopRWCfg
}

func NewNopRW

func NewNopRW() *NopRW

func (*NopRW) GetCfg

func (n *NopRW) GetCfg() interface{}

func (*NopRW) Type

func (n *NopRW) Type() interface{}

type NopRWCfg

// NopRWCfg is the configuration embedded by NopRW; it has no fields.
type NopRWCfg struct{}

func NewNopRWCfg

func NewNopRWCfg() *NopRWCfg

type OssCfg

// OssCfg holds settings for the "oss" store backend.
type OssCfg struct {
	// Ak is a required string (validate:"min=1").
	// NOTE(review): presumably the access key — confirm.
	Ak     string `json:"ak"     validate:"min=1" yaml:"ak"`
	// Sk is a required string (validate:"min=1").
	// NOTE(review): presumably the secret key — confirm.
	Sk     string `json:"sk"     validate:"min=1" yaml:"sk"`
	// Region is a string with no validate tag.
	Region string `json:"region" yaml:"region"`
	// Append is a boolean flag.
	// NOTE(review): presumably selects append-style uploads — confirm.
	Append bool   `json:"append" yaml:"append"`
}

type Pipeline

// Pipeline embeds runner.Runner together with a *Cfg, so the RWs field of Cfg
// is promoted onto it. It also has unexported fields.
type Pipeline struct {
	runner.Runner
	*Cfg
	// contains filtered or unexported fields
}

func New

func New() *Pipeline

func (*Pipeline) GetCfg

func (p *Pipeline) GetCfg() interface{}

func (*Pipeline) Init

func (p *Pipeline) Init() error

func (*Pipeline) RWGroups added in v0.7.3

func (p *Pipeline) RWGroups() []*RWGroup

func (*Pipeline) Result added in v0.7.3

func (p *Pipeline) Result() *Result

func (*Pipeline) Start

func (p *Pipeline) Start() error

func (*Pipeline) Stop

func (p *Pipeline) Stop() error

func (*Pipeline) Type

func (p *Pipeline) Type() interface{}

type RW

// RW is the interface embedded by every concrete *RW type in this package
// (CmdRW, CompressRW, CopyRW, FileRW, NopRW, StoreRW, TailRW). It combines
// runner.Runner, io.ReadWriteCloser and plugin.Plugin with the methods below.
//
// NOTE(review): only signatures are visible here; the behavioural notes on
// each group are inferred from names and types and should be confirmed against
// the base implementation.
type RW interface {
	runner.Runner
	io.ReadWriteCloser
	plugin.Plugin

	// NestReader and NestWriter accept an io.ReadCloser / io.WriteCloser and
	// return an error; Reader and Writer return an io.ReadCloser / io.WriteCloser.
	NestReader(io.ReadCloser) error
	NestWriter(io.WriteCloser) error
	Reader() io.ReadCloser
	Writer() io.WriteCloser

	// Flush returns an error.
	Flush() error

	// RegisterReadHook and RegisterWriteHook accept zero or more hooks.
	RegisterReadHook(...ReadHook)
	RegisterWriteHook(...WriteHook)

	// Nwrite and Nread return uint64 counters (presumably bytes written/read).
	Nwrite() uint64
	Nread() uint64

	// Hash returns a string (presumably the digest enabled via EnableCalcHash).
	Hash() string

	// Enable* methods switch on optional features; none of them returns a value.
	EnableMonitorSpeed()
	EnableCalcHash(hashAlgo string)
	EnableChecksum(checksum string, hashAlgo string)
	EnableRateLimit(ratelimit.RxTxRateLimiter)
	EnableWriteBuf(bufSize int, deadline int, async bool, asyncChanBufSize int)
	EnableReadBuf(bufSize int, async bool, asyncChanBufSize int)

	// Async state accessors.
	IsAsyncOrDeadline() bool
	AsyncChanLen() int
	AsyncChanCap() int

	// Role management: As*/As set a role, Is*/Is report a role, and Role
	// returns the RWRole.
	AsStarter()
	IsStarter() bool
	AsReader()
	IsReader() bool
	AsWriter()
	IsWriter() bool
	As(RWRole)
	Is(RWRole) bool
	Role() RWRole
}

func CreateRW added in v0.7.4

func CreateRW(rwCfg *RWCfg) (RW, error)

type RWCfg

// RWCfg describes a single RW: its type, its type-specific configuration,
// optional common configuration, and its role. CreateRW takes a *RWCfg.
type RWCfg struct {
	// Type is the RWType; tagged validate:"required".
	Type      RWType       `json:"type"      validate:"required" yaml:"type"`
	// Cfg is the type-specific configuration, held as interface{};
	// tagged validate:"required".
	Cfg       interface{}  `json:"cfg"       validate:"required" yaml:"cfg"`
	// CommonCfg is an optional *RWCommonCfg (no validate tag).
	CommonCfg *RWCommonCfg `json:"commonCfg" yaml:"commonCfg"`
	// Role is the RWRole; tagged validate:"required".
	Role      RWRole       `json:"role"      validate:"required" yaml:"role"`
}

type RWCommonCfg

// RWCommonCfg holds options that are not specific to one RW type; it is the
// type of RWCfg.CommonCfg and the second argument of ApplyCommonCfgToRW.
//
// NOTE(review): how each field maps onto the RW.Enable* methods is not visible
// here; the field names suggest a one-to-one mapping — confirm against
// ApplyCommonCfgToRW.
type RWCommonCfg struct {
	// RateLimiterCfg is an optional *ratelimit.RateLimiterCfg.
	RateLimiterCfg     *ratelimit.RateLimiterCfg `json:"rateLimiterCfg"     yaml:"rateLimiterCfg"`
	// BufSize is an integer buffer size (unit not visible here).
	BufSize            int                       `json:"bufSize"            yaml:"bufSize"`
	// Deadline is an integer (unit not visible here).
	Deadline           int                       `json:"deadline"           yaml:"deadline"`
	// AsyncChanBufSize is an integer channel buffer size.
	AsyncChanBufSize   int                       `json:"asyncChanBufSize"   yaml:"asyncChanBufSize"`
	// Boolean feature switches.
	EnableMonitorSpeed bool                      `json:"enableMonitorSpeed" yaml:"enableMonitorSpeed"`
	EnableCalcHash     bool                      `json:"enableCalcHash"     yaml:"enableCalcHash"`
	EnableRateLimit    bool                      `json:"enableRateLimit"    yaml:"enableRateLimit"`
	EnableAsync        bool                      `json:"enableAsync"        yaml:"enableAsync"`
	// Checksum and HashAlgo are strings, matching the parameters of
	// RW.EnableChecksum(checksum, hashAlgo).
	Checksum           string                    `json:"checksum"           yaml:"checksum"`
	HashAlgo           string                    `json:"hashAlgo"           yaml:"hashAlgo"`
}

type RWGroup

// RWGroup embeds runner.Runner together with a *RWGroupCfg, so the Readers,
// Starter and Writers configuration fields are promoted onto it. It also has
// unexported fields.
type RWGroup struct {
	runner.Runner
	*RWGroupCfg
	// contains filtered or unexported fields
}

func NewRWGroup

func NewRWGroup() *RWGroup

func (*RWGroup) FirstReader

func (g *RWGroup) FirstReader() RW

func (*RWGroup) Init

func (g *RWGroup) Init() error

func (*RWGroup) LastWriter

func (g *RWGroup) LastWriter() RW

func (*RWGroup) Readers

func (g *RWGroup) Readers() []RW

func (*RWGroup) Size

func (g *RWGroup) Size() int

func (*RWGroup) Start

func (g *RWGroup) Start() error

func (*RWGroup) Starter

func (g *RWGroup) Starter() RW

func (*RWGroup) Stop

func (g *RWGroup) Stop() error

func (*RWGroup) Writers

func (g *RWGroup) Writers() []RW

type RWGroupCfg

// RWGroupCfg is the configuration embedded by RWGroup: a list of reader
// configurations, a single starter configuration, and a list of writer
// configurations. None of the fields carries a validate tag.
type RWGroupCfg struct {
	// Readers holds the reader RW configurations.
	Readers []*RWCfg `json:"readers" yaml:"readers"`
	// Starter holds the single starter RW configuration.
	Starter *RWCfg   `json:"starter" yaml:"starter"`
	// Writers holds the writer RW configurations.
	Writers []*RWCfg `json:"writers" yaml:"writers"`
}

func NewRWGroupCfg

func NewRWGroupCfg() *RWGroupCfg

func (*RWGroupCfg) FromReader

func (c *RWGroupCfg) FromReader(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

func (*RWGroupCfg) SetStarter

func (c *RWGroupCfg) SetStarter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

func (*RWGroupCfg) ToWriter

func (c *RWGroupCfg) ToWriter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

type RWRole

// RWRole is a string type naming the role of an RW; it is the type of
// RWCfg.Role and is accepted/returned by RW.As, RW.Is and RW.Role.
type RWRole string
// Declared RWRole values.
const (
	RWRoleStarter RWRole = "starter"
	RWRoleReader  RWRole = "reader"
	RWRoleWriter  RWRole = "writer"
)

type RWType

// RWType is a string type identifying a kind of RW; it is the type of
// RWCfg.Type. RWTypeCompress and RWTypeStore are declared in separate const
// groups of this package.
type RWType string
// RWTypeCmd is the RWType value "cmd".
const RWTypeCmd RWType = "cmd"
// RWTypeCopy is the RWType value "copy".
const RWTypeCopy RWType = "copy"
// RWTypeFile is the RWType value "file".
const RWTypeFile RWType = "file"
// RWTypeNop is the RWType value "nop".
const RWTypeNop RWType = "nop"
// RWTypeTail is the RWType value "tail".
const RWTypeTail RWType = "tail"

type ReadHook

// ReadHook is the callback type accepted by RW.RegisterReadHook. It receives
// an int n, a byte slice bs, an error err, an int64 cost and optional misc
// values, and returns an error.
// NOTE(review): presumably n/bs/err mirror the result of a Read call and cost
// is its duration — the unit of cost is not visible here; confirm.
type ReadHook func(n int, bs []byte, err error, cost int64, misc ...interface{}) error

type Result added in v0.7.3

// Result is the value returned by Pipeline.Result.
type Result struct {
	// Cfg is a *Cfg.
	Cfg     *Cfg                     `json:"cfg"     yaml:"cfg"`
	// Data is a free-form string-keyed map.
	Data    map[string]interface{}   `json:"data"    yaml:"data"`
	// RWsData is a slice of free-form string-keyed maps.
	// NOTE(review): presumably one entry per RW — confirm.
	RWsData []map[string]interface{} `json:"rwsData" yaml:"rwsData"`
}

type SSHCfg

// SSHCfg holds SSH connection settings. Addr and User are tagged
// validate:"min=1"; Pwd and PrivateKey carry no validate tag.
type SSHCfg struct {
	// Addr is the server address string.
	Addr       string `json:"addr"       validate:"min=1"  yaml:"addr"`
	// User is the login user name.
	User       string `json:"user"       validate:"min=1"  yaml:"user"`
	// Pwd is the login password.
	Pwd        string `json:"pwd"        yaml:"pwd"`
	// PrivateKey is a string.
	// NOTE(review): whether this is key material or a path is not visible here.
	PrivateKey string `json:"privateKey" yaml:"privateKey"`
}

type StoreCfg added in v0.0.7

// StoreCfg is the configuration embedded by StoreRW.
type StoreCfg struct {
	// Type is the StoreType; tagged validate:"required".
	Type     StoreType   `json:"type"     validate:"required" yaml:"type"`
	// Cfg is the backend-specific configuration, held as interface{};
	// tagged validate:"required".
	// NOTE(review): presumably an OssCfg, FtpCfg or SSHCfg depending on Type — confirm.
	Cfg      interface{} `json:"cfg"      validate:"required" yaml:"cfg"`
	// URL is a required string (validate:"min=1").
	URL      string      `json:"url"      validate:"min=1"    yaml:"url"`
	// Checksum and HashAlgo are optional strings.
	Checksum string      `json:"checksum" yaml:"checksum"`
	HashAlgo string      `json:"hashAlgo" yaml:"hashAlgo"`
	// Retry is an integer tagged validate:"gte=1".
	Retry    int         `json:"retry"    validate:"gte=1"    yaml:"retry"`
	// Timeout is an integer tagged validate:"gte=1" (unit not visible here).
	Timeout  int         `json:"timeout"  validate:"gte=1"    yaml:"timeout"`
}

func NewStoreCfg

func NewStoreCfg() *StoreCfg

type StoreRW

// StoreRW embeds the RW interface together with a *StoreCfg, so the StoreCfg
// fields are promoted onto it. It also has unexported fields.
type StoreRW struct {
	RW
	*StoreCfg
	// contains filtered or unexported fields
}

func NewStoreRW

func NewStoreRW() *StoreRW

func (*StoreRW) Close

func (s *StoreRW) Close() error

func (*StoreRW) EnableCalcHash added in v0.3.9

func (s *StoreRW) EnableCalcHash(hashAlgo string)

func (*StoreRW) EnableChecksum added in v0.3.9

func (s *StoreRW) EnableChecksum(checksum string, hashAlgo string)

func (*StoreRW) GetCfg

func (s *StoreRW) GetCfg() interface{}

func (*StoreRW) Init

func (s *StoreRW) Init() error

func (*StoreRW) Start

func (s *StoreRW) Start() error

func (*StoreRW) Type

func (s *StoreRW) Type() interface{}

type StoreType

// StoreType is a string type used for StoreCfg.Type.
type StoreType string

type TailRW

// TailRW embeds the RW interface together with a *TailRWCfg, so the Path and
// Pos fields are promoted onto it. It also has unexported fields.
type TailRW struct {
	RW
	*TailRWCfg
	// contains filtered or unexported fields
}

func NewTailRW

func NewTailRW() *TailRW

func (*TailRW) GetCfg

func (t *TailRW) GetCfg() interface{}

func (*TailRW) Init

func (t *TailRW) Init() error

func (*TailRW) Stop

func (t *TailRW) Stop() error

func (*TailRW) Type

func (t *TailRW) Type() interface{}

type TailRWCfg

// TailRWCfg is the configuration embedded by TailRW.
type TailRWCfg struct {
	// Path is the file path as a string.
	Path string `json:"path" yaml:"path"`
	// Pos is an int64.
	// NOTE(review): presumably the offset to start tailing from — confirm.
	Pos  int64  `json:"pos"  yaml:"pos"`
}

func NewTailRWCfg

func NewTailRWCfg() *TailRWCfg

type WriteHook

// WriteHook is the callback type accepted by RW.RegisterWriteHook. It receives
// an int n, a byte slice bs, an error err, an int64 cost and optional misc
// values, and returns an error.
// NOTE(review): presumably n/bs/err mirror the result of a Write call and cost
// is its duration — the unit of cost is not visible here; confirm.
type WriteHook func(n int, bs []byte, err error, cost int64, misc ...interface{}) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL