common

package
v0.0.0-...-b95a7b5
Published: Dec 25, 2024 License: BSD-3-Clause Imports: 8 Imported by: 0

Documentation

Index

Constants

const (
	// AcquireLockSignalName signal channel name for lock acquisition
	AcquireLockSignalName = "acquire-lock-event"
	// RequestLockSignalName channel name for request lock
	RequestLockSignalName = "request-lock-event"
)

Variables

This section is empty.

Functions

func NewFutureWithFutures

func NewFutureWithFutures(ctx workflow.Context, futures ...workflow.Future) workflow.Future

NewFutureWithFutures returns a future that waits for all the given futures to be fulfilled.

The list may contain nil futures, which are ignored. A new future is always returned, whether or not any of the given futures is still pending. The returned future carries the context error, if any, but none of the values returned by the given futures.

func RegisterTemporal

func RegisterTemporal(w worker.Worker) *worker.Worker

RegisterTemporal registers the Temporal workflows and activities that are needed to run Lock().

Types

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex is a struct used by Temporal to lock resources so that workflows use a resource sequentially. Mutex workflows hold onto Swarming bots to ensure that they execute pairwise tasks in immediate sequential order without being interrupted by Swarming tasks from other jobs. Mutex should only be used by run_benchmark or by workflows that trigger run_benchmark.

func NewMutex

func NewMutex(currentWorkflowID string, lockNamespace string) *Mutex

NewMutex initializes a Mutex. currentWorkflowID is the ID of the Temporal workflow that needs the resource. lockNamespace is the namespace for the mutex workflow.

func (*Mutex) Lock

func (s *Mutex) Lock(ctx workflow.Context, resourceID string, unlockTimeout time.Duration) (UnlockFunc, error)

Lock applies the lock on resourceID by triggering a Temporal workflow that creates a workflow with the resourceID as part of the workflowID. All external workflows outside of the mutex namespace that require resourceID have to wait for the resource until the lock is released via UnlockFunc. The resourceID can be any arbitrary string.

Example usage: func SampleWorkflowWithMutex(resourceID):

m := NewMutex("workflow", "namespace")
unlockFunc, err := m.Lock(ctx, resourceID, 10*time.Minute)
if err != nil {
	return err
}
// < perform workflow actions >
_ = unlockFunc() // this releases the lock on the resource
// < remaining workflow actions >

Run several workflows in parallel. Because each workflow requires the same resource, each one waits until the lock is released before it proceeds. func WorkflowThatNeedsSpecificResource:

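rc := workflow.NewBufferedChannel(ctx, numChildren)
ec := workflow.NewBufferedChannel(ctx, numChildren)
wg := workflow.NewWaitGroup(ctx)
wg.Add(numChildren)
for i := 0; i < numChildren; i++ {
	workflow.Go(ctx, func(gCtx workflow.Context) {
		defer wg.Done()
		// The result type of SampleWorkflowWithMutex is assumed to be a string here.
		var status string
		if err := workflow.ExecuteChildWorkflow(gCtx, SampleWorkflowWithMutex, "bot-123").Get(gCtx, &status); err != nil {
			ec.Send(gCtx, err)
			return
		}
		rc.Send(gCtx, status)
	})
}
// Wait for every child workflow to finish before reading rc and ec.
wg.Wait(ctx)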

Also, if another workflow starts and calls SampleWorkflowWithMutex("bot-123") while `WorkflowThatNeedsSpecificResource` is still running, it will also wait until the lock on "bot-123" is released via unlockFunc().
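The per-resource waiting behavior described above can be modelled in plain Go. This is a rough in-process sketch under stated assumptions, not how the mutex workflow is implemented: `keyedLock` and `newKeyedLock` are hypothetical names, the unlockTimeout parameter is not modelled, and returning an error on a second unlock is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// UnlockFunc mirrors the shape of the package's UnlockFunc type.
type UnlockFunc func() error

// keyedLock is a hypothetical in-process stand-in for the mutex workflow:
// one slot per resourceID, held by at most one caller at a time.
type keyedLock struct {
	mu    sync.Mutex
	slots map[string]chan struct{}
}

func newKeyedLock() *keyedLock {
	return &keyedLock{slots: map[string]chan struct{}{}}
}

// Lock blocks until resourceID is free, then returns a function that
// releases it. Callers using a different resourceID are not affected.
func (k *keyedLock) Lock(resourceID string) UnlockFunc {
	k.mu.Lock()
	slot, ok := k.slots[resourceID]
	if !ok {
		slot = make(chan struct{}, 1)
		k.slots[resourceID] = slot
	}
	k.mu.Unlock()

	slot <- struct{}{} // blocks while another caller holds the resource
	var once sync.Once
	return func() error {
		// Assumption for this sketch: releasing twice is reported as an error.
		err := errors.New("already unlocked")
		once.Do(func() {
			<-slot
			err = nil
		})
		return err
	}
}

func main() {
	k := newKeyedLock()
	var order []int
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			unlock := k.Lock("bot-123")
			order = append(order, id) // only the current lock holder runs this
			_ = unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println("holders served:", len(order))
}
```

The three goroutines play the role of the child workflows: they start in parallel, but each one touches the shared state only while it holds the lock on "bot-123".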

type UnlockFunc

type UnlockFunc func() error
