Documentation
¶
Index ¶
Constants ¶
// VaultRetries is the value GetVault passes as the final argument to
// vault.CreateClient.
// NOTE(review): presumably the maximum number of retries the Vault client
// performs — confirm against the vault package documentation.
const VaultRetries = 3
Variables ¶
// GetDatasetAPIClient builds a DatasetAPIClient with dataset.NewAPIClient,
// pointing it at the URL held in cfg.DatasetAPIURL.
//
// It is a package-level variable rather than a plain function, so it can be
// reassigned.
var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient {
	apiURL := cfg.DatasetAPIURL
	return dataset.NewAPIClient(apiURL)
}
GetDatasetAPIClient gets and initialises the DatasetAPI Client
// GetGenerator returns a new Generator obtained from generator.New.
//
// It is a package-level variable rather than a plain function, so it can be
// reassigned.
var GetGenerator = func() Generator {
	var gen Generator = generator.New()
	return gen
}
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer { s := dphttp.NewServer(bindAddr, router) s.HandleOSSignals = false return s }
GetHTTPServer creates an HTTP server for the given bind address and router, with the server's own OS signal handling disabled
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) { versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version) if err != nil { return nil, fmt.Errorf("failed to get version info: %w", err) } hc := healthcheck.New( versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval, ) return &hc, nil }
GetHealthCheck creates a healthcheck with versionInfo
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) { kafkaOffset := kafka.OffsetNewest if cfg.KafkaConfig.OffsetOldest { kafkaOffset = kafka.OffsetOldest } cgConfig := &kafka.ConsumerGroupConfig{ BrokerAddrs: cfg.KafkaConfig.Addr, Topic: cfg.KafkaConfig.CsvCreatedTopic, GroupName: cfg.KafkaConfig.CsvCreatedGroup, MinBrokersHealthy: &cfg.KafkaConfig.ConsumerMinBrokersHealthy, KafkaVersion: &cfg.KafkaConfig.Version, NumWorkers: &OneAndOnlyOneWorker, Offset: &kafkaOffset, } if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag { cgConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.KafkaConfig.SecCACerts, cfg.KafkaConfig.SecClientCert, cfg.KafkaConfig.SecClientKey, cfg.KafkaConfig.SecSkipVerify, ) } return kafka.NewConsumerGroup(ctx, cgConfig) }
GetKafkaConsumer creates a Kafka consumer
var GetS3Clients = func(cfg *config.Config) (privateClient, publicClient S3Client, err error) { if cfg.LocalObjectStore != "" { s3Config := &aws.Config{ Credentials: credentials.NewStaticCredentials(cfg.MinioAccessKey, cfg.MinioSecretKey, ""), Endpoint: aws.String(cfg.LocalObjectStore), Region: aws.String(cfg.AWSRegion), DisableSSL: aws.Bool(true), S3ForcePathStyle: aws.Bool(true), } s, err := session.NewSession(s3Config) if err != nil { return nil, nil, fmt.Errorf("failed to create aws session (local): %w", err) } return dps3.NewClientWithSession(cfg.PrivateBucketName, s), dps3.NewClientWithSession(cfg.PublicBucketName, s), nil } privateClient, err = dps3.NewClient(cfg.AWSRegion, cfg.PrivateBucketName) if err != nil { return nil, nil, fmt.Errorf("failed to create S3 Client: %w", err) } publicClient = dps3.NewClientWithSession(cfg.PublicBucketName, privateClient.Session()) return privateClient, publicClient, nil }
GetS3Clients creates the private and public S3 Clients using the same AWS session, or a local storage client if a non-empty LocalObjectStore is provided
// GetVault creates a VaultClient by calling vault.CreateClient with
// cfg.VaultToken, cfg.VaultAddress and VaultRetries.
//
// On failure the error from vault.CreateClient is returned unchanged together
// with a literal nil client.
var GetVault = func(cfg *config.Config) (VaultClient, error) {
	client, err := vault.CreateClient(cfg.VaultToken, cfg.VaultAddress, VaultRetries)
	if err != nil {
		// Return a literal nil rather than forwarding the constructor's first
		// result: if that result is a nil concrete pointer, wrapping it in the
		// VaultClient interface would yield a non-nil interface value.
		return nil, err
	}
	return client, nil
}
GetVault creates a VaultClient
// OneAndOnlyOneWorker is the worker count handed (by address) to the Kafka
// consumer group configuration in GetKafkaConsumer.
var OneAndOnlyOneWorker = 1 // WARNING - Do NOT EVER make this bigger than '1' otherwise an OOM might happen for more than one large csv file being processed in parallel
Functions ¶
This section is empty.
Types ¶
type DatasetAPIClient ¶
// DatasetAPIClient lists the dataset API operations this package relies on.
// GetDatasetAPIClient returns the client built by dataset.NewAPIClient as this
// interface.
type DatasetAPIClient interface {
	// PutVersion sends the dataset.Version m for the version identified by
	// datasetID, edition and version.
	PutVersion(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, datasetID, edition, version string, m dataset.Version) error
	// GetInstance returns the dataset.Instance for instanceID along with an
	// eTag string.
	// NOTE(review): ifMatch is presumably an ETag precondition — confirm
	// against the dataset client documentation.
	GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error)
	// Checker updates the supplied health check state.
	// NOTE(review): presumably registered through HealthChecker.AddCheck —
	// confirm at the call site.
	Checker(context.Context, *healthcheck.CheckState) error
	// GetVersionMetadataSelection returns the dataset.Metadata described by the
	// given input.
	GetVersionMetadataSelection(context.Context, dataset.GetVersionMetadataSelectionInput) (*dataset.Metadata, error)
}
type Generator ¶
Generator contains methods for dynamically required strings and tokens e.g. UUIDs, PSKs.
type HTTPServer ¶
HTTPServer defines the required methods from the HTTP server
type HealthChecker ¶
// HealthChecker defines the methods this package requires from a health
// check. GetHealthCheck returns a pointer to the value built by
// healthcheck.New as this interface.
type HealthChecker interface {
	// Handler has the shape of an HTTP handler function.
	Handler(w http.ResponseWriter, req *http.Request)
	// Start begins health checking using ctx.
	Start(ctx context.Context)
	// Stop halts health checking.
	Stop()
	// AddCheck registers checker under name.
	AddCheck(name string, checker healthcheck.Checker) (err error)
	// SubscribeAll registers the subscriber s.
	// NOTE(review): presumably s is notified for every registered check —
	// confirm against the healthcheck package documentation.
	SubscribeAll(s healthcheck.Subscriber)
}
HealthChecker defines the required methods from Healthcheck
type Initialiser ¶
// Initialiser defines the methods used to initialise external services. Each
// method has the same signature as the package-level variable of the matching
// name without the "Do" prefix.
type Initialiser interface {
	// DoGetHTTPServer has the same signature as GetHTTPServer.
	DoGetHTTPServer(bindAddr string, router http.Handler) HTTPServer
	// DoGetHealthCheck has the same signature as GetHealthCheck.
	DoGetHealthCheck(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error)
	// DoGetKafkaConsumer has the same signature as GetKafkaConsumer.
	DoGetKafkaConsumer(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error)
}
Initialiser defines the methods to initialise external services
type S3Client ¶
// S3Client lists the S3 operations this package relies on. GetS3Clients
// returns one S3Client for the private bucket and one for the public bucket.
type S3Client interface {
	// Get returns a reader for the object stored under key.
	// NOTE(review): the *int64 is presumably the content length — confirm.
	Get(key string) (io.ReadCloser, *int64, error)
	// GetWithPSK is the variant of Get that additionally takes a psk.
	// NOTE(review): presumably a pre-shared key used to decrypt the object —
	// confirm.
	GetWithPSK(key string, psk []byte) (io.ReadCloser, *int64, error)
	// Head returns the s3.HeadObjectOutput for key.
	Head(key string) (*s3.HeadObjectOutput, error)
	// UploadWithContext uploads input, applying any uploader options.
	UploadWithContext(ctx context.Context, input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	// UploadWithPSKAndContext is the variant of UploadWithContext that
	// additionally takes a psk.
	UploadWithPSKAndContext(ctx context.Context, input *s3manager.UploadInput, psk []byte, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	// BucketName returns the name of the bucket the client operates on.
	BucketName() string
	// Session returns the client's AWS session; GetS3Clients uses it to build
	// the public client from the private client's session.
	Session() *session.Session
	// Checker updates the supplied health check state.
	Checker(context.Context, *healthcheck.CheckState) error
}
type Service ¶
// Service holds the configuration together with the HTTP server, health
// checker, Kafka consumer group and the dataset API, S3 and Vault clients.
type Service struct {
	Cfg              *config.Config       // service configuration
	Server           HTTPServer           // HTTP server
	HealthCheck      HealthChecker        // health checker
	Consumer         kafka.IConsumerGroup // Kafka consumer group
	DatasetAPIClient DatasetAPIClient     // dataset API client
	S3Private        S3Client             // S3 client (private; see GetS3Clients)
	S3Public         S3Client             // S3 client (public; see GetS3Clients)
	VaultClient      VaultClient          // Vault client
	// contains filtered or unexported fields
}
Service contains all the configs, server and clients to run the event handler service
type VaultClient ¶
// VaultClient lists the Vault operations this package relies on. GetVault
// returns the client built by vault.CreateClient as this interface.
type VaultClient interface {
	// ReadKey returns the string value stored for key at path.
	ReadKey(path, key string) (string, error)
	// WriteKey stores value for key at path.
	WriteKey(path, key, value string) error
	// Checker updates the supplied health check state.
	Checker(context.Context, *healthcheck.CheckState) error
}