flows

v1.0.4
Published: Dec 4, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

Flows

This package makes handling flows super simple. 😎

Flow: The "Flow" represents the entire journey of an API request, from its initial receipt to the final response generation. It encapsulates the sequence of operations and transformations that occur during the processing of the request.

Step: A "Step" is a single, distinct action or operation that contributes to the overall Flow. It's a building block of the larger process, representing a specific stage or task within the API's business logic.

When Publish is called on a flow or a step, the Flow is published to the default "flows" Pub/Sub topic. Subscribers to this topic receive updates on the flow and its steps. Each published message carries a type attribute with the value "alis.open.flows.v1.Flow", which can be used as a filter on the subscription. The default topic can be overridden using the WithTopic option.
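A subscriber that only wants flow messages can filter on that attribute. As a minimal sketch, the helper below builds a Pub/Sub filter expression for the type attribute. typeFilter is a hypothetical helper and not part of this package; the resulting string would be supplied as the filter when the subscription is created.

```go
package main

import (
	"fmt"
	"strconv"
)

// flowType is the value of the "type" attribute described above.
const flowType = "alis.open.flows.v1.Flow"

// typeFilter is a hypothetical helper (not part of the flows package).
// It returns a Pub/Sub subscription filter that matches messages whose
// "type" attribute equals typ.
func typeFilter(typ string) string {
	return "attributes.type = " + strconv.Quote(typ)
}

func main() {
	fmt.Println(typeFilter(flowType))
	// prints: attributes.type = "alis.open.flows.v1.Flow"
}
```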

Installation

Get the package

go get go.alis.build/flows

Import the package

import "go.alis.build/flows"

Create a new Client instance using NewClient

// Create new client
client, err := flows.NewClient(gcpProject, flows.WithTopic("flows"), flows.WithAwaitPublish())
if err != nil {
	return err
}

WithTopic and WithAwaitPublish are optional configurations.

Usage

Create a new flow.

flow, err := client.NewFlow(ctx)
if err != nil {
	return err
}

Add a step to the flow.

step, ctx, err := flow.NewStep("1.0", flows.WithTitle("Step 1"), flows.WithExistingId())
if err != nil {
	return err
}

WithExistingId is an optional configuration.

Set step state

step = step.Queued()

Publish the flow

err = flow.Publish()

Documentation

Overview

Package flows provides a lightweight set of methods which Builders can use to represent the entire journey of an API request, from its initial receipt to the final response generation. It encapsulates the sequence of operations and transformations that occur during the processing of the request.

Constants

const (
	DefaultTopic        = "flows"
	FlowParentHeaderKey = "x-alis-flow-parent"
	FlowHeaderKey       = "x-alis-flow-id"
)

Variables

var (
	StepIdRegex   = regexp.MustCompile(`^[^-]+$`)
	ParentIdRegex = regexp.MustCompile(`^([a-z0-9]+)-([^-]+)$`)
)
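The two patterns can be read as: a step id is any non-empty string without a hyphen, and a parent id is a lowercase alphanumeric flow id, a hyphen, then a step id. The sketch below copies both patterns locally so it runs without importing the package; validStepId and validParentId are hypothetical helpers, and how the package itself applies the patterns is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
)

// Local copies of the package-level patterns shown above.
var (
	stepIdRegex   = regexp.MustCompile(`^[^-]+$`)
	parentIdRegex = regexp.MustCompile(`^([a-z0-9]+)-([^-]+)$`)
)

// validStepId reports whether id is non-empty and contains no hyphen.
func validStepId(id string) bool { return stepIdRegex.MatchString(id) }

// validParentId reports whether id has the form <flow-id>-<step-id>.
func validParentId(id string) bool { return parentIdRegex.MatchString(id) }

func main() {
	fmt.Println(validStepId("1.0"))                                  // true
	fmt.Println(validStepId("step-1"))                               // false: hyphens are not allowed
	fmt.Println(validParentId("0af7651916cd43dd8448eb211c80319c-0")) // true
	fmt.Println(validParentId("missinghyphen"))                      // false
}
```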

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages publishing flows to a Pub/Sub topic.

func NewClient

func NewClient(project string, opts ...Option) (*Client, error)

NewClient creates a new instance of the Client object. A valid Google Cloud project id is required.

Multiple Option functions can be provided to customize the client. For example: WithTopic("flows"), WithAwaitPublish()

func (*Client) NewFlow

func (c *Client) NewFlow(ctx context.Context) (*Flow, error)

NewFlow creates a new Flow object.

The source is inferred from the invoking method. This can be overridden by calling WithSource.

The parent id is inferred from the x-alis-flow-parent header. This can be overridden by calling WithParentId.
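For the parent id to be inferred, the calling service has to send the header. The sketch below shows one way a caller might attach it to an outgoing request. It assumes plain HTTP via net/http; the package may instead read the value from gRPC metadata, and outgoingRequest is a hypothetical helper that is not part of this package.

```go
package main

import (
	"fmt"
	"net/http"
)

// flowParentHeaderKey copies the FlowParentHeaderKey constant from the package.
const flowParentHeaderKey = "x-alis-flow-parent"

// outgoingRequest is a hypothetical helper (not part of the flows package).
// It builds a GET request and attaches the parent id, formatted as
// <flow-id>-<step-id>, so the receiving service could link its flow to the
// calling step.
func outgoingRequest(url, flowId, stepId string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set(flowParentHeaderKey, flowId+"-"+stepId)
	return req, nil
}

func main() {
	req, err := outgoingRequest("https://example.com/resource", "0af7651916cd43dd8448eb211c80319c", "0")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get(flowParentHeaderKey))
}
```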

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func (*Flow) NewStep

func (f *Flow) NewStep(id string, opts ...StepOption) (*Step, context.Context, error)

NewStep adds a step to the flow and returns a Step object.

The initial state of the step is Queued. This can be overridden by passing the WithState option.

If the WithExistingId option is provided, the step with the specified id is returned. If the step does not exist, a new step is created with the specified id.
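The WithExistingId behaviour described above is a get-or-create lookup. The following sketch illustrates that semantics with stand-in types; flow, step and getOrCreateStep are hypothetical and do not reflect the package's internal implementation.

```go
package main

import "fmt"

// step is a stand-in for the package's Step type.
type step struct {
	id string
}

// flow is a stand-in for the package's Flow type, keyed by step id.
type flow struct {
	steps map[string]*step
}

func newFlow() *flow { return &flow{steps: make(map[string]*step)} }

// getOrCreateStep models NewStep with WithExistingId: it returns the step
// with the given id if one exists, and otherwise creates and stores it.
func (f *flow) getOrCreateStep(id string) *step {
	if s, ok := f.steps[id]; ok {
		return s
	}
	s := &step{id: id}
	f.steps[id] = s
	return s
}

func main() {
	f := newFlow()
	first := f.getOrCreateStep("1.0")
	second := f.getOrCreateStep("1.0")
	fmt.Println(first == second, len(f.steps)) // true 1
}
```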

func (*Flow) Publish

func (f *Flow) Publish() error

Publish the Flow as an event.

func (*Flow) Steps (added in v1.0.1)

func (f *Flow) Steps() *maps.OrderedMap[string, *Step]

Steps returns the steps of the flow in an [OrderedMap](https://pkg.go.dev/go.alis.build/utils#OrderedMap).

Example Usage:

stepsMap := make(map[string]*Step, flow.Steps().Len())
flow.Steps().Range(func(idx int, key string, value *Step) bool {
	stepsMap[key] = value
	return true
})
stepsMap["step1"].Done()
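To make the Len and Range calls in the example concrete, here is a minimal ordered map with the same callback shape. It is only an illustrative sketch: orderedMap, newOrderedMap and Set are hypothetical names, and this is not the go.alis.build/utils implementation.

```go
package main

import "fmt"

// orderedMap is a hypothetical minimal map that remembers insertion order.
type orderedMap[K comparable, V any] struct {
	keys   []K
	values map[K]V
}

func newOrderedMap[K comparable, V any]() *orderedMap[K, V] {
	return &orderedMap[K, V]{values: make(map[K]V)}
}

// Set stores value under key, recording the key's position on first insert.
func (m *orderedMap[K, V]) Set(key K, value V) {
	if _, ok := m.values[key]; !ok {
		m.keys = append(m.keys, key)
	}
	m.values[key] = value
}

// Len returns the number of entries.
func (m *orderedMap[K, V]) Len() int { return len(m.keys) }

// Range calls fn for each entry in insertion order until fn returns false.
func (m *orderedMap[K, V]) Range(fn func(idx int, key K, value V) bool) {
	for i, k := range m.keys {
		if !fn(i, k, m.values[k]) {
			return
		}
	}
}

func main() {
	m := newOrderedMap[string, string]()
	m.Set("1.0", "Step 1")
	m.Set("2.0", "Step 2")
	m.Range(func(idx int, key, value string) bool {
		fmt.Println(idx, key, value)
		return true
	})
}
```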

func (*Flow) WithParentId

func (f *Flow) WithParentId(parentId string) (*Flow, error)

WithParentId sets the parent id of the flow.

This overrides the inferred parent id from the x-alis-flow-parent header.

The parent id is of the format: <flow-id>-<step-id>. For example: 0af7651916cd43dd8448eb211c80319c-0
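Since a step id cannot contain a hyphen, a parent id splits unambiguously into its two parts. The sketch below does this with a local copy of ParentIdRegex; splitParentId is a hypothetical helper and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// parentIdRegex is a local copy of the package's ParentIdRegex.
var parentIdRegex = regexp.MustCompile(`^([a-z0-9]+)-([^-]+)$`)

// splitParentId is a hypothetical helper (not part of the flows package).
// It splits a parent id of the form <flow-id>-<step-id> into its parts.
func splitParentId(parentId string) (flowId, stepId string, err error) {
	m := parentIdRegex.FindStringSubmatch(parentId)
	if m == nil {
		return "", "", errors.New("parent id must have the form <flow-id>-<step-id>")
	}
	return m[1], m[2], nil
}

func main() {
	flowId, stepId, err := splitParentId("0af7651916cd43dd8448eb211c80319c-0")
	if err != nil {
		panic(err)
	}
	fmt.Println("flow:", flowId)
	fmt.Println("step:", stepId)
}
```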

func (*Flow) WithSource

func (f *Flow) WithSource(source string) *Flow

WithSource sets the source of the flow.

This overrides the inferred source from the invoking method.

type Option

type Option func(*Options)

Option is a functional option for the NewClient method.

func WithAwaitPublish

func WithAwaitPublish() Option

WithAwaitPublish instructs the client to block while publishing: each Publish call returns only once the publish completes or the context is done.
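The difference between awaiting and not awaiting can be illustrated with a simulated publish. This is a sketch of the general pattern only: publish, await and awaitWithTimeout are hypothetical, and the package's actual Pub/Sub handling is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// publish simulates an asynchronous publish that completes after delay.
// The result arrives on the returned channel.
func publish(delay time.Duration) <-chan error {
	done := make(chan error, 1)
	go func() {
		time.Sleep(delay)
		done <- nil
	}()
	return done
}

// await blocks until the publish result arrives or ctx is done,
// whichever happens first.
func await(ctx context.Context, result <-chan error) error {
	select {
	case err := <-result:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// awaitWithTimeout starts one simulated publish taking delayMs and waits
// for it for at most timeoutMs.
func awaitWithTimeout(timeoutMs, delayMs int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return await(ctx, publish(time.Duration(delayMs)*time.Millisecond))
}

func main() {
	fmt.Println(awaitWithTimeout(1000, 10)) // publish finishes first: <nil>
	fmt.Println(awaitWithTimeout(10, 1000)) // context ends first: context deadline exceeded
}
```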

func WithTopic

func WithTopic(topic string) Option

WithTopic sets the topic for the client.

If provided multiple times, the last value will take precedence.

type Options

type Options struct {
	// The Pub/Sub Topic
	// For example: 'flows'
	//
	// Defaults to 'flows' if not specified.
	Topic string
	// Indicates whether the pubsub client should block until the message is published.
	// If set to true, the client blocks until the message is published or the context is done.
	// If set to false, the client returns immediately without waiting for the publish to complete.
	AwaitPublish bool
}

Options for the NewClient method.
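Option and Options follow the functional options pattern. The sketch below re-creates that pattern locally to show how a default topic and the "last value wins" rule fall out of applying options in order. All names here (options, option, withTopic, withAwaitPublish, resolve) are local stand-ins, and the way NewClient applies options internally is an assumption.

```go
package main

import "fmt"

// options mirrors the fields of the exported Options struct shown above.
type options struct {
	topic        string
	awaitPublish bool
}

// option mirrors the Option type: a function that mutates options.
type option func(*options)

func withTopic(topic string) option {
	return func(o *options) { o.topic = topic }
}

func withAwaitPublish() option {
	return func(o *options) { o.awaitPublish = true }
}

// resolve starts from the defaults and applies each option in order,
// so a later option overrides an earlier one.
func resolve(opts ...option) options {
	o := options{topic: "flows"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(resolve().topic) // flows
	o := resolve(withTopic("a"), withTopic("b"), withAwaitPublish())
	fmt.Println(o.topic, o.awaitPublish) // b true
}
```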

type Step

type Step struct {
	// contains filtered or unexported fields
}

Step represents a single step within the Flow object.

func (*Step) AwaitingInput

func (s *Step) AwaitingInput() *Step

AwaitingInput marks the state of the step as Awaiting Input.

func (*Step) Cancelled

func (s *Step) Cancelled() *Step

Cancelled marks the state of the step as Cancelled.

func (*Step) Done

func (s *Step) Done() *Step

Done marks the state of the step as done.

func (*Step) Failed

func (s *Step) Failed(err error) *Step

Failed marks the state of the step as Failed with the specified error.

func (*Step) InProgress

func (s *Step) InProgress() *Step

InProgress marks the state of the step as In Progress.

func (*Step) Publish

func (s *Step) Publish() error

Publish publishes an individual step.

func (*Step) Queued

func (s *Step) Queued() *Step

Queued marks the state of the step as Queued.

func (*Step) WithDescription

func (s *Step) WithDescription(description string) *Step

WithDescription sets the description of the step.

func (*Step) WithTitle

func (s *Step) WithTitle(title string) *Step

WithTitle sets the title of the step.
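Because every state and metadata method returns *Step, calls can be chained. The sketch below shows that fluent style on a stand-in type. The step type, the state values and the stored err field are hypothetical; the real package represents state with a protobuf enum, as the WithState signature indicates.

```go
package main

import (
	"errors"
	"fmt"
)

// state is a hypothetical stand-in for the package's step state enum.
type state string

const (
	stateQueued     state = "QUEUED"
	stateInProgress state = "IN_PROGRESS"
	stateDone       state = "DONE"
	stateFailed     state = "FAILED"
)

// step is a stand-in for the package's Step type.
type step struct {
	title string
	state state
	err   error
}

func newStep() *step { return &step{state: stateQueued} }

// Each method mutates the step and returns it, which allows chaining.
func (s *step) WithTitle(title string) *step { s.title = title; return s }
func (s *step) InProgress() *step            { s.state = stateInProgress; return s }
func (s *step) Done() *step                  { s.state = stateDone; return s }
func (s *step) Failed(err error) *step       { s.state = stateFailed; s.err = err; return s }

func main() {
	s := newStep().WithTitle("Step 1").InProgress()
	fmt.Println(s.title, s.state) // Step 1 IN_PROGRESS

	s.Failed(errors.New("upstream unavailable"))
	fmt.Println(s.state, s.err) // FAILED upstream unavailable
}
```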

type StepOption (added in v1.0.1)

type StepOption func(*StepOptions)

StepOption is a functional option for the NewStep method.

func WithDescription (added in v1.0.1)

func WithDescription(description string) StepOption

WithDescription sets the description of the step.

func WithExistingId (added in v1.0.1)

func WithExistingId() StepOption

WithExistingId gets the step with the specified id. If the step does not exist, it assumes normal behaviour and creates a new step with the specified id.

func WithState (added in v1.0.3)

func WithState(state flows.Flow_Step_State) StepOption

WithState sets the initial state of the step.

func WithTitle (added in v1.0.1)

func WithTitle(title string) StepOption

WithTitle sets the title of the step.

type StepOptions (added in v1.0.1)

type StepOptions struct {
	// contains filtered or unexported fields
}

StepOptions for the NewStep method.
