Documentation ¶
Index ¶
Constants ¶
const ( APNSPlatform = "apns" GCMPlatform = "gcm" )
Constants
const ( MetricsTokensDeleteSuccess = "tokens_delete_success" MetricsTokensDeleteError = "tokens_delete_error" MetricsTokensDeleteNonexistent = "tokens_delete_nonexistent" )
Metrics name sent by the Handler
Variables ¶
var ( ErrAPNSUnmarshal = errors.New("error unmarshalling apns message") ErrGCMUnmarshal = errors.New("error unmarshalling gcm message") )
Errors
var AvailableStatsReporters = map[string]statsReporterInitializer{ "statsd": func(config *viper.Viper, logger *logrus.Logger, clientOrNil interfaces.StatsDClient) (interfaces.StatsReporter, error) { return extensions.NewStatsD(config, logger, clientOrNil) }, }
AvailableStatsReporters contains functions to initialize all stats reporters
Functions ¶
func WaitTimeout ¶
WaitTimeout waits for the waitgroup for the specified max timeout. Returns true if waiting timed out. got from http://stackoverflow.com/a/32843750/3987733
Types ¶
type Broker ¶
type Broker struct { StatsReporters []interfaces.StatsReporter Logger *log.Logger Config *viper.Viper InChan chan QueueMessage InvalidTokenOutChan chan *InvalidToken InvalidTokenEnabled bool // contains filtered or unexported fields }
Broker receives kafka messages in its InChan, unmarshal them according to the platform and routes them to the correct out channel after examining their content.
func NewBroker ¶
func NewBroker( logger *log.Logger, cfg *viper.Viper, statsReporters []interfaces.StatsReporter, inChan chan QueueMessage, pendingMessagesWG *sync.WaitGroup, ) (*Broker, error)
NewBroker creates a new Broker instance
type InvalidToken ¶
InvalidToken represents a token with the necessary information to be deleted
type InvalidTokenHandler ¶
type InvalidTokenHandler struct { Logger *log.Logger Config *viper.Viper StatsReporter []interfaces.StatsReporter Client *extensions.PGClient InChan chan *InvalidToken Buffer []*InvalidToken // contains filtered or unexported fields }
InvalidTokenHandler takes the InvalidTokens from the InChannel and put them in a buffer. When the buffer is full or after a timeout, it is flushed, triggering the deletion of the tokens from the database
func NewInvalidTokenHandler ¶
func NewInvalidTokenHandler( logger *log.Logger, cfg *viper.Viper, statsReporter []interfaces.StatsReporter, inChan chan *InvalidToken, dbOrNil ...interfaces.DB, ) (*InvalidTokenHandler, error)
NewInvalidTokenHandler returns a new InvalidTokenHandler instance
func (*InvalidTokenHandler) Start ¶
func (i *InvalidTokenHandler) Start()
Start starts to process the InvalidTokens from the intake channel
func (*InvalidTokenHandler) Stop ¶
func (i *InvalidTokenHandler) Stop()
Stop stops the Handler from consuming messages from the intake channel
type KafkaConsumer ¶
type KafkaConsumer struct { Topics []string Brokers string Consumer interfaces.KafkaConsumerClient ConsumerGroup string OffsetResetStrategy string Config *viper.Viper ChannelSize int Logger *logrus.Logger FetchMinBytes int FetchWaitMaxMs int SessionTimeout int HandleAllMessagesBeforeExiting bool AssignedPartition bool // contains filtered or unexported fields }
KafkaConsumer for getting pusher feedbacks
func NewKafkaConsumer ¶
func NewKafkaConsumer( config *viper.Viper, logger *logrus.Logger, stopChannel *chan struct{}, clientOrNil ...interfaces.KafkaConsumerClient, ) (*KafkaConsumer, error)
NewKafkaConsumer for creating a new KafkaConsumer instance
func (*KafkaConsumer) Cleanup ¶
func (q *KafkaConsumer) Cleanup() error
Cleanup closes kafka consumer connection
func (*KafkaConsumer) ConsumeLoop ¶
func (q *KafkaConsumer) ConsumeLoop() error
ConsumeLoop consume messages from the queue and put in messages to send channel
func (*KafkaConsumer) MessagesChannel ¶
func (q *KafkaConsumer) MessagesChannel() chan QueueMessage
MessagesChannel returns the channel that will receive all messages got from kafka
func (*KafkaConsumer) PendingMessagesWaitGroup ¶
func (q *KafkaConsumer) PendingMessagesWaitGroup() *sync.WaitGroup
PendingMessagesWaitGroup returns the waitGroup that is incremented every time a feedback is consumed
func (*KafkaConsumer) StopConsuming ¶
func (q *KafkaConsumer) StopConsuming()
StopConsuming stops consuming messages from the queue
type KafkaMessage ¶
KafkaMessage implements the FeedbackMessage interface
func (*KafkaMessage) GetGame ¶
func (k *KafkaMessage) GetGame() string
GetGame returns the message's Game
func (*KafkaMessage) GetPlatform ¶
func (k *KafkaMessage) GetPlatform() string
GetPlatform returns the message's Platform
func (*KafkaMessage) GetValue ¶
func (k *KafkaMessage) GetValue() []byte
GetValue returns the message's Value
type Listener ¶
type Listener struct { Config *viper.Viper Logger *log.Logger StatsReporters []interfaces.StatsReporter Queue Queue Broker *Broker InvalidTokenHandler *InvalidTokenHandler GracefulShutdownTimeout int // contains filtered or unexported fields }
Listener will consume push feedbacks from a queue and use a broker to route the messages to a convenient handler
func NewListener ¶
func NewListener( config *viper.Viper, logger *log.Logger, statsdClientOrNil interfaces.StatsDClient, ) (*Listener, error)
NewListener creates and return a new Listener instance
type Message ¶
type Message struct { From string `json:"from"` MessageID string `json:"message_id"` MessageType string `json:"message_type"` Error string `json:"error"` ErrorDescription string `json:"error_description"` DeviceToken string `json:"DeviceToken"` ID string `json:"id"` Err map[string]interface{} `json:"Err"` Metadata map[string]interface{} `json:"metadata"` Reason string `json:"reason"` }
Message is a struct that will decode an apns or gcm feedback message.
type Queue ¶
type Queue interface { MessagesChannel() chan QueueMessage ConsumeLoop() error StopConsuming() Cleanup() error PendingMessagesWaitGroup() *sync.WaitGroup }
Queue interface for making new queues pluggable easily
type QueueMessage ¶
QueueMessage defines the interface that should be implemented by the type produced by a Queue