service

package
v1.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// GetCantabularClient builds the Cantabular client for the service.
// The host and extended-API host are read from cfg, a new dphttp client
// is used for transport, and the constructor's third argument is left nil.
var GetCantabularClient = func(cfg *config.Config) CantabularClient {
	cantabularCfg := cantabular.Config{
		Host:       cfg.CantabularURL,
		ExtApiHost: cfg.CantabularExtURL,
	}
	httpClient := dphttp.NewClient()

	return cantabular.NewClient(cantabularCfg, httpClient, nil)
}
View Source
// GetDatasetAPIClient builds a Dataset API client pointed at the URL
// configured in cfg.DatasetAPIURL.
var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient {
	datasetURL := cfg.DatasetAPIURL
	return dataset.NewAPIClient(datasetURL)
}
View Source
// GetHTTPServer builds a dphttp server bound to bindAddr that serves router.
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
	srv := dphttp.NewServer(bindAddr, router)
	// Turn off the server's own OS signal handling.
	// NOTE(review): presumably so the service controls shutdown itself — confirm.
	srv.HandleOSSignals = false
	return srv
}

GetHTTPServer returns an HTTP server.

View Source
// GetHealthCheck builds a health checker from the supplied build metadata
// and the critical-timeout and interval settings in cfg.
// It returns a nil checker and the error if the version info cannot be created.
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) {
	info, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version)
	if err != nil {
		return nil, err
	}

	checker := healthcheck.New(
		info,
		cfg.HealthCheckCriticalTimeout,
		cfg.HealthCheckInterval,
	)
	return &checker, nil
}

GetHealthCheck returns a healthcheck

View Source
// GetImportAPIClient builds an Import API client pointed at the URL
// configured in cfg.ImportAPIURL.
var GetImportAPIClient = func(cfg *config.Config) ImportAPIClient {
	importURL := cfg.ImportAPIURL
	return importapi.New(importURL)
}
View Source
// GetKafkaConsumer builds a Kafka consumer group for the instance-started
// topic using the Kafka settings in cfg.
//
// The group reads from the newest offset unless cfg.KafkaConfig.OffsetOldest
// is set. A security config is attached only when the configured security
// protocol equals config.KafkaTLSProtocolFlag.
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) {
	offset := kafka.OffsetNewest
	if cfg.KafkaConfig.OffsetOldest {
		offset = kafka.OffsetOldest
	}

	groupCfg := &kafka.ConsumerGroupConfig{
		BrokerAddrs:       cfg.KafkaConfig.Addr,
		Topic:             cfg.KafkaConfig.InstanceStartedTopic,
		GroupName:         cfg.KafkaConfig.InstanceStartedGroup,
		MinBrokersHealthy: &cfg.KafkaConfig.ConsumerMinBrokersHealthy,
		KafkaVersion:      &cfg.KafkaConfig.Version,
		NumWorkers:        &cfg.KafkaConfig.NumWorkers,
		Offset:            &offset,
	}

	useTLS := cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag
	if useTLS {
		groupCfg.SecurityConfig = kafka.GetSecurityConfig(
			cfg.KafkaConfig.SecCACerts,
			cfg.KafkaConfig.SecClientCert,
			cfg.KafkaConfig.SecClientKey,
			cfg.KafkaConfig.SecSkipVerify,
		)
	}

	return kafka.NewConsumerGroup(ctx, groupCfg)
}

GetKafkaConsumer returns a Kafka consumer with the provided config

View Source
// GetKafkaProducer builds a Kafka producer for the category-dimension-import
// topic using the Kafka settings in cfg.
//
// A security config is attached only when the configured security protocol
// equals config.KafkaTLSProtocolFlag.
var GetKafkaProducer = func(ctx context.Context, cfg *config.Config) (kafka.IProducer, error) {
	producerCfg := &kafka.ProducerConfig{
		BrokerAddrs:       cfg.KafkaConfig.Addr,
		Topic:             cfg.KafkaConfig.CategoryDimensionImportTopic,
		MinBrokersHealthy: &cfg.KafkaConfig.ProducerMinBrokersHealthy,
		KafkaVersion:      &cfg.KafkaConfig.Version,
		MaxMessageBytes:   &cfg.KafkaConfig.MaxBytes,
	}

	useTLS := cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag
	if useTLS {
		producerCfg.SecurityConfig = kafka.GetSecurityConfig(
			cfg.KafkaConfig.SecCACerts,
			cfg.KafkaConfig.SecClientCert,
			cfg.KafkaConfig.SecClientKey,
			cfg.KafkaConfig.SecSkipVerify,
		)
	}

	return kafka.NewProducer(ctx, producerCfg)
}

GetKafkaProducer returns a Kafka producer with the provided config.

View Source
// GetRecipeAPIClient builds a Recipe API client pointed at the URL
// configured in cfg.RecipeAPIURL.
var GetRecipeAPIClient = func(cfg *config.Config) RecipeAPIClient {
	recipeURL := cfg.RecipeAPIURL
	return recipe.NewClient(recipeURL)
}

Functions

This section is empty.

Types

type CantabularClient added in v0.3.0

// CantabularClient lists the Cantabular client methods this package depends on.
// It is the return type of GetCantabularClient.
type CantabularClient interface {
	// GetDimensionsByName takes a GetDimensionsByNameRequest and returns a
	// GetDimensionsResponse, or an error.
	GetDimensionsByName(ctx context.Context, req cantabular.GetDimensionsByNameRequest) (*cantabular.GetDimensionsResponse, error)
	// Checker takes a healthcheck.CheckState.
	// NOTE(review): presumably the health check for the main Cantabular host — confirm.
	Checker(context.Context, *healthcheck.CheckState) error
	// CheckerAPIExt takes a healthcheck.CheckState.
	// NOTE(review): presumably the health check for the extended API host — confirm.
	CheckerAPIExt(ctx context.Context, state *healthcheck.CheckState) error
}

type DatasetAPIClient added in v0.3.0

// DatasetAPIClient lists the Dataset API client methods this package depends on.
// It is the return type of GetDatasetAPIClient.
type DatasetAPIClient interface {
	// PutInstance takes a dataset.UpdateInstance plus several string arguments
	// and returns a string and an error.
	// NOTE(review): the string parameters are unnamed here; check the dataset
	// client for their meaning and order before calling.
	PutInstance(context.Context, string, string, string, string, dataset.UpdateInstance, string) (string, error)
	// PutInstanceState takes a dataset.State plus several string arguments
	// and returns a string and an error.
	// NOTE(review): string parameters are unnamed; confirm against the dataset client.
	PutInstanceState(context.Context, string, string, dataset.State, string) (string, error)
	// Checker takes a healthcheck.CheckState.
	// NOTE(review): presumably the health check for the Dataset API — confirm.
	Checker(context.Context, *healthcheck.CheckState) error
}

type HTTPServer

// HTTPServer lists the HTTP server methods this package depends on.
// It is the return type of GetHTTPServer.
type HTTPServer interface {
	// ListenAndServe runs the server and returns an error.
	ListenAndServe() error
	// Shutdown stops the server, taking a context, and returns an error.
	Shutdown(ctx context.Context) error
}

HTTPServer defines the required methods from the HTTP server

type HealthChecker

// HealthChecker lists the health check methods this package depends on.
// It is the return type of GetHealthCheck.
type HealthChecker interface {
	// Handler has the standard net/http handler function signature.
	Handler(w http.ResponseWriter, req *http.Request)
	// Start starts the health checker with the given context.
	Start(ctx context.Context)
	// Stop stops the health checker.
	Stop()
	// AddAndGetCheck registers checker under name and returns the resulting
	// check, or an error.
	AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error)
	// Subscribe registers s as a subscriber for the given checks.
	Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check)
}

HealthChecker defines the required methods from Healthcheck

type ImportAPIClient added in v0.5.0

// ImportAPIClient lists the Import API client methods this package depends on.
// It is the return type of GetImportAPIClient.
type ImportAPIClient interface {
	// UpdateImportJobState takes an importapi.State plus two string arguments
	// and returns an error.
	// NOTE(review): the string parameters are unnamed here; confirm their
	// meaning and order against the importapi client.
	UpdateImportJobState(context.Context, string, string, importapi.State) error
}

type RecipeAPIClient added in v0.3.0

// RecipeAPIClient lists the Recipe API client methods this package depends on.
// It is the return type of GetRecipeAPIClient.
type RecipeAPIClient interface {
	// GetRecipe takes three string arguments and returns a recipe.Recipe, or an error.
	// NOTE(review): the string parameters are unnamed here; confirm their
	// meaning and order against the recipe client.
	GetRecipe(context.Context, string, string, string) (*recipe.Recipe, error)
	// Checker takes a healthcheck.CheckState.
	// NOTE(review): presumably the health check for the Recipe API — confirm.
	Checker(context.Context, *healthcheck.CheckState) error
}

type Service

// Service holds the configuration, HTTP server, health checker and Kafka
// consumer/producer used by the service. Only exported fields are shown here.
type Service struct {
	// Cfg is the service configuration.
	Cfg         *config.Config
	// Server is the HTTP server (see HTTPServer).
	Server      HTTPServer
	// HealthCheck is the health checker (see HealthChecker).
	HealthCheck HealthChecker
	// Consumer is the Kafka consumer group.
	Consumer    kafka.IConsumerGroup
	// Producer is the Kafka producer.
	Producer    kafka.IProducer
	// contains filtered or unexported fields
}

Service contains all the configs, server and clients to run the event handler service

func New added in v0.3.0

func New() *Service

New creates a new empty service

func (*Service) Close

func (svc *Service) Close(ctx context.Context) error

Close gracefully shuts the service down in the required order, with timeout

func (*Service) Init added in v0.3.0

func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildTime, gitCommit, version string) error

Init initialises all the service dependencies, including healthcheck with checkers, API and middleware.

func (*Service) Start added in v0.3.0

func (svc *Service) Start(ctx context.Context, svcErrors chan error) error

Start starts an initialised service

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL