Documentation ¶
Index ¶
- type MySQLDialInfo
- type MySQLPicTaskPipe
- type PicTaskPipe
- type QiniuStorer
- type QiniuStorerConf
- type StorePipeCtx
- func (p *StorePipeCtx) BuildNetwork()
- func (p *StorePipeCtx) Loop()
- func (p *StorePipeCtx) SetErrorProcessor(f func(error))
- func (p *StorePipeCtx) State_ErrorTask()
- func (p *StorePipeCtx) State_Fetch()
- func (p *StorePipeCtx) State_FinishTask()
- func (p *StorePipeCtx) State_GetTask()
- func (p *StorePipeCtx) State_ProcessError()
- func (p *StorePipeCtx) State_Store()
- func (p *StorePipeCtx) Stop()
- type StorePipeCtxConfig
- type Storer
- type Task
- type TaskFinished
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MySQLDialInfo ¶
type MySQLPicTaskPipe ¶
type MySQLPicTaskPipe struct {
// contains filtered or unexported fields
}
func NewMySQLPicPipe ¶
func NewMySQLPicPipe(dinfo *MySQLDialInfo) (*MySQLPicTaskPipe, error)
func NewMySQLPicPipeUseConnectedDB ¶
func NewMySQLPicPipeUseConnectedDB(db *sql.DB) *MySQLPicTaskPipe
func (*MySQLPicTaskPipe) ErrorTask ¶
func (p *MySQLPicTaskPipe) ErrorTask(task *Task) error
ErrorTask should be invoked when an error occurs while processing the task.
func (*MySQLPicTaskPipe) FinishTask ¶
func (p *MySQLPicTaskPipe) FinishTask(task *TaskFinished) error
FinishTask should be invoked when processing of the task has finished.
func (*MySQLPicTaskPipe) GetTasks ¶
func (p *MySQLPicTaskPipe) GetTasks(limit int) ([]*Task, error)
GetTasks fetches tasks. If no task is available, it returns nil and no error.
The fetch limits are in p.conf, and the owner is set to p's uuid.
func (*MySQLPicTaskPipe) UpsertTask ¶
func (p *MySQLPicTaskPipe) UpsertTask(url string) (*Task, error)
type PicTaskPipe ¶
type PicTaskPipe interface { /*ErrorTask if errors when process the task, invoke this method */ ErrorTask(task *Task) error /*FinishTask if task process finished, invoke this method */ FinishTask(task *TaskFinished) error /*GetTasks fetch tasks if no available task, return nil, and no error. */ GetTasks(limit int) ([]*Task, error) /*UpsertTask insert task to pipe */ UpsertTask(url string) (*Task, error) }
type QiniuStorer ¶
type QiniuStorer qiniu.QiniuUploader
func NewQiniuStorer ¶
func NewQiniuStorer(conf *QiniuStorerConf) *QiniuStorer
func (*QiniuStorer) StorerKey ¶
func (p *QiniuStorer) StorerKey() string
func (*QiniuStorer) StorerType ¶
func (p *QiniuStorer) StorerType() string
type QiniuStorerConf ¶
type StorePipeCtx ¶
type StorePipeCtx struct { StorePipeCtxConfig // contains filtered or unexported fields }
func NewStorePipeCtx ¶
func NewStorePipeCtx(conf *StorePipeCtxConfig, picTaskPipe PicTaskPipe, storer Storer) *StorePipeCtx
func (*StorePipeCtx) BuildNetwork ¶
func (p *StorePipeCtx) BuildNetwork()
func (*StorePipeCtx) Loop ¶
func (p *StorePipeCtx) Loop()
func (*StorePipeCtx) SetErrorProcessor ¶
func (p *StorePipeCtx) SetErrorProcessor(f func(error))
func (*StorePipeCtx) State_ErrorTask ¶
func (p *StorePipeCtx) State_ErrorTask()
func (*StorePipeCtx) State_Fetch ¶
func (p *StorePipeCtx) State_Fetch()
func (*StorePipeCtx) State_FinishTask ¶
func (p *StorePipeCtx) State_FinishTask()
func (*StorePipeCtx) State_GetTask ¶
func (p *StorePipeCtx) State_GetTask()
func (*StorePipeCtx) State_ProcessError ¶
func (p *StorePipeCtx) State_ProcessError()
func (*StorePipeCtx) State_Store ¶
func (p *StorePipeCtx) State_Store()
func (*StorePipeCtx) Stop ¶
func (p *StorePipeCtx) Stop()
type StorePipeCtxConfig ¶
type StorePipeCtxConfig struct { Conf_TaskFetchLimit int Conf_SleepDurationWhenFetchErrorOrNull time.Duration Conf_HttpConnectionTryTimes int Conf_HttpTimeout time.Duration BufLen_tasks, BufLen_fetched, BufLen_errc int BufLen_finishQueue, BufLen_errorTaskQueue int Cnt_StateFinishTask, Cnt_StateErrorTask, Cnt_StateGetTask int Cnt_StateStore, Cnt_StateFetch int }
Click to show internal directories.
Click to hide internal directories.