Documentation ¶
Index ¶
- type Address
- type AddressScopedStorage
- type CancellationToken
- type Context
- type EgressBuilder
- type Expiration
- type GenericEgressBuilder
- type KafkaEgressBuilder
- type KinesisEgressBuilder
- type Message
- func (m *Message) As(t SimpleType, receiver interface{}) error
- func (m *Message) AsBool() bool
- func (m *Message) AsFloat32() float32
- func (m *Message) AsFloat64() float64
- func (m *Message) AsInt32() int32
- func (m *Message) AsInt64() int64
- func (m *Message) AsString() string
- func (m *Message) Is(t SimpleType) bool
- func (m *Message) IsBool() bool
- func (m *Message) IsFloat32() bool
- func (m *Message) IsFloat64() bool
- func (m *Message) IsInt32() bool
- func (m *Message) IsInt64() bool
- func (m *Message) IsString() bool
- func (m *Message) RawValue() []byte
- func (m *Message) ValueTypeName() TypeName
- type MessageBuilder
- type MissingSpecs
- type PrimitiveType
- type RequestReplyHandler
- type SimpleType
- type StatefulFunction
- type StatefulFunctionPointer
- type StatefulFunctionSpec
- type StatefulFunctions
- type TypeName
- type ValueSpec
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Address ¶
An Address is the unique identity of an individual StatefulFunction, containing the function's FunctionType and a unique identifier within the type. The function's type denotes the type (or class) of function to invoke, while the unique identifier addresses the invocation to a specific function instance.
type AddressScopedStorage ¶
type AddressScopedStorage interface { // Get returnss the values of the provided ValueSpec, scoped to the // current invoked Address and stores the result in the value // pointed to by receiver. The method will return false // if there is no value for the spec in storage // so callers can differentiate between missing and // the types zero value. Get(spec ValueSpec, receiver interface{}) (exists bool) // Set updates the value for the provided ValueSpec, scoped // to the current invoked Address. Set(spec ValueSpec, value interface{}) // Remove deletes the prior value set for the the provided // ValueSpec, scoped to the current invoked Address. // // After removing the value, calling Get for the same // spec under the same Address will return false. Remove(spec ValueSpec) }
An AddressScopedStorage is used for reading and writing persistent values that are managed by the Stateful Functions runtime for fault-tolerance and consistency.
All access to the storage is scoped to the current function instance, identified by the instance's Address. This means that within an invocation, function instances may only access its own persisted values through this storage.
type CancellationToken ¶
type CancellationToken interface { fmt.Stringer // Token returns the underlying string // used to create the CancellationToken. Token() string // contains filtered or unexported methods }
CancellationToken tags a delayed message send with statefun.SendAfterWithCancellationToken. It can then be used to cancel said message on a best effort basis with statefun.CancelDelayedMessage. The underlying string token can be retrieved by invoking Token().
func NewCancellationToken ¶
func NewCancellationToken(t string) (CancellationToken, error)
NewCancellationToken creates a new cancellation token or returns an error if the token is invalid.
type Context ¶
type Context interface { context.Context // Self is the current invoked function instance's Address. Self() Address // Caller is the caller function instance's Address, if applicable. This is nil // if the message was sent to this function via an ingress. Caller() *Address // Send forwards out a MessageBuilder to another function. Send(message MessageBuilder) // SendAfter forwards out a MessageBuilder to another function, after a specified time.Duration delay. SendAfter(delay time.Duration, message MessageBuilder) // SendAfterWithCancellationToken forwards out a MessageBuilder to another function, // after a specified time.Duration delay. The message is tagged with a non-empty, //unique token to attach to this message, to be used for message cancellation SendAfterWithCancellationToken(delay time.Duration, token CancellationToken, message MessageBuilder) // CancelDelayedMessage cancels a delayed message (a message that was send via SendAfterWithCancellationToken). // NOTE: this is a best-effort operation, since the message might have been already delivered. // If the message was delivered, this is a no-op operation. CancelDelayedMessage(token CancellationToken) // SendEgress forwards out an EgressBuilder to an egress. SendEgress(egress EgressBuilder) // Storage returns the AddressScopedStorage, providing access to stored values scoped to the // current invoked function instance's Address (which is obtainable using Self()). Storage() AddressScopedStorage }
A Context contains information about the current function invocation, such as the invoked function instance's and caller's Address. It is also used for side effects as a result of the invocation such as send messages to other functions or egresses, and provides access to AddressScopedStorage scoped to the current Address. This type is also a context.Context and can be used to ensure any spawned go routines do not outlive the current function invocation.
type EgressBuilder ¶
type EgressBuilder interface {
// contains filtered or unexported methods
}
type Expiration ¶
type Expiration struct {
// contains filtered or unexported fields
}
Expiration Configuration
Defines the way state can be auto expired by the runtime. State Expiration (also known as TTL) can be used to keep state from growing arbitrarily by assigning an Expiration date to a value.
State can be expired after a duration has passed since either the last write to the state, or the last call to the Function.
func ExpireAfterCall ¶
func ExpireAfterCall(duration time.Duration) Expiration
ExpireAfterCall returns an Expiration configuration that would expire a duration after the last invocation of the Function.
func ExpireAfterWrite ¶
func ExpireAfterWrite(duration time.Duration) Expiration
ExpireAfterWrite returns an Expiration configuration that would expire a duration after the last write.
func (Expiration) String ¶
func (e Expiration) String() string
type GenericEgressBuilder ¶
type GenericEgressBuilder struct { // The TypeName as specified when registered Target TypeName // The value to produce Value interface{} // The values type ValueType SimpleType }
GenericEgressBuilder create a generic egress record. For Kafka and Kinesis see KafkaEgressBuilder and KinesisEgressBuilder respectively
type KafkaEgressBuilder ¶
type KafkaEgressBuilder struct { // The TypeName as specified in module.yaml Target TypeName // The Kafka destination topic for that record Topic string // The utf8 encoded string key to produce (can be empty) Key string // The value to produce Value interface{} // An optional hint to this values type ValueType SimpleType }
KafkaEgressBuilder builds a message that can be emitted to a Kafka generic egress. If a ValueType is provided, then Value will be serialized according to the provided ValueType's serializer. Otherwise, we will try to convert Value to bytes if it is one of:
- utf-8 string
- []bytes
- an int (as defined by Kafka's serialization format)
- float (as defined by Kafka's serialization format)
type KinesisEgressBuilder ¶
type KinesisEgressBuilder struct { // The TypeName as specified in module.yaml Target TypeName // The Kinesis destination stream for that record Stream string // The value to produce Value interface{} // An optional hint to this value type ValueType SimpleType // The utf8 encoded string partition key to use PartitionKey string // A utf8 encoded string explicit hash key to use (can be empty) ExplicitHashKey string }
KinesisEgressBuilder builds a message that can be emitted to a Kinesis generic egress. If a ValueType is provided, then Value will be serialized according to the provided ValueType's serializer. Otherwise, we will try to convert Value to bytes if it is one of:
- utf-8 string
- []byte
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func (*Message) As ¶
func (m *Message) As(t SimpleType, receiver interface{}) error
func (*Message) Is ¶
func (m *Message) Is(t SimpleType) bool
func (*Message) ValueTypeName ¶
type MessageBuilder ¶
type MessageBuilder struct { Target Address Value interface{} ValueType SimpleType }
func (MessageBuilder) ToMessage ¶
func (m MessageBuilder) ToMessage() (Message, error)
type MissingSpecs ¶
type MissingSpecs []*protocol.FromFunction_PersistedValueSpec
type PrimitiveType ¶
type PrimitiveType int
const ( BoolType PrimitiveType = iota Int32Type Int64Type Float32Type Float64Type StringType )
func (PrimitiveType) Deserialize ¶
func (p PrimitiveType) Deserialize(r io.Reader, receiver interface{}) error
func (PrimitiveType) GetTypeName ¶
func (p PrimitiveType) GetTypeName() TypeName
type RequestReplyHandler ¶
type RequestReplyHandler interface { http.Handler // Invoke method provides compliance with AWS Lambda handler Invoke(ctx context.Context, payload []byte) ([]byte, error) }
The RequestReplyHandler processes messages from the runtime, invokes functions, and encodes side effects. The handler implements http.Handler so it can easily be embedded in standard Go server frameworks.
type SimpleType ¶
type SimpleType interface { GetTypeName() TypeName Deserialize(r io.Reader, receiver interface{}) error Serialize(writer io.Writer, data interface{}) error }
SimpleType interface is the core abstraction used by Stateful Function's type system, and consists of a few things that StateFun uses to handle Message's and ValueSpec's
1. TypeName to identify the type. 2. (De)serialization methods for marshalling and unmarshalling data
Cross-language primitive types ¶
StateFun's type system has cross-language support for common primitive types, such as boolean, integer (int32), long (int64), etc. These primitive types have built-in SimpleType's implemented for them already with predefined TypeName's.
These primitives have standard encoding across all StateFun language SDKs, so functions in various other languages (Java, Python, etc) can message Golang functions by directly sending supported primitive values as message arguments. Moreover, the type system is used for state values as well; so you can expect that a function can safely read previous state after reimplementing it in a different language.
Common custom types ¶
The type system is also very easily extensible to support more complex types. The Go SDK ships with predefined support for JSON and Protobuf - see MakeJsonType MakeProtobufType. For other formats, it is just a matter of implementing your own SimpleType with a custom typename and serializer.
func MakeJsonType ¶
func MakeJsonType(name TypeName) SimpleType
MakeJsonType creates a new SimpleType with a given TypeName using the standard Go JSON library.
func MakeProtobufType ¶
func MakeProtobufType(m proto.Message) SimpleType
MakeProtobufType creates a new SimpleType for the given protobuf Message.
func MakeProtobufTypeWithTypeName ¶
func MakeProtobufTypeWithTypeName(typeName TypeName) SimpleType
MakeProtobufTypeWithTypeName creates a new SimpleType for the given protobuf Message with a custom namespace.
type StatefulFunction ¶
type StatefulFunction interface { // Invoke is the method called for each message. The passed Context // is canceled as soon as Invoke returns as a signal to // any spawned go routines. The method may return // an Error to signal the invocation failed and should // be reattempted. Invoke(ctx Context, message Message) error }
A StatefulFunction is a user-defined function that can be invoked with a given input. This is the primitive building block for a Stateful Functions application.
Concept ¶
Each individual StatefulFunction is an uniquely invokable "instance" of a registered StatefulFunctionSpec. Each instance is identified by an Address, representing the function's unique id (a string) within its type. From a user's perspective, it would seem as if for each unique function id, there exists a stateful instance of the function that is always available to be invoked within a Stateful Functions application.
Invoking a StatefulFunction ¶
An individual StatefulFunction can be invoked with arbitrary input from any another StatefulFunction (including itself), or routed from ingresses. To invoke a StatefulFunction, the caller simply needs to know the Address of the target function.
As a result of invoking a StatefulFunction, the function may continue to invoke other functions, access persisted values, or send messages to egresses.
Persistent State ¶
Each individual StatefulFunction may have persistent values written to storage that is maintained by the system, providing consistent exactly-once and fault-tolerant guarantees. Please see docs in ValueSpec and AddressScopedStorage for an overview of how to register persistent values and access the storage.
type StatefulFunctionPointer ¶
The StatefulFunctionPointer type is an adapter to allow the use of ordinary functions as StatefulFunction's. If f is a function with the appropriate signature, StatefulFunctionPointer(f) is a StatefulFunction that calls f.
type StatefulFunctionSpec ¶
type StatefulFunctionSpec struct { // The unique TypeName associated // the StatefulFunction being defined. FunctionType TypeName // A slice of registered ValueSpec's that will be used // by this function. A function may only access values // that have been eagerly registered as part of its spec. States []ValueSpec // The physical StatefulFunction instance. Function StatefulFunction }
StatefulFunctionSpec for a Stateful Function, identifiable by a unique TypeName.
type StatefulFunctions ¶
type StatefulFunctions interface { // WithSpec registers a StatefulFunctionSpec, which will be // used to build the runtime function. It returns an error // if the specification is invalid and the handler // fails to register the function. WithSpec(spec StatefulFunctionSpec) error // AsHandler creates a RequestReplyHandler from the registered // function specs. AsHandler() RequestReplyHandler }
StatefulFunctions is a registry for multiple StatefulFunction's. A RequestReplyHandler can be created from the registry that understands how to dispatch invocation requests to the registered functions as well as encode side-effects (e.g., sending messages to other functions or updating values in storage) as the response.
func StatefulFunctionsBuilder ¶
func StatefulFunctionsBuilder() StatefulFunctions
StatefulFunctionsBuilder creates a new StatefulFunctions registry.
type TypeName ¶
A TypeName is used to uniquely identify objects within a Stateful Functions application, including functions, egresses, and types. TypeName's serve as an integral part of identifying these objects for message delivery as well as message data serialization and deserialization.
func ParseTypeName ¶
ParseTypeName creates a TypeName from a canonical string in the format `<namespace>/<type>`.
func TypeNameFrom ¶
TypeNameFrom creates a TypeName from a canonical string in the format `<namespace>/<type>`. This Function assumes correctly formatted strings and will panic on error. For runtime error handling please see ParseTypeName.
func TypeNameFromParts ¶
type ValueSpec ¶
type ValueSpec struct { // The given tpe of the persistent value. The tpe must be a valid // identifier conforming to the following rules: // // 1. First character must be an alphabet letter [a-z] / [A-Z], or an underscore '_'. // 2. Remaining characters can be an alphabet letter [a-z] / [A-Z], a digit [0-9], or // an underscore '-'. // 3. Must not contain any spaces. Name string // The SimpleType of the persistent value. Either // a built-in PrimitiveType or custom implementation. ValueType SimpleType // An optional expiration configuration. Expiration Expiration }
A ValueSpec identifies a registered persistent value of a function, which will be managed by the Stateful Functions runtime for consistency and fault-tolerance. A ValueSpec is registered for a function by configuring it on the function's associated StatefulFunctionSpec.