durex

module
v0.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2024 License: MIT

README ¶

go-temporal-tools: utilities for temporal.io go sdk

GitHub release (with filter) GitHub go.mod Go version (subdirectory of monorepo) Tests License GitHub contributors

[!WARNING]
This is a work in progress. The API surface can change without warning.

🚀 Install

go get go.breu.io/durex

Compatibility: go >= 1.21

Why?

After working with temporal.io across multiple projects, we have standardized a set of best practices. For example, to create a unique and identifiable workflow id for debugging, we follow the block, element, modifier (BEM) method, a technique originally used for writing maintainable CSS. We have found that it makes workflow ids very readable and maintainable.

We also found that tying each workflow to a queue keeps things simple. For us, the Queue is where it all begins.

Getting Started

We start by creating a Queue and then call the ExecuteWorkflow or ExecuteChildWorkflow methods on the Queue interface.

temporal.io setup

Given the temporal.go file below, we are going to set up our queues as global variables, like so:

package temporal

import (
  "fmt"
  "log/slog"
  "sync"
  "time"

  "github.com/avast/retry-go/v4"
  "github.com/ilyakaznacheev/cleanenv"
  "go.breu.io/slog-utils/calldepth"
  "go.temporal.io/sdk/client"
)

type (
  // Temporal exposes a Temporal SDK client together with the address it connects to.
  Temporal interface {
    // Client returns the Temporal SDK client.
    Client() client.Client
    // ConnectionString returns the server address formatted as "host:port".
    ConnectionString() string
  }

  // config holds the connection settings. It is the concrete value returned by New
  // and the target that every Option mutates.
  config struct {
    // Host is the server host; FromEnvironment reads it from TEMPORAL_HOST (default "localhost").
    Host   string `env:"TEMPORAL_HOST" env-default:"localhost"`
    // Port is the server port; FromEnvironment reads it from TEMPORAL_PORT (default 7233).
    Port   int    `env:"TEMPORAL_PORT" env-default:"7233"`
    // client is the dialed SDK client; it stays nil until Client has connected.
    client client.Client
    // logger is handed to the SDK client options; it is set through WithLogger.
    logger *slog.Logger
  }

  // Option mutates a config while New is building it.
  Option func(*config)
)

var (
  // once guards the one-time dial performed by (*config).Client.
  // NOTE(review): it is package-level, so it is shared by every config created with New.
  once sync.Once
)

const (
  // MaxAttempts is the number of connection attempts passed to retry.Attempts by Client.
  MaxAttempts = 10
)

// ConnectionString returns the Temporal server address formatted as "host:port".
func (c *config) ConnectionString() string {
  host, port := c.Host, c.Port

  return fmt.Sprintf("%s:%d", host, port)
}

// Client returns the Temporal SDK client, dialing the server on first use.
//
// The dial is attempted up to MaxAttempts times, with a base delay of one second
// between attempts. Client panics if every attempt fails.
//
// NOTE(review): once is a package-level sync.Once, so only the first config that
// reaches this method ever dials; any other config created with New gets a nil
// client back. Move the sync.Once into config if more than one instance is needed.
func (c *config) Client() client.Client {
  // sync.Once serialises the initialisation and makes the write to c.client
  // visible to every caller, so no unsynchronised nil check is placed in front of it.
  once.Do(func() {
    slog.Info(
      "initializing temporal connection",
      slog.String("connection_string", c.ConnectionString()),
    )

    options := client.Options{HostPort: c.ConnectionString()}

    // Only hand a logger to the SDK when one was configured. Assigning a nil
    // *slog.Logger to the interface-typed Logger field would produce a non-nil
    // interface wrapping a nil pointer, which panics on the first log call.
    if c.logger != nil {
      options.Logger = c.logger
    }

    dial := func() error {
      clt, err := client.Dial(options)
      if err != nil {
        return err
      }

      c.client = clt

      slog.Info("temporal: connected")

      return nil
    }

    err := retry.Do(
      dial,
      retry.Attempts(MaxAttempts),
      retry.Delay(1*time.Second),
      retry.OnRetry(func(n uint, err error) {
        slog.Info(
          "temporal: failed to connect. retrying connection ...",
          slog.Any("attempt", n+1),
          slog.Any("max attempts", MaxAttempts),
          slog.String("error", err.Error()),
        )
      }),
    )
    if err != nil {
      panic(fmt.Errorf("failed to connect to temporal: %w", err))
    }
  })

  return c.client
}

// WithHost returns an Option that sets the Temporal server host.
func WithHost(host string) Option {
  apply := func(cfg *config) {
    cfg.Host = host
  }

  return apply
}

// WithPort returns an Option that sets the Temporal server port.
func WithPort(port int) Option {
  apply := func(cfg *config) {
    cfg.Port = port
  }

  return apply
}

// WithLogger returns an Option that sets the logger stored on the config.
func WithLogger(logger *slog.Logger) Option {
  apply := func(cfg *config) {
    cfg.logger = logger
  }

  return apply
}

// FromEnvironment returns an Option that fills the config through cleanenv.ReadEnv,
// using the env struct tags declared on config.
//
// The returned Option panics if the environment cannot be read.
func FromEnvironment() Option {
  return func(cfg *config) {
    err := cleanenv.ReadEnv(cfg)
    if err == nil {
      return
    }

    panic(fmt.Errorf("failed to read environment: %w", err))
  }
}

// New builds a Temporal from a zero-valued config, applying the given options
// in the order they were passed.
func New(opts ...Option) Temporal {
  cfg := new(config)

  for i := range opts {
    opts[i](cfg)
  }

  return cfg
}
Temporal singleton
package shared

import (
  "log/slog"
  "os"
  "sync"

  "shared/temporal"

  "go.breu.io/slog-utils/calldepth"
)

var (
  tmprl     temporal.Temporal // Global temporal instance.
  tmprlOnce sync.Once         // Global temporal initialization state.
)

// Temporal returns the global temporal instance.
func Temporal() temporal.Temporal {
  tmprlOnce.Do(func() {
    tmprl = temporal.New(
      temporal.FromEnvironment(),
      temporal.WithLogger(
        calldepth.NewAdapter(
          calldepth.NewLogger(slog.NewJsonLogger()),
          calldepth.WithCallDepth(5), // 5 for activities, 6 for workflows.
         ).WithGroup("temporal")
      ),
    )
  })

  return tmprl
}
Queue setup
package shared

import (
  "sync"

  "go.breu.io/durex/queues"
)

var (
  coreq    queues.Queue // Global core queue instance.
  coreqone sync.Once    // Intended to guard the one-time creation of coreq.
)

// init replaces the default prefix, which right now is "io.breu".
// Pick a prefix that fits your project; a good practice is to use java's package notation.
func init() {
  const prefix = "com.company.pkg"

  queues.SetDefaultPrefix(prefix)
}

func CoreQueue(
  once.Do(func() {
    queues.New(
      WithName("core"),
      // We have already setup temporal as a signleton in the shared package. We leverage the instantiatilized client.
      WithClient(
        Temporal().Client(),
      ),
    )
  })
)

Usage for Workflows

We assume that a workflow is tied to a queue, so we start a workflow by calling Queue.ExecuteWorkflow().

package main

import (
  "context"
  "shared"

  "github.com/google/uuid"
  "go.temporal.io/sdk/workflow"

  "go.breu.io/durex/workflows"
)

// main starts a worker on the core queue, registers both workflows, and then
// starts Workflow through the queue.
func main() {
  worker := shared.CoreQueue().CreateWorker()

  worker.RegisterWorkflow(Workflow)
  worker.RegisterWorkflow(ChildWorkflow)

  if err := worker.Start(); err != nil {
    panic(err) // handle error
  }
  // Registered only after a successful Start, so Stop never runs on a worker
  // that failed to start.
  defer worker.Stop()

  opts, err := workflows.NewOptions(
    workflows.WithBlock("block"),
    workflows.WithBlockID(uuid.New()), // d5e012df-5b9e-41cf-9ed5-3439eeafd8e4
  )
  if err != nil {
    panic(err) // handle error
  }

  // Placeholder input; replace it with the value your workflow expects.
  var payload any

  exe, err := shared.CoreQueue().ExecuteWorkflow(
    context.Background(),
    // The workflow id created in this case would be
    //  "com.company.pkg.block.d5e012df-5b9e-41cf-9ed5-3439eeafd8e4"
    opts,
    Workflow, // or workflow function name
    payload,
  )
  if err != nil {
    panic(err) // handle error
  }

  // exe is not used further in this example; the blank assignment keeps the
  // snippet compiling.
  _ = exe
}

func Workflow(ctx context.Context, payload any) error {
  childopts, err := workflow.NewOptions(
    WithParent(ctx),
    WithBlock("child"),
    WithBlockID(uuid.New()), // bb42d90a-d3f3-4022-bc09-aeed6e9db659
  )
  if err != nil {
    return err
  }

  future := shared.CoreQueue().ExecuteChildWorkflow(
    ctx,
    // By passing WithParent(ctx), the helper method automagically picks up the right workflow to create the child id i.e.
    // "com.company.pkg.block.d5e012df-5b9e-41cf-9ed5-3439eeafd8e4.child.bb42d90a-d3f3-4022-bc09-aeed6e9db659"
    opts,
    ChildWorkflow,
    payload,
  )

  // Do Something

  return nil
}

// ChildWorkflow is the child workflow started by Workflow.
//
// It returns an error because Temporal requires a workflow function to return
// an error as its last result.
func ChildWorkflow(ctx workflow.Context, payload any) error {
  // Child workflow

  return nil
}

👀 Contributors

Contributors

πŸ“ License

Copyright © 2023 Breu Inc.

This project is MIT licensed.

Directories ¶

Path Synopsis
Package dispatch provides helper functions for configuring workflow.Context objects with default activity options.
Package dispatch provides helper functions for configuring workflow.Context objects with default activity options.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL