Documentation
¶
Index ¶
- Constants
- func Contains(s []string, e string) bool
- func ConvertKafkaMessageToTimber(message *sarama.ConsumerMessage) (timber pb.Timber, err error)
- func ConvertTimberToEsDocumentString(timber pb.Timber, m *jsonpb.Marshaler) string
- func ConvertTimberToKafkaMessage(timber *pb.Timber, topic string) *sarama.ProducerMessage
- func GetApplicationSecretCollection() []string
- func InstruApplicationSecret(appSecret string)
- func NewDummyKafkaFactory() *dummyKafkaFactory
- func NewDummyRateLimiter() *dummyRateLimiter
- func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, ...) (client elasticClient, err error)
- func NewEsConfig(indexMethod string, bulkSize int, flushMs time.Duration, printTPS bool) esConfig
- type ApplicationSecretCollection
- type BaritoConsumerService
- type ClusterConsumer
- type ConsumerWorker
- type ELasticTestHandler
- type Elastic
- type ElasticRetrier
- type KafkaAdmin
- type KafkaFactory
- type LeakyBucket
- type ProducerService
- type RateLimiter
Constants ¶
View Source
const ( ErrConvertKafkaMessage = errkit.Error("Convert KafkaMessage Failed") ErrStore = errkit.Error("Store Failed") ErrElasticsearchClient = errkit.Error("Elasticsearch Client Failed") ErrConsumerWorker = errkit.Error("Consumer Worker Failed") ErrMakeKafkaAdmin = errkit.Error("Make kafka admin failed") ErrMakeNewTopicWorker = errkit.Error("Make new topic worker failed") ErrSpawnWorkerOnNewTopic = errkit.Error("Spawn worker on new topic failed") ErrSpawnWorker = errkit.Error("Span worker failed") ErrHaltWorker = errkit.Error("Consumer Worker Halted") PrefixEventGroupID = "nte" )
View Source
const ( JsonParseError = errkit.Error("JSON Parse Error") ProtoParseError = errkit.Error("Protobuf Parse Error") )
View Source
const ( ErrMakeSyncProducer = errkit.Error("Make sync producer failed") ErrKafkaRetryLimitReached = errkit.Error("Error connecting to kafka, retry limit reached") ErrInitGrpc = errkit.Error("Failed to listen to gRPC address") ErrRegisterGrpc = errkit.Error("Error registering gRPC server endpoint into reverse proxy") ErrReverseProxy = errkit.Error("Error serving REST reverse proxy") )
View Source
const (
DEFAULT_ELASTIC_DOCUMENT_TYPE = "_doc"
)
View Source
const (
RetrieveMessageFailedError = errkit.Error("Retrieve message failed")
)
Variables ¶
This section is empty.
Functions ¶
func ConvertKafkaMessageToTimber ¶
func ConvertKafkaMessageToTimber(message *sarama.ConsumerMessage) (timber pb.Timber, err error)
func ConvertTimberToEsDocumentString ¶ added in v0.13.0
func ConvertTimberToEsDocumentString(timber pb.Timber, m *jsonpb.Marshaler) string
func ConvertTimberToKafkaMessage ¶
func ConvertTimberToKafkaMessage(timber *pb.Timber, topic string) *sarama.ProducerMessage
func GetApplicationSecretCollection ¶
func GetApplicationSecretCollection() []string
func InstruApplicationSecret ¶
func InstruApplicationSecret(appSecret string)
func NewDummyKafkaFactory ¶
func NewDummyKafkaFactory() *dummyKafkaFactory
func NewDummyRateLimiter ¶
func NewDummyRateLimiter() *dummyRateLimiter
func NewElastic ¶
func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, elasticUsername string, elasticPassword string) (client elasticClient, err error)
Types ¶
type ApplicationSecretCollection ¶
type ApplicationSecretCollection struct {
// contains filtered or unexported fields
}
type BaritoConsumerService ¶
type BaritoConsumerService interface { Start() error Close() WorkerMap() map[string]ConsumerWorker NewTopicEventWorker() ConsumerWorker }
func NewBaritoConsumerService ¶
func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerService
type ClusterConsumer ¶
type ClusterConsumer interface { Messages() <-chan *sarama.ConsumerMessage Notifications() <-chan *cluster.Notification Errors() <-chan error MarkOffset(msg *sarama.ConsumerMessage, metadata string) Close() error }
ClusterConsumer is an interface over cluster.Consumer, defined for testing purposes.
type ConsumerWorker ¶
type ConsumerWorker interface { Start() Stop() Halt() IsStart() bool OnError(f func(error)) OnSuccess(f func(*sarama.ConsumerMessage)) OnNotification(f func(*cluster.Notification)) }
func NewConsumerWorker ¶
func NewConsumerWorker(name string, consumer ClusterConsumer) ConsumerWorker
type ELasticTestHandler ¶
type ELasticTestHandler struct { ExistAPIStatus int CreateAPIStatus int PostAPIStatus int ResponseBody []byte CustomHandler func(w http.ResponseWriter, r *http.Request) }
func (*ELasticTestHandler) ServeHTTP ¶
func (handler *ELasticTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
type ElasticRetrier ¶
type ElasticRetrier struct {
// contains filtered or unexported fields
}
func NewElasticRetrier ¶
func NewElasticRetrier(t time.Duration, n int, f func(err error)) *ElasticRetrier
type KafkaAdmin ¶
type KafkaAdmin interface { RefreshTopics() error SetTopics([]string) Topics() []string AddTopic(topic string) Exist(topic string) bool CreateTopic(topic string, numPartitions int32, replicationFactor int16) error Close() }
func NewKafkaAdmin ¶
func NewKafkaAdmin(client sarama.Client) (admin KafkaAdmin, err error)
type KafkaFactory ¶
type KafkaFactory interface { MakeKafkaAdmin() (admin KafkaAdmin, err error) MakeClusterConsumer(groupID, topic string, initialOffset int64) (worker ClusterConsumer, err error) MakeSyncProducer() (producer sarama.SyncProducer, err error) }
func NewKafkaFactory ¶
func NewKafkaFactory(brokers []string, config *sarama.Config) KafkaFactory
type LeakyBucket ¶
type LeakyBucket struct {
// contains filtered or unexported fields
}
func NewLeakyBucket ¶
func NewLeakyBucket(max int32) *LeakyBucket
func (*LeakyBucket) IsFull ¶
func (b *LeakyBucket) IsFull() bool
func (*LeakyBucket) Max ¶
func (b *LeakyBucket) Max() int32
func (*LeakyBucket) Refill ¶
func (l *LeakyBucket) Refill()
func (*LeakyBucket) Take ¶
func (l *LeakyBucket) Take(count int) bool
func (*LeakyBucket) Token ¶
func (b *LeakyBucket) Token() int32
func (*LeakyBucket) UpdateMax ¶
func (b *LeakyBucket) UpdateMax(newMax int32)
type ProducerService ¶ added in v0.13.0
type ProducerService interface { pb.ProducerServer Start() error LaunchREST() error Close() }
func NewProducerService ¶ added in v0.13.0
func NewProducerService(params map[string]interface{}) ProducerService
type RateLimiter ¶
type RateLimiter interface { IsHitLimit(topic string, count int, maxTokenIfNotExist int32) bool Start() Stop() IsStart() bool PutBucket(topic string, bucket *LeakyBucket) Bucket(topic string) *LeakyBucket }
func NewRateLimiter ¶
func NewRateLimiter(duration int) RateLimiter
Source Files
¶
Click to show internal directories.
Click to hide internal directories.