statefun

package
v3.3.0
Published: Sep 12, 2023 License: Apache-2.0 Imports: 17 Imported by: 4

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Address

type Address struct {
	FunctionType TypeName
	Id           string
}

An Address is the unique identity of an individual StatefulFunction, containing the function's FunctionType and a unique identifier within the type. The function's type denotes the type (or class) of function to invoke, while the unique identifier addresses the invocation to a specific function instance.

func (Address) String

func (a Address) String() string

type AddressScopedStorage

type AddressScopedStorage interface {

	// Get returns the value of the provided ValueSpec, scoped to the
	// current invoked Address, and stores the result in the value
	// pointed to by receiver. The method returns false
	// if there is no value for the spec in storage,
	// so callers can differentiate between a missing value and
	// the type's zero value.
	Get(spec ValueSpec, receiver interface{}) (exists bool)

	// Set updates the value for the provided ValueSpec, scoped
	// to the current invoked Address.
	Set(spec ValueSpec, value interface{})

	// Remove deletes the prior value set for the provided
	// ValueSpec, scoped to the current invoked Address.
	//
	// After removing the value, calling Get for the same
	// spec under the same Address will return false.
	Remove(spec ValueSpec)
}

An AddressScopedStorage is used for reading and writing persistent values that are managed by the Stateful Functions runtime for fault-tolerance and consistency.

All access to the storage is scoped to the current function instance, identified by the instance's Address. This means that within an invocation, a function instance may only access its own persisted values through this storage.
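
The Get, Set, and Remove semantics described above can be sketched with a small in-memory stand-in. This is only an illustration: `mapStorage`, `newMapStorage`, and the simplified `ValueSpec` below are hypothetical types defined for this example, not the SDK's implementation, and the sketch only handles `int32` values.

```go
package main

import "fmt"

// ValueSpec is a simplified, hypothetical stand-in for statefun.ValueSpec.
// Only the Name field is modeled.
type ValueSpec struct {
	Name string
}

// mapStorage is a hypothetical in-memory store for one function instance.
// It mirrors the method set of AddressScopedStorage for illustration only.
type mapStorage struct {
	values map[string]interface{}
}

func newMapStorage() *mapStorage {
	return &mapStorage{values: make(map[string]interface{})}
}

// Get copies the stored value into receiver and reports whether it existed.
// For brevity, this sketch only supports *int32 receivers.
func (s *mapStorage) Get(spec ValueSpec, receiver interface{}) (exists bool) {
	v, ok := s.values[spec.Name]
	if !ok {
		return false
	}
	r, okReceiver := receiver.(*int32)
	val, okValue := v.(int32)
	if okReceiver && okValue {
		*r = val
	}
	return true
}

// Set stores the value under the spec's name.
func (s *mapStorage) Set(spec ValueSpec, value interface{}) {
	s.values[spec.Name] = value
}

// Remove deletes the value; a later Get for the same spec returns false.
func (s *mapStorage) Remove(spec ValueSpec) {
	delete(s.values, spec.Name)
}

func main() {
	storage := newMapStorage()
	seen := ValueSpec{Name: "seen_count"}

	var count int32
	fmt.Println(storage.Get(seen, &count), count) // false 0 (missing)

	storage.Set(seen, int32(0))
	fmt.Println(storage.Get(seen, &count), count) // true 0 (stored zero value)

	storage.Remove(seen)
	fmt.Println(storage.Get(seen, &count)) // false
}
```

The boolean result is what lets a caller tell "never written" apart from "written as zero", which the receiver value alone cannot express.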

type CancellationToken

type CancellationToken interface {
	fmt.Stringer

	// Token returns the underlying string
	// used to create the CancellationToken.
	Token() string
	// contains filtered or unexported methods
}

CancellationToken tags a delayed message sent with statefun.SendAfterWithCancellationToken. It can then be used to cancel said message on a best-effort basis with statefun.CancelDelayedMessage. The underlying string token can be retrieved by invoking Token().

func NewCancellationToken

func NewCancellationToken(t string) (CancellationToken, error)

NewCancellationToken creates a new cancellation token or returns an error if the token is invalid.

type Context

type Context interface {
	context.Context

	// Self is the current invoked function instance's Address.
	Self() Address

	// Caller is the caller function instance's Address, if applicable. This is nil
	// if the message was sent to this function via an ingress.
	Caller() *Address

	// Send forwards out a MessageBuilder to another function.
	Send(message MessageBuilder)

	// SendAfter forwards out a MessageBuilder to another function, after a specified time.Duration delay.
	SendAfter(delay time.Duration, message MessageBuilder)

	// SendAfterWithCancellationToken forwards out a MessageBuilder to another function,
	// after a specified time.Duration delay. The message is tagged with a non-empty,
	// unique token that can later be used to cancel the message.
	SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder)

	// CancelDelayedMessage cancels a delayed message (a message that was sent via SendAfterWithCancellationToken).
	// NOTE: this is a best-effort operation, since the message might have been already delivered.
	// If the message was delivered, this is a no-op operation.
	CancelDelayedMessage(token CancellationToken)

	// SendEgress forwards out an EgressBuilder to an egress.
	SendEgress(egress EgressBuilder)

	// Storage returns the AddressScopedStorage, providing access to stored values scoped to the
	// current invoked function instance's Address (which is obtainable using Self()).
	Storage() AddressScopedStorage
}

A Context contains information about the current function invocation, such as the invoked function instance's and caller's Address. It is also used for side effects as a result of the invocation, such as sending messages to other functions or egresses, and provides access to AddressScopedStorage scoped to the current Address. This type is also a context.Context and can be used to ensure any spawned goroutines do not outlive the current function invocation.
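
The goroutine guarantee can be illustrated with the standard library alone. In the sketch below, `invoke` and `invokeAndWait` are hypothetical helpers: `invoke` stands in for a function invocation, and its deferred `cancel` mimics the runtime canceling the Context once Invoke returns. No SDK types are used.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// invoke is a hypothetical stand-in for a function invocation. It spawns a
// goroutine tied to ctx and returns a channel that is closed when that
// goroutine exits.
func invoke(parent context.Context) <-chan struct{} {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // mimics the Context being canceled when Invoke returns

	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		<-ctx.Done() // background work would select on this channel
	}()
	return stopped
}

// invokeAndWait reports whether the spawned goroutine exited within a second
// of the invocation returning.
func invokeAndWait() bool {
	select {
	case <-invoke(context.Background()):
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("goroutine stopped:", invokeAndWait())
}
```

Goroutines that ignore `ctx.Done()` are not stopped by cancellation; the guarantee only holds for work that observes the context.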

func DeriveContext added in v3.3.0

func DeriveContext(statefunCtx Context, ctx context.Context) Context

DeriveContext derives a new statefun.Context from an existing one, replacing the wrapped context.Context.

type EgressBuilder

type EgressBuilder interface {
	// contains filtered or unexported methods
}

type Expiration

type Expiration struct {
	// contains filtered or unexported fields
}

Expiration Configuration

Defines the way state can be auto expired by the runtime. State Expiration (also known as TTL) can be used to keep state from growing arbitrarily by assigning an Expiration date to a value.

State can be expired after a duration has passed since either the last write to the state, or the last call to the Function.
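
The difference between the two modes can be sketched as a small time calculation. The `expiration` type, the `expireMode` constants, and `demo` below are hypothetical names for this example only; the runtime performs expiry itself, and treating a value as expired exactly at the deadline is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

type expireMode int

const (
	afterWrite expireMode = iota // measured from the last write to the state
	afterCall                    // measured from the last call to the function
)

// expiration is a hypothetical model of an Expiration configuration.
type expiration struct {
	mode     expireMode
	duration time.Duration
}

// expired reports whether a value would count as expired at time now.
func (e expiration) expired(lastWrite, lastCall, now time.Time) bool {
	reference := lastWrite
	if e.mode == afterCall {
		reference = lastCall
	}
	return !now.Before(reference.Add(e.duration))
}

// demo uses a one-hour duration: the value is written at t0, the function is
// last called 50 minutes later, and expiry is checked 70 minutes after t0.
func demo() (byWrite, byCall bool) {
	t0 := time.Date(2023, time.September, 12, 0, 0, 0, 0, time.UTC)
	lastWrite := t0
	lastCall := t0.Add(50 * time.Minute)
	now := t0.Add(70 * time.Minute)

	byWrite = expiration{afterWrite, time.Hour}.expired(lastWrite, lastCall, now)
	byCall = expiration{afterCall, time.Hour}.expired(lastWrite, lastCall, now)
	return byWrite, byCall
}

func main() {
	fmt.Println(demo()) // true false
}
```

With the same duration, the call-based mode keeps state alive for as long as the function keeps being invoked, while the write-based mode does not.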

func ExpireAfterCall

func ExpireAfterCall(duration time.Duration) Expiration

ExpireAfterCall returns an Expiration configuration that would expire a duration after the last invocation of the Function.

func ExpireAfterWrite

func ExpireAfterWrite(duration time.Duration) Expiration

ExpireAfterWrite returns an Expiration configuration that would expire a duration after the last write.

func (Expiration) String

func (e Expiration) String() string

type GenericEgressBuilder

type GenericEgressBuilder struct {
	// The TypeName as specified when registered
	Target TypeName

	// The value to produce
	Value interface{}

	// The value's type
	ValueType SimpleType
}

GenericEgressBuilder creates a generic egress record. For Kafka and Kinesis, see KafkaEgressBuilder and KinesisEgressBuilder respectively.

type KafkaEgressBuilder

type KafkaEgressBuilder struct {
	// The TypeName as specified in module.yaml
	Target TypeName

	// The Kafka destination topic for that record
	Topic string

	// The utf8 encoded string key to produce (can be empty)
	Key string

	// The value to produce
	Value interface{}

	// An optional hint to this value's type
	ValueType SimpleType
}

KafkaEgressBuilder builds a message that can be emitted to a Kafka generic egress. If a ValueType is provided, then Value will be serialized according to the provided ValueType's serializer. Otherwise, we will try to convert Value to bytes if it is one of:

  • utf-8 string
  • []byte
  • an int (as defined by Kafka's serialization format)
  • float (as defined by Kafka's serialization format)

type KinesisEgressBuilder

type KinesisEgressBuilder struct {
	// The TypeName as specified in module.yaml
	Target TypeName

	// The Kinesis destination stream for that record
	Stream string

	// The value to produce
	Value interface{}

	// An optional hint to this value's type
	ValueType SimpleType

	// The utf8 encoded string partition key to use
	PartitionKey string

	// A utf8 encoded string explicit hash key to use (can be empty)
	ExplicitHashKey string
}

KinesisEgressBuilder builds a message that can be emitted to a Kinesis generic egress. If a ValueType is provided, then Value will be serialized according to the provided ValueType's serializer. Otherwise, we will try to convert Value to bytes if it is one of:

  • utf-8 string
  • []byte

type Message

type Message struct {
	// contains filtered or unexported fields
}

func (*Message) As

func (m *Message) As(t SimpleType, receiver interface{}) error

func (*Message) AsBool

func (m *Message) AsBool() bool

func (*Message) AsFloat32

func (m *Message) AsFloat32() float32

func (*Message) AsFloat64

func (m *Message) AsFloat64() float64

func (*Message) AsInt32

func (m *Message) AsInt32() int32

func (*Message) AsInt64

func (m *Message) AsInt64() int64

func (*Message) AsString

func (m *Message) AsString() string

func (*Message) Is

func (m *Message) Is(t SimpleType) bool

func (*Message) IsBool

func (m *Message) IsBool() bool

func (*Message) IsFloat32

func (m *Message) IsFloat32() bool

func (*Message) IsFloat64

func (m *Message) IsFloat64() bool

func (*Message) IsInt32

func (m *Message) IsInt32() bool

func (*Message) IsInt64

func (m *Message) IsInt64() bool

func (*Message) IsString

func (m *Message) IsString() bool

func (*Message) RawValue

func (m *Message) RawValue() []byte

func (*Message) ValueTypeName

func (m *Message) ValueTypeName() TypeName

type MessageBuilder

type MessageBuilder struct {
	Target    Address
	Value     interface{}
	ValueType SimpleType
}

func (MessageBuilder) ToMessage

func (m MessageBuilder) ToMessage() (Message, error)

type PrimitiveType

type PrimitiveType int
const (
	BoolType PrimitiveType = iota
	Int32Type
	Int64Type
	Float32Type
	Float64Type
	StringType
)

func (PrimitiveType) Deserialize

func (p PrimitiveType) Deserialize(r io.Reader, receiver interface{}) error

func (PrimitiveType) GetTypeName

func (p PrimitiveType) GetTypeName() TypeName

func (PrimitiveType) Serialize

func (p PrimitiveType) Serialize(writer io.Writer, data interface{}) error

type RequestReplyHandler

type RequestReplyHandler interface {
	http.Handler

	// Invoke method provides compliance with AWS Lambda handler
	Invoke(ctx context.Context, payload []byte) ([]byte, error)
}

The RequestReplyHandler processes messages from the runtime, invokes functions, and encodes side effects. The handler implements http.Handler so it can easily be embedded in standard Go server frameworks.
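
Because the handler is an http.Handler, it can be mounted like any other handler. The sketch below uses a hypothetical `fakeHandler` in place of the value returned by AsHandler, and the `/statefun` path is an assumption chosen for illustration; only standard library calls are used.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// fakeHandler is a hypothetical stand-in for a RequestReplyHandler.
// A real handler would decode the runtime's request and encode side effects.
type fakeHandler struct{}

func (fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}

// newMux mounts the handler on a standard ServeMux.
// The "/statefun" path is an assumption of this sketch.
func newMux(h http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/statefun", h)
	return mux
}

// statusFor sends an in-memory POST request to the mux and returns the
// response status code.
func statusFor(path string) int {
	req := httptest.NewRequest(http.MethodPost, path, nil)
	rec := httptest.NewRecorder()
	newMux(fakeHandler{}).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor("/statefun"), statusFor("/other")) // 200 404
}
```

In a real service, the mux would be passed to `http.ListenAndServe` rather than exercised through `httptest`.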

type SimpleType

type SimpleType interface {
	GetTypeName() TypeName

	Deserialize(r io.Reader, receiver interface{}) error

	Serialize(writer io.Writer, data interface{}) error
}

The SimpleType interface is the core abstraction used by the Stateful Functions type system, and consists of a few things that StateFun uses to handle Message's and ValueSpec's:

1. A TypeName to identify the type.
2. (De)serialization methods for marshalling and unmarshalling data.

Cross-language primitive types

StateFun's type system has cross-language support for common primitive types, such as boolean, integer (int32), long (int64), etc. These primitive types have built-in SimpleType's implemented for them already with predefined TypeName's.

These primitives have standard encoding across all StateFun language SDKs, so functions in various other languages (Java, Python, etc) can message Golang functions by directly sending supported primitive values as message arguments. Moreover, the type system is used for state values as well; so you can expect that a function can safely read previous state after reimplementing it in a different language.

Common custom types

The type system is also very easily extensible to support more complex types. The Go SDK ships with predefined support for JSON and Protobuf; see MakeJsonType and MakeProtobufType. For other formats, it is just a matter of implementing your own SimpleType with a custom typename and serializer.
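
As a rough sketch of what a custom type could look like, the example below implements the three SimpleType methods on top of the standard library's encoding/gob. To keep it self-contained, `TypeName` and `SimpleType` are redeclared locally with the signatures documented on this page; `typeName`, `gobType`, `Greeting`, `greetingType`, and `roundTrip` are hypothetical names, and gob is just one possible format.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"
)

// Local copies of the documented interfaces, so the sketch compiles alone.
type TypeName interface {
	fmt.Stringer
	GetNamespace() string
	GetType() string
}

type SimpleType interface {
	GetTypeName() TypeName
	Deserialize(r io.Reader, receiver interface{}) error
	Serialize(writer io.Writer, data interface{}) error
}

// typeName is a hypothetical TypeName implementation.
type typeName struct{ namespace, tpe string }

func (t typeName) String() string       { return t.namespace + "/" + t.tpe }
func (t typeName) GetNamespace() string { return t.namespace }
func (t typeName) GetType() string      { return t.tpe }

// gobType is a hypothetical SimpleType that encodes values with encoding/gob.
type gobType struct{ name TypeName }

func (g gobType) GetTypeName() TypeName { return g.name }

func (g gobType) Deserialize(r io.Reader, receiver interface{}) error {
	return gob.NewDecoder(r).Decode(receiver)
}

func (g gobType) Serialize(writer io.Writer, data interface{}) error {
	return gob.NewEncoder(writer).Encode(data)
}

// Greeting is an example payload.
type Greeting struct {
	Name   string
	Visits int32
}

var greetingType SimpleType = gobType{name: typeName{namespace: "example", tpe: "Greeting"}}

// roundTrip serializes in and deserializes the bytes into out.
func roundTrip(t SimpleType, in, out interface{}) error {
	var buf bytes.Buffer
	if err := t.Serialize(&buf, in); err != nil {
		return err
	}
	return t.Deserialize(&buf, out)
}

func main() {
	var out Greeting
	if err := roundTrip(greetingType, Greeting{Name: "Ada", Visits: 3}, &out); err != nil {
		panic(err)
	}
	fmt.Println(greetingType.GetTypeName(), out) // example/Greeting {Ada 3}
}
```

A Go-specific format such as gob gives up the cross-language guarantee described above, so it only suits values that are written and read by Go functions.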

func MakeJsonType

func MakeJsonType(name TypeName) SimpleType

MakeJsonType creates a new SimpleType with a given TypeName using the standard Go JSON library.

func MakeProtobufType

func MakeProtobufType(m proto.Message) SimpleType

MakeProtobufType creates a new SimpleType for the given protobuf Message.

func MakeProtobufTypeWithTypeName

func MakeProtobufTypeWithTypeName(typeName TypeName) SimpleType

MakeProtobufTypeWithTypeName creates a new SimpleType for the given protobuf Message with a custom namespace.

type StatefulFunction

type StatefulFunction interface {

	// Invoke is the method called for each message. The passed Context
	// is canceled as soon as Invoke returns as a signal to
	// any spawned goroutines. The method may return
	// an error to signal that the invocation failed and should
	// be reattempted.
	Invoke(ctx Context, message Message) error
}

A StatefulFunction is a user-defined function that can be invoked with a given input. This is the primitive building block for a Stateful Functions application.

Concept

Each individual StatefulFunction is a uniquely invokable "instance" of a registered StatefulFunctionSpec. Each instance is identified by an Address, representing the function's unique id (a string) within its type. From a user's perspective, it would seem as if for each unique function id, there exists a stateful instance of the function that is always available to be invoked within a Stateful Functions application.

Invoking a StatefulFunction

An individual StatefulFunction can be invoked with arbitrary input from any other StatefulFunction (including itself), or routed from ingresses. To invoke a StatefulFunction, the caller simply needs to know the Address of the target function.

As a result of invoking a StatefulFunction, the function may continue to invoke other functions, access persisted values, or send messages to egresses.

Persistent State

Each individual StatefulFunction may have persistent values written to storage that is maintained by the system, providing consistent exactly-once and fault-tolerant guarantees. Please see docs in ValueSpec and AddressScopedStorage for an overview of how to register persistent values and access the storage.

type StatefulFunctionPointer

type StatefulFunctionPointer func(Context, Message) error

The StatefulFunctionPointer type is an adapter to allow the use of ordinary functions as StatefulFunction's. If f is a function with the appropriate signature, StatefulFunctionPointer(f) is a StatefulFunction that calls f.
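
This is the same adapter pattern as http.HandlerFunc in the standard library. The sketch below redeclares `StatefulFunction` and `StatefulFunctionPointer` locally so it compiles on its own; `Context` and `Message` are reduced to hypothetical placeholders, and `greet` and `greeted` exist only for this example.

```go
package main

import "fmt"

// Context and Message are simplified placeholders for the SDK types.
type Context interface{}

type Message struct{ body string }

// StatefulFunction and StatefulFunctionPointer mirror the documented shapes.
type StatefulFunction interface {
	Invoke(ctx Context, message Message) error
}

type StatefulFunctionPointer func(Context, Message) error

// Invoke calls the underlying function, which is what makes the function
// type satisfy the StatefulFunction interface.
func (s StatefulFunctionPointer) Invoke(ctx Context, message Message) error {
	return s(ctx, message)
}

// greeted records what greet has processed, for demonstration only.
var greeted []string

// greet is an ordinary function with the appropriate signature.
func greet(_ Context, m Message) error {
	greeted = append(greeted, "hello "+m.body)
	return nil
}

func main() {
	// The conversion turns the plain function into a StatefulFunction.
	var fn StatefulFunction = StatefulFunctionPointer(greet)
	err := fn.Invoke(nil, Message{body: "world"})
	fmt.Println(greeted, err) // [hello world] <nil>
}
```

The adapter avoids declaring a struct type for functions that carry no fields of their own.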

func (StatefulFunctionPointer) Invoke

func (s StatefulFunctionPointer) Invoke(ctx Context, message Message) error

type StatefulFunctionSpec

type StatefulFunctionSpec struct {
	// The unique TypeName associated with
	// the StatefulFunction being defined.
	FunctionType TypeName

	// A slice of registered ValueSpec's that will be used
	// by this function. A function may only access values
	// that have been eagerly registered as part of its spec.
	States []ValueSpec

	// The physical StatefulFunction instance.
	Function StatefulFunction
}

A StatefulFunctionSpec is the specification for a Stateful Function, identifiable by a unique TypeName.

type StatefulFunctions

type StatefulFunctions interface {

	// WithSpec registers a StatefulFunctionSpec, which will be
	// used to build the runtime function. It returns an error
	// if the specification is invalid and the handler
	// fails to register the function.
	WithSpec(spec StatefulFunctionSpec) error

	// AsHandler creates a RequestReplyHandler from the registered
	// function specs.
	AsHandler() RequestReplyHandler
}

StatefulFunctions is a registry for multiple StatefulFunction's. A RequestReplyHandler can be created from the registry that understands how to dispatch invocation requests to the registered functions as well as encode side-effects (e.g., sending messages to other functions or updating values in storage) as the response.

func StatefulFunctionsBuilder

func StatefulFunctionsBuilder() StatefulFunctions

StatefulFunctionsBuilder creates a new StatefulFunctions registry.

type TypeName

type TypeName interface {
	fmt.Stringer
	GetNamespace() string
	GetType() string
}

A TypeName is used to uniquely identify objects within a Stateful Functions application, including functions, egresses, and types. TypeName's serve as an integral part of identifying these objects for message delivery as well as message data serialization and deserialization.

func ParseTypeName

func ParseTypeName(typename string) (TypeName, error)

ParseTypeName creates a TypeName from a canonical string in the format `<namespace>/<type>`.

func TypeNameFrom

func TypeNameFrom(typename string) TypeName

TypeNameFrom creates a TypeName from a canonical string in the format `<namespace>/<type>`. This function assumes correctly formatted strings and will panic on error. For runtime error handling, please see ParseTypeName.
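
The canonical format can be illustrated with a small parser. `splitTypeName` is a hypothetical helper, not the SDK's implementation; splitting at the last `/` and rejecting empty parts are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTypeName splits a canonical "<namespace>/<type>" string.
// It splits at the last "/" and rejects an empty namespace or type;
// both choices are assumptions made for this illustration.
func splitTypeName(s string) (namespace, tpe string, err error) {
	i := strings.LastIndex(s, "/")
	if i <= 0 || i == len(s)-1 {
		return "", "", fmt.Errorf("%q is not in the format <namespace>/<type>", s)
	}
	return s[:i], s[i+1:], nil
}

func main() {
	ns, tpe, err := splitTypeName("com.example/greeter")
	fmt.Println(ns, tpe, err) // com.example greeter <nil>

	_, _, err = splitTypeName("greeter")
	fmt.Println(err != nil) // true
}
```

Returning an error corresponds to the ParseTypeName style; a TypeNameFrom-style wrapper would panic on the same error instead, which suits package-level constants but not user-supplied input.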

func TypeNameFromParts

func TypeNameFromParts(namespace, tpe string) (TypeName, error)

type ValueSpec

type ValueSpec struct {
	// The given name of the persistent value. The name must be a valid
	// identifier conforming to the following rules:
	//
	// 1. First character must be an alphabet letter [a-z] / [A-Z], or an underscore '_'.
	// 2. Remaining characters can be an alphabet letter [a-z] / [A-Z], a digit [0-9], or
	//    an underscore '_'.
	// 3. Must not contain any spaces.
	Name string

	// The SimpleType of the persistent value. Either
	// a built-in PrimitiveType or custom implementation.
	ValueType SimpleType

	// An optional expiration configuration.
	Expiration Expiration
}

A ValueSpec identifies a registered persistent value of a function, which will be managed by the Stateful Functions runtime for consistency and fault-tolerance. A ValueSpec is registered for a function by configuring it on the function's associated StatefulFunctionSpec.
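
The naming rules for ValueSpec.Name can be checked up front with a regular expression. `isValidName` is a hypothetical helper written for this example, reading the last allowed character in rule 2 as an underscore; the SDK performs its own validation when a spec is registered.

```go
package main

import (
	"fmt"
	"regexp"
)

// validName encodes the documented rules: a leading letter or underscore,
// followed by letters, digits, or underscores, with no spaces.
var validName = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// isValidName reports whether name satisfies the rules above.
func isValidName(name string) bool {
	return validName.MatchString(name)
}

func main() {
	for _, name := range []string{"seen_count", "_tmp1", "1st", "has space", ""} {
		fmt.Printf("%q -> %t\n", name, isValidName(name))
	}
	// "seen_count" -> true
	// "_tmp1" -> true
	// "1st" -> false
	// "has space" -> false
	// "" -> false
}
```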

func (*ValueSpec) String

func (v *ValueSpec) String() string
