worker

package
v3.0.3+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2018 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// InvalidMessageArray is the message text returned when the message array of
// the process batch worker is not valid. The text itself spells out the
// expected layout of that array: [jobId, appName, users].
var InvalidMessageArray = "array must be of the form [jobId, appName, users]"

InvalidMessageArray is the string returned when the message array of the process batch worker is not valid

Functions

func BuildMessageFromTemplate added in v0.5.0

func BuildMessageFromTemplate(template model.Template, context map[string]interface{}) (string, error)

BuildMessageFromTemplate builds a message using a template and the context

func BuildTopicName added in v0.5.0

func BuildTopicName(appName, service, topicTemplate string) string

BuildTopicName builds a topic name based on appName, service and a template

func CompressUsers

func CompressUsers(users *[]User) (string, error)

CompressUsers compresses the users payload for enqueuing the message. TODO: test this function.

func GetPushDBTableName added in v0.5.0

func GetPushDBTableName(appName, service string) string

GetPushDBTableName gets the table name using appName and service

func GetTimeOffsetFromUTCInSeconds added in v0.5.0

func GetTimeOffsetFromUTCInSeconds(tz string, l zap.Logger) (int, error)

GetTimeOffsetFromUTCInSeconds returns the offset in seconds from UTC for tz

func GetWhereClauseFromFilters added in v0.5.0

func GetWhereClauseFromFilters(filters map[string]interface{}) string

GetWhereClauseFromFilters returns a string containing the where clause to use in the query

func IsUserIDValid added in v1.0.0

func IsUserIDValid(userID string) bool

IsUserIDValid tests whether a userID is valid or not

func RandomElementFromSlice

func RandomElementFromSlice(elements []string) string

RandomElementFromSlice gets a random element from a slice

func SplitUsersInBucketsByTZ added in v0.5.0

func SplitUsersInBucketsByTZ(users *[]User) map[string]*[]User

SplitUsersInBucketsByTZ splits users in buckets by tz

Types

type Batch added in v0.5.0

// Batch is a helper struct for tracking pages while they are processed.
// It pairs a pointer to a slice of user ID strings with an integer page ID.
type Batch struct {
	// UserIds points to a slice of user ID strings.
	// Presumably the users that belong to the page — TODO confirm against the producer.
	UserIds *[]string
	// PageID is an integer page identifier.
	// Presumably the index of the DB page this batch came from — TODO confirm.
	PageID  int
}

Batch is a struct that helps track processed pages

type BatchWorkerMessage added in v0.5.0

// BatchWorkerMessage is the batch worker message struct.
// ParseProcessBatchWorkerMessageArray returns a pointer to this type when it
// parses the message array of the process batch worker.
type BatchWorkerMessage struct {
	// JobID is a UUID. Presumably the "jobId" element of the message array — TODO confirm.
	JobID   uuid.UUID
	// AppName is a string. Presumably the "appName" element of the message array — TODO confirm.
	AppName string
	// Users is a slice of User values. Presumably the "users" element of the message array — TODO confirm.
	Users   []User
}

BatchWorkerMessage is the batch worker message struct

func ParseProcessBatchWorkerMessageArray added in v0.5.0

func ParseProcessBatchWorkerMessageArray(arr []interface{}) (*BatchWorkerMessage, error)

ParseProcessBatchWorkerMessageArray parses the message array of the process batch worker

type BenchmarkWorker added in v0.5.0

// BenchmarkWorker is a worker used to benchmark go-workers performance.
// Instances are obtained through GetBenchmarkWorker, which takes a redis
// server and a database name.
type BenchmarkWorker struct {
	RedisServer string
	Database    string
	// NOTE(review): PoolSize and ProcessID are declared as strings, not
	// integers. Presumably they are forwarded verbatim as configuration
	// values — confirm before converting or validating them.
	PoolSize    string
	ProcessID   string
	// RedisPool is a pointer to a redis connection pool.
	RedisPool   *redis.Pool
}

BenchmarkWorker is a worker to benchmark go-workers performance

func GetBenchmarkWorker added in v0.5.0

func GetBenchmarkWorker(redisServer, database string) *BenchmarkWorker

GetBenchmarkWorker is used to instantiate a new BenchmarkWorker

func (*BenchmarkWorker) Process added in v0.5.0

func (b *BenchmarkWorker) Process(message *workers.Msg)

Process is a worker for benchmarking go-workers

type CreateBatchesFromFiltersWorker added in v0.5.0

// CreateBatchesFromFiltersWorker is the worker struct for creating batches
// from filters. It is built by NewCreateBatchesFromFiltersWorker (from a
// viper config, a zap logger and a *Worker) and its Process method handles
// the messages sent to the worker queue.
type CreateBatchesFromFiltersWorker struct {
	Logger                    zap.Logger
	// PushDB and MarathonDB are two separate PostgreSQL clients.
	// Which tables each one serves is not visible here — TODO confirm.
	PushDB                    *extensions.PGClient
	MarathonDB                *extensions.PGClient
	Workers                   *Worker
	Config                    *viper.Viper
	// DBPageSize is an int. Presumably the number of rows fetched per DB page — TODO confirm.
	DBPageSize                int
	S3Client                  s3iface.S3API
	// PageProcessingConcurrency is an int. Presumably the number of pages
	// processed concurrently — TODO confirm.
	PageProcessingConcurrency int
	RedisClient               *redis.Client
}

CreateBatchesFromFiltersWorker is the CreateBatchesFromFiltersWorker struct

func NewCreateBatchesFromFiltersWorker added in v0.5.0

func NewCreateBatchesFromFiltersWorker(config *viper.Viper, logger zap.Logger, workers *Worker) *CreateBatchesFromFiltersWorker

NewCreateBatchesFromFiltersWorker gets a new CreateBatchesFromFiltersWorker

func (*CreateBatchesFromFiltersWorker) Process added in v0.5.0

func (b *CreateBatchesFromFiltersWorker) Process(message *workers.Msg)

Process processes the messages sent to worker queue

type CreateBatchesWorker added in v0.5.0

// CreateBatchesWorker is the worker struct for creating batches. It is built
// by NewCreateBatchesWorker (from a viper config, a zap logger and a *Worker).
// Its Process method handles the messages sent to the batch worker queue, and
// its ReadCSVFromS3 method reads a CSV from S3 into a slice of strings.
type CreateBatchesWorker struct {
	// BatchSize is an int. Presumably the number of users per batch — TODO confirm.
	BatchSize                 int
	Config                    *viper.Viper
	// DBPageSize is an int. Presumably the number of rows fetched per DB page — TODO confirm.
	DBPageSize                int
	Logger                    zap.Logger
	MarathonDB                *extensions.PGClient
	// PageProcessingConcurrency is an int. Presumably the number of pages
	// processed concurrently — TODO confirm.
	PageProcessingConcurrency int
	PushDB                    *extensions.PGClient
	RedisClient               *redis.Client
	S3Client                  s3iface.S3API
	SendgridClient            *extensions.SendgridClient
	Workers                   *Worker
}

CreateBatchesWorker is the CreateBatchesWorker struct

func NewCreateBatchesWorker added in v0.5.0

func NewCreateBatchesWorker(config *viper.Viper, logger zap.Logger, workers *Worker) *CreateBatchesWorker

NewCreateBatchesWorker gets a new CreateBatchesWorker

func (*CreateBatchesWorker) Process added in v0.5.0

func (b *CreateBatchesWorker) Process(message *workers.Msg)

Process processes the messages sent to batch worker queue

func (*CreateBatchesWorker) ReadCSVFromS3

func (b *CreateBatchesWorker) ReadCSVFromS3(csvPath string) []string

ReadCSVFromS3 reads a CSV from S3 and returns the corresponding array of strings

type DBPage added in v0.5.0

// DBPage is a helper struct used when creating batches from filters jobs.
// It holds two integers: a page number and an offset.
type DBPage struct {
	// Page is an int. Presumably the page index — TODO confirm whether it is 0- or 1-based.
	Page   int
	// Offset is an int. Presumably the row offset used when querying that page — TODO confirm.
	Offset int
}

DBPage is a struct that helps create batches from filters jobs

type JobCompletedWorker

// JobCompletedWorker is the worker struct for job-completed jobs. It is built
// by NewJobCompletedWorker (from a viper config and a zap logger) and its
// Process method handles the messages sent to the worker queue.
type JobCompletedWorker struct {
	Logger         zap.Logger
	MarathonDB     *extensions.PGClient
	Config         *viper.Viper
	// SendgridClient is a pointer to the project's Sendgrid client.
	// Presumably used to send a notification when a job completes — TODO confirm.
	SendgridClient *extensions.SendgridClient
}

JobCompletedWorker is the JobCompletedWorker struct

func NewJobCompletedWorker

func NewJobCompletedWorker(config *viper.Viper, logger zap.Logger) *JobCompletedWorker

NewJobCompletedWorker gets a new JobCompletedWorker

func (*JobCompletedWorker) Process

func (b *JobCompletedWorker) Process(message *workers.Msg)

Process processes the messages sent to worker queue

type ProcessBatchWorker

// ProcessBatchWorker is the worker struct for processing batches. It is built
// by NewProcessBatchWorker (from a viper config, a zap logger, a
// interfaces.PushProducer and a *Worker). Its Process method handles the
// messages sent to the batch worker queue and sends them to Kafka.
type ProcessBatchWorker struct {
	Config         *viper.Viper
	// Kafka is the push producer; it is an interface value
	// (interfaces.PushProducer), not a concrete client.
	Kafka          interfaces.PushProducer
	Logger         zap.Logger
	MarathonDB     *extensions.PGClient
	RedisClient    *redis.Client
	SendgridClient *extensions.SendgridClient
	Workers        *Worker
}

ProcessBatchWorker is the ProcessBatchWorker struct

func NewProcessBatchWorker added in v0.5.0

func NewProcessBatchWorker(config *viper.Viper, logger zap.Logger, kafkaClient interfaces.PushProducer, workers *Worker) *ProcessBatchWorker

NewProcessBatchWorker gets a new ProcessBatchWorker

func (*ProcessBatchWorker) Process added in v0.5.0

func (batchWorker *ProcessBatchWorker) Process(message *workers.Msg)

Process processes the messages sent to the batch worker queue and sends them to Kafka

type ResumeJobWorker added in v0.5.0

// ResumeJobWorker is the worker struct for resume-job jobs. It is built by
// NewResumeJobWorker (from a viper config, a zap logger and a *Worker) and its
// Process method handles the messages sent to the worker queue.
type ResumeJobWorker struct {
	Logger      zap.Logger
	MarathonDB  *extensions.PGClient
	Workers     *Worker
	Config      *viper.Viper
	RedisClient *redis.Client
}

ResumeJobWorker is the ResumeJobWorker struct

func NewResumeJobWorker added in v0.5.0

func NewResumeJobWorker(config *viper.Viper, logger zap.Logger, workers *Worker) *ResumeJobWorker

NewResumeJobWorker gets a new ResumeJobWorker

func (*ResumeJobWorker) Process added in v0.5.0

func (b *ResumeJobWorker) Process(message *workers.Msg)

Process processes the messages sent to worker queue

type SentBatches added in v0.5.0

// SentBatches is a helper struct for tracking sent batches.
// It holds two integer counters.
type SentBatches struct {
	// NumBatches is an int. Presumably the number of batches sent — TODO confirm.
	NumBatches  int
	// TotalTokens is an int. Presumably the total number of user tokens
	// across those batches — TODO confirm.
	TotalTokens int
}

SentBatches is a struct that helps track sent batches

type User added in v0.5.0

// User is the struct that keeps users before they are sent to the send
// batches worker.
//
// Every field carries two struct tags: a `json` tag with `omitempty`, and a
// `sql` tag with the same snake_case name (presumably the column name in the
// push database — TODO confirm).
type User struct {
	// CreatedAt uses pg.NullTime rather than time.Time.
	// Presumably so a NULL created_at can be represented — TODO confirm.
	CreatedAt pg.NullTime `json:"created_at,omitempty" sql:"created_at"`
	UserID    string      `json:"user_id,omitempty" sql:"user_id"`
	Token     string      `json:"token,omitempty" sql:"token"`
	Locale    string      `json:"locale,omitempty" sql:"locale"`
	Region    string      `json:"region,omitempty" sql:"region"`
	// Tz is a string. Presumably a time zone offset or name;
	// SplitUsersInBucketsByTZ groups users by tz — TODO confirm the format.
	Tz        string      `json:"tz,omitempty" sql:"tz"`
	// NOTE(review): the meaning of Fiu is not evident from this page — confirm with the schema.
	Fiu       string      `json:"fiu,omitempty" sql:"fiu"`
	Adid      string      `json:"adid,omitempty" sql:"adid"`
	VendorID  string      `json:"vendor_id,omitempty" sql:"vendor_id"`
}

User is the struct that will keep users before sending them to send batches worker

type Worker

// Worker is the struct that configures workers. NewWorker returns a
// configured instance from a debug flag, a zap logger and a config path.
// Its methods create or schedule jobs for the individual worker types,
// and Start starts the worker.
type Worker struct {
	// Debug is a bool; NewWorker takes a matching debug parameter.
	Debug      bool
	Logger     zap.Logger
	// ConfigPath is a string; NewWorker takes a matching configPath parameter.
	ConfigPath string
	// Config is a viper configuration.
	// Presumably loaded from ConfigPath — TODO confirm.
	Config     *viper.Viper
}

Worker is the struct that will configure workers

func NewWorker added in v0.5.0

func NewWorker(debug bool, l zap.Logger, configPath string) *Worker

NewWorker returns a configured worker

func (*Worker) CreateBatchesFromFiltersJob added in v0.5.0

func (w *Worker) CreateBatchesFromFiltersJob(jobID *[]string) (string, error)

CreateBatchesFromFiltersJob creates a new CreateBatchesFromFiltersWorker job

func (*Worker) CreateBatchesJob added in v0.5.0

func (w *Worker) CreateBatchesJob(jobID *[]string) (string, error)

CreateBatchesJob creates a new CreateBatchesWorker job

func (*Worker) CreateProcessBatchJob added in v0.5.0

func (w *Worker) CreateProcessBatchJob(jobID string, appName string, users *[]User) (string, error)

CreateProcessBatchJob creates a new ProcessBatchWorker job

func (*Worker) CreateResumeJob added in v0.5.0

func (w *Worker) CreateResumeJob(jobID *[]string) (string, error)

CreateResumeJob creates a new ResumeJobWorker job

func (*Worker) ScheduleCreateBatchesFromFiltersJob added in v0.5.0

func (w *Worker) ScheduleCreateBatchesFromFiltersJob(jobID *[]string, at int64) (string, error)

ScheduleCreateBatchesFromFiltersJob schedules a new CreateBatchesFromFiltersWorker job

func (*Worker) ScheduleCreateBatchesJob added in v0.5.0

func (w *Worker) ScheduleCreateBatchesJob(jobID *[]string, at int64) (string, error)

ScheduleCreateBatchesJob schedules a new CreateBatchesWorker job

func (*Worker) ScheduleJobCompletedJob

func (w *Worker) ScheduleJobCompletedJob(jobID string, at int64) (string, error)

ScheduleJobCompletedJob schedules a new JobCompletedWorker job

func (*Worker) ScheduleProcessBatchJob added in v0.5.0

func (w *Worker) ScheduleProcessBatchJob(jobID string, appName string, users *[]User, at int64) (string, error)

ScheduleProcessBatchJob schedules a new ProcessBatchWorker job

func (*Worker) Start

func (w *Worker) Start()

Start starts the worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL