stream

package
v0.0.0-...-9243632 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// OrchestrationLifecycle is the fx option set that wires the orchestration
// runner into an application.
//
// The provider registers the durable JetStream consumer "orchestration-event"
// (MaxDeliver set to 1) on the "foreverbull" stream, builds a NATSStream for
// the "orchestration" module backed by a repository on the given pool and a
// fresh dependency container, and returns the OrchestrationRunner built from
// that stream. The invoke function attaches the runner's Start and Stop
// methods to the fx lifecycle as OnStart and OnStop hooks.
var OrchestrationLifecycle = fx.Options(
	fx.Provide(
		func(js nats.JetStreamContext, db *pgxpool.Pool) (*OrchestrationRunner, error) {
			// The same identifier is used for both the consumer name and its durable name.
			const consumerName = "orchestration-event"
			consumer := &nats.ConsumerConfig{
				Name:       consumerName,
				Durable:    consumerName,
				MaxDeliver: 1,
			}
			if _, err := js.AddConsumer("foreverbull", consumer); err != nil {
				return nil, fmt.Errorf("error adding consumer for orchestration: %w", err)
			}

			// NewDependencyContainer returns the interface type; the stream
			// stores the concrete container.
			container := NewDependencyContainer().(*dependencyContainer)
			natsStream := &NATSStream{
				module:     "orchestration",
				jt:         js,
				repository: NewRepository(db),
				deps:       container,
			}
			return NewOrchestrationRunner(natsStream)
		},
	),
	fx.Invoke(
		func(lifecycle fx.Lifecycle, runner *OrchestrationRunner) error {
			// The hook contexts are not forwarded: Start and Stop take no arguments.
			hook := fx.Hook{
				OnStart: func(_ context.Context) error { return runner.Start() },
				OnStop:  func(_ context.Context) error { return runner.Stop() },
			}
			lifecycle.Append(hook)
			return nil
		}),
)

Functions

func CreateTables

func CreateTables(ctx context.Context, conn *pgxpool.Pool) error

func New

func New() (*nats.Conn, nats.JetStreamContext, error)

func NewRepository

func NewRepository(db *pgxpool.Pool) repository

func RecreateTables

func RecreateTables(ctx context.Context, conn *pgxpool.Pool) error

Types

type Dependency

// Dependency is a string-typed key that names a dependency. It is the key
// type accepted by DependencyContainer.AddMethod and AddSingleton, and by
// Message.Call and Message.MustGet.
type Dependency string

// Dependency keys declared by this package.
// NOTE(review): the names suggest database, message stream, container engine
// and storage dependencies; what is actually registered under each key is not
// visible here — confirm against the registration sites.
const (
	DBDep              Dependency = "db"
	StreamDep          Dependency = "stream"
	ContainerEngineDep Dependency = "container_engine"
	StorageDep         Dependency = "storage"
)

type DependencyContainer

// DependencyContainer is the registration interface of a dependency registry
// keyed by Dependency. NewDependencyContainer returns a value of this type.
type DependencyContainer interface {
	// AddMethod takes a key and a function f that receives a context and a
	// Message and returns a value or an error.
	// NOTE(review): presumably f is registered under key and is what
	// Message.Call(ctx, key) invokes — confirm against the implementation.
	AddMethod(key Dependency, f func(context.Context, Message) (interface{}, error))
	// AddSingleton takes a key and a fixed value v.
	// NOTE(review): presumably v is registered under key and is what
	// Message.MustGet(key) returns — confirm against the implementation.
	AddSingleton(key Dependency, v interface{})
}

func NewDependencyContainer

func NewDependencyContainer() DependencyContainer

type Handler

// Handler is implemented by types that process a single Message.
type Handler interface {
	// Process handles message under ctx and reports failure through the
	// returned error.
	Process(ctx context.Context, message Message) error
}

type Message

// Message is the message abstraction passed to Handler.Process, to
// Stream.Publish and to CommandSubscriber callbacks. NewMessage constructs one.
type Message interface {
	// GetID returns the message's ID as a string.
	GetID() string
	// GetOrchestrationID returns the orchestration ID as a string.
	// NOTE(review): presumably empty when the message is not part of an
	// orchestration — confirm.
	GetOrchestrationID() string
	// GetOrchestrationStep returns the orchestration step as a string.
	// NOTE(review): presumably the name of the step this message belongs to — confirm.
	GetOrchestrationStep() string
	// RawPayload returns the payload as raw bytes.
	RawPayload() []byte
	// ParsePayload takes a destination value and returns an error.
	// NOTE(review): presumably decodes the raw payload into payload, which
	// would then need to be a pointer; the encoding is not visible here — confirm.
	ParsePayload(payload interface{}) error
	// Call takes a context and a Dependency key and returns a value or an error.
	// NOTE(review): presumably invokes the function registered for key via
	// DependencyContainer.AddMethod — confirm.
	Call(ctx context.Context, key Dependency) (interface{}, error)
	// MustGet takes a Dependency key and returns a value with no error result.
	// NOTE(review): by Go naming convention a Must-prefixed method panics on
	// failure (e.g. unknown key) — confirm against the implementation.
	MustGet(key Dependency) interface{}
}

func NewMessage

func NewMessage(module, component, method string, entity any) (Message, error)

type MessageOrchestration

// MessageOrchestration describes a named orchestration as a list of steps
// plus an optional fallback step. It is the argument of
// Stream.RunOrchestration and is created by NewMessageOrchestration.
type MessageOrchestration struct {
	// Name is the orchestration's name (NewMessageOrchestration takes a name).
	Name string
	// OrchestrationID identifies this orchestration.
	// NOTE(review): how and when the ID is assigned is not visible here — confirm.
	OrchestrationID string
	// Steps holds the orchestration's steps.
	// NOTE(review): presumably appended to by AddStep and run in slice order — confirm.
	Steps []MessageOrchestrationStep
	// FallbackStep is an optional step held by pointer.
	// NOTE(review): presumably set by SettFallback and nil when no fallback
	// is configured; when it runs is not visible here — confirm.
	FallbackStep *MessageOrchestrationStep
}

func NewMessageOrchestration

func NewMessageOrchestration(name string) *MessageOrchestration

func (*MessageOrchestration) AddStep

func (mo *MessageOrchestration) AddStep(name string, commands []Message)

func (*MessageOrchestration) SettFallback

func (mo *MessageOrchestration) SettFallback(commands []Message)

type MessageOrchestrationStep

// MessageOrchestrationStep is one step of a MessageOrchestration: a named
// group of command messages together with identifiers of the orchestration
// it belongs to.
type MessageOrchestrationStep struct {
	// Name is the step's name.
	Name string

	// OrchestrationID, OrchestrationName and OrchestrationStep are string
	// identifiers carried on the step.
	// NOTE(review): presumably copied from the parent MessageOrchestration;
	// how OrchestrationStep differs from Name is not visible here — confirm.
	OrchestrationID   string
	OrchestrationName string
	OrchestrationStep string
	// Commands are the messages belonging to this step.
	Commands []Message
}

type MessageStatus

// MessageStatus is a string-typed status value for a message.
type MessageStatus string

// Status values declared by this package; each constant's value is its
// upper-case name.
// NOTE(review): the names suggest a progression from created through
// published and received to a terminal complete, error or canceled state; the
// transitions are not visible here — confirm against the repository code.
const (
	MessageStatusCreated   MessageStatus = "CREATED"
	MessageStatusPublished MessageStatus = "PUBLISHED"
	MessageStatusReceived  MessageStatus = "RECEIVED"
	MessageStatusComplete  MessageStatus = "COMPLETE"
	MessageStatusError     MessageStatus = "ERROR"
	MessageStatusCanceled  MessageStatus = "CANCELED"
)

type MockDependencyContainer

// MockDependencyContainer is an autogenerated mock for DependencyContainer.
// Create it with NewMockDependencyContainer.
type MockDependencyContainer struct {
	// Embedded mock state; its expectations are the ones asserted by the
	// cleanup function that NewMockDependencyContainer registers.
	mock.Mock
}

MockDependencyContainer is an autogenerated mock type for the DependencyContainer type

func NewMockDependencyContainer

func NewMockDependencyContainer(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockDependencyContainer

NewMockDependencyContainer creates a new instance of MockDependencyContainer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockDependencyContainer) AddMethod

func (_m *MockDependencyContainer) AddMethod(key Dependency, f func(context.Context, Message) (interface{}, error))

AddMethod provides a mock function with given fields: key, f

func (*MockDependencyContainer) AddSingleton

func (_m *MockDependencyContainer) AddSingleton(key Dependency, v interface{})

AddSingleton provides a mock function with given fields: key, v

type MockHandler

// MockHandler is an autogenerated mock for Handler. Create it with
// NewMockHandler.
type MockHandler struct {
	// Embedded mock state; its expectations are the ones asserted by the
	// cleanup function that NewMockHandler registers.
	mock.Mock
}

MockHandler is an autogenerated mock type for the Handler type

func NewMockHandler

func NewMockHandler(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockHandler

NewMockHandler creates a new instance of MockHandler. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockHandler) Process

func (_m *MockHandler) Process(ctx context.Context, message Message) error

Process provides a mock function with given fields: ctx, message

type MockMessage

// MockMessage is an autogenerated mock for Message. Create it with
// NewMockMessage.
type MockMessage struct {
	// Embedded mock state; its expectations are the ones asserted by the
	// cleanup function that NewMockMessage registers.
	mock.Mock
}

MockMessage is an autogenerated mock type for the Message type

func NewMockMessage

func NewMockMessage(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockMessage

NewMockMessage creates a new instance of MockMessage. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockMessage) Call

func (_m *MockMessage) Call(ctx context.Context, key Dependency) (interface{}, error)

Call provides a mock function with given fields: ctx, key

func (*MockMessage) GetID

func (_m *MockMessage) GetID() string

GetID provides a mock function that takes no arguments.

func (*MockMessage) GetOrchestrationID

func (_m *MockMessage) GetOrchestrationID() string

GetOrchestrationID provides a mock function that takes no arguments.

func (*MockMessage) GetOrchestrationStep

func (_m *MockMessage) GetOrchestrationStep() string

GetOrchestrationStep provides a mock function that takes no arguments.

func (*MockMessage) MustGet

func (_m *MockMessage) MustGet(key Dependency) interface{}

MustGet provides a mock function with given fields: key

func (*MockMessage) ParsePayload

func (_m *MockMessage) ParsePayload(payload interface{}) error

ParsePayload provides a mock function with given fields: payload

func (*MockMessage) RawPayload

func (_m *MockMessage) RawPayload() []byte

RawPayload provides a mock function that takes no arguments.

type MockStream

// MockStream is an autogenerated mock for Stream. Create it with
// NewMockStream.
type MockStream struct {
	// Embedded mock state; its expectations are the ones asserted by the
	// cleanup function that NewMockStream registers.
	mock.Mock
}

MockStream is an autogenerated mock type for the Stream type

func NewMockStream

func NewMockStream(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockStream

NewMockStream creates a new instance of MockStream. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockStream) CommandSubscriber

func (_m *MockStream) CommandSubscriber(component string, method string, cb func(context.Context, Message) error) error

CommandSubscriber provides a mock function with given fields: component, method, cb

func (*MockStream) Publish

func (_m *MockStream) Publish(ctx context.Context, message Message) error

Publish provides a mock function with given fields: ctx, message

func (*MockStream) RunOrchestration

func (_m *MockStream) RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error

RunOrchestration provides a mock function with given fields: ctx, orchestration

func (*MockStream) Unsubscribe

func (_m *MockStream) Unsubscribe() error

Unsubscribe provides a mock function that takes no arguments.

type NATSStream

type NATSStream struct {
	// contains filtered or unexported fields
}

func (*NATSStream) CommandSubscriber

func (ns *NATSStream) CommandSubscriber(component, method string, cb func(context.Context, Message) error) error

func (*NATSStream) Publish

func (ns *NATSStream) Publish(ctx context.Context, msg Message) error

func (*NATSStream) RunOrchestration

func (ns *NATSStream) RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error

func (*NATSStream) Unsubscribe

func (ns *NATSStream) Unsubscribe() error

type OrchestrationOutput

type OrchestrationOutput struct {
	// contains filtered or unexported fields
}

func (*OrchestrationOutput) Add

func (po *OrchestrationOutput) Add(orchestration *MessageOrchestration)

func (*OrchestrationOutput) Contains

func (po *OrchestrationOutput) Contains(name string) bool

func (*OrchestrationOutput) Get

type OrchestrationRunner

type OrchestrationRunner struct {
	// contains filtered or unexported fields
}

func NewOrchestrationRunner

func NewOrchestrationRunner(stream *NATSStream) (*OrchestrationRunner, error)

func (*OrchestrationRunner) Start

func (or *OrchestrationRunner) Start() error

func (*OrchestrationRunner) Stop

func (or *OrchestrationRunner) Stop() error

type Stream

// Stream is the messaging interface of this package. NewNATSStream returns a
// Stream, and *NATSStream and *MockStream declare the same four methods.
type Stream interface {
	// Unsubscribe takes no arguments and returns an error.
	// NOTE(review): presumably removes the subscriptions created through
	// CommandSubscriber — confirm against the implementation.
	Unsubscribe() error
	// Publish sends message under ctx and reports failure through the
	// returned error.
	Publish(ctx context.Context, message Message) error
	// CommandSubscriber takes a component name, a method name and a callback
	// cb that receives a context and a Message and returns an error.
	// NOTE(review): presumably cb is invoked for each command message
	// addressed to component/method — confirm against the implementation.
	CommandSubscriber(component, method string, cb func(context.Context, Message) error) error
	// RunOrchestration takes a context and a MessageOrchestration and
	// returns an error.
	// NOTE(review): whether this call blocks until the orchestration
	// finishes or only starts it is not visible here — confirm.
	RunOrchestration(ctx context.Context, orchestration *MessageOrchestration) error
}

func NewNATSStream

func NewNATSStream(jetstream nats.JetStreamContext, module string,
	dependencies DependencyContainer, pool *pgxpool.Pool,
) (Stream, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL