rxd

package module
v1.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

README

RxDaemon (RxD)

codecov

A simple (alpha) reactive services daemon

NOTE: RxD has been through another refactor to make it more flexible with the introduction of injectable "Service Managers" which are effectively supervisors of a given service. Intracom (pub/sub package) has become a subpackage of rxd.

RxD leverages Intracom for internal comms which allows for the ability of your individual RxD services to subscribe interest to the lifecycle states of other RxD services running alongside each other. This means each service can be notified independently of what state another service is in. You can ultimately have any service "watch and react" to a state change of another service.

A good example to imagine here would be something like ServiceA that has subscribed interest in another service, ServiceB, where ServiceB happens to be a health check service. It might maintain a live TCP connection to an external service, run interval queries against a database engine, or health check a docker container, it doesnt really matter. The goal here is to NOT have alot of services individually doing their own health checks against the same resource because the more services you have the more checks you are potentially doing against the same resource which might be creating socket connections or doing file I/O not to mention the potential code duplication lines for each service to do their own check. Why not instead, write a service that can do the main logic of the check then signal using its own lifecycle states to anyone who is interested in that health check logic. RxD gives us that ability.

Note about service managers like Systemd

In rxd when setting the WithReportAlive option on the daemon, this effectively causes the daemon during Start to launch a routine to interact with the underlying system service manager to report in with alive checks. For now this is purely meant for Systemd. This does also require you to set the appropriate configurations in your .service file. You would be required by systemd to set the WatchdogSec property.

This WatchdogSec property should be less for rxd because this is the amount of time systemd will wait to hear from your running daemon. After this time if your service has not reported in, systemd will consider it frozen/hung and will make attempts to stop or restart it.

By default, systemd has notify turned off. So it is possible to just not set or set a zero-value for the UsingReportAlive which will disable rxds notifier, by default RxD leaves this disabled.

[Unit]
Description=My Notifying Service

[Service]
Type=notify # WatchdogSec is required if this is set.
ExecStart=/path/to/my-service
NotifyAccess=main
WatchdogSec=10s  # Service must send a watchdog notification every 10 seconds, required it Type=notify is set.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NoFilter = ServiceFilter{Mode: None, Names: map[string]struct{}{}}

Functions

This section is empty.

Types

type CommandHandler added in v1.2.0

type CommandHandler struct {
	// contains filtered or unexported fields
}

func (CommandHandler) ChangeLogLevel added in v1.2.0

func (h CommandHandler) ChangeLogLevel(level log.Level, resp *error) error

type Daemon added in v1.2.0

type Daemon interface {
	AddServices(services ...Service) error
	AddService(service Service) error
	Start(ctx context.Context) error
}

func NewDaemon

func NewDaemon(name string, options ...DaemonOption) Daemon

NewDaemon creates and return an instance of the reactive daemon NOTE: The service logger runs with a default stdout logger. This can be optionally changed by passing the WithServiceLogger option in NewDaemon The internal logger is disabled by default and can be enabled by passing the WithInternalLogger option in NewDaemon

func NewDaemonWithLogger added in v1.2.1

func NewDaemonWithLogger(name string, logger log.Logger, options ...DaemonOption) Daemon

NewDaemonWithLogger creates and return an instance of the reactive daemon with a custom service logger NOTE: this can also be set by passing the WithServiceLogger option in NewDaemon This is to support the old pattern of creating a daemon with a custom service logger.

type DaemonLog added in v1.2.0

type DaemonLog struct {
	Level   log.Level
	Message string
	Fields  []log.Field
}

func (DaemonLog) String added in v1.2.0

func (l DaemonLog) String() string

type DaemonOption added in v1.2.0

type DaemonOption func(*daemon)

func WithCustomPrestartPipeline added in v1.2.1

func WithCustomPrestartPipeline(prestart Pipeline) DaemonOption

func WithInternalLogger added in v1.2.0

func WithInternalLogger(logger log.Logger) DaemonOption

WithInternalLogger sets a custom logger for the daemon to use for internal logging. by default, the daemon will use a noop logger since this logger is used for rxd internals.

func WithInternalLogging added in v1.2.0

func WithInternalLogging(filepath string, level log.Level) DaemonOption

WithInternalLogging enables the internal logger to write to the filepath using the provided log level.

func WithLogWorkerCount added in v1.2.1

func WithLogWorkerCount(count int) DaemonOption

func WithPrestart added in v1.2.1

func WithPrestart(conf PrestartConfig, stages ...Stage) DaemonOption

func WithRPC added in v1.2.0

func WithRPC(cfg RPCConfig) DaemonOption

WithRPC enables an RPC server to run alongside the daemon. The RPC server will be available at the provided address and port. Currently the RPC server only supports a single method to change log level. An RPC client is provided in the pkg/rxrpc package for external use.

func WithReportAlive added in v1.2.0

func WithReportAlive(timeoutSecs uint64) DaemonOption

WithReportAlive sets the interval in seconds for when the daemon should report that it is still alive to the service manager. If the value is set to 0, the daemon will not interact with the service manager.

func WithServiceLogger added in v1.2.0

func WithServiceLogger(logger log.Logger) DaemonOption

func WithSignals added in v1.2.0

func WithSignals(signals ...os.Signal) DaemonOption

WithSignals sets the OS signals that the daemon should listen for. If no signals are provided, the daemon will listen for SIGINT and SIGTERM by default.

type DaemonService added in v1.2.0

type DaemonService struct {
	Name   string
	Runner ServiceRunner
}

DaemonService is a struct that contains the Name of the service, the ServiceRunner this struct is what is passed into a Handler for the handler to decide how to interact with the service using the ServiceRunner.

type ErrUninitialized added in v1.2.0

type ErrUninitialized struct {
	StructName string
	Method     string
}

func (ErrUninitialized) Error added in v1.2.0

func (e ErrUninitialized) Error() string

type Error

type Error string
const (
	ErrDaemonStarted            Error = Error("daemon has already been started")
	ErrDuplicateServiceName     Error = Error("duplicate service name found")
	ErrNoServices               Error = Error("no services to run")
	ErrNoServiceName            Error = Error("no service name provided")
	ErrNilService               Error = Error("nil service provided")
	ErrDuplicateServicePolicy   Error = Error("duplicate service policy found")
	ErrAddingServiceOnceStarted Error = Error("cannot add a service once the daemon is started")
)

func (Error) Error added in v1.2.0

func (e Error) Error() string

type FilterMode added in v1.2.0

type FilterMode int
const (
	None FilterMode = iota
	Include
	Exclude
)

type ManagerOption added in v1.2.0

type ManagerOption func(m *RunContinuousManager)

func WithInitDelay added in v1.2.0

func WithInitDelay(delay time.Duration) ManagerOption

func WithTransitionTimeouts added in v1.2.0

func WithTransitionTimeouts(t ManagerStateTimeouts) ManagerOption

type ManagerStateTimeouts added in v1.2.0

type ManagerStateTimeouts map[State]time.Duration

type NotifyState added in v1.2.0

type NotifyState uint8
const (
	NotifyStateStopped NotifyState = iota
	NotifyStateStopping
	NotifyStateRestarting
	NotifyStateReloading
	NotifyStateReady
	NotifyStateAlive
)

func (NotifyState) String added in v1.2.0

func (s NotifyState) String() string

type Pipeline added in v1.2.1

type Pipeline interface {
	Add(stage Stage)
	Run(ctx context.Context) <-chan DaemonLog
}

func NewPrestartPipeline added in v1.2.1

func NewPrestartPipeline(conf PrestartConfig, stages ...Stage) Pipeline

type PrestartConfig added in v1.2.1

type PrestartConfig struct {
	RestartOnError bool
	RestartDelay   time.Duration
}

type RPCConfig added in v1.2.0

type RPCConfig struct {
	Addr string
	Port uint16
}

type RPCServer added in v1.2.0

type RPCServer struct {
	// contains filtered or unexported fields
}

func NewRPCHandler added in v1.2.0

func NewRPCHandler(cfg RPCConfig) (*RPCServer, error)

func (*RPCServer) Start added in v1.2.0

func (s *RPCServer) Start() error

func (*RPCServer) Stop added in v1.2.0

func (s *RPCServer) Stop() error

type RunContinuousManager added in v1.2.0

type RunContinuousManager struct {
	DefaultDelay  time.Duration
	StartupDelay  time.Duration
	StateTimeouts ManagerStateTimeouts
}

RunContinuousManager is a service handler that does its best to run the service moving the service to the next desired state returned from each lifecycle The handle will override the state transition if the context is cancelled and force the service to Exit.

func NewDefaultManager added in v1.2.0

func NewDefaultManager(opts ...ManagerOption) RunContinuousManager

func (RunContinuousManager) Manage added in v1.2.0

func (m RunContinuousManager) Manage(sctx ServiceContext, ds DaemonService, updateC chan<- StateUpdate)

RunContinuousManager runs the service continuously until the context is cancelled. service contains the service runner that will be executed. which is then handled by the daemon.

type RunUntilSuccessManager added in v1.2.1

type RunUntilSuccessManager struct {
	StartupDelay time.Duration
	DefaultDelay time.Duration
}

func NewRunUntilSuccessManager added in v1.2.1

func NewRunUntilSuccessManager(defaultDelay, startupDelay time.Duration) RunUntilSuccessManager

NewRunUntilSuccessManager creates a new RunUntilSuccessManager with the provided startup delay. RunUntilSuccessManager will continue to try to run the service lifecycles until the service exits Run with nil error.

func (RunUntilSuccessManager) Manage added in v1.2.1

func (m RunUntilSuccessManager) Manage(sctx ServiceContext, ds DaemonService, updateC chan<- StateUpdate)

type Service

type Service struct {
	Name    string
	Runner  ServiceRunner
	Manager ServiceManager
}

Service is a struct that contains the Name of the service, the ServiceRunner and the ServiceHandler. This struct is what the caller uses to add a new service to the daemon. The daemon performs checks and translates this struct into a Service struct before starting it.

func NewService

func NewService(name string, runner ServiceRunner, opts ...ServiceOption) Service

type ServiceAction added in v1.2.0

type ServiceAction uint8
const (
	Entered ServiceAction = iota
	Exited
	Changed
	// Deprecated: In favor of using Entered
	Entering
	// Deprecated: In favor of using Exited
	Exiting
	// Deprecated: In favor of using Changed
	Changing
	NotIn // used for inverse matching of states
)

func (ServiceAction) String added in v1.2.0

func (s ServiceAction) String() string

type ServiceContext

type ServiceContext interface {
	context.Context
	ServiceWatcher
	ServiceLogger
	Name() string
	WithFields(fields ...log.Field) ServiceContext
	WithParent(ctx context.Context) (ServiceContext, context.CancelFunc)
	WithName(name string) (ServiceContext, context.CancelFunc)
}

type ServiceFilter added in v1.2.0

type ServiceFilter struct {
	Mode  FilterMode
	Names map[string]struct{}
}

func NewServiceFilter added in v1.2.0

func NewServiceFilter(mode FilterMode, names ...string) ServiceFilter

type ServiceLogger added in v1.2.0

type ServiceLogger interface {
	Log(level log.Level, message string, extra ...log.Field)
}

type ServiceManager added in v1.2.0

type ServiceManager interface {
	Manage(ctx ServiceContext, dService DaemonService, updateC chan<- StateUpdate)
}

ServiceManager interface defines the methods that a service handler must implement

type ServiceOption

type ServiceOption func(*Service)

func WithManager added in v1.2.0

func WithManager(manager ServiceManager) ServiceOption

type ServiceRunner added in v1.2.0

type ServiceRunner interface {
	Init(ServiceContext) error
	Idle(ServiceContext) error
	Run(ServiceContext) error
	Stop(ServiceContext) error
}

type ServiceStates added in v1.2.0

type ServiceStates map[string]State

type ServiceWatcher added in v1.2.0

type ServiceWatcher interface {
	WatchAllStates(ServiceFilter) (<-chan ServiceStates, context.CancelFunc)
	WatchAnyServices(action ServiceAction, target State, services ...string) (<-chan ServiceStates, context.CancelFunc)
	WatchAllServices(action ServiceAction, target State, services ...string) (<-chan ServiceStates, context.CancelFunc)
}

type Stage added in v1.2.1

type Stage struct {
	Name string
	Func StageFunc
}

type StageFunc added in v1.2.1

type StageFunc func(ctx context.Context) error

type State

type State uint8
const (
	StateExit State = iota
	StateInit
	StateIdle
	StateRun
	StateStop
)

func (State) String added in v1.1.2

func (s State) String() string

type StateUpdate added in v1.0.0

type StateUpdate struct {
	Name  string
	State State
}

StateUpdate reflects any given update of lifecycle state at a given time.

type States added in v1.0.0

type States map[string]State

States is a map of service name to service state which reflects the service name and its lifecycle state.

type StatesResponse added in v1.2.0

type StatesResponse struct {
	States ServiceStates
	Err    error
}

type SystemNotifier added in v1.2.0

type SystemNotifier interface {
	Start(ctx context.Context, logger log.Logger) error
	Notify(state NotifyState) error
}

func NewSystemdNotifier added in v1.2.0

func NewSystemdNotifier(socketName string, durationSecs uint64) (SystemNotifier, error)

Directories

Path Synopsis
examples
multi_service
For this example we will create two services that will run until they are stopped.
For this example we will create two services that will run until they are stopped.
single_service
For this example we will create a simple API service that will run until it is stopped either via the context timeout (30s) or an OS signal (SIGINT, SIGTERM).
For this example we will create a simple API service that will run until it is stopped either via the context timeout (30s) or an OS signal (SIGINT, SIGTERM).
log
pkg
rpc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL