Documentation ¶
Index ¶
- Variables
- func CreateTables(ctx context.Context, conn *pgxpool.Pool) error
- func New() (*nats.Conn, nats.JetStreamContext, error)
- func NewRepository(db *pgxpool.Pool) repository
- func RecreateTables(ctx context.Context, conn *pgxpool.Pool) error
- type Dependency
- type DependencyContainer
- type Handler
- type Message
- type MessageOrchestration
- type MessageOrchestrationStep
- type MessageStatus
- type MockDependencyContainer
- type MockHandler
- type MockMessage
- func (_m *MockMessage) Call(ctx context.Context, key Dependency) (interface{}, error)
- func (_m *MockMessage) GetID() string
- func (_m *MockMessage) GetOrchestrationID() string
- func (_m *MockMessage) GetOrchestrationStep() string
- func (_m *MockMessage) MustGet(key Dependency) interface{}
- func (_m *MockMessage) ParsePayload(payload interface{}) error
- func (_m *MockMessage) RawPayload() []byte
- type MockStream
- func (_m *MockStream) CommandSubscriber(component string, method string, cb func(context.Context, Message) error) error
- func (_m *MockStream) Publish(ctx context.Context, message Message) error
- func (_m *MockStream) RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error
- func (_m *MockStream) Unsubscribe() error
- type NATSStream
- func (ns *NATSStream) CommandSubscriber(component, method string, cb func(context.Context, Message) error) error
- func (ns *NATSStream) Publish(ctx context.Context, msg Message) error
- func (ns *NATSStream) RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error
- func (ns *NATSStream) Unsubscribe() error
- type OrchestrationOutput
- type OrchestrationRunner
- type Stream
Constants ¶
This section is empty.
Variables ¶
var OrchestrationLifecycle = fx.Options( fx.Provide( func(jetstream nats.JetStreamContext, pool *pgxpool.Pool) (*OrchestrationRunner, error) { cfg := nats.ConsumerConfig{ Name: "orchestration-event", Durable: "orchestration-event", MaxDeliver: 1, } _, err := jetstream.AddConsumer("foreverbull", &cfg) if err != nil { return nil, fmt.Errorf("error adding consumer for orchestration: %w", err) } dc := NewDependencyContainer().(*dependencyContainer) stream := &NATSStream{module: "orchestration", jt: jetstream, repository: NewRepository(pool), deps: dc} return NewOrchestrationRunner(stream) }, ), fx.Invoke( func(lc fx.Lifecycle, orchestration *OrchestrationRunner) error { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return orchestration.Start() }, OnStop: func(ctx context.Context) error { return orchestration.Stop() }, }) return nil }), )
Functions ¶
func NewRepository ¶
func NewRepository(db *pgxpool.Pool) repository
Types ¶
type Dependency ¶
// Dependency is a string key naming a dependency. It is the key type taken by
// DependencyContainer.AddMethod and AddSingleton and by Message.Call and
// MustGet.
type Dependency string

// Dependency keys declared by this package.
// NOTE(review): what each key resolves to is not visible here; the names
// suggest a database handle, a stream, a container engine and a storage
// backend — confirm against the code that registers them.
const (
	DBDep              Dependency = "db"
	StreamDep          Dependency = "stream"
	ContainerEngineDep Dependency = "container_engine"
	StorageDep         Dependency = "storage"
)
type DependencyContainer ¶
// DependencyContainer accepts dependencies keyed by Dependency, either as a
// function or as a plain value.
type DependencyContainer interface {
	// AddMethod takes a key and a function f that receives a context and a
	// Message and returns a value or an error.
	// NOTE(review): presumably f is what Message.Call runs for key — confirm.
	AddMethod(key Dependency, f func(context.Context, Message) (interface{}, error))
	// AddSingleton takes a key and an arbitrary value v.
	// NOTE(review): presumably v is what Message.MustGet returns for key — confirm.
	AddSingleton(key Dependency, v interface{})
}
func NewDependencyContainer ¶
func NewDependencyContainer() DependencyContainer
type Message ¶
// Message is the value passed to Stream.Publish and to the callbacks given to
// Stream.CommandSubscriber.
//
// NOTE(review): the implementation is not visible here; the per-method notes
// below are read off the names and signatures and should be confirmed.
type Message interface {
	// GetID returns a string, presumably the message's identifier.
	GetID() string
	// GetOrchestrationID returns a string, presumably the ID of the
	// orchestration this message belongs to (cf. MessageOrchestration.OrchestrationID).
	GetOrchestrationID() string
	// GetOrchestrationStep returns a string, presumably the name of the
	// orchestration step this message belongs to.
	GetOrchestrationStep() string
	// RawPayload returns the payload as bytes.
	RawPayload() []byte
	// ParsePayload takes a destination value and reports an error.
	// NOTE(review): presumably decodes the raw payload into payload, so a
	// pointer is expected — confirm.
	ParsePayload(payload interface{}) error
	// Call takes a context and a Dependency key and returns a value or an error.
	// NOTE(review): presumably runs the function registered for key via
	// DependencyContainer.AddMethod — confirm.
	Call(ctx context.Context, key Dependency) (interface{}, error)
	// MustGet takes a Dependency key and returns a value with no error result.
	// NOTE(review): the Must prefix suggests it panics when key is missing — confirm.
	MustGet(key Dependency) interface{}
}
type MessageOrchestration ¶
// MessageOrchestration describes a named orchestration: a list of steps plus
// an optional fallback step. It is the argument to Stream.RunOrchestration.
type MessageOrchestration struct {
	// Name is the orchestration's name (NewMessageOrchestration takes a name).
	Name string
	// OrchestrationID identifies this orchestration.
	// NOTE(review): how and when it is assigned is not visible here.
	OrchestrationID string
	// Steps holds the orchestration's steps.
	// NOTE(review): presumably appended to by AddStep and run in order — confirm.
	Steps []MessageOrchestrationStep
	// FallbackStep is an optional step; it is a pointer and may be nil.
	// NOTE(review): presumably set by SettFallback and used when a step fails — confirm.
	FallbackStep *MessageOrchestrationStep
}
func NewMessageOrchestration ¶
func NewMessageOrchestration(name string) *MessageOrchestration
func (*MessageOrchestration) AddStep ¶
func (mo *MessageOrchestration) AddStep(name string, commands []Message)
func (*MessageOrchestration) SettFallback ¶
func (mo *MessageOrchestration) SettFallback(commands []Message)
type MessageStatus ¶
// MessageStatus is a string-valued status for a message.
type MessageStatus string

// MessageStatus values declared by this package.
// NOTE(review): where each status is assigned, and which transitions are
// allowed, is not visible here.
const (
	MessageStatusCreated   MessageStatus = "CREATED"
	MessageStatusPublished MessageStatus = "PUBLISHED"
	MessageStatusReceived  MessageStatus = "RECEIVED"
	MessageStatusComplete  MessageStatus = "COMPLETE"
	MessageStatusError     MessageStatus = "ERROR"
	MessageStatusCanceled  MessageStatus = "CANCELED"
)
type MockDependencyContainer ¶
MockDependencyContainer is an autogenerated mock type for the DependencyContainer type
func NewMockDependencyContainer ¶
func NewMockDependencyContainer(t interface { mock.TestingT Cleanup(func()) }) *MockDependencyContainer
NewMockDependencyContainer creates a new instance of MockDependencyContainer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockDependencyContainer) AddMethod ¶
func (_m *MockDependencyContainer) AddMethod(key Dependency, f func(context.Context, Message) (interface{}, error))
AddMethod provides a mock function with given fields: key, f
func (*MockDependencyContainer) AddSingleton ¶
func (_m *MockDependencyContainer) AddSingleton(key Dependency, v interface{})
AddSingleton provides a mock function with given fields: key, v
type MockHandler ¶
MockHandler is an autogenerated mock type for the Handler type
func NewMockHandler ¶
func NewMockHandler(t interface { mock.TestingT Cleanup(func()) }) *MockHandler
NewMockHandler creates a new instance of MockHandler. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
type MockMessage ¶
MockMessage is an autogenerated mock type for the Message type
func NewMockMessage ¶
func NewMockMessage(t interface { mock.TestingT Cleanup(func()) }) *MockMessage
NewMockMessage creates a new instance of MockMessage. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockMessage) Call ¶
func (_m *MockMessage) Call(ctx context.Context, key Dependency) (interface{}, error)
Call provides a mock function with given fields: ctx, key
func (*MockMessage) GetID ¶
func (_m *MockMessage) GetID() string
GetID provides a mock function that takes no arguments.
func (*MockMessage) GetOrchestrationID ¶
func (_m *MockMessage) GetOrchestrationID() string
GetOrchestrationID provides a mock function that takes no arguments.
func (*MockMessage) GetOrchestrationStep ¶
func (_m *MockMessage) GetOrchestrationStep() string
GetOrchestrationStep provides a mock function that takes no arguments.
func (*MockMessage) MustGet ¶
func (_m *MockMessage) MustGet(key Dependency) interface{}
MustGet provides a mock function with given fields: key
func (*MockMessage) ParsePayload ¶
func (_m *MockMessage) ParsePayload(payload interface{}) error
ParsePayload provides a mock function with given fields: payload
func (*MockMessage) RawPayload ¶
func (_m *MockMessage) RawPayload() []byte
RawPayload provides a mock function that takes no arguments.
type MockStream ¶
MockStream is an autogenerated mock type for the Stream type
func NewMockStream ¶
func NewMockStream(t interface { mock.TestingT Cleanup(func()) }) *MockStream
NewMockStream creates a new instance of MockStream. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockStream) CommandSubscriber ¶
func (_m *MockStream) CommandSubscriber(component string, method string, cb func(context.Context, Message) error) error
CommandSubscriber provides a mock function with given fields: component, method, cb
func (*MockStream) Publish ¶
func (_m *MockStream) Publish(ctx context.Context, message Message) error
Publish provides a mock function with given fields: ctx, message
func (*MockStream) RunOrchestration ¶
func (_m *MockStream) RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error
RunOrchestration provides a mock function with given fields: ctx, orchestration
func (*MockStream) Unsubscribe ¶
func (_m *MockStream) Unsubscribe() error
Unsubscribe provides a mock function that takes no arguments.
type NATSStream ¶
// NATSStream is a stream type built around a NATS JetStream context. The
// package index lists CommandSubscriber, Publish, RunOrchestration and
// Unsubscribe methods on it, which is the method set of the Stream interface.
type NATSStream struct {
	// contains filtered or unexported fields
}
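func (*NATSStream) CommandSubscriber ¶
func (ns *NATSStream) CommandSubscriber(component, method string, cb func(context.Context, Message) error) error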
func (*NATSStream) RunOrchestration ¶
func (ns *NATSStream) RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error
func (*NATSStream) Unsubscribe ¶
func (ns *NATSStream) Unsubscribe() error
type OrchestrationOutput ¶
// OrchestrationOutput holds MessageOrchestration values: Add takes one, Get
// returns them as a slice, and Contains takes a name and returns a bool.
// NOTE(review): presumably Contains reports whether an orchestration with that
// name was added — confirm against the implementation.
type OrchestrationOutput struct {
	// contains filtered or unexported fields
}
func (*OrchestrationOutput) Add ¶
func (po *OrchestrationOutput) Add(orchestration *MessageOrchestration)
func (*OrchestrationOutput) Contains ¶
func (po *OrchestrationOutput) Contains(name string) bool
func (*OrchestrationOutput) Get ¶
func (po *OrchestrationOutput) Get() []*MessageOrchestration
type OrchestrationRunner ¶
// OrchestrationRunner is built from a *NATSStream by NewOrchestrationRunner
// and exposes Start and Stop, each returning an error.
// NOTE(review): what Start and Stop actually do is not visible here.
type OrchestrationRunner struct {
	// contains filtered or unexported fields
}
func NewOrchestrationRunner ¶
func NewOrchestrationRunner(stream *NATSStream) (*OrchestrationRunner, error)
func (*OrchestrationRunner) Start ¶
func (or *OrchestrationRunner) Start() error
func (*OrchestrationRunner) Stop ¶
func (or *OrchestrationRunner) Stop() error
type Stream ¶
// Stream is the messaging interface of this package: publishing messages,
// subscribing callbacks to commands, and running orchestrations.
//
// NOTE(review): only the signatures are visible here; the behavioural notes
// below are inferred from names and should be confirmed.
type Stream interface {
	// Unsubscribe takes no arguments and reports an error.
	// NOTE(review): presumably removes the subscriptions made through
	// CommandSubscriber — confirm.
	Unsubscribe() error
	// Publish takes a context and a Message and reports an error.
	Publish(ctx context.Context, message Message) error
	// CommandSubscriber takes a component name, a method name and a callback
	// cb that receives a context and a Message and returns an error.
	// NOTE(review): presumably cb runs for each command addressed to
	// component/method — confirm.
	CommandSubscriber(component, method string, cb func(context.Context, Message) error) error
	// RunOrchestration takes a context and a MessageOrchestration and reports
	// an error.
	RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error
}