consumerLibrary

package
v0.1.82-dev.20240905
Published: Sep 5, 2024 License: MIT Imports: 15 Imported by: 9

README

Aliyun LOG Go Consumer Library

The Aliyun LOG Go Consumer Library is an easy-to-use, highly configurable, pure-Go library, built for scenarios where multiple consumers cooperatively consume the same logstore under big-data, high-concurrency workloads.

Features

  1. Thread-safe - every method and exposed interface of the consumer is thread-safe.
  2. Asynchronous fetching - calling the consumer's log-fetch interface runs each fetch task in a new goroutine, so the main goroutine is never blocked.
  3. Automatic retry - the consumer automatically retries retryable errors that occur at runtime, and retries never cause data to be consumed twice.
  4. Graceful shutdown - when the shutdown interface is called, the consumer waits for all spawned goroutine tasks to finish before exiting the main program, guaranteeing that the next run will not re-consume data.
  5. Local debugging - log output can be directed to a local file or the console via configuration, with settings for rotation, number of files, and rotation size.
  6. High performance - Go's goroutines have an inherent advantage in concurrent multi-task processing, so the consumer starts a dedicated goroutine for each consumable shard it acquires; compared with using OS threads directly, this costs less system overhead and is more efficient.
  7. Simple to use - consumption introduces no data loss or duplication. You only need to write a simple configuration, create a consumer instance, and write the log-processing code; you can focus entirely on your consumption logic without worrying about checkpoint persistence or error retries.

Advantages

Compared with pulling data from a LogStore directly through the API or SDK, the consumer library offers the following advantages.

  • You can create multiple consumers for the same Logstore without worrying about load balancing between them; the consumer library handles it automatically and guarantees that data is never consumed twice. Even with limited CPU and other resources it consumes the logstore's data as fast as it can, and it automatically persists consumption checkpoints to the server for you.
  • When the network is unstable, the consumer library resumes consumption once the network recovers, guaranteeing no data loss and no duplicate consumption.
  • It provides advanced options that let you tune a running consumer library in several ways.

Installation

First clone the code into your GOPATH (source: aliyun-go-consumer-library). The project uses vendoring to manage third-party dependencies, so after cloning you do not need to install any third-party packages.

git clone git@github.com:aliyun/aliyun-log-go-sdk.git

Quick Start and Internals

Reference tutorial: Aliyun LOG Go Consumer Library Quick Start and Internals

Usage

1. Configure LogHubConfig

LogHubConfig is the configuration struct exposed to users for setting the consumption strategy. You can set different values for different needs; each parameter's meaning is described in the table below.

| Parameter | Meaning | Details |
| --- | --- | --- |
| Endpoint | SLS endpoint | Required, e.g. cn-hangzhou.sls.aliyuncs.com |
| AccessKeyId | Aliyun AccessKeyId | Required when CredentialsProvider is nil |
| AccessKeySecret | Aliyun AccessKeySecret | Required when CredentialsProvider is nil |
| CredentialsProvider | Custom credentials interface | Optional. A custom CredentialsProvider can supply a dynamic AccessKeyId/AccessKeySecret/StsToken; the implementation should cache the AK and must be thread-safe |
| Project | SLS project | Required |
| Logstore | SLS logstore | Required |
| ConsumerGroupName | Consumer group name | Required |
| Consumer | Consumer name | Required. You choose the SLS consumer name yourself; take care not to duplicate it |
| CursorPosition | Position to start consuming from | Required. Supports 1. BEGIN_CURSOR: the oldest position in the logstore 2. END_CURSOR: the newest position in the logstore 3. SPECIAL_TIMER_CURSOR: a Unix timestamp you set |
| HeartbeatIntervalInSecond | Heartbeat interval | Optional, default 20s. The SDK uses heartbeats to confirm liveness with the server |
| HeartbeatTimeoutInSecond | Heartbeat timeout | Optional, defaults to 3x HeartbeatIntervalInSecond. If heartbeats keep failing until the timeout is reached, the server may reassign the timed-out shard |
| DataFetchIntervalInMs | Default data fetch interval | Optional, default 200ms |
| MaxFetchLogGroupCount | Number of log groups fetched per request | Optional, default 1000 |
| CursorStartTime | Timestamp of the start position | Optional; required when CursorPosition is SPECIAL_TIMER_CURSOR |
| InOrder | Whether to consume in order after a shard split | Optional, default false. When true, a split shard is consumed only after its old read-only parent shard has been fully consumed |
| AllowLogLevel | Allowed log level | Optional, default info. Levels from low to high: debug, info, warn, error; only messages at this level or above are logged |
| LogFileName | File name for the program's own logs | Optional, defaults to stdout |
| IsJsonType | Whether log output is JSON | Optional, defaults to logfmt; set true for JSON |
| LogMaxSize | Maximum log file size | Optional, default 10 |
| LogMaxBackups | Maximum number of old log files to retain | Optional, default 10 |
| LogCompass | Whether to compress logs | Optional, no compression by default; gzip when enabled |
| HTTPClient | Custom http client | Optional. You can supply an http client to add custom logic; the SDK uses it to send HTTP requests |
| SecurityToken | Aliyun SecurityToken | Optional, see https://help.aliyun.com/document_detail/47277.html |
| AutoCommitDisabled | Whether to disable the SDK's automatic checkpoint commits | Optional, not disabled by default |
| AutoCommitIntervalInMS | Interval between automatic checkpoint commits | Optional, in milliseconds; default 60s |
| Query | Filter rule | Required when consuming based on a rule, e.g. * \| where a = 'xxx' |
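
For concreteness, a minimal configuration for the table above might look like the sketch below. All values are placeholders, and the import path follows the SDK repository layout; substitute your own project settings.

// import consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
option := consumerLibrary.LogHubConfig{
	Endpoint:          "cn-hangzhou.sls.aliyuncs.com",
	AccessKeyID:       "<your-access-key-id>",     // placeholder
	AccessKeySecret:   "<your-access-key-secret>", // placeholder
	Project:           "my-project",
	Logstore:          "my-logstore",
	ConsumerGroupName: "my-consumer-group",
	ConsumerName:      "consumer-1",
	// BEGIN_CURSOR starts from the oldest data; END_CURSOR from the newest;
	// SPECIAL_TIMER_CURSOR starts from the CursorStartTime timestamp.
	CursorPosition: consumerLibrary.BEGIN_CURSOR,
}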

2. Override the consumption logic

func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
    err := dosomething()
    if err != nil {
        return "", err
    }
    fmt.Printf("shardId %v process works success\n", shardId)
    // Tell the CheckPointTracker that process succeeded and save the checkpoint.
    // false: marks process as successful but does not write to the server directly;
    //        the SDK commits in batches after an interval (when AutoCommitDisabled is false).
    // true:  marks process as successful and writes to the server immediately.
    // false is recommended for most scenarios.
    checkpointTracker.SaveCheckPoint(false)
    // If you do not need to reset the checkpoint, return an empty string; to reset it,
    // return the cursor to reset to, e.g. checkpointTracker.GetCurrentCursor(): the current
    // checkpoint is where this not-yet-processed batch of data starts.
    // If you have already returned an error, there is no need to reset to the current
    // checkpoint; the SDK will process this batch again. Generally, return an empty string.
    return "", nil
}

In actual use, you only need to override the process function with your own consumption logic; the snippet above is just a simple demo that prints the logs the consumer fetched. Note that the function's parameters and return values must not be changed, or consumption will fail. If you have special requirements during process, such as staging data and handling it asynchronously, you can implement your own Processor interface (shown below, with a sketch after it): in addition to Process, you can implement Shutdown to wind down asynchronous work gracefully. Be aware, however, that the checkpoint tracker is not thread-safe: it is only responsible for saving the checkpoint of the current process call. Do not keep the instance around and save asynchronously!

type Processor interface {
	Process(int, *sls.LogGroupList, CheckPointTracker) (string, error)
	Shutdown(CheckPointTracker) error
}
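
For illustration only, a custom Processor might look like the sketch below. myProcessor and its counting logic are hypothetical, and the snippet assumes the fmt, sls, and consumerLibrary imports from the snippets above: Process keeps a running count and lets the SDK batch-commit checkpoints, while Shutdown force-commits once before exit.

// myProcessor is a hypothetical example; it counts consumed logs and
// force-commits the checkpoint when the consumer shuts down.
type myProcessor struct {
	total int // running count of consumed logs (illustrative state)
}

func (p *myProcessor) Process(shardId int, logGroupList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) (string, error) {
	p.total += consumerLibrary.GetLogCount(logGroupList)
	fmt.Printf("shard %d: %d logs consumed so far\n", shardId, p.total)
	// Mark success; with AutoCommitDisabled left false, the SDK commits in batches.
	if err := tracker.SaveCheckPoint(false); err != nil {
		return "", err
	}
	return "", nil // empty string: no checkpoint reset
}

func (p *myProcessor) Shutdown(tracker consumerLibrary.CheckPointTracker) error {
	// Force-commit the checkpoint to the server before exiting.
	return tracker.SaveCheckPoint(true)
}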

3. Create the consumer and start consuming

// option is an instance of LogHubConfig
consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
// If you implemented your own processor, use the following instead:
// consumerWorker := consumerLibrary.InitConsumerWorkerWithProcessor(option, myProcessor)
// Call the Start method to begin consuming
consumerWorker.Start()

Note that InitConsumerWorker(option, process) is now deprecated. It meant that after the process function returned, the SDK would execute checkpointTracker.SaveCheckPoint(false) once, but it offered no way to manually force a write to the server, read the previous checkpoint, and so on.

Call the InitConsumerWorker* constructor, passing the configuration instance and the consumption function, to create a consumer instance, then call the Start method to consume.

4. Shut down the consumer

ch := make(chan os.Signal, 1) // channel that receives OS signal values
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
consumerWorker.Start()
if _, ok := <-ch; ok { // once an OS stop signal is received, e.g. triggered by ctrl+c, call the consumer's shutdown method to exit
    consumerWorker.StopAndWait()
}

The example above uses a Go channel to listen for OS signals; when it detects that the user has triggered an exit signal, it calls StopAndWait() to shut down. You can design your own exit logic as needed; just make sure StopAndWait() is called.

Simple examples

To help you get started with the consumer library faster, we provide two simple code samples; see consumer library example.
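
Condensing the steps above, a minimal end-to-end program might look like the following sketch. The credentials, project, and logstore names are placeholders, and the import paths follow the SDK repository layout; the samples in the repository remain the authoritative reference.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
)

// process prints a summary of each fetched batch and saves the checkpoint.
func process(shardId int, logGroupList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Printf("shard %d: %d log groups\n", shardId, consumerLibrary.GetLogGroupCount(logGroupList))
	return "", tracker.SaveCheckPoint(false)
}

func main() {
	option := consumerLibrary.LogHubConfig{
		Endpoint:          "cn-hangzhou.sls.aliyuncs.com",
		AccessKeyID:       "<your-access-key-id>",
		AccessKeySecret:   "<your-access-key-secret>",
		Project:           "my-project",
		Logstore:          "my-logstore",
		ConsumerGroupName: "my-consumer-group",
		ConsumerName:      "consumer-1",
		CursorPosition:    consumerLibrary.END_CURSOR,
	}

	worker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
	worker.Start()

	// Block until SIGTERM/SIGINT, then shut down gracefully.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
	<-ch
	worker.StopAndWait()
}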

Feedback

If you run into problems while using the library, you can create a GitHub Issue or submit a ticket at the Aliyun support center.

Documentation

Index

Constants

const (
	BEGIN_CURSOR         = "BEGIN_CURSOR"
	END_CURSOR           = "END_CURSOR"
	SPECIAL_TIMER_CURSOR = "SPECIAL_TIMER_CURSOR"
)
const (
	INITIALIZING      = "INITIALIZING"
	PULLING           = "PULLING"
	PROCESSING        = "PROCESSING"
	SHUTTING_DOWN     = "SHUTTING_DOWN"
	SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"
)

Variables

This section is empty.

Functions

func Contain

func Contain(obj interface{}, target interface{}) bool

Determine whether obj is in the target object

func GetLogCount

func GetLogCount(logGroupList *sls.LogGroupList) int

Get the total number of logs

func GetLogGroupCount

func GetLogGroupCount(logGroupList *sls.LogGroupList) int

func IntSliceReflectEqual

func IntSliceReflectEqual(a, b []int) bool

Determine whether two lists are equal

func Min

func Min(a, b int64) int64

Returns the smallest of two numbers

func Set

func Set(slc []int) []int

Remove duplicate elements from a list

func Subtract

func Subtract(a []int, b []int) (diffSlice []int)

Get the difference between the two lists

func TimeToSleepInMillsecond

func TimeToSleepInMillsecond(intervalTime, lastCheckTime int64, condition bool)

func TimeToSleepInSecond

func TimeToSleepInSecond(intervalTime, lastCheckTime int64, condition bool)

Types

type CheckPointTracker added in v0.1.46

type CheckPointTracker interface {
	// GetCheckPoint gets the latest saved checkpoint
	GetCheckPoint() string
	// SaveCheckPoint saves the next cursor as the checkpoint
	SaveCheckPoint(force bool) error
	// GetCurrentCursor gets the cursor of the currently fetched data
	GetCurrentCursor() string
	// GetNextCursor gets the cursor of the data to fetch next (this is also the next checkpoint to be saved)
	GetNextCursor() string
	// GetShardId returns the id of the tracked shard
	GetShardId() int
}

CheckPointTracker Generally, you only need SaveCheckPoint; if you use the other methods, make sure you understand them.
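
As a hedged sketch of the non-default flow (handle is a hypothetical user function): with AutoCommitDisabled set to true in LogHubConfig, a process function can force-commit each batch synchronously instead of relying on batched commits.

func process(shardId int, logGroupList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) (string, error) {
	if err := handle(logGroupList); err != nil { // handle is a hypothetical user function
		// The checkpoint is not saved; returning the error makes the SDK
		// process this batch again, so no data is skipped.
		return "", err
	}
	return "", tracker.SaveCheckPoint(true) // force=true: commit to the server immediately
}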

type ConsumerClient

type ConsumerClient struct {
	// contains filtered or unexported fields
}

type ConsumerHeartBeat added in v0.1.46

type ConsumerHeartBeat struct {
	// contains filtered or unexported fields
}

type ConsumerWorker

type ConsumerWorker struct {
	Logger log.Logger
	// contains filtered or unexported fields
}

func InitConsumerWorker

func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) string) *ConsumerWorker

Deprecated: the old logic automatically saved checkpoints to memory and then committed them at a fixed interval. We highly recommend using InitConsumerWorkerWithCheckpointTracker instead.

func InitConsumerWorkerWithCheckpointTracker added in v0.1.46

func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) (string, error)) *ConsumerWorker

InitConsumerWorkerWithCheckpointTracker please note that you need to save the checkpoint after processing succeeds.

func InitConsumerWorkerWithProcessor added in v0.1.46

func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker

InitConsumerWorkerWithProcessor you need to save the checkpoint yourself, and you can run custom logic after the consumer shuts down.

func (*ConsumerWorker) Start

func (consumerWorker *ConsumerWorker) Start()

func (*ConsumerWorker) StopAndWait

func (consumerWorker *ConsumerWorker) StopAndWait()

type DefaultCheckPointTracker added in v0.1.46

type DefaultCheckPointTracker struct {
	// contains filtered or unexported fields
}

func (*DefaultCheckPointTracker) GetCheckPoint added in v0.1.46

func (tracker *DefaultCheckPointTracker) GetCheckPoint() string

func (*DefaultCheckPointTracker) GetCurrentCursor added in v0.1.46

func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string

func (*DefaultCheckPointTracker) GetNextCursor added in v0.1.47

func (tracker *DefaultCheckPointTracker) GetNextCursor() string

func (*DefaultCheckPointTracker) GetShardId added in v0.1.47

func (tracker *DefaultCheckPointTracker) GetShardId() int

func (*DefaultCheckPointTracker) SaveCheckPoint added in v0.1.46

func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error

type LogHubConfig

type LogHubConfig struct {
	//:param Endpoint:
	//:param AccessKeyID:
	//:param AccessKeySecret:
	//:param SecurityToken: If you use an STS token to consume data, you must make sure the consumer is stopped before the token expires.
	//:param CredentialsProvider: CredentialsProvider that providers credentials(AccessKeyID, AccessKeySecret, StsToken)
	//:param Project:
	//:param Logstore:
	//:param Query: filter rule; the corresponding rule must be set when consuming based on rules, such as *| where a = 'xxx'
	//:param ConsumerGroupName:
	//:param ConsumerName:
	//:param CursorPosition: This option is used only for initialization; it is ignored once the consumer group has been created and each shard has started being consumed.
	//  Three options are provided: BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR; when you choose SPECIAL_TIMER_CURSOR, you must also set the CursorStartTime parameter.
	//:param HeartbeatIntervalInSecond:
	// default 20. Once a client hasn't reported to the server for HeartbeatTimeoutInSecond seconds,
	// the server considers it offline and re-assigns its task to another consumer.
	// Don't set the heartbeat interval too small when network bandwidth or consumption performance is poor.
	//:param DataFetchIntervalInMs: default 200 (milliseconds); don't configure it too small (< 100 ms)
	//:param HeartbeatTimeoutInSecond:
	// default HeartbeatIntervalInSecond * 3. Once a client hasn't reported to the server for HeartbeatTimeoutInSecond seconds,
	// the server considers it offline and re-assigns its task to another consumer.
	//:param MaxFetchLogGroupCount: default 1000, the fetch size of each request; normally use the default. The maximum is 1000; lower values may improve memory efficiency.
	//:param CursorStartTime: used when CursorPosition is SPECIAL_TIMER_CURSOR; it is the log receiving time as a Unix timestamp. The unit of this parameter is seconds.
	//:param InOrder:
	// 	default false. When a shard is split during consumption, this controls whether the newly
	// 	split shards are consumed only after their parent (read-only) shard has been fully consumed.
	// 	Suggest keeping it false (don't care) unless you have a good reason for it.
	//:param AllowLogLevel: default info; options: debug, info, warn, error
	//:param LogFileName: the log file path, for example "/root/log/log_file.log"; defaults to stdout
	//:param IsJsonType: Set whether the log output type is JSON,default false.
	//:param LogMaxSize: MaxSize is the maximum size in megabytes of the log file before it gets rotated. It defaults to 100 megabytes.
	//:param LogMaxBackups:
	// 	MaxBackups is the maximum number of old log files to retain.  The default
	// 	is to retain all old log files (though MaxAge may still cause them to get
	// 	deleted.)
	//:param LogCompass: determines whether the rotated log files should be compressed using gzip
	//:param CompressType: the type of compression to use; the default 0 stands for lz4
	//:param HTTPClient: custom http client for sending data to sls
	//:param AutoCommitDisabled: whether to disable committing checkpoints automatically; default false, meaning checkpoints are committed automatically
	//	  Note that if you disable auto commit, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker
	//:param AutoCommitIntervalInMS: interval between automatic checkpoint commits, in milliseconds; default is 60s
	//:param AuthVersion: signature algorithm version, default is sls.AuthV1
	//:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4
	Endpoint                  string
	AccessKeyID               string
	AccessKeySecret           string
	CredentialsProvider       sls.CredentialsProvider
	Project                   string
	Logstore                  string
	Query                     string
	ConsumerGroupName         string
	ConsumerName              string
	CursorPosition            string
	HeartbeatIntervalInSecond int
	HeartbeatTimeoutInSecond  int
	DataFetchIntervalInMs     int64
	MaxFetchLogGroupCount     int
	CursorStartTime           int64 // Unix time stamp; Units are seconds.
	InOrder                   bool
	AllowLogLevel             string
	LogFileName               string
	IsJsonType                bool
	LogMaxSize                int
	LogMaxBackups             int
	LogCompass                bool
	CompressType              int
	HTTPClient                *http.Client
	SecurityToken             string
	AutoCommitDisabled        bool
	AutoCommitIntervalInMS    int64
	AuthVersion               sls.AuthVersionType
	Region                    string
}

type ProcessFunc added in v0.1.46

type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) (string, error)

func (ProcessFunc) Process added in v0.1.46

func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error)

func (ProcessFunc) Shutdown added in v0.1.46

func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error

type Processor added in v0.1.46

type Processor interface {
	Process(int, *sls.LogGroupList, CheckPointTracker) (string, error)
	Shutdown(CheckPointTracker) error
}

type ShardConsumerWorker

type ShardConsumerWorker struct {
	// contains filtered or unexported fields
}
