pipeline

package
v0.8.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2024 License: MIT Imports: 39 Imported by: 3

Documentation

Index

Constants

View Source
const (
	RWTypeCompress RWType = "compress"

	CompressTypeNop    CompressType = "nop"
	CompressTypeGzip   CompressType = "gzip"
	CompressTypeSnappy CompressType = "snappy"
	CompressTypeZstd   CompressType = "zstd"

	CompressLevelFast   CompressLevel = "fast"
	CompressLevelBetter CompressLevel = "better"
	CompressLevelBest   CompressLevel = "best"
)
View Source
const PluginTypePipeline plugin.Type = "pipeline"
View Source
const PluginTypeRWGroup plugin.Type = "rwgroup"

Variables

View Source
var (
	ErrStoppedManually  = errors.New("RW stopped manually")
	ErrChecksumNotMatch = errors.New("checksum not match")

	CreateBaseRW = newBaseRW
)

Functions

func ApplyCommonCfgToRW

func ApplyCommonCfgToRW(rw RW, cfg *RWCommonCfg)

func FileExtFromCompressCfg

func FileExtFromCompressCfg(c *CompressRWCfg) string

func NewCmdRWCfg

func NewCmdRWCfg() *cmd.Cfg

func NewGzipReader

func NewGzipReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr

func NewSnappyWriter

func NewSnappyWriter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr

func NewZstdReader

func NewZstdReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr

Types

type Cfg

type Cfg struct {
	RWs []*RWCfg `json:"rws" validate:"required" yaml:"rws"`
}

func NewCfg

func NewCfg() *Cfg

func (*Cfg) Add added in v0.0.8

func (c *Cfg) Add(role RWRole, typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *Cfg

func (*Cfg) AddCfg added in v0.8.3

func (c *Cfg) AddCfg(cfg *RWCfg) *Cfg

type CmdRW

type CmdRW struct {
	RW
	*cmd.Cfg
	// contains filtered or unexported fields
}

func NewCmdRW

func NewCmdRW() *CmdRW

func (*CmdRW) GetCfg

func (c *CmdRW) GetCfg() interface{}

func (*CmdRW) Init

func (c *CmdRW) Init() error

func (*CmdRW) Start

func (c *CmdRW) Start() error

func (*CmdRW) Stop

func (c *CmdRW) Stop() error

func (*CmdRW) Type

func (c *CmdRW) Type() interface{}

type CompressLevel

type CompressLevel string

type CompressRW

type CompressRW struct {
	RW
	*CompressRWCfg
}

func NewCompressRW

func NewCompressRW() *CompressRW

func (*CompressRW) GetCfg

func (c *CompressRW) GetCfg() interface{}

func (*CompressRW) Init

func (c *CompressRW) Init() error

func (*CompressRW) NestReader

func (c *CompressRW) NestReader(r io.ReadCloser) error

func (*CompressRW) NestWriter

func (c *CompressRW) NestWriter(w io.WriteCloser) error

func (*CompressRW) Nwrite

func (c *CompressRW) Nwrite() uint64

func (*CompressRW) Type

func (c *CompressRW) Type() interface{}

type CompressRWCfg

type CompressRWCfg struct {
	Type        CompressType  `json:"type"        validate:"required" yaml:"type"`
	Level       CompressLevel `json:"level"       validate:"required" yaml:"level"`
	Concurrency int           `json:"concurrency" yaml:"concurrency"`
}

func NewCompressRWCfg

func NewCompressRWCfg() *CompressRWCfg

type CompressType

type CompressType string

type CopyRW

type CopyRW struct {
	RW
	*CopyRWCfg
}

func NewCopyRW

func NewCopyRW() *CopyRW

func (*CopyRW) GetCfg

func (cp *CopyRW) GetCfg() interface{}

func (*CopyRW) Init

func (cp *CopyRW) Init() error

func (*CopyRW) Start

func (cp *CopyRW) Start() error

func (*CopyRW) Type

func (cp *CopyRW) Type() interface{}

type CopyRWCfg

type CopyRWCfg struct {
	BufSize int `json:"bufSize" yaml:"bufSize"`
}

func NewCopyRWCfg

func NewCopyRWCfg() *CopyRWCfg

type FileRW

type FileRW struct {
	RW
	*FileRWCfg
	// contains filtered or unexported fields
}

func NewFileRW

func NewFileRW() *FileRW

func (*FileRW) GetCfg

func (f *FileRW) GetCfg() interface{}

func (*FileRW) Init

func (f *FileRW) Init() error

func (*FileRW) Type

func (f *FileRW) Type() interface{}

type FileRWCfg

type FileRWCfg struct {
	Path string `json:"path" yaml:"path"`
	Perm uint32 `json:"perm" yaml:"perm"`
}

func NewFileRWCfg

func NewFileRWCfg() *FileRWCfg

type FtpRW added in v0.8.2

type FtpRW struct {
	RW
	*FtpRWCfg
}

func (*FtpRW) GetCfg added in v0.8.2

func (f *FtpRW) GetCfg() interface{}

func (*FtpRW) Init added in v0.8.2

func (f *FtpRW) Init() error

func (*FtpRW) Type added in v0.8.2

func (f *FtpRW) Type() interface{}

type FtpRWCfg added in v0.8.2

type FtpRWCfg struct {
	Addr    string `json:"addr"    validate:"required" yaml:"addr"`
	User    string `json:"user"    validate:"required" yaml:"user"`
	Pwd     string `json:"pwd"     validate:"required" yaml:"pwd"`
	Path    string `json:"path"    validate:"required" yaml:"path"`
	Timeout int    `json:"timeout" yaml:"timeout"`
	Retry   int    `json:"retry"   yaml:"retry"`
}

func NewFtpRWCfg added in v0.8.2

func NewFtpRWCfg() *FtpRWCfg

type NopRW

type NopRW struct {
	RW
	*NopRWCfg
}

func NewNopRW

func NewNopRW() *NopRW

func (*NopRW) GetCfg

func (n *NopRW) GetCfg() interface{}

func (*NopRW) Type

func (n *NopRW) Type() interface{}

type NopRWCfg

type NopRWCfg struct{}

func NewNopRWCfg

func NewNopRWCfg() *NopRWCfg

type OssRW added in v0.8.2

type OssRW struct {
	RW
	*OssRWCfg
}

func (*OssRW) GetCfg added in v0.8.2

func (o *OssRW) GetCfg() interface{}

func (*OssRW) Init added in v0.8.2

func (o *OssRW) Init() error

func (*OssRW) Type added in v0.8.2

func (o *OssRW) Type() interface{}

type OssRWCfg added in v0.8.2

type OssRWCfg struct {
	URL     string `json:"url"      validate:"required" yaml:"url"`
	Ak      string `json:"ak"       validate:"required" yaml:"ak"`
	Sk      string `json:"sk"       validate:"required" yaml:"sk"`
	Region  string `json:"region"   yaml:"region"`
	Append  bool   `json:"append"   yaml:"append"`
	Timeout int    `json:"timeout"  yaml:"timeout"`
	Retry   int    `json:"retry"    yaml:"retry"`
}

func NewOssRWCfg added in v0.8.2

func NewOssRWCfg() *OssRWCfg

type Pipeline

type Pipeline struct {
	runner.Runner
	*Cfg
	// contains filtered or unexported fields
}

func New

func New() *Pipeline

func (*Pipeline) GetCfg

func (p *Pipeline) GetCfg() interface{}

func (*Pipeline) Init

func (p *Pipeline) Init() error

func (*Pipeline) RWGroups added in v0.7.3

func (p *Pipeline) RWGroups() []*RWGroup

func (*Pipeline) Result added in v0.7.3

func (p *Pipeline) Result() *Result

func (*Pipeline) Start

func (p *Pipeline) Start() error

func (*Pipeline) Stop

func (p *Pipeline) Stop() error

func (*Pipeline) Type

func (p *Pipeline) Type() interface{}

type RW

type RW interface {
	runner.Runner
	io.ReadWriteCloser
	plugin.Plugin

	NestReader(io.ReadCloser) error
	NestWriter(io.WriteCloser) error
	Reader() io.ReadCloser
	Writer() io.WriteCloser

	Flush() error

	HookRead(...ReadHook)
	HookWrite(...WriteHook)

	Nwrite() uint64
	Nread() uint64

	Hash() string

	EnableMonitorSpeed()
	EnableCalcHash(hashAlgo string)
	EnableChecksum(checksum string, hashAlgo string)
	EnableRateLimit(ratelimit.RxTxRateLimiter)
	EnableWriteBuf(bufSize int, deadline int, async bool, asyncChanBufSize int)
	EnableReadBuf(bufSize int, async bool, asyncChanBufSize int)

	IsAsyncOrDeadline() bool
	AsyncChanLen() int
	AsyncChanCap() int

	AsStarter()
	IsStarter() bool
	AsReader()
	IsReader() bool
	AsWriter()
	IsWriter() bool
	As(RWRole)
	Is(RWRole) bool
	Role() RWRole
}

func CreateRW added in v0.7.4

func CreateRW(rwCfg *RWCfg) (RW, error)

func NewFtpRW added in v0.8.2

func NewFtpRW() RW

func NewOssRW added in v0.8.2

func NewOssRW() RW

func NewSSHRW added in v0.8.2

func NewSSHRW() RW

type RWCfg

type RWCfg struct {
	Type      RWType       `json:"type"      validate:"required" yaml:"type"`
	Cfg       interface{}  `json:"cfg"       validate:"required" yaml:"cfg"`
	CommonCfg *RWCommonCfg `json:"commonCfg" yaml:"commonCfg"`
	Role      RWRole       `json:"role"      validate:"required" yaml:"role"`
}

type RWCommonCfg

type RWCommonCfg struct {
	RateLimiterCfg     *ratelimit.RateLimiterCfg `json:"rateLimiterCfg"     yaml:"rateLimiterCfg"`
	BufSize            int                       `json:"bufSize"            yaml:"bufSize"`
	Deadline           int                       `json:"deadline"           yaml:"deadline"`
	AsyncChanBufSize   int                       `json:"asyncChanBufSize"   yaml:"asyncChanBufSize"`
	EnableMonitorSpeed bool                      `json:"enableMonitorSpeed" yaml:"enableMonitorSpeed"`
	EnableCalcHash     bool                      `json:"enableCalcHash"     yaml:"enableCalcHash"`
	EnableRateLimit    bool                      `json:"enableRateLimit"    yaml:"enableRateLimit"`
	EnableAsync        bool                      `json:"enableAsync"        yaml:"enableAsync"`
	Checksum           string                    `json:"checksum"           yaml:"checksum"`
	HashAlgo           string                    `json:"hashAlgo"           yaml:"hashAlgo"`
}

type RWGroup

type RWGroup struct {
	runner.Runner
	*RWGroupCfg
	// contains filtered or unexported fields
}

func NewRWGroup

func NewRWGroup() *RWGroup

func (*RWGroup) FirstReader

func (g *RWGroup) FirstReader() RW

func (*RWGroup) Init

func (g *RWGroup) Init() error

func (*RWGroup) LastWriter

func (g *RWGroup) LastWriter() RW

func (*RWGroup) Readers

func (g *RWGroup) Readers() []RW

func (*RWGroup) Size

func (g *RWGroup) Size() int

func (*RWGroup) Start

func (g *RWGroup) Start() error

func (*RWGroup) Starter

func (g *RWGroup) Starter() RW

func (*RWGroup) Stop

func (g *RWGroup) Stop() error

func (*RWGroup) Writers

func (g *RWGroup) Writers() []RW

type RWGroupCfg

type RWGroupCfg struct {
	Readers []*RWCfg `json:"readers" yaml:"readers"`
	Starter *RWCfg   `json:"starter" yaml:"starter"`
	Writers []*RWCfg `json:"writers" yaml:"writers"`
}

func NewRWGroupCfg

func NewRWGroupCfg() *RWGroupCfg

func (*RWGroupCfg) FromReader

func (c *RWGroupCfg) FromReader(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

func (*RWGroupCfg) SetStarter

func (c *RWGroupCfg) SetStarter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

func (*RWGroupCfg) ToWriter

func (c *RWGroupCfg) ToWriter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

type RWRole

type RWRole string
const (
	RWRoleStarter RWRole = "starter"
	RWRoleReader  RWRole = "reader"
	RWRoleWriter  RWRole = "writer"
)

type RWType

type RWType string
const RWTypeCmd RWType = "cmd"
const RWTypeCopy RWType = "copy"
const RWTypeFile RWType = "file"
const (
	RWTypeFtp RWType = "ftp"
)
const RWTypeNop RWType = "nop"
const (
	RWTypeOss RWType = "oss"
)
const (
	RWTypeSSH RWType = "ssh"
)
const RWTypeTail RWType = "tail"

type ReadHook

type ReadHook func(n int, bs []byte, err error, cost int64, misc ...interface{}) error

type Result added in v0.7.3

type Result struct {
	Cfg     *Cfg                     `json:"cfg"     yaml:"cfg"`
	Data    map[string]interface{}   `json:"data"    yaml:"data"`
	RWsData []map[string]interface{} `json:"rwsData" yaml:"rwsData"`
}

type SSHRW added in v0.8.2

type SSHRW struct {
	RW
	*SSHRWCfg
	// contains filtered or unexported fields
}

func (*SSHRW) GetCfg added in v0.8.2

func (s *SSHRW) GetCfg() interface{}

func (*SSHRW) Init added in v0.8.2

func (s *SSHRW) Init() error

func (*SSHRW) Start added in v0.8.2

func (s *SSHRW) Start() error

func (*SSHRW) Type added in v0.8.2

func (s *SSHRW) Type() interface{}

type SSHRWCfg added in v0.8.2

type SSHRWCfg struct {
	Addr       string `json:"addr"       validate:"required"  yaml:"addr"`
	User       string `json:"user"       validate:"required"  yaml:"user"`
	Pwd        string `json:"pwd"        yaml:"pwd"`
	PrivateKey string `json:"privateKey" yaml:"privateKey"`
	Timeout    int    `json:"timeout"    yaml:"timeout"`
	Path       string `json:"path"       validate:"required"  yaml:"path"`
}

func NewSSHRWCfg added in v0.8.2

func NewSSHRWCfg() *SSHRWCfg

type TailRW

type TailRW struct {
	RW
	*TailRWCfg
	// contains filtered or unexported fields
}

func NewTailRW

func NewTailRW() *TailRW

func (*TailRW) GetCfg

func (t *TailRW) GetCfg() interface{}

func (*TailRW) Init

func (t *TailRW) Init() error

func (*TailRW) Stop

func (t *TailRW) Stop() error

func (*TailRW) Type

func (t *TailRW) Type() interface{}

type TailRWCfg

type TailRWCfg struct {
	Path string `json:"path" yaml:"path"`
	Pos  int64  `json:"pos"  yaml:"pos"`
}

func NewTailRWCfg

func NewTailRWCfg() *TailRWCfg

type WriteHook

type WriteHook func(n int, bs []byte, err error, cost int64, misc ...interface{}) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL