extapi

package
v0.2.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2022 License: MIT Imports: 14 Imported by: 6

Documentation

Overview

Package extapi implements a client for the Lambda Extensions API and an Extension handler that abstracts interactions with the API. Implement Extension and call the Run function from your main package. For more custom use cases, you can use the low-level Client directly.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func EnvAWSLambdaFunctionMemorySizeMB

func EnvAWSLambdaFunctionMemorySizeMB() int

EnvAWSLambdaFunctionMemorySizeMB returns the amount of memory available to the function in MB.

func EnvAWSLambdaFunctionName

func EnvAWSLambdaFunctionName() string

EnvAWSLambdaFunctionName returns the name of the function.

func EnvAWSLambdaFunctionVersion

func EnvAWSLambdaFunctionVersion() lambdaext.FunctionVersion

EnvAWSLambdaFunctionVersion returns the version of the function being executed.

func EnvAWSLambdaInitializationType

func EnvAWSLambdaInitializationType() lambdaext.InitType

EnvAWSLambdaInitializationType returns the initialization type of the function, which is either on-demand or provisioned-concurrency. For information, see Configuring provisioned concurrency.

func EnvAWSLambdaRuntimeAPI

func EnvAWSLambdaRuntimeAPI() lambdaext.AWSLambdaRuntimeAPI

EnvAWSLambdaRuntimeAPI returns the host and port of the runtime API for custom runtime.

func EnvAWSRegion

func EnvAWSRegion() string

EnvAWSRegion returns the AWS Region where the Lambda function is executed.

func EnvXAmznTraceID

func EnvXAmznTraceID() lambdaext.TracingValue

EnvXAmznTraceID returns X-Ray tracing header.

func Run

func Run(ctx context.Context, ext Extension, opts ...Option) error

Run runs the Extension. Run blocks the current goroutine until the extension lifecycle finishes or an error occurs.

Example
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

// Extension is a minimal extapi.Extension implementation that only logs
// the lifecycle callbacks it receives.
type Extension struct{}

// Init logs the registration details exposed by the client.
func (e *Extension) Init(_ context.Context, client *extapi.Client) error {
	reg := client.GetRegisterResponse()

	const format = "initializing extension for function %s(%s), handler %s, and accountID %s\n"
	log.Printf(format, reg.FunctionName, reg.FunctionVersion, reg.Handler, reg.AccountID)

	return nil
}

// HandleInvokeEvent logs the received event as JSON.
// A marshaling failure is returned to the caller and nothing is logged.
func (e *Extension) HandleInvokeEvent(_ context.Context, event *extapi.NextEventResponse) error {
	payload, marshalErr := json.Marshal(event)
	if marshalErr == nil {
		log.Printf("received invocation event: %s\n", payload)
	}

	return marshalErr
}

// Shutdown logs the shutdown reason together with the error passed in.
func (e *Extension) Shutdown(_ context.Context, reason extapi.ShutdownReason, err error) error {
	log.Printf("shutting down extension due to reason=%s error=%v\n", reason, err)

	return nil
}

// Err returns a nil channel: this extension never reports asynchronous errors.
func (e *Extension) Err() <-chan error {
	return nil
}

func main() {
	err := extapi.Run(context.Background(), new(Extension))
	if err != nil {
		log.Panic(err)
	}
}
Output:

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a low-level Lambda API client. In most situations it's better to use the high-level handlers extapi.Run and logsapi.Run.

Example

End-to-end example of how to use Client, process events, and handle errors. Please consider using the Run function, which is a high-level wrapper over Client.

package main

import (
	"context"
	"log"
	"os"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

// errorType is the error type this example reports to Lambda.
const errorType = "ExtensionName.Reason"

func main() {
	ctx := context.Background()

	// Placeholders standing in for real initialization and event processing.
	initialize := func() error { return nil }
	process := func(event *extapi.NextEventResponse) error { return nil }

	// Step 1: register the extension and print what the register response contains.
	client, err := extapi.Register(ctx)
	if err != nil {
		log.Panic(err)
	}
	reg := client.GetRegisterResponse()
	log.Println(reg.FunctionName)
	log.Println(reg.FunctionVersion)
	log.Println(reg.Handler)
	log.Println(reg.AccountID)

	// abort reports the failure through client.ExitError (best effort, its
	// result is deliberately ignored) and then panics with the original cause.
	abort := func(cause error) {
		_, _ = client.ExitError(ctx, errorType, cause)
		log.Panic(cause)
	}

	// Step 2: initialize the extension; a failure is reported with InitError.
	if err := initialize(); err != nil {
		_, _ = client.InitError(ctx, errorType, err)
		log.Panic(err)
	}

	// Step 3: poll for events.
	// The first NextEvent call notifies Lambda that extension initialization has finished.
	for {
		event, err := client.NextEvent(ctx)
		switch {
		case err != nil:
			abort(err)
		case event.EventType == extapi.Shutdown:
			log.Println(event.ShutdownReason)
			os.Exit(0)
		}

		// Step 4: report the error and exit if event processing failed.
		if err := process(event); err != nil {
			abort(err)
		}
	}
}
Output:

func Register

func Register(ctx context.Context, opts ...Option) (*Client, error)

Register registers the extension with the Lambda Extensions API. This happens during extension Init. Each call must include the list of events in the body and the lambdaext.ExtensionName in the headers.

Example

Register supports optional arguments to override defaults.

package main

import (
	"context"
	"log"
	"net/http"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

func main() {
	// Each option overrides one of the defaults Register would otherwise use.
	overrides := []extapi.Option{
		extapi.WithEventTypes([]extapi.EventType{extapi.Shutdown}),
		extapi.WithExtensionName("binary_file_basename"),
		extapi.WithAWSLambdaRuntimeAPI("127.0.0.1:8080"),
		extapi.WithHTTPClient(http.DefaultClient),
	}

	client, err := extapi.Register(context.Background(), overrides...)
	if err != nil {
		log.Panic(err)
	}

	// The client is intentionally unused in this example.
	_ = client
}
Output:

func (*Client) ExitError

func (c *Client) ExitError(ctx context.Context, errorType string, err error) (*ErrorResponse, error)

ExitError reports an error to the platform before exiting. Call it when you encounter an unexpected failure.

func (*Client) GetRegisterResponse

func (c *Client) GetRegisterResponse() *RegisterResponse

func (*Client) InitError

func (c *Client) InitError(ctx context.Context, errorType string, err error) (*ErrorResponse, error)

InitError reports an initialization error to the platform. Call it when you registered but failed to initialize.

Example

Client.InitError accepts error details to send to Lambda.

package main

import (
	"context"
	"errors"
	"log"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

// main registers the extension and then demonstrates reporting an
// initialization failure to Lambda with Client.InitError.
func main() {
	ctx := context.Background()

	client, err := extapi.Register(ctx)
	if err != nil {
		log.Panic(err)
	}

	// If something went wrong, call client.InitError and exit without calling client.NextEvent.
	errResp, err := client.InitError(ctx, "ExtensionName.Reason", errors.New("text description of the error"))
	if err != nil {
		// The response must not be inspected when the call itself failed:
		// errResp may be nil and dereferencing it would panic.
		log.Println(err)

		return
	}
	if errResp.Status != "OK" {
		log.Printf("unknown error response status: %s, want OK", errResp.Status)
	}
}
Output:

func (*Client) LogsSubscribe deprecated

func (c *Client) LogsSubscribe(ctx context.Context, subscribeReq *LogsSubscribeRequest) error

LogsSubscribe subscribes to a log stream. Lambda streams the logs to the extension, and the extension can then process, filter, and send the logs to any preferred destination. Subscription should occur during the extension initialization phase.

Deprecated: The Lambda Telemetry API supersedes the Lambda Logs API. Use TelemetrySubscribe instead.

https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html

Example
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

// main starts an HTTP server that receives logs, registers the extension for
// shutdown events only, subscribes the server to the Logs API and then blocks
// until the next event arrives.
func main() {
	ctx := context.Background()
	destinationHostPort := "sandbox.localdomain:8080"

	// 1. start log receiving server
	srv := http.Server{
		Addr: destinationHostPort,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// process logs
		}),
	}
	// ListenAndServe blocks, so it runs in its own goroutine. A deferred call
	// would only start the server when main returns, i.e. after the subscription
	// and the blocking NextEvent call below, so no logs could ever be received.
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Println(err)
		}
	}()

	// 2. register extension and subscribe only to shutdown events
	client, err := extapi.Register(ctx, extapi.WithEventTypes([]extapi.EventType{extapi.Shutdown}))
	if err != nil {
		log.Panic(err)
	}

	// 3. subscribe to logs api
	req := extapi.NewLogsSubscribeRequest("http://"+destinationHostPort, nil, nil)
	if err := client.LogsSubscribe(ctx, req); err != nil {
		// 4. report error and exit if the subscription failed
		_, _ = client.InitError(ctx, "ExtensionName.Reason", err)
		log.Panic(err)
	}

	// 5. block till shutdown event
	_, _ = client.NextEvent(ctx)
}
Output:

func (*Client) NextEvent

func (c *Client) NextEvent(ctx context.Context) (*NextEventResponse, error)

NextEvent blocks while long polling for the next Lambda invoke or shutdown event. By default, the Go HTTP client has no timeout, and in this case this is actually the desired behavior to enable long polling of the Extensions API.

func (*Client) TelemetrySubscribe

func (c *Client) TelemetrySubscribe(ctx context.Context, subscribeReq *TelemetrySubscribeRequest) error

TelemetrySubscribe subscribes to a telemetry stream. Lambda streams the telemetry to the extension, and the extension can then process, filter, and send the telemetry to any preferred destination. Subscription should occur during the extension initialization phase. https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html

type ErrorResponse

// ErrorResponse is the body of the response for /init/error and /exit/error.
type ErrorResponse struct {
	// Status is read from the "status" JSON key.
	Status string `json:"status"`
}

ErrorResponse is the body of the response for /init/error and /exit/error.

type EventType

// EventType represents the type of events received from /event/next.
type EventType string

EventType represents the type of events received from /event/next.

// EventType values declared by this package.
const (
	// Invoke is the lambda invoke event.
	Invoke EventType = "INVOKE"
	// Shutdown is a shutdown event for the environment.
	Shutdown EventType = "SHUTDOWN"
)

type Extension

// Extension abstracts the extension logic from the Lambda Extensions API.
// Implementations are passed to Run.
type Extension interface {
	// Init is called after extension Register and before invoking the lambda function.
	// It's the best place to make network connections, warm up caches, preallocate buffers, etc.
	Init(ctx context.Context, client *Client) error
	// HandleInvokeEvent is called after receiving the Invoke event type from the Lambda API.
	// The Shutdown event type is handled inside Run internally and is not exposed to the Extension.
	HandleInvokeEvent(ctx context.Context, event *NextEventResponse) error
	// Shutdown is called when the Lambda API signals the extension to stop or in case of an error.
	// There will be no calls of HandleInvokeEvent after Shutdown was called.
	// The extension should flush all unsaved changes to persistent storage.
	// Run will return after calling Shutdown and handling its result.
	Shutdown(ctx context.Context, reason ShutdownReason, err error) error
	// Err signals an error to the Run loop and stops the extension.
	// Only the first error is read from the channel. Consider using a non-blocking send to put errors into the channel.
	// The error channel can be nil.
	Err() <-chan error
}

Extension abstracts the extension logic from Lambda Extensions API. For Telemetry API extension, use telemetryapi.Processor and telemetryapi.Run.

type LambdaAPIError

// LambdaAPIError holds an error type, an error message and an HTTP status code.
// It has an Error method, so values can be used as a Go error.
// NOTE(review): presumably this is the error payload returned by the Lambda API — confirm against the client code.
type LambdaAPIError struct {
	// Type is mapped to the "errorType" JSON key.
	Type           string `json:"errorType"`
	// Message is mapped to the "errorMessage" JSON key.
	Message        string `json:"errorMessage"`
	// HTTPStatusCode is excluded from JSON encoding and decoding (tag "-").
	HTTPStatusCode int    `json:"-"`
}

func (LambdaAPIError) Error

func (e LambdaAPIError) Error() string

type LogSubscriptionType

// LogSubscriptionType represents the type of logs in Lambda.
type LogSubscriptionType string

LogSubscriptionType represents the type of logs in Lambda.

// LogSubscriptionType values declared by this package.
const (
	// LogSubscriptionTypePlatform is to receive logs emitted by the platform.
	LogSubscriptionTypePlatform LogSubscriptionType = "platform"
	// LogSubscriptionTypeFunction is to receive logs emitted by the function.
	LogSubscriptionTypeFunction LogSubscriptionType = "function"
	// LogSubscriptionTypeExtension is to receive logs emitted by the extension.
	LogSubscriptionTypeExtension LogSubscriptionType = "extension"
)

type LogsBufferingCfg

// LogsBufferingCfg is the configuration set for receiving logs from Logs API.
// Logs are sent as soon as the first of the limits below is reached.
type LogsBufferingCfg struct {
	// MaxItems is the maximum number of events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
	MaxItems uint32 `json:"maxItems"`
	// MaxBytes is the maximum size in bytes of the logs to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576)
	MaxBytes uint32 `json:"maxBytes"`
	// TimeoutMS is the maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000)
	TimeoutMS uint32 `json:"timeoutMs"`
}

LogsBufferingCfg is the configuration set for receiving logs from Logs API. Whichever of the conditions below is met first, the logs will be sent.

type LogsDestination

// LogsDestination is the configuration for listeners who would like to receive logs with HTTP.
// The fields are encoded under the JSON keys "protocol", "URI", "method" and "encoding";
// "method" and "encoding" are omitted from the JSON when empty.
type LogsDestination struct {
	Protocol   LogsHTTPProtocol `json:"protocol"`
	URI        string           `json:"URI"`
	HTTPMethod LogsHTTPMethod   `json:"method,omitempty"`
	Encoding   LogsHTTPEncoding `json:"encoding,omitempty"`
}

LogsDestination is the configuration for listeners who would like to receive logs with HTTP.

type LogsHTTPEncoding

// LogsHTTPEncoding denotes what the content is encoded in.
type LogsHTTPEncoding string

LogsHTTPEncoding denotes what the content is encoded in.

const (
	// JSON is the only LogsHTTPEncoding value declared here.
	JSON LogsHTTPEncoding = "JSON"
)

type LogsHTTPMethod

// LogsHTTPMethod represents the HTTP method used to receive logs from Logs API.
type LogsHTTPMethod string

LogsHTTPMethod represents the HTTP method used to receive logs from Logs API.

// LogsHTTPMethod values declared by this package.
const (
	// HTTPPost is to receive logs through POST.
	HTTPPost LogsHTTPMethod = "POST"
	// HTTPPut is to receive logs through PUT.
	HTTPPut LogsHTTPMethod = "PUT"
)

type LogsHTTPProtocol

// LogsHTTPProtocol is used to specify the protocol when subscribing to Logs API for HTTP.
type LogsHTTPProtocol string

LogsHTTPProtocol is used to specify the protocol when subscribing to Logs API for HTTP.

const (
	// HTTPProto is the only LogsHTTPProtocol value declared here.
	HTTPProto LogsHTTPProtocol = "HTTP"
)

type LogsSchemaVersion

// LogsSchemaVersion is the type of the SchemaVersion field of LogsSubscribeRequest.
type LogsSchemaVersion string
const (
	// LogsSchemaVersion20210318 is the schema version string "2021-03-18".
	LogsSchemaVersion20210318 LogsSchemaVersion = "2021-03-18"
)

type LogsSubscribeRequest

// LogsSubscribeRequest is the request body that is sent to Logs API on subscribe.
// SchemaVersion and BufferingCfg are omitted from the JSON when empty;
// LogTypes and Destination are always encoded.
type LogsSubscribeRequest struct {
	SchemaVersion LogsSchemaVersion     `json:"schemaVersion,omitempty"`
	LogTypes      []LogSubscriptionType `json:"types"`
	BufferingCfg  *LogsBufferingCfg     `json:"buffering,omitempty"`
	Destination   *LogsDestination      `json:"destination"`
}

LogsSubscribeRequest is the request body that is sent to Logs API on subscribe.

func NewLogsSubscribeRequest deprecated

func NewLogsSubscribeRequest(url string, logTypes []LogSubscriptionType, bufferingCfg *LogsBufferingCfg) *LogsSubscribeRequest

NewLogsSubscribeRequest creates LogsSubscribeRequest with sensible defaults.

Deprecated: The Lambda Telemetry API supersedes the Lambda Logs API. Use NewTelemetrySubscribeRequest instead.

type NextEventResponse

// NextEventResponse is the response for /event/next.
type NextEventResponse struct {
	// EventType is either INVOKE or SHUTDOWN.
	EventType EventType `json:"eventType"`
	// DeadlineMs is the instant that the invocation times out, as epoch milliseconds.
	DeadlineMs int64 `json:"deadlineMs"`
	// RequestID is the AWS Request ID, for INVOKE events.
	RequestID lambdaext.RequestID `json:"requestId"`
	// InvokedFunctionArn is the ARN of the function being invoked, for INVOKE events.
	InvokedFunctionArn string `json:"invokedFunctionArn"`
	// Tracing is the XRay trace ID, for INVOKE events.
	Tracing Tracing `json:"tracing"`
	// ShutdownReason is the reason for termination, if this is a shutdown event.
	ShutdownReason ShutdownReason `json:"shutdownReason"`
}

NextEventResponse is the response for /event/next.

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAWSLambdaRuntimeAPI

func WithAWSLambdaRuntimeAPI(api string) Option

func WithEventTypes

func WithEventTypes(types []EventType) Option

func WithExtensionName

func WithExtensionName(name lambdaext.ExtensionName) Option

func WithHTTPClient

func WithHTTPClient(httpClient *http.Client) Option

func WithLogger

func WithLogger(log logr.Logger) Option

type RegisterRequest

// RegisterRequest carries the list of event types, encoded under the "events" JSON key.
// NOTE(review): presumably this is the body sent by Register to /register — confirm against the client code.
type RegisterRequest struct {
	EventTypes []EventType `json:"events"`
}

type RegisterResponse

// RegisterResponse is the body of the response for /register.
type RegisterResponse struct {
	FunctionName    string                    `json:"functionName"`
	FunctionVersion lambdaext.FunctionVersion `json:"functionVersion"`
	Handler         string                    `json:"handler"`
	// AccountID is the account ID associated with the Lambda function that you're registering the extension for.
	AccountID string `json:"accountId"`
}

RegisterResponse is the body of the response for /register.

type ShutdownReason

// ShutdownReason represents the reason for a shutdown event.
type ShutdownReason string

ShutdownReason represents the reason for a shutdown event.

// ShutdownReason values declared by this package.
const (
	// Spindown is a normal end to a function.
	Spindown ShutdownReason = "spindown"
	// Timeout means the handler ran out of time.
	// The value was previously misspelled as "timout", which could never
	// match a "timeout" shutdown reason.
	Timeout ShutdownReason = "timeout"
	// Failure is any other shutdown type, such as out-of-memory.
	Failure ShutdownReason = "failure"
	// ExtensionError is used when one of Client or Extension methods return error. It is not returned by lambda.
	ExtensionError ShutdownReason = "extension_error"
)

type TelemetryBufferingCfg

// TelemetryBufferingCfg is the configuration set for receiving events from Telemetry API.
// Events are sent as soon as the first of the limits below is reached.
type TelemetryBufferingCfg struct {
	// MaxItems is the maximum number of events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
	MaxItems uint32 `json:"maxItems"`
	// MaxBytes is the maximum size in bytes of data to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576)
	MaxBytes uint32 `json:"maxBytes"`
	// TimeoutMS is the maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000)
	TimeoutMS uint32 `json:"timeoutMs"`
}

TelemetryBufferingCfg is the configuration set for receiving events from Telemetry API. Whichever of the conditions below is met first, the events will be sent.

type TelemetryDestination

// TelemetryDestination is the configuration settings that define the telemetry event
// destination and the protocol for event delivery.
type TelemetryDestination struct {
	// Protocol is encoded under the "protocol" JSON key.
	Protocol string `json:"protocol"`
	// URI is encoded under the "URI" JSON key.
	URI      string `json:"URI"`
}

TelemetryDestination is the configuration settings that define the telemetry event destination and the protocol for event delivery.

type TelemetrySchemaVersion

// TelemetrySchemaVersion is the type of the SchemaVersion field of TelemetrySubscribeRequest.
type TelemetrySchemaVersion string
const (
	// TelemetrySchemaVersion20220701 is the schema version string "2022-07-01".
	TelemetrySchemaVersion20220701 TelemetrySchemaVersion = "2022-07-01"
)

type TelemetrySubscribeRequest

// TelemetrySubscribeRequest is the request body that is sent to Telemetry API on subscribe.
// SchemaVersion and BufferingCfg are omitted from the JSON when empty;
// Types and Destination are always encoded.
type TelemetrySubscribeRequest struct {
	SchemaVersion TelemetrySchemaVersion      `json:"schemaVersion,omitempty"`
	Types         []TelemetrySubscriptionType `json:"types"`
	BufferingCfg  *TelemetryBufferingCfg      `json:"buffering,omitempty"`
	Destination   *TelemetryDestination       `json:"destination"`
}

TelemetrySubscribeRequest is the request body that is sent to Telemetry API on subscribe.

func NewTelemetrySubscribeRequest

func NewTelemetrySubscribeRequest(url string, types []TelemetrySubscriptionType, bufferingCfg *TelemetryBufferingCfg) *TelemetrySubscribeRequest

NewTelemetrySubscribeRequest creates TelemetrySubscribeRequest with sensible defaults.

type TelemetrySubscriptionType

// TelemetrySubscriptionType represents the type of telemetry events in Lambda.
type TelemetrySubscriptionType string

TelemetrySubscriptionType represents the type of telemetry events in Lambda.

// TelemetrySubscriptionType values declared by this package.
const (
	// TelemetrySubscriptionTypePlatform is logs, metrics, and traces, which describe events and errors
	// related to the execution environment runtime lifecycle, extension lifecycle, and function invocations.
	TelemetrySubscriptionTypePlatform TelemetrySubscriptionType = "platform"
	// TelemetrySubscriptionTypeFunction is custom logs that the Lambda function code generates.
	TelemetrySubscriptionTypeFunction TelemetrySubscriptionType = "function"
	// TelemetrySubscriptionTypeExtension is custom logs that the Lambda extension code generates.
	TelemetrySubscriptionTypeExtension TelemetrySubscriptionType = "extension"
)

type Tracing

// Tracing is part of the response for /event/next.
// It is the type of the NextEventResponse.Tracing field.
type Tracing struct {
	// Type is decoded from the "type" JSON key.
	Type  lambdaext.TracingType  `json:"type"`
	// Value is decoded from the "value" JSON key.
	Value lambdaext.TracingValue `json:"value"`
}

Tracing is part of the response for /event/next.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL