Documentation ¶
Index ¶
- Variables
- func AddMissingSlashToURL(baseUrl *string)
- func ConvertStringToTime(timeString string) (t time.Time, err error)
- func Decode(source interface{}, target interface{}, vld *validator.Validate) errors.Error
- func DecodeMapStruct(input map[string]interface{}, result interface{}) errors.Error
- func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, ...) errors.Error
- func EncodeStruct(input *viper.Viper, output interface{}, tag string) errors.Error
- func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, errors.Error)
- func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, errors.Error)
- func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, errors.Error)
- func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time
- func MakePipelinePlanSubtasks(subtaskMetas []core.SubTaskMeta, entities []string) ([]string, errors.Error)
- func NewDefaultTaskContext(ctx context.Context, cfg *viper.Viper, logger core.Logger, db *gorm.DB, ...) core.TaskContext
- func NewStandaloneSubTaskContext(ctx context.Context, cfg *viper.Viper, logger core.Logger, db *gorm.DB, ...) core.SubTaskContext
- func RemoveStartingSlashFromPath(relativePath string) string
- func UnmarshalResponse(res *http.Response, v interface{}) errors.Error
- func UpdateEncryptFields(val interface{}, update func(in string) (string, errors.Error)) errors.Error
- type AccessToken
- type ApiAsyncClient
- func (apiClient *ApiAsyncClient) DoAsync(method string, path string, query url.Values, body interface{}, ...)
- func (apiClient *ApiAsyncClient) DoGetAsync(path string, query url.Values, header http.Header, ...)
- func (apiClient *ApiAsyncClient) DoPostAsync(path string, query url.Values, body interface{}, header http.Header, ...)
- func (apiClient *ApiAsyncClient) GetMaxRetry() int
- func (apiClient *ApiAsyncClient) GetNumOfWorkers() int
- func (apiClient *ApiAsyncClient) HasError() bool
- func (apiClient *ApiAsyncClient) NextTick(task func() errors.Error)
- func (apiClient *ApiAsyncClient) Release()
- func (apiClient *ApiAsyncClient) SetMaxRetry(maxRetry int)
- func (apiClient *ApiAsyncClient) WaitAsync() errors.Error
- type ApiClient
- func (apiClient *ApiClient) Do(method string, path string, query url.Values, body interface{}, ...) (*http.Response, errors.Error)
- func (apiClient *ApiClient) Get(path string, query url.Values, headers http.Header) (*http.Response, errors.Error)
- func (apiClient *ApiClient) GetAfterFunction() common.ApiClientAfterResponse
- func (apiClient *ApiClient) GetBeforeFunction() common.ApiClientBeforeRequest
- func (apiClient *ApiClient) GetEndpoint() string
- func (apiClient *ApiClient) GetHeaders() map[string]string
- func (apiClient *ApiClient) GetTimeout() time.Duration
- func (apiClient *ApiClient) Post(path string, query url.Values, body interface{}, headers http.Header) (*http.Response, errors.Error)
- func (apiClient *ApiClient) SetAfterFunction(callback common.ApiClientAfterResponse)
- func (apiClient *ApiClient) SetBeforeFunction(callback common.ApiClientBeforeRequest)
- func (apiClient *ApiClient) SetContext(ctx context.Context)
- func (apiClient *ApiClient) SetEndpoint(endpoint string)
- func (apiClient *ApiClient) SetHeaders(headers map[string]string)
- func (apiClient *ApiClient) SetLogger(logger core.Logger)
- func (apiClient *ApiClient) SetProxy(proxyUrl string) errors.Error
- func (apiClient *ApiClient) SetTimeout(timeout time.Duration)
- func (apiClient *ApiClient) Setup(endpoint string, headers map[string]string, timeout time.Duration)
- type ApiCollector
- type ApiCollectorArgs
- type ApiExtractor
- type ApiExtractorArgs
- type ApiRateLimitCalculator
- type AppKey
- type AsyncResponseHandler
- type BaseConnection
- type BasicAuth
- type BatchSave
- type BatchSaveDivider
- type CSTTime
- type ConnectionApiHelper
- func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) errors.Error
- func (c *ConnectionApiHelper) Delete(connection interface{}) errors.Error
- func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) errors.Error
- func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) errors.Error
- func (c *ConnectionApiHelper) List(connections interface{}) errors.Error
- func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) errors.Error
- type CursorPager
- type DalCursorIterator
- type DataConvertHandler
- type DataConverter
- type DataConverterArgs
- type DateIterator
- type DatePair
- type DateTimeFormatItem
- type DefaultBasicRes
- type DefaultSubTaskContext
- func (c DefaultSubTaskContext) GetContext() context.Context
- func (c DefaultSubTaskContext) GetData() interface{}
- func (c DefaultSubTaskContext) GetName() string
- func (c *DefaultSubTaskContext) IncProgress(quantity int)
- func (c *DefaultSubTaskContext) SetProgress(current int, total int)
- func (c *DefaultSubTaskContext) TaskContext() core.TaskContext
- type DefaultTaskContext
- func (c DefaultTaskContext) GetContext() context.Context
- func (c DefaultTaskContext) GetData() interface{}
- func (c DefaultTaskContext) GetName() string
- func (c *DefaultTaskContext) IncProgress(quantity int)
- func (c *DefaultTaskContext) SetData(data interface{})
- func (c *DefaultTaskContext) SetProgress(current int, total int)
- func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, errors.Error)
- type GraphqlAsyncClient
- func (apiClient *GraphqlAsyncClient) GetMaxRetry() (int, time.Duration)
- func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err errors.Error))
- func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, errors.Error)
- func (apiClient *GraphqlAsyncClient) Release()
- func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q interface{}) int)
- func (apiClient *GraphqlAsyncClient) SetMaxRetry(maxRetry int, waitBeforeRetry time.Duration)
- func (apiClient *GraphqlAsyncClient) Wait()
- type GraphqlAsyncResponseHandler
- type GraphqlCollector
- type GraphqlCollectorArgs
- type GraphqlQueryPageInfo
- type GraphqlRequestData
- type Iso8601Time
- type Iterator
- type ListBaseNode
- type Pager
- type Queue
- func (q *Queue) Clean()
- func (q *Queue) CleanWithOutLock()
- func (q *Queue) GetCount() int64
- func (q *Queue) GetCountWithOutLock() int64
- func (q *Queue) Pull(add *int64) QueueNode
- func (q *Queue) PullWithOutLock() QueueNode
- func (q *Queue) Push(node QueueNode)
- func (q *Queue) PushWithoutLock(node QueueNode)
- type QueueIterator
- type QueueIteratorNode
- type QueueNode
- type RateLimitedApiClient
- type RawData
- type RawDataSubTask
- type RawDataSubTaskArgs
- type RequestData
- type RestConnection
- type WorkerScheduler
Constants ¶
This section is empty.
Variables ¶
var DateTimeFormats []DateTimeFormatItem
DateTimeFormats FIXME ...
var ErrIgnoreAndContinue = errors.Default.New("ignore and continue")
ErrIgnoreAndContinue is an error which should be ignored
var HttpMinStatusRetryCode = http.StatusBadRequest
HttpMinStatusRetryCode is the minimum HTTP status code that triggers a retry
Functions ¶
func AddMissingSlashToURL ¶
func AddMissingSlashToURL(baseUrl *string)
AddMissingSlashToURL FIXME ...
func ConvertStringToTime ¶
ConvertStringToTime FIXME ...
func Decode ¶ added in v0.14.0
Decode decodes `source` into `target`. Pass an optional validator to validate the target.
func DecodeMapStruct ¶
DecodeMapStruct with time.Time and Iso8601Time support
func DecodeStruct ¶
func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, tag string) errors.Error
DecodeStruct validates the `input` struct with `validator` and sets it into viper. `tag` represents the fields when setting config, and the fields with `tag` shall prevail. `input` must be a pointer.
func EncodeStruct ¶
EncodeStruct encodes a struct from viper. `tag` represents the fields when setting config, and the fields with `tag` shall prevail. `output` must be a pointer.
func GetRawMessageArrayFromResponse ¶
GetRawMessageArrayFromResponse FIXME ...
func GetRawMessageDirectFromResponse ¶
GetRawMessageDirectFromResponse FIXME ...
func GetURIStringPointer ¶
func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, errors.Error)
GetURIStringPointer FIXME ...
func Iso8601TimeToTime ¶
func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time
Iso8601TimeToTime FIXME ...
func MakePipelinePlanSubtasks ¶ added in v0.12.0
func MakePipelinePlanSubtasks(subtaskMetas []core.SubTaskMeta, entities []string) ([]string, errors.Error)
MakePipelinePlanSubtasks generates subtasks list based on sub-task meta information and entities wanted by user
func NewDefaultTaskContext ¶
func NewDefaultTaskContext( ctx context.Context, cfg *viper.Viper, logger core.Logger, db *gorm.DB, name string, subtasks map[string]bool, progress chan core.RunningProgress, ) core.TaskContext
NewDefaultTaskContext FIXME ...
func NewStandaloneSubTaskContext ¶
func NewStandaloneSubTaskContext( ctx context.Context, cfg *viper.Viper, logger core.Logger, db *gorm.DB, name string, data interface{}, ) core.SubTaskContext
NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.
func RemoveStartingSlashFromPath ¶
RemoveStartingSlashFromPath FIXME ...
func UnmarshalResponse ¶
UnmarshalResponse FIXME ...
Types ¶
type AccessToken ¶ added in v0.12.0
type AccessToken struct {
Token string `mapstructure:"token" validate:"required" json:"token" encrypt:"yes"`
}
AccessToken FIXME ...
type ApiAsyncClient ¶
type ApiAsyncClient struct { *ApiClient // contains filtered or unexported fields }
ApiAsyncClient is built on top of ApiClient to provide asynchronous semantics. You may submit multiple requests at once by calling `DoGetAsync`, and those requests will be performed in parallel with rate-limit support.
func CreateAsyncApiClient ¶
func CreateAsyncApiClient( taskCtx core.TaskContext, apiClient *ApiClient, rateLimiter *ApiRateLimitCalculator, ) (*ApiAsyncClient, errors.Error)
CreateAsyncApiClient creates a new ApiAsyncClient
func (*ApiAsyncClient) DoAsync ¶
func (apiClient *ApiAsyncClient) DoAsync( method string, path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback, retry int, )
DoAsync would carry out an asynchronous request
func (*ApiAsyncClient) DoGetAsync ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) DoGetAsync( path string, query url.Values, header http.Header, handler common.ApiAsyncCallback, )
DoGetAsync enqueues an api GET request; the request may be sent sometime in the future, in parallel with other api requests.
func (*ApiAsyncClient) DoPostAsync ¶ added in v0.14.0
func (apiClient *ApiAsyncClient) DoPostAsync( path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback, )
DoPostAsync enqueues an api POST request; the request may be sent sometime in the future, in parallel with other api requests.
func (*ApiAsyncClient) GetMaxRetry ¶
func (apiClient *ApiAsyncClient) GetMaxRetry() int
GetMaxRetry returns the maximum retry attempts for a request
func (*ApiAsyncClient) GetNumOfWorkers ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) GetNumOfWorkers() int
GetNumOfWorkers returns the worker count of the scheduler.
func (*ApiAsyncClient) HasError ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) HasError() bool
HasError reports whether the scheduler has an error.
func (*ApiAsyncClient) NextTick ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) NextTick(task func() errors.Error)
NextTick forwards `task` to the NextTick of the scheduler.
func (*ApiAsyncClient) Release ¶ added in v0.12.0
func (apiClient *ApiAsyncClient) Release()
Release will release the ApiAsyncClient with scheduler
func (*ApiAsyncClient) SetMaxRetry ¶
func (apiClient *ApiAsyncClient) SetMaxRetry( maxRetry int, )
SetMaxRetry sets the maximum retry attempts for a request
func (*ApiAsyncClient) WaitAsync ¶
func (apiClient *ApiAsyncClient) WaitAsync() errors.Error
WaitAsync blocks until all async requests are done.
type ApiClient ¶
type ApiClient struct {
// contains filtered or unexported fields
}
ApiClient is designed for simple api requests
func NewApiClient ¶
func NewApiClient( ctx context.Context, endpoint string, headers map[string]string, timeout time.Duration, proxy string, br core.BasicRes, ) (*ApiClient, errors.Error)
NewApiClient FIXME ...
func (*ApiClient) Do ¶
func (apiClient *ApiClient) Do( method string, path string, query url.Values, body interface{}, headers http.Header, ) (*http.Response, errors.Error)
Do FIXME ...
func (*ApiClient) Get ¶
func (apiClient *ApiClient) Get( path string, query url.Values, headers http.Header, ) (*http.Response, errors.Error)
Get FIXME ...
func (*ApiClient) GetAfterFunction ¶ added in v0.13.0
func (apiClient *ApiClient) GetAfterFunction() common.ApiClientAfterResponse
GetAfterFunction returns afterResponseFunction
func (*ApiClient) GetBeforeFunction ¶ added in v0.13.0
func (apiClient *ApiClient) GetBeforeFunction() common.ApiClientBeforeRequest
GetBeforeFunction returns the before-request callback (common.ApiClientBeforeRequest)
func (*ApiClient) GetEndpoint ¶
GetEndpoint FIXME ...
func (*ApiClient) GetHeaders ¶
GetHeaders FIXME ...
func (*ApiClient) GetTimeout ¶ added in v0.14.0
GetTimeout FIXME ...
func (*ApiClient) Post ¶
func (apiClient *ApiClient) Post( path string, query url.Values, body interface{}, headers http.Header, ) (*http.Response, errors.Error)
Post FIXME ...
func (*ApiClient) SetAfterFunction ¶
func (apiClient *ApiClient) SetAfterFunction(callback common.ApiClientAfterResponse)
SetAfterFunction will set afterResponseFunction
func (*ApiClient) SetBeforeFunction ¶
func (apiClient *ApiClient) SetBeforeFunction(callback common.ApiClientBeforeRequest)
SetBeforeFunction will set the before-request callback (common.ApiClientBeforeRequest)
func (*ApiClient) SetContext ¶
SetContext FIXME ...
func (*ApiClient) SetEndpoint ¶
SetEndpoint FIXME ...
func (*ApiClient) SetHeaders ¶
SetHeaders FIXME ...
func (*ApiClient) SetTimeout ¶
SetTimeout FIXME ...
type ApiCollector ¶
type ApiCollector struct { *RawDataSubTask // contains filtered or unexported fields }
ApiCollector FIXME ...
func NewApiCollector ¶
func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, errors.Error)
NewApiCollector allocates a new ApiCollector with the given args. ApiCollector helps us collect data from an api with ease: pass in an AsyncApiClient and tell it which part of the response we want to save, and ApiCollector will collect the data from the remote server and store it into the database.
func (*ApiCollector) Execute ¶
func (collector *ApiCollector) Execute() errors.Error
Execute will start collection
func (*ApiCollector) GetAfterResponse ¶ added in v0.13.0
func (collector *ApiCollector) GetAfterResponse() common.ApiClientAfterResponse
GetAfterResponse returns apiClient's afterResponseFunction
func (*ApiCollector) SetAfterResponse ¶
func (collector *ApiCollector) SetAfterResponse(f common.ApiClientAfterResponse)
SetAfterResponse sets apiClient's afterResponseFunction
type ApiCollectorArgs ¶
type ApiCollectorArgs struct { RawDataSubTaskArgs // UrlTemplate is used to generate the final URL for Api Collector to request // i.e. `api/3/issue/{{ .Input.IssueId }}/changelog` // For detail of what variables can be used, please check `RequestData` UrlTemplate string `comment:"GoTemplate for API url"` // Query would be sent out as part of the request URL Query func(reqData *RequestData) (url.Values, errors.Error) `` // Header would be sent out along with request Header func(reqData *RequestData) (http.Header, errors.Error) // PageSize tells ApiCollector the page size PageSize int // Incremental indicate if this is a incremental collection, the existing data won't get deleted if it was true Incremental bool `comment:""` // ApiClient is a asynchronize api request client with qps ApiClient RateLimitedApiClient // Input helps us collect data based on previous collected data, like collecting changelogs based on jira // issue ids Input Iterator // GetTotalPages is to tell `ApiCollector` total number of pages based on response of the first page. // so `ApiCollector` could collect those pages in parallel for us GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error) // Concurrency specify qps for api that doesn't return total number of pages/records // NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means Concurrency int ResponseParser func(res *http.Response) ([]json.RawMessage, errors.Error) AfterResponse common.ApiClientAfterResponse RequestBody func(reqData *RequestData) map[string]interface{} Method string }
ApiCollectorArgs FIXME ...
type ApiExtractor ¶
type ApiExtractor struct { *RawDataSubTask // contains filtered or unexported fields }
ApiExtractor helps you extract Raw Data from api responses to Tool Layer Data. It reads rows from the specified raw data table and feeds them into the `Extract` handler. You can return arbitrary tool layer entities in this handler; ApiExtractor would first delete old data by their RawDataOrigin information, and then perform a batch save for you.
func NewApiExtractor ¶
func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, errors.Error)
NewApiExtractor creates a new ApiExtractor
func (*ApiExtractor) Execute ¶
func (extractor *ApiExtractor) Execute() errors.Error
Execute sub-task
type ApiExtractorArgs ¶
type ApiExtractorArgs struct { RawDataSubTaskArgs Params interface{} Extract func(row *RawData) ([]interface{}, errors.Error) BatchSize int }
ApiExtractorArgs FIXME ...
type ApiRateLimitCalculator ¶
type ApiRateLimitCalculator struct { UserRateLimitPerHour int GlobalRateLimitPerHour int MaxRetry int Method string ApiPath string DynamicRateLimit func(res *http.Response) (int, time.Duration, errors.Error) }
ApiRateLimitCalculator is a helper to calculate the api rate limit dynamically, assuming the api returns remaining/reset-time information.
type AppKey ¶ added in v0.12.0
type AppKey struct { AppId string `mapstructure:"app_id" validate:"required" json:"appId"` SecretKey string `mapstructure:"secret_key" validate:"required" json:"secretKey" encrypt:"yes"` }
AppKey FIXME ...
type AsyncResponseHandler ¶
AsyncResponseHandler FIXME ...
type BaseConnection ¶ added in v0.12.0
type BaseConnection struct { Name string `gorm:"type:varchar(100);uniqueIndex" json:"name" validate:"required"` common.Model }
BaseConnection FIXME ...
type BasicAuth ¶ added in v0.12.0
type BasicAuth struct { Username string `mapstructure:"username" validate:"required" json:"username"` Password string `mapstructure:"password" validate:"required" json:"password" encrypt:"yes"` }
BasicAuth FIXME ...
func (BasicAuth) GetEncodedToken ¶ added in v0.12.0
GetEncodedToken FIXME ...
type BatchSave ¶
type BatchSave struct {
// contains filtered or unexported fields
}
BatchSave persists multiple records of a specific type in one sql query to improve performance
func NewBatchSave ¶
func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) (*BatchSave, errors.Error)
NewBatchSave creates a new BatchSave instance
func (*BatchSave) Add ¶
Add adds a record to the cache. BatchSave flushes the cached records into the database when the cache is full.
type BatchSaveDivider ¶
type BatchSaveDivider struct {
// contains filtered or unexported fields
}
BatchSaveDivider creates and caches BatchSave, this is helpful when dealing with massive amount of data records with arbitrary types.
func NewBatchSaveDivider ¶
func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string, params string) *BatchSaveDivider
NewBatchSaveDivider creates a new BatchSaveDivider instance
func (*BatchSaveDivider) Close ¶
func (d *BatchSaveDivider) Close() errors.Error
Close closes all batches so that the remaining records get saved into the db
type ConnectionApiHelper ¶ added in v0.12.0
type ConnectionApiHelper struct {
// contains filtered or unexported fields
}
ConnectionApiHelper is used to implement the CRUD operations of a connection
func NewConnectionHelper ¶ added in v0.12.0
func NewConnectionHelper( basicRes core.BasicRes, vld *validator.Validate, ) *ConnectionApiHelper
NewConnectionHelper FIXME ...
func (*ConnectionApiHelper) Create ¶ added in v0.12.0
func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) errors.Error
Create a connection record based on request body
func (*ConnectionApiHelper) Delete ¶ added in v0.12.0
func (c *ConnectionApiHelper) Delete(connection interface{}) errors.Error
Delete connection
func (*ConnectionApiHelper) First ¶ added in v0.12.0
func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) errors.Error
First finds a connection in the db by parsing the request input, and decrypts it
func (*ConnectionApiHelper) FirstById ¶ added in v0.12.0
func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) errors.Error
FirstById finds a connection in the db by id, and decrypts it
func (*ConnectionApiHelper) List ¶ added in v0.12.0
func (c *ConnectionApiHelper) List(connections interface{}) errors.Error
List returns all connections with password/token decrypted
func (*ConnectionApiHelper) Patch ¶ added in v0.12.0
func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) errors.Error
Patch (Modify) a connection record based on request body
type CursorPager ¶ added in v0.13.0
CursorPager contains pagination information for a graphql request
type DalCursorIterator ¶ added in v0.12.0
type DalCursorIterator struct {
// contains filtered or unexported fields
}
DalCursorIterator FIXME ...
func NewBatchedDalCursorIterator ¶ added in v0.13.0
func NewBatchedDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type, batchSize int) (*DalCursorIterator, errors.Error)
NewBatchedDalCursorIterator FIXME ...
func NewDalCursorIterator ¶ added in v0.12.0
func NewDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type) (*DalCursorIterator, errors.Error)
NewDalCursorIterator FIXME ...
func (*DalCursorIterator) Close ¶ added in v0.12.0
func (c *DalCursorIterator) Close() errors.Error
Close iterator
func (*DalCursorIterator) Fetch ¶ added in v0.12.0
func (c *DalCursorIterator) Fetch() (interface{}, errors.Error)
Fetch reads a single row if batching is disabled; otherwise it reads as many rows as are available, up to the batch size, and the runtime return type will be []interface{}. Note: HasNext must have been called before invoking this.
func (*DalCursorIterator) HasNext ¶ added in v0.12.0
func (c *DalCursorIterator) HasNext() bool
HasNext increments the row cursor. If we're at the end, it'll return false.
type DataConvertHandler ¶
DataConvertHandler Accept row from source cursor, return list of entities that need to be stored
type DataConverter ¶
type DataConverter struct { *RawDataSubTask // contains filtered or unexported fields }
DataConverter helps you convert Data from Tool Layer Tables to Domain Layer Tables. It reads rows from the specified Iterator and feeds them into the `Convert` handler. You can return arbitrary domain layer entities from this handler; DataConverter would first delete old data by their RawDataOrigin information, and then perform a batch save operation for you.
func NewDataConverter ¶
func NewDataConverter(args DataConverterArgs) (*DataConverter, errors.Error)
NewDataConverter function helps you create a DataConverter using DataConverterArgs. You can see the usage in plugins/github/tasks/pr_issue_convertor.go or other convertor file.
func (*DataConverter) Execute ¶
func (converter *DataConverter) Execute() errors.Error
Execute function implements the Subtask interface. It loads data from Tool Layer Tables using `Ctx.GetDal()`, converts the data using the `converter.args.Convert` handler, then saves the data to Domain Layer Tables using BatchSaveDivider.
type DataConverterArgs ¶
type DataConverterArgs struct { RawDataSubTaskArgs // Domain layer entity Id prefix, i.e. `jira:JiraIssue:1`, `github:GithubIssue` InputRowType reflect.Type Input *sql.Rows Convert DataConvertHandler BatchSize int }
DataConverterArgs includes the arguments about DataConverter. This will be used in Creating a DataConverter.
DataConverterArgs { InputRowType: type of inputRow, Input: dal cursor, RawDataSubTaskArgs: args about raw data task, Convert: main function including conversion logic, BatchSize: batch size }
type DateIterator ¶
DateIterator FIXME ...
func NewDateIterator ¶
func NewDateIterator(days int) (*DateIterator, errors.Error)
NewDateIterator FIXME ...
func (*DateIterator) Fetch ¶
func (c *DateIterator) Fetch() (interface{}, errors.Error)
Fetch FIXME ...
type DateTimeFormatItem ¶
DateTimeFormatItem FIXME ... TODO: move this to helper
type DefaultBasicRes ¶ added in v0.12.0
type DefaultBasicRes struct {
// contains filtered or unexported fields
}
DefaultBasicRes FIXME ...
func NewDefaultBasicRes ¶ added in v0.12.0
NewDefaultBasicRes FIXME ...
func (*DefaultBasicRes) GetConfig ¶ added in v0.12.0
func (c *DefaultBasicRes) GetConfig(name string) string
GetConfig FIXME ...
func (*DefaultBasicRes) GetDal ¶ added in v0.12.0
func (c *DefaultBasicRes) GetDal() dal.Dal
GetDal FIXME ...
func (*DefaultBasicRes) GetDb ¶ added in v0.12.0
func (c *DefaultBasicRes) GetDb() *gorm.DB
GetDb FIXME ...
func (*DefaultBasicRes) GetLogger ¶ added in v0.12.0
func (c *DefaultBasicRes) GetLogger() core.Logger
GetLogger FIXME ...
type DefaultSubTaskContext ¶
type DefaultSubTaskContext struct { LastProgressTime time.Time // contains filtered or unexported fields }
DefaultSubTaskContext is default implementation
func (DefaultSubTaskContext) GetContext ¶
func (*DefaultSubTaskContext) IncProgress ¶
func (c *DefaultSubTaskContext) IncProgress(quantity int)
IncProgress FIXME ...
func (*DefaultSubTaskContext) SetProgress ¶
func (c *DefaultSubTaskContext) SetProgress(current int, total int)
SetProgress FIXME ...
func (*DefaultSubTaskContext) TaskContext ¶
func (c *DefaultSubTaskContext) TaskContext() core.TaskContext
TaskContext FIXME ...
type DefaultTaskContext ¶
type DefaultTaskContext struct {
// contains filtered or unexported fields
}
DefaultTaskContext is TaskContext default implementation
func (DefaultTaskContext) GetContext ¶
func (*DefaultTaskContext) IncProgress ¶
func (c *DefaultTaskContext) IncProgress(quantity int)
IncProgress FIXME ...
func (*DefaultTaskContext) SetData ¶
func (c *DefaultTaskContext) SetData(data interface{})
SetData FIXME ...
func (*DefaultTaskContext) SetProgress ¶
func (c *DefaultTaskContext) SetProgress(current int, total int)
SetProgress FIXME ...
func (*DefaultTaskContext) SubTaskContext ¶
func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, errors.Error)
SubTaskContext FIXME ...
type GraphqlAsyncClient ¶ added in v0.13.0
type GraphqlAsyncClient struct {
// contains filtered or unexported fields
}
GraphqlAsyncClient sends graphql requests one by one
func CreateAsyncGraphqlClient ¶ added in v0.13.0
func CreateAsyncGraphqlClient( taskCtx core.TaskContext, graphqlClient *graphql.Client, logger core.Logger, getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error), ) (*GraphqlAsyncClient, errors.Error)
CreateAsyncGraphqlClient creates a new GraphqlAsyncClient
func (*GraphqlAsyncClient) GetMaxRetry ¶ added in v0.14.3
func (apiClient *GraphqlAsyncClient) GetMaxRetry() (int, time.Duration)
GetMaxRetry returns the maximum retry attempts for a request
func (*GraphqlAsyncClient) NextTick ¶ added in v0.13.0
func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err errors.Error))
NextTick to return the NextTick of scheduler
func (*GraphqlAsyncClient) Query ¶ added in v0.13.0
func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, errors.Error)
Query sends a graphql request once the lock is acquired. []graphql.DataError holds the errors returned in the response body; errors.Error is any other error.
func (*GraphqlAsyncClient) Release ¶ added in v0.14.3
func (apiClient *GraphqlAsyncClient) Release()
Release will release the GraphqlAsyncClient with scheduler
func (*GraphqlAsyncClient) SetGetRateCost ¶ added in v0.13.0
func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q interface{}) int)
SetGetRateCost sets the function used to calculate the rate cost of a query. If not set, every query costs 1.
func (*GraphqlAsyncClient) SetMaxRetry ¶ added in v0.14.3
func (apiClient *GraphqlAsyncClient) SetMaxRetry( maxRetry int, waitBeforeRetry time.Duration, )
SetMaxRetry sets the maximum retry attempts for a request
func (*GraphqlAsyncClient) Wait ¶ added in v0.13.0
func (apiClient *GraphqlAsyncClient) Wait()
Wait blocks until all async requests are done.
type GraphqlAsyncResponseHandler ¶ added in v0.13.0
GraphqlAsyncResponseHandler callback function to handle the Response asynchronously
type GraphqlCollector ¶ added in v0.13.0
type GraphqlCollector struct { *RawDataSubTask // contains filtered or unexported fields }
GraphqlCollector helps you collect data from Graphql services
func NewGraphqlCollector ¶ added in v0.13.0
func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, errors.Error)
NewGraphqlCollector allocates a new GraphqlCollector with the given args. GraphqlCollector helps us collect data from an api with ease: pass in an AsyncGraphqlClient and tell it which part of the response we want to save, and GraphqlCollector will collect the data from the remote server and store it into the database.
func (*GraphqlCollector) Execute ¶ added in v0.13.0
func (collector *GraphqlCollector) Execute() errors.Error
Execute api collection
func (*GraphqlCollector) HasError ¶ added in v0.14.3
func (collector *GraphqlCollector) HasError() bool
HasError reports whether any error occurred
type GraphqlCollectorArgs ¶ added in v0.13.0
type GraphqlCollectorArgs struct { RawDataSubTaskArgs // BuildQuery would be sent out as part of the request URL BuildQuery func(reqData *GraphqlRequestData) (query interface{}, variables map[string]interface{}, err error) // PageSize tells ApiCollector the page size PageSize int // GraphqlClient is a asynchronize api request client with qps GraphqlClient *GraphqlAsyncClient // Input helps us collect data based on previous collected data, like collecting changelogs based on jira // issue ids Input Iterator // how many times fetched from input, default 1 means only fetch once // NOTICE: InputStep=1 will fill value as item and InputStep>1 will fill value as []item InputStep int // GetPageInfo is to tell `GraphqlCollector` is page information GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) (*GraphqlQueryPageInfo, error) BatchSize int // one of ResponseParser and ResponseParserEvenWhenDataErrors is required to parse response ResponseParser func(query interface{}, variables map[string]interface{}) ([]interface{}, error) ResponseParserWithDataErrors func(query interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error) }
GraphqlCollectorArgs arguments needed by GraphqlCollector
type GraphqlQueryPageInfo ¶ added in v0.13.0
type GraphqlQueryPageInfo struct { EndCursor string `json:"endCursor"` HasNextPage bool `json:"hasNextPage"` }
GraphqlQueryPageInfo contains the pagination data
type GraphqlRequestData ¶ added in v0.13.0
type GraphqlRequestData struct { Pager *CursorPager Params interface{} Input interface{} InputJSON []byte }
GraphqlRequestData is the input of `UrlTemplate`, `BuildQuery` and `Header`, so we can generate them dynamically.
type Iso8601Time ¶
type Iso8601Time struct {
// contains filtered or unexported fields
}
Iso8601Time is type time.Time
func (Iso8601Time) MarshalJSON ¶
func (jt Iso8601Time) MarshalJSON() ([]byte, error)
MarshalJSON FIXME ...
func (*Iso8601Time) String ¶
func (jt *Iso8601Time) String() string
func (*Iso8601Time) ToNullableTime ¶
func (jt *Iso8601Time) ToNullableTime() *time.Time
ToNullableTime FIXME ...
func (*Iso8601Time) UnmarshalJSON ¶
func (jt *Iso8601Time) UnmarshalJSON(b []byte) error
UnmarshalJSON implements the json.Unmarshaler interface.
type ListBaseNode ¶ added in v0.12.0
type ListBaseNode struct {
// contains filtered or unexported fields
}
ListBaseNode 'abstract' base struct for Nodes that are chained in a linked list manner
func NewListBaseNode ¶ added in v0.12.0
func NewListBaseNode() *ListBaseNode
NewListBaseNode creates and initializes a new node (only to be called by subclasses)
func (*ListBaseNode) Data ¶ added in v0.13.0
func (l *ListBaseNode) Data() interface{}
Data returns data of the node
func (*ListBaseNode) Next ¶ added in v0.12.0
func (l *ListBaseNode) Next() interface{}
Next returns the next node
func (*ListBaseNode) SetNext ¶ added in v0.12.0
func (l *ListBaseNode) SetNext(next interface{})
SetNext updates the next pointer of the node
type Queue ¶ added in v0.12.0
type Queue struct {
// contains filtered or unexported fields
}
Queue represents a queue
func (*Queue) CleanWithOutLock ¶ added in v0.12.0
func (q *Queue) CleanWithOutLock()
CleanWithOutLock is no lock mode of Clean
func (*Queue) GetCountWithOutLock ¶ added in v0.12.0
GetCountWithOutLock is no lock mode of GetCount
func (*Queue) PullWithOutLock ¶ added in v0.12.0
PullWithOutLock is no lock mode of Pull
func (*Queue) PushWithoutLock ¶ added in v0.13.0
PushWithoutLock is no lock mode of Push
type QueueIterator ¶ added in v0.12.0
type QueueIterator struct {
// contains filtered or unexported fields
}
QueueIterator implements Iterator based on Queue
func NewQueueIterator ¶ added in v0.12.0
func NewQueueIterator() *QueueIterator
NewQueueIterator creates a new QueueIterator
func (*QueueIterator) Close ¶ added in v0.12.0
func (q *QueueIterator) Close() errors.Error
Close releases resources
func (*QueueIterator) Fetch ¶ added in v0.12.0
func (q *QueueIterator) Fetch() (interface{}, errors.Error)
Fetch current item
func (*QueueIterator) HasNext ¶ added in v0.12.0
func (q *QueueIterator) HasNext() bool
HasNext increments the row cursor. If we're at the end, it'll return false.
func (*QueueIterator) Push ¶ added in v0.12.0
func (q *QueueIterator) Push(data QueueNode)
Push adds a data node to the queue
type QueueIteratorNode ¶ added in v0.12.0
type QueueIteratorNode struct {
// contains filtered or unexported fields
}
QueueIteratorNode implements the helper.Iterator interface with the ability to accept new items while being iterated
func NewQueueIteratorNode ¶ added in v0.13.0
func NewQueueIteratorNode(data interface{}) *QueueIteratorNode
NewQueueIteratorNode creates a new QueueIteratorNode
func (*QueueIteratorNode) Data ¶ added in v0.13.0
func (q *QueueIteratorNode) Data() interface{}
Data returns data of the node
func (*QueueIteratorNode) Next ¶ added in v0.12.0
func (q *QueueIteratorNode) Next() interface{}
Next returns the next node
func (*QueueIteratorNode) SetNext ¶ added in v0.12.0
func (q *QueueIteratorNode) SetNext(next interface{})
SetNext updates the next pointer of the node
type QueueNode ¶ added in v0.12.0
type QueueNode interface { Next() interface{} SetNext(next interface{}) Data() interface{} }
QueueNode represents a node in the queue
type RateLimitedApiClient ¶
type RateLimitedApiClient interface { DoGetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback) DoPostAsync(path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback) WaitAsync() errors.Error HasError() bool NextTick(task func() errors.Error) GetNumOfWorkers() int GetAfterFunction() common.ApiClientAfterResponse SetAfterFunction(callback common.ApiClientAfterResponse) Release() }
RateLimitedApiClient FIXME ...
type RawData ¶
type RawData struct { ID uint64 `gorm:"primaryKey"` Params string `gorm:"type:varchar(255);index"` Data []byte Url string Input datatypes.JSON CreatedAt time.Time }
RawData is raw data structure in DB storage
type RawDataSubTask ¶
type RawDataSubTask struct {
// contains filtered or unexported fields
}
RawDataSubTask provides common features for raw data sub-tasks
type RawDataSubTaskArgs ¶
type RawDataSubTaskArgs struct { Ctx core.SubTaskContext // Table stores raw data Table string `comment:"Raw data table name"` // This struct will be JSONEncoded and stored into database along with raw data itself, to identify the minimal // set of data to be processed, for example, we process JiraIssues by Board Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {ConnectionId, BoardId} for jira entities"` }
RawDataSubTaskArgs FIXME ...
type RequestData ¶
RequestData is the input of `UrlTemplate` `Query` and `Header`, so we can generate them dynamically
type RestConnection ¶ added in v0.12.0
type RestConnection struct { BaseConnection `mapstructure:",squash"` Endpoint string `mapstructure:"endpoint" validate:"required" json:"endpoint"` Proxy string `mapstructure:"proxy" json:"proxy"` RateLimitPerHour int `comment:"api request rate limit per hour" json:"rateLimitPerHour"` }
RestConnection FIXME ...
type WorkerScheduler ¶
type WorkerScheduler struct {
// contains filtered or unexported fields
}
WorkerScheduler runs asynchronous tasks in parallel with throttling support
func NewWorkerScheduler ¶
func NewWorkerScheduler( ctx context.Context, workerNum int, maxWork int, maxWorkDuration time.Duration, maxRetry int, logger core.Logger, ) (*WorkerScheduler, errors.Error)
NewWorkerScheduler creates a WorkerScheduler
func (*WorkerScheduler) HasError ¶ added in v0.12.0
func (s *WorkerScheduler) HasError() bool
HasError returns whether any error occurred
func (*WorkerScheduler) NextTick ¶ added in v0.12.0
func (s *WorkerScheduler) NextTick(task func() errors.Error)
NextTick enqueues a task in a non-blocking manner; you should only call this method within a task submitted by the SubmitBlocking method. IMPORTANT: do NOT call this method with a huge number of tasks, as it is likely to eat up all available memory.
func (*WorkerScheduler) SubmitBlocking ¶ added in v0.12.0
func (s *WorkerScheduler) SubmitBlocking(task func() errors.Error)
SubmitBlocking enqueues an async task to ants; the task will be executed in the future when the timing is right. It doesn't return an error because there wouldn't be any with Blocking semantics; a returned error does nothing but cause confusion, as people often assume it was returned by the task. Since it is an async task, the callframes are not available in production mode; you can export the environment variable ASYNC_CF=true to enable callframe capturing when debugging. IMPORTANT: do NOT call SubmitBlocking inside the async task, it is likely to cause a deadlock; call NextTick instead when the number of tasks is relatively small.
func (*WorkerScheduler) Wait ¶ added in v0.12.0
func (s *WorkerScheduler) Wait() errors.Error
Wait blocks current go-routine until all workers returned
Source Files ¶
- api_async_client.go
- api_client.go
- api_collector.go
- api_collector_func.go
- api_extractor.go
- api_ratelimit_calc.go
- api_rawdata.go
- batch_save.go
- batch_save_divider.go
- config_util.go
- connection.go
- cst_time.go
- data_convertor.go
- default_task_context.go
- graphql_async_client.go
- graphql_collector.go
- iso8601time.go
- iterator.go
- list.go
- mapstructure.go
- pipeline_plan.go
- queue.go
- worker_scheduler.go