Documentation
Index
- Variables
- func StartController(c *Controller)
- type Controller
- type Driver
- type Drivers
- type FailureHandler
- type NewConsumer
- type NewDecoder
- type NewProcessor
- type NewSink
- type Params
- type Processor
- type ProcessorContext
- type ProcessorError
- type Result
- type RetryFailureHandler
- type StreamingProcessor
- type ZKNodeSubscriber
Constants
This section is empty.
Variables
var Module = fx.Options(
    fx.Provide(
        NewController,
    ),
    fx.Invoke(StartController),
)
Module configures Drivers and Controller.
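A minimal sketch of wiring Module into an fx application; the providers for config.ServiceConfig, rules.JobConfigs, and the sink/consumer/decoder init functions are assumed to be supplied by the application's other modules:

    app := fx.New(
        // Module provides *Controller and invokes StartController.
        Module,
        // Providers for config.ServiceConfig, rules.JobConfigs, NewSink,
        // NewConsumer and NewDecoder are assumed to come from elsewhere.
    )
    app.Run()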
Functions
func StartController
func StartController(c *Controller)
StartController starts the periodic sync-up with the aresDB controller.
Types
type Controller
type Controller struct {
    sync.RWMutex

    // Drivers are all running jobs
    Drivers Drivers
    // contains filtered or unexported fields
}
Controller is responsible for syncing up with the aresDB controller.
func (*Controller) RegisterOnZK
func (c *Controller) RegisterOnZK() error
RegisterOnZK registers the aresDB subscriber instance in ZooKeeper as an ephemeral node.
func (*Controller) RestartEtcdHBService
func (c *Controller) RestartEtcdHBService(params Params)
RestartEtcdHBService re-registers the heartbeat if etcd cluster changes are detected.
func (*Controller) SyncUpJobConfigs
func (c *Controller) SyncUpJobConfigs()
SyncUpJobConfigs syncs up jobConfigs with the aresDB controller.
type Driver
type Driver struct {
    sync.RWMutex

    Topic                string
    StartTime            time.Time
    Shutdown             bool
    JobName              string
    AresCluster          string
    TotalProcessors      int
    RunningProcessors    int
    StoppedProcessors    int
    FailedProcessors     int
    RestartingProcessors int
    ProcessorContext     map[string]*ProcessorContext
    // contains filtered or unexported fields
}
Driver will initialize and start Processors based on the JobConfig provided.
func NewDriver
func NewDriver(
    jobConfig *rules.JobConfig,
    serviceConfig config.ServiceConfig,
    aresControllerClient controllerCli.ControllerClient,
    processorInitFunc NewProcessor,
    sinkInitFunc NewSink,
    consumerInitFunc NewConsumer,
    decoderInitFunc NewDecoder) (*Driver, error)
NewDriver will return a new Driver instance to start an ingest job.
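A hypothetical sketch of constructing and starting a Driver with the streaming processor; jobConfig, serviceConfig, aresControllerClient and the sink/consumer/decoder init functions are assumed to come from the surrounding setup code:

    driver, err := NewDriver(
        jobConfig,             // *rules.JobConfig, assumed available
        serviceConfig,         // config.ServiceConfig, assumed available
        aresControllerClient,  // controllerCli.ControllerClient, assumed available
        NewStreamingProcessor, // documented below
        sinkInitFunc,          // NewSink, assumed available
        consumerInitFunc,      // NewConsumer, assumed available
        decoderInitFunc,       // NewDecoder, assumed available
    )
    if err != nil {
        log.Fatal(err)
    }
    go driver.Start()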
func (*Driver) AddProcessor
AddProcessor will add a new processor to the driver.
func (*Driver) GetErrors
func (d *Driver) GetErrors() chan ProcessorError
GetErrors returns the channel of processor errors.
func (*Driver) MarshalJSON
MarshalJSON marshals the driver into JSON.
func (*Driver) RemoveProcessor
RemoveProcessor will remove a processor from the driver.
func (*Driver) Start
func (d *Driver) Start()
Start will start the Driver, which starts Processors that read from the Kafka consumer, process messages, and save them to the database.
func (*Driver) WriteContext
func (d *Driver) WriteContext(w http.ResponseWriter)
WriteContext writes the driver context to the HTTP response.
type Drivers
Drivers contains information about each job, its ares cluster, and its driver.
func NewDrivers
func NewDrivers(params Params, aresControllerClient controllerCli.ControllerClient) (Drivers, error)
NewDrivers returns Drivers.
type FailureHandler
type FailureHandler interface {
    // HandleFailure will provide a contingency plan to
    // keep track of failed saves
    HandleFailure(destination sink.Destination, rows []client.Row) error
}
The FailureHandler interface will be implemented by failure handlers that are used when saving to the storage layer fails.
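A hypothetical minimal implementation that only logs failed rows; real handlers, such as RetryFailureHandler below, retry the save instead:

    // logOnlyFailureHandler is a sketch, not a handler shipped with
    // this package.
    type logOnlyFailureHandler struct{}

    // HandleFailure logs the failed rows and drops them.
    func (h logOnlyFailureHandler) HandleFailure(
        destination sink.Destination, rows []client.Row) error {
        log.Printf("dropping %d rows for destination %v", len(rows), destination)
        return nil
    }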
type NewConsumer
type NewConsumer func(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error)
NewConsumer is the type of function each consumer that implements Consumer should provide for initialization.
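A hypothetical NewConsumer implementation; newKafkaConsumer is an assumed constructor for a concrete consumer.Consumer, not a function provided by this package:

    var consumerInitFunc NewConsumer = func(
        jobConfig *rules.JobConfig,
        serviceConfig config.ServiceConfig) (consumer.Consumer, error) {
        // newKafkaConsumer is an assumed constructor name.
        return newKafkaConsumer(jobConfig, serviceConfig)
    }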
type NewDecoder
type NewDecoder func(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (decoder message.Decoder, err error)
NewDecoder is the type of function each decoder that implements message.Decoder should provide for initialization.
type NewProcessor
type NewProcessor func(
    id int,
    jobConfig *rules.JobConfig,
    aresControllerClient controllerCli.ControllerClient,
    sinkInitFunc NewSink,
    consumerInitFunc NewConsumer,
    decoderInitFunc NewDecoder,
    errors chan ProcessorError,
    msgSizes chan int64,
    serviceConfig config.ServiceConfig) (Processor, error)
NewProcessor is the type of function each processor that implements Processor should provide for initialization. The implementation should always return a new instance of the processor.
type NewSink
type NewSink func(
    serviceConfig config.ServiceConfig,
    jobConfig *rules.JobConfig,
    cluster string,
    sinkCfg config.SinkConfig,
    aresControllerClient controllerCli.ControllerClient) (sink.Sink, error)
NewSink is the type of function each sink that implements Sink should provide for initialization.
type Params
type Params struct {
    fx.In

    LifeCycle        fx.Lifecycle
    ServiceConfig    config.ServiceConfig
    JobConfigs       rules.JobConfigs
    SinkInitFunc     NewSink
    ConsumerInitFunc NewConsumer
    DecoderInitFunc  NewDecoder
}
Params defines the base objects for jobConfigs.
type Processor
type Processor interface {
    // GetID will return the ID of this processor
    GetID() int
    // GetContext will return the processor context
    GetContext() *ProcessorContext
    // Run will start the processor and run until shutdown
    // is triggered or it is closed for some other reason
    Run()
    // Stop will stop the processor and close all connections
    // to the kafka consumer group and storage layer
    Stop()
    // Restart will stop and start the current processor
    Restart()
}
Processor is an interface that all processors need to implement to work with Driver.
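A sketch of supervising a Processor through this interface; p is assumed to be a value returned by a NewProcessor init function such as NewStreamingProcessor:

    go p.Run() // runs until shutdown is triggered

    // Inspect the shared context under its read lock before
    // deciding to restart the processor.
    ctx := p.GetContext()
    ctx.RLock()
    failed := ctx.FailedMessages
    ctx.RUnlock()
    if failed > 0 {
        p.Restart()
    }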
func NewStreamingProcessor
func NewStreamingProcessor(
    id int,
    jobConfig *rules.JobConfig,
    aresControllerClient controllerCli.ControllerClient,
    sinkInitFunc NewSink,
    consumerInitFunc NewConsumer,
    decoderInitFunc NewDecoder,
    errors chan ProcessorError,
    msgSizes chan int64,
    serviceConfig config.ServiceConfig) (Processor, error)
NewStreamingProcessor returns a Processor that consumes, processes, and saves data to the database.
type ProcessorContext
type ProcessorContext struct {
    sync.RWMutex

    StartTime      time.Time       `json:"startTime"`
    TotalMessages  int64           `json:"totalMessages"`
    FailedMessages int64           `json:"failedMessages"`
    Stopped        bool            `json:"stopped"`
    Shutdown       bool            `json:"shutdown"`
    Errors         processorErrors `json:"errors"`
    LastUpdated    time.Time       `json:"lastUpdated"`
    RestartCount   int64           `json:"restartCount"`
    Restarting     bool            `json:"restarting"`
    RestartTime    int64           `json:"restartTime"`
}
ProcessorContext holds information about the total messages processed, the number of failed messages, the number of messages waiting in the batcher, and the timestamp when this information was last updated.
type ProcessorError
type ProcessorError struct {
    // ID of the processor
    ID int
    // Timestamp defines time when this error
    // happened
    Timestamp int64
    // Error generated
    Error error
}
ProcessorError defines an error and the ID of the processor that generated it.
func (ProcessorError) ErrorToJSON
func (p ProcessorError) ErrorToJSON() string
ErrorToJSON converts the error to JSON format.
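For example, a ProcessorError can be built from its documented fields and logged as JSON; the values here are hypothetical:

    perr := ProcessorError{
        ID:        1,                 // processor ID
        Timestamp: time.Now().Unix(), // when the error happened
        Error:     err,               // the underlying error, assumed available
    }
    log.Println(perr.ErrorToJSON())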
type Result
type Result struct {
    fx.Out

    Controller *Controller
}
Result defines the objects that the job module provides.
type RetryFailureHandler
type RetryFailureHandler struct {
// contains filtered or unexported fields
}
RetryFailureHandler implements exponential backoff retry
func NewRetryFailureHandler
func NewRetryFailureHandler(
    config models.FailureHandlerConfig,
    serviceConfig config.ServiceConfig,
    db sink.Sink,
    jobName string) *RetryFailureHandler
NewRetryFailureHandler creates a new RetryFailureHandler
func (*RetryFailureHandler) HandleFailure
func (handler *RetryFailureHandler) HandleFailure(destination sink.Destination, rows []client.Row) (err error)
HandleFailure handles a failed save by retrying it with exponential backoff.
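A hypothetical use of the retry handler after a failed save; handlerCfg, serviceConfig, db, destination and rows are assumed to come from the surrounding job setup:

    handler := NewRetryFailureHandler(handlerCfg, serviceConfig, db, "demo-job")
    if err := handler.HandleFailure(destination, rows); err != nil {
        log.Printf("retries exhausted: %v", err)
    }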
type StreamingProcessor
type StreamingProcessor struct {
    ID int
    // contains filtered or unexported fields
}
StreamingProcessor defines an individual processor that connects to a high-level Kafka consumer, processes messages based on the type of job, and saves them to the database.
func (*StreamingProcessor) GetContext
func (s *StreamingProcessor) GetContext() *ProcessorContext
GetContext will return the context of this processor.
func (*StreamingProcessor) GetID
func (s *StreamingProcessor) GetID() int
GetID will return the ID of this processor.
func (*StreamingProcessor) Restart
func (s *StreamingProcessor) Restart()
Restart will stop the processor and start it again when a failure is detected.
func (*StreamingProcessor) Run
func (s *StreamingProcessor) Run()
Run will start the Processor, which reads from the high-level Kafka consumer, decodes each message, and adds the row to the batcher for saving to ares.
type ZKNodeSubscriber
type ZKNodeSubscriber struct {
    // Name is subscriber instanceId
    Name string `json:"name"`
    // Host is host name of subscriber
    Host string `json:"host"`
}
ZKNodeSubscriber defines the information stored in the subscriber's ZooKeeper node.
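Given the json tags above, the node payload marshals as follows; the name and host values are hypothetical:

    node := ZKNodeSubscriber{
        Name: "subscriber-0",
        Host: "host0.example.com",
    }
    payload, _ := json.Marshal(node)
    // payload: {"name":"subscriber-0","host":"host0.example.com"}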