Documentation ¶
Index ¶
- Constants
- Variables
- type CantabularClient
- type DatasetAPIClient
- type FileManager
- type FilterAPIClient
- type Generator
- type HTTPServer
- type HealthChecker
- type PopulationTypesAPIClient
- type S3Uploader
- type Service
- func (svc *Service) BuildRoutes()
- func (svc *Service) BuildRoutesOtel(otServiceName string)
- func (svc *Service) Close(ctx context.Context) error
- func (svc *Service) Consumer() kafka.IConsumerGroup
- func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildT, commit, ver string) error
- func (svc *Service) Producer() kafka.IProducer
- func (svc *Service) Start(ctx context.Context, svcErrors chan error) error
- type VaultClient
Constants ¶
const (
VaultRetries = 3
)
Variables ¶
var GetCantabularClient = func(cfg *config.Config) CantabularClient { return cantabular.NewClient( cantabular.Config{ Host: cfg.CantabularURL, ExtApiHost: cfg.CantabularExtURL, GraphQLTimeout: cfg.DefaultRequestTimeout, }, dphttp.NewClient(), nil, ) }
GetCantabularClient gets and initialises the Cantabular Client
var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient { return dataset.NewAPIClient(cfg.DatasetAPIURL) }
GetDatasetAPIClient gets and initialises the DatasetAPI Client
var GetFileManager = func(cfg *config.Config, vault VaultClient, generator Generator) (FileManager, error) { awscfg := &aws.Config{ Region: aws.String(cfg.AWSRegion), } if cfg.LocalObjectStore != "" { awscfg = &aws.Config{ Credentials: credentials.NewStaticCredentials(cfg.MinioAccessKey, cfg.MinioSecretKey, ""), Endpoint: aws.String(cfg.LocalObjectStore), Region: aws.String(cfg.AWSRegion), DisableSSL: aws.Bool(true), S3ForcePathStyle: aws.Bool(true), } } sess, err := session.NewSession(awscfg) if err != nil { return nil, fmt.Errorf("failed to create aws session: %w", err) } return filemanager.New( filemanager.Config{ VaultKey: "key", PublicBucket: cfg.PublicBucket, PrivateBucket: cfg.PrivateBucket, PublicURL: cfg.S3BucketURL, }, sess, vault, generator, ), nil }
GetFileManager instantiates teh service FileManager
var GetFilterAPIClient = func(cfg *config.Config) FilterAPIClient { return filter.New(cfg.FilterAPIURL) }
GetFilterAPIClient gets and initialises the FilterAPI Client
var GetGenerator = func() Generator { return generator.New() }
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer { s := dphttp.NewServer(bindAddr, router) s.HandleOSSignals = false return s }
GetHTTPServer creates an http server
var GetHTTPServerOtel = func(bindAddr string, router http.Handler) HTTPServer { otelHandler := otelhttp.NewHandler(router, "/") s := dphttp.NewServer(bindAddr, otelHandler) s.HandleOSSignals = false return s }
GetHTTPServerOtel creates an http server with OTEL
var GetHealthCheck = func(cfg *config.Config, buildT, commit, ver string) (HealthChecker, error) { versionInfo, err := healthcheck.NewVersionInfo(buildT, commit, ver) if err != nil { return nil, fmt.Errorf("failed to get version info: %w", err) } hc := healthcheck.New(versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval) return &hc, nil }
GetHealthCheck creates a healthcheck with versionInfo and sets teh HealthCheck flag to true
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) { kafkaOffset := kafka.OffsetNewest if cfg.Kafka.OffsetOldest { kafkaOffset = kafka.OffsetOldest } cgConfig := &kafka.ConsumerGroupConfig{ BrokerAddrs: cfg.Kafka.Addr, Topic: cfg.Kafka.CantabularCSVCreatedTopic, GroupName: cfg.Kafka.CantabularMetadataExportGroup, KafkaVersion: &cfg.Kafka.Version, Offset: &kafkaOffset, NumWorkers: &cfg.Kafka.NumWorkers, MinBrokersHealthy: &cfg.Kafka.ConsumerMinBrokersHealthy, } if cfg.Kafka.SecProtocol == config.KafkaTLSProtocolFlag { cgConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.Kafka.SecCACerts, cfg.Kafka.SecClientCert, cfg.Kafka.SecClientKey, cfg.Kafka.SecSkipVerify, ) } return kafka.NewConsumerGroup(ctx, cgConfig) }
GetKafkaConsumer creates a Kafka consumer
var GetKafkaProducer = func(ctx context.Context, cfg *config.Config) (kafka.IProducer, error) { pConfig := &kafka.ProducerConfig{ BrokerAddrs: cfg.Kafka.Addr, Topic: cfg.Kafka.CantabularCSVWCreatedTopic, KafkaVersion: &cfg.Kafka.Version, MaxMessageBytes: &cfg.Kafka.MaxBytes, MinBrokersHealthy: &cfg.Kafka.ProducerMinBrokersHealthy, } if cfg.Kafka.SecProtocol == config.KafkaTLSProtocolFlag { pConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.Kafka.SecCACerts, cfg.Kafka.SecClientCert, cfg.Kafka.SecClientKey, cfg.Kafka.SecSkipVerify, ) } return kafka.NewProducer(ctx, pConfig) }
GetKafkaProducer creates a Kafka producer
var GetPopulationTypesAPIClient = func(cfg *config.Config) (PopulationTypesAPIClient, error) { return population.NewClient(cfg.PopulationTypesAPIURL) }
GetPopulationTypesAPIClient gets and initialises the PopulationTypesAPI Client
var GetVaultClient = func(cfg *config.Config) (VaultClient, error) { return vault.CreateClient(cfg.VaultToken, cfg.VaultAddress, VaultRetries) }
Functions ¶
This section is empty.
Types ¶
type CantabularClient ¶ added in v1.11.0
type CantabularClient interface {
GetDimensionsByName(context.Context, cantabular.GetDimensionsByNameRequest) (*cantabular.GetDimensionsResponse, error)
}
type DatasetAPIClient ¶ added in v1.0.0
type DatasetAPIClient interface { GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error) PutInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID string, instanceUpdate dataset.UpdateInstance, ifMatch string) (eTag string, err error) GetVersion(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceAuthToken, collectionID, datasetID, edition, version string) (dataset.Version, error) GetVersionMetadataSelection(ctx context.Context, req dataset.GetVersionMetadataSelectionInput) (*dataset.Metadata, error) GetVersionMetadata(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (dataset.Metadata, error) GetVersionDimensions(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version string) (dataset.VersionDimensions, error) GetOptionsInBatches(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, id, edition, version, dimension string, batchSize, maxWorkers int) (dataset.Options, error) GetMetadataURL(id, edition, version string) (url string) Checker(context.Context, *healthcheck.CheckState) error PutVersion(ctx context.Context, usrAuthToken, svcAuthToken, collectionID, datasetID, edition, ver string, v dataset.Version) error }
type FileManager ¶ added in v1.0.0
type FileManager interface { Upload(body io.Reader, filename string) (string, error) UploadPrivate(body io.Reader, filename, vaultPath string) (string, error) PrivateUploader() filemanager.S3Uploader PublicUploader() filemanager.S3Uploader }
type FilterAPIClient ¶ added in v1.3.0
type FilterAPIClient interface { UpdateFilterOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, filterOutputID string, m *filter.Model) error GetOutput(ctx context.Context, userAuthToken, serviceAuthToken, downloadServiceToken, collectionID, filterOutput string) (m filter.Model, err error) Checker(context.Context, *healthcheck.CheckState) error }
type Generator ¶ added in v1.0.0
Generator contains methods for dynamically required strings and tokens e.g. UUIDs, PSKs.
type HTTPServer ¶
HTTPServer defines the required methods from the HTTP server
type HealthChecker ¶
type HealthChecker interface { Handler(w http.ResponseWriter, req *http.Request) Start(ctx context.Context) Stop() AddAndGetCheck(name string, checker healthcheck.Checker) (*healthcheck.Check, error) SubscribeAll(s healthcheck.Subscriber) }
HealthChecker defines the required methods from Healthcheck
type PopulationTypesAPIClient ¶ added in v1.10.0
type PopulationTypesAPIClient interface {
GetAreaTypes(ctx context.Context, input population.GetAreaTypesInput) (population.GetAreaTypesResponse, error)
}
type S3Uploader ¶ added in v1.0.0
type S3Uploader interface { Get(key string) (io.ReadCloser, *int64, error) Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) UploadWithPSK(input *s3manager.UploadInput, psk []byte) (*s3manager.UploadOutput, error) BucketName() string Checker(context.Context, *healthcheck.CheckState) error }
type Service ¶
type Service struct { Config *config.Config Server HTTPServer HealthCheck HealthChecker // contains filtered or unexported fields }
Service contains all the configs, server and clients to run the dp-topic-api API
func (*Service) BuildRoutes ¶
func (svc *Service) BuildRoutes()
func (*Service) BuildRoutesOtel ¶ added in v1.18.0
BuildRoutes builds the routing for the API
func (*Service) Consumer ¶ added in v1.1.0
func (svc *Service) Consumer() kafka.IConsumerGroup
Consumer is a getter for the kafka consumer for use outside package
type VaultClient ¶ added in v1.0.0
type VaultClient interface { WriteKey(path, key, value string) error Checker(context.Context, *healthcheck.CheckState) error }