gronos

package module
v0.0.0-...-2fc4b4f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2024 License: MIT Imports: 10 Imported by: 0

README

Gronos

Very freestyle programming for now, until I have a good look and feel The new version i'm working on will use etcd (embed standalone, embed etcd with follower/leader, remote etcd leader/follower) to handle the state, for now i just want to rush the general idea! On one of the branches i have a satifying concept, i'm completely re-writting everything to make a new start with etcd From frustration and instinct to reason and usability

I did a small experiment about it

Go Reference Go Report Card

Gronos is a concurrent application management library for Go, designed to simplify the process of managing multiple concurrent applications within a single program. It provides a structured approach to application lifecycle management, error handling, and inter-application communication.

I saw myself writting the same boilerplate code over and over again for Domain Driven Applications specifically so I made a library to stop wasting time and repeating myself. Now we can all enjoy a tool to sandbox lifecycled functions aka "runtime applications" broadly known as lifecycle functions.

Warning:

This is an experimental project aimed at simplifying the development of Domain-Driven Design (DDD) applications with Event-Driven Architecture (EDA) from the start. The goal is to provide a comprehensive toolbox that, once fully developed, will be refined to improve usability.

I believe we shouldn't have to choose between building a traditional application and later transitioning to DDD and EDA. We should be able to implement EDA from the beginning and take advantage of its scalability. If all domains and workers can operate across multiple binaries, why not within a single one? The application should be flexible enough to function either as a standalone or distributed system, allowing us to split and scale as needed.

I'm going to change a direction a bit with that library...

Roadmap notes:

  • add hierarchy of lifecycle function
  • a sub-lifecycle function could dies but the supervisor will revive it
  • a sub-lifecycle function could be be restricted for terminating another lifecyle function that is not in the same hierarchy
  • restrict messaging rights based on hierarchy
  • toolbox to create a Shepherd node (gronos instance guiding and managing) and Flock node (processes being guided)

Stuff like that, i have some vision of what i want and NEED, especially what i want to see in our industry.

Gronos

Table of Contents

Features

  • Concurrent Application Management: Manage multiple applications running concurrently with ease.
  • Type-Safe Keys: Use any comparable type as keys for your applications.
  • Dynamic Application Management: Add or remove applications at runtime.
  • Graceful Shutdown: Properly shut down all managed applications.
  • Error Propagation: Centralized error handling for all managed applications.
  • Worker Functionality: Easily create and manage periodic tasks.
  • Iterator Pattern: Implement repeating sequences of tasks effortlessly.
  • Internal Messaging System: Allow inter-application communication.
  • Flexible Configuration: Customize behavior with various options.
  • Watermill Integration: Incorporate event-driven architecture and message routing.

Installation

To install Gronos, use go get:

go get github.com/davidroman0O/gronos

Ensure your go.mod file contains the following line:

require github.com/davidroman0O/gronos v<latest-version>

Replace <latest-version> with the most recent version of Gronos.

Usage

Creating a Gronos Instance

To create a new Gronos instance, use the New function with a basic "Hello World" application:

import (
    "context"
    "fmt"
    "time"
    "github.com/davidroman0O/gronos"
)

ctx := context.Background()
g, errChan := gronos.New[string](ctx, map[string]gronos.RuntimeApplication{
    "hello-world": func(ctx context.Context, shutdown <-chan struct{}) error {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                fmt.Println("Hello, World!")
            case <-ctx.Done():
                return ctx.Err()
            case <-shutdown:
                return nil
            }
        }
    },
})

The New function returns a Gronos instance and an error channel. The generic parameter (in this case, string) defines the type of keys used to identify applications.

Defining Applications

Applications in Gronos are defined as functions with the following signature:

type RuntimeApplication func(ctx context.Context, shutdown <-chan struct{}) error

Here's an example of a simple application:

func simpleApp(ctx context.Context, shutdown <-chan struct{}) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-shutdown:
            return nil
        default:
            // Do work here
            time.Sleep(time.Second)
            fmt.Println("Working...")
        }
    }
}
Adding Applications

Add applications to Gronos using the Add method:

g.Add("myApp", simpleApp)
Starting and Stopping

Gronos starts managing applications as soon as they're added. To stop all applications and shut down Gronos:

g.Shutdown()
g.Wait()
Error Handling

Errors from applications are sent to the error channel returned by New:

go func() {
    for err := range errChan {
        log.Printf("Error: %v\n", err)
    }
}()

Advanced Usage

Worker

The Worker function creates applications that perform periodic tasks:

worker := gronos.Worker(time.Second, gronos.NonBlocking, func(ctx context.Context) error {
    fmt.Println("Periodic task executed")
    return nil
})

g.Add("periodicTask", worker)
Iterator

The Iterator function creates applications that execute a sequence of tasks in a loop:

tasks := []gronos.CancellableTask{
    func(ctx context.Context) error {
        fmt.Println("Task 1 executed")
        return nil
    },
    func(ctx context.Context) error {
        fmt.Println("Task 2 executed")
        return nil
    },
}

iterApp := gronos.Iterator(context.Background(), tasks)
g.Add("taskSequence", iterApp)
Internal Messaging

Gronos provides an internal messaging system for communication between applications. The available public messages for runtime applications are:

func communicatingApp(ctx context.Context, shutdown <-chan struct{}) error {
    bus, err := gronos.UseBus(ctx)
    if err != nil {
        return err
    }
    
    // Available messages:
    // Add a new runtime application
    done, addMsg := gronos.MsgAdd("newApp", newAppFunc)
    bus(addMsg)
    <-done

    // Force cancel shutdown for an application
    bus(gronos.MsgForceCancelShutdown("appName", errors.New("force cancel reason")))

    // Force terminate shutdown for an application
    bus(gronos.MsgForceTerminateShutdown("appName"))
    
    // ... rest of the application logic
    return nil
}

These messages allow you to dynamically add new applications, force cancel a shutdown, or force terminate a shutdown for specific applications.

Watermill Integration

Gronos provides integration with the Watermill library, allowing you to easily incorporate event-driven architecture and message routing into your applications.

import (
    "github.com/davidroman0O/gronos"
    watermillext "github.com/davidroman0O/gronos/extensions/watermill"
)

func main() {
    ctx := context.Background()
    watermillMiddleware := watermillext.NewWatermillMiddleware[string](watermill.NewStdLogger(true, true))

    g, errChan := gronos.New[string](ctx, map[string]gronos.RuntimeApplication{
        "setup": setupApp,
    },
        gronos.WithExtension[string](watermillMiddleware),
    )

    // ... rest of your Gronos setup
}

func setupApp(ctx context.Context, shutdown <-chan struct{}) error {
    com, err := gronos.UseBus(ctx)
    if err != nil {
        return err
    }

    pubSub := gochannel.NewGoChannel(gochannel.Config{}, watermill.NewStdLogger(false, false))

    doneAddPublisher, msgAddPublisher := watermillext.MsgAddPublisher("pubsub", pubSub)
    com(msgAddPublisher) // send message
    <- doneAddPublisher // wait for it to be processed (you're not forced to but cool to have)

    doneAddSubscriber, msgAddSubscriber := watermillext.MsgAddSubscriber("pubsub", pubSub)
    com(msgAddSubscriber)
    <- doneAddSubscriber

    // ... rest of your setup
    return nil
}

This integration allows you to use Watermill's powerful messaging capabilities within your Gronos applications, enabling sophisticated pub/sub patterns and message routing.

Configuration

Gronos supports various configuration options:

g, errChan := gronos.New[string](ctx, nil,
    gronos.WithShutdownBehavior[string](gronos.ShutdownAutomatic),
    gronos.WithGracePeriod[string](5 * time.Second),
    gronos.WithMinRuntime[string](10 * time.Second),
)

Available options:

  • WithShutdownBehavior: Define how Gronos should handle shutdowns.
  • WithGracePeriod: Set the grace period for shutdowns.
  • WithMinRuntime: Set the minimum runtime before allowing shutdown.

Best Practices

  1. Error Handling: Always handle errors from the error channel to prevent goroutine leaks.
  2. Context Usage: Use the provided context for cancellation and timeout management.
  3. Graceful Shutdown: Implement proper shutdown logic in your applications to ensure clean exits.
  4. Resource Management: Properly manage resources (e.g., close file handles, database connections) in your applications.
  5. Avoid Blocking: In Worker and Iterator tasks, avoid long-running operations that could block other tasks.

Examples

Basic Usage
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/davidroman0O/gronos"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    g, errChan := gronos.New[string](ctx, map[string]gronos.RuntimeApplication{
        "hello-world": func(ctx context.Context, shutdown <-chan struct{}) error {
            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()
            for {
                select {
                case <-ticker.C:
                    fmt.Println("Hello, World!")
                case <-ctx.Done():
                    return ctx.Err()
                case <-shutdown:
                    return nil
                }
            }
        },
    })

    // Error handling goroutine
    go func() {
        for err := range errChan {
            log.Printf("Error: %v\n", err)
        }
    }()

    // Add another application
    g.Add("app1", func(ctx context.Context, shutdown <-chan struct{}) error {
        ticker := time.NewTicker(2 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                fmt.Println("App1 is running")
            case <-ctx.Done():
                return ctx.Err()
            case <-shutdown:
                return nil
            }
        }
    })

    // Run for 10 seconds
    time.Sleep(10 * time.Second)

    // Shutdown Gronos
    g.Shutdown()

    // Wait for all applications to finish
    g.Wait()
}
Using Worker and Iterator
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/davidroman0O/gronos"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    g, errChan := gronos.New[string](ctx, nil)

    // Error handling
    go func() {
        for err := range errChan {
            log.Printf("Error: %v\n", err)
        }
    }()

    // Add a worker
    worker := gronos.Worker(time.Second, gronos.NonBlocking, func(ctx context.Context) error {
        fmt.Println("Worker task executed")
        return nil
    })
    g.Add("worker", worker)

    // Add an iterator
    tasks := []gronos.CancellableTask{
        func(ctx context.Context) error {
            fmt.Println("Iterator task 1")
            return nil
        },
        func(ctx context.Context) error {
            fmt.Println("Iterator task 2")
            return nil
        },
    }
    iterator := gronos.Iterator(context.Background(), tasks)
    g.Add("iterator", iterator)

    // Run for 10 seconds
    time.Sleep(10 * time.Second)

    // Shutdown and wait
    g.Shutdown()
    g.Wait()
}

Detailed Documentation

For more detailed information about specific features, please refer to the following documents:

Contributing

Contributions to Gronos are welcome! Please follow these steps:

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

Please make sure to update tests as appropriate and adhere to the existing coding style.

License

Gronos is released under the MIT License. See the LICENSE file for details.


For more information, please check the documentation or open an issue on GitHub.

Documentation

Overview

Package gronos provides a concurrent application management system.

Package gronos provides a concurrent application management system.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrUnhandledMessage         = errors.New("unhandled message")
	ErrUnmanageExtensionMessage = errors.New("unmanage extension message")
	ErrPanic                    = errors.New("panic")
)
View Source
var ErrLoopCritical = errors.New("critical error")

Functions

func Merge

func Merge[K comparable](apps ...map[K]RuntimeApplication) map[K]RuntimeApplication

func New

func New[K comparable](ctx context.Context, init map[K]RuntimeApplication, opts ...Option[K]) (*gronos[K], chan error)

New creates a new gronos instance with the given context and initial applications.

func UseBus

func UseBus(ctx context.Context) (func(m Message) bool, error)

UseBus retrieves the communication channel from a context created by gronos.

func UseBusConfirm

func UseBusConfirm(ctx context.Context) (func(fn FnConfirm) <-chan bool, error)

func UseBusWait

func UseBusWait(ctx context.Context) (func(fn FnWait) <-chan struct{}, error)

func WhenState

func WhenState(state StatusState) addOption

func WithFinalTick

func WithFinalTick() tickerOption

Types

type AddMessage

type AddMessage[K comparable] struct {
	KeyMessage[K]
	RuntimeApplication
	RequestMessage[K, struct{}]
}

func MsgAdd

func MsgAdd[K comparable](key K, app RuntimeApplication) (<-chan struct{}, *AddMessage[K])

type CancellableStateTask

type CancellableStateTask[T any] func(ctx context.Context, state *T) error

CancellableStateTask represents a task that can be cancelled and operates on a state

type CancellableTask

type CancellableTask func(ctx context.Context) error

type CancelledShutdown

type CancelledShutdown[K comparable] struct {
	KeyMessage[K]
	Error error
	RequestMessage[K, struct{}]
}

type CheckAutomaticShutdown

type CheckAutomaticShutdown[K comparable] struct {
	RequestMessage[K, struct{}]
}

func MsgCheckAutomaticShutdown

func MsgCheckAutomaticShutdown[K comparable]() (chan struct{}, *CheckAutomaticShutdown[K])

type Clock

type Clock struct {
	// contains filtered or unexported fields
}

Clock represents a clock that can manage multiple tickers with different execution modes.

func NewClock

func NewClock(opts ...ClockOption) *Clock

NewClock creates a new Clock instance with the given options.

Example usage:

clock := NewClock(
	WithName("MyClock"),
	WithInterval(time.Second),
)

func (*Clock) Add

func (c *Clock) Add(ticker Ticker, mode ExecutionMode)

Add subscribes a Ticker to the Clock with the specified ExecutionMode.

Example usage:

clock.Add(&MyTicker{}, NonBlocking)

func (*Clock) Start

func (c *Clock) Start()

Start begins the Clock's ticking process.

Example usage:

clock.Start()

func (*Clock) Stop

func (c *Clock) Stop()

Stop halts the Clock's ticking process.

Example usage:

clock.Stop()

type ClockOption

type ClockOption func(*Clock)

ClockOption is a function type for configuring a Clock instance.

func WithInterval

func WithInterval(interval time.Duration) ClockOption

WithInterval sets the tick interval of the Clock.

func WithName

func WithName(name string) ClockOption

WithName sets the name of the Clock.

type ConcurrentArray

type ConcurrentArray[T comparable] struct {
	// contains filtered or unexported fields
}

ConcurrentArray represents a thread-safe array of comparable elements

func Array

func Array[T comparable]() *ConcurrentArray[T]

New creates a new ConcurrentArray

func (*ConcurrentArray[T]) Append

func (ca *ConcurrentArray[T]) Append(item T)

Append adds an element to the array

func (*ConcurrentArray[T]) Contains

func (ca *ConcurrentArray[T]) Contains(item T) bool

Contains checks if an item exists in the array

func (*ConcurrentArray[T]) Get

func (ca *ConcurrentArray[T]) Get(index int) (T, bool)

Get retrieves an element at a specific index

func (*ConcurrentArray[T]) Length

func (ca *ConcurrentArray[T]) Length() int

Length returns the current length of the array

func (*ConcurrentArray[T]) Set

func (ca *ConcurrentArray[T]) Set(index int, item T) bool

Set updates an element at a specific index

type Destroy

type Destroy[K comparable] struct{}

func MsgDestroy

func MsgDestroy[K comparable]() *Destroy[K]

type Envelope

type Envelope[K comparable] struct {
	From K
	Message
}

type ErroredShutdown

type ErroredShutdown[K comparable] struct {
	KeyMessage[K]
	Error error
	RequestMessage[K, struct{}]
}

type ExecutionMode

type ExecutionMode int

ExecutionMode defines how a ticker should be executed.

const (
	// NonBlocking mode executes the ticker without waiting for completion.
	NonBlocking ExecutionMode = iota
	// ManagedTimeline mode ensures tickers are executed in order, potentially delaying subsequent ticks.
	ManagedTimeline
	// BestEffort mode attempts to execute tickers on time but may skip ticks if the system is overloaded.
	BestEffort
)

type Extension

type Extension[K comparable] interface {
	OnStart(ctx context.Context, errChan chan<- error) error
	OnStop(ctx context.Context, errChan chan<- error) error
	OnNewRuntime(ctx context.Context) context.Context
	OnStopRuntime(ctx context.Context) context.Context
	OnMsg(ctx context.Context, m *MessagePayload) error
}

Extension defines the interface for Gronos extensions

type FnConfirm

type FnConfirm func() (<-chan bool, Message)

type FnWait

type FnWait func() (<-chan struct{}, Message)

type ForceCancelShutdown

type ForceCancelShutdown[K comparable] struct {
	KeyMessage[K]
	Error error
	RequestMessage[K, struct{}]
}

global system force cancel

func MsgForceCancelShutdown

func MsgForceCancelShutdown[K comparable](key K, err error) (<-chan struct{}, *ForceCancelShutdown[K])

type ForceTerminateShutdown

type ForceTerminateShutdown[K comparable] struct {
	KeyMessage[K]
	RequestMessage[K, struct{}]
}

global system force terminate

func MsgForceTerminateShutdown

func MsgForceTerminateShutdown[K comparable](key K) (<-chan struct{}, *ForceTerminateShutdown[K])

type GetListRuntimeApplication

type GetListRuntimeApplication[K comparable] struct {
	RequestMessage[K, []K]
}

func MsgGetListRuntimeApplication

func MsgGetListRuntimeApplication[K comparable]() (<-chan []K, *GetListRuntimeApplication[K])

type GracePeriodExceededMessage

type GracePeriodExceededMessage[K comparable] struct {
	KeyMessage[K]
}

func MsgGracePeriodExceeded

func MsgGracePeriodExceeded[K comparable]() *GracePeriodExceededMessage[K]

type InitiateContextCancellation

type InitiateContextCancellation[K comparable] struct{}

func MsgInitiateContextCancellation

func MsgInitiateContextCancellation[K comparable]() *InitiateContextCancellation[K]

type InitiateShutdown

type InitiateShutdown[K comparable] struct{}

func MsgInitiateShutdown

func MsgInitiateShutdown[K comparable]() *InitiateShutdown[K]

Message creation functions

type IteratorOption

type IteratorOption func(*iteratorConfig)

IteratorOption is a function type for configuring the Iterator middleware.

func WithLoopableIteratorOptions

func WithLoopableIteratorOptions(opts ...LoopableIteratorOption) IteratorOption

WithLoopableIteratorOptions adds LoopableIteratorOptions to the Iterator middleware.

type IteratorStateOption

type IteratorStateOption[T any] func(*iteratorStateConfig[T])

IteratorStateOption is a function type for configuring the IteratorState

func WithInitialState

func WithInitialState[T any](state *T) IteratorStateOption[T]

WithInitialState sets the initial state for the IteratorState

func WithLoopableIteratorStateOptions

func WithLoopableIteratorStateOptions[T any](opts ...LoopableIteratorStateOption[T]) IteratorStateOption[T]

WithLoopableIteratorStateOptions adds LoopableIteratorStateOptions to the IteratorState

type KeyMessage

type KeyMessage[K comparable] struct {
	Key K
}

Composable header

type LoopableIterator

type LoopableIterator struct {
	// contains filtered or unexported fields
}

func NewLoopableIterator

func NewLoopableIterator(tasks []CancellableTask, opts ...LoopableIteratorOption) *LoopableIterator

func (*LoopableIterator) Cancel

func (li *LoopableIterator) Cancel()

func (*LoopableIterator) Run

func (li *LoopableIterator) Run(ctx context.Context) chan error

func (*LoopableIterator) Stop

func (li *LoopableIterator) Stop()

func (*LoopableIterator) Wait

func (li *LoopableIterator) Wait()

type LoopableIteratorOption

type LoopableIteratorOption func(*LoopableIterator)

func WithAfterLoop

func WithAfterLoop(afterLoop func(ctx context.Context) error) LoopableIteratorOption

func WithBeforeLoop

func WithBeforeLoop(beforeLoop func(ctx context.Context) error) LoopableIteratorOption

func WithExtraCancel

func WithExtraCancel(cancels ...context.CancelFunc) LoopableIteratorOption

func WithOnError

func WithOnError(handler func(error) error) LoopableIteratorOption

func WithOnInit

func WithOnInit(onInit func(ctx context.Context) (context.Context, error)) LoopableIteratorOption

func WithShouldStop

func WithShouldStop(shouldStop func(error) bool) LoopableIteratorOption

type LoopableIteratorState

type LoopableIteratorState[T any] struct {
	// contains filtered or unexported fields
}

LoopableIteratorState is similar to LoopableIterator but with a shared state

func NewLoopableIteratorState

func NewLoopableIteratorState[T any](tasks []CancellableStateTask[T], state *T, opts ...LoopableIteratorStateOption[T]) *LoopableIteratorState[T]

func (*LoopableIteratorState[T]) Cancel

func (li *LoopableIteratorState[T]) Cancel()

func (*LoopableIteratorState[T]) Run

func (li *LoopableIteratorState[T]) Run(ctx context.Context) chan error

func (*LoopableIteratorState[T]) Stop

func (li *LoopableIteratorState[T]) Stop()

func (*LoopableIteratorState[T]) Wait

func (li *LoopableIteratorState[T]) Wait()

type LoopableIteratorStateOption

type LoopableIteratorStateOption[T any] func(*LoopableIteratorState[T])

func WithAfterLoopState

func WithAfterLoopState[T any](afterLoop func(ctx context.Context, state *T) error) LoopableIteratorStateOption[T]

func WithBeforeLoopState

func WithBeforeLoopState[T any](beforeLoop func(ctx context.Context, state *T) error) LoopableIteratorStateOption[T]

func WithExtraCancelState

func WithExtraCancelState[T any](cancels ...context.CancelFunc) LoopableIteratorStateOption[T]

func WithOnErrorState

func WithOnErrorState[T any](handler func(error) error) LoopableIteratorStateOption[T]

func WithOnInitState

func WithOnInitState[T any](onInit func(ctx context.Context, state *T) (context.Context, error)) LoopableIteratorStateOption[T]

func WithShouldStopState

func WithShouldStopState[T any](shouldStop func(error) bool) LoopableIteratorStateOption[T]

type Message

type Message interface{}

Message is an interface type for internal communication within gronos.

type MessagePayload

type MessagePayload struct {
	Metadata map[string]interface{}
	Message
}

type Middleware

type Middleware[K comparable] struct {
}

intercept messages and context

type Option

type Option[K comparable] func(*gronos[K])

func WithExtension

func WithExtension[K comparable](ext Extension[K]) Option[K]

func WithGracePeriod

func WithGracePeriod[K comparable](period time.Duration) Option[K]

func WithImmediatePeriod

func WithImmediatePeriod[K comparable](period time.Duration) Option[K]

func WithMinRuntime

func WithMinRuntime[K comparable](duration time.Duration) Option[K]

func WithShutdownBehavior

func WithShutdownBehavior[K comparable](behavior ShutdownBehavior) Option[K]

func WithWait

func WithWait[K comparable]() Option[K]

func WithoutGracePeriod

func WithoutGracePeriod[K comparable]() Option[K]

func WithoutImmediatePeriod

func WithoutImmediatePeriod[K comparable]() Option[K]

func WithoutMinRuntime

func WithoutMinRuntime[K comparable]() Option[K]

type PanickedShutdown

type PanickedShutdown[K comparable] struct {
	KeyMessage[K]
	Error error
	RequestMessage[K, struct{}]
}

type RemoveMessage

type RemoveMessage[K comparable] struct {
	KeyMessage[K]
	RequestMessage[K, bool]
}

func MsgRemove

func MsgRemove[K comparable](key K) (<-chan bool, *RemoveMessage[K])

type RequestAlive

type RequestAlive[K comparable] struct {
	KeyMessage[K]
	RequestMessage[K, bool]
}

func MsgRequestAlive

func MsgRequestAlive[K comparable](key K) *RequestAlive[K]

type RequestAllAlive

type RequestAllAlive[K comparable] struct {
	RequestMessage[K, bool]
}

func MsgRequestAllAlive

func MsgRequestAllAlive[K comparable]() (<-chan bool, *RequestAllAlive[K])

type RequestMessage

type RequestMessage[K comparable, Y any] struct {
	KeyMessage[K]
	Response chan Y
}

Used for generic requests

type RequestReason

type RequestReason[K comparable] struct {
	KeyMessage[K]
	RequestMessage[K, error]
}

func MsgRequestReason

func MsgRequestReason[K comparable](key K) *RequestReason[K]

type RequestStatus

type RequestStatus[K comparable] struct {
	KeyMessage[K]
	RequestMessage[K, StatusState]
}

func MsgRequestStatus

func MsgRequestStatus[K comparable](key K) (<-chan StatusState, *RequestStatus[K])

type RequestStatusAsync

type RequestStatusAsync[K comparable] struct {
	KeyMessage[K]
	When StatusState
	RequestMessage[K, struct{}]
}

func MsgRequestStatusAsync

func MsgRequestStatusAsync[K comparable](key K, when StatusState) (<-chan struct{}, *RequestStatusAsync[K])

type RuntimeApplication

type RuntimeApplication func(ctx context.Context, shutdown <-chan struct{}) error

RuntimeApplication is a function type representing an application that can be run concurrently. It takes a context and a shutdown channel as parameters and returns an error.

func Iterator

func Iterator(tasks []CancellableTask, opts ...IteratorOption) RuntimeApplication

Iterator creates a RuntimeApplication that uses a LoopableIterator to execute tasks.

func IteratorState

func IteratorState[T any](tasks []CancellableStateTask[T], opts ...IteratorStateOption[T]) RuntimeApplication

IteratorState creates a RuntimeApplication that uses a LoopableIteratorState to execute tasks with a shared state

func Worker

func Worker(interval time.Duration, mode ExecutionMode, app TickingRuntime, opts ...tickerOption) RuntimeApplication

Worker creates a RuntimeApplication that executes a TickingRuntime at specified intervals. It takes an interval duration, execution mode, and a TickingRuntime as parameters.

Example usage:

worker := gronos.Worker(time.Second, gronos.NonBlocking, func(ctx context.Context) error {
	// Periodic task logic here
	return nil
})
g.Add("periodicTask", worker)

type RuntimeError

type RuntimeError[K comparable] struct {
	KeyMessage[K]
	Error error
}

func MsgRuntimeError

func MsgRuntimeError[K comparable](key K, err error) *RuntimeError[K]

type ShutdownBehavior

type ShutdownBehavior int
const (
	ShutdownAutomatic ShutdownBehavior = iota
	ShutdownManual
)

type ShutdownComplete

type ShutdownComplete[K comparable] struct{}

type ShutdownKind

type ShutdownKind string
const (
	ShutdownKindTerminate ShutdownKind = "terminate"
	ShutdownKindCancel    ShutdownKind = "cancel"
)

type ShutdownProgress

type ShutdownProgress[K comparable] struct {
	RemainingApps int
}

type StatusState

type StatusState string

StatusState represents the possible states of a component

const (
	StatusAdded       StatusState = "added"        // first state
	StatusRunning     StatusState = "running"      // when the runtime application is triggered
	StatusShutingDown StatusState = "shuting_down" // when any of messages related to shutdown is initiated

	StatusShutdownCancelled  StatusState = "shutdown_cancelled"
	StatusShutdownPanicked   StatusState = "shutdown_panicked"
	StatusShutdownTerminated StatusState = "shutdown_terminated"
	StatusShutdownError      StatusState = "shutdown_error"
	StatusNotFound           StatusState = "shutdown_not_found"
)

type TerminatedShutdown

type TerminatedShutdown[K comparable] struct {
	RequestMessage[K, struct{}]
}

type Ticker

type Ticker interface {
	// Tick is called when the ticker is triggered.
	Tick()
}

Ticker interface represents an object that can be ticked.

type TickerSubscriber

type TickerSubscriber struct {
	Ticker Ticker
	Mode   ExecutionMode

	DynamicInterval func(lastInterval time.Duration) time.Duration
	// contains filtered or unexported fields
}

TickerSubscriber represents a subscriber to the clock's ticks.

type TickingRuntime

type TickingRuntime func(context.Context) error

TickingRuntime is a function type representing an application that performs periodic tasks.

Directories

Path Synopsis
examples
extensions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL