Documentation ¶
Index ¶
- Constants
- Variables
- func AddMissingSlashToURL(baseUrl *string)
- func ConvertStringToTime(timeString string) (t time.Time, err error)
- func DecodeMapStruct(input map[string]interface{}, result interface{}) error
- func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, ...) error
- func EncodeStruct(input *viper.Viper, output interface{}, tag string) error
- func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error)
- func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)
- func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, error)
- func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time
- func NewDefaultTaskContext(cfg *viper.Viper, logger core.Logger, db *gorm.DB, ctx context.Context, ...) core.TaskContext
- func NewDefaultTaskLogger(log *logrus.Logger, prefix string, loggerPool map[string]*logrus.Logger) core.Logger
- func NewStandaloneSubTaskContext(cfg *viper.Viper, logger core.Logger, db *gorm.DB, ctx context.Context, ...) core.SubTaskContext
- func RemoveStartingSlashFromPath(relativePath string) string
- func UnmarshalResponse(res *http.Response, v interface{}) error
- func UpdateEncryptFields(val interface{}, update func(in string) (string, error)) error
- type AccessToken
- type ApiAsyncClient
- func (apiClient *ApiAsyncClient) DoAsync(method string, path string, query url.Values, body interface{}, ...)
- func (apiClient *ApiAsyncClient) GetAsync(path string, query url.Values, header http.Header, ...)
- func (apiClient *ApiAsyncClient) GetMaxRetry() int
- func (apiClient *ApiAsyncClient) GetNumOfWorkers() int
- func (apiClient *ApiAsyncClient) HasError() bool
- func (apiClient *ApiAsyncClient) NextTick(task func() error)
- func (apiClient *ApiAsyncClient) SetMaxRetry(maxRetry int)
- func (apiClient *ApiAsyncClient) WaitAsync() error
- type ApiClient
- func (apiClient *ApiClient) Do(method string, path string, query url.Values, body interface{}, ...) (*http.Response, error)
- func (apiClient *ApiClient) Get(path string, query url.Values, headers http.Header) (*http.Response, error)
- func (apiClient *ApiClient) GetEndpoint() string
- func (apiClient *ApiClient) GetHeaders() map[string]string
- func (apiClient *ApiClient) Post(path string, query url.Values, body interface{}, headers http.Header) (*http.Response, error)
- func (apiClient *ApiClient) SetAfterFunction(callback ApiClientAfterResponse)
- func (apiClient *ApiClient) SetBeforeFunction(callback ApiClientBeforeRequest)
- func (apiClient *ApiClient) SetContext(ctx context.Context)
- func (apiClient *ApiClient) SetEndpoint(endpoint string)
- func (apiClient *ApiClient) SetHeaders(headers map[string]string)
- func (apiClient *ApiClient) SetLogger(logger core.Logger)
- func (apiClient *ApiClient) SetProxy(proxyUrl string) error
- func (ApiClient *ApiClient) SetTimeout(timeout time.Duration)
- func (apiClient *ApiClient) Setup(endpoint string, headers map[string]string, timeout time.Duration)
- type ApiClientAfterResponse
- type ApiClientBeforeRequest
- type ApiCollector
- type ApiCollectorArgs
- type ApiExtractor
- type ApiExtractorArgs
- type ApiRateLimitCalculator
- type AsyncResponseHandler
- type BaseConnection
- type BasicAuth
- type BatchSave
- type BatchSaveDivider
- type CSTTime
- type ConnectionApiHelper
- func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) error
- func (c *ConnectionApiHelper) Delete(connection interface{}) error
- func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) error
- func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) error
- func (c *ConnectionApiHelper) List(connections interface{}) error
- func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) error
- type CursorIterator (deprecated)
- type DalCursorIterator
- type DataConvertHandler
- type DataConverter
- type DataConverterArgs
- type DateIterator
- type DatePair
- type DateTimeFormatItem
- type DefaultBasicRes
- type DefaultSubTaskContext
- func (c DefaultSubTaskContext) GetContext() context.Context
- func (c DefaultSubTaskContext) GetData() interface{}
- func (c DefaultSubTaskContext) GetName() string
- func (c *DefaultSubTaskContext) IncProgress(quantity int)
- func (c *DefaultSubTaskContext) SetProgress(current int, total int)
- func (c *DefaultSubTaskContext) TaskContext() core.TaskContext
- type DefaultTaskContext
- func (c DefaultTaskContext) GetContext() context.Context
- func (c DefaultTaskContext) GetData() interface{}
- func (c DefaultTaskContext) GetName() string
- func (c *DefaultTaskContext) IncProgress(quantity int)
- func (c *DefaultTaskContext) SetData(data interface{})
- func (c *DefaultTaskContext) SetProgress(current int, total int)
- func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)
- type Iso8601Time
- type Iterator
- type Pager
- type RateLimitedApiClient
- type RawData
- type RawDataSubTask
- type RawDataSubTaskArgs
- type RequestData
- type RestConnection
- type WorkerScheduler
Constants ¶
const BATCH_SAVE_UPDATE_ONLY = 0
Variables ¶
var DateTimeFormats []DateTimeFormatItem
var HttpMinStatusRetryCode = http.StatusBadRequest
Functions ¶
func AddMissingSlashToURL ¶
func AddMissingSlashToURL(baseUrl *string)
func DecodeMapStruct ¶
func DecodeMapStruct(input map[string]interface{}, result interface{}) error
DecodeMapStruct works like mapstructure.Decode, with added time.Time and Iso8601Time support
func DecodeStruct ¶
func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, tag string) error
DecodeStruct validates the `input` struct with `validator` and sets it into viper. `tag` represents the fields when setting config, and the fields with `tag` shall prevail. `input` must be a pointer.
func EncodeStruct ¶
func EncodeStruct(input *viper.Viper, output interface{}, tag string) error
EncodeStruct encodes a struct from viper. `tag` represents the fields when setting config, and the fields with `tag` shall prevail. `output` must be a pointer.
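For illustration, a minimal sketch of using the two functions together, assuming the package is imported as `helper` and that the JiraConn type with its `mapstructure` tags is defined by the caller:

type JiraConn struct {
    Endpoint string `mapstructure:"JIRA_ENDPOINT" validate:"required"`
    Token    string `mapstructure:"JIRA_BASIC_AUTH_ENCODED"`
}

func saveConn(v *viper.Viper, body map[string]interface{}) (*JiraConn, error) {
    conn := &JiraConn{}
    // decode `body` into conn, validate it, then set the tagged fields into viper
    if err := helper.DecodeStruct(v, conn, body, "mapstructure"); err != nil {
        return nil, err
    }
    // read the tagged fields back from viper into the struct
    if err := helper.EncodeStruct(v, conn, "mapstructure"); err != nil {
        return nil, err
    }
    return conn, nil
}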
func GetRawMessageArrayFromResponse ¶
func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error)
func GetRawMessageDirectFromResponse ¶
func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)
func GetURIStringPointer ¶
func Iso8601TimeToTime ¶
func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time
func NewDefaultTaskContext ¶
func NewDefaultTaskLogger ¶
func NewStandaloneSubTaskContext ¶
func NewStandaloneSubTaskContext(cfg *viper.Viper, logger core.Logger, db *gorm.DB, ctx context.Context, name string, data interface{}) core.SubTaskContext
NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.
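A minimal sketch of debugging one subtask from a plugin's standalone entry point; `tasks.CollectIssuesMeta` and `taskData` are illustrative assumptions for a concrete plugin, and the package is assumed to be imported as `helper`:

func debugCollectIssues(cfg *viper.Viper, log core.Logger, db *gorm.DB, taskData interface{}) error {
    // build a stand-alone sub-task context, then call the subtask entry point directly
    subtaskCtx := helper.NewStandaloneSubTaskContext(cfg, log, db, context.Background(), "collectIssues", taskData)
    return tasks.CollectIssuesMeta.EntryPoint(subtaskCtx)
}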
func UnmarshalResponse ¶
Types ¶
type AccessToken ¶ added in v0.12.0
type AccessToken struct {
Token string `mapstructure:"token" validate:"required" json:"token" encrypt:"yes"`
}
type ApiAsyncClient ¶
type ApiAsyncClient struct {
    *ApiClient
    // contains filtered or unexported fields
}
ApiAsyncClient is built on top of ApiClient to provide asynchronous semantics. You may submit multiple requests at once by calling `GetAsync`; those requests will be performed in parallel with rate-limit support.
func CreateAsyncApiClient ¶
func CreateAsyncApiClient(taskCtx core.TaskContext, apiClient *ApiClient, rateLimiter *ApiRateLimitCalculator) (*ApiAsyncClient, error)
CreateAsyncApiClient creates a new ApiAsyncClient
func (*ApiAsyncClient) DoAsync ¶
func (apiClient *ApiAsyncClient) DoAsync(method string, path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback, retry int)
DoAsync carries out an asynchronous request
func (*ApiAsyncClient) GetAsync ¶
func (apiClient *ApiAsyncClient) GetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
GetAsync enqueues an API GET request; the request may be sent some time in the future, in parallel with other API requests
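A minimal sketch of enqueuing several GET requests and waiting for them; treating `common.ApiAsyncCallback` as `func(*http.Response) error` is an assumption about the handler's shape, and `issueIds` is illustrative:

func collectChangelogs(apiClient *helper.ApiAsyncClient, issueIds []uint64) error {
    for _, issueId := range issueIds {
        path := fmt.Sprintf("api/3/issue/%d/changelog", issueId)
        // each request is queued and performed in parallel, subject to the rate limit
        apiClient.GetAsync(path, nil, nil, func(res *http.Response) error {
            // parse `res` and persist the result here
            return nil
        })
    }
    // block until every queued request has been handled
    return apiClient.WaitAsync()
}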
func (*ApiAsyncClient) GetMaxRetry ¶
func (apiClient *ApiAsyncClient) GetMaxRetry() int
GetMaxRetry returns the maximum retry attempts for a request
func (*ApiAsyncClient) GetNumOfWorkers ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) GetNumOfWorkers() int
func (*ApiAsyncClient) HasError ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) HasError() bool
func (*ApiAsyncClient) NextTick ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) NextTick(task func() error)
func (*ApiAsyncClient) SetMaxRetry ¶
func (apiClient *ApiAsyncClient) SetMaxRetry(maxRetry int)
SetMaxRetry sets the maximum retry attempts for a request
func (*ApiAsyncClient) WaitAsync ¶
func (apiClient *ApiAsyncClient) WaitAsync() error
WaitAsync blocks until all async requests are done
type ApiClient ¶
type ApiClient struct {
// contains filtered or unexported fields
}
ApiClient is designed for simple api requests
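For instance, a plain synchronous GET followed by JSON decoding might look like this sketch (the `user` path and response shape are illustrative; the package is assumed to be imported as `helper`):

func getCurrentUser(apiClient *helper.ApiClient) (string, error) {
    res, err := apiClient.Get("user", nil, nil)
    if err != nil {
        return "", err
    }
    userInfo := &struct {
        Login string `json:"login"`
    }{}
    // UnmarshalResponse reads the response body and decodes it into userInfo
    if err := helper.UnmarshalResponse(res, userInfo); err != nil {
        return "", err
    }
    return userInfo.Login, nil
}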
func NewApiClient ¶
func (*ApiClient) GetEndpoint ¶
func (*ApiClient) GetHeaders ¶
func (*ApiClient) SetAfterFunction ¶
func (apiClient *ApiClient) SetAfterFunction(callback ApiClientAfterResponse)
func (*ApiClient) SetBeforeFunction ¶
func (apiClient *ApiClient) SetBeforeFunction(callback ApiClientBeforeRequest)
func (*ApiClient) SetContext ¶
func (*ApiClient) SetEndpoint ¶
func (*ApiClient) SetHeaders ¶
func (*ApiClient) SetTimeout ¶
type ApiClientAfterResponse ¶
type ApiClientBeforeRequest ¶
type ApiCollector ¶
type ApiCollector struct {
    *RawDataSubTask
    // contains filtered or unexported fields
}
func NewApiCollector ¶
func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error)
NewApiCollector allocates a new ApiCollector with the given args. ApiCollector helps us collect data from an API with ease: pass in an AsyncApiClient and tell it which part of the response to save, and ApiCollector will collect the data from the remote server and store it into the database.
type ApiCollectorArgs ¶
type ApiCollectorArgs struct {
    RawDataSubTaskArgs
    // UrlTemplate is used to generate the final URL for the Api Collector to request,
    // i.e. `api/3/issue/{{ .Input.IssueId }}/changelog`
    // For details of which variables can be used, please check `RequestData`
    UrlTemplate string `comment:"GoTemplate for API url"`
    // Query would be sent out as part of the request URL
    Query func(reqData *RequestData) (url.Values, error) ``
    // Header would be sent out along with the request
    Header func(reqData *RequestData) (http.Header, error)
    // PageSize tells ApiCollector the page size
    PageSize int
    // Incremental indicates if this is an incremental collection; existing data won't get deleted if it is true
    Incremental bool `comment:""`
    // ApiClient is an asynchronous api request client with qps support
    ApiClient RateLimitedApiClient
    // Input helps us collect data based on previously collected data, like collecting changelogs based on jira
    // issue ids
    Input Iterator
    // GetTotalPages tells `ApiCollector` the total number of pages based on the response of the first page,
    // so `ApiCollector` can collect those pages in parallel for us
    GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, error)
    // Concurrency specifies the qps for apis that don't return the total number of pages/records
    // NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
    Concurrency    int
    ResponseParser func(res *http.Response) ([]json.RawMessage, error)
}
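A minimal sketch of a collector subtask built on these args; the raw table name, `JiraApiParams`, the `Pager` field names (`Skip`, `Size`) and `collector.Execute()` as the entry point are illustrative assumptions:

func collectIssues(subTaskCtx core.SubTaskContext, apiClient helper.RateLimitedApiClient) error {
    collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
        RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
            Ctx:    subTaskCtx,
            Table:  "_raw_jira_issues",
            Params: JiraApiParams{ConnectionId: 1, BoardId: 8},
        },
        ApiClient:   apiClient,
        PageSize:    100,
        UrlTemplate: "api/2/search",
        Query: func(reqData *helper.RequestData) (url.Values, error) {
            query := url.Values{}
            query.Set("startAt", strconv.Itoa(reqData.Pager.Skip))
            query.Set("maxResults", strconv.Itoa(reqData.Pager.Size))
            return query, nil
        },
        ResponseParser: helper.GetRawMessageArrayFromResponse,
    })
    if err != nil {
        return err
    }
    return collector.Execute()
}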
type ApiExtractor ¶
type ApiExtractor struct {
    *RawDataSubTask
    // contains filtered or unexported fields
}
ApiExtractor helps you extract raw data from API responses into tool-layer data. It reads rows from the specified raw data table and feeds them into the `Extract` handler; you may return arbitrary tool-layer entities from this handler. ApiExtractor will first delete old data by its RawDataOrigin information, and then perform a batch save for you.
func NewApiExtractor ¶
func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error)
NewApiExtractor creates a new ApiExtractor
type ApiExtractorArgs ¶
type ApiExtractorArgs struct {
    RawDataSubTaskArgs
    Params    interface{}
    Extract   func(row *RawData) ([]interface{}, error)
    BatchSize int
}
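A minimal sketch of an extractor subtask; the raw table, the `issueBody` payload struct, `models.JiraIssue` and `extractor.Execute()` are illustrative assumptions:

func extractIssues(subTaskCtx core.SubTaskContext) error {
    extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
        RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
            Ctx:    subTaskCtx,
            Table:  "_raw_jira_issues",
            Params: JiraApiParams{ConnectionId: 1, BoardId: 8},
        },
        Extract: func(row *helper.RawData) ([]interface{}, error) {
            body := &issueBody{}
            if err := json.Unmarshal(row.Data, body); err != nil {
                return nil, err
            }
            // return tool-layer entities; ApiExtractor batch-saves them for us
            return []interface{}{
                &models.JiraIssue{IssueId: body.Id, Summary: body.Fields.Summary},
            }, nil
        },
    })
    if err != nil {
        return err
    }
    return extractor.Execute()
}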
type ApiRateLimitCalculator ¶
type ApiRateLimitCalculator struct {
    UserRateLimitPerHour   int
    GlobalRateLimitPerHour int
    MaxRetry               int
    Method                 string
    ApiPath                string
    DynamicRateLimit       func(res *http.Response) (int, time.Duration, error)
}
ApiRateLimitCalculator is a helper to calculate the API rate limit dynamically, assuming the API returns remaining/reset-time information
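A minimal sketch of a calculator driven by GitHub-style rate-limit headers; the header names and the fallback value are illustrative assumptions:

func newRateLimitedClient(taskCtx core.TaskContext, apiClient *helper.ApiClient) (*helper.ApiAsyncClient, error) {
    rateLimiter := &helper.ApiRateLimitCalculator{
        UserRateLimitPerHour: 3600,
        DynamicRateLimit: func(res *http.Response) (int, time.Duration, error) {
            remaining, err := strconv.Atoi(res.Header.Get("X-RateLimit-Remaining"))
            if err != nil {
                return 0, 0, err
            }
            reset, err := strconv.ParseInt(res.Header.Get("X-RateLimit-Reset"), 10, 64)
            if err != nil {
                return 0, 0, err
            }
            return remaining, time.Until(time.Unix(reset, 0)), nil
        },
    }
    return helper.CreateAsyncApiClient(taskCtx, apiClient, rateLimiter)
}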
type AsyncResponseHandler ¶
type BaseConnection ¶ added in v0.12.0
type BasicAuth ¶ added in v0.12.0
type BasicAuth struct {
    Username string `mapstructure:"username" validate:"required" json:"username"`
    Password string `mapstructure:"password" validate:"required" json:"password" encrypt:"yes"`
}
func (BasicAuth) GetEncodedToken ¶ added in v0.12.0
type BatchSave ¶
type BatchSave struct {
// contains filtered or unexported fields
}
BatchSave persists multiple records of a specific type in one SQL query to improve performance
func NewBatchSave ¶
NewBatchSave creates a new BatchSave instance
func (*BatchSave) Add ¶
Add adds a record to the cache. BatchSave will flush the cached records into the database when the cache is maxed out
type BatchSaveDivider ¶
type BatchSaveDivider struct {
// contains filtered or unexported fields
}
BatchSaveDivider creates and caches BatchSave instances; this is helpful when dealing with massive amounts of data records with arbitrary types.
func NewBatchSaveDivider ¶
func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string, params string) *BatchSaveDivider
NewBatchSaveDivider creates a new BatchSaveDivider instance
func (*BatchSaveDivider) Close ¶
func (d *BatchSaveDivider) Close() error
Close closes all batches so the remaining records get saved into the database
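A hedged sketch of how a divider might be used; `ForType` as the accessor for a per-type BatchSave and the `Add` signature are assumptions to be checked against the source:

func saveIssues(basicRes core.BasicRes, params string, issues []*models.JiraIssue) error {
    divider := helper.NewBatchSaveDivider(basicRes, 500, "_raw_jira_issues", params)
    // ForType is assumed to return (and cache) a BatchSave for the given row type
    batch, err := divider.ForType(reflect.TypeOf(&models.JiraIssue{}))
    if err != nil {
        return err
    }
    for _, issue := range issues {
        if err := batch.Add(issue); err != nil {
            return err
        }
    }
    // flush whatever is left in every cached batch
    return divider.Close()
}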
type CSTTime ¶
func (*CSTTime) UnmarshalJSON ¶
type ConnectionApiHelper ¶ added in v0.12.0
type ConnectionApiHelper struct {
// contains filtered or unexported fields
}
func NewConnectionHelper ¶ added in v0.12.0
func NewConnectionHelper(basicRes core.BasicRes, vld *validator.Validate) *ConnectionApiHelper
func (*ConnectionApiHelper) Create ¶ added in v0.12.0
func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) error
Create a connection record based on request body
func (*ConnectionApiHelper) Delete ¶ added in v0.12.0
func (c *ConnectionApiHelper) Delete(connection interface{}) error
Delete connection
func (*ConnectionApiHelper) First ¶ added in v0.12.0
func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) error
First finds the connection in the database by parsing the request input, and decrypts it
func (*ConnectionApiHelper) FirstById ¶ added in v0.12.0
func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) error
func (*ConnectionApiHelper) List ¶ added in v0.12.0
func (c *ConnectionApiHelper) List(connections interface{}) error
List returns all connections with password/token decrypted
func (*ConnectionApiHelper) Patch ¶ added in v0.12.0
func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) error
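A minimal sketch of wiring the helper into a plugin's REST handlers; `models.JiraConnection` and the package-level `connectionHelper` are illustrative assumptions:

func PostConnections(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
    connection := &models.JiraConnection{}
    // decode the request body into the connection, validate and persist it
    if err := connectionHelper.Create(connection, input); err != nil {
        return nil, err
    }
    return &core.ApiResourceOutput{Body: connection, Status: http.StatusOK}, nil
}

func GetConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
    connection := &models.JiraConnection{}
    // look the connection up by the path params and decrypt its secrets
    if err := connectionHelper.First(connection, input.Params); err != nil {
        return nil, err
    }
    return &core.ApiResourceOutput{Body: connection}, nil
}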
type CursorIterator (deprecated) ¶
type CursorIterator struct {
// contains filtered or unexported fields
}
Deprecated: use DalCursorIterator instead
func NewCursorIterator (deprecated) ¶
func (*CursorIterator) Close ¶
func (c *CursorIterator) Close() error
func (*CursorIterator) Fetch ¶
func (c *CursorIterator) Fetch() (interface{}, error)
func (*CursorIterator) HasNext ¶
func (c *CursorIterator) HasNext() bool
type DalCursorIterator ¶ added in v0.12.0
type DalCursorIterator struct {
// contains filtered or unexported fields
}
DalCursorIterator is the dal-based cursor iterator that replaces the deprecated CursorIterator
func NewDalCursorIterator ¶ added in v0.12.0
func (*DalCursorIterator) Close ¶ added in v0.12.0
func (c *DalCursorIterator) Close() error
func (*DalCursorIterator) Fetch ¶ added in v0.12.0
func (c *DalCursorIterator) Fetch() (interface{}, error)
func (*DalCursorIterator) HasNext ¶ added in v0.12.0
func (c *DalCursorIterator) HasNext() bool
type DataConvertHandler ¶
type DataConvertHandler func(row interface{}) ([]interface{}, error)
DataConvertHandler accepts a row from the source cursor and returns a list of entities that need to be stored
type DataConverter ¶
type DataConverter struct {
    *RawDataSubTask
    // contains filtered or unexported fields
}
DataConverter helps you convert data from tool-layer tables to domain-layer tables. It reads rows from the specified Iterator and feeds them into the `Convert` handler; you may return arbitrary domain-layer entities from this handler. DataConverter will first delete old data by its RawDataOrigin information, and then perform a batch save operation for you.
func NewDataConverter ¶
func NewDataConverter(args DataConverterArgs) (*DataConverter, error)
func (*DataConverter) Execute ¶
func (converter *DataConverter) Execute() error
type DataConverterArgs ¶
type DataConverterArgs struct {
    RawDataSubTaskArgs
    // Domain layer entity Id prefix, i.e. `jira:JiraIssue:1`, `github:GithubIssue`
    InputRowType reflect.Type
    // Cursor to a set of Tool Layer Records
    Input     *sql.Rows
    Convert   DataConvertHandler
    BatchSize int
}
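A minimal sketch of a converter subtask; `models.JiraIssue`, the `ticket.Issue` domain entity, the gorm query and `converter.Execute()` are illustrative assumptions:

func convertIssues(subTaskCtx core.SubTaskContext, db *gorm.DB) error {
    cursor, err := db.Model(&models.JiraIssue{}).Where("connection_id = ?", 1).Rows()
    if err != nil {
        return err
    }
    defer cursor.Close()
    converter, err := helper.NewDataConverter(helper.DataConverterArgs{
        RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
            Ctx:    subTaskCtx,
            Table:  "_raw_jira_issues",
            Params: JiraApiParams{ConnectionId: 1, BoardId: 8},
        },
        InputRowType: reflect.TypeOf(models.JiraIssue{}),
        Input:        cursor,
        Convert: func(row interface{}) ([]interface{}, error) {
            jiraIssue := row.(*models.JiraIssue)
            // return domain-layer entities; DataConverter batch-saves them for us
            return []interface{}{
                &ticket.Issue{Title: jiraIssue.Summary},
            }, nil
        },
    })
    if err != nil {
        return err
    }
    return converter.Execute()
}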
type DateIterator ¶
DateIterator
func NewDateIterator ¶
func NewDateIterator(days int) (*DateIterator, error)
func (*DateIterator) Close ¶
func (c *DateIterator) Close() error
func (*DateIterator) Fetch ¶
func (c *DateIterator) Fetch() (interface{}, error)
func (*DateIterator) HasNext ¶
func (c *DateIterator) HasNext() bool
type DateTimeFormatItem ¶
TODO: move this to helper
type DefaultBasicRes ¶ added in v0.12.0
type DefaultBasicRes struct {
// contains filtered or unexported fields
}
func NewDefaultBasicRes ¶ added in v0.12.0
func (*DefaultBasicRes) GetConfig ¶ added in v0.12.0
func (c *DefaultBasicRes) GetConfig(name string) string
func (*DefaultBasicRes) GetDal ¶ added in v0.12.0
func (c *DefaultBasicRes) GetDal() dal.Dal
func (*DefaultBasicRes) GetDb ¶ added in v0.12.0
func (c *DefaultBasicRes) GetDb() *gorm.DB
func (*DefaultBasicRes) GetLogger ¶ added in v0.12.0
func (c *DefaultBasicRes) GetLogger() core.Logger
type DefaultSubTaskContext ¶
type DefaultSubTaskContext struct {
    LastProgressTime time.Time
    // contains filtered or unexported fields
}
DefaultSubTaskContext is the default core.SubTaskContext implementation
func (DefaultSubTaskContext) GetContext ¶
func (*DefaultSubTaskContext) IncProgress ¶
func (c *DefaultSubTaskContext) IncProgress(quantity int)
func (*DefaultSubTaskContext) SetProgress ¶
func (c *DefaultSubTaskContext) SetProgress(current int, total int)
func (*DefaultSubTaskContext) TaskContext ¶
func (c *DefaultSubTaskContext) TaskContext() core.TaskContext
type DefaultTaskContext ¶
type DefaultTaskContext struct {
// contains filtered or unexported fields
}
DefaultTaskContext is the default core.TaskContext implementation
func (DefaultTaskContext) GetContext ¶
func (*DefaultTaskContext) IncProgress ¶
func (c *DefaultTaskContext) IncProgress(quantity int)
func (*DefaultTaskContext) SetData ¶
func (c *DefaultTaskContext) SetData(data interface{})
func (*DefaultTaskContext) SetProgress ¶
func (c *DefaultTaskContext) SetProgress(current int, total int)
func (*DefaultTaskContext) SubTaskContext ¶
func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)
type Iso8601Time ¶
type Iso8601Time struct {
// contains filtered or unexported fields
}
Iso8601Time is a wrapper around time.Time for parsing and serializing ISO-8601-style timestamps
func (Iso8601Time) MarshalJSON ¶
func (jt Iso8601Time) MarshalJSON() ([]byte, error)
func (*Iso8601Time) String ¶
func (jt *Iso8601Time) String() string
func (*Iso8601Time) ToNullableTime ¶
func (jt *Iso8601Time) ToNullableTime() *time.Time
func (*Iso8601Time) ToTime ¶
func (jt *Iso8601Time) ToTime() time.Time
func (*Iso8601Time) UnmarshalJSON ¶
func (jt *Iso8601Time) UnmarshalJSON(b []byte) error
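For example, declaring API payload fields as Iso8601Time lets varied timestamp formats unmarshal cleanly, and they can then be converted for storage (the payload struct is illustrative):

type issuePayload struct {
    Created helper.Iso8601Time  `json:"created"`
    Updated *helper.Iso8601Time `json:"updated"`
}

func parseIssue(raw []byte) (time.Time, *time.Time, error) {
    payload := &issuePayload{}
    if err := json.Unmarshal(raw, payload); err != nil {
        return time.Time{}, nil, err
    }
    createdAt := payload.Created.ToTime()                  // concrete time.Time
    updatedAt := helper.Iso8601TimeToTime(payload.Updated) // pointer form via the package helper
    return createdAt, updatedAt, nil
}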
type RateLimitedApiClient ¶
type RawData ¶
type RawData struct {
    ID        uint64 `gorm:"primaryKey"`
    Params    string `gorm:"type:varchar(255);index"`
    Data      []byte
    Url       string
    Input     datatypes.JSON
    CreatedAt time.Time
}
Table structure for raw data storage
type RawDataSubTask ¶
type RawDataSubTask struct {
// contains filtered or unexported fields
}
Common features for raw data sub tasks
type RawDataSubTaskArgs ¶
type RawDataSubTaskArgs struct {
    Ctx core.SubTaskContext
    // Table stores the raw data
    Table string `comment:"Raw data table name"`
    // Params will be JSON-encoded and stored into the database along with the raw data itself, to identify the minimal
    // set of data to be processed; for example, we process JiraIssues by Board
    Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {ConnectionId, BoardId} for jira entities"`
}
type RequestData ¶
RequestData is the input of `UrlTemplate`, `Query` and `Header`, so we can generate them dynamically
type RestConnection ¶ added in v0.12.0
type RestConnection struct {
    BaseConnection `mapstructure:",squash"`
    Endpoint       string `mapstructure:"endpoint" validate:"required" json:"endpoint"`
    Proxy          string `mapstructure:"proxy" json:"proxy"`
    RateLimit      int    `comment:"api request rate limit per hour" json:"rateLimit"`
}
type WorkerScheduler ¶
type WorkerScheduler struct {
// contains filtered or unexported fields
}
WorkerScheduler runs asynchronous tasks in parallel with throttling support
func NewWorkerScheduler ¶
func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int, logger core.Logger) (*WorkerScheduler, error)
NewWorkerScheduler creates a WorkerScheduler
func (*WorkerScheduler) HasError ¶ added in v0.12.0
func (s *WorkerScheduler) HasError() bool
HasError returns whether any error has occurred
func (*WorkerScheduler) NextTick ¶ added in v0.12.0
func (s *WorkerScheduler) NextTick(task func() error)
NextTick enqueues a task in a non-blocking manner; you should only call this method within a task submitted via the SubmitBlocking method. IMPORTANT: do NOT call this method with a huge number of tasks, as it is likely to eat up all available memory.
func (*WorkerScheduler) SubmitBlocking ¶ added in v0.12.0
func (s *WorkerScheduler) SubmitBlocking(task func() error)
SubmitBlocking enqueues an async task to ants; the task will be executed in the future when the timing is right. It doesn't return an error because there wouldn't be one under blocking semantics: a returned error would do nothing but cause confusion, and more often than not people would assume it was returned by the task. Since it is an async task, call frames are not available in production mode; you can export the environment variable ASYNC_CF=true to enable call-frame capturing when debugging. IMPORTANT: do NOT call SubmitBlocking inside the async task, as it is likely to cause a deadlock; call SubmitNonBlocking instead when the number of tasks is relatively small.
func (*WorkerScheduler) Wait ¶ added in v0.12.0
func (s *WorkerScheduler) Wait() error
Wait blocks the current goroutine until all workers have returned
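A minimal sketch of driving the scheduler directly; `fetchPage` and `store` are illustrative assumptions:

func runPages(ctx context.Context, logger core.Logger, pages []int) error {
    // 10 workers; maxWork=50 per maxWorkDuration=1s; up to 3 retries
    scheduler, err := helper.NewWorkerScheduler(10, 50, time.Second, ctx, 3, logger)
    if err != nil {
        return err
    }
    for _, page := range pages {
        page := page
        scheduler.SubmitBlocking(func() error {
            data, err := fetchPage(page)
            if err != nil {
                return err
            }
            // chain follow-up work without blocking the current worker
            scheduler.NextTick(func() error {
                return store(data)
            })
            return nil
        })
    }
    return scheduler.Wait()
}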
Source Files ¶
- api_async_client.go
- api_client.go
- api_collector.go
- api_collector_func.go
- api_extractor.go
- api_ratelimit_calc.go
- api_rawdata.go
- batch_save.go
- batch_save_divider.go
- config_util.go
- connection.go
- cst_time.go
- data_convertor.go
- default_task_context.go
- default_task_logger.go
- iso8601time.go
- iterator.go
- worker_scheduler.go