Documentation
¶
Index ¶
- Constants
- func BSearch(records []*IncRecord, progress int64) int
- type BaseInfo
- type BaseReq
- type BaseRes
- type BiFrostStreamerCfg
- type BifrostService
- type BifrostStreamer
- func (bs *BifrostStreamer) GetContainer() container.Container
- func (bs *BifrostStreamer) GetSchedInfo() *SchedInfo
- func (bs *BifrostStreamer) HasNext() bool
- func (fs *BifrostStreamer) InfoStatus(s string)
- func (bs *BifrostStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)
- func (bs *BifrostStreamer) SetContainer(container container.Container)
- func (bs *BifrostStreamer) UpdateData(ctx context.Context) error
- func (fs *BifrostStreamer) WarnStatus(s string)
- type DataParser
- type DefaultTextParser
- type GobCodec
- type IncRecord
- type IncReq
- type IncRes
- type LocalFileStreamer
- func (fs *LocalFileStreamer) GetContainer() container.Container
- func (fs *LocalFileStreamer) GetSchedInfo() *SchedInfo
- func (fs *LocalFileStreamer) HasNext() bool
- func (ms *LocalFileStreamer) InfoStatus(s string)
- func (fs *LocalFileStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)
- func (fs *LocalFileStreamer) SetContainer(container container.Container)
- func (fs *LocalFileStreamer) UpdateData(ctx context.Context) error
- func (ms *LocalFileStreamer) WarnStatus(s string)
- type LocalFileStreamerCfg
- type MongoStreamer
- func (ms *MongoStreamer) GetContainer() container.Container
- func (ms *MongoStreamer) GetSchedInfo() *SchedInfo
- func (ms *MongoStreamer) HasNext() bool
- func (ms *MongoStreamer) InfoStatus(s string)
- func (ms *MongoStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)
- func (ms *MongoStreamer) SetContainer(container container.Container)
- func (ms *MongoStreamer) UpdateData(ctx context.Context) error
- func (ms *MongoStreamer) WarnStatus(s string)
- type MongoStreamerCfg
- type ParserResult
- type Sched
- func (s *Sched) AddStreamer(name string, dataStreamer Streamer)
- func (s Sched) Len() int
- func (s Sched) Less(i, j int) bool
- func (s *Sched) Pop() interface{}
- func (s *Sched) Push(x interface{})
- func (s *Sched) Schedule(ctx context.Context)
- func (s Sched) Swap(i, j int)
- func (s *Sched) Top() *SchedUnit
- type SchedInfo
- type SchedUnit
- type Status
- type Streamer
- type StreamerProvider
- type StreamerProviderCfg
- type StreamerProviderManager
- type UpdatMode
Constants ¶
View Source
const (
	Static = iota
	Dynamic
	Increment
	DynInc
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BiFrostStreamerCfg ¶
type BiFrostStreamerCfg struct {
	Name         string      // streamer name
	NameSpace    string      // streamer namespace
	Version      int         // data format version
	URI          string
	BaseFilePath string      // base file path
	Interval     int         // incremental update interval
	IsSync       bool        // whether to load synchronously
	IsOnline     bool        // takes effect in offline mode
	WriteFile    bool        // takes effect in offline mode
	UserData     interface{} // user-defined data
	Logger       log.BiLogger
}
type BifrostService ¶
type BifrostService struct {
StreamerManager *StreamerProviderManager
}
func NewBifrostServer ¶
func NewBifrostServer(streamerManager *StreamerProviderManager) *BifrostService
type BifrostStreamer ¶
type BifrostStreamer struct {
// contains filtered or unexported fields
}
func NewBiFrostStreamer ¶
func NewBiFrostStreamer(cfg *BiFrostStreamerCfg) *BifrostStreamer
func (*BifrostStreamer) GetContainer ¶
func (bs *BifrostStreamer) GetContainer() container.Container
func (*BifrostStreamer) GetSchedInfo ¶
func (bs *BifrostStreamer) GetSchedInfo() *SchedInfo
func (*BifrostStreamer) HasNext ¶
func (bs *BifrostStreamer) HasNext() bool
func (*BifrostStreamer) InfoStatus ¶
func (fs *BifrostStreamer) InfoStatus(s string)
func (*BifrostStreamer) SetContainer ¶
func (bs *BifrostStreamer) SetContainer(container container.Container)
func (*BifrostStreamer) UpdateData ¶
func (bs *BifrostStreamer) UpdateData(ctx context.Context) error
func (*BifrostStreamer) WarnStatus ¶
func (fs *BifrostStreamer) WarnStatus(s string)
type DataParser ¶
type DataParser interface {
Parse([]byte, interface{}) []ParserResult
}
type DefaultTextParser ¶
type DefaultTextParser struct { }
func (*DefaultTextParser) Parse ¶
func (*DefaultTextParser) Parse(data []byte, userData interface{}) []ParserResult
type LocalFileStreamer ¶
type LocalFileStreamer struct {
// contains filtered or unexported fields
}
func NewFileStreamer ¶
func NewFileStreamer(cfg *LocalFileStreamerCfg) *LocalFileStreamer
func (*LocalFileStreamer) GetContainer ¶
func (fs *LocalFileStreamer) GetContainer() container.Container
func (*LocalFileStreamer) GetSchedInfo ¶
func (fs *LocalFileStreamer) GetSchedInfo() *SchedInfo
func (*LocalFileStreamer) HasNext ¶
func (fs *LocalFileStreamer) HasNext() bool
func (*LocalFileStreamer) InfoStatus ¶
func (ms *LocalFileStreamer) InfoStatus(s string)
func (*LocalFileStreamer) SetContainer ¶
func (fs *LocalFileStreamer) SetContainer(container container.Container)
func (*LocalFileStreamer) UpdateData ¶
func (fs *LocalFileStreamer) UpdateData(ctx context.Context) error
func (*LocalFileStreamer) WarnStatus ¶
func (ms *LocalFileStreamer) WarnStatus(s string)
type LocalFileStreamerCfg ¶
type MongoStreamer ¶
type MongoStreamer struct {
// contains filtered or unexported fields
}
func NewMongoStreamer ¶
func NewMongoStreamer(mongoConfig *MongoStreamerCfg) (*MongoStreamer, error)
func (*MongoStreamer) GetContainer ¶
func (ms *MongoStreamer) GetContainer() container.Container
func (*MongoStreamer) GetSchedInfo ¶
func (ms *MongoStreamer) GetSchedInfo() *SchedInfo
func (*MongoStreamer) HasNext ¶
func (ms *MongoStreamer) HasNext() bool
func (*MongoStreamer) InfoStatus ¶
func (ms *MongoStreamer) InfoStatus(s string)
func (*MongoStreamer) SetContainer ¶
func (ms *MongoStreamer) SetContainer(container container.Container)
func (*MongoStreamer) UpdateData ¶
func (ms *MongoStreamer) UpdateData(ctx context.Context) error
func (*MongoStreamer) WarnStatus ¶
func (ms *MongoStreamer) WarnStatus(s string)
type MongoStreamerCfg ¶
type MongoStreamerCfg struct {
	Name           string
	UpdatMode      UpdatMode
	IncInterval    int
	IsSync         bool
	URI            string
	DB             string
	Collection     string
	ConnectTimeout int
	ReadTimeout    int
	BaseParser     DataParser
	IncParser      DataParser
	BaseQuery      interface{}
	IncQuery       interface{}
	UserData       interface{}
	FindOpt        *options.FindOptions
	OnBeforeBase   func(interface{}) interface{}
	OnBeforeInc    func(interface{}) interface{}
	Logger         log.BiLogger
}
type ParserResult ¶
type StreamerProvider ¶
type StreamerProvider struct {
	Cfg      *StreamerProviderCfg
	BaseInfo *BaseInfo
	Cached   []*IncRecord
	// contains filtered or unexported fields
}
func NewStreamerProvider ¶
func NewStreamerProvider(cfg *StreamerProviderCfg) *StreamerProvider
func (*StreamerProvider) AddInc ¶
func (sp *StreamerProvider) AddInc(incs []*IncRecord)
func (*StreamerProvider) GetBase ¶
func (sp *StreamerProvider) GetBase() *BaseInfo
func (*StreamerProvider) GetInc ¶
func (sp *StreamerProvider) GetInc(progress int64, size int) ([]*IncRecord, error)
func (*StreamerProvider) SetBase ¶
func (sp *StreamerProvider) SetBase(baseInfo *BaseInfo)
type StreamerProviderCfg ¶
type StreamerProviderManager ¶
type StreamerProviderManager struct {
StreamerProviders map[string]*StreamerProvider
}
func NewStreamerProviderManager ¶
func NewStreamerProviderManager() *StreamerProviderManager
func (*StreamerProviderManager) GetProvider ¶
func (spm *StreamerProviderManager) GetProvider(name string, progress int64) *StreamerProvider
func (*StreamerProviderManager) RegiterProvider ¶
func (spm *StreamerProviderManager) RegiterProvider(name string, provider *StreamerProvider) error
Source Files
¶
- bifrost_service.go
- bifrost_streamer.go
- bifrost_streamer_config.go
- data_parser.go
- default_parser.go
- enum.go
- local_file_streamer.go
- local_file_streamer_config.go
- mongo_streamer.go
- mongo_streamer_config.go
- mongo_streamer_v2.go
- schedule.go
- streamer.go
- streamer_provider.go
- streamer_provider_config.go
- streamer_provider_manager.go
Click to show internal directories.
Click to hide internal directories.