Documentation ¶
Index ¶
- Variables
- func BuildMessageFromTemplate(template model.Template, context map[string]interface{}) (string, error)
- func BuildTopicName(appName, service, topicTemplate string) string
- func CompressUsers(users *[]User) (string, error)
- func GetPushDBTableName(appName, service string) string
- func GetTimeOffsetFromUTCInSeconds(tz string, l zap.Logger) (int, error)
- func GetWhereClauseFromFilters(filters map[string]interface{}) string
- func IsUserIDValid(userID string) bool
- func RandomElementFromSlice(elements []string) string
- type Batch
- type BatchPart
- type BatchWorkerMessage
- type CSVSplitWorker
- type CreateBatchesWorker
- type DBPage
- type DirectPartMsg
- type DirectWorker
- type JobCompletedWorker
- type ProcessBatchWorker
- type ResumeJobWorker
- type SentBatches
- type StageStatus
- type User
- type Worker
- func (w *Worker) CreateBatchesJob(part *BatchPart) (string, error)
- func (w *Worker) CreateCSVSplitJob(job *model.Job) (string, error)
- func (w *Worker) CreateDirectBatchesJob(job *model.Job) error
- func (w *Worker) CreateProcessBatchJob(jobID string, appName string, users *[]User) (string, error)
- func (w *Worker) CreateResumeJob(jobID *[]string) (string, error)
- func (w *Worker) GetJob(jobID uuid.UUID) (*model.Job, error)
- func (w *Worker) ScheduleCSVSplitJob(job *model.Job, at int64) (string, error)
- func (w *Worker) ScheduleCreateBatchesFromFiltersJob(jobID *[]string, at int64) (string, error)
- func (w *Worker) ScheduleCreateBatchesJob(jobID *[]string, at int64) (string, error)
- func (w *Worker) ScheduleDirectBatchesJob(job *model.Job, at int64) error
- func (w *Worker) ScheduleJobCompletedJob(jobID string, at int64) (string, error)
- func (w *Worker) ScheduleProcessBatchJob(jobID string, appName string, users *[]User, at int64) (string, error)
- func (w *Worker) SendControlGroupToRedis(job *model.Job, ids []string)
- func (w *Worker) Start()
Constants ¶
This section is empty.
Variables ¶
var InvalidMessageArray = "array must be of the form [jobId, appName, users]"
InvalidMessageArray is the string returned when the message array of the process batch worker is not valid
Functions ¶
func BuildMessageFromTemplate ¶ added in v0.5.0
func BuildMessageFromTemplate(template model.Template, context map[string]interface{}) (string, error)
BuildMessageFromTemplate builds a message using a template and the context
func BuildTopicName ¶ added in v0.5.0
BuildTopicName builds a topic name based on appName, service and a template
func CompressUsers ¶
CompressUsers compresses users payload for enqueuing the message
func GetPushDBTableName ¶ added in v0.5.0
GetPushDBTableName gets the table name using appName and service
func GetTimeOffsetFromUTCInSeconds ¶ added in v0.5.0
GetTimeOffsetFromUTCInSeconds returns the offset in seconds from UTC for tz
func GetWhereClauseFromFilters ¶ added in v0.5.0
GetWhereClauseFromFilters returns a string containing the where clause to use in the query
func IsUserIDValid ¶ added in v1.0.0
IsUserIDValid tests whether a userID is valid or not
func RandomElementFromSlice ¶
RandomElementFromSlice gets a random element from a slice
Types ¶
type BatchWorkerMessage ¶ added in v0.5.0
BatchWorkerMessage is the batch worker message struct
func ParseProcessBatchWorkerMessageArray ¶ added in v0.5.0
func ParseProcessBatchWorkerMessageArray(arr []interface{}) (*BatchWorkerMessage, error)
ParseProcessBatchWorkerMessageArray parses the message array of the process batch worker
type CSVSplitWorker ¶
CSVSplitWorker is the CSVSplitWorker struct
func NewCSVSplitWorker ¶
func NewCSVSplitWorker(workers *Worker) *CSVSplitWorker
NewCSVSplitWorker gets a new CSVSplitWorker
func (*CSVSplitWorker) Process ¶
func (b *CSVSplitWorker) Process(message *workers.Msg)
Process processes the messages sent to batch worker queue
type CreateBatchesWorker ¶ added in v0.5.0
CreateBatchesWorker is the CreateBatchesWorker struct
func NewCreateBatchesWorker ¶ added in v0.5.0
func NewCreateBatchesWorker(workers *Worker) *CreateBatchesWorker
NewCreateBatchesWorker gets a new CreateBatchesWorker
func (*CreateBatchesWorker) Process ¶ added in v0.5.0
func (b *CreateBatchesWorker) Process(message *workers.Msg)
Process processes the messages sent to batch worker queue
func (*CreateBatchesWorker) ReadFromCSV ¶
func (b *CreateBatchesWorker) ReadFromCSV(buffer *[]byte, job *model.Job) []string
ReadFromCSV reads CSV from S3 and returns the corresponding array of strings
type DirectPartMsg ¶
type DirectPartMsg struct {
	SmallestSeqID uint64 // not in the interval
	BiggestSeqID  uint64 // in the interval
	JobUUID       uuid.UUID
}
DirectPartMsg saves information about a block to process
type DirectWorker ¶
DirectWorker is the DirectWorker struct
func NewDirectWorker ¶
func NewDirectWorker(workers *Worker) *DirectWorker
NewDirectWorker gets a new DirectWorker
func (*DirectWorker) Process ¶
func (b *DirectWorker) Process(message *workers.Msg)
Process processes the messages sent to the batch worker queue and sends them to Kafka
type JobCompletedWorker ¶
JobCompletedWorker is the JobCompletedWorker struct
func NewJobCompletedWorker ¶
func NewJobCompletedWorker(workers *Worker) *JobCompletedWorker
NewJobCompletedWorker gets a new JobCompletedWorker
func (*JobCompletedWorker) Process ¶
func (b *JobCompletedWorker) Process(message *workers.Msg)
Process processes the messages sent to worker queue
type ProcessBatchWorker ¶
ProcessBatchWorker is the ProcessBatchWorker struct
func NewProcessBatchWorker ¶ added in v0.5.0
func NewProcessBatchWorker(workers *Worker) *ProcessBatchWorker
NewProcessBatchWorker gets a new ProcessBatchWorker
func (*ProcessBatchWorker) Process ¶ added in v0.5.0
func (b *ProcessBatchWorker) Process(message *workers.Msg)
Process processes the messages sent to the batch worker queue and sends them to Kafka
type ResumeJobWorker ¶ added in v0.5.0
ResumeJobWorker is the ResumeJobWorker struct
func NewResumeJobWorker ¶ added in v0.5.0
func NewResumeJobWorker(workers *Worker) *ResumeJobWorker
NewResumeJobWorker gets a new ResumeJobWorker
func (*ResumeJobWorker) Process ¶ added in v0.5.0
func (b *ResumeJobWorker) Process(message *workers.Msg)
Process processes the messages sent to worker queue
type SentBatches ¶ added in v0.5.0
SentBatches is a struct that helps tracking sent batches
type StageStatus ¶
type StageStatus struct { JobID string Stage string StageKey string Description string Client *redis.Client MaxProgress int CurrentProgress int Completed bool SubStageStatus []*StageStatus }
StageStatus holds information about a stage from a worker pipeline in Redis
func NewStageStatus ¶
func NewStageStatus(client *redis.Client, jobID, stage, description string, maxProgress int) (*StageStatus, error)
NewStageStatus returns a new StageStatus instance
func (*StageStatus) IncrProgress ¶
func (s *StageStatus) IncrProgress() error
IncrProgress increments the StageStatus progress by 1 unit
func (*StageStatus) NewSubStage ¶
func (s *StageStatus) NewSubStage( description string, maxProgress int, ) (*StageStatus, error)
NewSubStage creates a new StageStatus from a previous one and adds it to its SubStageStatus list
type User ¶ added in v0.5.0
type User struct { UserID string `json:"user_id,omitempty" sql:"user_id"` Token string `json:"token,omitempty" sql:"token"` Locale string `json:"locale,omitempty" sql:"locale"` Region string `json:"region,omitempty" sql:"region"` Tz string `json:"tz,omitempty" sql:"tz"` }
User is the struct that will keep users before sending them to the send batches worker
type Worker ¶
type Worker struct { Logger zap.Logger PushDB interfaces.DB MarathonDB interfaces.DB Config *viper.Viper DBPageSize int S3Client interfaces.S3 PageProcessingConcurrency int Statsd *statsd.Client RedisClient *redis.Client ConfigPath string SendgridClient *extensions.SendgridClient Kafka interfaces.PushProducer }
Worker is the struct that will configure workers
func (*Worker) CreateBatchesJob ¶ added in v0.5.0
CreateBatchesJob creates a new CreateBatchesWorker job
func (*Worker) CreateCSVSplitJob ¶
CreateCSVSplitJob creates a new CSVSplitWorker job
func (*Worker) CreateDirectBatchesJob ¶
CreateDirectBatchesJob creates a new DirectWorker job
func (*Worker) CreateProcessBatchJob ¶ added in v0.5.0
CreateProcessBatchJob creates a new ProcessBatchWorker job
func (*Worker) CreateResumeJob ¶ added in v0.5.0
CreateResumeJob creates a new ResumeJobWorker job
func (*Worker) ScheduleCSVSplitJob ¶
ScheduleCSVSplitJob schedules a new CSVSplitWorker job
func (*Worker) ScheduleCreateBatchesFromFiltersJob ¶ added in v0.5.0
ScheduleCreateBatchesFromFiltersJob schedules a new CreateBatchesWorker job
func (*Worker) ScheduleCreateBatchesJob ¶ added in v0.5.0
ScheduleCreateBatchesJob schedules a new CreateBatchesWorker job
func (*Worker) ScheduleDirectBatchesJob ¶
ScheduleDirectBatchesJob schedules a new DirectWorker job
func (*Worker) ScheduleJobCompletedJob ¶
ScheduleJobCompletedJob schedules a new JobCompletedWorker job
func (*Worker) ScheduleProcessBatchJob ¶ added in v0.5.0
func (w *Worker) ScheduleProcessBatchJob(jobID string, appName string, users *[]User, at int64) (string, error)
ScheduleProcessBatchJob schedules a new ProcessBatchWorker job
func (*Worker) SendControlGroupToRedis ¶
SendControlGroupToRedis sends a sequence of user IDs to Redis