Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var GetElasticSearchClient = func(ctx context.Context, cfg *config.Config) (dpESClient.Client, error) { esConfig := dpESClient.Config{ ClientLib: dpESClient.GoElasticV710, Address: cfg.ElasticSearchAPIURL, } if cfg.SignElasticsearchRequests { awsSigner, err := dpawsauth.NewAWSSignerRoundTripper("", "", cfg.AwsRegion, cfg.AwsService) if err != nil { return nil, fmt.Errorf("failed to create aws v4 signer: %w", err) } esConfig.Transport = awsSigner } esClient, esClientErr := dpES.NewClient(esConfig) if esClientErr != nil { log.Error(ctx, "Failed to create dp-elasticsearch client", esClientErr) return nil, esClientErr } return esClient, nil }
GetElasticSearchClient returns an Elastic Search Client with the AWS signer if the flag 'SignElasticsearchRequests' is enabled
View Source
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer { s := dphttp.NewServer(bindAddr, router) s.HandleOSSignals = false return s }
GetHTTPServer creates an HTTP Server with the provided bind address and router
View Source
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) { versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version) if err != nil { return nil, fmt.Errorf("failed to get version info: %w", err) } hc := healthcheck.New( versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval, ) return &hc, nil }
GetHealthCheck creates a healthcheck with versionInfo
View Source
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka) (kafka.IConsumerGroup, error) { if cfg == nil { return nil, errors.New("cannot create a kafka consumer without kafka config") } kafkaOffset := kafka.OffsetNewest if cfg.OffsetOldest { kafkaOffset = kafka.OffsetOldest } cgConfig := &kafka.ConsumerGroupConfig{ BrokerAddrs: cfg.Addr, Topic: cfg.PublishedContentTopic, GroupName: cfg.PublishedContentGroup, MinBrokersHealthy: &cfg.ConsumerMinBrokersHealthy, KafkaVersion: &cfg.Version, Offset: &kafkaOffset, } if cfg.SecProtocol == config.KafkaTLSProtocol { cgConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify, ) } return kafka.NewConsumerGroup(ctx, cgConfig) }
GetKafkaConsumer returns a Kafka Consumer group
Functions ¶
This section is empty.
Types ¶
type ElasticSearch ¶ added in v0.13.0
type ElasticSearch = dpelasticsearch.Client
ElasticSearch is an alias for the dp-elasticsearch client interface
type HTTPServer ¶
HTTPServer defines the required methods from the HTTP server
type HealthChecker ¶
type HealthChecker interface { Handler(w http.ResponseWriter, req *http.Request) Start(ctx context.Context) Stop() AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error) Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check) }
HealthChecker defines the required methods from Healthcheck
type Service ¶
type Service struct { Cfg *config.Config Server HTTPServer HealthCheck HealthChecker Consumer dpkafka.IConsumerGroup EsCli dpelasticsearch.Client }
Service contains all the configs, server and clients to run the event handler service
Click to show internal directories.
Click to hide internal directories.