Documentation
¶
Index ¶
- Variables
- func BuildMessageFromTemplate(template model.Template, context map[string]interface{}) (string, error)
- func BuildTopicName(appName, service, topicTemplate string) string
- func CompressUsers(users *[]User) (string, error)
- func GetPushDBTableName(appName, service string) string
- func GetTimeOffsetFromUTCInSeconds(tz string, l zap.Logger) (int, error)
- func GetWhereClauseFromFilters(filters map[string]interface{}) string
- func IsUserIDValid(userID string) bool
- func RandomElementFromSlice(elements []string) string
- func SplitUsersInBucketsByTZ(users *[]User) map[string]*[]User
- type Batch
- type BatchWorkerMessage
- type BenchmarkWorker
- type CreateBatchesFromFiltersWorker
- type CreateBatchesWorker
- type DBPage
- type JobCompletedWorker
- type ProcessBatchWorker
- type ResumeJobWorker
- type SentBatches
- type User
- type Worker
- func (w *Worker) CreateBatchesFromFiltersJob(jobID *[]string) (string, error)
- func (w *Worker) CreateBatchesJob(jobID *[]string) (string, error)
- func (w *Worker) CreateProcessBatchJob(jobID string, appName string, users *[]User) (string, error)
- func (w *Worker) CreateResumeJob(jobID *[]string) (string, error)
- func (w *Worker) ScheduleCreateBatchesFromFiltersJob(jobID *[]string, at int64) (string, error)
- func (w *Worker) ScheduleCreateBatchesJob(jobID *[]string, at int64) (string, error)
- func (w *Worker) ScheduleJobCompletedJob(jobID string, at int64) (string, error)
- func (w *Worker) ScheduleProcessBatchJob(jobID string, appName string, users *[]User, at int64) (string, error)
- func (w *Worker) Start()
Constants ¶
This section is empty.
Variables ¶
var InvalidMessageArray = "array must be of the form [jobId, appName, users]"
InvalidMessageArray is the string returned when the message array of the process batch worker is not valid
Functions ¶
func BuildMessageFromTemplate ¶ added in v0.5.0
func BuildMessageFromTemplate(template model.Template, context map[string]interface{}) (string, error)
BuildMessageFromTemplate builds a message using a template and the context
func BuildTopicName ¶ added in v0.5.0
BuildTopicName builds a topic name based on appName, service and a template
func CompressUsers ¶
CompressUsers compresses the users payload for enqueuing the message. TODO: test this function.
func GetPushDBTableName ¶ added in v0.5.0
GetPushDBTableName gets the table name using appName and service
func GetTimeOffsetFromUTCInSeconds ¶ added in v0.5.0
GetTimeOffsetFromUTCInSeconds returns the offset in seconds from UTC for tz
func GetWhereClauseFromFilters ¶ added in v0.5.0
GetWhereClauseFromFilters returns a string containing the where clause to use in the query
func IsUserIDValid ¶ added in v1.0.0
IsUserIDValid tests whether a userID is valid or not
func RandomElementFromSlice ¶
RandomElementFromSlice gets a random element from a slice
func SplitUsersInBucketsByTZ ¶ added in v0.5.0
SplitUsersInBucketsByTZ splits users in buckets by tz
Types ¶
type BatchWorkerMessage ¶ added in v0.5.0
BatchWorkerMessage is the batch worker message struct
func ParseProcessBatchWorkerMessageArray ¶ added in v0.5.0
func ParseProcessBatchWorkerMessageArray(arr []interface{}) (*BatchWorkerMessage, error)
ParseProcessBatchWorkerMessageArray parses the message array of the process batch worker
type BenchmarkWorker ¶ added in v0.5.0
type BenchmarkWorker struct { RedisServer string Database string PoolSize string ProcessID string RedisPool *redis.Pool }
BenchmarkWorker is a worker to benchmark go-workers performance
func GetBenchmarkWorker ¶ added in v0.5.0
func GetBenchmarkWorker(redisServer, database string) *BenchmarkWorker
GetBenchmarkWorker is used to instantiate a new BenchmarkWorker
func (*BenchmarkWorker) Process ¶ added in v0.5.0
func (b *BenchmarkWorker) Process(message *workers.Msg)
Process is a worker for benchmarking go-workers
type CreateBatchesFromFiltersWorker ¶ added in v0.5.0
type CreateBatchesFromFiltersWorker struct { Logger zap.Logger PushDB *extensions.PGClient MarathonDB *extensions.PGClient Workers *Worker Config *viper.Viper DBPageSize int S3Client s3iface.S3API PageProcessingConcurrency int RedisClient *redis.Client }
CreateBatchesFromFiltersWorker is the CreateBatchesFromFiltersWorker struct
func NewCreateBatchesFromFiltersWorker ¶ added in v0.5.0
func NewCreateBatchesFromFiltersWorker(config *viper.Viper, logger zap.Logger, workers *Worker) *CreateBatchesFromFiltersWorker
NewCreateBatchesFromFiltersWorker gets a new CreateBatchesFromFiltersWorker
func (*CreateBatchesFromFiltersWorker) Process ¶ added in v0.5.0
func (b *CreateBatchesFromFiltersWorker) Process(message *workers.Msg)
Process processes the messages sent to worker queue
type CreateBatchesWorker ¶ added in v0.5.0
type CreateBatchesWorker struct { BatchSize int Config *viper.Viper DBPageSize int Logger zap.Logger MarathonDB *extensions.PGClient PageProcessingConcurrency int PushDB *extensions.PGClient RedisClient *redis.Client S3Client s3iface.S3API SendgridClient *extensions.SendgridClient Workers *Worker }
CreateBatchesWorker is the CreateBatchesWorker struct
func NewCreateBatchesWorker ¶ added in v0.5.0
func NewCreateBatchesWorker(config *viper.Viper, logger zap.Logger, workers *Worker) *CreateBatchesWorker
NewCreateBatchesWorker gets a new CreateBatchesWorker
func (*CreateBatchesWorker) Process ¶ added in v0.5.0
func (b *CreateBatchesWorker) Process(message *workers.Msg)
Process processes the messages sent to batch worker queue
func (*CreateBatchesWorker) ReadCSVFromS3 ¶
func (b *CreateBatchesWorker) ReadCSVFromS3(csvPath string) []string
ReadCSVFromS3 reads a CSV from S3 and returns the corresponding array of strings
type JobCompletedWorker ¶
type JobCompletedWorker struct { Logger zap.Logger MarathonDB *extensions.PGClient Config *viper.Viper SendgridClient *extensions.SendgridClient }
JobCompletedWorker is the JobCompletedWorker struct
func NewJobCompletedWorker ¶
func NewJobCompletedWorker(config *viper.Viper, logger zap.Logger) *JobCompletedWorker
NewJobCompletedWorker gets a new JobCompletedWorker
func (*JobCompletedWorker) Process ¶
func (b *JobCompletedWorker) Process(message *workers.Msg)
Process processes the messages sent to worker queue
type ProcessBatchWorker ¶
type ProcessBatchWorker struct { Config *viper.Viper Kafka interfaces.PushProducer Logger zap.Logger MarathonDB *extensions.PGClient RedisClient *redis.Client SendgridClient *extensions.SendgridClient Workers *Worker }
ProcessBatchWorker is the ProcessBatchWorker struct
func NewProcessBatchWorker ¶ added in v0.5.0
func NewProcessBatchWorker(config *viper.Viper, logger zap.Logger, kafkaClient interfaces.PushProducer, workers *Worker) *ProcessBatchWorker
NewProcessBatchWorker gets a new ProcessBatchWorker
func (*ProcessBatchWorker) Process ¶ added in v0.5.0
func (batchWorker *ProcessBatchWorker) Process(message *workers.Msg)
Process processes the messages sent to the batch worker queue and sends them to Kafka
type ResumeJobWorker ¶ added in v0.5.0
type ResumeJobWorker struct { Logger zap.Logger MarathonDB *extensions.PGClient Workers *Worker Config *viper.Viper RedisClient *redis.Client }
ResumeJobWorker is the ResumeJobWorker struct
func NewResumeJobWorker ¶ added in v0.5.0
NewResumeJobWorker gets a new ResumeJobWorker
func (*ResumeJobWorker) Process ¶ added in v0.5.0
func (b *ResumeJobWorker) Process(message *workers.Msg)
Process processes the messages sent to worker queue
type SentBatches ¶ added in v0.5.0
SentBatches is a struct that helps tracking sent batches
type User ¶ added in v0.5.0
type User struct { CreatedAt pg.NullTime `json:"created_at,omitempty" sql:"created_at"` UserID string `json:"user_id,omitempty" sql:"user_id"` Token string `json:"token,omitempty" sql:"token"` Locale string `json:"locale,omitempty" sql:"locale"` Region string `json:"region,omitempty" sql:"region"` Tz string `json:"tz,omitempty" sql:"tz"` Fiu string `json:"fiu,omitempty" sql:"fiu"` Adid string `json:"adid,omitempty" sql:"adid"` VendorID string `json:"vendor_id,omitempty" sql:"vendor_id"` }
User is the struct that will keep users before sending them to send batches worker
type Worker ¶
Worker is the struct that will configure workers
func (*Worker) CreateBatchesFromFiltersJob ¶ added in v0.5.0
CreateBatchesFromFiltersJob creates a new CreateBatchesFromFiltersWorker job
func (*Worker) CreateBatchesJob ¶ added in v0.5.0
CreateBatchesJob creates a new CreateBatchesWorker job
func (*Worker) CreateProcessBatchJob ¶ added in v0.5.0
CreateProcessBatchJob creates a new ProcessBatchWorker job
func (*Worker) CreateResumeJob ¶ added in v0.5.0
CreateResumeJob creates a new ResumeJobWorker job
func (*Worker) ScheduleCreateBatchesFromFiltersJob ¶ added in v0.5.0
ScheduleCreateBatchesFromFiltersJob schedules a new CreateBatchesFromFiltersWorker job
func (*Worker) ScheduleCreateBatchesJob ¶ added in v0.5.0
ScheduleCreateBatchesJob schedules a new CreateBatchesWorker job
func (*Worker) ScheduleJobCompletedJob ¶
ScheduleJobCompletedJob schedules a new JobCompletedWorker job