service

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2022 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// VaultRetries is the value GetVault passes as the final argument to
// vault.CreateClient (presumably the client's retry count — confirm against
// the vault package).
const VaultRetries = 3

Variables

View Source
// GetDatasetAPIClient builds a dataset API client for the URL held in
// cfg.DatasetAPIURL. It is a package-level variable holding a function.
var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient {
	apiURL := cfg.DatasetAPIURL
	client := dataset.NewAPIClient(apiURL)
	return client
}

GetDatasetAPIClient gets and initialises the DatasetAPI Client

View Source
// GetGenerator returns the Generator produced by generator.New.
var GetGenerator = func() Generator {
	gen := generator.New()
	return gen
}
View Source
// GetHTTPServer creates an HTTP server bound to bindAddr that serves router.
// The returned server has HandleOSSignals set to false.
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
	srv := dphttp.NewServer(bindAddr, router)
	// Turn off the server's own OS signal handling.
	srv.HandleOSSignals = false
	return srv
}

GetHTTPServer creates an HTTP server for the given bind address and router, with the server's OS signal handling disabled (HandleOSSignals = false)

View Source
// GetHealthCheck builds a health checker from the supplied build metadata and
// the critical-timeout and interval values held in cfg.
//
// It returns an error, wrapping the underlying cause, when the version info
// cannot be created from buildTime, gitCommit and version.
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) {
	info, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version)
	if err != nil {
		return nil, fmt.Errorf("failed to get version info: %w", err)
	}

	criticalTimeout, interval := cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval
	checker := healthcheck.New(info, criticalTimeout, interval)
	return &checker, nil
}

GetHealthCheck creates a healthcheck with versionInfo

View Source
// GetKafkaConsumer creates the Kafka consumer group for the topic and group
// named by cfg.KafkaConfig.CsvCreatedTopic and cfg.KafkaConfig.CsvCreatedGroup.
//
// The number of workers is always taken from OneAndOnlyOneWorker. A security
// configuration is attached only when cfg.KafkaConfig.SecProtocol equals
// config.KafkaTLSProtocolFlag.
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) {
	// Consume from the newest offset unless the configuration asks for the oldest.
	startOffset := kafka.OffsetNewest
	if cfg.KafkaConfig.OffsetOldest {
		startOffset = kafka.OffsetOldest
	}

	groupCfg := &kafka.ConsumerGroupConfig{
		Offset:            &startOffset,
		NumWorkers:        &OneAndOnlyOneWorker,
		KafkaVersion:      &cfg.KafkaConfig.Version,
		MinBrokersHealthy: &cfg.KafkaConfig.ConsumerMinBrokersHealthy,
		GroupName:         cfg.KafkaConfig.CsvCreatedGroup,
		Topic:             cfg.KafkaConfig.CsvCreatedTopic,
		BrokerAddrs:       cfg.KafkaConfig.Addr,
	}

	// Without the TLS protocol flag there is nothing more to configure.
	if cfg.KafkaConfig.SecProtocol != config.KafkaTLSProtocolFlag {
		return kafka.NewConsumerGroup(ctx, groupCfg)
	}

	groupCfg.SecurityConfig = kafka.GetSecurityConfig(
		cfg.KafkaConfig.SecCACerts,
		cfg.KafkaConfig.SecClientCert,
		cfg.KafkaConfig.SecClientKey,
		cfg.KafkaConfig.SecSkipVerify,
	)
	return kafka.NewConsumerGroup(ctx, groupCfg)
}

GetKafkaConsumer creates a Kafka consumer

View Source
// GetS3Clients creates the private and public S3 clients, both sharing a
// single AWS session.
//
// When cfg.LocalObjectStore is empty, the private client is created for
// cfg.AWSRegion and the public client reuses the private client's session.
// Otherwise a session is created against the LocalObjectStore endpoint using
// static credentials from cfg.MinioAccessKey and cfg.MinioSecretKey, with SSL
// disabled and path-style addressing forced, and both clients use that session.
//
// On failure both clients are nil and the returned error wraps the cause.
var GetS3Clients = func(cfg *config.Config) (privateClient, publicClient S3Client, err error) {
	// No local object store configured: use the regular AWS client.
	if cfg.LocalObjectStore == "" {
		privateClient, err = dps3.NewClient(cfg.AWSRegion, cfg.PrivateBucketName)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create S3 Client: %w", err)
		}

		publicClient = dps3.NewClientWithSession(cfg.PublicBucketName, privateClient.Session())
		return privateClient, publicClient, nil
	}

	// Local object store: build a session that targets the configured endpoint.
	localSession, sessErr := session.NewSession(&aws.Config{
		Credentials:      credentials.NewStaticCredentials(cfg.MinioAccessKey, cfg.MinioSecretKey, ""),
		Endpoint:         aws.String(cfg.LocalObjectStore),
		Region:           aws.String(cfg.AWSRegion),
		DisableSSL:       aws.Bool(true),
		S3ForcePathStyle: aws.Bool(true),
	})
	if sessErr != nil {
		return nil, nil, fmt.Errorf("failed to create aws session (local): %w", sessErr)
	}

	privateClient = dps3.NewClientWithSession(cfg.PrivateBucketName, localSession)
	publicClient = dps3.NewClientWithSession(cfg.PublicBucketName, localSession)
	return privateClient, publicClient, nil
}

GetS3Clients creates the private and public S3 Clients using the same AWS session, or a local storage client if a non-empty LocalObjectStore is provided

View Source
// GetVault creates a VaultClient from the token and address held in cfg,
// passing VaultRetries as the final argument to vault.CreateClient.
var GetVault = func(cfg *config.Config) (VaultClient, error) {
	token, address := cfg.VaultToken, cfg.VaultAddress
	return vault.CreateClient(token, address, VaultRetries)
}

GetVault creates a VaultClient

View Source
// OneAndOnlyOneWorker is the worker count handed to the Kafka consumer group
// (GetKafkaConsumer sets NumWorkers to its address).
//
// WARNING - Do NOT EVER make this bigger than '1', otherwise an OOM might
// happen when more than one large csv file is processed in parallel.
var OneAndOnlyOneWorker = 1

Functions

This section is empty.

Types

type DatasetAPIClient

type DatasetAPIClient interface {
	PutVersion(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, datasetID, edition, version string, m dataset.Version) error
	GetInstance(ctx context.Context, userAuthToken, serviceAuthToken, collectionID, instanceID, ifMatch string) (m dataset.Instance, eTag string, err error)
	Checker(context.Context, *healthcheck.CheckState) error
	GetVersionMetadataSelection(context.Context, dataset.GetVersionMetadataSelectionInput) (*dataset.Metadata, error)
}

type Generator

// Generator contains methods for dynamically required strings and tokens,
// e.g. UUIDs and PSKs. GetGenerator returns a value of this type.
type Generator interface {
	// NewPSK returns a PSK as a byte slice, or an error.
	NewPSK() ([]byte, error)
}

Generator contains methods for dynamically required strings and tokens e.g. UUIDs, PSKs.

type HTTPServer

// HTTPServer defines the required methods from the HTTP server.
// GetHTTPServer returns a value of this type.
type HTTPServer interface {
	// ListenAndServe runs the server and returns an error.
	// NOTE(review): presumably blocks until the server stops — confirm against the implementation.
	ListenAndServe() error
	// Shutdown stops the server; ctx is supplied by the caller.
	Shutdown(ctx context.Context) error
}

HTTPServer defines the required methods from the HTTP server

type HealthChecker

// HealthChecker defines the required methods from Healthcheck.
// GetHealthCheck returns a value of this type.
type HealthChecker interface {
	// Handler has the shape of an HTTP handler function (response writer and request).
	Handler(w http.ResponseWriter, req *http.Request)
	// Start starts the health checker with the given context.
	Start(ctx context.Context)
	// Stop stops the health checker.
	Stop()
	// AddCheck registers checker under name and returns an error on failure.
	AddCheck(name string, checker healthcheck.Checker) (err error)
	// SubscribeAll registers the subscriber s.
	// NOTE(review): presumably subscribes s to every registered check — confirm.
	SubscribeAll(s healthcheck.Subscriber)
}

HealthChecker defines the required methods from Healthcheck

type Initialiser

// Initialiser defines the methods to initialise external services.
// Each method takes the same parameters and returns the same types as the
// package-level GetHTTPServer, GetHealthCheck and GetKafkaConsumer variables.
type Initialiser interface {
	// DoGetHTTPServer returns an HTTPServer for bindAddr and router.
	DoGetHTTPServer(bindAddr string, router http.Handler) HTTPServer
	// DoGetHealthCheck returns a HealthChecker built from cfg and the build metadata.
	DoGetHealthCheck(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error)
	// DoGetKafkaConsumer returns a Kafka consumer group built from cfg.
	DoGetKafkaConsumer(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error)
}

Initialiser defines the methods to initialise external services

type S3Client

type S3Client interface {
	Get(key string) (io.ReadCloser, *int64, error)
	GetWithPSK(key string, psk []byte) (io.ReadCloser, *int64, error)
	Head(key string) (*s3.HeadObjectOutput, error)
	UploadWithContext(ctx context.Context, input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	UploadWithPSKAndContext(ctx context.Context, input *s3manager.UploadInput, psk []byte, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	BucketName() string
	Session() *session.Session
	Checker(context.Context, *healthcheck.CheckState) error
}

type Service

// Service contains all the configs, server and clients to run the event
// handler service.
type Service struct {
	// Cfg is the service configuration.
	Cfg              *config.Config
	// Server is the HTTP server.
	Server           HTTPServer
	// HealthCheck is the health checker.
	HealthCheck      HealthChecker
	// Consumer is the Kafka consumer group.
	Consumer         kafka.IConsumerGroup
	// DatasetAPIClient is the dataset API client.
	DatasetAPIClient DatasetAPIClient
	// S3Private and S3Public are the private and public S3 clients.
	S3Private        S3Client
	S3Public         S3Client
	// VaultClient is the vault client.
	VaultClient      VaultClient
	// contains filtered or unexported fields
}

Service contains all the configs, server and clients to run the event handler service

func New

func New() *Service

func (*Service) Close

func (svc *Service) Close(ctx context.Context) error

Close gracefully shuts the service down in the required order, with timeout

func (*Service) Init

func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildTime, gitCommit, version string) error

Init initialises the service and its dependencies

func (*Service) Start

func (svc *Service) Start(ctx context.Context, svcErrors chan error) error

Start the service

type VaultClient

// VaultClient is the set of vault client methods exposed to this package;
// GetVault returns a value of this type.
type VaultClient interface {
	// Checker takes a health-check state and reports an error.
	Checker(ctx context.Context, state *healthcheck.CheckState) error
	// ReadKey returns the string stored for key at path.
	ReadKey(path, key string) (string, error)
	// WriteKey stores value for key at path.
	WriteKey(path, key, value string) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL