Documentation ¶
Index ¶
- Constants
- Variables
- func CreateExecutionTree()
- func GetFileSize(path string) int64
- func GetFileSizeFile(file *os.File) int64
- func NewExpiryHeap() *expiryHeap
- func NewRandomUUIDStr() string
- func Run()
- type AckValue
- type Acker
- type AllStageStats
- type AllStages
- type AllStats
- type Collector
- type Config
- func (config *Config) GetIntVal(key string) (int, bool)
- func (config *Config) GetLogFile() string
- func (config *Config) GetLogLevel() string
- func (config *Config) GetLogRollSize() int
- func (config *Config) GetMaxUnacked() uint32
- func (config *Config) GetStrVal(key string) (string, bool)
- func (config *Config) GetTickMilli() time.Duration
- type DispPatcherCaller
- func (dcaller *DispPatcherCaller) Ack(id string)
- func (dcaller *DispPatcherCaller) Fail(id string)
- func (dcaller *DispPatcherCaller) NewDispatcher(d Dispatcher, coll Collector, acker Acker, nanodelay int64, id uint) *DispPatcherCaller
- func (dcaller *DispPatcherCaller) Stop()
- func (dcaller *DispPatcherCaller) TimedOut(id string)
- type Dispatcher
- type DispatcherRegistry
- type DispatcherStageInfo
- type Executor
- type ExecutorCaller
- type ExecutorRegistry
- type GraphNode
- type GraphNodePool
- type LocalAcker
- type LogWriter
- type Logger
- type NextIdGetter
- type OutPut
- type StageInfo
- type StageStats
- type Tracker
- type TupleCollector
- func (tc *TupleCollector) Ack(context interface{})
- func (tc *TupleCollector) AddOutput(chn chan *OutPut)
- func (tc *TupleCollector) Copy() *TupleCollector
- func (tc *TupleCollector) Emit(tuple map[string]interface{}, context interface{})
- func (tc *TupleCollector) Fail(context interface{})
- func (tc *TupleCollector) IsForDispatcher() bool
- type UUID
Constants ¶
View Source
const (
	INVALID_VAL_INT    = -1
	INVALID_VAL_STR    = ""
	DEFAULT_LOG_PATH   = "<home>/log"
	DEFAULT_LOG_FILE   = "pipeline.log"
	DEFAULT_HOME       = "/tmp"
	DEFAULT_NUM_ACKER  = 8
	DEFAULT_LOG_BCK    = 10
	LOG_ROLLING_SZ     = 10485760
	MIN_LOG_ROLLING_SZ = 1048576
	MAX_LOG_ROLLING_SZ = 2147483648
	MAX_MAX_UNACKED    = 2000000
	MIN_MAX_UNACKED    = 20
	TIME_OUT           = 120000
	CLUSTER_NAME       = "geetplaban"
	STATS_INTERVAL_SEC = 60
	DEFAULT_LOG_LEVEL  = "info"
	MIN_TICK_MILI_SEC  = 1
	MAX_TICK_MILI_SEC  = 60000
)
View Source
const (
	MAX_LOG_BUFFER    = 100 * 1024
	WRITE_SIZE        = 8000
	MAX_LOG_INTERVAL  = 2 * time.Second
	MAX_ROLLOVER_SIZE = 2 * 1024 * 1024 * 1024
)
View Source
const (
	DEFAULT_NUM_SLOTS     = 8
	MAX_NUM_SLOTS         = 256
	MAX_NUM_SLOTS_MINUS_1 = 255
	MIN_INTERVAL          = 20
	MAX_INTERVAL          = 3600
)
Variables ¶
View Source
var CONFIG_DIR string = ""
View Source
var HOME string = "PIPELINE_HOME"
View Source
var LOG *logrus.Logger
Functions ¶
func CreateExecutionTree ¶
func CreateExecutionTree()
func GetFileSize ¶
func GetFileSize(path string) int64
func GetFileSizeFile ¶
func GetFileSizeFile(file *os.File) int64
func NewExpiryHeap ¶
func NewExpiryHeap() *expiryHeap
func NewRandomUUIDStr ¶
func NewRandomUUIDStr() string
Types ¶
type AckValue ¶
type AckValue struct {
// contains filtered or unexported fields
}
func NewAckValue ¶
type Acker ¶
type AllStageStats ¶
type AllStageStats struct {
// contains filtered or unexported fields
}
func NewAllStageStats ¶
func NewAllStageStats(num_slots uint, interval int64) *AllStageStats
type AllStages ¶
type AllStages struct {
// contains filtered or unexported fields
}
var All *AllStages
type AllStats ¶
type AllStats struct {
// contains filtered or unexported fields
}
func NewAllStats ¶
func NewAllStats() *AllStats
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
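func (*Config) GetLogFile ¶
func (config *Config) GetLogFile() string
func (*Config) GetLogLevel ¶
func (config *Config) GetLogLevel() string
func (*Config) GetLogRollSize ¶
func (config *Config) GetLogRollSize() int
func (*Config) GetMaxUnacked ¶
func (config *Config) GetMaxUnacked() uint32
func (*Config) GetTickMilli ¶
func (config *Config) GetTickMilli() time.Duration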
type DispPatcherCaller ¶
type DispPatcherCaller struct {
// contains filtered or unexported fields
}
func (*DispPatcherCaller) Ack ¶
func (dcaller *DispPatcherCaller) Ack(id string)
func (*DispPatcherCaller) Fail ¶
func (dcaller *DispPatcherCaller) Fail(id string)
func (*DispPatcherCaller) NewDispatcher ¶
func (dcaller *DispPatcherCaller) NewDispatcher(d Dispatcher, coll Collector, acker Acker, nanodelay int64, id uint) *DispPatcherCaller
func (*DispPatcherCaller) Stop ¶
func (dcaller *DispPatcherCaller) Stop()
func (*DispPatcherCaller) TimedOut ¶
func (dcaller *DispPatcherCaller) TimedOut(id string)
type Dispatcher ¶
type DispatcherRegistry ¶
type DispatcherRegistry struct {
// contains filtered or unexported fields
}
func GetDispRegistry ¶
func GetDispRegistry() *DispatcherRegistry
func (*DispatcherRegistry) AddType ¶
func (reg *DispatcherRegistry) AddType(name string, disp Dispatcher)
func (*DispatcherRegistry) GetInstance ¶
func (reg *DispatcherRegistry) GetInstance(name string) Dispatcher
type DispatcherStageInfo ¶
type DispatcherStageInfo struct {
// contains filtered or unexported fields
}
func NewDispatcherStageInfo ¶
func NewDispatcherStageInfo(num_tasks int, dispatcher_class string, name string) *DispatcherStageInfo
func (*DispatcherStageInfo) AddGroupingOutStage ¶
func (dsinfo *DispatcherStageInfo) AddGroupingOutStage(s *StageInfo, groupField []string)
func (*DispatcherStageInfo) AddOutStage ¶
func (dsinfo *DispatcherStageInfo) AddOutStage(s *StageInfo)
func (*DispatcherStageInfo) CheckStage ¶
func (sinfo *DispatcherStageInfo) CheckStage(s *StageInfo)
type ExecutorCaller ¶
type ExecutorCaller struct {
// contains filtered or unexported fields
}
type ExecutorRegistry ¶
type ExecutorRegistry struct {
// contains filtered or unexported fields
}
func GetRegistry ¶
func GetRegistry() *ExecutorRegistry
func (*ExecutorRegistry) AddType ¶
func (reg *ExecutorRegistry) AddType(name string, executor Executor)
func (*ExecutorRegistry) GetInstance ¶
func (reg *ExecutorRegistry) GetInstance(name string) Executor
type GraphNode ¶
type GraphNode struct {
	Name string
	// contains filtered or unexported fields
}
func (*GraphNode) AddOutNode ¶
func (*GraphNode) DetectAnyCycle ¶
type GraphNodePool ¶
type GraphNodePool struct {
// contains filtered or unexported fields
}
func GetNewGraphNodePool ¶
func GetNewGraphNodePool() *GraphNodePool
func (*GraphNodePool) DetectAnyCycle ¶
func (npl *GraphNodePool) DetectAnyCycle() bool
func (*GraphNodePool) Exists ¶
func (npl *GraphNodePool) Exists(node *GraphNode) bool
func (*GraphNodePool) Get ¶
func (npl *GraphNodePool) Get(name string) *GraphNode
type LocalAcker ¶
type LocalAcker struct {
// contains filtered or unexported fields
}
func NewLocalAcker ¶
func NewLocalAcker(parallel uint, timeout int64) *LocalAcker
func (*LocalAcker) AddAck ¶
func (acker *LocalAcker) AddAck(id uint64, val uint64)
func (*LocalAcker) AddTracker ¶
func (acker *LocalAcker) AddTracker(id uint, tracker Tracker)
func (*LocalAcker) AddTracking ¶
func (acker *LocalAcker) AddTracking(ids string, tracker_id uint) uint64
func (*LocalAcker) SignalFail ¶
func (acker *LocalAcker) SignalFail(id uint64)
func (*LocalAcker) Start ¶
func (acker *LocalAcker) Start()
type LogWriter ¶
type LogWriter struct {
// contains filtered or unexported fields
}
func NewLogWriter ¶
type NextIdGetter ¶
type NextIdGetter struct {
// contains filtered or unexported fields
}
func NewNextIdGettr ¶
func NewNextIdGettr(initial_value uint32) *NextIdGetter
func (*NextIdGetter) NextId ¶
func (next *NextIdGetter) NextId() uint
type StageInfo ¶
type StageInfo struct {
// contains filtered or unexported fields
}
func (*StageInfo) AddDispGroupingStage ¶
func (sinfo *StageInfo) AddDispGroupingStage(disp *DispatcherStageInfo)
func (*StageInfo) AddGroupingStage ¶
func (*StageInfo) CheckStage ¶
type StageStats ¶
type StageStats struct {
// contains filtered or unexported fields
}
func NewStageStats ¶
func NewStageStats(stage uint) *StageStats
func (*StageStats) String ¶
func (ss *StageStats) String() string
type TupleCollector ¶
type TupleCollector struct {
// contains filtered or unexported fields
}
func NewTupleCollector ¶
func NewTupleCollector(stage string, id uint, isdisp uint) *TupleCollector
func (*TupleCollector) Ack ¶
func (tc *TupleCollector) Ack(context interface{})
func (*TupleCollector) AddOutput ¶
func (tc *TupleCollector) AddOutput(chn chan *OutPut)
func (*TupleCollector) Copy ¶
func (tc *TupleCollector) Copy() *TupleCollector
func (*TupleCollector) Emit ¶
func (tc *TupleCollector) Emit(tuple map[string]interface{}, context interface{})
func (*TupleCollector) Fail ¶
func (tc *TupleCollector) Fail(context interface{})
func (*TupleCollector) IsForDispatcher ¶
func (tc *TupleCollector) IsForDispatcher() bool
Click to show internal directories.
Click to hide internal directories.