Documentation ¶
Index ¶
- Constants
- func ToJsonObject(data []byte) (map[string]interface{}, error)
- type BaseConfig
- type BaseJob
- func (job *BaseJob) Callback()
- func (job *BaseJob) ExpirationTime() int64
- func (job *BaseJob) Id() string
- func (job *BaseJob) Interval() int64
- func (job *BaseJob) Less(other llrb.Item) bool
- func (job *BaseJob) ResetFunc(f JobFunc)
- func (job *BaseJob) SetIntialExpirationTime(when int64)
- func (job *BaseJob) Start()
- func (job *BaseJob) Stop()
- func (job *BaseJob) String() string
- func (job *BaseJob) UpdateExpirationTime() int64
- type CassandraCheckpointer
- func (checkpoint *CassandraCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
- func (checkpoint *CassandraCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
- func (checkpoint *CassandraCheckpointer) Start()
- func (checkpoint *CassandraCheckpointer) Stop()
- func (checkpoint *CassandraCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
- type Checkpointer
- type Data
- type DataReader
- type DataWriter
- type FileCheckpointer
- func (ck *FileCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
- func (ck *FileCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
- func (ck *FileCheckpointer) Start()
- func (ck *FileCheckpointer) Stop()
- func (ck *FileCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
- type Job
- type JobFunc
- type JobList
- type JobParam
- type KafkaCheckpointer
- func (ck *KafkaCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
- func (ck *KafkaCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
- func (ck *KafkaCheckpointer) Start()
- func (ck *KafkaCheckpointer) Stop()
- func (ck *KafkaCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
- type KafkaClient
- func (client *KafkaClient) BrokerIPs() []string
- func (client *KafkaClient) Client() sarama.Client
- func (client *KafkaClient) Close()
- func (client *KafkaClient) GetConsumerOffset(consumerGroup string, topic string, partition int32) (int64, error)
- func (client *KafkaClient) GetLastBlock(topic string, partition int32) ([]byte, error)
- func (client *KafkaClient) GetProducerOffset(topic string, partition int32) (int64, error)
- func (client *KafkaClient) Leader(topic string, partition int32) (*sarama.Broker, error)
- func (client *KafkaClient) TopicPartitions(topic string) (map[string][]int32, error)
- type NullCheckpointer
- func (checkpoint *NullCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
- func (checkpoint *NullCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
- func (checkpoint *NullCheckpointer) Start()
- func (checkpoint *NullCheckpointer) Stop()
- func (checkpoint *NullCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
- type Scheduler
- type StdoutDataWriter
- type ZooKeeperCheckpointer
- func (checkpoint *ZooKeeperCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
- func (checkpoint *ZooKeeperCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
- func (checkpoint *ZooKeeperCheckpointer) Start()
- func (checkpoint *ZooKeeperCheckpointer) Stop()
- func (checkpoint *ZooKeeperCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
- type ZooKeeperClient
- func (client *ZooKeeperClient) Children(parentNode string) ([]string, error)
- func (client *ZooKeeperClient) ChildrenW(parentNode string) (<-chan zk.Event, error)
- func (client *ZooKeeperClient) Close()
- func (client *ZooKeeperClient) CreateNode(node string, value []byte, ephemeral, ignoreExists bool) error
- func (client *ZooKeeperClient) DeleteNode(node string, ignoreNotExists bool) error
- func (client *ZooKeeperClient) ElectionParticipants() ([]string, error)
- func (client *ZooKeeperClient) GetNode(node string, ignoreNotExists bool) ([]byte, error)
- func (client *ZooKeeperClient) IsConnected() bool
- func (client *ZooKeeperClient) IsDisconnected() bool
- func (client *ZooKeeperClient) IsExpired() bool
- func (client *ZooKeeperClient) IsLeader(nodeGUID string) (bool, error)
- func (client *ZooKeeperClient) JoinElection(node string) (string, error)
- func (client *ZooKeeperClient) NodeExists(node string) (bool, error)
- func (client *ZooKeeperClient) SetNode(node string, value []byte) error
- func (client *ZooKeeperClient) WatchElectionParticipants() (<-chan zk.Event, error)
Constants ¶
const ( App = "App" Broadcast = "Broadcast" CassandraKeyspace = "CassandraKeyspace" CassandraSeeds = "CassandraSeeds" CheckpointMethod = "CheckpointMethod" CheckpointDir = "CheckpointDir" CheckpointKey = "CheckpointKey" CheckpointNamespace = "CheckpointNamespace" CheckpointPartition = "CheckpointPartition" CheckpointTable = "CheckpointTable" CheckpointTopic = "CheckpointTopic" CpuCount = "CpuCount" FlushFrequency = "FlushFreqency" Heartbeat = "Heartbeat" Host = "Host" HostRegex = "Host_regex" Index = "Index" Interval = "Interval" KafkaApp = "kafka" KafkaBrokers = "KafkaBrokers" KafkaConsumerGroup = "KafkaConsumerGroup" KafkaPartition = "KafkaPartition" KafkaTopic = "KafkaTopic" KafkaZooKeepers = "KafkaZooKeepers" Key = "Key" LongRun = "LongRun" MemAlloc = "MemAlloc" Metric = "Metric" Password = "Password" Platform = "Platform" ProxyPassword = "ProxyPassword" ProxyURL = "ProxyURL" ProxyUsername = "ProxyUsername" RequireAcks = "RequiredAcks" ServerURL = "ServerURL" Source = "Source" Sourcetype = "Sourcetype" Splunk = "Splunk" AWSS3 = "AWSS3" SyncWrite = "SyncWrite" SysMemAlloc = "SysMemAlloc" TaskConfig = "_TaskConfigs_" TaskConfigAction = "TaskConfigAction" TaskConfigDelete = "TaskConfigDelete" TaskConfigKey = "TaskConfigKey" TaskConfigNew = "TaskConfigNew" TaskConfigUpdate = "TaskConfigUpdate" TaskStats = "TaskStats" Taskname = "Taskname" Tasks = "_Tasks_" TargetSystem = "TargetSystem" TargetSystemType = "TargetSystemType" Timestamp = "Timestamp" TotoalMemAlloc = "TotalMemAlloc" UseOffsetNewest = "UseOffsetNewest" UseOffsetOldest = "UseOffsetOldest" Username = "Username" ZooKeeperRoot = "ZooKeeperRoot" ZooKeeperElectionRoot = "ZooKeeperElectionRoot" ZooKeeperHeartbeatRoot = "ZooKeeperHeartbeatRoot" ZooKeeperLongRunTask = "ZooKeeperLongRunTask" ZooKeeperServers = "ZooKeeperServers" )
const ( Root = "/descartes" ElectionRoot = Root + "/election" HeartbeatRoot = Root + "/heartbeat" LongRunTaskRoot = Root + "/long_run_tasks" )
Variables ¶
This section is empty.
Functions ¶
func ToJsonObject ¶
Types ¶
type BaseConfig ¶
type BaseJob ¶
type BaseJob struct {
// contains filtered or unexported fields
}
func (*BaseJob) ExpirationTime ¶
func (*BaseJob) SetIntialExpirationTime ¶
func (*BaseJob) UpdateExpirationTime ¶
type CassandraCheckpointer ¶
type CassandraCheckpointer struct {
// contains filtered or unexported fields
}
func NewCassandraCheckpointer ¶
func NewCassandraCheckpointer(config BaseConfig) *CassandraCheckpointer
func (*CassandraCheckpointer) DeleteCheckpoint ¶
func (checkpoint *CassandraCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
func (*CassandraCheckpointer) GetCheckpoint ¶
func (checkpoint *CassandraCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
@keyInfo: shall contain a Key
func (*CassandraCheckpointer) Start ¶
func (checkpoint *CassandraCheckpointer) Start()
func (*CassandraCheckpointer) Stop ¶
func (checkpoint *CassandraCheckpointer) Stop()
func (*CassandraCheckpointer) WriteCheckpoint ¶
func (checkpoint *CassandraCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
@keyInfo: shall contain a Key
type Checkpointer ¶
type Checkpointer interface { Start() Stop() GetCheckpoint(keyInfo map[string]string) ([]byte, error) WriteCheckpoint(keyInfo map[string]string, value []byte) error DeleteCheckpoint(keyInfo map[string]string) error }
func NewFileCheckpointer ¶
func NewFileCheckpointer() Checkpointer
func NewKafkaCheckpointer ¶
func NewKafkaCheckpointer(client *KafkaClient) Checkpointer
type DataReader ¶
type DataWriter ¶
type FileCheckpointer ¶
type FileCheckpointer struct { }
func (*FileCheckpointer) DeleteCheckpoint ¶
func (ck *FileCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
@keyInfo: contains "CheckpointDir", "CheckpointNamespace", "CheckpointKey"
func (*FileCheckpointer) GetCheckpoint ¶
func (ck *FileCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
@keyInfo: contains "CheckpointDir", "CheckpointNamespace", "CheckpointKey"
func (*FileCheckpointer) Start ¶
func (ck *FileCheckpointer) Start()
func (*FileCheckpointer) Stop ¶
func (ck *FileCheckpointer) Stop()
func (*FileCheckpointer) WriteCheckpoint ¶
func (ck *FileCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
@keyInfo: contains "CheckpointDir", "CheckpointNamespace", "CheckpointKey"
type KafkaCheckpointer ¶
type KafkaCheckpointer struct {
// contains filtered or unexported fields
}
func (*KafkaCheckpointer) DeleteCheckpoint ¶
func (ck *KafkaCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
func (*KafkaCheckpointer) GetCheckpoint ¶
func (ck *KafkaCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
func (*KafkaCheckpointer) Start ¶
func (ck *KafkaCheckpointer) Start()
func (*KafkaCheckpointer) Stop ¶
func (ck *KafkaCheckpointer) Stop()
func (*KafkaCheckpointer) WriteCheckpoint ¶
func (ck *KafkaCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
type KafkaClient ¶
type KafkaClient struct {
// contains filtered or unexported fields
}
func NewKafkaClient ¶
func NewKafkaClient(brokerConfig BaseConfig, clientName string) *KafkaClient
func (*KafkaClient) BrokerIPs ¶
func (client *KafkaClient) BrokerIPs() []string
func (*KafkaClient) Client ¶
func (client *KafkaClient) Client() sarama.Client
func (*KafkaClient) Close ¶
func (client *KafkaClient) Close()
func (*KafkaClient) GetConsumerOffset ¶
func (*KafkaClient) GetLastBlock ¶
func (client *KafkaClient) GetLastBlock(topic string, partition int32) ([]byte, error)
@Return: (position, nil) if no errors; (nil, nil) if the topic, partition, etc. doesn't exist; (nil, err) for other errors
func (*KafkaClient) GetProducerOffset ¶
func (client *KafkaClient) GetProducerOffset(topic string, partition int32) (int64, error)
Return: OffsetNotExist
func (*KafkaClient) TopicPartitions ¶
func (client *KafkaClient) TopicPartitions(topic string) (map[string][]int32, error)
type NullCheckpointer ¶
type NullCheckpointer struct { }
func NewNullCheckpointer ¶
func NewNullCheckpointer() *NullCheckpointer
func (*NullCheckpointer) DeleteCheckpoint ¶
func (checkpoint *NullCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
func (*NullCheckpointer) GetCheckpoint ¶
func (checkpoint *NullCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
func (*NullCheckpointer) Start ¶
func (checkpoint *NullCheckpointer) Start()
func (*NullCheckpointer) Stop ¶
func (checkpoint *NullCheckpointer) Stop()
func (*NullCheckpointer) WriteCheckpoint ¶
func (checkpoint *NullCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler() *Scheduler
func (*Scheduler) RemoveJobs ¶
func (*Scheduler) SetMaxStartDelay ¶
SetMaxStartDelay: do not execute the job immediately; instead, add a random sleep before executing it. @d: in nanoseconds
func (*Scheduler) UpdateJobs ¶
type StdoutDataWriter ¶
type StdoutDataWriter struct { }
func (*StdoutDataWriter) Start ¶
func (d *StdoutDataWriter) Start()
func (*StdoutDataWriter) Stop ¶
func (d *StdoutDataWriter) Stop()
func (*StdoutDataWriter) WriteData ¶
func (d *StdoutDataWriter) WriteData(data *Data) error
func (*StdoutDataWriter) WriteDataAsync ¶
func (d *StdoutDataWriter) WriteDataAsync(data *Data) error
func (*StdoutDataWriter) WriteDataSync ¶
func (d *StdoutDataWriter) WriteDataSync(data *Data) error
type ZooKeeperCheckpointer ¶
type ZooKeeperCheckpointer struct {
// contains filtered or unexported fields
}
func NewZooKeeperCheckpointer ¶
func NewZooKeeperCheckpointer(config BaseConfig) *ZooKeeperCheckpointer
func (*ZooKeeperCheckpointer) DeleteCheckpoint ¶
func (checkpoint *ZooKeeperCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error
@keyInfo: shall contain a Key which is a node path
func (*ZooKeeperCheckpointer) GetCheckpoint ¶
func (checkpoint *ZooKeeperCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)
@keyInfo: shall contain a Key which is a node path
func (*ZooKeeperCheckpointer) Start ¶
func (checkpoint *ZooKeeperCheckpointer) Start()
func (*ZooKeeperCheckpointer) Stop ¶
func (checkpoint *ZooKeeperCheckpointer) Stop()
FIXME, guard the Close
func (*ZooKeeperCheckpointer) WriteCheckpoint ¶
func (checkpoint *ZooKeeperCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error
@keyInfo: shall contain a Key which is a node path
type ZooKeeperClient ¶
type ZooKeeperClient struct {
// contains filtered or unexported fields
}
func NewZooKeeperClient ¶
func NewZooKeeperClient(serverConfig BaseConfig) *ZooKeeperClient
func (*ZooKeeperClient) Children ¶
func (client *ZooKeeperClient) Children(parentNode string) ([]string, error)
func (*ZooKeeperClient) ChildrenW ¶
func (client *ZooKeeperClient) ChildrenW(parentNode string) (<-chan zk.Event, error)
ChildrenW returns a channel which can receive events when something happens to the children
func (*ZooKeeperClient) Close ¶
func (client *ZooKeeperClient) Close()
func (*ZooKeeperClient) CreateNode ¶
func (client *ZooKeeperClient) CreateNode(node string, value []byte, ephemeral, ignoreExists bool) error
CreateNode stores a new value at node.
func (*ZooKeeperClient) DeleteNode ¶
func (client *ZooKeeperClient) DeleteNode(node string, ignoreNotExists bool) error
DeleteNode deletes the latest version data on the node
func (*ZooKeeperClient) ElectionParticipants ¶
func (client *ZooKeeperClient) ElectionParticipants() ([]string, error)
func (*ZooKeeperClient) GetNode ¶
func (client *ZooKeeperClient) GetNode(node string, ignoreNotExists bool) ([]byte, error)
GetNode returns the attached payload of the latest version on the node
func (*ZooKeeperClient) IsConnected ¶
func (client *ZooKeeperClient) IsConnected() bool
func (*ZooKeeperClient) IsDisconnected ¶
func (client *ZooKeeperClient) IsDisconnected() bool
func (*ZooKeeperClient) IsExpired ¶
func (client *ZooKeeperClient) IsExpired() bool
func (*ZooKeeperClient) IsLeader ¶
func (client *ZooKeeperClient) IsLeader(nodeGUID string) (bool, error)
nodeGUID is the result returned by JoinElection. Compare all children in the election node and select the one which has the minimum sequential number.
func (*ZooKeeperClient) JoinElection ¶
func (client *ZooKeeperClient) JoinElection(node string) (string, error)
Returns the GUID string of the participant in the format node_%010d. For example: /descartes/election/_c_11f6c4b023b29f33e17d4340ac6815f2-node_0000000010
func (*ZooKeeperClient) NodeExists ¶
func (client *ZooKeeperClient) NodeExists(node string) (bool, error)
NodeExists checks whether the node already exists
func (*ZooKeeperClient) SetNode ¶
func (client *ZooKeeperClient) SetNode(node string, value []byte) error
SetNode sets the payload on the node
func (*ZooKeeperClient) WatchElectionParticipants ¶
func (client *ZooKeeperClient) WatchElectionParticipants() (<-chan zk.Event, error)