service

package
v1.19.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2024 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// VaultRetries is the value GetVaultClient supplies as the final argument
// to vault.CreateClient.
const VaultRetries = 3

Variables

View Source
// GetCantabularClient builds a Cantabular client from the service config.
//
// Host, ExtApiHost and GraphQLTimeout are taken from cfg.CantabularURL,
// cfg.CantabularExtURL and cfg.DefaultRequestTimeout respectively. It is a
// package-level variable holding a function, so it can be reassigned.
var GetCantabularClient = func(cfg *config.Config) CantabularClient {
	clientCfg := cantabular.Config{
		Host:           cfg.CantabularURL,
		ExtApiHost:     cfg.CantabularExtURL,
		GraphQLTimeout: cfg.DefaultRequestTimeout,
	}
	httpClient := dphttp.NewClient()

	// The last constructor argument is deliberately passed as nil.
	return cantabular.NewClient(clientCfg, httpClient, nil)
}

GetCantabularClient gets and initialises the Cantabular Client

View Source
// GetDatasetAPIClient builds a dataset API client pointed at
// cfg.DatasetAPIURL.
var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient {
	client := dataset.NewAPIClient(cfg.DatasetAPIURL)
	return client
}

GetDatasetAPIClient gets and initialises the DatasetAPI Client

View Source
// GetFileManager creates the service FileManager on top of a new AWS session.
//
// When cfg.LocalObjectStore is empty the session is configured with the
// region only. Otherwise the session targets cfg.LocalObjectStore as its
// endpoint, using static credentials built from cfg.MinioAccessKey and
// cfg.MinioSecretKey, with SSL disabled and path-style S3 addressing forced.
//
// It returns a wrapped error if the AWS session cannot be created.
var GetFileManager = func(cfg *config.Config, vault VaultClient, generator Generator) (FileManager, error) {
	var awsCfg *aws.Config
	if cfg.LocalObjectStore == "" {
		awsCfg = &aws.Config{
			Region: aws.String(cfg.AWSRegion),
		}
	} else {
		// Local object store configured: talk to it instead of the default endpoint.
		awsCfg = &aws.Config{
			Credentials:      credentials.NewStaticCredentials(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
			Endpoint:         aws.String(cfg.LocalObjectStore),
			Region:           aws.String(cfg.AWSRegion),
			DisableSSL:       aws.Bool(true),
			S3ForcePathStyle: aws.Bool(true),
		}
	}

	awsSession, err := session.NewSession(awsCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create aws session: %w", err)
	}

	fmCfg := filemanager.Config{
		VaultKey:      "key",
		PublicBucket:  cfg.PublicBucket,
		PrivateBucket: cfg.PrivateBucket,
		PublicURL:     cfg.S3BucketURL,
	}

	return filemanager.New(fmCfg, awsSession, vault, generator), nil
}

GetFileManager instantiates the service FileManager

View Source
// GetFilterAPIClient builds a filter API client pointed at cfg.FilterAPIURL.
var GetFilterAPIClient = func(cfg *config.Config) FilterAPIClient {
	client := filter.New(cfg.FilterAPIURL)
	return client
}

GetFilterAPIClient gets and initialises the FilterAPI Client

View Source
// GetGenerator returns a new Generator created by generator.New.
var GetGenerator = func() Generator {
	gen := generator.New()
	return gen
}
View Source
// GetHTTPServer creates an HTTP server bound to bindAddr that serves router.
// The returned server has HandleOSSignals switched off.
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
	server := dphttp.NewServer(bindAddr, router)

	// NOTE(review): presumably signal handling is left to the caller so that
	// shutdown goes through Service.Close — confirm.
	server.HandleOSSignals = false

	return server
}

GetHTTPServer creates an http server

View Source
// GetHTTPServerOtel creates an HTTP server bound to bindAddr whose handler is
// router wrapped by otelhttp.NewHandler with the operation name "/".
// The returned server has HandleOSSignals switched off.
var GetHTTPServerOtel = func(bindAddr string, router http.Handler) HTTPServer {
	server := dphttp.NewServer(bindAddr, otelhttp.NewHandler(router, "/"))
	server.HandleOSSignals = false
	return server
}

GetHTTPServerOtel creates an http server with OTEL

View Source
// GetHealthCheck creates a health checker from the supplied build metadata.
//
// buildT, commit and ver are turned into version info via
// healthcheck.NewVersionInfo; a failure there is returned as a wrapped error.
// The checker is created with cfg.HealthCheckCriticalTimeout and
// cfg.HealthCheckInterval.
var GetHealthCheck = func(cfg *config.Config, buildT, commit, ver string) (HealthChecker, error) {
	info, err := healthcheck.NewVersionInfo(buildT, commit, ver)
	if err != nil {
		return nil, fmt.Errorf("failed to get version info: %w", err)
	}

	criticalTimeout := cfg.HealthCheckCriticalTimeout
	interval := cfg.HealthCheckInterval

	checker := healthcheck.New(info, criticalTimeout, interval)
	return &checker, nil
}

GetHealthCheck creates a healthcheck with versionInfo and sets the HealthCheck flag to true

View Source
// GetKafkaConsumer creates a Kafka consumer group for the topic
// cfg.Kafka.CantabularCSVCreatedTopic using the group name
// cfg.Kafka.CantabularMetadataExportGroup.
//
// Consumption starts from kafka.OffsetNewest unless cfg.Kafka.OffsetOldest is
// set, in which case kafka.OffsetOldest is used. A security config is
// attached only when cfg.Kafka.SecProtocol equals config.KafkaTLSProtocolFlag.
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) {
	startOffset := kafka.OffsetNewest
	if cfg.Kafka.OffsetOldest {
		startOffset = kafka.OffsetOldest
	}

	// Pointer fields reference the config values directly rather than copies.
	groupCfg := &kafka.ConsumerGroupConfig{
		BrokerAddrs:       cfg.Kafka.Addr,
		Topic:             cfg.Kafka.CantabularCSVCreatedTopic,
		GroupName:         cfg.Kafka.CantabularMetadataExportGroup,
		KafkaVersion:      &cfg.Kafka.Version,
		Offset:            &startOffset,
		NumWorkers:        &cfg.Kafka.NumWorkers,
		MinBrokersHealthy: &cfg.Kafka.ConsumerMinBrokersHealthy,
	}

	if useTLS := cfg.Kafka.SecProtocol == config.KafkaTLSProtocolFlag; useTLS {
		groupCfg.SecurityConfig = kafka.GetSecurityConfig(
			cfg.Kafka.SecCACerts,
			cfg.Kafka.SecClientCert,
			cfg.Kafka.SecClientKey,
			cfg.Kafka.SecSkipVerify,
		)
	}

	consumer, err := kafka.NewConsumerGroup(ctx, groupCfg)
	return consumer, err
}

GetKafkaConsumer creates a Kafka consumer

View Source
// GetKafkaProducer creates a Kafka producer for the topic
// cfg.Kafka.CantabularCSVWCreatedTopic.
//
// A security config is attached only when cfg.Kafka.SecProtocol equals
// config.KafkaTLSProtocolFlag.
var GetKafkaProducer = func(ctx context.Context, cfg *config.Config) (kafka.IProducer, error) {
	// Pointer fields reference the config values directly rather than copies.
	producerCfg := &kafka.ProducerConfig{
		BrokerAddrs:       cfg.Kafka.Addr,
		Topic:             cfg.Kafka.CantabularCSVWCreatedTopic,
		KafkaVersion:      &cfg.Kafka.Version,
		MaxMessageBytes:   &cfg.Kafka.MaxBytes,
		MinBrokersHealthy: &cfg.Kafka.ProducerMinBrokersHealthy,
	}

	if useTLS := cfg.Kafka.SecProtocol == config.KafkaTLSProtocolFlag; useTLS {
		producerCfg.SecurityConfig = kafka.GetSecurityConfig(
			cfg.Kafka.SecCACerts,
			cfg.Kafka.SecClientCert,
			cfg.Kafka.SecClientKey,
			cfg.Kafka.SecSkipVerify,
		)
	}

	producer, err := kafka.NewProducer(ctx, producerCfg)
	return producer, err
}

GetKafkaProducer creates a Kafka producer

View Source
// GetPopulationTypesAPIClient builds a population types API client pointed at
// cfg.PopulationTypesAPIURL, passing through any error from
// population.NewClient unchanged.
var GetPopulationTypesAPIClient = func(cfg *config.Config) (PopulationTypesAPIClient, error) {
	client, err := population.NewClient(cfg.PopulationTypesAPIURL)
	return client, err
}

GetPopulationTypesAPIClient gets and initialises the PopulationTypesAPI Client

View Source
// GetVaultClient creates a vault client from cfg.VaultToken and
// cfg.VaultAddress, passing VaultRetries as the final argument and returning
// any error from vault.CreateClient unchanged.
var GetVaultClient = func(cfg *config.Config) (VaultClient, error) {
	client, err := vault.CreateClient(cfg.VaultToken, cfg.VaultAddress, VaultRetries)
	return client, err
}

Functions

This section is empty.

Types

type CantabularClient added in v1.11.0

// CantabularClient declares the single Cantabular client method this package
// requires. GetCantabularClient returns a value of this type built by
// cantabular.NewClient.
type CantabularClient interface {
	GetDimensionsByName(context.Context, cantabular.GetDimensionsByNameRequest) (*cantabular.GetDimensionsResponse, error)
}

type DatasetAPIClient added in v1.0.0

// DatasetAPIClient declares the dataset API client methods this package
// requires. GetDatasetAPIClient returns a value of this type built by
// dataset.NewAPIClient.
type DatasetAPIClient interface {
	GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error)
	PutInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID string, instanceUpdate dataset.UpdateInstance, ifMatch string) (eTag string, err error)
	GetVersion(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceAuthToken, collectionID, datasetID, edition, version string) (dataset.Version, error)
	GetVersionMetadataSelection(ctx context.Context, req dataset.GetVersionMetadataSelectionInput) (*dataset.Metadata, error)
	GetVersionMetadata(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (dataset.Metadata, error)
	GetVersionDimensions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (dataset.VersionDimensions, error)
	GetOptionsInBatches(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version, dimension string, batchSize, maxWorkers int) (dataset.Options, error)
	GetMetadataURL(id, edition, version string) (url string)
	// Checker takes a *healthcheck.CheckState.
	// NOTE(review): presumably registered through HealthChecker.AddAndGetCheck — confirm in Init.
	Checker(context.Context, *healthcheck.CheckState) error
	PutVersion(ctx context.Context, usrAuthToken, svcAuthToken, collectionID, datasetID, edition, ver string, v dataset.Version) error
}

type FileManager added in v1.0.0

// FileManager declares the upload operations and uploader accessors this
// package requires. GetFileManager returns a value of this type built by
// filemanager.New.
type FileManager interface {
	// Upload and UploadPrivate each take the content as an io.Reader plus a
	// filename and return a string and an error; UploadPrivate additionally
	// takes a vault path.
	Upload(body io.Reader, filename string) (string, error)
	UploadPrivate(body io.Reader, filename, vaultPath string) (string, error)
	// PrivateUploader and PublicUploader return filemanager.S3Uploader values
	// (the filemanager package's type, not the S3Uploader declared in this package).
	PrivateUploader() filemanager.S3Uploader
	PublicUploader() filemanager.S3Uploader
}

type FilterAPIClient added in v1.3.0

// FilterAPIClient declares the filter API client methods this package
// requires. GetFilterAPIClient returns a value of this type built by
// filter.New.
type FilterAPIClient interface {
	UpdateFilterOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, filterOutputID string, m *filter.Model) error
	GetOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, collectionID, filterOutput string) (m filter.Model, err error)
	// Checker takes a *healthcheck.CheckState.
	// NOTE(review): presumably registered through HealthChecker.AddAndGetCheck — confirm in Init.
	Checker(context.Context, *healthcheck.CheckState) error
}

type Generator added in v1.0.0

// Generator declares the value-generation methods this package requires.
// GetGenerator returns a value of this type built by generator.New, and
// GetFileManager passes one through to filemanager.New.
type Generator interface {
	// NewPSK returns a byte slice or an error.
	NewPSK() ([]byte, error)
	// Timestamp returns a time.Time.
	Timestamp() time.Time
}

Generator contains methods for dynamically required strings and tokens e.g. UUIDs, PSKs.

type HTTPServer

// HTTPServer declares the methods required from the HTTP server.
// GetHTTPServer and GetHTTPServerOtel return values of this type built by
// dphttp.NewServer.
type HTTPServer interface {
	ListenAndServe() error
	Shutdown(ctx context.Context) error
}

HTTPServer defines the required methods from the HTTP server

type HealthChecker

// HealthChecker declares the methods required from the health check.
// GetHealthCheck returns a value of this type: a pointer to the result of
// healthcheck.New.
type HealthChecker interface {
	// Handler has the signature of an http.HandlerFunc.
	Handler(w http.ResponseWriter, req *http.Request)
	Start(ctx context.Context)
	Stop()
	// AddAndGetCheck takes a name and a healthcheck.Checker and returns the
	// resulting *healthcheck.Check or an error.
	AddAndGetCheck(name string, checker healthcheck.Checker) (*healthcheck.Check, error)
	SubscribeAll(s healthcheck.Subscriber)
}

HealthChecker defines the required methods from Healthcheck

type PopulationTypesAPIClient added in v1.10.0

// PopulationTypesAPIClient declares the single population types API client
// method this package requires. GetPopulationTypesAPIClient returns a value
// of this type built by population.NewClient.
type PopulationTypesAPIClient interface {
	GetAreaTypes(ctx context.Context, input population.GetAreaTypesInput) (population.GetAreaTypesResponse, error)
}

type S3Uploader added in v1.0.0

// S3Uploader declares S3 read/upload operations in terms of the s3manager
// upload types.
// NOTE(review): FileManager's accessors return filemanager.S3Uploader rather
// than this type; where this interface is consumed is not visible here.
type S3Uploader interface {
	// Get returns a reader for the given key together with a *int64
	// (presumably the content length — confirm) or an error.
	Get(key string) (io.ReadCloser, *int64, error)
	Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	// UploadWithPSK takes the same input as Upload plus a psk byte slice.
	UploadWithPSK(input *s3manager.UploadInput, psk []byte) (*s3manager.UploadOutput, error)
	BucketName() string
	// Checker takes a *healthcheck.CheckState.
	Checker(context.Context, *healthcheck.CheckState) error
}

type Service

// Service exposes the service configuration, HTTP server and health checker
// as exported fields. The rendered listing omits its unexported fields.
type Service struct {
	// Config is the service configuration (same type as the cfg passed to Init).
	Config *config.Config
	// Server is the HTTP server; GetHTTPServer and GetHTTPServerOtel produce
	// values of this type.
	Server HTTPServer

	// HealthCheck is the health checker; GetHealthCheck produces values of this type.
	HealthCheck HealthChecker
	// contains filtered or unexported fields
}

Service contains all the configs, server and clients to run the dp-topic-api API

func New

func New() *Service

New returns a new Service

func (*Service) BuildRoutes

func (svc *Service) BuildRoutes()

func (*Service) BuildRoutesOtel added in v1.18.0

func (svc *Service) BuildRoutesOtel(otServiceName string)

BuildRoutesOtel builds the routing for the API

func (*Service) Close

func (svc *Service) Close(ctx context.Context) error

Close gracefully shuts the service down in the required order, with timeout

func (*Service) Consumer added in v1.1.0

func (svc *Service) Consumer() kafka.IConsumerGroup

Consumer is a getter for the kafka consumer for use outside package

func (*Service) Init

func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildT, commit, ver string) error

Init initialises the service

func (*Service) Producer added in v1.1.0

func (svc *Service) Producer() kafka.IProducer

Producer is a getter for the kafka producer for use outside package

func (*Service) Start

func (svc *Service) Start(ctx context.Context, svcErrors chan error) error

Start starts the service

type VaultClient added in v1.0.0

// VaultClient declares the vault client methods this package requires.
// GetVaultClient returns a value of this type built by vault.CreateClient,
// and GetFileManager passes one through to filemanager.New.
type VaultClient interface {
	// WriteKey takes a path, a key and a value, all strings, and returns an error.
	WriteKey(path, key, value string) error
	// Checker takes a *healthcheck.CheckState.
	// NOTE(review): presumably registered through HealthChecker.AddAndGetCheck — confirm in Init.
	Checker(context.Context, *healthcheck.CheckState) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL