Documentation
¶
Index ¶
- func NewHandlers(r *Repository)
- type Logs
- type LogsBatch
- type Message
- type ReceiverJob
- type ReceiverResult
- type Repository
- func (repo *Repository) BulkDbInsert(messageRows []Message, logSeverityRows []ServiceSeverity) error
- func (repo *Repository) CreateDbProcessWorkerPools(poolSize int, logsBatch <-chan LogsBatch, logsBatchReceive chan<- LogsBatch, ...)
- func (repo *Repository) CreateJobsPool(jobs chan<- ReceiverJob)
- func (repo *Repository) CreateProcessWorkerPools(poolSize int, results <-chan ReceiverResult, logsBatch chan<- LogsBatch, ...)
- func (repo *Repository) CreateReceiverWorkerPools(poolSize int, jobs <-chan ReceiverJob, results chan<- ReceiverResult, ...)
- func (repo *Repository) MessageDbProcessWorker(logsBatchReceive <-chan LogsBatch, logsBatchSend chan<- LogsBatch, ...)
- func (repo *Repository) MessageProcessWorker(msgSize int, results <-chan ReceiverResult, logsBatch chan<- LogsBatch)
- func (repo *Repository) ReceiverWorker(jobs <-chan ReceiverJob, results chan<- ReceiverResult, ...)
- type ServiceSeverity
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type LogsBatch ¶
type LogsBatch struct { LogMessage []Message ServiceSeverity []ServiceSeverity }
type Message ¶
type Message struct { ServiceName string `json:"service_name"` Payload string `json:"payload"` Severity string `json:"severity"` Timestamp time.Time `json:"timestamp"` }
Message holds the message structure
type ReceiverJob ¶
type ReceiverJob struct{}
type ReceiverResult ¶
type ReceiverResult struct {
Data []byte
}
type Repository ¶
Repository holds App config
var Repo *Repository
func NewRepo ¶
func NewRepo(a *config.AppConfig) *Repository
NewRepo initialise and return Repository Type Which holds AppConfig
func (*Repository) BulkDbInsert ¶
func (repo *Repository) BulkDbInsert(messageRows []Message, logSeverityRows []ServiceSeverity) error
BulkDbInsert inserts batch data
func (*Repository) CreateDbProcessWorkerPools ¶
func (repo *Repository) CreateDbProcessWorkerPools(poolSize int, logsBatch <-chan LogsBatch, logsBatchReceive chan<- LogsBatch, sLogs chan<- lmslogging.Log, wg *sync.WaitGroup)
CreateDbProcessWorkerPools creates a pool of Receiver Workers
func (*Repository) CreateJobsPool ¶
func (repo *Repository) CreateJobsPool(jobs chan<- ReceiverJob)
CreateJobsPool sending unlimited jobs to ReceiverJobs Channel
func (*Repository) CreateProcessWorkerPools ¶
func (repo *Repository) CreateProcessWorkerPools(poolSize int, results <-chan ReceiverResult, logsBatch chan<- LogsBatch, wg *sync.WaitGroup)
CreateProcessWorkerPools creates a pool of Receiver Workers
func (*Repository) CreateReceiverWorkerPools ¶
func (repo *Repository) CreateReceiverWorkerPools(poolSize int, jobs <-chan ReceiverJob, results chan<- ReceiverResult, logs chan<- lmslogging.Log, wg *sync.WaitGroup)
CreateReceiverWorkerPools creates a pool of Receiver Workers
func (*Repository) MessageDbProcessWorker ¶
func (repo *Repository) MessageDbProcessWorker(logsBatchReceive <-chan LogsBatch, logsBatchSend chan<- LogsBatch, lmsLogChan chan<- lmslogging.Log)
MessageDbProcessWorker gets batch the messages from LogsBatch channel and insert to DB 5 retries if error occurred. If still error on insert it will send the LogsBatch back to channel
func (*Repository) MessageProcessWorker ¶
func (repo *Repository) MessageProcessWorker(msgSize int, results <-chan ReceiverResult, logsBatch chan<- LogsBatch)
MessageProcessWorker gets the messages from results channel and process as batch send to 'Logs' channel
func (*Repository) ReceiverWorker ¶
func (repo *Repository) ReceiverWorker(jobs <-chan ReceiverJob, results chan<- ReceiverResult, logs chan<- lmslogging.Log)
ReceiverWorker receives messages from pub/sub and send it to receiverResult Channel