helper

package
v0.12.0-beta3
Warning

This package is not in the latest version of its module.

Published: Jul 20, 2022 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var DateTimeFormats []DateTimeFormatItem

DateTimeFormats FIXME ...

var ErrIgnoreAndContinue = errors.New("ignore and continue")

ErrIgnoreAndContinue is an error that should be ignored

var HttpMinStatusRetryCode = http.StatusBadRequest

HttpMinStatusRetryCode is the minimum HTTP status code that triggers a retry
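
As a rough illustration of how a threshold like this can gate retries, the sketch below treats any status at or above the threshold as retryable. `minRetryStatus` and `shouldRetry` are hypothetical names that are not part of this package, and the `>=` comparison is an assumption about how the value is used.

```go
package main

import (
	"fmt"
	"net/http"
)

// minRetryStatus is a hypothetical local copy of the HttpMinStatusRetryCode value.
var minRetryStatus = http.StatusBadRequest

// shouldRetry reports whether a response status is at or above the threshold.
// Assumption: the comparison is >=; the package may apply further conditions.
func shouldRetry(status int) bool {
	return status >= minRetryStatus
}

func main() {
	for _, s := range []int{http.StatusOK, http.StatusBadRequest, http.StatusBadGateway} {
		fmt.Printf("%d retry=%v\n", s, shouldRetry(s))
	}
}
```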

Functions

func AddMissingSlashToURL

func AddMissingSlashToURL(baseUrl *string)

AddMissingSlashToURL FIXME ...

func ConvertStringToTime

func ConvertStringToTime(timeString string) (t time.Time, err error)

ConvertStringToTime FIXME ...

func DecodeMapStruct

func DecodeMapStruct(input map[string]interface{}, result interface{}) error

DecodeMapStruct decodes `input` into `result`, with time.Time and Iso8601Time support

func DecodeStruct

func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, tag string) error

DecodeStruct validates the `input` struct with `validator` and sets it into viper. `tag` names the struct tag that identifies the fields when setting config; fields carrying `tag` take precedence. `input` must be a pointer

func EncodeStruct

func EncodeStruct(input *viper.Viper, output interface{}, tag string) error

EncodeStruct encodes a struct from viper. `tag` names the struct tag that identifies the fields when setting config; fields carrying `tag` take precedence. `output` must be a pointer

func GetRawMessageArrayFromResponse

func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error)

GetRawMessageArrayFromResponse FIXME ...

func GetRawMessageDirectFromResponse

func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)

GetRawMessageDirectFromResponse FIXME ...

func GetURIStringPointer

func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, error)

GetURIStringPointer FIXME ...

func Iso8601TimeToTime

func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time

Iso8601TimeToTime FIXME ...

func MakePipelinePlanSubtasks added in v0.12.0

func MakePipelinePlanSubtasks(subtaskMetas []core.SubTaskMeta, entities []string) ([]string, error)

MakePipelinePlanSubtasks generates the list of subtasks based on the sub-task meta information and the entities requested by the user

func NewDefaultTaskContext

func NewDefaultTaskContext(
	ctx context.Context,
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	name string,
	subtasks map[string]bool,
	progress chan core.RunningProgress,
) core.TaskContext

NewDefaultTaskContext FIXME ...

func NewStandaloneSubTaskContext

func NewStandaloneSubTaskContext(
	ctx context.Context,
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	name string,
	data interface{},
) core.SubTaskContext

NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.

func RemoveStartingSlashFromPath

func RemoveStartingSlashFromPath(relativePath string) string

RemoveStartingSlashFromPath FIXME ...

func UnmarshalResponse

func UnmarshalResponse(res *http.Response, v interface{}) error

UnmarshalResponse FIXME ...

func UpdateEncryptFields added in v0.12.0

func UpdateEncryptFields(val interface{}, update func(in string) (string, error)) error

UpdateEncryptFields updates the fields of val that carry the tag `encrypt:"yes|true"`
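
To make the tag-driven behaviour concrete, here is a minimal sketch that walks a struct with reflection and rewrites only the string fields tagged `encrypt:"yes"` or `encrypt:"true"`. `updateTagged` and `credentials` are hypothetical names; the real function may handle nested structs or other field kinds differently.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// credentials is a hypothetical struct; only Password carries the encrypt tag.
type credentials struct {
	Username string
	Password string `encrypt:"yes"`
}

// updateTagged applies update to every string field tagged encrypt:"yes" or encrypt:"true".
// val must be a pointer to a struct so that its fields are settable.
func updateTagged(val interface{}, update func(in string) (string, error)) error {
	v := reflect.ValueOf(val)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("val must be a pointer to a struct")
	}
	v = v.Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		tag := t.Field(i).Tag.Get("encrypt")
		if tag != "yes" && tag != "true" {
			continue
		}
		f := v.Field(i)
		if f.Kind() != reflect.String || !f.CanSet() {
			continue
		}
		out, err := update(f.String())
		if err != nil {
			return err
		}
		f.SetString(out)
	}
	return nil
}

func main() {
	c := &credentials{Username: "alice", Password: "secret"}
	// strings.ToUpper stands in for a real encrypt or decrypt function.
	err := updateTagged(c, func(in string) (string, error) { return strings.ToUpper(in), nil })
	fmt.Println(c.Username, c.Password, err)
}
```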

Types

type AccessToken added in v0.12.0

type AccessToken struct {
	Token string `mapstructure:"token" validate:"required" json:"token" encrypt:"yes"`
}

AccessToken FIXME ...

type ApiAsyncClient

type ApiAsyncClient struct {
	*ApiClient
	// contains filtered or unexported fields
}

ApiAsyncClient is built on top of ApiClient to provide asynchronous semantics. You may submit multiple requests at once by calling `DoGetAsync`, and those requests will be performed in parallel with rate-limit support

func CreateAsyncApiClient

func CreateAsyncApiClient(
	taskCtx core.TaskContext,
	apiClient *ApiClient,
	rateLimiter *ApiRateLimitCalculator,
) (*ApiAsyncClient, error)

CreateAsyncApiClient creates a new ApiAsyncClient

func (*ApiAsyncClient) DoAsync

func (apiClient *ApiAsyncClient) DoAsync(
	method string,
	path string,
	query url.Values,
	body interface{},
	header http.Header,
	handler common.ApiAsyncCallback,
	retry int,
)

DoAsync carries out an asynchronous request

func (*ApiAsyncClient) DoGetAsync added in v0.12.0

func (apiClient *ApiAsyncClient) DoGetAsync(
	path string,
	query url.Values,
	header http.Header,
	handler common.ApiAsyncCallback,
)

DoGetAsync enqueues an API GET request; the request may be sent at some point in the future, in parallel with other API requests

func (*ApiAsyncClient) GetMaxRetry

func (apiClient *ApiAsyncClient) GetMaxRetry() int

GetMaxRetry returns the maximum retry attempts for a request

func (*ApiAsyncClient) GetNumOfWorkers added in v0.12.0

func (apiClient *ApiAsyncClient) GetNumOfWorkers() int

GetNumOfWorkers returns the worker count of the scheduler.

func (*ApiAsyncClient) HasError added in v0.12.0

func (apiClient *ApiAsyncClient) HasError() bool

HasError returns whether the scheduler has an error

func (*ApiAsyncClient) NextTick added in v0.12.0

func (apiClient *ApiAsyncClient) NextTick(task func() error)

NextTick passes the task to the NextTick of the scheduler

func (*ApiAsyncClient) Release added in v0.12.0

func (apiClient *ApiAsyncClient) Release()

Release releases the ApiAsyncClient together with its scheduler

func (*ApiAsyncClient) SetMaxRetry

func (apiClient *ApiAsyncClient) SetMaxRetry(
	maxRetry int,
)

SetMaxRetry sets the maximum retry attempts for a request

func (*ApiAsyncClient) WaitAsync

func (apiClient *ApiAsyncClient) WaitAsync() error

WaitAsync blocks until all async requests are done

type ApiClient

type ApiClient struct {
	// contains filtered or unexported fields
}

ApiClient is designed for simple api requests

func NewApiClient

func NewApiClient(
	ctx context.Context,
	endpoint string,
	headers map[string]string,
	timeout time.Duration,
	proxy string,
	br core.BasicRes,
) (*ApiClient, error)

NewApiClient FIXME ...

func (*ApiClient) Do

func (apiClient *ApiClient) Do(
	method string,
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

Do FIXME ...

func (*ApiClient) Get

func (apiClient *ApiClient) Get(
	path string,
	query url.Values,
	headers http.Header,
) (*http.Response, error)

Get FIXME ...

func (*ApiClient) GetEndpoint

func (apiClient *ApiClient) GetEndpoint() string

GetEndpoint FIXME ...

func (*ApiClient) GetHeaders

func (apiClient *ApiClient) GetHeaders() map[string]string

GetHeaders FIXME ...

func (*ApiClient) Post

func (apiClient *ApiClient) Post(
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

Post FIXME ...

func (*ApiClient) SetAfterFunction

func (apiClient *ApiClient) SetAfterFunction(callback common.ApiClientAfterResponse)

SetAfterFunction FIXME ...

func (*ApiClient) SetBeforeFunction

func (apiClient *ApiClient) SetBeforeFunction(callback common.ApiClientBeforeRequest)

SetBeforeFunction FIXME ...

func (*ApiClient) SetContext

func (apiClient *ApiClient) SetContext(ctx context.Context)

SetContext FIXME ...

func (*ApiClient) SetEndpoint

func (apiClient *ApiClient) SetEndpoint(endpoint string)

SetEndpoint FIXME ...

func (*ApiClient) SetHeaders

func (apiClient *ApiClient) SetHeaders(headers map[string]string)

SetHeaders FIXME ...

func (*ApiClient) SetLogger

func (apiClient *ApiClient) SetLogger(logger core.Logger)

SetLogger FIXME ...

func (*ApiClient) SetProxy

func (apiClient *ApiClient) SetProxy(proxyUrl string) error

SetProxy FIXME ...

func (*ApiClient) SetTimeout

func (apiClient *ApiClient) SetTimeout(timeout time.Duration)

SetTimeout FIXME ...

func (*ApiClient) Setup

func (apiClient *ApiClient) Setup(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,

)

Setup FIXME ...

type ApiCollector

type ApiCollector struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

ApiCollector FIXME ...

func NewApiCollector

func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error)

NewApiCollector allocates a new ApiCollector with the given args. ApiCollector makes it easy to collect data from an API: pass in an ApiAsyncClient and tell it which part of the response to save, and ApiCollector will collect the data from the remote server and store it in the database.

func (*ApiCollector) Execute

func (collector *ApiCollector) Execute() error

Execute starts the collection

func (*ApiCollector) SetAfterResponse

func (collector *ApiCollector) SetAfterResponse(f common.ApiClientAfterResponse)

SetAfterResponse FIXME ...

type ApiCollectorArgs

type ApiCollectorArgs struct {
	RawDataSubTaskArgs
	// UrlTemplate is used to generate the final URL for Api Collector to request
	// e.g. `api/3/issue/{{ .Input.IssueId }}/changelog`
	// For details of which variables can be used, see `RequestData`
	UrlTemplate string `comment:"GoTemplate for API url"`
	// Query would be sent out as part of the request URL
	Query func(reqData *RequestData) (url.Values, error) ``
	// Header would be sent out along with request
	Header func(reqData *RequestData) (http.Header, error)
	// PageSize tells ApiCollector the page size
	PageSize int
	// Incremental indicates whether this is an incremental collection; existing data is not deleted when it is true
	Incremental bool `comment:""`
	// ApiClient is an asynchronous api request client with a qps limit
	ApiClient RateLimitedApiClient
	// Input helps us collect data based on previously collected data, like collecting changelogs based on jira
	// issue ids
	Input Iterator
	// GetTotalPages tells `ApiCollector` the total number of pages based on the response of the first page,
	// so `ApiCollector` can collect those pages in parallel for us
	GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, error)
	// Concurrency specifies qps for an api that doesn't return the total number of pages/records
	// NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
	Concurrency    int
	ResponseParser func(res *http.Response) ([]json.RawMessage, error)
	AfterResponse  common.ApiClientAfterResponse
}

ApiCollectorArgs FIXME ...
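
The `UrlTemplate` comment describes a Go template evaluated against request data. The sketch below shows that mechanism with the standard `text/template` package. `requestData`, `issueInput` and `renderURL` are hypothetical stand-ins, and the way ApiCollector itself executes the template is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// issueInput and requestData are hypothetical stand-ins for an input row and RequestData.
type issueInput struct{ IssueId int }

type requestData struct {
	Input interface{}
}

// renderURL executes a Go template against the request data and returns the resulting path.
func renderURL(urlTemplate string, data *requestData) (string, error) {
	tpl, err := template.New("url").Parse(urlTemplate)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tpl.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	data := &requestData{Input: issueInput{IssueId: 42}}
	path, err := renderURL("api/3/issue/{{ .Input.IssueId }}/changelog", data)
	fmt.Println(path, err)
}
```

Because `Input` is an `interface{}`, the template resolves `.Input.IssueId` at execution time, so the same template text works for any input row type that exposes an `IssueId` field.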

type ApiExtractor

type ApiExtractor struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

ApiExtractor helps you extract raw data from API responses into Tool Layer data. It reads rows from the specified raw data table and feeds them into the `Extract` handler; you can return arbitrary tool layer entities from this handler. ApiExtractor first deletes old data by its RawDataOrigin information, and then performs a batch save for you.

func NewApiExtractor

func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error)

NewApiExtractor creates a new ApiExtractor

func (*ApiExtractor) Execute

func (extractor *ApiExtractor) Execute() error

Execute runs the sub-task

type ApiExtractorArgs

type ApiExtractorArgs struct {
	RawDataSubTaskArgs
	Params    interface{}
	Extract   func(row *RawData) ([]interface{}, error)
	BatchSize int
}

ApiExtractorArgs FIXME ...

type ApiRateLimitCalculator

type ApiRateLimitCalculator struct {
	UserRateLimitPerHour   int
	GlobalRateLimitPerHour int
	MaxRetry               int
	Method                 string
	ApiPath                string
	DynamicRateLimit       func(res *http.Response) (int, time.Duration, error)
}

ApiRateLimitCalculator is a helper that calculates the API rate limit dynamically, assuming the API returns remaining/reset-time information
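
A function suitable for `DynamicRateLimit` has to turn a response into a request budget and a time window. The sketch below does this for an API that reports its limits in headers. The header names `X-RateLimit-Remaining` and `X-RateLimit-Reset` are assumptions (many APIs use them, but not all), and `rateLimitFromHeaders` and `fakeResponse` are hypothetical helpers. The extra `nowUnix` parameter only exists to keep the example deterministic.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// fakeResponse builds a response carrying only the two assumed rate-limit headers.
func fakeResponse(remaining, reset string) *http.Response {
	res := &http.Response{Header: http.Header{}}
	res.Header.Set("X-RateLimit-Remaining", remaining)
	res.Header.Set("X-RateLimit-Reset", reset)
	return res
}

// rateLimitFromHeaders returns the remaining requests and the time left until the
// limit resets. Assumption: the reset header holds a Unix timestamp in seconds.
func rateLimitFromHeaders(res *http.Response, nowUnix int64) (int, time.Duration, error) {
	remaining, err := strconv.Atoi(res.Header.Get("X-RateLimit-Remaining"))
	if err != nil {
		return 0, 0, fmt.Errorf("parse remaining: %w", err)
	}
	resetUnix, err := strconv.ParseInt(res.Header.Get("X-RateLimit-Reset"), 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parse reset: %w", err)
	}
	wait := time.Unix(resetUnix, 0).Sub(time.Unix(nowUnix, 0))
	if wait < 0 {
		wait = 0 // the window has already reset
	}
	return remaining, wait, nil
}

func main() {
	n, wait, err := rateLimitFromHeaders(fakeResponse("120", "1000600"), 1000000)
	fmt.Println(n, wait, err)
}
```

To fit the `DynamicRateLimit` field, such a helper could be wrapped in a closure that supplies the current time.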

func (*ApiRateLimitCalculator) Calculate

func (c *ApiRateLimitCalculator) Calculate(apiClient *ApiClient) (int, time.Duration, error)

Calculate FIXME ...

type AppKey added in v0.12.0

type AppKey struct {
	AppId     string `mapstructure:"app_id" validate:"required" json:"app_id"`
	SecretKey string `mapstructure:"secret_key" validate:"required" json:"secret_key" encrypt:"yes"`
}

AppKey FIXME ...

type AsyncResponseHandler

type AsyncResponseHandler func(res *http.Response) error

AsyncResponseHandler FIXME ...

type BaseConnection added in v0.12.0

type BaseConnection struct {
	Name string `gorm:"type:varchar(100);uniqueIndex" json:"name" validate:"required"`
	common.Model
}

BaseConnection FIXME ...

type BasicAuth added in v0.12.0

type BasicAuth struct {
	Username string `mapstructure:"username" validate:"required" json:"username"`
	Password string `mapstructure:"password" validate:"required" json:"password" encrypt:"yes"`
}

BasicAuth FIXME ...

func (BasicAuth) GetEncodedToken added in v0.12.0

func (ba BasicAuth) GetEncodedToken() string

GetEncodedToken FIXME ...
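
The documentation does not say how the token is encoded. For orientation only, the sketch below shows the conventional HTTP Basic scheme, which Base64-encodes `username:password`. That GetEncodedToken follows this scheme is an assumption, and `basicToken` is a hypothetical helper.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// basicToken encodes "username:password" with standard Base64, as in the HTTP Basic scheme.
func basicToken(username, password string) string {
	return base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
}

func main() {
	fmt.Println("Authorization: Basic " + basicToken("alice", "secret"))
}
```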

type BatchSave

type BatchSave struct {
	// contains filtered or unexported fields
}

BatchSave persists multiple records of a specific type in one SQL query to improve performance

func NewBatchSave

func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) (*BatchSave, error)

NewBatchSave creates a new BatchSave instance

func (*BatchSave) Add

func (c *BatchSave) Add(slot interface{}) error

Add adds a record to the cache. BatchSave flushes the cached records into the database when the cache is full

func (*BatchSave) Close

func (c *BatchSave) Close() error

Close flushes the cache and releases resources

func (*BatchSave) Flush

func (c *BatchSave) Flush() error

Flush saves the cached records into the database
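
The Add/Flush/Close contract can be sketched with an in-memory buffer. `batch` below is a hypothetical stand-in: a slice of slices plays the role of the database, and each inner slice represents one flush. The real BatchSave writes through the data access layer instead.

```go
package main

import "fmt"

// batch is a hypothetical in-memory stand-in for BatchSave.
type batch struct {
	size    int
	slots   []interface{}
	flushed [][]interface{} // stands in for the database: one entry per flush
}

func newBatch(size int) *batch { return &batch{size: size} }

// Add caches the record and flushes once the cache reaches its capacity.
func (b *batch) Add(slot interface{}) error {
	b.slots = append(b.slots, slot)
	if len(b.slots) >= b.size {
		return b.Flush()
	}
	return nil
}

// Flush writes all cached records in one go and empties the cache.
func (b *batch) Flush() error {
	if len(b.slots) == 0 {
		return nil
	}
	b.flushed = append(b.flushed, b.slots)
	b.slots = nil
	return nil
}

// Close flushes whatever is left in the cache.
func (b *batch) Close() error { return b.Flush() }

func main() {
	b := newBatch(2)
	for i := 1; i <= 5; i++ {
		_ = b.Add(i)
	}
	_ = b.Close()
	fmt.Println(len(b.flushed), b.flushed)
}
```

Forgetting the final Close would leave the last, partially filled batch unsaved, which is why Close flushes before releasing anything.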

type BatchSaveDivider

type BatchSaveDivider struct {
	// contains filtered or unexported fields
}

BatchSaveDivider creates and caches BatchSave instances. This is helpful when dealing with a massive number of data records of arbitrary types.

func NewBatchSaveDivider

func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string, params string) *BatchSaveDivider

NewBatchSaveDivider creates a new BatchSaveDivider instance

func (*BatchSaveDivider) Close

func (d *BatchSaveDivider) Close() error

Close closes all batches so that the remaining records get saved into the db

func (*BatchSaveDivider) ForType

func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error)

ForType returns a `BatchSave` instance for the specified type
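
The create-and-cache behaviour can be pictured as a map keyed by `reflect.Type`. The sketch below uses hypothetical `divider` and `typedBatch` types and keeps rows in memory; it only illustrates the lookup pattern, not the persistence.

```go
package main

import (
	"fmt"
	"reflect"
)

// typedBatch is a hypothetical per-type buffer.
type typedBatch struct {
	rows []interface{}
}

// divider lazily creates one typedBatch per row type and reuses it afterwards.
type divider struct {
	batches map[reflect.Type]*typedBatch
}

func newDivider() *divider {
	return &divider{batches: make(map[reflect.Type]*typedBatch)}
}

// forType returns the cached batch for rowType, creating it on first use.
func (d *divider) forType(rowType reflect.Type) *typedBatch {
	b, ok := d.batches[rowType]
	if !ok {
		b = &typedBatch{}
		d.batches[rowType] = b
	}
	return b
}

// add routes a row to the batch that matches its dynamic type.
func (d *divider) add(row interface{}) {
	b := d.forType(reflect.TypeOf(row))
	b.rows = append(b.rows, row)
}

func main() {
	d := newDivider()
	for _, row := range []interface{}{1, "a", 2, "b", 3.5} {
		d.add(row)
	}
	fmt.Println("distinct batches:", len(d.batches))
}
```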

type CSTTime

type CSTTime time.Time

CSTTime FIXME ...

func (*CSTTime) Scan

func (jt *CSTTime) Scan(v interface{}) error

Scan FIXME ...

func (*CSTTime) UnmarshalJSON

func (jt *CSTTime) UnmarshalJSON(b []byte) error

UnmarshalJSON FIXME ...

func (CSTTime) Value

func (jt CSTTime) Value() (driver.Value, error)

Value FIXME ...

type ConnectionApiHelper added in v0.12.0

type ConnectionApiHelper struct {
	// contains filtered or unexported fields
}

ConnectionApiHelper is used to implement the CRUD operations for connections

func NewConnectionHelper added in v0.12.0

func NewConnectionHelper(
	basicRes core.BasicRes,
	vld *validator.Validate,
) *ConnectionApiHelper

NewConnectionHelper FIXME ...

func (*ConnectionApiHelper) Create added in v0.12.0

func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) error

Create creates a connection record based on the request body

func (*ConnectionApiHelper) Delete added in v0.12.0

func (c *ConnectionApiHelper) Delete(connection interface{}) error

Delete deletes the connection

func (*ConnectionApiHelper) First added in v0.12.0

func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) error

First finds a connection in the db by parsing the request input, and decrypts it

func (*ConnectionApiHelper) FirstById added in v0.12.0

func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) error

FirstById finds a connection in the db by id, and decrypts it

func (*ConnectionApiHelper) List added in v0.12.0

func (c *ConnectionApiHelper) List(connections interface{}) error

List returns all connections with password/token decrypted

func (*ConnectionApiHelper) Patch added in v0.12.0

func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) error

Patch modifies a connection record based on the request body

type DalCursorIterator added in v0.12.0

type DalCursorIterator struct {
	// contains filtered or unexported fields
}

DalCursorIterator FIXME ...

func NewDalCursorIterator added in v0.12.0

func NewDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type) (*DalCursorIterator, error)

NewDalCursorIterator FIXME ...

func (*DalCursorIterator) Close added in v0.12.0

func (c *DalCursorIterator) Close() error

Close iterator

func (*DalCursorIterator) Fetch added in v0.12.0

func (c *DalCursorIterator) Fetch() (interface{}, error)

Fetch FIXME ...

func (*DalCursorIterator) HasNext added in v0.12.0

func (c *DalCursorIterator) HasNext() bool

HasNext FIXME ...

type DataConvertHandler

type DataConvertHandler func(row interface{}) ([]interface{}, error)

DataConvertHandler accepts a row from the source cursor and returns the list of entities that need to be stored

type DataConverter

type DataConverter struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

DataConverter helps you convert data from Tool Layer Tables to Domain Layer Tables. It reads rows from the specified Iterator and feeds them into the `Convert` handler; you can return arbitrary domain layer entities from this handler. DataConverter first deletes old data by its RawDataOrigin information, and then performs a batch save operation for you.

func NewDataConverter

func NewDataConverter(args DataConverterArgs) (*DataConverter, error)

NewDataConverter creates a DataConverter from DataConverterArgs. For usage, see plugins/github/tasks/pr_issue_convertor.go or the other convertor files.

func (*DataConverter) Execute

func (converter *DataConverter) Execute() error

Execute implements the Subtask interface. It loads data from Tool Layer Tables using `Ctx.GetDal()`, converts the data using the `converter.args.Convert` handler, and then saves the data to Domain Layer Tables using BatchSaveDivider
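
The load, convert, save flow can be sketched without a database. In the example below, a slice stands in for both the input cursor and the batch-saving destination, and `convertAll`, `toolIssue`, `domainIssue` and `issueToDomain` are all hypothetical names. Deleting old data by RawDataOrigin is left out.

```go
package main

import "fmt"

// convertHandler has the same shape as DataConvertHandler.
type convertHandler func(row interface{}) ([]interface{}, error)

// convertAll feeds each row to convert and collects every returned entity.
func convertAll(rows []interface{}, convert convertHandler) ([]interface{}, error) {
	var saved []interface{}
	for _, row := range rows {
		entities, err := convert(row)
		if err != nil {
			return nil, err
		}
		saved = append(saved, entities...)
	}
	return saved, nil
}

// toolIssue and domainIssue are hypothetical tool-layer and domain-layer rows.
type toolIssue struct{ Number int }
type domainIssue struct{ Id string }

// issueToDomain converts one tool-layer row into one domain-layer entity.
func issueToDomain(row interface{}) ([]interface{}, error) {
	issue, ok := row.(*toolIssue)
	if !ok {
		return nil, fmt.Errorf("unexpected row type %T", row)
	}
	return []interface{}{&domainIssue{Id: fmt.Sprintf("demo:Issue:%d", issue.Number)}}, nil
}

func main() {
	rows := []interface{}{&toolIssue{Number: 1}, &toolIssue{Number: 2}}
	out, err := convertAll(rows, issueToDomain)
	fmt.Println(len(out), err)
}
```

Since a handler returns a slice, one input row may produce zero, one or several domain entities.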

type DataConverterArgs

type DataConverterArgs struct {
	RawDataSubTaskArgs
	// Domain layer entity Id prefix, i.e. `jira:JiraIssue:1`, `github:GithubIssue`
	InputRowType reflect.Type
	Input        *sql.Rows
	Convert      DataConvertHandler
	BatchSize    int
}

DataConverterArgs holds the arguments for a DataConverter. It is used when creating a DataConverter.

DataConverterArgs {
	InputRowType:       type of inputRow,
	Input:              dal cursor,
	RawDataSubTaskArgs: args about raw data task,
	Convert:            main function including conversion logic,
	BatchSize:          batch size,
}

type DateIterator

type DateIterator struct {
	Days    int
	Current int
	// contains filtered or unexported fields
}

DateIterator FIXME ...

func NewDateIterator

func NewDateIterator(days int) (*DateIterator, error)

NewDateIterator FIXME ...

func (*DateIterator) Close

func (c *DateIterator) Close() error

Close iterator

func (*DateIterator) Fetch

func (c *DateIterator) Fetch() (interface{}, error)

Fetch FIXME ...

func (*DateIterator) HasNext

func (c *DateIterator) HasNext() bool

HasNext FIXME ...

type DatePair

type DatePair struct {
	PairStartTime time.Time
	PairEndTime   time.Time
}

DatePair FIXME ...

type DateTimeFormatItem

type DateTimeFormatItem struct {
	Matcher *regexp.Regexp
	Format  string
}

DateTimeFormatItem FIXME ... TODO: move this to helper
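
The pairing of a `Matcher` with a `Format` suggests a lookup table: find the first entry whose regular expression accepts the input, then parse with that entry's layout. The sketch below shows that pattern. The table entries are hypothetical and are not the package's actual `DateTimeFormats` values; that ConvertStringToTime works this way is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

// formatItem has the same shape as DateTimeFormatItem.
type formatItem struct {
	Matcher *regexp.Regexp
	Format  string
}

// formats is a hypothetical table with two entries.
var formats = []formatItem{
	{regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$`), time.RFC3339},
	{regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`), "2006-01-02"},
}

// parseWithFormats picks the first layout whose matcher accepts the input, then parses with it.
func parseWithFormats(s string) (time.Time, error) {
	for _, f := range formats {
		if f.Matcher.MatchString(s) {
			return time.Parse(f.Format, s)
		}
	}
	return time.Time{}, fmt.Errorf("no known format matches %q", s)
}

func main() {
	for _, s := range []string{"2022-07-20", "2022-07-20T10:30:00Z", "20 July"} {
		t, err := parseWithFormats(s)
		fmt.Println(t, err)
	}
}
```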

type DefaultBasicRes added in v0.12.0

type DefaultBasicRes struct {
	// contains filtered or unexported fields
}

DefaultBasicRes FIXME ...

func NewDefaultBasicRes added in v0.12.0

func NewDefaultBasicRes(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
) *DefaultBasicRes

NewDefaultBasicRes FIXME ...

func (*DefaultBasicRes) GetConfig added in v0.12.0

func (c *DefaultBasicRes) GetConfig(name string) string

GetConfig FIXME ...

func (*DefaultBasicRes) GetDal added in v0.12.0

func (c *DefaultBasicRes) GetDal() dal.Dal

GetDal FIXME ...

func (*DefaultBasicRes) GetDb added in v0.12.0

func (c *DefaultBasicRes) GetDb() *gorm.DB

GetDb FIXME ...

func (*DefaultBasicRes) GetLogger added in v0.12.0

func (c *DefaultBasicRes) GetLogger() core.Logger

GetLogger FIXME ...

type DefaultSubTaskContext

type DefaultSubTaskContext struct {
	LastProgressTime time.Time
	// contains filtered or unexported fields
}

DefaultSubTaskContext is the default SubTaskContext implementation

func (DefaultSubTaskContext) GetContext

func (c DefaultSubTaskContext) GetContext() context.Context

func (DefaultSubTaskContext) GetData

func (c DefaultSubTaskContext) GetData() interface{}

func (DefaultSubTaskContext) GetName

func (c DefaultSubTaskContext) GetName() string

func (*DefaultSubTaskContext) IncProgress

func (c *DefaultSubTaskContext) IncProgress(quantity int)

IncProgress FIXME ...

func (*DefaultSubTaskContext) SetProgress

func (c *DefaultSubTaskContext) SetProgress(current int, total int)

SetProgress FIXME ...

func (*DefaultSubTaskContext) TaskContext

func (c *DefaultSubTaskContext) TaskContext() core.TaskContext

TaskContext FIXME ...

type DefaultTaskContext

type DefaultTaskContext struct {
	// contains filtered or unexported fields
}

DefaultTaskContext is the default TaskContext implementation

func (DefaultTaskContext) GetContext

func (c DefaultTaskContext) GetContext() context.Context

func (DefaultTaskContext) GetData

func (c DefaultTaskContext) GetData() interface{}

func (DefaultTaskContext) GetName

func (c DefaultTaskContext) GetName() string

func (*DefaultTaskContext) IncProgress

func (c *DefaultTaskContext) IncProgress(quantity int)

IncProgress FIXME ...

func (*DefaultTaskContext) SetData

func (c *DefaultTaskContext) SetData(data interface{})

SetData FIXME ...

func (*DefaultTaskContext) SetProgress

func (c *DefaultTaskContext) SetProgress(current int, total int)

SetProgress FIXME ...

func (*DefaultTaskContext) SubTaskContext

func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)

SubTaskContext FIXME ...

type Iso8601Time

type Iso8601Time struct {
	// contains filtered or unexported fields
}

Iso8601Time is type time.Time

func (Iso8601Time) MarshalJSON

func (jt Iso8601Time) MarshalJSON() ([]byte, error)

MarshalJSON FIXME ...

func (*Iso8601Time) String

func (jt *Iso8601Time) String() string

func (*Iso8601Time) ToNullableTime

func (jt *Iso8601Time) ToNullableTime() *time.Time

ToNullableTime FIXME ...

func (*Iso8601Time) ToTime

func (jt *Iso8601Time) ToTime() time.Time

ToTime FIXME ...

func (*Iso8601Time) UnmarshalJSON

func (jt *Iso8601Time) UnmarshalJSON(b []byte) error

UnmarshalJSON FIXME ...

type Iterator

type Iterator interface {
	HasNext() bool
	Fetch() (interface{}, error)
	Close() error
}

Iterator FIXME ...
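
For readers implementing their own input source, here is a minimal sketch of the three-method contract: a slice-backed iterator plus the usual HasNext/Fetch loop. `sliceIterator` and `drain` are hypothetical; the local `iterator` interface simply repeats the method set shown above so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// iterator repeats the three-method contract of Iterator.
type iterator interface {
	HasNext() bool
	Fetch() (interface{}, error)
	Close() error
}

// sliceIterator is a hypothetical implementation backed by an in-memory slice.
type sliceIterator struct {
	items []interface{}
	pos   int
}

func (s *sliceIterator) HasNext() bool { return s.pos < len(s.items) }

// Fetch returns the current item and advances the position.
func (s *sliceIterator) Fetch() (interface{}, error) {
	if !s.HasNext() {
		return nil, errors.New("iterator exhausted")
	}
	item := s.items[s.pos]
	s.pos++
	return item, nil
}

func (s *sliceIterator) Close() error {
	s.items = nil
	return nil
}

// drain consumes the iterator in a HasNext/Fetch loop and closes it afterwards.
func drain(it iterator) ([]interface{}, error) {
	defer it.Close()
	var out []interface{}
	for it.HasNext() {
		item, err := it.Fetch()
		if err != nil {
			return out, err
		}
		out = append(out, item)
	}
	return out, nil
}

func main() {
	out, err := drain(&sliceIterator{items: []interface{}{"a", "b", "c"}})
	fmt.Println(out, err)
}
```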

type ListBaseNode added in v0.12.0

type ListBaseNode struct {
	// contains filtered or unexported fields
}

func NewListBaseNode added in v0.12.0

func NewListBaseNode() *ListBaseNode

NewListBaseNode creates and initializes a new node

func (*ListBaseNode) Next added in v0.12.0

func (l *ListBaseNode) Next() interface{}

func (*ListBaseNode) SetNext added in v0.12.0

func (l *ListBaseNode) SetNext(next interface{})

type Pager

type Pager struct {
	Page int
	Skip int
	Size int
}

Pager contains pagination information for an API request

type Queue added in v0.12.0

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue added in v0.12.0

func NewQueue() *Queue

NewQueue creates and initializes a new Queue

func (*Queue) Clean added in v0.12.0

func (q *Queue) Clean()

Clean removes all nodes from the queue

func (*Queue) CleanWithOutLock added in v0.12.0

func (q *Queue) CleanWithOutLock()

CleanWithOutLock is the variant of Clean that does not take the lock

func (*Queue) GetCount added in v0.12.0

func (q *Queue) GetCount() int64

GetCount returns the number of nodes

func (*Queue) GetCountWithOutLock added in v0.12.0

func (q *Queue) GetCountWithOutLock() int64

GetCountWithOutLock is the variant of GetCount that does not take the lock

func (*Queue) Pull added in v0.12.0

func (q *Queue) Pull(add *int64) QueueNode

Pull gets a node from the queue

func (*Queue) PullWithOutLock added in v0.12.0

func (q *Queue) PullWithOutLock() QueueNode

PullWithOutLock is the variant of Pull that does not take the lock

func (*Queue) Push added in v0.12.0

func (q *Queue) Push(node QueueNode)

Push adds a node to the queue

func (*Queue) PushWitouLock added in v0.12.0

func (q *Queue) PushWitouLock(node QueueNode)

PushWitouLock is the variant of Push that does not take the lock
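
The Next/SetNext pair on queue nodes points to an intrusive linked list, where each node stores the link to its successor. The sketch below shows a mutex-protected FIFO built that way. `queue`, `node` and `intNode` are hypothetical, and `Pull` is simplified: the real method takes an `add *int64` argument whose meaning is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// node mirrors the QueueNode contract: each node stores a link to its successor.
type node interface {
	Next() interface{}
	SetNext(next interface{})
}

// intNode is a hypothetical payload-carrying node.
type intNode struct {
	value int
	next  interface{}
}

func (n *intNode) Next() interface{}        { return n.next }
func (n *intNode) SetNext(next interface{}) { n.next = next }

// queue is a mutex-protected FIFO built from linked nodes.
type queue struct {
	mu    sync.Mutex
	head  node
	tail  node
	count int64
}

// Push appends a node at the tail while holding the lock.
func (q *queue) Push(n node) {
	q.mu.Lock()
	defer q.mu.Unlock()
	n.SetNext(nil)
	if q.tail == nil {
		q.head = n
	} else {
		q.tail.SetNext(n)
	}
	q.tail = n
	q.count++
}

// Pull detaches and returns the head node, or nil when the queue is empty.
func (q *queue) Pull() node {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.head == nil {
		return nil
	}
	n := q.head
	next, _ := n.Next().(node) // next stays nil when there is no successor
	q.head = next
	if q.head == nil {
		q.tail = nil
	}
	n.SetNext(nil)
	q.count--
	return n
}

func main() {
	q := &queue{}
	for i := 1; i <= 3; i++ {
		q.Push(&intNode{value: i})
	}
	for n := q.Pull(); n != nil; n = q.Pull() {
		fmt.Println(n.(*intNode).value)
	}
}
```

Lock-free variants such as the ones listed above are typically meant for callers that already hold the lock; calling them from unsynchronized goroutines would race.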

type QueueIterator added in v0.12.0

type QueueIterator struct {
	// contains filtered or unexported fields
}

func NewQueueIterator added in v0.12.0

func NewQueueIterator() *QueueIterator

func (*QueueIterator) Close added in v0.12.0

func (q *QueueIterator) Close() error

func (*QueueIterator) Fetch added in v0.12.0

func (q *QueueIterator) Fetch() (interface{}, error)

func (*QueueIterator) HasNext added in v0.12.0

func (q *QueueIterator) HasNext() bool

func (*QueueIterator) Push added in v0.12.0

func (q *QueueIterator) Push(data QueueNode)

type QueueIteratorNode added in v0.12.0

type QueueIteratorNode struct {
	// contains filtered or unexported fields
}

func (*QueueIteratorNode) Next added in v0.12.0

func (q *QueueIteratorNode) Next() interface{}

func (*QueueIteratorNode) SetNext added in v0.12.0

func (q *QueueIteratorNode) SetNext(next interface{})

type QueueNode added in v0.12.0

type QueueNode interface {
	Next() interface{}
	SetNext(next interface{})
}

type RateLimitedApiClient

type RateLimitedApiClient interface {
	DoGetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
	WaitAsync() error
	HasError() bool
	NextTick(task func() error)
	GetNumOfWorkers() int
	SetAfterFunction(callback common.ApiClientAfterResponse)
	Release()
}

RateLimitedApiClient FIXME ...

type RawData

type RawData struct {
	ID        uint64 `gorm:"primaryKey"`
	Params    string `gorm:"type:varchar(255);index"`
	Data      []byte
	Url       string
	Input     datatypes.JSON
	CreatedAt time.Time
}

RawData is raw data structure in DB storage

type RawDataSubTask

type RawDataSubTask struct {
	// contains filtered or unexported fields
}

RawDataSubTask provides the common features for raw data sub-tasks

type RawDataSubTaskArgs

type RawDataSubTaskArgs struct {
	Ctx core.SubTaskContext

	//	Table stores raw data
	Table string `comment:"Raw data table name"`

	//	This struct will be JSON-encoded and stored in the database along with the raw data itself, to identify the minimal
	//	set of data to be processed; for example, we process JiraIssues by Board
	Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {ConnectionId, BoardId} for jira entities"`
}

RawDataSubTaskArgs FIXME ...

type RequestData

type RequestData struct {
	Pager     *Pager
	Params    interface{}
	Input     interface{}
	InputJSON []byte
}

RequestData is the input of `UrlTemplate`, `Query` and `Header`, so that we can generate them dynamically

type RestConnection added in v0.12.0

type RestConnection struct {
	BaseConnection   `mapstructure:",squash"`
	Endpoint         string `mapstructure:"endpoint" validate:"required" json:"endpoint"`
	Proxy            string `mapstructure:"proxy" json:"proxy"`
	RateLimitPerHour int    `comment:"api request rate limit per hour" json:"rateLimit"`
}

RestConnection FIXME ...

type WorkerScheduler

type WorkerScheduler struct {
	// contains filtered or unexported fields
}

WorkerScheduler runs asynchronous tasks in parallel with throttling support

func NewWorkerScheduler

func NewWorkerScheduler(
	ctx context.Context,
	workerNum int,
	maxWork int,
	maxWorkDuration time.Duration,
	maxRetry int,
	logger core.Logger,
) (*WorkerScheduler, error)

NewWorkerScheduler creates a WorkerScheduler

func (*WorkerScheduler) HasError added in v0.12.0

func (s *WorkerScheduler) HasError() bool

HasError returns whether any error occurred

func (*WorkerScheduler) NextTick added in v0.12.0

func (s *WorkerScheduler) NextTick(task func() error)

NextTick enqueues a task in a non-blocking manner. You should only call this method from within a task submitted by the SubmitBlocking method. IMPORTANT: do NOT call this method with a huge number of tasks, as it is likely to eat up all available memory

func (*WorkerScheduler) Release

func (s *WorkerScheduler) Release()

Release resources

func (*WorkerScheduler) SubmitBlocking added in v0.12.0

func (s *WorkerScheduler) SubmitBlocking(task func() error)

SubmitBlocking enqueues an async task to ants; the task will be executed in the future when the timing is right. It doesn't return an error because there wouldn't be any under blocking semantics; a returned error would do nothing but cause confusion, since people often assume it was returned by the task. Because the task is async, call frames are not available in production mode; export the environment variable ASYNC_CF=true to enable call frame capturing when debugging. IMPORTANT: do NOT call SubmitBlocking inside an async task, as it is likely to cause a deadlock; call SubmitNonBlocking instead when the number of tasks is relatively small.

func (*WorkerScheduler) Wait added in v0.12.0

func (s *WorkerScheduler) Wait() error

Wait blocks the current goroutine until all workers have returned
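
To show why a blocking submit pairs naturally with a final Wait, the sketch below caps concurrency with a buffered channel and remembers the first error. `scheduler` is a hypothetical, much-simplified stand-in: it has no time-based throttling, no retries and no NextTick, and it does not use the ants pool mentioned above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errBoom = errors.New("boom")

// scheduler caps concurrency with a buffered channel and remembers the first error.
type scheduler struct {
	slots chan struct{}
	wg    sync.WaitGroup
	mu    sync.Mutex
	err   error
}

func newScheduler(workerNum int) *scheduler {
	return &scheduler{slots: make(chan struct{}, workerNum)}
}

// SubmitBlocking blocks the caller until a worker slot is free, then runs task asynchronously.
func (s *scheduler) SubmitBlocking(task func() error) {
	s.slots <- struct{}{}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer func() { <-s.slots }()
		if err := task(); err != nil {
			s.mu.Lock()
			if s.err == nil {
				s.err = err
			}
			s.mu.Unlock()
		}
	}()
}

// Wait blocks until every submitted task has returned and reports the first error.
func (s *scheduler) Wait() error {
	s.wg.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

func main() {
	s := newScheduler(2)
	for i := 0; i < 5; i++ {
		i := i
		s.SubmitBlocking(func() error {
			if i == 3 {
				return errBoom
			}
			return nil
		})
	}
	fmt.Println(s.Wait())
}
```

In this sketch a task that calls SubmitBlocking while every slot is occupied by tasks doing the same would wait forever, which illustrates the deadlock warning in the SubmitBlocking documentation.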
