kernel

package
v0.20.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2024 License: Apache-2.0 Imports: 6 Imported by: 9

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Dequeue

func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) (<-chan swarm.Msg[T], chan<- swarm.Msg[T])

Dequeue creates a pair of channels within the kernel to dequeue messages.

func Enqueue

func Enqueue[T any](k *Enqueuer, cat string, codec Encoder[T]) (chan<- T, <-chan T)

Enqueue creates a pair of channels within the kernel to enqueue messages.

Types

type Bridge added in v0.20.0

type Bridge struct {
	// contains filtered or unexported fields
}

Bridge connects Lambda's main function to the Cathode interface.

func NewBridge added in v0.20.0

func NewBridge(timeToFlight time.Duration) *Bridge

func (*Bridge) Ack added in v0.20.0

func (s *Bridge) Ack(ctx context.Context, digest string) error

Ack acknowledges a processed message, allowing the Lambda handler to progress.

func (*Bridge) Ask added in v0.20.0

func (s *Bridge) Ask(ctx context.Context) ([]swarm.Bag, error)

Ask converts input of Lambda handler to the context of the kernel

func (*Bridge) Dispatch added in v0.20.0

func (s *Bridge) Dispatch(seq []swarm.Bag) error

Dispatch the batch of messages in the context of Lambda handler.

lambda.Start(
	func(evt events.CloudWatchEvent) error {
		...
		bridge.Dispatch(bag)
	}
)

func (*Bridge) Err added in v0.20.0

func (s *Bridge) Err(ctx context.Context, digest string, err error) error

Err acknowledges an error, allowing the Lambda handler to progress.

type Cathode

type Cathode interface {
	Ack(ctx context.Context, digest string) error
	Err(ctx context.Context, digest string, err error) error
	Ask(ctx context.Context) ([]swarm.Bag, error)
}

Cathode defines on-the-wire protocol for swarm.Bag, covering the ingress.

type Decoder added in v0.20.0

type Decoder[T any] interface{ Decode([]byte) (T, error) }

Decoder decodes a message from the wire format.

type Dequeuer added in v0.20.0

type Dequeuer struct {
	sync.WaitGroup
	sync.RWMutex

	// Kernel configuration
	Config swarm.Config

	// Cathode is the reader port on message broker
	Cathode Cathode
	// contains filtered or unexported fields
}

func NewDequeuer added in v0.20.0

func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer

NewDequeuer creates an instance of the broker reader.

func (*Dequeuer) Await added in v0.20.0

func (k *Dequeuer) Await()

Await waits for the reader to complete.

func (*Dequeuer) Close added in v0.20.0

func (k *Dequeuer) Close()

Close closes the broker reader, gracefully shutting down all I/O.

type Emitter

type Emitter interface {
	Enq(context.Context, swarm.Bag) error
}

Emitter defines on-the-wire protocol for swarm.Bag, covering egress.

type Encoder added in v0.20.0

type Encoder[T any] interface{ Encode(T) ([]byte, error) }

Encoder encodes a message into the wire format.

type Enqueuer added in v0.20.0

type Enqueuer struct {
	sync.WaitGroup

	// Kernel configuration
	Config swarm.Config

	// Emitter is the writer port on message broker
	Emitter Emitter
	// contains filtered or unexported fields
}

Messaging Egress port

func NewEnqueuer added in v0.20.0

func NewEnqueuer(emitter Emitter, config swarm.Config) *Enqueuer

NewEnqueuer creates an instance of the broker writer.

func (*Enqueuer) Await added in v0.20.0

func (k *Enqueuer) Await()

Await enqueue

func (*Enqueuer) Close added in v0.20.0

func (k *Enqueuer) Close()

Close enqueuer

type Kernel

type Kernel struct {
	*Enqueuer
	*Dequeuer
}

func New

func New(enqueuer *Enqueuer, dequeuer *Dequeuer) *Kernel

func (*Kernel) Await

func (k *Kernel) Await()

func (*Kernel) Close

func (k *Kernel) Close()

type Router added in v0.20.0

type Router = interface {
	Route(context.Context, swarm.Bag) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL