Documentation ¶
Index ¶
- Constants
- func Contain(obj interface{}, target interface{}) bool
- func GetLogCount(logGroupList *sls.LogGroupList) int
- func GetLogGroupCount(logGroupList *sls.LogGroupList) int
- func IntSliceReflectEqual(a, b []int) bool
- func Min(a, b int64) int64
- func Set(slc []int) []int
- func Subtract(a []int, b []int) (diffSlice []int)
- func TimeToSleepInMillsecond(intervalTime, lastCheckTime int64, condition bool)
- func TimeToSleepInSecond(intervalTime, lastCheckTime int64, condition bool)
- type CheckPointTracker
- type ConsumerClient
- type ConsumerHeartBeat
- type ConsumerWorker
- type DefaultCheckPointTracker
- func (tracker *DefaultCheckPointTracker) GetCheckPoint() string
- func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string
- func (tracker *DefaultCheckPointTracker) GetNextCursor() string
- func (tracker *DefaultCheckPointTracker) GetShardId() int
- func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error
- type LogHubConfig
- type ProcessFunc
- type Processor
- type ShardConsumerWorker
Constants ¶
View Source
const ( BEGIN_CURSOR = "BEGIN_CURSOR" END_CURSOR = "END_CURSOR" SPECIAL_TIMER_CURSOR = "SPECIAL_TIMER_CURSOR" )
View Source
const ( INITIALIZING = "INITIALIZING" PULLING = "PULLING" PROCESSING = "PROCESSING" SHUTTING_DOWN = "SHUTTING_DOWN" SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE" )
Variables ¶
This section is empty.
Functions ¶
func Contain ¶
func Contain(obj interface{}, target interface{}) bool
Determine whether obj is in target object
func GetLogCount ¶
func GetLogCount(logGroupList *sls.LogGroupList) int
Get the total number of logs
func GetLogGroupCount ¶
func GetLogGroupCount(logGroupList *sls.LogGroupList) int
func IntSliceReflectEqual ¶
Determine whether two lists are equal
func TimeToSleepInMillsecond ¶
func TimeToSleepInSecond ¶
Types ¶
type CheckPointTracker ¶ added in v0.1.46
type CheckPointTracker interface { // GetCheckPoint get lastest saved check point GetCheckPoint() string // SaveCheckPoint, save next cursor to checkpoint SaveCheckPoint(force bool) error // GetCurrentCursor get current fetched data cursor GetCurrentCursor() string // GetNextCursor get next fetched data cursor(this is also the next checkpoint to be saved) GetNextCursor() string // GetShardId, return the id of shard tracked GetShardId() int }
CheckPointTracker Generally, you just need SaveCheckPoint, if you use more funcs, make sure you understand these
type ConsumerClient ¶
type ConsumerClient struct {
// contains filtered or unexported fields
}
type ConsumerHeartBeat ¶ added in v0.1.46
type ConsumerHeartBeat struct {
// contains filtered or unexported fields
}
type ConsumerWorker ¶
func InitConsumerWorker ¶
func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) string) *ConsumerWorker
depreciated: this old logic is to automatically save to memory, and then commit at a fixed time we highly recommend you to use InitConsumerWorkerWithCheckpointTracker
func InitConsumerWorkerWithCheckpointTracker ¶ added in v0.1.46
func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) (string, error)) *ConsumerWorker
InitConsumerWorkerWithCheckpointTracker please note that you need to save after the process is successful,
func InitConsumerWorkerWithProcessor ¶ added in v0.1.46
func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker
InitConsumerWorkerWithProcessor you need save checkpoint by yourself and can do something after consumer shutdown
func (*ConsumerWorker) Start ¶
func (consumerWorker *ConsumerWorker) Start()
func (*ConsumerWorker) StopAndWait ¶
func (consumerWorker *ConsumerWorker) StopAndWait()
type DefaultCheckPointTracker ¶ added in v0.1.46
type DefaultCheckPointTracker struct {
// contains filtered or unexported fields
}
func (*DefaultCheckPointTracker) GetCheckPoint ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) GetCheckPoint() string
func (*DefaultCheckPointTracker) GetCurrentCursor ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string
func (*DefaultCheckPointTracker) GetNextCursor ¶ added in v0.1.47
func (tracker *DefaultCheckPointTracker) GetNextCursor() string
func (*DefaultCheckPointTracker) GetShardId ¶ added in v0.1.47
func (tracker *DefaultCheckPointTracker) GetShardId() int
func (*DefaultCheckPointTracker) SaveCheckPoint ¶ added in v0.1.46
func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error
type LogHubConfig ¶
type LogHubConfig struct { //:param Endpoint: //:param AccessKeyID: //:param AccessKeySecret: //:param SecurityToken: If you use sts token to consume data, you must make sure consumer will be stopped before this token expired. //:param CredentialsProvider: CredentialsProvider that providers credentials(AccessKeyID, AccessKeySecret, StsToken) //:param Project: //:param Logstore: //:param Query: Filter rules Corresponding rules must be set when consuming based on rules, such as *| where a = 'xxx' //:param ConsumerGroupName: //:param ConsumerName: //:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed. // Provide three options :BEGIN_CURSOR,END_CURSOR,SPECIAL_TIMER_CURSOR,when you choose SPECIAL_TIMER_CURSOR, you have to set CursorStartTime parameter. //:param HeartbeatIntervalInSecond: // default 20, once a client doesn't report to server * HeartbeatTimeoutInSecond seconds, // server will consider it's offline and re-assign its task to another consumer. // don't set the heatbeat interval too small when the network badwidth or performance of consumtion is not so good. //:param DataFetchIntervalInMs: default 200(Millisecond), don't configure it too small (<100Millisecond) //:param HeartbeatTimeoutInSecond: // default HeartbeatIntervalInSecond * 3, once a client doesn't report to server HeartbeatTimeoutInSecond seconds, // server will consider it's offline and re-assign its task to another consumer. //:param MaxFetchLogGroupCount: default 1000, fetch size in each request, normally use default. maximum is 1000, could be lower. the lower the size the memory efficiency might be better. //:param CursorStartTime: Will be used when cursor_position when could be "begin", "end", "specific time format in time stamp", it's log receiving time. The unit of parameter is seconds. //:param InOrder: // default False, during consuption, when shard is splitted, // if need to consume the newly splitted shard after its parent shard (read-only) is finished consumption or not. // suggest keep it as False (don't care) until you have good reasion for it. //:param AllowLogLevel: default info,optional: debug,info,warn,error //:param LogFileName: Setting Log File Path,for example "/root/log/log_file.log",default //:param IsJsonType: Set whether the log output type is JSON,default false. //:param LogMaxSize: MaxSize is the maximum size in megabytes of the log file before it gets rotated. It defaults to 100 megabytes. //:param LogMaxBackups: // MaxBackups is the maximum number of old log files to retain. The default // is to retain all old log files (though MaxAge may still cause them to get // deleted.) //:param LogCompass: Compress determines if the rotated log files should be compressed using gzip. //:param CompressType: CompressType is the type of compression to use, default 0 standand for lz4 //:param HTTPClient: custom http client for sending data to sls //:param AutoCommitDisabled: whether to disable commit checkpoint automatically, default is false, means auto commit checkpoint // Note that if you set autocommit to false, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker //:param AutoCommitIntervalInSec: default auto commit interval, default is 30 //:param AuthVersion: signature algorithm version, default is sls.AuthV1 //:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4 Endpoint string AccessKeyID string AccessKeySecret string CredentialsProvider sls.CredentialsProvider Project string Logstore string Query string ConsumerGroupName string ConsumerName string CursorPosition string HeartbeatIntervalInSecond int HeartbeatTimeoutInSecond int DataFetchIntervalInMs int64 MaxFetchLogGroupCount int CursorStartTime int64 // Unix time stamp; Units are seconds. InOrder bool AllowLogLevel string LogFileName string IsJsonType bool LogMaxSize int LogMaxBackups int LogCompass bool CompressType int HTTPClient *http.Client SecurityToken string AutoCommitDisabled bool AutoCommitIntervalInMS int64 AuthVersion sls.AuthVersionType Region string }
type ProcessFunc ¶ added in v0.1.46
type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) (string, error)
func (ProcessFunc) Process ¶ added in v0.1.46
func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error)
func (ProcessFunc) Shutdown ¶ added in v0.1.46
func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error
type Processor ¶ added in v0.1.46
type Processor interface { Process(int, *sls.LogGroupList, CheckPointTracker) (string, error) Shutdown(CheckPointTracker) error }
type ShardConsumerWorker ¶
type ShardConsumerWorker struct {
// contains filtered or unexported fields
}
Click to show internal directories.
Click to hide internal directories.