qlib
A Go library for building event-driven, worker-based applications with a hierarchical object database.
Overview
qlib provides a framework for building applications with the following key features:
- Single-threaded application model with worker-based architecture
- Hierarchical object database with real-time notifications
- Built-in leadership election and high availability support
- Event-driven communication between workers
- WebSocket-based communication using Protocol Buffers
Core Concepts
Applications
Applications in qlib are built around a main thread and a set of workers. Create a new application:
app := app.NewApplication("myapp")
Workers
Workers are logical units that handle specific functionality. Each worker:
- Must implement the Worker interface
- Is initialized with the application context
- Runs in the main thread via DoWork()
- Can communicate with other workers through the database or signals
Example worker:
type MyWorker struct {
store data.Store
}
func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
// Initialize worker
}
func (w *MyWorker) DoWork(ctx context.Context) {
// Handle work in main thread
}
func (w *MyWorker) Deinit(ctx context.Context) {
// Cleanup
}
Add workers to your application:
app.AddWorker(myWorker)
Database Structure
The database follows a tree-like structure where:
- Each node is an "Entity" with a type and unique ID
- Entities can have fields (properties)
- Entities can have parent-child relationships
- Changes to entity fields trigger notifications
Example structure:
Root
├── Service
│ ├── Name: "myapp"
│ ├── Status: "running"
│ └── Config
│ ├── Setting1: "value1"
│ └── Setting2: "value2"
└── AnotherEntity
└── Field1: "data"
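The tree above can be modeled roughly as follows. This is a minimal in-memory sketch, not qlib's actual data model: the `Entity` struct, the `paths` helper, and the IDs are hypothetical names introduced for illustration, and it assumes `Config` in the diagram is a child entity rather than a field.

```go
package main

import (
	"fmt"
	"sort"
)

// Entity is a hypothetical stand-in for a tree node: it has a type,
// a unique ID, a name, its own fields, and child entities.
type Entity struct {
	ID       string
	Type     string
	Name     string
	Fields   map[string]string
	Children []*Entity
}

// paths walks the tree depth-first and returns one "path.Field=value" entry
// per field. Field names are sorted so the result is deterministic.
func paths(e *Entity, prefix string) []string {
	here := prefix + e.Name
	names := make([]string, 0, len(e.Fields))
	for name := range e.Fields {
		names = append(names, name)
	}
	sort.Strings(names)
	var out []string
	for _, name := range names {
		out = append(out, fmt.Sprintf("%s.%s=%s", here, name, e.Fields[name]))
	}
	for _, child := range e.Children {
		out = append(out, paths(child, here+"/")...)
	}
	return out
}

// exampleTree builds the structure shown in the diagram.
func exampleTree() *Entity {
	config := &Entity{ID: "3", Type: "Config", Name: "Config",
		Fields: map[string]string{"Setting1": "value1", "Setting2": "value2"}}
	service := &Entity{ID: "2", Type: "Service", Name: "Service",
		Fields:   map[string]string{"Name": "myapp", "Status": "running"},
		Children: []*Entity{config}}
	another := &Entity{ID: "4", Type: "AnotherEntity", Name: "AnotherEntity",
		Fields: map[string]string{"Field1": "data"}}
	return &Entity{ID: "1", Type: "Root", Name: "Root",
		Children: []*Entity{service, another}}
}

func main() {
	for _, line := range paths(exampleTree(), "") {
		fmt.Println(line)
	}
}
```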
Field Notifications
Workers can subscribe to field changes:
store.Notify(
ctx,
notification.NewConfig().
SetEntityType("Service").
SetFieldName("Status"),
notification.NewCallback(func(ctx context.Context, n data.Notification) {
// Handle field change
}),
)
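To make the subscription model concrete, here is a toy in-memory sketch of "write triggers callback". It is not qlib's implementation: `miniStore`, `fieldKey`, and `change` are hypothetical types, and the sketch keys subscriptions by entity type and field name only, ignoring entity IDs.

```go
package main

import "fmt"

// fieldKey identifies a field on an entity type (hypothetical helper type).
type fieldKey struct{ entityType, field string }

// change carries the previous and current value of a field.
type change struct{ Previous, Current string }

// miniStore is a toy stand-in for a store; it is not qlib's Store.
type miniStore struct {
	values    map[fieldKey]string
	callbacks map[fieldKey][]func(change)
}

func newMiniStore() *miniStore {
	return &miniStore{
		values:    map[fieldKey]string{},
		callbacks: map[fieldKey][]func(change){},
	}
}

// notify registers a callback for one entity type and field name.
func (s *miniStore) notify(entityType, field string, cb func(change)) {
	k := fieldKey{entityType, field}
	s.callbacks[k] = append(s.callbacks[k], cb)
}

// write stores the value, then invokes every callback registered for the field.
func (s *miniStore) write(entityType, field, value string) {
	k := fieldKey{entityType, field}
	c := change{Previous: s.values[k], Current: value}
	s.values[k] = value
	for _, cb := range s.callbacks[k] {
		cb(c)
	}
}

// demoNotifications subscribes to Service.Status and records each change.
func demoNotifications() []string {
	var seen []string
	s := newMiniStore()
	s.notify("Service", "Status", func(c change) {
		seen = append(seen, fmt.Sprintf("%q -> %q", c.Previous, c.Current))
	})
	s.write("Service", "Status", "running")
	s.write("Service", "Name", "myapp") // no subscriber, so no callback runs
	s.write("Service", "Status", "stopped")
	return seen
}

func main() {
	for _, line := range demoNotifications() {
		fmt.Println(line)
	}
}
```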
Database (Store) Interface
The Store interface is the core component for data persistence and real-time updates. It provides a rich API for managing entities, fields, and notifications.
Store Configuration
The store supports multiple backend configurations that can be mixed and matched:
// Create store with web backend
store := store.New(
store.CommunicateOverWeb("localhost:8080"),
)
// Create store with Redis backend
store := store.New(
store.CommunicateOverRedis("redis:6379", "password"),
)
// Create store with Postgres backend
store := store.New(
store.CommunicateOverPostgres("postgres://user:pass@localhost:5432/db"),
)
// Create store with NATS backend
store := store.New(
store.CommunicateOverNats("nats://localhost:4222"),
)
// Mix multiple backends
store := store.New(
store.PersistOverPostgres("postgres://user:pass@localhost:5432/db"),
store.NotifyOverNats("nats://localhost:4222"),
)
Each backend configuration function sets up the appropriate components for:
- Entity management
- Field operations
- Schema management
- Notifications
- Snapshots
Basic Store Operations
type Store interface {
// Connection management
Connect(context.Context)
Disconnect(context.Context)
IsConnected(context.Context) bool
// Entity operations
CreateEntity(ctx context.Context, entityType, parentId, name string)
GetEntity(ctx context.Context, entityId string) Entity
DeleteEntity(ctx context.Context, entityId string)
FindEntities(ctx context.Context, entityType string) []string
// Field operations
Read(context.Context, ...Request)
Write(context.Context, ...Request)
// Schema operations
GetEntitySchema(ctx context.Context, entityType string) EntitySchema
SetEntitySchema(context.Context, EntitySchema)
// Notification system
Notify(ctx context.Context, config NotificationConfig, callback NotificationCallback) NotificationToken
Unnotify(ctx context.Context, subscriptionId string)
}
Entity Management
Entities are the primary data containers. Each entity:
- Has a unique ID
- Has a type (defines its schema)
- Can have a parent entity
- Can have child entities
- Contains fields (properties)
// Creating an entity
store.CreateEntity(ctx, "Device", parentId, "LivingRoomLight")
// Reading an entity
entity := store.GetEntity(ctx, entityId)
fmt.Printf("Name: %s, Type: %s\n", entity.GetName(), entity.GetType())
// Finding entities by type
deviceIds := store.FindEntities(ctx, "Device")
Field Operations
Fields store the actual data values. The store supports various field types:
- Int
- Float
- String
- Bool
- BinaryFile
- EntityReference (links to other entities)
- Timestamp
- Transformation
- Choice (selection from predefined options)
- EntityList (collection of entity references)
Reading and writing fields:
// Writing fields
store.Write(ctx,
request.New().
SetEntityId(deviceId).
SetFieldName("Status").
SetValue(value.NewString("ON")),
)
// Reading fields
store.Read(ctx,
request.New().
SetEntityId(deviceId).
SetFieldName("Status"),
)
Using Bindings
Bindings provide a more object-oriented way to interact with entities and fields:
// Get a multi-binding for batch operations
multi := binding.NewMulti(store)
// Get an entity binding
device := multi.GetEntityById(ctx, deviceId)
// Read field values
status := device.GetField("Status").ReadString(ctx)
// Write field values
device.GetField("Status").WriteString(ctx, "OFF")
// Commit changes
multi.Commit(ctx)
Database Queries
The Query interface provides SQL-like operations for finding entities:
// Find all active devices
devices := query.New(store).
Select("Status", "Name").
From("Device").
Where("Status").Equals("ON").
Execute(ctx)
// Process results
for _, device := range devices {
name := device.GetField("Name").GetString()
status := device.GetField("Status").GetString()
fmt.Printf("Device %s is %s\n", name, status)
}
Real-time Notifications
The notification system allows workers to react to field changes:
// Subscribe to field changes
token := store.Notify(
ctx,
notification.NewConfig().
SetEntityType("Device").
SetFieldName("Status").
SetContextFields("Name", "Location"),
notification.NewCallback(func(ctx context.Context, n data.Notification) {
// Access changed value
newValue := n.GetCurrent().GetValue().GetString()
oldValue := n.GetPrevious().GetValue().GetString()
// Access context fields
name := n.GetContext(0).GetValue().GetString()
location := n.GetContext(1).GetValue().GetString()
fmt.Printf("Device %s in %s changed from %s to %s\n",
name, location, oldValue, newValue)
}),
)
// Later, unsubscribe
token.Unbind(ctx)
Database Schema
Schemas define the structure of entities and their fields:
// Define a schema
schema := schema.New("Device").
AddField("Status", "string").
AddField("Name", "string").
AddField("Location", "string").
AddField("LastSeen", "timestamp")
// Set the schema
store.SetEntitySchema(ctx, schema)
// Retrieve schema
deviceSchema := store.GetEntitySchema(ctx, "Device")
for _, field := range deviceSchema.GetFields() {
fmt.Printf("Field: %s, Type: %s\n",
field.GetFieldName(),
field.GetFieldType())
}
Transaction Management
While the store operations are atomic by default, you can use MultiBinding for transaction-like operations:
// Start a multi-binding session
multi := binding.NewMulti(store)
// Perform multiple operations
deviceEntity := multi.GetEntityById(ctx, deviceId)
deviceEntity.GetField("Status").WriteString(ctx, "OFF")
deviceEntity.GetField("LastSeen").WriteTimestamp(ctx, time.Now())
configEntity := multi.GetEntityById(ctx, configId)
configEntity.GetField("LastUpdate").WriteTimestamp(ctx, time.Now())
// Commit all changes atomically
multi.Commit(ctx)
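The buffering idea behind a commit-style session can be sketched as below. This is an illustration under assumptions, not MultiBinding's real implementation: the `batch` and `pendingWrite` types are hypothetical, and the sketch only shows that nothing reaches the target state until `commit` runs. It makes no claim about rollback or failure handling.

```go
package main

import "fmt"

// pendingWrite is one buffered field write (hypothetical type).
type pendingWrite struct{ entityID, field, value string }

// batch buffers writes and applies them together on commit.
type batch struct {
	target  map[string]string // committed state keyed by "entityID.field"
	pending []pendingWrite
}

// write records the change without touching the committed state.
func (b *batch) write(entityID, field, value string) {
	b.pending = append(b.pending, pendingWrite{entityID, field, value})
}

// commit applies buffered writes in order, clears the buffer, and returns
// how many writes were applied.
func (b *batch) commit() int {
	n := len(b.pending)
	for _, w := range b.pending {
		b.target[w.entityID+"."+w.field] = w.value
	}
	b.pending = nil
	return n
}

// demoBatch returns the number of committed fields before and after commit.
func demoBatch() (before, after int) {
	state := map[string]string{}
	b := &batch{target: state}
	b.write("device-1", "Status", "OFF")
	b.write("config-1", "LastUpdate", "2024-01-01T00:00:00Z")
	before = len(state)
	b.commit()
	return before, len(state)
}

func main() {
	before, after := demoBatch()
	fmt.Println("committed fields before:", before, "after:", after)
}
```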
Database Snapshots
The store supports creating and restoring snapshots of the entire database state:
// Create a snapshot
snapshot := store.CreateSnapshot(ctx)
// Restore from snapshot
store.RestoreSnapshot(ctx, snapshot)
// Access snapshot data
for _, entity := range snapshot.GetEntities() {
fmt.Printf("Entity: %s (%s)\n", entity.GetName(), entity.GetType())
}
for _, field := range snapshot.GetFields() {
fmt.Printf("Field: %s = %v\n", field.GetFieldName(), field.GetValue())
}
for _, schema := range snapshot.GetSchemas() {
fmt.Printf("Schema: %s\n", schema.GetType())
}
Field Validation
The EntityFieldValidator ensures data integrity by validating required fields:
// Create a validator
validator := data.NewEntityFieldValidator(store)
// Register required fields for entity types
validator.RegisterEntityFields("Service",
"Leader",
"Candidates",
"HeartbeatTrigger",
"ApplicationName",
"LogLevel",
)
// Validate fields exist in schema
if err := validator.ValidateFields(ctx); err != nil {
log.Fatal("Schema validation failed:", err)
}
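The check itself amounts to a set difference between required fields and the fields a schema declares. A minimal sketch, assuming the schema can be reduced to a list of field names; `missingFields` and `validate` are hypothetical helpers, not part of qlib:

```go
package main

import (
	"fmt"
	"strings"
)

// missingFields returns the required names that the schema does not declare,
// in the order they appear in required.
func missingFields(schemaFields, required []string) []string {
	present := make(map[string]bool, len(schemaFields))
	for _, f := range schemaFields {
		present[f] = true
	}
	var missing []string
	for _, r := range required {
		if !present[r] {
			missing = append(missing, r)
		}
	}
	return missing
}

// validate reports an error naming every missing field, or nil if none are missing.
func validate(entityType string, schemaFields, required []string) error {
	if m := missingFields(schemaFields, required); len(m) > 0 {
		return fmt.Errorf("entity type %s is missing required fields: %s",
			entityType, strings.Join(m, ", "))
	}
	return nil
}

func main() {
	schema := []string{"Leader", "Candidates", "ApplicationName"}
	required := []string{"Leader", "Candidates", "HeartbeatTrigger", "ApplicationName", "LogLevel"}
	fmt.Println(validate("Service", schema, required))
}
```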
Best Practices for Store Usage
- Entity Organization
  - Design your entity hierarchy to reflect your domain model
  - Use meaningful entity types and field names
  - Keep entity relationships logical and maintainable
- Performance Optimization
  - Use batch operations with MultiBinding when possible
  - Subscribe to specific fields rather than entire entities
  - Include relevant context fields in notifications to avoid additional queries
- Error Handling
  - Always check connection status before operations
  - Implement retry logic for transient failures
  - Use proper error handling in notification callbacks
- Schema Management
  - Define schemas early in application initialization
  - Validate required fields during startup
  - Consider schema versioning for application updates
- Memory Management
  - Unbind notification tokens when no longer needed
  - Clean up entity references in MultiBinding sessions
  - Use snapshots judiciously as they hold entire database state
Naming Conventions
Entity and field names in qlib follow strict naming conventions:
- Pascal Case
  - Entity types must use PascalCase (e.g., DeviceController, LightSwitch)
  - Field names must use PascalCase (e.g., CurrentState, LastUpdateTime)
  - This applies to both schema definitions and runtime operations
// Correct naming
schema := schema.New("LightController").
AddField("PowerState", "string").
AddField("BrightnessLevel", "int").
AddField("LastStateChange", "timestamp")
// Incorrect naming - will cause validation errors
schema := schema.New("light_controller").
AddField("power_state", "string").
AddField("brightnessLevel", "int")
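A name check along these lines could be used to catch violations early. This is a sketch under an assumed rule (an uppercase ASCII letter followed by ASCII letters and digits); the exact rule qlib enforces is not stated here, and `isPascalCase` is a hypothetical helper. A regular expression cannot verify word boundaries, so a name like "Lightcontroller" still passes.

```go
package main

import (
	"fmt"
	"regexp"
)

// pascalCase encodes the assumed rule: one uppercase ASCII letter followed
// by any number of ASCII letters or digits, with no underscores or dashes.
var pascalCase = regexp.MustCompile(`^[A-Z][A-Za-z0-9]*$`)

// isPascalCase reports whether name satisfies the assumed rule.
func isPascalCase(name string) bool {
	return pascalCase.MatchString(name)
}

func main() {
	for _, name := range []string{"LightController", "light_controller", "brightnessLevel"} {
		fmt.Printf("%-18s %v\n", name, isPascalCase(name))
	}
}
```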
Field Indirection
qlib supports referencing fields relative to an entity using the -> delimiter. This powerful feature allows you to:
- Reference fields in parent or child entities
- Navigate the entity tree
- Create dynamic field references
// Basic field indirection syntax:
// EntityReference->FieldName
// Examples:
store.Write(ctx,
request.New().
SetEntityId(deviceId).
SetFieldName("Controller->Status"). // References Status field in Controller entity
SetValue(value.NewString("ON")),
)
// Multiple levels of indirection
store.Notify(
ctx,
notification.NewConfig().
SetEntityType("Device").
SetFieldName("Zone->Building->Status"). // References Building's Status via Zone
SetContextFields(
"Name",
"Parent->Name", // Get parent entity's Name
"Zone->BuildingName", // Get BuildingName from Zone entity
),
notification.NewCallback(func(ctx context.Context, n data.Notification) {
deviceName := n.GetContext(0).GetValue().GetString()
parentName := n.GetContext(1).GetValue().GetString()
buildingName := n.GetContext(2).GetValue().GetString()
}),
)
// Using EntityReference fields
schema := schema.New("Device").
AddField("Controller", "entityref"). // References another entity
AddField("Zone", "entityref")
// Query using indirection
devices := query.New(store).
Select(
"Name",
"Controller->Status", // Status from referenced Controller
"Zone->Building->Address", // Address from Building referenced by Zone
).
From("Device").
Where("Controller->Type").Equals("MainController").
Execute(ctx)
Common Use Cases for Indirection:
- Hierarchical Data Access
  // Access parent data
  "Parent->Name"
  "Parent->Parent->Type"
  // Access child data
  "MainController->Status"
  "SubDevices->Count"
- Cross-Entity Relationships
  // Reference through entity reference fields
  "Zone->Controller->Status"
  "Building->MainPower->State"
- Dynamic Configuration
  // Use indirection in configuration entities
  "ConfigSource->Settings->UpdateInterval"
  "Template->DefaultValues->State"
- Notification Context
  notification.NewConfig().
      SetEntityType("Device").
      SetFieldName("Status").
      SetContextFields(
          "Name",
          "Parent->Location",
          "Zone->Building->Name",
          "Controller->Type",
      )
Best Practices for Indirection:
- Depth Management
  - Keep indirection chains reasonably short
  - Consider creating direct references for frequently used deep paths
  - Document complex indirection paths
- Performance
  - Cache frequently accessed indirect values when possible
  - Use MultiBinding for batch operations with indirection
  - Include necessary context fields in notifications to avoid additional lookups
- Error Handling
  - Validate entity references exist before using indirection
  - Handle missing intermediate entities gracefully
  - Log invalid indirection paths for debugging
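How a path such as "Zone->Building->Status" might be resolved, including the graceful handling of missing intermediate entities, can be sketched as follows. This is an illustrative resolver over an in-memory map, not qlib's resolution logic: the `entity` type, the `resolve` function, and the sample IDs are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// entity is a hypothetical node: fields hold plain values, refs hold the IDs
// of referenced entities (the role EntityReference fields play).
type entity struct {
	fields map[string]string
	refs   map[string]string
}

// resolve follows path from startID. Every segment except the last must be a
// reference on the current entity; the last segment names a plain field.
func resolve(entities map[string]*entity, startID, path string) (string, error) {
	segments := strings.Split(path, "->")
	last := segments[len(segments)-1]
	currentID := startID
	for _, seg := range segments[:len(segments)-1] {
		e, ok := entities[currentID]
		if !ok {
			return "", fmt.Errorf("entity %q not found", currentID)
		}
		next, ok := e.refs[seg]
		if !ok {
			return "", fmt.Errorf("entity %q has no reference %q", currentID, seg)
		}
		currentID = next
	}
	e, ok := entities[currentID]
	if !ok {
		return "", fmt.Errorf("entity %q not found", currentID)
	}
	value, ok := e.fields[last]
	if !ok {
		return "", fmt.Errorf("entity %q has no field %q", currentID, last)
	}
	return value, nil
}

// sampleEntities links a device to a zone and the zone to a building.
func sampleEntities() map[string]*entity {
	return map[string]*entity{
		"device-1":   {fields: map[string]string{"Name": "Lamp"}, refs: map[string]string{"Zone": "zone-1"}},
		"zone-1":     {fields: map[string]string{}, refs: map[string]string{"Building": "building-1"}},
		"building-1": {fields: map[string]string{"Status": "Open"}, refs: map[string]string{}},
	}
}

func main() {
	fmt.Println(resolve(sampleEntities(), "device-1", "Zone->Building->Status"))
	fmt.Println(resolve(sampleEntities(), "device-1", "Zone->Garage->Status"))
}
```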
Field Value Validation
qlib enforces strict type validation for field values. Each field type has specific validation rules:
- Int
  // Valid int assignments
  field.WriteInt(ctx, 42)
  field.WriteInt(ctx, int64(1000))
  // Invalid - will cause validation error
  field.WriteInt(ctx, "42") // string not allowed
  field.WriteInt(ctx, 3.14) // float not allowed
- Float
  // Valid float assignments
  field.WriteFloat(ctx, 3.14)
  field.WriteFloat(ctx, float64(1.0))
  // Integer values are automatically converted
  field.WriteFloat(ctx, 42) // becomes 42.0
- String
  // Valid string assignments
  field.WriteString(ctx, "hello")
  // Non-string values must be explicitly converted
  field.WriteString(ctx, fmt.Sprintf("%d", 42))
- Bool
  // Valid boolean assignments
  field.WriteBool(ctx, true)
  field.WriteBool(ctx, false)
  // Invalid - will cause validation error
  field.WriteBool(ctx, "true") // string not allowed
  field.WriteBool(ctx, 1) // int not allowed
- Timestamp
  // Valid timestamp assignments
  field.WriteTimestamp(ctx, time.Now())
  field.WriteTimestamp(ctx, time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
- EntityReference
  // Valid entity reference assignments
  field.WriteEntityReference(ctx, "DeviceId123")
  // Best practice: validate entity exists
  if store.EntityExists(ctx, "DeviceId123") {
      field.WriteEntityReference(ctx, "DeviceId123")
  }
- BinaryFile
  // Valid binary file assignments
  content := []byte{...}
  encoded := data.FileEncode(content)
  field.WriteBinaryFile(ctx, encoded)
  // Reading binary files
  encoded := field.ReadBinaryFile(ctx)
  content := data.FileDecode(encoded)
- Transformation
  // Transformation fields contain scripts/expressions
  field.WriteTransformation(ctx, "value * 2")
  field.WriteTransformation(ctx, "Parent->Value + 10")
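The rules listed above can be expressed as a dynamic check, for example for values that arrive untyped over the wire. The sketch below is an assumption about how such a check could look, not qlib's validator: `validateValue` is a hypothetical function, the type names reuse the schema strings from this README, and only five of the field types are covered.

```go
package main

import (
	"fmt"
	"time"
)

// validateValue reports whether v is acceptable for the named field type.
// Following the rules above, a float field also accepts integer values.
func validateValue(fieldType string, v interface{}) error {
	ok := false
	switch fieldType {
	case "int":
		switch v.(type) {
		case int, int64:
			ok = true
		}
	case "float":
		switch v.(type) {
		case float64, int, int64:
			ok = true
		}
	case "string":
		_, ok = v.(string)
	case "bool":
		_, ok = v.(bool)
	case "timestamp":
		_, ok = v.(time.Time)
	default:
		return fmt.Errorf("unknown field type %q", fieldType)
	}
	if !ok {
		return fmt.Errorf("value %v (%T) is not valid for field type %q", v, v, fieldType)
	}
	return nil
}

func main() {
	fmt.Println(validateValue("int", 42))
	fmt.Println(validateValue("int", "42"))
	fmt.Println(validateValue("float", 42))
	fmt.Println(validateValue("bool", 1))
}
```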
Working with Choice Fields
Choice fields represent a selected option from a predefined list of choices:
// Define schema with choice field
schema := schema.New("Device").
AddField("Status", "string").
AddField("OperatingMode", "choice")
// Set the available options in the schema
schema.SetChoiceOptions(ctx, store, "Device", "OperatingMode",
[]string{"Normal", "Eco", "Boost", "Away"})
// Write a choice value - just the selected index
device.GetField("OperatingMode").WriteChoice(ctx, 0) // Select "Normal" option
// Read choice value with rich interface
choice := device.GetField("OperatingMode").ReadChoice(ctx)
selectedIndex := choice.GetSelectedIndex()
selectedMode := choice.GetSelectedValue()
// Get options from schema through the binding
allModes := device.GetField("OperatingMode").GetChoiceOptions(ctx)
// Update selection
device.GetField("OperatingMode").SelectChoiceByValue(ctx, "Eco")
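The relationship between the stored index and the option list can be sketched like this. The `choice` type and its methods are hypothetical and only mirror the behavior described above (an index into a predefined list, with selection by value as a convenience); they are not qlib's Choice implementation.

```go
package main

import "fmt"

// choice holds the predefined options and the index of the selected one.
type choice struct {
	options  []string
	selected int
}

// selectByIndex selects an option by position, rejecting out-of-range indexes.
func (c *choice) selectByIndex(i int) error {
	if i < 0 || i >= len(c.options) {
		return fmt.Errorf("index %d out of range for %d options", i, len(c.options))
	}
	c.selected = i
	return nil
}

// selectByValue selects the option equal to v and leaves the current
// selection untouched if v is not one of the options.
func (c *choice) selectByValue(v string) error {
	for i, o := range c.options {
		if o == v {
			c.selected = i
			return nil
		}
	}
	return fmt.Errorf("%q is not one of %v", v, c.options)
}

// value returns the selected option, or "" if the index is out of range.
func (c *choice) value() string {
	if c.selected < 0 || c.selected >= len(c.options) {
		return ""
	}
	return c.options[c.selected]
}

func main() {
	mode := &choice{options: []string{"Normal", "Eco", "Boost", "Away"}}
	fmt.Println(mode.value())
	fmt.Println(mode.selectByValue("Eco"), mode.value())
	fmt.Println(mode.selectByValue("Turbo"), mode.value())
}
```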
Working with EntityList Fields
EntityList fields store collections of entity references:
// Define schema with entitylist field
schema := schema.New("User").
AddField("Name", "string").
AddField("Devices", "entitylist")
// Write an entity list
user.GetField("Devices").WriteEntityList(ctx,
[]string{"device-1", "device-2", "device-3"},
)
// Read entity list with rich interface
deviceList := user.GetField("Devices").ReadEntityList(ctx)
allDevices := deviceList.GetEntities()
deviceCount := deviceList.Count()
// Use convenience methods to modify list
user.GetField("Devices").AddEntityToList(ctx, "device-4")
user.GetField("Devices").RemoveEntityFromList(ctx, "device-2")
hasDevice := user.GetField("Devices").EntityListContains(ctx, "device-1")
Best Practices for Field Values:
- Type Safety
  // Use type-specific methods
  if field.IsInt() {
      value := field.ReadInt(ctx)
  } else if field.IsString() {
      value := field.ReadString(ctx)
  }
- Null Values
  // Check for nil values
  if !field.GetValue().IsNil() {
      value := field.ReadString(ctx)
  }
- Value Conversion
  // Convert values explicitly when needed
  intValue := int64(field.ReadFloat(ctx))
  strValue := fmt.Sprintf("%d", field.ReadInt(ctx))
- Batch Updates
  // Use MultiBinding for multiple value updates
  multi := binding.NewMulti(store)
  entity := multi.GetEntityById(ctx, entityId)
  entity.GetField("IntValue").WriteInt(ctx, 42)
  entity.GetField("FloatValue").WriteFloat(ctx, 3.14)
  entity.GetField("StringValue").WriteString(ctx, "hello")
  multi.Commit(ctx)
- Error Handling
  // Handle type conversion errors
  field := entity.GetField("Value")
  if !field.IsFloat() {
      log.Error("Expected float field")
      return
  }
  value := field.ReadFloat(ctx)
- Value Validation in Schema
  // Define field types in schema
  schema := schema.New("Device").
      AddField("Status", "string").
      AddField("Temperature", "float").
      AddField("IsActive", "bool").
      AddField("LastUpdate", "timestamp").
      AddField("Controller", "entityref")
Application Readiness
qlib provides built-in support for application readiness checks through the Readiness worker. This ensures your application only becomes active when all required dependencies and conditions are met.
Readiness Worker
The Readiness worker manages the application's ready state:
// Create readiness worker
readinessWorker := workers.NewReadiness()
// Add readiness criteria
readinessWorker.AddCriteria(workers.NewStoreConnectedCriteria(store))
readinessWorker.AddCriteria(workers.NewSchemaValidityCriteria(store))
// Add custom criteria
readinessWorker.AddCriteria(workers.ReadinessCriteriaFunc(func() bool {
return myCondition // e.g., database is connected
}))
// Handle readiness changes
readinessWorker.BecameReady.Connect(func() {
// Application is now ready
})
readinessWorker.BecameUnready.Connect(func() {
// Application is no longer ready
})
// Add to application
app.AddWorker(readinessWorker)
Best Practices for Readiness
- Critical Dependencies
  - Add criteria for all critical services
  - Monitor connection states
  - Validate required configurations
- Schema Validation
  - Use SchemaValidityCriteria to ensure schema integrity
  - Validate required fields exist
  - Check field types match expectations
- Custom Conditions
  - Implement custom ReadinessCriteria for specific needs
  - Keep criteria checks lightweight
  - Avoid blocking operations
- Monitoring
  - Track readiness state changes
  - Log criteria failures
  - Implement health checks based on readiness
Example readiness-aware worker:
type MyWorker struct {
isReady bool
}
func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
readiness.BecameReady.Connect(func() {
w.isReady = true
// Start operations
})
readiness.BecameUnready.Connect(func() {
w.isReady = false
// Stop operations
})
}
func (w *MyWorker) DoWork(ctx context.Context) {
if !w.isReady {
return // Only work when ready
}
// Perform operations
}
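The aggregation behind a readiness worker can be sketched as "all criteria must pass, and callbacks fire only on transitions". The code below is an assumption about that behavior, not the Readiness worker's source: the `readiness` struct and its methods are hypothetical, and plain function slices stand in for the BecameReady and BecameUnready signals.

```go
package main

import "fmt"

// readiness evaluates a list of criteria and tracks the last known state.
type readiness struct {
	criteria  []func() bool
	ready     bool
	onReady   []func()
	onUnready []func()
}

// addCriteria registers one lightweight, non-blocking check.
func (r *readiness) addCriteria(c func() bool) {
	r.criteria = append(r.criteria, c)
}

// evaluate returns true only if every criterion currently passes.
func (r *readiness) evaluate() bool {
	for _, c := range r.criteria {
		if !c() {
			return false
		}
	}
	return true
}

// doWork re-evaluates the criteria and fires callbacks only when the state
// changes, so listeners are not called on every loop iteration.
func (r *readiness) doWork() {
	now := r.evaluate()
	if now == r.ready {
		return
	}
	r.ready = now
	callbacks := r.onUnready
	if now {
		callbacks = r.onReady
	}
	for _, cb := range callbacks {
		cb()
	}
}

// demoReadiness toggles a fake dependency and records the emitted events.
func demoReadiness() []string {
	storeConnected := false
	var events []string
	r := &readiness{}
	r.addCriteria(func() bool { return storeConnected })
	r.onReady = append(r.onReady, func() { events = append(events, "ready") })
	r.onUnready = append(r.onUnready, func() { events = append(events, "unready") })

	r.doWork() // still not ready: no event
	storeConnected = true
	r.doWork() // transition to ready
	r.doWork() // no change: no event
	storeConnected = false
	r.doWork() // transition to unready
	return events
}

func main() {
	fmt.Println(demoReadiness())
}
```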
Web Communication
The library uses Protocol Buffers for structured web communication over WebSocket connections. Messages follow a standard format:
message ApiMessage {
ApiHeader header = 1;
google.protobuf.Any payload = 2;
}
message ApiHeader {
string id = 1;
google.protobuf.Timestamp timestamp = 2;
AuthenticationStatusEnum authenticationStatus = 3;
}
Working with WebMessages
- Sending Messages
// Create and send a message to a specific client
client.Write(&protobufs.ApiMessage{
Header: &protobufs.ApiHeader{
Id: uuid.New().String(),
Timestamp: timestamppb.Now(),
},
Payload: myPayload, // *anypb.Any wrapping a proto message (e.g. built with anypb.New)
})
- Handling Messages
// Set up a message handler
client.SetMessageHandler(func(c web.Client, m web.Message) {
switch payload := m.GetPayload().(type) {
case *protobufs.ApiConfigCreateEntityRequest:
// Handle entity creation request
case *protobufs.ApiRuntimeDatabaseRequest:
// Handle database operation request
}
})
Common Message Types
- Database Operations:
  - ApiRuntimeDatabaseRequest: Read/write field values
  - ApiConfigCreateEntityRequest: Create new entities
  - ApiConfigDeleteEntityRequest: Delete entities
  - ApiConfigGetEntityRequest: Retrieve entity details
- Schema Operations:
  - ApiConfigGetEntitySchemaRequest: Get entity type schema
  - ApiConfigSetEntitySchemaRequest: Update entity schema
  - ApiConfigGetEntityTypesRequest: List available entity types
- Notifications:
  - ApiRuntimeRegisterNotificationRequest: Subscribe to changes
  - ApiRuntimeGetNotificationsRequest: Get pending notifications
  - ApiRuntimeUnregisterNotificationRequest: Remove subscriptions
Example database operation:
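// Pack the request into an Any payload. anypb.New serializes the message
// and sets the type URL; Any.Value is a []byte, so it cannot hold the
// message struct directly.
payload, err := anypb.New(&protobufs.ApiRuntimeDatabaseRequest{
    RequestType: protobufs.ApiRuntimeDatabaseRequest_READ,
    Requests: []*protobufs.DatabaseRequest{
        {
            Id:    entityId,
            Field: "Status",
        },
    },
})
if err != nil {
    log.Error("Failed to pack request: %v", err)
    return
}
// Send a database read request
client.Write(&protobufs.ApiMessage{
    Header: &protobufs.ApiHeader{
        Id:        uuid.New().String(),
        Timestamp: timestamppb.Now(),
    },
    Payload: payload,
})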
Worker Communication Patterns
Workers in qlib can communicate with each other through the Store using several patterns:
Direct Field Communication
Workers can communicate by writing to and monitoring specific fields:
// Worker A: Sender
type SenderWorker struct {
store data.Store
}
func (w *SenderWorker) DoWork(ctx context.Context) {
w.store.Write(ctx,
request.New().
SetEntityId("command-entity").
SetFieldName("Command").
SetValue(value.NewString("start_operation")),
)
}
// Worker B: Receiver
type ReceiverWorker struct {
store data.Store
}
func (w *ReceiverWorker) Init(ctx context.Context, h app.Handle) {
w.store.Notify(
ctx,
notification.NewConfig().
SetEntityId("command-entity").
SetFieldName("Command"),
notification.NewCallback(w.onCommand),
)
}
func (w *ReceiverWorker) onCommand(ctx context.Context, n data.Notification) {
command := n.GetCurrent().GetValue().GetString()
if command == "start_operation" {
// Handle command
}
}
Request-Response Pattern
For request-response style communication:
// Request entity schema
schema := schema.New("Request").
AddField("Status", "string"). // "pending", "processing", "completed"
AddField("Data", "string"). // Request data
AddField("Response", "string"). // Response data
AddField("Error", "string") // Error message if any
// Worker A: Client
func (w *ClientWorker) makeRequest(ctx context.Context, data string) {
// Create request entity
w.store.CreateEntity(ctx, "Request", parentId, "request-1")
// Write request data
w.store.Write(ctx,
request.New().
SetEntityId("request-1").
SetFieldName("Data").
SetValue(value.NewString(data)),
request.New().
SetEntityId("request-1").
SetFieldName("Status").
SetValue(value.NewString("pending")),
)
// Monitor for response
w.store.Notify(
ctx,
notification.NewConfig().
SetEntityId("request-1").
SetFieldName("Status").
SetContextFields("Response", "Error"),
notification.NewCallback(w.onResponse),
)
}
// Worker B: Server
func (w *ServerWorker) Init(ctx context.Context, h app.Handle) {
w.store.Notify(
ctx,
notification.NewConfig().
SetEntityType("Request").
SetFieldName("Status"),
notification.NewCallback(w.onRequest),
)
}
func (w *ServerWorker) onRequest(ctx context.Context, n data.Notification) {
status := n.GetCurrent().GetValue().GetString()
if status != "pending" {
return
}
requestId := n.GetCurrent().GetEntityId()
_ = w.store.GetEntity(ctx, requestId) // Load the request entity if its data is needed
// Process request and write response
w.store.Write(ctx,
request.New().
SetEntityId(requestId).
SetFieldName("Response").
SetValue(value.NewString("result")),
request.New().
SetEntityId(requestId).
SetFieldName("Status").
SetValue(value.NewString("completed")),
)
}
Pub/Sub Pattern
For broadcast-style communication:
// Publisher worker publishes events
func (w *PublisherWorker) publishEvent(ctx context.Context, eventType, data string) {
w.store.Write(ctx,
request.New().
SetEntityId("events").
SetFieldName(eventType).
SetValue(value.NewString(data)),
)
}
// Subscriber worker listens for events
func (w *SubscriberWorker) Init(ctx context.Context, h app.Handle) {
w.store.Notify(
ctx,
notification.NewConfig().
SetEntityId("events").
SetFieldName("UserLogin"), // Subscribe to a specific event type (field names must be PascalCase)
notification.NewCallback(w.onUserLogin),
)
}
func (w *SubscriberWorker) onUserLogin(ctx context.Context, n data.Notification) {
userData := n.GetCurrent().GetValue().GetString()
// Handle user login event
}
Best Practices for Worker Communication
- Entity Design
  - Create dedicated entities for inter-worker communication
  - Use clear field names for commands and events
  - Include timestamps for tracking message age
- Error Handling
  - Include error fields in request-response patterns
  - Implement timeouts for pending requests
  - Handle missing or invalid data gracefully
- Performance
  - Use appropriate notification patterns (change-only vs. all updates)
  - Clean up completed request entities
  - Batch related field updates using MultiBinding
- Testing
  - Test communication patterns in isolation
  - Verify proper cleanup of temporary entities
  - Simulate network and timing issues
Getting Started
- Create a new application
- Implement workers for your business logic
- Set up database schema and entity structure
- Add notification handlers
- Run the application
Example:
func main() {
app := app.NewApplication("myapp")
// Add workers
app.AddWorker(NewMyWorker())
// Run the application
app.Execute()
}
Best Practices
- Keep workers focused on single responsibilities
- Use the main thread for coordination
- Leverage notifications for event-driven architecture
- Structure your database hierarchy logically
- Use leadership election for high availability
Environment Variables
- Q_IN_DOCKER: Set when running in Docker container
- Q_LOG_LEVEL: Application log level
- Q_LIB_LOG_LEVEL: Library log level
Application Lifecycle Management
Applications in qlib follow a strict initialization and shutdown sequence to ensure proper resource management and graceful termination.
Initialization Sequence
func main() {
// Create application with unique name
app := app.NewApplication("myapp")
// Configure workers before Execute()
storeWorker := workers.NewStore(myStore)
webWorker := workers.NewWeb(":8080")
// Order matters - add core workers first
app.AddWorker(storeWorker)
app.AddWorker(leadershipWorker)
app.AddWorker(webWorker)
app.AddWorker(myBusinessWorker)
// Execute handles init/deinit
app.Execute()
}
Worker Initialization
Workers should properly initialize their resources:
type MyWorker struct {
store data.Store
subscriptions []data.NotificationToken
}
func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
// Set up notification subscriptions
w.subscriptions = append(w.subscriptions,
w.store.Notify(ctx,
notification.NewConfig().
SetEntityType("Device").
SetFieldName("Status"),
notification.NewCallback(w.onStatusChange),
),
)
// Initialize resources with timeout
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := w.initializeResource(timeoutCtx); err != nil {
log.Panic("Failed to initialize: %v", err)
}
}
Graceful Shutdown
Workers must clean up resources on shutdown:
func (w *MyWorker) Deinit(ctx context.Context) {
// Unsubscribe from notifications
for _, sub := range w.subscriptions {
sub.Unbind(ctx)
}
// Close connections
if w.conn != nil {
w.conn.Close()
}
// Wait for pending operations
w.wg.Wait()
}
Error Handling
Best practices for handling errors in workers:
func (w *MyWorker) DoWork(ctx context.Context) {
// Check context cancellation
if ctx.Err() != nil {
return
}
// Handle recoverable errors
if err := w.doSomething(ctx); err != nil {
log.Error("Operation failed: %v", err)
// Update status to reflect error
w.store.Write(ctx,
request.New().
SetEntityId(w.entityId).
SetFieldName("Error").
SetValue(value.NewString(err.Error())),
)
return
}
// Handle fatal errors
if err := w.criticalOperation(ctx); err != nil {
log.Panic("Critical error: %v", err)
// Application will shut down
}
}
Startup Dependencies
Managing worker dependencies:
type MyWorker struct {
store data.Store
isStoreConnected bool
}
func (w *MyWorker) Init(ctx context.Context, h app.Handle) {
// Subscribe to store connection status
w.store.Connected().Connect(func(ctx context.Context) {
w.isStoreConnected = true
})
w.store.Disconnected().Connect(func(ctx context.Context) {
w.isStoreConnected = false
})
}
func (w *MyWorker) DoWork(ctx context.Context) {
// Wait for dependencies
if !w.isStoreConnected {
return
}
// Proceed with work
}
Signal Handling
The application handles system signals:
func main() {
app := app.NewApplication("myapp")
// App.Execute() handles:
// - SIGINT (Ctrl+C)
// - SIGTERM
//
// Workers get ctx.Done() on signal
app.Execute()
}
// In your worker
func (w *MyWorker) DoWork(ctx context.Context) {
select {
case <-ctx.Done():
// Clean up and return
return
default:
// Do work
}
}
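For readers who want to see the underlying mechanism, the standard library can express the same "cancel the context on SIGINT or SIGTERM" behavior. This is a standalone sketch using only `os/signal` and `context`; how qlib wires signals internally is not shown in this README, and `runLoop` and `demoCancelAfter` are hypothetical helpers. The 50 ms timeout in `main` exists only so the demo terminates without a signal.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runLoop calls doWork once per tick until ctx is cancelled and returns the
// number of calls made. Cancellation is checked before each wait so that a
// pending tick cannot trigger extra work after shutdown was requested.
func runLoop(ctx context.Context, tick time.Duration, doWork func()) int {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	calls := 0
	for {
		if ctx.Err() != nil {
			return calls
		}
		select {
		case <-ctx.Done():
			return calls
		case <-ticker.C:
			doWork()
			calls++
		}
	}
}

// demoCancelAfter cancels the context from inside the n-th doWork call
// (n must be at least 1), standing in for a signal arriving mid-run.
func demoCancelAfter(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	seen := 0
	return runLoop(ctx, time.Millisecond, func() {
		seen++
		if seen == n {
			cancel()
		}
	})
}

func main() {
	// NotifyContext cancels ctx when SIGINT (Ctrl+C) or SIGTERM arrives.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo-only bound so the program exits even if no signal is sent.
	ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
	defer cancel()

	n := runLoop(ctx, 10*time.Millisecond, func() {})
	fmt.Println("DoWork calls before shutdown:", n)
}
```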
Environment Configuration
Environment variables that affect application behavior:
# Application identification
APP_NAME=myapp # Override application name
Q_IN_DOCKER=true # Running in container
# Store configuration
Q_LEADER_STORE_ADDR=redis:6379 # Leadership store address
Q_LEADER_STORE_PASSWORD=secret # Leadership store password
# Runtime behavior
Q_LOG_LEVEL=INFO # Application log level
Q_LIB_LOG_LEVEL=WARN # Library log level
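If your own code needs to read these variables, a small helper with fallbacks keeps the lookups in one place. This is a sketch, not how qlib itself parses its environment: `envOrDefault`, the `settings` struct, and the default values are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// envOrDefault returns the value of key, or fallback when the variable is
// unset or empty.
func envOrDefault(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return fallback
}

// settings groups some of the variables listed above. The struct and its
// defaults are hypothetical.
type settings struct {
	AppName     string
	LogLevel    string
	LibLogLevel string
	InDocker    bool
}

// loadSettings reads the environment once and applies the assumed defaults.
func loadSettings() settings {
	return settings{
		AppName:     envOrDefault("APP_NAME", "myapp"),
		LogLevel:    envOrDefault("Q_LOG_LEVEL", "INFO"),
		LibLogLevel: envOrDefault("Q_LIB_LOG_LEVEL", "WARN"),
		InDocker:    envOrDefault("Q_IN_DOCKER", "") != "",
	}
}

func main() {
	fmt.Printf("%+v\n", loadSettings())
}
```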
Best Practices:
- Worker State
  - Initialize all state in Init()
  - Clean up all resources in Deinit()
  - Use context cancellation for termination
- Resource Management
  - Track and clean up subscriptions
  - Close connections and channels
  - Wait for goroutines to finish
- Error Handling
  - Use log.Error for recoverable errors
  - Use log.Panic for fatal errors
  - Update entity status on errors
- Dependencies
  - Check dependency status before operations
  - Handle dependency failures gracefully
  - Implement proper retry logic
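Retry logic for a flaky dependency can be as small as the sketch below, which retries with a doubling delay. The `retry` and `demoRetry` functions are hypothetical and not part of qlib. Because `retry` sleeps, calling it from DoWork() would stall the main thread; in a worker you would more likely record the next attempt time and return.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry runs op up to attempts times, doubling the delay after each failure.
// It returns nil on the first success, or the last error wrapped with context.
func retry(attempts int, delay time.Duration, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// demoRetry simulates an operation that fails a fixed number of times before
// succeeding, and reports how many calls were made.
func demoRetry(failures int) (calls int, err error) {
	err = retry(5, time.Millisecond, func() error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := demoRetry(2)
	fmt.Println("calls:", calls, "error:", err)
	calls, err = demoRetry(10)
	fmt.Println("calls:", calls, "error:", err)
}
```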