Documentation
¶
Overview ¶
A lightweight framework for managing the lifetime of multiple services and their resource dependencies (db connector, etc).
Index ¶
- Constants
- Variables
- type GenericService
- type GrpcService
- type Manager
- type ManagerError
- type ManagerOpts
- func (opts *ManagerOpts) WithErrNotFoundMiddleware() *ManagerOpts
- func (opts *ManagerOpts) WithErrorGenerator(errGen *pkerr.ErrorGenerator) *ManagerOpts
- func (opts *ManagerOpts) WithGrpcLogging(logRPCLevel zerolog.Level, logReqLevel zerolog.Level, ...) *ManagerOpts
- func (opts *ManagerOpts) WithGrpcPingService(addService bool) *ManagerOpts
- func (opts *ManagerOpts) WithGrpcServerAddress(address string) *ManagerOpts
- func (opts *ManagerOpts) WithGrpcServerOpts(grpcOpts ...grpc.ServerOption) *ManagerOpts
- func (opts *ManagerOpts) WithGrpcStreamServerMiddleware(middleware ...pkmiddleware.StreamServerMiddleware)
- func (opts *ManagerOpts) WithGrpcUnaryServerMiddleware(middleware ...pkmiddleware.UnaryServerMiddleware) *ManagerOpts
- func (opts *ManagerOpts) WithLogger(logger zerolog.Logger) *ManagerOpts
- func (opts *ManagerOpts) WithMaxShutdownDuration(max time.Duration) *ManagerOpts
- func (opts *ManagerOpts) WithoutGrpcLogging() *ManagerOpts
- type ManagerTesting
- func (tester ManagerTesting) GrpcClientConn(cleanup bool, opts ...grpc.DialOption) *grpc.ClientConn
- func (tester ManagerTesting) GrpcPingClient(cleanup bool, opts ...grpc.DialOption) PingClient
- func (tester ManagerTesting) PingGrpcServer(ctx context.Context)
- func (tester ManagerTesting) SendSignal(osSignal os.Signal)
- func (tester ManagerTesting) Services() []Service
- type PanicError
- type PingClient
- type PingServer
- type Service
- type ServiceError
- type ServicesErrors
Examples ¶
Constants ¶
const DefaultGrpcAddress = "0.0.0.0:50051"
DefaultGrpcAddress is the default address the gRPC server will listen on.
Variables ¶
var NewPingClient = protogen.NewPingClient
NewPingClient is an alias to protogen.NewPingClient
var RegisterPingServer = protogen.RegisterPingServer
RegisterPingServer is an alias to protogen.RegisterPingServer
Functions ¶
This section is empty.
Types ¶
type GenericService ¶
type GenericService interface { Service // Run is called to run the main service. Run should not exit until an irrecoverable // error occurs or runCtx is cancelled. // // runCtx cancelling indicates that the service should begin to gracefully shutdown. // // When shutdownCtx is cancelled, the manager will stop waiting for run to return // and continue with the shutdown process. // // After Run returns, the resourcesCtx passed to Setup will be cancelled. Run(runCtx context.Context, shutdownCtx context.Context) error }
Generic framework for registering non-grpc based services to the monitor, so multiple services can be run in parallel by the same monitor.
type GrpcService ¶
type GrpcService interface { Service // RegisterOnServer is invoked before Service.Setup to allow the service to // register itself with the server. // // RegisterOnServer need only call the protoc generated registration function. RegisterOnServer(server *grpc.Server) }
GrpcService is a Service with methods to support running a gRPC service.
grpc Server options are passed to the monitor, which is responsible for the lifetime and spin up, and shutdown of the server hosting and GrpcService values.
The service just needs to expose a generic server registration method that calls the specific protoc generated registration method for that service.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the lifetime of the service.
Example (Basic) ¶
Run a basic gRPC service with a manger.
package main import ( "context" "fmt" "github.com/golang/protobuf/ptypes/empty" "github.com/peake100/gRPEAKEC-go/pkclients" "github.com/peake100/gRPEAKEC-go/pkservices" "github.com/peake100/gRPEAKEC-go/pkservices/protogen" "github.com/peake100/gRPEAKEC-go/pktesting" "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "sync" ) // pingService is a basic implementation of PingServer that the manager can use to test // connectivity to the server. type ExampleService struct { } // Id implements Service and returns "ExampleService". func (ping ExampleService) Id() string { return "ExampleService" } // Setup implements Service. func (ping ExampleService) Setup( resourcesCtx context.Context, resourcesReleased *sync.WaitGroup, shutdownCtx context.Context, logger zerolog.Logger, ) error { // Add 1 to the resourcesReleased waitGroup resourcesReleased.Add(1) // Run some sort of resource (like a db connector, message broker, etc) inside // a goroutine. go func() { // When resourcesCtx is cancelled, this resource should be released and the // WaitGroup should be decremented. defer resourcesReleased.Done() logger.Info().Msg("Resource 1 starting") <-resourcesCtx.Done() logger.Info().Msg("Resource 1 released") }() // Add another to the resource. resourcesReleased.Add(1) go func() { defer resourcesReleased.Done() logger.Info().Msg("Resource 2 starting") <-resourcesCtx.Done() logger.Info().Msg("Resource 2 released") }() // When this function returns, the manager will move on to the run stage. return nil } // RegisterOnServer implements GrpcService, and handles registering it onto the // manager's gRPC server. func (ping ExampleService) RegisterOnServer(server *grpc.Server) { protogen.RegisterPingServer(server, ping) } // Ping implements PingServer. It receives an empty message and returns the // result. func (ping ExampleService) Ping( ctx context.Context, msg *empty.Empty, ) (*empty.Empty, error) { fmt.Println("PING RECEIVED!") return msg, nil } // Run a basic gRPC service with a manger. func main() { // Our manager options. managerOpts := pkservices.NewManagerOpts(). // We are adding our own implementation of Ping, so we don't need to register // The default one. WithGrpcPingService(false) // Create a new manager to manage our service. Example service implements // pkservices.PingServer. manager := pkservices.NewManager(managerOpts, ExampleService{}) // Run the manager in it's own goroutine and return errors to our error channel. errChan := make(chan error) go func() { defer close(errChan) errChan <- manager.Run() }() // make a gRPC client to ping the service clientConn, err := grpc.Dial(pkservices.DefaultGrpcAddress, grpc.WithInsecure()) if err != nil { panic(err) } // create a new client to interact with our server pingClient := pkservices.NewPingClient(clientConn) // Wait for the server to be serving requests. ctx, cancel := pktesting.New3SecondCtx() defer cancel() err = pkclients.WaitForGrpcServer(ctx, clientConn) if err != nil { panic(fmt.Errorf("server did not respond: %w", err)) } // Send a ping ctx, cancel = pktesting.New3SecondCtx() defer cancel() _, err = pingClient.Ping(ctx, new(emptypb.Empty)) if err != nil { panic(fmt.Errorf("error on ping: %w", err)) } // Start shutdown. manager.StartShutdown() // Grab our error from the error channel (blocks until the manager is shutdown) err = <-errChan if err != nil { panic(fmt.Errorf("error from manager: %w", err)) } // Exit. }
Output: PING RECEIVED!
func NewManager ¶
func NewManager(opts *ManagerOpts, services ...Service) *Manager
NewManager creates a new Manager to run the given Service values with the passed ManagerOpts. If opts is nil, NewManagerOpts will be used to generate default options.
func (*Manager) Run ¶
Run runs the manager and all it's services / resources. Run blocks until the manager has fully shut down.
func (*Manager) StartShutdown ¶
func (manager *Manager) StartShutdown()
StartShutdown begins shutdown of the manager. Can be called multiple times. This methods returns immediately rather than blocking until the manager shuts down.
func (*Manager) Test ¶
func (manager *Manager) Test(t *testing.T) ManagerTesting
Test returns a test harness with helper methods for testing the manager.
func (*Manager) WaitForShutdown ¶
func (manager *Manager) WaitForShutdown()
WaitForShutdown blocks until the manager is fully shutdown.
type ManagerError ¶
type ManagerError struct { // SetupErr is an error that was returned during setup of the services. This error // will be a ServicesErrors if the cause of the error was one or more services. SetupErr error // RunErr is an error that occurred during running of 1 or more services.This error // will be a ServicesErrors if the cause of the error was one or more services. RunErr error // ShutdownErr is an error that occurred during the shutdown of 1 or more services. ShutdownErr error }
ManagerError is returned from Manager.Run, and returns errors from the various stages of the run.
type ManagerOpts ¶
type ManagerOpts struct {
// contains filtered or unexported fields
}
ManagerOpts are the options for running a manager
func NewManagerOpts ¶
func NewManagerOpts() *ManagerOpts
NewManagerOpts creates a new *ManagerOpts value with default options set.
func (*ManagerOpts) WithErrNotFoundMiddleware ¶ added in v0.0.5
func (opts *ManagerOpts) WithErrNotFoundMiddleware() *ManagerOpts
WithErrNotFoundMiddleware adds pkerr.NewErrNotFoundMiddleware to the unary middlewares.
Default: false.
func (*ManagerOpts) WithErrorGenerator ¶ added in v0.0.4
func (opts *ManagerOpts) WithErrorGenerator( errGen *pkerr.ErrorGenerator, ) *ManagerOpts
WithErrorGenerator adds error-handling middleware from the passed generator to the list of gRPC server options.
Test clients will have corresponding interceptors added to them as well.
Default: nil
func (*ManagerOpts) WithGrpcLogging ¶ added in v0.0.4
func (opts *ManagerOpts) WithGrpcLogging( logRPCLevel zerolog.Level, logReqLevel zerolog.Level, logRespLevel zerolog.Level, logErrors bool, errorTrace bool, ) *ManagerOpts
WithGrpcLogging will add logging middleware to the grpc server using pkmiddleware.NewLoggingMiddleware.
Default:
logRPCLevel: zerolog.DebugLevel
logReqLevel: zerolog.Disabled + 1 (won't be logged)
logRespLevel: zerolog.Disabled + 1 (won't be logged)
logErrors: true
errorTrace: true
func (*ManagerOpts) WithGrpcPingService ¶
func (opts *ManagerOpts) WithGrpcPingService(addService bool) *ManagerOpts
WithGrpcPingService adds a protogen.PingServer service to the server that can be used to test if the server is taking requests.
If true, the service will be added regardless of whether there are any other gRPC services the manager is handling.
Default: true.
func (*ManagerOpts) WithGrpcServerAddress ¶
func (opts *ManagerOpts) WithGrpcServerAddress(address string) *ManagerOpts
WithGrpcServerAddress sets the address the gRPC server should handle rpc calls on.
Default: ':5051'.
func (*ManagerOpts) WithGrpcServerOpts ¶
func (opts *ManagerOpts) WithGrpcServerOpts( grpcOpts ...grpc.ServerOption, ) *ManagerOpts
WithGrpcServerOpts stores server opts to create the gRPC server with. When called multiple times, new grpc.ServerOption values are added to list, rather than replacing values from the previous call.
Default: nil.
func (*ManagerOpts) WithGrpcStreamServerMiddleware ¶ added in v0.0.4
func (opts *ManagerOpts) WithGrpcStreamServerMiddleware( middleware ...pkmiddleware.StreamServerMiddleware, )
WithGrpcStreamServerMiddleware adds stream server middlewares for the gRPC server.
Default: nil.
func (*ManagerOpts) WithGrpcUnaryServerMiddleware ¶ added in v0.0.4
func (opts *ManagerOpts) WithGrpcUnaryServerMiddleware( middleware ...pkmiddleware.UnaryServerMiddleware, ) *ManagerOpts
WithGrpcUnaryServerMiddleware adds unary server middlewares for the gRPC server.
func (*ManagerOpts) WithLogger ¶
func (opts *ManagerOpts) WithLogger(logger zerolog.Logger) *ManagerOpts
WithLogger sets a zerolog.Logger to use for the manger.
Default: Global Logger.
func (*ManagerOpts) WithMaxShutdownDuration ¶
func (opts *ManagerOpts) WithMaxShutdownDuration(max time.Duration) *ManagerOpts
WithMaxShutdownDuration sets the maximum duration the manager should allow for the services and resources to shut down gracefully before cancelling the shutdown context and force-exiting.
func (*ManagerOpts) WithoutGrpcLogging ¶ added in v0.0.4
func (opts *ManagerOpts) WithoutGrpcLogging() *ManagerOpts
WithoutGrpcLogging keeps the logging middleware from being added to the gRPC server.
type ManagerTesting ¶
type ManagerTesting struct {
// contains filtered or unexported fields
}
ManagerTesting exposes testing methods for testing a service manager. These methods are not safe to use in production code.
func (ManagerTesting) GrpcClientConn ¶
func (tester ManagerTesting) GrpcClientConn( cleanup bool, opts ...grpc.DialOption, ) *grpc.ClientConn
GrpcClientConn generates a grpc.ClientConn with the passed opts connected to the gRPC server address in the manager options.
If Dialing the server results in an error, an error will be added to the test and t.FailNow() is called to exit the test immediately.
If cleanup is set to true then a cleanup function will be registered that closes the connection on test completion.
If an error generator was passed to the manager for server interceptors, the corresponding client interceptors will be added to the grpc.ClientConn.
func (ManagerTesting) GrpcPingClient ¶
func (tester ManagerTesting) GrpcPingClient( cleanup bool, opts ...grpc.DialOption, ) PingClient
GrpcPingClient returns a PingClient connected to the PingServer running on the gRPC server. If ManagerOpts.WithGrpcPingService was set to false, an error will be logged to the test and t.FailNow() will be called,
If cleanup is set to true then a cleanup function will be registered that closes the underlying clientConn on test completion.
func (ManagerTesting) PingGrpcServer ¶
func (tester ManagerTesting) PingGrpcServer(ctx context.Context)
PingGrpcServer will continually ping the gRPC server's PingServer.Ping method until a connection is established or the passed context times out. All errors will be ignored and ping will be tried again on failure.
If this method returns, the server is up an running and ready to take requests.
If ctx expires, the ctx.Error() will be logged and FailNow() called on the test.
If ctx is nil, a default 3-second context will be used.
func (ManagerTesting) SendSignal ¶
func (tester ManagerTesting) SendSignal(osSignal os.Signal)
SendSignal sends an os.Signal to the Manager to test shutdown via host signal.
func (ManagerTesting) Services ¶
func (tester ManagerTesting) Services() []Service
Services returns the list of registered services. This method is for testing purposes only and returned values should not be modified unless you know what you are doing!
type PanicError ¶
type PanicError = pkerr.PanicError
PanicError is a type alias for pkerr.PanicError.
type PingClient ¶
type PingClient = protogen.PingClient
PingClient is a type alias to protogen.PingClient
type PingServer ¶
type PingServer = protogen.PingServer
PingServer is a type alias to protogen.PingServer
type Service ¶
type Service interface { // Id should return a unique, but human readable id for the service. This id is // used for both logging and ServiceError context. If two services share the same // id, the manger will return an error. Id() string // Setup is called before Run to set up any resources the service requires. // // resourcesCtx will be cancelled AFTER Run returns to signal that all the main // process has finished, and it is safe to release resources. // // resourcesReleased should be incremented and decremented by individual resources, // and the Manager will block on it until the context passed to Shutdown cancels. // // shutdownCtx is cancelled when the graceful shutdown limit has been reached and // resources should be released immediately for a forced shutdown. // // logger is a zerolog.Logger with a // zerolog.Logger.WithString("SERVICE", [Service.Id()]) entry already on it. Setup( resourcesCtx context.Context, resourcesReleased *sync.WaitGroup, shutdownCtx context.Context, logger zerolog.Logger, ) error }
Service is an interface that a service must implement to be run by the Manager
type ServiceError ¶
type ServiceError struct { // ServiceId is the return of the service's Service.Id method. ServiceId string // The error the service threw. Err error }
ServiceError reports an error from a specific service.
func (ServiceError) Unwrap ¶
func (err ServiceError) Unwrap() error
Unwrap implements xerrors.Wrapper and returns the underlying error.
type ServicesErrors ¶
type ServicesErrors struct {
Errs []error
}
ServicesErrors stores errors from multiple services as a single error.
func (ServicesErrors) Error ¶
func (err ServicesErrors) Error() string
Error implements builtins.errors, and reports the number of errors.