qlib

module
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2025 License: MIT

README

qlib

A Go library for building event-driven, worker-based applications with a hierarchical object database.

Overview

qlib provides a framework for building applications with the following key features:

  • Single-threaded application model with worker-based architecture
  • Hierarchical object database with real-time notifications
  • Built-in leadership election and high availability support
  • Event-driven communication between workers
  • WebSocket-based communication using Protocol Buffers

Core Concepts

Applications

Applications in qlib are built around a main thread and a set of workers. Create a new application:

app := app.NewApplication("myapp")
Workers

Workers are logical units that handle specific functionality. Each worker:

  • Must implement the Worker interface
  • Is initialized with the application context
  • Runs in the main thread via DoWork()
  • Can communicate with other workers through the database or signals

Example worker:

type MyWorker struct {
    store data.Store
}

func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
    // Initialize worker
}

func (w *MyWorker) DoWork(ctx context.Context) {
    // Handle work in main thread
}

func (w *MyWorker) Deinit(ctx context.Context) {
    // Cleanup
}

Add workers to your application:

app.AddWorker(myWorker)
Database Structure

The database follows a tree-like structure where:

  • Each node is an "Entity" with a type and unique ID
  • Entities can have fields (properties)
  • Entities can have parent-child relationships
  • Changes to entity fields trigger notifications

Example structure:

Root
├── Service
│   ├── Name: "myapp"
│   ├── Status: "running"
│   └── Config
│       ├── Setting1: "value1"
│       └── Setting2: "value2"
└── AnotherEntity
    └── Field1: "data"
Field Notifications

Workers can subscribe to field changes:

store.Notify(
    ctx,
    notification.NewConfig().
        SetEntityType("Service").
        SetFieldName("Status"),
    notification.NewCallback(func(ctx context.Context, n data.Notification) {
        // Handle field change
    }),
)

Database (Store) Interface

The Store interface is the core component for data persistence and real-time updates. It provides a rich API for managing entities, fields, and notifications.

Store Configuration

The store supports multiple backend configurations that can be mixed and matched:

// Create store with web backend
store := store.New(
    store.CommunicateOverWeb("localhost:8080"),
)

// Create store with Redis backend
store := store.New(
    store.CommunicateOverRedis("redis:6379", "password"),
)

// Create store with Postgres backend
store := store.New(
    store.CommunicateOverPostgres("postgres://user:pass@localhost:5432/db"),
)

// Create store with NATS backend
store := store.New(
    store.CommunicateOverNats("nats://localhost:4222"),
)

// Mix multiple backends
store := store.New(
    store.PersistOverPostgres("postgres://user:pass@localhost:5432/db"),
    store.NotifyOverNats("nats://localhost:4222"),
)

Each backend configuration function sets up the appropriate components for:

  • Entity management
  • Field operations
  • Schema management
  • Notifications
  • Snapshots
Basic Store Operations
type Store interface {
    // Connection management
    Connect(context.Context)
    Disconnect(context.Context)
    IsConnected(context.Context) bool

    // Entity operations
    CreateEntity(ctx context.Context, entityType, parentId, name string)
    GetEntity(ctx context.Context, entityId string) Entity
    DeleteEntity(ctx context.Context, entityId string)
    FindEntities(ctx context.Context, entityType string) []string

    // Field operations
    Read(context.Context, ...Request)
    Write(context.Context, ...Request)

    // Schema operations
    GetEntitySchema(ctx context.Context, entityType string) EntitySchema
    SetEntitySchema(context.Context, EntitySchema)

    // Notification system
    Notify(ctx context.Context, config NotificationConfig, callback NotificationCallback) NotificationToken
    Unnotify(ctx context.Context, subscriptionId string)
}
Entity Management

Entities are the primary data containers. Each entity:

  • Has a unique ID
  • Has a type (defines its schema)
  • Can have a parent entity
  • Can have child entities
  • Contains fields (properties)
// Creating an entity
store.CreateEntity(ctx, "Device", parentId, "LivingRoomLight")

// Reading an entity
entity := store.GetEntity(ctx, entityId)
fmt.Printf("Name: %s, Type: %s\n", entity.GetName(), entity.GetType())

// Finding entities by type
deviceIds := store.FindEntities(ctx, "Device")
Field Operations

Fields store the actual data values. The store supports various field types:

  • Int
  • Float
  • String
  • Bool
  • BinaryFile
  • EntityReference (links to other entities)
  • Timestamp
  • Transformation
  • Choice (selection from predefined options)
  • EntityList (collection of entity references)

Reading and writing fields:

// Writing fields
store.Write(ctx, 
    request.New().
        SetEntityId(deviceId).
        SetFieldName("Status").
        SetValue(value.NewString("ON")),
)

// Reading fields
store.Read(ctx,
    request.New().
        SetEntityId(deviceId).
        SetFieldName("Status"),
)
Using Bindings

Bindings provide a more object-oriented way to interact with entities and fields:

// Get a multi-binding for batch operations
multi := binding.NewMulti(store)

// Get an entity binding
device := multi.GetEntityById(ctx, deviceId)

// Read field values
status := device.GetField("Status").ReadString(ctx)

// Write field values
device.GetField("Status").WriteString(ctx, "OFF")

// Commit changes
multi.Commit(ctx)
Database Queries

The Query interface provides SQL-like operations for finding entities:

// Find all active devices
devices := query.New(store).
    Select("Status", "Name").
    From("Device").
    Where("Status").Equals("ON").
    Execute(ctx)

// Process results
for _, device := range devices {
    name := device.GetField("Name").GetString()
    status := device.GetField("Status").GetString()
    fmt.Printf("Device %s is %s\n", name, status)
}
Real-time Notifications

The notification system allows workers to react to field changes:

// Subscribe to field changes
token := store.Notify(
    ctx,
    notification.NewConfig().
        SetEntityType("Device").
        SetFieldName("Status").
        SetContextFields("Name", "Location"),
    notification.NewCallback(func(ctx context.Context, n data.Notification) {
        // Access changed value
        newValue := n.GetCurrent().GetValue().GetString()
        oldValue := n.GetPrevious().GetValue().GetString()
        
        // Access context fields
        name := n.GetContext(0).GetValue().GetString()
        location := n.GetContext(1).GetValue().GetString()
        
        fmt.Printf("Device %s in %s changed from %s to %s\n",
            name, location, oldValue, newValue)
    }),
)

// Later, unsubscribe
token.Unbind(ctx)
Database Schema

Schemas define the structure of entities and their fields:

// Define a schema
schema := schema.New("Device").
    AddField("Status", "string").
    AddField("Name", "string").
    AddField("Location", "string").
    AddField("LastSeen", "timestamp")

// Set the schema
store.SetEntitySchema(ctx, schema)

// Retrieve schema
deviceSchema := store.GetEntitySchema(ctx, "Device")
for _, field := range deviceSchema.GetFields() {
    fmt.Printf("Field: %s, Type: %s\n", 
        field.GetFieldName(), 
        field.GetFieldType())
}
Transaction Management

While the store operations are atomic by default, you can use MultiBinding for transaction-like operations:

// Start a multi-binding session
multi := binding.NewMulti(store)

// Perform multiple operations
deviceEntity := multi.GetEntityById(ctx, deviceId)
deviceEntity.GetField("Status").WriteString(ctx, "OFF")
deviceEntity.GetField("LastSeen").WriteTimestamp(ctx, time.Now())

configEntity := multi.GetEntityById(ctx, configId)
configEntity.GetField("LastUpdate").WriteTimestamp(ctx, time.Now())

// Commit all changes atomically
multi.Commit(ctx)
Database Snapshots

The store supports creating and restoring snapshots of the entire database state:

// Create a snapshot
snapshot := store.CreateSnapshot(ctx)

// Restore from snapshot
store.RestoreSnapshot(ctx, snapshot)

// Access snapshot data
for _, entity := range snapshot.GetEntities() {
    fmt.Printf("Entity: %s (%s)\n", entity.GetName(), entity.GetType())
}

for _, field := range snapshot.GetFields() {
    fmt.Printf("Field: %s = %v\n", field.GetFieldName(), field.GetValue())
}

for _, schema := range snapshot.GetSchemas() {
    fmt.Printf("Schema: %s\n", schema.GetType())
}
Field Validation

The EntityFieldValidator ensures data integrity by validating required fields:

// Create a validator
validator := data.NewEntityFieldValidator(store)

// Register required fields for entity types
validator.RegisterEntityFields("Service",
    "Leader",
    "Candidates",
    "HeartbeatTrigger",
    "ApplicationName",
    "LogLevel",
)

// Validate fields exist in schema
if err := validator.ValidateFields(ctx); err != nil {
    log.Fatal("Schema validation failed:", err)
}
Best Practices for Store Usage
  1. Entity Organization

    • Design your entity hierarchy to reflect your domain model
    • Use meaningful entity types and field names
    • Keep entity relationships logical and maintight
  2. Performance Optimization

    • Use batch operations with MultiBinding when possible
    • Subscribe to specific fields rather than entire entities
    • Include relevant context fields in notifications to avoid additional queries
  3. Error Handling

    • Always check connection status before operations
    • Implement retry logic for transient failures
    • Use proper error handling in notification callbacks
  4. Schema Management

    • Define schemas early in application initialization
    • Validate required fields during startup
    • Consider schema versioning for application updates
  5. Memory Management

    • Unbind notification tokens when no longer needed
    • Clean up entity references in MultiBinding sessions
    • Use snapshots judiciously as they hold entire database state
Naming Conventions

Entity and field names in qlib follow strict naming conventions:

  1. Pascal Case
    • Entity types must use PascalCase (e.g., DeviceController, LightSwitch)
    • Field names must use PascalCase (e.g., CurrentState, LastUpdateTime)
    • This applies to both schema definitions and runtime operations
// Correct naming
schema := schema.New("LightController").
    AddField("PowerState", "string").
    AddField("BrightnessLevel", "int").
    AddField("LastStateChange", "timestamp")

// Incorrect naming - will cause validation errors
schema := schema.New("light_controller").
    AddField("power_state", "string").
    AddField("brightnessLevel", "int")
Field Indirection

qlib supports referencing fields relative to an entity using the -> delimiter. This powerful feature allows you to:

  • Reference fields in parent or child entities
  • Navigate the entity tree
  • Create dynamic field references
// Basic field indirection syntax:
// EntityReference->FieldName

// Examples:
store.Write(ctx,
    request.New().
        SetEntityId(deviceId).
        SetFieldName("Controller->Status").  // References Status field in Controller entity
        SetValue(value.NewString("ON")),
)

// Multiple levels of indirection
store.Notify(
    ctx,
    notification.NewConfig().
        SetEntityType("Device").
        SetFieldName("Zone->Building->Status").  // References Building's Status via Zone
        SetContextFields(
            "Name",
            "Parent->Name",           // Get parent entity's Name
            "Zone->BuildingName",     // Get BuildingName from Zone entity
        ),
    notification.NewCallback(func(ctx context.Context, n data.Notification) {
        deviceName := n.GetContext(0).GetValue().GetString()
        parentName := n.GetContext(1).GetValue().GetString()
        buildingName := n.GetContext(2).GetValue().GetString()
    }),
)

// Using EntityReference fields
schema := schema.New("Device").
    AddField("Controller", "entityref").     // References another entity
    AddField("Zone", "entityref")

// Query using indirection
devices := query.New(store).
    Select(
        "Name",
        "Controller->Status",         // Status from referenced Controller
        "Zone->Building->Address",    // Address from Building referenced by Zone
    ).
    From("Device").
    Where("Controller->Type").Equals("MainController").
    Execute(ctx)

Common Use Cases for Indirection:

  1. Hierarchical Data Access

    // Access parent data
    "Parent->Name"
    "Parent->Parent->Type"
    
    // Access child data
    "MainController->Status"
    "SubDevices->Count"
    
  2. Cross-Entity Relationships

    // Reference through entity reference fields
    "Zone->Controller->Status"
    "Building->MainPower->State"
    
  3. Dynamic Configuration

    // Use indirection in configuration entities
    "ConfigSource->Settings->UpdateInterval"
    "Template->DefaultValues->State"
    
  4. Notification Context

    notification.NewConfig().
        SetEntityType("Device").
        SetFieldName("Status").
        SetContextFields(
            "Name",
            "Parent->Location",
            "Zone->Building->Name",
            "Controller->Type",
        )
    

Best Practices for Indirection:

  1. Depth Management

    • Keep indirection chains reasonably short
    • Consider creating direct references for frequently used deep paths
    • Document complex indirection paths
  2. Performance

    • Cache frequently accessed indirect values when possible
    • Use MultiBinding for batch operations with indirection
    • Include necessary context fields in notifications to avoid additional lookups
  3. Error Handling

    • Validate entity references exist before using indirection
    • Handle missing intermediate entities gracefully
    • Log invalid indirection paths for debugging
Field Value Validation

qlib enforces strict type validation for field values. Each field type has specific validation rules:

  1. Int

    // Valid int assignments
    field.WriteInt(ctx, 42)
    field.WriteInt(ctx, int64(1000))
    
    // Invalid - will cause validation error
    field.WriteInt(ctx, "42")    // string not allowed
    field.WriteInt(ctx, 3.14)    // float not allowed
    
  2. Float

    // Valid float assignments
    field.WriteFloat(ctx, 3.14)
    field.WriteFloat(ctx, float64(1.0))
    
    // Integer values are automatically converted
    field.WriteFloat(ctx, 42)    // becomes 42.0
    
  3. String

    // Valid string assignments
    field.WriteString(ctx, "hello")
    
    // Non-string values must be explicitly converted
    field.WriteString(ctx, fmt.Sprintf("%d", 42))
    
  4. Bool

    // Valid boolean assignments
    field.WriteBool(ctx, true)
    field.WriteBool(ctx, false)
    
    // Invalid - will cause validation error
    field.WriteBool(ctx, "true")   // string not allowed
    field.WriteBool(ctx, 1)        // int not allowed
    
  5. Timestamp

    // Valid timestamp assignments
    field.WriteTimestamp(ctx, time.Now())
    field.WriteTimestamp(ctx, time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
    
  6. EntityReference

    // Valid entity reference assignments
    field.WriteEntityReference(ctx, "DeviceId123")
    
    // Best practice: validate entity exists
    if store.EntityExists(ctx, "DeviceId123") {
        field.WriteEntityReference(ctx, "DeviceId123")
    }
    
  7. BinaryFile

    // Valid binary file assignments
    content := []byte{...}
    encoded := data.FileEncode(content)
    field.WriteBinaryFile(ctx, encoded)
    
    // Reading binary files
    encoded := field.ReadBinaryFile(ctx)
    content := data.FileDecode(encoded)
    
  8. Transformation

    // Transformation fields contain scripts/expressions
    field.WriteTransformation(ctx, "value * 2")
    field.WriteTransformation(ctx, "Parent->Value + 10")
    
Working with Choice Fields

Choice fields represent a selected option from a predefined list of choices:

// Define schema with choice field
schema := schema.New("Device").
    AddField("Status", "string").
    AddField("OperatingMode", "choice")

// Set the available options in the schema
schema.SetChoiceOptions(ctx, store, "Device", "OperatingMode", 
    []string{"Normal", "Eco", "Boost", "Away"})

// Write a choice value - just the selected index
device.GetField("OperatingMode").WriteChoice(ctx, 0)  // Select "Normal" option

// Read choice value with rich interface
choice := device.GetField("OperatingMode").ReadChoice(ctx)
selectedIndex := choice.GetSelectedIndex()
selectedMode := choice.GetSelectedValue()

// Get options from schema through the binding
allModes := device.GetField("OperatingMode").GetChoiceOptions(ctx)

// Update selection
device.GetField("OperatingMode").SelectChoiceByValue(ctx, "Eco")
Working with EntityList Fields

EntityList fields store collections of entity references:

// Define schema with entitylist field
schema := schema.New("User").
    AddField("Name", "string").
    AddField("Devices", "entitylist")

// Write an entity list
user.GetField("Devices").WriteEntityList(ctx, 
    []string{"device-1", "device-2", "device-3"}
)

// Read entity list with rich interface
deviceList := user.GetField("Devices").ReadEntityList(ctx)
allDevices := deviceList.GetEntities()
deviceCount := deviceList.Count()

// Use convenience methods to modify list
user.GetField("Devices").AddEntityToList(ctx, "device-4")
user.GetField("Devices").RemoveEntityFromList(ctx, "device-2")
hasDevice := user.GetField("Devices").EntityListContains(ctx, "device-1")

Best Practices for Field Values:

  1. Type Safety

    // Use type-specific methods
    if field.IsInt() {
        value := field.ReadInt(ctx)
    } else if field.IsString() {
        value := field.ReadString(ctx)
    }
    
  2. Null Values

    // Check for nil values
    if !field.GetValue().IsNil() {
        value := field.ReadString(ctx)
    }
    
  3. Value Conversion

    // Convert values explicitly when needed
    intValue := int64(field.ReadFloat(ctx))
    strValue := fmt.Sprintf("%d", field.ReadInt(ctx))
    
  4. Batch Updates

    // Use MultiBinding for multiple value updates
    multi := binding.NewMulti(store)
    entity := multi.GetEntityById(ctx, entityId)
    
    entity.GetField("IntValue").WriteInt(ctx, 42)
    entity.GetField("FloatValue").WriteFloat(ctx, 3.14)
    entity.GetField("StringValue").WriteString(ctx, "hello")
    
    multi.Commit(ctx)
    
  5. Error Handling

    // Handle type conversion errors
    field := entity.GetField("Value")
    if !field.IsFloat() {
        log.Error("Expected float field")
        return
    }
    value := field.ReadFloat(ctx)
    
  6. Value Validation in Schema

    // Define field types in schema
    schema := schema.New("Device").
        AddField("Status", "string").
        AddField("Temperature", "float").
        AddField("IsActive", "bool").
        AddField("LastUpdate", "timestamp").
        AddField("Controller", "entityref")
    

Application Readiness

qlib provides built-in support for application readiness checks through the Readiness worker. This ensures your application only becomes active when all required dependencies and conditions are met.

Readiness Worker

The Readiness worker manages the application's ready state:

// Create readiness worker
readinessWorker := workers.NewReadiness()

// Add readiness criteria
readinessWorker.AddCriteria(workers.NewStoreConnectedCriteria(store))
readinessWorker.AddCriteria(workers.NewSchemaValidityCriteria(store))

// Add custom criteria
readinessWorker.AddCriteria(workers.ReadinessCriteriaFunc(func() bool {
    return myCondition // e.g., database is connected
}))

// Handle readiness changes
readinessWorker.BecameReady.Connect(func() {
    // Application is now ready
})

readinessWorker.BecameUnready.Connect(func() {
    // Application is no longer ready
})

// Add to application
app.AddWorker(readinessWorker)
Best Practices for Readiness
  1. Critical Dependencies

    • Add criteria for all critical services
    • Monitor connection states
    • Validate required configurations
  2. Schema Validation

    • Use SchemaValidityCriteria to ensure schema integrity
    • Validate required fields exist
    • Check field types match expectations
  3. Custom Conditions

    • Implement custom ReadinessCriteria for specific needs
    • Keep criteria checks lightweight
    • Avoid blocking operations
  4. Monitoring

    • Track readiness state changes
    • Log criteria failures
    • Implement health checks based on readiness

Example readiness-aware worker:

type MyWorker struct {
    isReady bool
}

func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
    readiness.BecameReady.Connect(func() {
        w.isReady = true
        // Start operations
    })
    
    readiness.BecameUnready.Connect(func() {
        w.isReady = false
        // Stop operations
    })
}

func (w *MyWorker) DoWork(ctx context.Context) {
    if (!w.isReady) {
        return // Only work when ready
    }
    // Perform operations
}

Web Communication

The library uses Protocol Buffers for structured web communication over WebSocket connections. Messages follow a standard format:

message ApiMessage {
    ApiHeader header = 1;
    google.protobuf.Any payload = 2;
}

message ApiHeader {
    string id = 1;
    google.protobuf.Timestamp timestamp = 2;
    AuthenticationStatusEnum authenticationStatus = 3;
}
Working with WebMessages
  1. Sending Messages
// Create and send a message to a specific client
client.Write(&protobufs.ApiMessage{
    Header: &protobufs.ApiHeader{
        Id: uuid.New().String(),
        Timestamp: timestamppb.Now(),
    },
    Payload: myPayload, // any proto message
})
  1. Handling Messages
// Set up a message handler
client.SetMessageHandler(func(c web.Client, m web.Message) {
    switch payload := m.GetPayload().(type) {
    case *protobufs.ApiConfigCreateEntityRequest:
        // Handle entity creation request
    case *protobufs.ApiRuntimeDatabaseRequest:
        // Handle database operation request
    }
})
Common Message Types
  • Database Operations:

    • ApiRuntimeDatabaseRequest: Read/write field values
    • ApiConfigCreateEntityRequest: Create new entities
    • ApiConfigDeleteEntityRequest: Delete entities
    • ApiConfigGetEntityRequest: Retrieve entity details
  • Schema Operations:

    • ApiConfigGetEntitySchemaRequest: Get entity type schema
    • ApiConfigSetEntitySchemaRequest: Update entity schema
    • ApiConfigGetEntityTypesRequest: List available entity types
  • Notifications:

    • ApiRuntimeRegisterNotificationRequest: Subscribe to changes
    • ApiRuntimeGetNotificationsRequest: Get pending notifications
    • ApiRuntimeUnregisterNotificationRequest: Remove subscriptions

Example database operation:

// Send a database read request
client.Write(&protobufs.ApiMessage{
    Header: &protobufs.ApiHeader{
        Id: uuid.New().String(),
        Timestamp: timestamppb.Now(),
    },
    Payload: &anypb.Any{
        TypeUrl: "ApiRuntimeDatabaseRequest",
        Value: &protobufs.ApiRuntimeDatabaseRequest{
            RequestType: protobufs.ApiRuntimeDatabaseRequest_READ,
            Requests: []*protobufs.DatabaseRequest{
                {
                    Id: entityId,
                    Field: "Status",
                },
            },
        },
    },
})

Worker Communication Patterns

Workers in qlib can communicate with each other through the Store using several patterns:

Direct Field Communication

Workers can communicate by writing to and monitoring specific fields:

// Worker A: Sender
type SenderWorker struct {
    store data.Store
}

func (w *SenderWorker) DoWork(ctx context.Context) {
    w.store.Write(ctx,
        request.New().
            SetEntityId("command-entity").
            SetFieldName("Command").
            SetValue(value.NewString("start_operation")),
    )
}

// Worker B: Receiver
type ReceiverWorker struct {
    store data.Store
}

func (w *ReceiverWorker) Init(ctx context.Context, h app.Handle) {
    w.store.Notify(
        ctx,
        notification.NewConfig().
            SetEntityType("command-entity").
            SetFieldName("Command"),
        notification.NewCallback(w.onCommand),
    )
}

func (w *ReceiverWorker) onCommand(ctx context.Context, n data.Notification) {
    command := n.GetCurrent().GetValue().GetString()
    if command == "start_operation" {
        // Handle command
    }
}
Request-Response Pattern

For request-response style communication:

// Request entity schema
schema := schema.New("Request").
    AddField("Status", "string").     // "pending", "processing", "completed"
    AddField("Data", "string").       // Request data
    AddField("Response", "string").   // Response data
    AddField("Error", "string")       // Error message if any

// Worker A: Client
func (w *ClientWorker) makeRequest(ctx context.Context, data string) {
    // Create request entity
    w.store.CreateEntity(ctx, "Request", parentId, "request-1")
    
    // Write request data
    w.store.Write(ctx,
        request.New().
            SetEntityId("request-1").
            SetFieldName("Data").
            SetValue(value.NewString(data)),
        request.New().
            SetEntityId("request-1").
            SetFieldName("Status").
            SetValue(value.NewString("pending")),
    )
    
    // Monitor for response
    w.store.Notify(
        ctx,
        notification.NewConfig().
            SetEntityId("request-1").
            SetFieldName("Status").
            SetContextFields("Response", "Error"),
        notification.NewCallback(w.onResponse),
    )
}

// Worker B: Server
func (w *ServerWorker) Init(ctx context.Context, h app.Handle) {
    w.store.Notify(
        ctx,
        notification.NewConfig().
            SetEntityType("Request").
            SetFieldName("Status"),
        notification.NewCallback(w.onRequest),
    )
}

func (w *ServerWorker) onRequest(ctx context.Context, n data.Notification) {
    status := n.GetCurrent().GetValue().GetString()
    if status != "pending" {
        return
    }
    
    requestId := n.GetCurrent().GetEntityId()
    requestEntity := w.store.GetEntity(ctx, requestId)
    
    // Process request and write response
    w.store.Write(ctx,
        request.New().
            SetEntityId(requestId).
            SetFieldName("Response").
            SetValue(value.NewString("result")),
        request.New().
            SetEntityId(requestId).
            SetFieldName("Status").
            SetValue(value.NewString("completed")),
    )
}
Pub/Sub Pattern

For broadcast-style communication:

// Publisher worker publishes events
func (w *PublisherWorker) publishEvent(ctx context.Context, eventType, data string) {
    w.store.Write(ctx,
        request.New().
            SetEntityId("events").
            SetFieldName(eventType).
            SetValue(value.NewString(data)),
    )
}

// Subscriber worker listens for events
func (w *SubscriberWorker) Init(ctx context.Context, h app.Handle) {
    w.store.Notify(
        ctx,
        notification.NewConfig().
            SetEntityId("events").
            SetFieldName("user_login"),  // Subscribe to specific event type
        notification.NewCallback(w.onUserLogin),
    )
}

func (w *SubscriberWorker) onUserLogin(ctx context.Context, n data.Notification) {
    userData := n.GetCurrent().GetValue().GetString()
    // Handle user login event
}
Best Practices for Worker Communication
  1. Entity Design

    • Create dedicated entities for inter-worker communication
    • Use clear field names for commands and events
    • Include timestamps for tracking message age
  2. Error Handling

    • Include error fields in request-response patterns
    • Implement timeouts for pending requests
    • Handle missing or invalid data gracefully
  3. Performance

    • Use appropriate notification patterns (change-only vs. all updates)
    • Clean up completed request entities
    • Batch related field updates using MultiBinding
  4. Testing

    • Test communication patterns in isolation
    • Verify proper cleanup of temporary entities
    • Simulate network and timing issues

Getting Started

  1. Create a new application
  2. Implement workers for your business logic
  3. Set up database schema and entity structure
  4. Add notification handlers
  5. Run the application

Example:

func main() {
    app := app.NewApplication("myapp")
    
    // Add workers
    app.AddWorker(NewMyWorker())
    
    // Run the application
    app.Execute()
}

Best Practices

  1. Keep workers focused on single responsibilities
  2. Use the main thread for coordination
  3. Leverage notifications for event-driven architecture
  4. Structure your database hierarchy logically
  5. Use leadership election for high availability

Environment Variables

  • Q_IN_DOCKER: Set when running in Docker container
  • Q_LOG_LEVEL: Application log level
  • Q_LIB_LOG_LEVEL: Library log level

Application Lifecycle Management

Applications in qlib follow a strict initialization and shutdown sequence to ensure proper resource management and graceful termination.

Initialization Sequence
func main() {
    // Create application with unique name
    app := app.NewApplication("myapp")

    // Configure workers before Execute()
    storeWorker := workers.NewStore(myStore)
    webWorker := workers.NewWeb(":8080")
    
    // Order matters - add core workers first
    app.AddWorker(storeWorker)
    app.AddWorker(leadershipWorker)
    app.AddWorker(webWorker)
    app.AddWorker(myBusinessWorker)

    // Execute handles init/deinit
    app.Execute()
}
Worker Initialization

Workers should properly initialize their resources:

type MyWorker struct {
    store data.Store
    subscriptions []data.NotificationToken
}

func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
    // Set up notification subscriptions
    w.subscriptions = append(w.subscriptions,
        w.store.Notify(ctx,
            notification.NewConfig().
                SetEntityType("Device").
                SetFieldName("Status"),
            notification.NewCallback(w.onStatusChange),
        ),
    )

    // Initialize resources with timeout
    timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    
    if err := w.initializeResource(timeoutCtx); err != nil {
        log.Panic("Failed to initialize: %v", err)
    }
}
Graceful Shutdown

Workers must clean up resources on shutdown:

func (w *MyWorker) Deinit(ctx context.Context) {
    // Unsubscribe from notifications
    for _, sub := range w.subscriptions {
        sub.Unbind(ctx)
    }

    // Close connections
    if w.conn != nil {
        w.conn.Close()
    }

    // Wait for pending operations
    w.wg.Wait()
}
Error Handling

Best practices for handling errors in workers:

func (w *MyWorker) DoWork(ctx context.Context) {
    // Check context cancellation
    if ctx.Err() != nil {
        return
    }

    // Handle recoverable errors
    if err := w.doSomething(ctx); err != nil {
        log.Error("Operation failed: %v", err)
        // Update status to reflect error
        w.store.Write(ctx,
            request.New().
                SetEntityId(w.entityId).
                SetFieldName("Error").
                SetValue(value.NewString(err.Error())),
        )
        return
    }

    // Handle fatal errors
    if err := w.criticalOperation(ctx); err != nil {
        log.Panic("Critical error: %v", err)
        // Application will shut down
    }
}
Startup Dependencies

Managing worker dependencies:

type MyWorker struct {
    store data.Store
    isStoreConnected bool
}

func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
    // Subscribe to store connection status
    w.store.Connected().Connect(func(ctx context.Context) {
        w.isStoreConnected = true
    })
    
    w.store.Disconnected().Connect(func(ctx context.Context) {
        w.isStoreConnected = false
    })
}

func (w *MyWorker) DoWork(ctx context.Context) {
    // Wait for dependencies
    if !w.isStoreConnected {
        return
    }
    
    // Proceed with work
}
Signal Handling

The application handles system signals:

func main() {
    app := app.NewApplication("myapp")
    
    // App.Execute() handles:
    // - SIGINT (Ctrl+C)
    // - SIGTERM
    // 
    // Workers get ctx.Done() on signal
    app.Execute()
}

// In your worker
func (w *MyWorker) DoWork(ctx context.Context) {
    select {
    case <-ctx.Done():
        // Clean up and return
        return
    default:
        // Do work
    }
}
Environment Configuration

Environment variables that affect application behavior:

# Application identification
APP_NAME=myapp                  # Override application name
Q_IN_DOCKER=true               # Running in container

# Store configuration
Q_LEADER_STORE_ADDR=redis:6379 # Leadership store address
Q_LEADER_STORE_PASSWORD=secret # Leadership store password

# Runtime behavior
Q_LOG_LEVEL=INFO              # Application log level
Q_LIB_LOG_LEVEL=WARN         # Library log level

Best Practices:

  1. Worker State

    • Initialize all state in Init()
    • Clean up all resources in Deinit()
    • Use context cancellation for termination
  2. Resource Management

    • Track and clean up subscriptions
    • Close connections and channels
    • Wait for goroutines to finish
  3. Error Handling

    • Use log.Error for recoverable errors
    • Use log.Panic for fatal errors
    • Update entity status on errors
  4. Dependencies

    • Check dependency status before operations
    • Handle dependency failures gracefully
    • Implement proper retry logic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL