Documentation ¶
Index ¶
- Variables
- func AddMissingSlashToURL(baseUrl *string)
- func ConvertStringToTime(timeString string) (t time.Time, err error)
- func DecodeMapStruct(input map[string]interface{}, result interface{}) error
- func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, ...) error
- func EncodeStruct(input *viper.Viper, output interface{}, tag string) error
- func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error)
- func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)
- func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, error)
- func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time
- func NewDefaultTaskContext(cfg *viper.Viper, logger core.Logger, db *gorm.DB, ctx context.Context, ...) core.TaskContext
- func NewDefaultTaskLogger(log *logrus.Logger, prefix string, loggerPool map[string]*logrus.Logger) core.Logger
- func NewStandaloneSubTaskContext(cfg *viper.Viper, logger core.Logger, db *gorm.DB, ctx context.Context, ...) core.SubTaskContext
- func RemoveStartingSlashFromPath(relativePath string) string
- func UnmarshalResponse(res *http.Response, v interface{}) error
- type ApiAsyncCallback
- type ApiAsyncClient
- func (apiClient *ApiAsyncClient) Add(delta int)
- func (apiClient *ApiAsyncClient) DoAsync(method string, path string, query url.Values, body interface{}, ...) error
- func (apiClient *ApiAsyncClient) Done()
- func (apiClient *ApiAsyncClient) GetAsync(path string, query url.Values, header http.Header, handler ApiAsyncCallback) error
- func (apiClient *ApiAsyncClient) GetMaxRetry() int
- func (apiClient *ApiAsyncClient) GetQps() float64
- func (apiClient *ApiAsyncClient) SetMaxRetry(maxRetry int)
- func (apiClient *ApiAsyncClient) WaitAsync() error
- type ApiClient
- func (apiClient *ApiClient) Do(method string, path string, query url.Values, body interface{}, ...) (*http.Response, error)
- func (apiClient *ApiClient) Get(path string, query url.Values, headers http.Header) (*http.Response, error)
- func (apiClient *ApiClient) GetEndpoint() string
- func (apiClient *ApiClient) GetHeaders() map[string]string
- func (apiClient *ApiClient) Post(path string, query url.Values, body interface{}, headers http.Header) (*http.Response, error)
- func (apiClient *ApiClient) SetAfterFunction(callback ApiClientAfterResponse)
- func (apiClient *ApiClient) SetBeforeFunction(callback ApiClientBeforeRequest)
- func (apiClient *ApiClient) SetContext(ctx context.Context)
- func (apiClient *ApiClient) SetEndpoint(endpoint string)
- func (apiClient *ApiClient) SetHeaders(headers map[string]string)
- func (apiClient *ApiClient) SetLogger(logger core.Logger)
- func (apiClient *ApiClient) SetProxy(proxyUrl string) error
- func (ApiClient *ApiClient) SetTimeout(timeout time.Duration)
- func (apiClient *ApiClient) Setup(endpoint string, headers map[string]string, timeout time.Duration)
- type ApiClientAfterResponse
- type ApiClientBeforeRequest
- type ApiCollector
- type ApiCollectorArgs
- type ApiExtractor
- type ApiExtractorArgs
- type ApiRateLimitCalculator
- type AsyncResponseHandler
- type BatchSave
- type BatchSaveDivider
- type CSTTime
- type CursorIterator
- type DataConvertHandler
- type DataConverter
- type DataConverterArgs
- type DateIterator
- type DatePair
- type DateTimeFormatItem
- type DefaultLogger
- func (l *DefaultLogger) Debug(format string, a ...interface{})
- func (l *DefaultLogger) Error(format string, a ...interface{})
- func (l *DefaultLogger) Info(format string, a ...interface{})
- func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool
- func (l *DefaultLogger) Log(level core.LogLevel, format string, a ...interface{})
- func (l *DefaultLogger) Nested(name string) core.Logger
- func (l *DefaultLogger) Printf(format string, a ...interface{})
- func (l *DefaultLogger) Warn(format string, a ...interface{})
- type DefaultSubTaskContext
- func (c DefaultSubTaskContext) GetConfig(name string) string
- func (c DefaultSubTaskContext) GetContext() context.Context
- func (c DefaultSubTaskContext) GetData() interface{}
- func (c DefaultSubTaskContext) GetDb() *gorm.DB
- func (c DefaultSubTaskContext) GetLogger() core.Logger
- func (c DefaultSubTaskContext) GetName() string
- func (c *DefaultSubTaskContext) IncProgress(quantity int)
- func (c *DefaultSubTaskContext) SetProgress(current int, total int)
- func (c *DefaultSubTaskContext) TaskContext() core.TaskContext
- type DefaultTaskContext
- func (c DefaultTaskContext) GetConfig(name string) string
- func (c DefaultTaskContext) GetContext() context.Context
- func (c DefaultTaskContext) GetData() interface{}
- func (c DefaultTaskContext) GetDb() *gorm.DB
- func (c DefaultTaskContext) GetLogger() core.Logger
- func (c DefaultTaskContext) GetName() string
- func (c *DefaultTaskContext) IncProgress(quantity int)
- func (c *DefaultTaskContext) SetData(data interface{})
- func (c *DefaultTaskContext) SetProgress(current int, total int)
- func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)
- type Iso8601Time
- type Iterator
- type OnNewBatchSave
- type Pager
- type RateLimitedApiClient
- type RawData
- type RawDataExtractor
- type RawDataSubTask
- type RawDataSubTaskArgs
- type RequestData
- type WorkerScheduler
Constants ¶
This section is empty.
Variables ¶
var DateTimeFormats []DateTimeFormatItem
var ErrIgnoreAndContinue = errors.New("ignore and continue")
var HttpMinStatusRetryCode = http.StatusBadRequest
Functions ¶
func AddMissingSlashToURL ¶
func AddMissingSlashToURL(baseUrl *string)
func DecodeMapStruct ¶
mapstructure.Decode with time.Time and Iso8601Time support
func DecodeStruct ¶
func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, tag string) error
DecodeStruct validates `input` struct with `validator` and set it into viper `tag` represent the fields when setting config, and the fields with `tag` shall prevail. `input` must be a pointer
func EncodeStruct ¶
EncodeStruct encodes struct from viper `tag` represent the fields when setting config, and the fields with `tag` shall prevail. `object` must be a pointer
func GetRawMessageArrayFromResponse ¶
func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error)
func GetRawMessageDirectFromResponse ¶
func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)
func GetURIStringPointer ¶
func Iso8601TimeToTime ¶
func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time
func NewDefaultTaskContext ¶
func NewDefaultTaskLogger ¶
func NewStandaloneSubTaskContext ¶
func NewStandaloneSubTaskContext( cfg *viper.Viper, logger core.Logger, db *gorm.DB, ctx context.Context, name string, data interface{}, ) core.SubTaskContext
This returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.
func UnmarshalResponse ¶
Types ¶
type ApiAsyncClient ¶
type ApiAsyncClient struct { *ApiClient // contains filtered or unexported fields }
ApiAsyncClient is built on top of ApiClient, to provide a asynchronous semantic You may submit multiple requests at once by calling `GetAsync`, and those requests will be performed in parallel with rate-limit support
func CreateAsyncApiClient ¶
func CreateAsyncApiClient( taskCtx core.TaskContext, apiClient *ApiClient, rateLimiter *ApiRateLimitCalculator, ) (*ApiAsyncClient, error)
func (*ApiAsyncClient) Add ¶
func (apiClient *ApiAsyncClient) Add(delta int)
func (*ApiAsyncClient) DoAsync ¶
func (apiClient *ApiAsyncClient) DoAsync( method string, path string, query url.Values, body interface{}, header http.Header, handler ApiAsyncCallback, retry int, ) error
func (*ApiAsyncClient) Done ¶
func (apiClient *ApiAsyncClient) Done()
func (*ApiAsyncClient) GetAsync ¶
func (apiClient *ApiAsyncClient) GetAsync( path string, query url.Values, header http.Header, handler ApiAsyncCallback, ) error
Enqueue an api get request, the request may be sent sometime in future in parallel with other api requests
func (*ApiAsyncClient) GetMaxRetry ¶
func (apiClient *ApiAsyncClient) GetMaxRetry() int
func (*ApiAsyncClient) GetQps ¶
func (apiClient *ApiAsyncClient) GetQps() float64
func (*ApiAsyncClient) SetMaxRetry ¶
func (apiClient *ApiAsyncClient) SetMaxRetry( maxRetry int, )
func (*ApiAsyncClient) WaitAsync ¶
func (apiClient *ApiAsyncClient) WaitAsync() error
Wait until all async requests were done
type ApiClient ¶
type ApiClient struct {
// contains filtered or unexported fields
}
ApiClient is designed for simple api requests
func NewApiClient ¶
func (*ApiClient) GetEndpoint ¶
func (*ApiClient) GetHeaders ¶
func (*ApiClient) SetAfterFunction ¶
func (apiClient *ApiClient) SetAfterFunction(callback ApiClientAfterResponse)
func (*ApiClient) SetBeforeFunction ¶
func (apiClient *ApiClient) SetBeforeFunction(callback ApiClientBeforeRequest)
func (*ApiClient) SetContext ¶
func (*ApiClient) SetEndpoint ¶
func (*ApiClient) SetHeaders ¶
func (*ApiClient) SetTimeout ¶
type ApiClientAfterResponse ¶
type ApiClientBeforeRequest ¶
type ApiCollector ¶
type ApiCollector struct { *RawDataSubTask // contains filtered or unexported fields }
func NewApiCollector ¶
func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error)
NewApiCollector allocates a new ApiCollector with the given args. ApiCollector can help you collecting data from some api with ease, pass in a AsyncApiClient and tell it which part of response you want to save, ApiCollector will collect them from remote server and store them into database.
func (*ApiCollector) SetAfterResponse ¶
func (collector *ApiCollector) SetAfterResponse(f ApiClientAfterResponse)
type ApiCollectorArgs ¶
type ApiCollectorArgs struct { RawDataSubTaskArgs /* url may use arbitrary variables from different source in any order, we need GoTemplate to allow more flexible for all kinds of possibility. Pager contains information for a particular page, calculated by ApiCollector, and will be passed into GoTemplate to generate a url for that page. We want to do page-fetching in ApiCollector, because the logic are highly similar, by doing so, we can avoid duplicate logic for every tasks, and when we have a better idea like improving performance, we can do it in one place */ UrlTemplate string `comment:"GoTemplate for API url"` // (Optional) Return query string for request, or you can plug them into UrlTemplate directly Query func(reqData *RequestData) (url.Values, error) `comment:"Extra query string when requesting API, like 'Since' option for jira issues collection"` // Some api might do pagination by http headers Header func(reqData *RequestData) (http.Header, error) PageSize int Incremental bool `comment:"Indicate this is a incremental collection, so the existing data won't get flushed"` ApiClient RateLimitedApiClient /* Sometimes, we need to collect data based on previous collected data, like jira changelog, it requires issue_id as part of the url. We can mimic `stdin` design, to accept a `Input` function which produces a `Iterator`, collector should iterate all records, and do data-fetching for each on, either in parallel or sequential order UrlTemplate: "api/3/issue/{{ Input.ID }}/changelog" */ Input Iterator InputRateLimit int /* For api endpoint that returns number of total pages, ApiCollector can collect pages in parallel with ease, or other techniques are required if this information was missing. */ GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, error) Concurrency int ResponseParser func(res *http.Response) ([]json.RawMessage, error) AfterResponse ApiClientAfterResponse }
type ApiExtractor ¶
type ApiExtractor struct { *RawDataSubTask // contains filtered or unexported fields }
ApiExtractor helps you extract Raw Data from api responses to Tool Layer Data It reads rows from specified raw data table, and feed it into `Extract` handler you can return arbitrary tool layer entities in this handler, ApiExtractor would first delete old data by their RawDataOrigin information, and then perform a batch insertion for you.
func NewApiExtractor ¶
func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error)
func (*ApiExtractor) Execute ¶
func (extractor *ApiExtractor) Execute() error
type ApiExtractorArgs ¶
type ApiExtractorArgs struct { RawDataSubTaskArgs Params interface{} Extract RawDataExtractor BatchSize int }
type ApiRateLimitCalculator ¶
type ApiRateLimitCalculator struct { UserRateLimitPerHour int GlobalRateLimitPerHour int MaxRetry int Method string ApiPath string DynamicRateLimit func(res *http.Response) (int, time.Duration, error) }
A helper to calculate api rate limit dynamically, assuming api returning remaining/resettime information
type AsyncResponseHandler ¶
type BatchSave ¶
type BatchSave struct {
// contains filtered or unexported fields
}
Insert data by batch can increase database performance drastically, this class aim to make batch-save easier, It takes care the database operation for specified `slotType`, records got saved into database whenever cache hits The `size` limit, remember to call the `Close` method to save the last batch
func NewBatchSave ¶
type BatchSaveDivider ¶
type BatchSaveDivider struct {
// contains filtered or unexported fields
}
Holds a map of BatchInsert, return `*BatchInsert` for a specific records, so caller can do batch operation for it
func NewBatchSaveDivider ¶
func NewBatchSaveDivider(db *gorm.DB, batchSize int) *BatchSaveDivider
Return a new BatchInsertDivider instance
func (*BatchSaveDivider) Close ¶
func (d *BatchSaveDivider) Close() error
close all batches so all rest records get saved into db as well
func (*BatchSaveDivider) ForType ¶
func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error)
return *BatchSave for specified type
func (*BatchSaveDivider) OnNewBatchSave ¶
func (d *BatchSaveDivider) OnNewBatchSave(cb OnNewBatchSave)
type CSTTime ¶
func (*CSTTime) UnmarshalJSON ¶
type CursorIterator ¶
type CursorIterator struct {
// contains filtered or unexported fields
}
func NewCursorIterator ¶
func (*CursorIterator) Close ¶
func (c *CursorIterator) Close() error
func (*CursorIterator) Fetch ¶
func (c *CursorIterator) Fetch() (interface{}, error)
func (*CursorIterator) HasNext ¶
func (c *CursorIterator) HasNext() bool
type DataConvertHandler ¶
type DataConvertHandler func(row interface{}) ([]interface{}, error)
Accept row from source cursor, return list of entities that need to be stored
type DataConverter ¶
type DataConverter struct { *RawDataSubTask // contains filtered or unexported fields }
DataConverter helps you convert Data from Tool Layer Tables to Domain Layer Tables It reads rows from specified Iterator, and feed it into `Convter` handler you can return arbitrary domain layer entities from this handler, ApiConverter would first delete old data by their RawDataOrigin information, and then perform a batch save operation for you.
func NewDataConverter ¶
func NewDataConverter(args DataConverterArgs) (*DataConverter, error)
func (*DataConverter) Execute ¶
func (converter *DataConverter) Execute() error
type DataConverterArgs ¶
type DataConverterArgs struct { RawDataSubTaskArgs // Domain layer entity Id prefix, i.e. `jira:JiraIssue:1`, `github:GithubIssue` InputRowType reflect.Type // Cursor to a set of Tool Layer Records Input *sql.Rows Convert DataConvertHandler BatchSize int }
type DateIterator ¶
func NewDateIterator ¶
func NewDateIterator(days int) (*DateIterator, error)
func (*DateIterator) Close ¶
func (c *DateIterator) Close() error
func (*DateIterator) Fetch ¶
func (c *DateIterator) Fetch() (interface{}, error)
func (*DateIterator) HasNext ¶
func (c *DateIterator) HasNext() bool
type DateTimeFormatItem ¶
TODO: move this to helper
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
func NewDefaultLogger ¶
func (*DefaultLogger) Debug ¶
func (l *DefaultLogger) Debug(format string, a ...interface{})
func (*DefaultLogger) Error ¶
func (l *DefaultLogger) Error(format string, a ...interface{})
func (*DefaultLogger) Info ¶
func (l *DefaultLogger) Info(format string, a ...interface{})
func (*DefaultLogger) IsLevelEnabled ¶
func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool
func (*DefaultLogger) Log ¶
func (l *DefaultLogger) Log(level core.LogLevel, format string, a ...interface{})
func (*DefaultLogger) Nested ¶
func (l *DefaultLogger) Nested(name string) core.Logger
bind two writer to logger
func (*DefaultLogger) Printf ¶
func (l *DefaultLogger) Printf(format string, a ...interface{})
func (*DefaultLogger) Warn ¶
func (l *DefaultLogger) Warn(format string, a ...interface{})
type DefaultSubTaskContext ¶
type DefaultSubTaskContext struct {
// contains filtered or unexported fields
}
SubTaskContext default implementation
func (DefaultSubTaskContext) GetContext ¶
func (*DefaultSubTaskContext) IncProgress ¶
func (c *DefaultSubTaskContext) IncProgress(quantity int)
func (*DefaultSubTaskContext) SetProgress ¶
func (c *DefaultSubTaskContext) SetProgress(current int, total int)
func (*DefaultSubTaskContext) TaskContext ¶
func (c *DefaultSubTaskContext) TaskContext() core.TaskContext
type DefaultTaskContext ¶
type DefaultTaskContext struct {
// contains filtered or unexported fields
}
TaskContext default implementation
func (DefaultTaskContext) GetContext ¶
func (*DefaultTaskContext) IncProgress ¶
func (c *DefaultTaskContext) IncProgress(quantity int)
func (*DefaultTaskContext) SetData ¶
func (c *DefaultTaskContext) SetData(data interface{})
func (*DefaultTaskContext) SetProgress ¶
func (c *DefaultTaskContext) SetProgress(current int, total int)
func (*DefaultTaskContext) SubTaskContext ¶
func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)
type Iso8601Time ¶
type Iso8601Time struct {
// contains filtered or unexported fields
}
type Iso8601Time time.Time
func (Iso8601Time) MarshalJSON ¶
func (jt Iso8601Time) MarshalJSON() ([]byte, error)
func (*Iso8601Time) String ¶
func (jt *Iso8601Time) String() string
func (*Iso8601Time) ToNullableTime ¶
func (jt *Iso8601Time) ToNullableTime() *time.Time
func (*Iso8601Time) ToTime ¶
func (jt *Iso8601Time) ToTime() time.Time
func (*Iso8601Time) UnmarshalJSON ¶
func (jt *Iso8601Time) UnmarshalJSON(b []byte) error
type OnNewBatchSave ¶
type RateLimitedApiClient ¶
type RawData ¶
type RawData struct { ID uint64 `gorm:"primaryKey"` Params string `gorm:"type:varchar(255);index"` Data []byte Url string Input datatypes.JSON CreatedAt time.Time }
Table structure for raw data storage
type RawDataExtractor ¶
Accept raw json body and params, return list of entities that need to be stored
type RawDataSubTask ¶
type RawDataSubTask struct {
// contains filtered or unexported fields
}
Common features for raw data sub tasks
type RawDataSubTaskArgs ¶
type RawDataSubTaskArgs struct { Ctx core.SubTaskContext // Table store raw data Table string `comment:"Raw data table name"` // This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal // set of data to be process, for example, we process JiraIssues by Board Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {ConnectionId, BoardId} for jira entities"` }
type RequestData ¶
type RequestData struct { Pager *Pager Params interface{} Input interface{} }
type WorkerScheduler ¶
type WorkerScheduler struct {
// contains filtered or unexported fields
}
func NewWorkerScheduler ¶
func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error)
NewWorkerScheduler 创建一个并行执行的调度器,控制最大运行数和每秒最大运行数量 NewWorkerScheduler Create a parallel scheduler to control the maximum number of runs and the maximum number of runs per second 注意: task执行是无序的 Warning: task execution is out of order
func (*WorkerScheduler) Add ¶
func (s *WorkerScheduler) Add(delta int)
func (*WorkerScheduler) Done ¶
func (s *WorkerScheduler) Done()
func (*WorkerScheduler) Release ¶
func (s *WorkerScheduler) Release()
func (*WorkerScheduler) Submit ¶
func (s *WorkerScheduler) Submit(task func() error, pool ...*ants.Pool) error
func (*WorkerScheduler) WaitUntilFinish ¶
func (s *WorkerScheduler) WaitUntilFinish() error
Source Files ¶
- api_async_client.go
- api_client.go
- api_collector.go
- api_extractor.go
- api_ratelimit_calc.go
- api_rawdata.go
- batch_save.go
- batch_save_divider.go
- config_util.go
- cst_time.go
- data_convertor.go
- default_logger.go
- default_task_context.go
- default_task_logger.go
- iso8601time.go
- iterator.go
- subtask_flow_test_helper.go
- worker_scheduler.go