ctrl

Version: v0.3.0
Published: Oct 10, 2024 License: Apache-2.0 Imports: 32 Imported by: 1

README

Controller library

This library provides a wrapper that lets Controllers interface with NATS. It abstracts away retrieving a Condition, executing the registered handler, and publishing status updates.

It covers:

  • Interfacing with Jetstream and KV, and handling keep-alive semantics
  • Listening for Conditions either on NATS directly or through an HTTPS endpoint provided by the Orchestrator API
  • Publishing the first status before executing the Controller handler method
  • Invoking the Controller handler, passing it a Task object (Controllers don't know and don't care about Conditions)
  • Covering for any failures or panics in the Controller handler
  • Collecting trace telemetry and metrics for Conditions handled

Projects using ctrl

The ctrl library is imported by Controllers that interface with the Condition Orchestrator.

Documentation

Index

Constants

const (

	// condition status considered stale after this period
	StatusStaleThreshold = condition.StaleThreshold
	// controller considered dead after this period
	LivenessStaleThreshold = condition.StaleThreshold
)

Variables

var (
	ErrHandlerInit   = errors.New("error initializing handler")
	ErrEmptyResponse = errors.New("empty response with error")

	ErrNoCondition = errors.New("no condition available")
)
var (
	// This error when returned by the callback indicates it needs to be retried
	ErrRetryHandler = errors.New("retry callback")
)

Functions

This section is empty.

Types

type ConditionHandlerFactory

type ConditionHandlerFactory func() TaskHandler

type ConditionState

type ConditionState int

ConditionState represents the various states a condition can be in during its lifecycle.

type ConditionStatusPublisher

type ConditionStatusPublisher interface {
	Publish(ctx context.Context, serverID string, state condition.State, status json.RawMessage, tsUpdateOnly bool) error
}

ConditionStatusPublisher defines an interface for publishing status updates for conditions.

func NewHTTPConditionStatusPublisher

func NewHTTPConditionStatusPublisher(
	appName string,
	serverID,
	conditionID uuid.UUID,
	conditionKind condition.Kind,
	orcQueryor orc.Queryor,
	logger *logrus.Logger,
) ConditionStatusPublisher

func NewNatsConditionStatusPublisher

func NewNatsConditionStatusPublisher(
	appName,
	conditionID,
	facilityCode string,
	conditionKind condition.Kind,
	controllerID registry.ControllerID,
	kvReplicas int,
	stream *events.NatsJetstream,
	logger *logrus.Logger,
) (ConditionStatusPublisher, error)

NewNatsConditionStatusPublisher creates a new NatsConditionStatusPublisher for a given condition ID.

It initializes a NATS KeyValue store for tracking condition statuses.

type ConditionStatusQueryor

type ConditionStatusQueryor interface {
	// ConditionState returns the current state of a condition based on its ID.
	ConditionState(conditionID string) ConditionState
}

ConditionStatusQueryor defines an interface for querying the status of a condition.

type ConditionTaskRepository

type ConditionTaskRepository interface {
	Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error
	Query(ctx context.Context) (*condition.Task[any, any], error)
}

ConditionTaskRepository defines an interface for storing and querying Task information for a condition.

func NewHTTPTaskRepository

func NewHTTPTaskRepository(
	appName string,
	serverID,
	conditionID uuid.UUID,
	conditionKind condition.Kind,
	orcQueryor orc.Queryor,
	logger *logrus.Logger,
) ConditionTaskRepository

func NewNatsConditionTaskRepository

func NewNatsConditionTaskRepository(
	conditionID,
	serverID,
	facilityCode string,
	conditionKind condition.Kind,
	controllerID registry.ControllerID,
	kvReplicas int,
	stream *events.NatsJetstream,
	logger *logrus.Logger,
) (ConditionTaskRepository, error)

NewNatsConditionTaskRepository returns a NatsConditionTaskRepository to store and retrieve Task information for a condition.

It initializes a NATS KeyValue store for storing Task information for conditions. The conditionID is the TaskID.

type HTTPConditionStatusPublisher

type HTTPConditionStatusPublisher struct {
	// contains filtered or unexported fields
}

HTTPConditionStatusPublisher implements the ConditionStatusPublisher interface to publish condition status updates over HTTP to NATS.

func (*HTTPConditionStatusPublisher) Publish

func (s *HTTPConditionStatusPublisher) Publish(ctx context.Context, serverID string, state condition.State, status json.RawMessage, tsUpdateOnly bool) error

type HTTPController

type HTTPController struct {
	// contains filtered or unexported fields
}

HTTPController implements the TaskHandler interface to interact with the NATS queue and KV over HTTP(S).

func NewHTTPController

func NewHTTPController(
	appName,
	facilityCode string,
	serverID uuid.UUID,
	conditionKind condition.Kind,
	orcClientCfg *OrchestratorAPIConfig,
	options ...OptionHTTPController) (*HTTPController, error)

func (*HTTPController) Run

func (n *HTTPController) Run(ctx context.Context, handler TaskHandler) error

type HTTPTaskRepository

type HTTPTaskRepository struct {
	// contains filtered or unexported fields
}

HTTPTaskRepository implements the ConditionTaskRepository to Publish and Query Task information from NATS over HTTP.

func (*HTTPTaskRepository) Publish

func (h *HTTPTaskRepository) Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error

Publish implements the ConditionTaskRepository interface to record Task information in the Task KV.

func (*HTTPTaskRepository) Query

Query implements the ConditionTaskRepository interface to retrieve Task information from the Task KV.

type LivenessCheckin

type LivenessCheckin interface {
	StartLivenessCheckin(ctx context.Context)
	ControllerID() registry.ControllerID
}

func NewNatsLiveness

func NewNatsLiveness(
	cfg events.NatsOptions,
	stream events.Stream,
	l *logrus.Logger,
	hostname string,
	interval time.Duration,
) LivenessCheckin

NewNatsLiveness returns a NATS implementation of the LivenessCheckin interface

type MockConditionStatusPublisher

type MockConditionStatusPublisher struct {
	mock.Mock
}

MockConditionStatusPublisher is an autogenerated mock type for the ConditionStatusPublisher type

func NewMockConditionStatusPublisher

func NewMockConditionStatusPublisher(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockConditionStatusPublisher

NewMockConditionStatusPublisher creates a new instance of MockConditionStatusPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockConditionStatusPublisher) EXPECT

func (*MockConditionStatusPublisher) Publish

func (_m *MockConditionStatusPublisher) Publish(ctx context.Context, serverID string, state condition.State, status json.RawMessage, tsUpdateOnly bool) error

Publish provides a mock function with given fields: ctx, serverID, state, status, tsUpdateOnly

type MockConditionStatusPublisher_Expecter

type MockConditionStatusPublisher_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockConditionStatusPublisher_Expecter) Publish

func (_e *MockConditionStatusPublisher_Expecter) Publish(ctx interface{}, serverID interface{}, state interface{}, status interface{}, tsUpdateOnly interface{}) *MockConditionStatusPublisher_Publish_Call

Publish is a helper method to define mock.On call

  • ctx context.Context
  • serverID string
  • state condition.State
  • status json.RawMessage
  • tsUpdateOnly bool

type MockConditionStatusPublisher_Publish_Call

type MockConditionStatusPublisher_Publish_Call struct {
	*mock.Call
}

MockConditionStatusPublisher_Publish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Publish'

func (*MockConditionStatusPublisher_Publish_Call) Return

func (*MockConditionStatusPublisher_Publish_Call) Run

func (*MockConditionStatusPublisher_Publish_Call) RunAndReturn

type MockConditionStatusQueryor

type MockConditionStatusQueryor struct {
	mock.Mock
}

MockConditionStatusQueryor is an autogenerated mock type for the ConditionStatusQueryor type

func NewMockConditionStatusQueryor

func NewMockConditionStatusQueryor(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockConditionStatusQueryor

NewMockConditionStatusQueryor creates a new instance of MockConditionStatusQueryor. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockConditionStatusQueryor) ConditionState

func (_m *MockConditionStatusQueryor) ConditionState(conditionID string) ConditionState

ConditionState provides a mock function with given fields: conditionID

func (*MockConditionStatusQueryor) EXPECT

type MockConditionStatusQueryor_ConditionState_Call

type MockConditionStatusQueryor_ConditionState_Call struct {
	*mock.Call
}

MockConditionStatusQueryor_ConditionState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ConditionState'

func (*MockConditionStatusQueryor_ConditionState_Call) Return

func (*MockConditionStatusQueryor_ConditionState_Call) Run

func (*MockConditionStatusQueryor_ConditionState_Call) RunAndReturn

type MockConditionStatusQueryor_Expecter

type MockConditionStatusQueryor_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockConditionStatusQueryor_Expecter) ConditionState

func (_e *MockConditionStatusQueryor_Expecter) ConditionState(conditionID interface{}) *MockConditionStatusQueryor_ConditionState_Call

ConditionState is a helper method to define mock.On call

  • conditionID string

type MockLivenessCheckin

type MockLivenessCheckin struct {
	mock.Mock
}

MockLivenessCheckin is an autogenerated mock type for the LivenessCheckin type

func NewMockLivenessCheckin

func NewMockLivenessCheckin(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockLivenessCheckin

NewMockLivenessCheckin creates a new instance of MockLivenessCheckin. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockLivenessCheckin) ControllerID

func (_m *MockLivenessCheckin) ControllerID() registry.ControllerID

ControllerID provides a mock function with given fields:

func (*MockLivenessCheckin) EXPECT

func (*MockLivenessCheckin) StartLivenessCheckin

func (_m *MockLivenessCheckin) StartLivenessCheckin(ctx context.Context)

StartLivenessCheckin provides a mock function with given fields: ctx

type MockLivenessCheckin_ControllerID_Call

type MockLivenessCheckin_ControllerID_Call struct {
	*mock.Call
}

MockLivenessCheckin_ControllerID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ControllerID'

func (*MockLivenessCheckin_ControllerID_Call) Return

func (*MockLivenessCheckin_ControllerID_Call) Run

func (*MockLivenessCheckin_ControllerID_Call) RunAndReturn

type MockLivenessCheckin_Expecter

type MockLivenessCheckin_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockLivenessCheckin_Expecter) ControllerID

ControllerID is a helper method to define mock.On call

func (*MockLivenessCheckin_Expecter) StartLivenessCheckin

func (_e *MockLivenessCheckin_Expecter) StartLivenessCheckin(ctx interface{}) *MockLivenessCheckin_StartLivenessCheckin_Call

StartLivenessCheckin is a helper method to define mock.On call

  • ctx context.Context

type MockLivenessCheckin_StartLivenessCheckin_Call

type MockLivenessCheckin_StartLivenessCheckin_Call struct {
	*mock.Call
}

MockLivenessCheckin_StartLivenessCheckin_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartLivenessCheckin'

func (*MockLivenessCheckin_StartLivenessCheckin_Call) Return

func (*MockLivenessCheckin_StartLivenessCheckin_Call) Run

func (*MockLivenessCheckin_StartLivenessCheckin_Call) RunAndReturn

type MockPublisher

type MockPublisher struct {
	mock.Mock
}

MockPublisher is an autogenerated mock type for the Publisher type

func NewMockPublisher

func NewMockPublisher(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockPublisher

NewMockPublisher creates a new instance of MockPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockPublisher) EXPECT

func (_m *MockPublisher) EXPECT() *MockPublisher_Expecter

func (*MockPublisher) Publish

func (_m *MockPublisher) Publish(ctx context.Context, task *condition.Task[interface{}, interface{}], tsUpdateOnly bool) error

Publish provides a mock function with given fields: ctx, task, tsUpdateOnly

type MockPublisher_Expecter

type MockPublisher_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockPublisher_Expecter) Publish

func (_e *MockPublisher_Expecter) Publish(ctx interface{}, task interface{}, tsUpdateOnly interface{}) *MockPublisher_Publish_Call

Publish is a helper method to define mock.On call

  • ctx context.Context
  • task *condition.Task[interface{},interface{}]
  • tsUpdateOnly bool

type MockPublisher_Publish_Call

type MockPublisher_Publish_Call struct {
	*mock.Call
}

MockPublisher_Publish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Publish'

func (*MockPublisher_Publish_Call) Return

func (*MockPublisher_Publish_Call) Run

func (_c *MockPublisher_Publish_Call) Run(run func(ctx context.Context, task *condition.Task[interface{}, interface{}], tsUpdateOnly bool)) *MockPublisher_Publish_Call

func (*MockPublisher_Publish_Call) RunAndReturn

func (_c *MockPublisher_Publish_Call) RunAndReturn(run func(context.Context, *condition.Task[interface{}, interface{}], bool) error) *MockPublisher_Publish_Call

type MockTaskHandler

type MockTaskHandler struct {
	mock.Mock
}

MockTaskHandler is an autogenerated mock type for the TaskHandler type

func NewMockTaskHandler

func NewMockTaskHandler(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskHandler

NewMockTaskHandler creates a new instance of MockTaskHandler. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTaskHandler) EXPECT

func (*MockTaskHandler) HandleTask

func (_m *MockTaskHandler) HandleTask(ctx context.Context, task *condition.Task[interface{}, interface{}], statusPublisher Publisher) error

HandleTask provides a mock function with given fields: ctx, task, statusPublisher

type MockTaskHandler_Expecter

type MockTaskHandler_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTaskHandler_Expecter) HandleTask

func (_e *MockTaskHandler_Expecter) HandleTask(ctx interface{}, task interface{}, statusPublisher interface{}) *MockTaskHandler_HandleTask_Call

HandleTask is a helper method to define mock.On call

  • ctx context.Context
  • task *condition.Task[interface{},interface{}]
  • statusPublisher Publisher

type MockTaskHandler_HandleTask_Call

type MockTaskHandler_HandleTask_Call struct {
	*mock.Call
}

MockTaskHandler_HandleTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HandleTask'

func (*MockTaskHandler_HandleTask_Call) Return

func (*MockTaskHandler_HandleTask_Call) Run

func (_c *MockTaskHandler_HandleTask_Call) Run(run func(ctx context.Context, task *condition.Task[interface{}, interface{}], statusPublisher Publisher)) *MockTaskHandler_HandleTask_Call

func (*MockTaskHandler_HandleTask_Call) RunAndReturn

func (_c *MockTaskHandler_HandleTask_Call) RunAndReturn(run func(context.Context, *condition.Task[interface{}, interface{}], Publisher) error) *MockTaskHandler_HandleTask_Call

type MockeventStatusAcknowleger

type MockeventStatusAcknowleger struct {
	mock.Mock
}

MockeventStatusAcknowleger is an autogenerated mock type for the eventStatusAcknowleger type

func NewMockeventStatusAcknowleger

func NewMockeventStatusAcknowleger(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockeventStatusAcknowleger

NewMockeventStatusAcknowleger creates a new instance of MockeventStatusAcknowleger. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockeventStatusAcknowleger) EXPECT

type MockeventStatusAcknowleger_Expecter

type MockeventStatusAcknowleger_Expecter struct {
	// contains filtered or unexported fields
}

type MockeventStatusAcknowleger_complete_Call

type MockeventStatusAcknowleger_complete_Call struct {
	*mock.Call
}

MockeventStatusAcknowleger_complete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'complete'

func (*MockeventStatusAcknowleger_complete_Call) Return

func (*MockeventStatusAcknowleger_complete_Call) Run

func (*MockeventStatusAcknowleger_complete_Call) RunAndReturn

type MockeventStatusAcknowleger_inProgress_Call

type MockeventStatusAcknowleger_inProgress_Call struct {
	*mock.Call
}

MockeventStatusAcknowleger_inProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'inProgress'

func (*MockeventStatusAcknowleger_inProgress_Call) Return

func (*MockeventStatusAcknowleger_inProgress_Call) Run

func (*MockeventStatusAcknowleger_inProgress_Call) RunAndReturn

type MockeventStatusAcknowleger_nak_Call

type MockeventStatusAcknowleger_nak_Call struct {
	*mock.Call
}

MockeventStatusAcknowleger_nak_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'nak'

func (*MockeventStatusAcknowleger_nak_Call) Return

func (*MockeventStatusAcknowleger_nak_Call) Run

func (*MockeventStatusAcknowleger_nak_Call) RunAndReturn

type NatsConditionStatusPublisher

type NatsConditionStatusPublisher struct {
	// contains filtered or unexported fields
}

NatsConditionStatusPublisher implements the ConditionStatusPublisher interface to publish condition status updates using NATS.

func (*NatsConditionStatusPublisher) Publish

func (s *NatsConditionStatusPublisher) Publish(ctx context.Context, serverID string, state condition.State, status json.RawMessage, tsUpdateOnly bool) error

Publish implements the ConditionStatusPublisher interface. It serializes and publishes the current status of a condition to NATS.

type NatsConditionStatusQueryor

type NatsConditionStatusQueryor struct {
	// contains filtered or unexported fields
}

NatsConditionStatusQueryor implements ConditionStatusQueryor to query condition states using NATS.

func (*NatsConditionStatusQueryor) ConditionState

func (p *NatsConditionStatusQueryor) ConditionState(conditionID string) ConditionState

ConditionState queries the NATS KeyValue store to determine the current state of a condition.

type NatsConditionTaskRepository

type NatsConditionTaskRepository struct {
	// contains filtered or unexported fields
}

func (*NatsConditionTaskRepository) Publish

func (n *NatsConditionTaskRepository) Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error

Publish implements the ConditionTaskRepository interface

func (*NatsConditionTaskRepository) Query

Query implements the ConditionTaskRepository interface to return the Task for a Condition

type NatsController

type NatsController struct {
	// contains filtered or unexported fields
}

func NewNatsController

func NewNatsController(
	appName,
	facilityCode,
	subjectSuffix,
	natsURL,
	credsFile string,
	conditionKind condition.Kind,
	options ...Option) *NatsController

func (*NatsController) Connect

func (n *NatsController) Connect(ctx context.Context) error

Connect to NATS Jetstream and register as a controller.

func (*NatsController) FacilityCode

func (n *NatsController) FacilityCode() string

func (*NatsController) ID

func (n *NatsController) ID() string

func (*NatsController) ListenEvents

func (n *NatsController) ListenEvents(ctx context.Context, chf ConditionHandlerFactory) error

ListenEvents accepts a callback function to run when an event is fetched from the NATS Jetstream.

  • The caller is expected to enclose any work, and the error handling for that work, within the callback function.
  • When the callback function returns no error, the event is marked as completed.
  • When the callback function returns an ErrRetryHandler error, the corresponding event is placed back on the queue to be retried.

func (*NatsController) NewNatsConditionStatusQueryor

func (n *NatsController) NewNatsConditionStatusQueryor() (*NatsConditionStatusQueryor, error)

NewNatsConditionStatusQueryor creates a new NatsConditionStatusQueryor instance, initializing a NATS KeyValue store for condition status queries.

type NatsLiveness

type NatsLiveness struct {
	// contains filtered or unexported fields
}

NatsLiveness provides methods to register and periodically check into the controller registry.

It implements the LivenessCheckin interface

func (*NatsLiveness) ControllerID

func (n *NatsLiveness) ControllerID() registry.ControllerID

Returns the controller ID for this instance

func (*NatsLiveness) StartLivenessCheckin

func (n *NatsLiveness) StartLivenessCheckin(ctx context.Context)

StartLivenessCheckin starts a goroutine that periodically checks in with the NATS KV.

type Option

type Option func(*NatsController)

Option sets parameters on the NatsController

func WithConcurrency

func WithConcurrency(c int) Option

func WithConnectionTimeout

func WithConnectionTimeout(t time.Duration) Option

func WithHandlerTimeout

func WithHandlerTimeout(t time.Duration) Option

func WithKVReplicas

func WithKVReplicas(c int) Option

Set the number of replicates to keep for the

!! In a non-clustered NATS environment, set this value to 0.

func WithLogger

func WithLogger(logger *logrus.Logger) Option

type OptionHTTPController

type OptionHTTPController func(*HTTPController)

OptionHTTPController sets parameters on the HTTPController

func WithNATSHTTPLogger

func WithNATSHTTPLogger(logger *logrus.Logger) OptionHTTPController

Sets a logger on the controller

func WithOrchestratorClient

func WithOrchestratorClient(c orc.Queryor) OptionHTTPController

Sets the Orchestrator API queryor client

type OrchestratorAPIConfig

type OrchestratorAPIConfig struct {
	AuthDisabled         bool
	Endpoint             string
	OidcIssuerEndpoint   string
	OidcAudienceEndpoint string
	OidcClientSecret     string
	OidcClientID         string
	OidcClientScopes     []string
}

type Publisher

type Publisher interface {
	Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error
}

The Publisher interface wraps the Task and StatusValue publishers into one: the caller invokes Publish, and the implementation takes care of publishing both the status and the Task.

Going forward, Task updates are all that is to be published, replacing the StatusValue updates.

func NewHTTPPublisher

func NewHTTPPublisher(
	appName string,
	serverID,
	conditionID uuid.UUID,
	conditionKind condition.Kind,
	orcQueryor orc.Queryor,
	logger *logrus.Logger) Publisher

func NewNatsPublisher

func NewNatsPublisher(
	appName,
	conditionID,
	serverID,
	facilityCode string,
	conditionKind condition.Kind,
	controllerID registry.ControllerID,
	kvReplicas int,
	stream *events.NatsJetstream,
	logger *logrus.Logger,
) (Publisher, error)

type PublisherHTTP

type PublisherHTTP struct {
	// contains filtered or unexported fields
}

func (*PublisherHTTP) Publish

func (p *PublisherHTTP) Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error

type PublisherNATS

type PublisherNATS struct {
	// contains filtered or unexported fields
}

func (*PublisherNATS) Publish

func (p *PublisherNATS) Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error

type QueryError

type QueryError struct {
	// contains filtered or unexported fields
}

func (*QueryError) Error

func (a *QueryError) Error() string

type TaskHandler

type TaskHandler interface {
	HandleTask(ctx context.Context, task *condition.Task[any, any], statusPublisher Publisher) error
}

TaskHandler is passed in by the caller to be invoked when a message from the Jetstream is received for processing.
