base

package
v0.0.0-...-f13c5dc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2015 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	App                    = "App"
	Broadcast              = "Broadcast"
	CassandraKeyspace      = "CassandraKeyspace"
	CassandraSeeds         = "CassandraSeeds"
	CheckpointMethod       = "CheckpointMethod"
	CheckpointDir          = "CheckpointDir"
	CheckpointKey          = "CheckpointKey"
	CheckpointNamespace    = "CheckpointNamespace"
	CheckpointPartition    = "CheckpointPartition"
	CheckpointTable        = "CheckpointTable"
	CheckpointTopic        = "CheckpointTopic"
	CpuCount               = "CpuCount"
	FlushFrequency         = "FlushFreqency"
	Heartbeat              = "Heartbeat"
	Host                   = "Host"
	HostRegex              = "Host_regex"
	Index                  = "Index"
	Interval               = "Interval"
	KafkaApp               = "kafka"
	KafkaBrokers           = "KafkaBrokers"
	KafkaConsumerGroup     = "KafkaConsumerGroup"
	KafkaPartition         = "KafkaPartition"
	KafkaTopic             = "KafkaTopic"
	KafkaZooKeepers        = "KafkaZooKeepers"
	Key                    = "Key"
	LongRun                = "LongRun"
	MemAlloc               = "MemAlloc"
	Metric                 = "Metric"
	Password               = "Password"
	Platform               = "Platform"
	ProxyPassword          = "ProxyPassword"
	ProxyURL               = "ProxyURL"
	ProxyUsername          = "ProxyUsername"
	RequireAcks            = "RequiredAcks"
	ServerURL              = "ServerURL"
	Source                 = "Source"
	Sourcetype             = "Sourcetype"
	Splunk                 = "Splunk"
	AWSS3                  = "AWSS3"
	SyncWrite              = "SyncWrite"
	SysMemAlloc            = "SysMemAlloc"
	TaskConfig             = "_TaskConfigs_"
	TaskConfigAction       = "TaskConfigAction"
	TaskConfigDelete       = "TaskConfigDelete"
	TaskConfigKey          = "TaskConfigKey"
	TaskConfigNew          = "TaskConfigNew"
	TaskConfigUpdate       = "TaskConfigUpdate"
	TaskStats              = "TaskStats"
	Taskname               = "Taskname"
	Tasks                  = "_Tasks_"
	TargetSystem           = "TargetSystem"
	TargetSystemType       = "TargetSystemType"
	Timestamp              = "Timestamp"
	TotoalMemAlloc         = "TotalMemAlloc"
	UseOffsetNewest        = "UseOffsetNewest"
	UseOffsetOldest        = "UseOffsetOldest"
	Username               = "Username"
	ZooKeeperRoot          = "ZooKeeperRoot"
	ZooKeeperElectionRoot  = "ZooKeeperElectionRoot"
	ZooKeeperHeartbeatRoot = "ZooKeeperHeartbeatRoot"
	ZooKeeperLongRunTask   = "ZooKeeperLongRunTask"
	ZooKeeperServers       = "ZooKeeperServers"
)
View Source
const (
	Root            = "/descartes"
	ElectionRoot    = Root + "/election"
	HeartbeatRoot   = Root + "/heartbeat"
	LongRunTaskRoot = Root + "/long_run_tasks"
)

Variables

This section is empty.

Functions

func ToJsonObject

func ToJsonObject(data []byte) (map[string]interface{}, error)

Types

type BaseConfig

type BaseConfig map[string]string

type BaseJob

type BaseJob struct {
	// contains filtered or unexported fields
}

func NewJob

func NewJob(f JobFunc, when int64, interval int64, params JobParam) *BaseJob

func (*BaseJob) Callback

func (job *BaseJob) Callback()

func (*BaseJob) ExpirationTime

func (job *BaseJob) ExpirationTime() int64

func (*BaseJob) Id

func (job *BaseJob) Id() string

func (*BaseJob) Interval

func (job *BaseJob) Interval() int64

func (*BaseJob) Less

func (job *BaseJob) Less(other llrb.Item) bool

func (*BaseJob) ResetFunc

func (job *BaseJob) ResetFunc(f JobFunc)

func (*BaseJob) SetIntialExpirationTime

func (job *BaseJob) SetIntialExpirationTime(when int64)

func (*BaseJob) Start

func (job *BaseJob) Start()

func (*BaseJob) Stop

func (job *BaseJob) Stop()

func (*BaseJob) String

func (job *BaseJob) String() string

func (*BaseJob) UpdateExpirationTime

func (job *BaseJob) UpdateExpirationTime() int64

type CassandraCheckpointer

type CassandraCheckpointer struct {
	// contains filtered or unexported fields
}

func NewCassandraCheckpointer

func NewCassandraCheckpointer(config BaseConfig) *CassandraCheckpointer

func (*CassandraCheckpointer) DeleteCheckpoint

func (checkpoint *CassandraCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error

func (*CassandraCheckpointer) GetCheckpoint

func (checkpoint *CassandraCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)

@keyInfo: shall contain a Key

func (*CassandraCheckpointer) Start

func (checkpoint *CassandraCheckpointer) Start()

func (*CassandraCheckpointer) Stop

func (checkpoint *CassandraCheckpointer) Stop()

func (*CassandraCheckpointer) WriteCheckpoint

func (checkpoint *CassandraCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error

@keyInfo: shall contain a Key

type Checkpointer

type Checkpointer interface {
	Start()
	Stop()
	GetCheckpoint(keyInfo map[string]string) ([]byte, error)
	WriteCheckpoint(keyInfo map[string]string, value []byte) error
	DeleteCheckpoint(keyInfo map[string]string) error
}

func NewFileCheckpointer

func NewFileCheckpointer() Checkpointer

func NewKafkaCheckpointer

func NewKafkaCheckpointer(client *KafkaClient) Checkpointer

type Data

type Data struct {
	MetaInfo map[string]string
	RawData  [][]byte
}

func NewData

func NewData(metaInfo map[string]string, rawData [][]byte) *Data

type DataReader

type DataReader interface {
	Start()
	Stop()
	ReadData() ([]byte, error)
	IndexData() error
}

type DataWriter

type DataWriter interface {
	Start()
	Stop()
	WriteData(data *Data) error // can be sync or async
	WriteDataSync(data *Data) error
	WriteDataAsync(data *Data) error
}

type FileCheckpointer

type FileCheckpointer struct {
}

func (*FileCheckpointer) DeleteCheckpoint

func (ck *FileCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error

@keyInfo: contains "CheckpointDir", "CheckpointNamespace", "CheckpointKey"

func (*FileCheckpointer) GetCheckpoint

func (ck *FileCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)

@keyInfo: contains "CheckpointDir", "CheckpointNamespace", "CheckpointKey"

func (*FileCheckpointer) Start

func (ck *FileCheckpointer) Start()

func (*FileCheckpointer) Stop

func (ck *FileCheckpointer) Stop()

func (*FileCheckpointer) WriteCheckpoint

func (ck *FileCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error

@keyInfo: contains "CheckpointDir", "CheckpointNamespace", "CheckpointKey"

type Job

type Job interface {
	ResetFunc(f JobFunc)
	Less(other llrb.Item) bool
	String() string
	Id() string
	Interval() int64
	ExpirationTime() int64
	UpdateExpirationTime() int64
	SetIntialExpirationTime(when int64)
	Start()
	Stop()
	Callback()
}

type JobFunc

type JobFunc func(params JobParam) error

type JobList

type JobList []Job

func (JobList) Len

func (jobs JobList) Len() int

JobList implements sort.Interface

func (JobList) Less

func (jobs JobList) Less(i, j int) bool

func (JobList) Swap

func (jobs JobList) Swap(i, j int)

type JobParam

type JobParam interface{}

type KafkaCheckpointer

type KafkaCheckpointer struct {
	// contains filtered or unexported fields
}

func (*KafkaCheckpointer) DeleteCheckpoint

func (ck *KafkaCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error

func (*KafkaCheckpointer) GetCheckpoint

func (ck *KafkaCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)

func (*KafkaCheckpointer) Start

func (ck *KafkaCheckpointer) Start()

func (*KafkaCheckpointer) Stop

func (ck *KafkaCheckpointer) Stop()

func (*KafkaCheckpointer) WriteCheckpoint

func (ck *KafkaCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

func NewKafkaClient

func NewKafkaClient(brokerConfig BaseConfig, clientName string) *KafkaClient

func (*KafkaClient) BrokerIPs

func (client *KafkaClient) BrokerIPs() []string

func (*KafkaClient) Client

func (client *KafkaClient) Client() sarama.Client

func (*KafkaClient) Close

func (client *KafkaClient) Close()

func (*KafkaClient) GetConsumerOffset

func (client *KafkaClient) GetConsumerOffset(consumerGroup string,
	topic string, partition int32) (int64, error)

func (*KafkaClient) GetLastBlock

func (client *KafkaClient) GetLastBlock(topic string, partition int32) ([]byte, error)

Returns (position, nil) if there are no errors; (nil, nil) if the topic, partition, etc. does not exist; (nil, err) for other errors.

func (*KafkaClient) GetProducerOffset

func (client *KafkaClient) GetProducerOffset(topic string, partition int32) (int64, error)

Return: OffsetNotExist

func (*KafkaClient) Leader

func (client *KafkaClient) Leader(topic string, partition int32) (*sarama.Broker, error)

func (*KafkaClient) TopicPartitions

func (client *KafkaClient) TopicPartitions(topic string) (map[string][]int32, error)

type NullCheckpointer

type NullCheckpointer struct {
}

func NewNullCheckpointer

func NewNullCheckpointer() *NullCheckpointer

func (*NullCheckpointer) DeleteCheckpoint

func (checkpoint *NullCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error

func (*NullCheckpointer) GetCheckpoint

func (checkpoint *NullCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)

func (*NullCheckpointer) Start

func (checkpoint *NullCheckpointer) Start()

func (*NullCheckpointer) Stop

func (checkpoint *NullCheckpointer) Stop()

func (*NullCheckpointer) WriteCheckpoint

func (checkpoint *NullCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler() *Scheduler

func (*Scheduler) AddJobs

func (sched *Scheduler) AddJobs(jobs []Job)

func (*Scheduler) RemoveJobs

func (sched *Scheduler) RemoveJobs(jobs []Job)

func (*Scheduler) SetMaxStartDelay

func (sched *Scheduler) SetMaxStartDelay(d int)

SetMaxStartDelay makes the scheduler add a random sleep before executing a job instead of executing it immediately. d is given in nanoseconds.

func (*Scheduler) Start

func (sched *Scheduler) Start()

func (*Scheduler) Stop

func (sched *Scheduler) Stop()

func (*Scheduler) UpdateJobs

func (sched *Scheduler) UpdateJobs(jobs []Job)

type StdoutDataWriter

type StdoutDataWriter struct {
}

func (*StdoutDataWriter) Start

func (d *StdoutDataWriter) Start()

func (*StdoutDataWriter) Stop

func (d *StdoutDataWriter) Stop()

func (*StdoutDataWriter) WriteData

func (d *StdoutDataWriter) WriteData(data *Data) error

func (*StdoutDataWriter) WriteDataAsync

func (d *StdoutDataWriter) WriteDataAsync(data *Data) error

func (*StdoutDataWriter) WriteDataSync

func (d *StdoutDataWriter) WriteDataSync(data *Data) error

type ZooKeeperCheckpointer

type ZooKeeperCheckpointer struct {
	// contains filtered or unexported fields
}

func NewZooKeeperCheckpointer

func NewZooKeeperCheckpointer(config BaseConfig) *ZooKeeperCheckpointer

func (*ZooKeeperCheckpointer) DeleteCheckpoint

func (checkpoint *ZooKeeperCheckpointer) DeleteCheckpoint(keyInfo map[string]string) error

@keyInfo: shall contain a Key which is a node path

func (*ZooKeeperCheckpointer) GetCheckpoint

func (checkpoint *ZooKeeperCheckpointer) GetCheckpoint(keyInfo map[string]string) ([]byte, error)

@keyInfo: shall contain a Key which is a node path

func (*ZooKeeperCheckpointer) Start

func (checkpoint *ZooKeeperCheckpointer) Start()

func (*ZooKeeperCheckpointer) Stop

func (checkpoint *ZooKeeperCheckpointer) Stop()

FIXME: guard the Close call.

func (*ZooKeeperCheckpointer) WriteCheckpoint

func (checkpoint *ZooKeeperCheckpointer) WriteCheckpoint(keyInfo map[string]string, value []byte) error

@keyInfo: shall contain a Key which is a node path

type ZooKeeperClient

type ZooKeeperClient struct {
	// contains filtered or unexported fields
}

func NewZooKeeperClient

func NewZooKeeperClient(serverConfig BaseConfig) *ZooKeeperClient

func (*ZooKeeperClient) Children

func (client *ZooKeeperClient) Children(parentNode string) ([]string, error)

func (*ZooKeeperClient) ChildrenW

func (client *ZooKeeperClient) ChildrenW(parentNode string) (<-chan zk.Event, error)

ChildrenW returns a channel that receives events when something happens to the children of parentNode.

func (*ZooKeeperClient) Close

func (client *ZooKeeperClient) Close()

func (*ZooKeeperClient) CreateNode

func (client *ZooKeeperClient) CreateNode(node string, value []byte, ephemeral, ignoreExists bool) error

CreateNode stores a new value at node.

func (*ZooKeeperClient) DeleteNode

func (client *ZooKeeperClient) DeleteNode(node string, ignoreNotExists bool) error

DeleteNode deletes the latest version of the data on the node.

func (*ZooKeeperClient) ElectionParticipants

func (client *ZooKeeperClient) ElectionParticipants() ([]string, error)

func (*ZooKeeperClient) GetNode

func (client *ZooKeeperClient) GetNode(node string, ignoreNotExists bool) ([]byte, error)

GetNode returns the attached payload of the latest version on the node

func (*ZooKeeperClient) IsConnected

func (client *ZooKeeperClient) IsConnected() bool

func (*ZooKeeperClient) IsDisconnected

func (client *ZooKeeperClient) IsDisconnected() bool

func (*ZooKeeperClient) IsExpired

func (client *ZooKeeperClient) IsExpired() bool

func (*ZooKeeperClient) IsLeader

func (client *ZooKeeperClient) IsLeader(nodeGUID string) (bool, error)

nodeGUID is the result returned by JoinElection. IsLeader compares all children of the election node and selects the one with the smallest sequential number.

func (*ZooKeeperClient) JoinElection

func (client *ZooKeeperClient) JoinElection(node string) (string, error)

JoinElection returns the GUID string of the participant in the format node_%010d. For example: /descartes/election/_c_11f6c4b023b29f33e17d4340ac6815f2-node_0000000010

func (*ZooKeeperClient) NodeExists

func (client *ZooKeeperClient) NodeExists(node string) (bool, error)

NodeExists checks whether the node already exists.

func (*ZooKeeperClient) SetNode

func (client *ZooKeeperClient) SetNode(node string, value []byte) error

SetNode sets the payload on the node

func (*ZooKeeperClient) WatchElectionParticipants

func (client *ZooKeeperClient) WatchElectionParticipants() (<-chan zk.Event, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL