service

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var GetDatasetClient = func(cfg *config.Config) clients.DatasetClient {
	return dataset.NewAPIClient(cfg.DatasetAPIURL)
}

GetDatasetClient gets the Dataset API client

View Source
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
	s := dphttp.NewServer(bindAddr, router)
	s.HandleOSSignals = false
	return s
}

GetHTTPServer creates an HTTP Server with the provided bind address and router

View Source
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) {
	versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version)
	if err != nil {
		return nil, fmt.Errorf("failed to get version info: %w", err)
	}
	hc := healthcheck.New(
		versionInfo,
		cfg.HealthCheckCriticalTimeout,
		cfg.HealthCheckInterval,
	)
	return &hc, nil
}

GetHealthCheck creates a healthcheck with versionInfo

View Source
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka) (kafka.IConsumerGroup, error) {
	if cfg == nil {
		return nil, errors.New("cannot create a kafka consumer without kafka config")
	}
	kafkaOffset := kafka.OffsetNewest
	if cfg.OffsetOldest {
		kafkaOffset = kafka.OffsetOldest
	}
	cgConfig := &kafka.ConsumerGroupConfig{
		BrokerAddrs:       cfg.Addr,
		Topic:             cfg.ContentUpdatedTopic,
		GroupName:         cfg.ContentUpdatedGroup,
		MinBrokersHealthy: &cfg.ConsumerMinBrokersHealthy,
		KafkaVersion:      &cfg.Version,
		Offset:            &kafkaOffset,
	}
	if cfg.SecProtocol == config.KafkaTLSProtocol {
		cgConfig.SecurityConfig = kafka.GetSecurityConfig(
			cfg.SecCACerts,
			cfg.SecClientCert,
			cfg.SecClientKey,
			cfg.SecSkipVerify,
		)
	}
	return kafka.NewConsumerGroup(ctx, cgConfig)
}

GetKafkaConsumer returns a Kafka Consumer group

View Source
var GetKafkaProducer = func(ctx context.Context, cfg *config.Kafka) (kafka.IProducer, error) {
	if cfg == nil {
		return nil, errors.New("cannot create a kafka producer without kafka config")
	}
	pConfig := &kafka.ProducerConfig{
		BrokerAddrs:       cfg.Addr,
		Topic:             cfg.ProducerTopic,
		MinBrokersHealthy: &cfg.ProducerMinBrokersHealthy,
		KafkaVersion:      &cfg.Version,
		MaxMessageBytes:   &cfg.MaxBytes,
	}
	if cfg.SecProtocol == config.KafkaTLSProtocol {
		pConfig.SecurityConfig = kafka.GetSecurityConfig(
			cfg.SecCACerts,
			cfg.SecClientCert,
			cfg.SecClientKey,
			cfg.SecSkipVerify,
		)
	}
	return kafka.NewProducer(ctx, pConfig)
}

GetKafkaProducer creates a Kafka producer and sets the producder flag to true

View Source
var GetTopicClient = func(cfg *config.Config) topicCli.Clienter {
	return topicCli.New(cfg.TopicAPIURL)
}

GetTopicClient gets the Topic API client

View Source
var GetZebedee = func(cfg *config.Config) clients.ZebedeeClient {
	return zebedee.New(cfg.ZebedeeURL)
}

GetZebedee gets the Zebedee Client

Functions

This section is empty.

Types

type HTTPServer

type HTTPServer interface {
	ListenAndServe() error
	Shutdown(ctx context.Context) error
}

HTTPServer defines the required methods from the HTTP server

type HealthChecker

type HealthChecker interface {
	Handler(w http.ResponseWriter, req *http.Request)
	Start(ctx context.Context)
	Stop()
	AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error)
	Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check)
}

HealthChecker defines the required methods from Healthcheck

type Service

type Service struct {
	Cfg         *config.Config
	Cache       cache.List
	Server      HTTPServer
	HealthCheck HealthChecker
	Consumer    kafka.IConsumerGroup
	Producer    kafka.IProducer
	ZebedeeCli  clients.ZebedeeClient
	DatasetCli  clients.DatasetClient
	TopicCli    topicCli.Clienter
}

Service contains all the configs, server and clients to run the event handler service

func New added in v0.24.0

func New() *Service

func (*Service) Close

func (svc *Service) Close(ctx context.Context) error

Close gracefully shuts the service down in the required order, with timeout

func (*Service) Init added in v0.24.0

func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildTime, gitCommit, version string) error

func (*Service) Start added in v0.24.0

func (svc *Service) Start(ctx context.Context, svcErrors chan error) error

Start the service

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL