services

package
v1.3.0-rc.0
Warning

This package is not in the latest version of its module.

Published: Aug 5, 2020 License: Apache-2.0 Imports: 6 Imported by: 30

README

Services

This is a Go implementation of the services model from the Google Guava library.

It provides a Service interface (implemented by the BasicService type) and a Manager for managing a group of services at once.

Main benefits of this model are:

  • Services have well-defined, explicit states. A service is not supposed to start any work until it is started, and it is supposed to enter the Running state only after it has successfully completed all initialization in the Starting state.
  • States are observable by clients. A client can not only see the state, but also wait for the Running or Terminated state.
  • If more observability is needed, clients can register state listeners.
  • Service startup and shutdown are done asynchronously, which allows startup or shutdown of multiple services to proceed in parallel.
  • Services that depend on each other can simply wait for the other service to reach the correct state before using it.

Service interface

As the user of the service, here is what you need to know: Each service starts in New state. In this state, service is not yet doing anything. It is only instantiated, and ready to be started.

Service is started by calling its StartAsync method. This will make service transition to Starting state, and eventually to Running state, if starting is successful. Starting is done asynchronously, so that client can do other work while service is starting, for example start more services.

A service spends most of its time in Running state, in which it provides its services to clients. What exactly it does depends on the service itself. Typical examples include responding to HTTP requests and running periodic background tasks.

Clients can stop the service by calling StopAsync, which tells service to stop. Service will transition to Stopping state (in which it does the necessary cleanup) and eventually Terminated state. If service fails in its Starting, Running or Stopping state, it will end up in Failed state instead of Terminated.

Once a service is in Terminated or Failed state, it cannot be restarted. These states are terminal.

Full state diagram:

   ┌────────────────────────────────────────────────────────────────────┐
   │                                                                    │
   │                                                                    ▼
┌─────┐      ┌──────────┐      ┌─────────┐     ┌──────────┐      ┌────────────┐
│ New │─────▶│ Starting │─────▶│ Running │────▶│ Stopping │───┬─▶│ Terminated │
└─────┘      └──────────┘      └─────────┘     └──────────┘   │  └────────────┘
                   │                                          │
                   │                                          │
                   │                                          │   ┌────────┐
                   └──────────────────────────────────────────┴──▶│ Failed │
                                                                  └────────┘

API and states and semantics are implemented to correspond to Service class in Guava library.
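The arrows in the diagram can be written down as a small transition table. The following is only an illustrative sketch, not the package's implementation: the State constants are redeclared locally so the snippet stands alone, and the names allowed and canTransition are hypothetical. It transcribes only the arrows that are drawn; the case where a service is stopped while still in Starting (described further below for BasicService) is not drawn and is therefore left out.

```go
package main

import "fmt"

// State is redeclared here so that the sketch is self-contained.
type State int

const (
	New State = iota
	Starting
	Running
	Stopping
	Terminated
	Failed
)

// allowed lists, for each state, the states the diagram connects it to.
// Terminated and Failed have no entries because they are terminal.
var allowed = map[State][]State{
	New:      {Starting, Terminated},
	Starting: {Running, Failed},
	Running:  {Stopping},
	Stopping: {Terminated, Failed},
}

// canTransition reports whether the diagram draws an arrow from one state to another.
func canTransition(from, to State) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(New, Starting))        // arrow exists
	fmt.Println(canTransition(New, Terminated))      // stopped before being started
	fmt.Println(canTransition(Terminated, Starting)) // terminal state, no restart
}
```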

Manager

Multiple services can be managed via Manager (corresponds to ServiceManager in Guava library). Manager is initialized with a list of New services. It can start the services and wait until all services are running (= "Healthy" state). Manager can also be stopped, which triggers stopping of all its services. When all services are in their terminal states (Terminated or Failed), the manager is said to be Stopped.
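The two manager-level states can be derived from the states of the individual services. The sketch below only illustrates that rule and does not use the package: State is redeclared locally, and isHealthy and isStopped are hypothetical helper names. Treating an empty list as neither healthy nor stopped is an assumption of this sketch.

```go
package main

import "fmt"

// State is redeclared here so that the sketch is self-contained.
type State int

const (
	New State = iota
	Starting
	Running
	Stopping
	Terminated
	Failed
)

// isHealthy reports whether every service is in the Running state.
func isHealthy(states []State) bool {
	if len(states) == 0 {
		return false // assumption: a manager always has at least one service
	}
	for _, s := range states {
		if s != Running {
			return false
		}
	}
	return true
}

// isStopped reports whether every service is in a terminal state.
func isStopped(states []State) bool {
	if len(states) == 0 {
		return false
	}
	for _, s := range states {
		if s != Terminated && s != Failed {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isHealthy([]State{Running, Running}))   // all running
	fmt.Println(isHealthy([]State{Running, Starting}))  // one still starting
	fmt.Println(isStopped([]State{Terminated, Failed})) // all terminal
}
```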

Implementing custom Service

As a developer who wants to implement your own service, there are several possibilities.

Using NewBasicService

The easiest way to create a service is to call the NewBasicService function with three functions called StartingFn, RunningFn and StoppingFn. The returned service will be in New state. When it transitions to Starting state (by calling StartAsync), StartingFn is called. When StartingFn finishes with no error, the service transitions to Running state and RunningFn is called. When RunningFn finishes, the service transitions to Stopping state, and StoppingFn is called. After StoppingFn is done, the service ends in Terminated state (if none of the functions returned an error), or in Failed state if there were errors.

Any of the functions can be nil, in which case service simply moves to the next state.

Using NewIdleService

"Idle" service is a service which needs to run custom code during Starting or Stopping state, but not in Running state. Service will remain in Running state until explicitly stopped via StopAsync.

Example usage is a service that registers some HTTP or gRPC handlers.

Using NewTimerService

Timer service runs supplied function on every tick. If this function returns error, service will fail. Otherwise service will continue calling supplied function until stopped via StopAsync.

Using BasicService struct

All previous options use the BasicService type internally, and it is BasicService that implements the semantics of the Service interface. This struct can also be embedded into a custom struct, which is then initialized with starting/running/stopping functions via NewBasicService:

type exampleService struct {
	*BasicService

	log []string
	ch  chan string
}

func newExampleServ() *exampleService {
	s := &exampleService{
		ch: make(chan string),
	}
	s.BasicService = NewBasicService(nil, s.collect, nil) // StartingFn, RunningFn, StoppingFn
	return s
}

// used as Running function. When service is stopped, context is canceled, so we react on it.
func (s *exampleService) collect(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case msg := <-s.ch:
			s.log = append(s.log, msg)
		}
	}
}

// External method called by clients of the Service.
func (s *exampleService) Send(msg string) bool {
	ctx := s.ServiceContext() // provided by BasicService. Not part of Service interface.
	if ctx == nil {
		// Service is not yet started
		return false
	}
	select {
	case s.ch <- msg:
		return true
	case <-ctx.Done():
		// Service is not running anymore.
		return false
	}
}

The value returned by newExampleServ is a service that can be started, observed for state changes, or stopped. As long as the service is in Running state, clients can call its Send method:

s := newExampleServ()
s.StartAsync(context.Background())
s.AwaitRunning(context.Background())
// now collect() is running
s.Send("A")
s.Send("B")
s.Send("C")
s.StopAsync()
s.AwaitTerminated(context.Background())
// now service is finished, and we can access s.log

After the service is stopped (in Terminated or Failed state, although here the "running" function doesn't return an error, so only Terminated state is possible), all collected messages can be read from the log field. Notice that no further synchronization is necessary in this case: once the service has stopped and the client has observed that via AwaitTerminated, any access to log is safe.

(This example is adapted from unit tests in basic_service_test.go)

This may seem like a lot of extra code, and for such a simple usage it probably is. The real benefit comes when one starts combining multiple services into a manager, observing them as a group, or letting services depend on each other via the Await methods.

Documentation

Functions

func StartAndAwaitRunning

func StartAndAwaitRunning(ctx context.Context, service Service) error

StartAndAwaitRunning starts the service, and then waits until it reaches Running state. If service fails to start, its failure case is returned. Service must be in New state when this function is called.

Notice that context passed to the service for starting is the same as context used for waiting! If you need these contexts to be different, please use StartAsync and AwaitRunning directly.

func StartManagerAndAwaitHealthy

func StartManagerAndAwaitHealthy(ctx context.Context, manager *Manager) error

StartManagerAndAwaitHealthy starts the manager (which in turn starts all services managed by it), and then waits until it reaches Healthy state. All services that this manager manages must be in New state, otherwise starting will fail.

Notice that context passed to the manager for starting its services is the same as context used for waiting!

func StopAndAwaitTerminated

func StopAndAwaitTerminated(ctx context.Context, service Service) error

StopAndAwaitTerminated asks the service to stop, and then waits until the service reaches Terminated or Failed state. If the service ends in Terminated state, this function returns nil. On Failed state, it returns the failure case. Other errors are possible too (e.g. if the context stops before the service does).

func StopManagerAndAwaitStopped

func StopManagerAndAwaitStopped(ctx context.Context, manager *Manager) error

StopManagerAndAwaitStopped asks manager to stop its services, and then waits until manager reaches the stopped state or context is stopped.

Types

type BasicService

type BasicService struct {
	// contains filtered or unexported fields
}

BasicService implements contract of Service interface, using three supplied functions: StartingFn, RunningFn and StoppingFn. When service is started, these three functions are called as service transitions to Starting, Running and Stopping state.

Since they are called sequentially, they don't need to synchronize access on the state. (In other words: StartingFn happens-before RunningFn, RunningFn happens-before StoppingFn).

All three functions are called at most once. If they are nil, they are not called and service transitions to the next state.

Context passed to StartingFn and RunningFn function is canceled when StopAsync() is called, or service enters Stopping state. This context can be used to start additional tasks from inside StartingFn or RunningFn. Same context is available via ServiceContext() method (not part of Service interface).

Possible orders of how functions are called:

* 1. StartingFn -- if StartingFn returns error, no other functions are called.

* 1. StartingFn, 2. StoppingFn -- StartingFn doesn't return error, but StopAsync is called while running StartingFn, or context is canceled from outside while StartingFn still runs.

* 1. StartingFn, 2. RunningFn, 3. StoppingFn -- this is most common, when StartingFn doesn't return error, service is not stopped and context isn't stopped externally while running StartingFn.

func NewBasicService

func NewBasicService(start StartingFn, run RunningFn, stop StoppingFn) *BasicService

Returns service built from three functions (using BasicService).

func NewIdleService

func NewIdleService(up StartingFn, down StoppingFn) *BasicService

Initializes basic service as an "idle" service -- it doesn't do anything in its Running state, but still supports all state transitions.

func (*BasicService) AddListener

func (b *BasicService) AddListener(listener Listener)

AddListener is part of Service interface.

func (*BasicService) AwaitRunning

func (b *BasicService) AwaitRunning(ctx context.Context) error

AwaitRunning is part of Service interface.

func (*BasicService) AwaitTerminated

func (b *BasicService) AwaitTerminated(ctx context.Context) error

AwaitTerminated is part of Service interface.

func (*BasicService) FailureCase

func (b *BasicService) FailureCase() error

FailureCase is part of Service interface.

func (*BasicService) ServiceContext

func (b *BasicService) ServiceContext() context.Context

Returns context that this service uses internally for controlling its lifecycle. It is the same context that is passed to Starting and Running functions, and is based on context passed to the service via StartAsync.

Before service enters Starting state, there is no context. This context is stopped when service enters Stopping state.

This can be useful in code that embeds BasicService and wants to provide additional methods to its clients.

Example:

func (s *exampleService) Send(msg string) bool {
	ctx := s.ServiceContext()
	if ctx == nil {
		// Service is not yet started
		return false
	}
	select {
	case s.ch <- msg:
		return true
	case <-ctx.Done():
		// Service is not running anymore.
		return false
	}
}

This is not part of Service interface, and clients of the Service should not use it.

func (*BasicService) StartAsync

func (b *BasicService) StartAsync(parentContext context.Context) error

StartAsync is part of Service interface.

func (*BasicService) State

func (b *BasicService) State() State

State is part of Service interface.

func (*BasicService) StopAsync

func (b *BasicService) StopAsync()

StopAsync is part of Service interface.

type FailureWatcher added in v1.0.0

type FailureWatcher struct {
	// contains filtered or unexported fields
}

FailureWatcher waits for service failures, and passes them to the channel.

func NewFailureWatcher added in v1.0.0

func NewFailureWatcher() *FailureWatcher

func (*FailureWatcher) Chan added in v1.0.0

func (w *FailureWatcher) Chan() <-chan error

Returns channel for this watcher. If watcher is nil, returns nil channel. Errors returned on the channel include failure case and service description.

func (*FailureWatcher) WatchManager added in v1.0.0

func (w *FailureWatcher) WatchManager(manager *Manager)

func (*FailureWatcher) WatchService added in v1.0.0

func (w *FailureWatcher) WatchService(service Service)

type Listener

type Listener interface {
	// Called when the service transitions from NEW to STARTING.
	Starting()

	// Called when the service transitions from STARTING to RUNNING.
	Running()

	// Called when the service transitions to the STOPPING state.
	Stopping(from State)

	// Called when the service transitions to the TERMINATED state.
	Terminated(from State)

	// Called when the service transitions to the FAILED state.
	Failed(from State, failure error)
}

Listener receives notifications about Service state changes.

func NewListener

func NewListener(starting, running func(), stopping, terminated func(from State), failed func(from State, failure error)) Listener

NewListener provides a simple way to build service listener from supplied functions. Functions are only called when not nil.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Service Manager is initialized with a collection of services. They all must be in New state. Service manager can start them, and observe their state as a group. Once all services are running, Manager is said to be Healthy. It is possible for manager to never reach the Healthy state, if some services fail to start. When all services are stopped (Terminated or Failed), manager is Stopped.

func NewManager

func NewManager(services ...Service) (*Manager, error)

NewManager creates new service manager. It needs at least one service, and all services must be in New state.

func (*Manager) AddListener

func (m *Manager) AddListener(listener ManagerListener)

Registers a ManagerListener to be run when this Manager changes state. The listener will not have previous state changes replayed, so it is suggested that listeners are added before any of the managed services are started.

AddListener guarantees execution ordering across calls to a given listener but not across calls to multiple listeners. Specifically, a given listener will have its callbacks invoked in the same order as the underlying service enters those states. Additionally, at most one of the listener's callbacks will execute at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute in an order different from the one in which they were registered.
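One common way to obtain this kind of guarantee is to give each listener its own queue and a single goroutine that drains it. The sketch below illustrates that general technique only; it is an assumption for illustration, not a description of how Manager delivers events, and serialListener is a hypothetical type.

```go
package main

import "fmt"

// serialListener delivers events to one callback in the order they were
// queued, and never runs two deliveries for that callback at the same time.
type serialListener struct {
	events chan string
	done   chan struct{}
}

func newSerialListener(callback func(event string)) *serialListener {
	l := &serialListener{
		events: make(chan string, 16),
		done:   make(chan struct{}),
	}
	// A single goroutine per listener preserves per-listener ordering.
	go func() {
		defer close(l.done)
		for ev := range l.events {
			callback(ev)
		}
	}()
	return l
}

// notify queues an event for delivery.
func (l *serialListener) notify(event string) { l.events <- event }

// close stops accepting events and waits until all queued ones are delivered.
func (l *serialListener) close() {
	close(l.events)
	<-l.done
}

func main() {
	var seen []string
	l := newSerialListener(func(ev string) { seen = append(seen, ev) })

	for _, ev := range []string{"Healthy", "Failure", "Stopped"} {
		l.notify(ev)
	}
	l.close() // after this returns, reading seen is safe

	fmt.Println(seen)
}
```

Different listeners each get their own goroutine in this design, so their callbacks may interleave with each other even though each listener sees its own events in order.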

func (*Manager) AwaitHealthy

func (m *Manager) AwaitHealthy(ctx context.Context) error

Waits for the Manager to become healthy. Returns nil if the manager is healthy, or an error otherwise (e.g. the manager is in a state from which it can no longer become healthy).

func (*Manager) AwaitStopped

func (m *Manager) AwaitStopped(ctx context.Context) error

Waits for the Manager to become stopped. Returns nil if the manager is stopped, or an error if the context finishes earlier.

func (*Manager) IsHealthy

func (m *Manager) IsHealthy() bool

Returns true if all services are currently in the Running state.

func (*Manager) IsStopped

func (m *Manager) IsStopped() bool

Returns true if all services are in a terminal state (Terminated or Failed).

func (*Manager) ServicesByState

func (m *Manager) ServicesByState() map[State][]Service

Provides a snapshot of the current state of all the services under management.
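A snapshot of this shape is essentially a group-by over the services' current states. The following sketch shows the grouping with hypothetical stand-ins: State is redeclared locally, service is a plain struct with a name instead of the Service interface, and byState is an illustrative helper.

```go
package main

import "fmt"

// State is redeclared here so that the sketch is self-contained.
type State int

const (
	New State = iota
	Starting
	Running
	Stopping
	Terminated
	Failed
)

// service is a simplified stand-in carrying only a name and a state.
type service struct {
	name  string
	state State
}

// byState groups service names by the state each service is currently in.
func byState(services []service) map[State][]string {
	result := make(map[State][]string)
	for _, s := range services {
		result[s.state] = append(result[s.state], s.name)
	}
	return result
}

func main() {
	snapshot := byState([]service{
		{"api", Running},
		{"compactor", Starting},
		{"store", Running},
	})
	fmt.Println("running:", snapshot[Running])
	fmt.Println("starting:", snapshot[Starting])
	fmt.Println("failed:", len(snapshot[Failed]))
}
```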

func (*Manager) StartAsync

func (m *Manager) StartAsync(ctx context.Context) error

Initiates service startup on all the services being managed. It is only valid to call this method if all of the services are New.

func (*Manager) StopAsync

func (m *Manager) StopAsync()

Initiates service shutdown if necessary on all the services being managed.

type ManagerListener

type ManagerListener interface {
	// Called when Manager reaches Healthy state (all services Running)
	Healthy()

	// Called when Manager reaches Stopped state (all services are either Terminated or Failed)
	Stopped()

	// Called when service fails.
	Failure(service Service)
}

ManagerListener listens for events from Manager.

func NewManagerListener

func NewManagerListener(healthy, stopped func(), failure func(service Service)) ManagerListener

NewManagerListener provides a simple way to build manager listener from supplied functions. Functions will only be called when not nil.

type OneIteration

type OneIteration func(ctx context.Context) error

One iteration of the timer service. Called repeatedly until the service is stopped, or until this function returns an error, in which case the service will fail.

type RunningFn

type RunningFn func(serviceContext context.Context) error

RunningFn is called when the service enters Running state. When it returns, the service moves to Stopping state. If RunningFn or StoppingFn returns an error, the service will end in Failed state; if both functions return without error, the service will end in Terminated state.

type Service

type Service interface {
	// Starts Service asynchronously. Service must be in New State, otherwise error is returned.
	// Context is used as a parent context for service own context.
	StartAsync(ctx context.Context) error

	// Waits until service gets into Running state.
	// If service is in New or Starting state, this method is blocking.
	// If service is already in Running state, returns immediately with no error.
	// If service is in a state, from which it cannot get into Running state, error is returned immediately.
	AwaitRunning(ctx context.Context) error

	// Tell the service to stop. This method doesn't block and can be called multiple times.
	// If Service is New, it is Terminated without having been started nor stopped.
	// If Service is in Starting or Running state, this initiates shutdown and returns immediately.
	// If Service has already been stopped, this method returns immediately, without taking action.
	StopAsync()

	// Waits for the service to reach Terminated or Failed state. If service is already in one of these states,
	// when method is called, method returns immediately.
	// If service enters Terminated state, this method returns nil.
	// If service enters Failed state, or context is finished before reaching Terminated or Failed, error is returned.
	AwaitTerminated(ctx context.Context) error

	// If Service is in Failed state, this method returns the error.
	// If Service is not in Failed state, this method returns nil.
	FailureCase() error

	// Returns current state of the service.
	State() State

	// AddListener adds listener to this service. Listener will be notified on subsequent state transitions
	// of the service. Previous state transitions are not replayed, so it is suggested to add listeners before
	// service is started.
	//
	// AddListener guarantees execution ordering across calls to a given listener but not across calls to
	// multiple listeners. Specifically, a given listener will have its callbacks invoked in the same order
	// as the service enters those states. Additionally, at most one of the listener's callbacks will execute
	// at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute
	// in an order different from the one in which they were registered.
	AddListener(listener Listener)
}

Service defines interface for controlling a service.

State diagram for the service:

   ┌────────────────────────────────────────────────────────────────────┐
   │                                                                    │
   │                                                                    ▼
┌─────┐      ┌──────────┐      ┌─────────┐     ┌──────────┐      ┌────────────┐
│ New │─────▶│ Starting │─────▶│ Running │────▶│ Stopping │───┬─▶│ Terminated │
└─────┘      └──────────┘      └─────────┘     └──────────┘   │  └────────────┘
                   │                                          │
                   │                                          │
                   │                                          │   ┌────────┐
                   └──────────────────────────────────────────┴──▶│ Failed │
                                                                  └────────┘
Example
package main

import (
	"context"
	"fmt"
)

type exampleService struct {
	*BasicService

	log []string
	ch  chan string
}

func newExampleServ() *exampleService {
	s := &exampleService{
		ch: make(chan string),
	}
	s.BasicService = NewBasicService(nil, s.collect, nil) // StartingFn, RunningFn, StoppingFn
	return s
}

// used as Running function. When service is stopped, context is canceled, so we react on it.
func (s *exampleService) collect(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case msg := <-s.ch:
			s.log = append(s.log, msg)
		}
	}
}

// External method called by clients of the Service.
func (s *exampleService) Send(msg string) bool {
	ctx := s.ServiceContext()
	if ctx == nil {
		// Service is not yet started
		return false
	}
	select {
	case s.ch <- msg:
		return true
	case <-ctx.Done():
		// Service is not running anymore.
		return false
	}
}

func main() {
	es := newExampleServ()
	es.Send("first") // ignored, as service is not running yet

	_ = es.StartAsync(context.Background())
	_ = es.AwaitRunning(context.Background())

	es.Send("second")

	es.StopAsync()
	_ = es.AwaitTerminated(context.Background())

	es.Send("third") // ignored, service is now stopped

	fmt.Println(es.log)
}
Output:

[second]

func NewTimerService

func NewTimerService(interval time.Duration, start StartingFn, iter OneIteration, stop StoppingFn) Service

Runs iteration function on every interval tick. When iteration returns error, service fails.

type StartingFn

type StartingFn func(serviceContext context.Context) error

StartingFn is called when service enters Starting state. If StartingFn returns error, service goes into Failed state. If StartingFn returns without error, service transitions into Running state (unless context has been canceled).

serviceContext is a context that is finished at latest when service enters Stopping state, but can also be finished earlier when StopAsync is called on the service. This context is derived from context passed to StartAsync method.

type State

type State int

State of the service. See Service interface for full state diagram.

const (
	New        State = iota // Service is new, not running yet. Initial state.
	Starting                // Service is starting. If starting succeeds, service enters Running state.
	Running                 // Service is fully running now. When service stops running, it enters Stopping state.
	Stopping                // Service is shutting down
	Terminated              // Service has stopped successfully. Terminal state.
	Failed                  // Service has failed in Starting, Running or Stopping state. Terminal state.
)

func (State) String

func (s State) String() string

type StoppingFn

type StoppingFn func(failureCase error) error

StoppingFn function is called when service enters Stopping state. When it returns, service moves to Terminated or Failed state, depending on whether there was any error returned from previous RunningFn (if it was called) and this StoppingFn function. If both return error, RunningFn's error will be saved as failure case for Failed state. Parameter is error from Running function, or nil if there was no error.
