Documentation ¶
Overview ¶
Package extapi implements a client for the Lambda Extensions API and an Extension handler that abstracts interactions with the API. Implement Extension and call the Run function in your main package. For more custom use cases, you can use the low-level Client directly.
Index ¶
- func EnvAWSLambdaFunctionMemorySizeMB() int
- func EnvAWSLambdaFunctionName() string
- func EnvAWSLambdaFunctionVersion() lambdaext.FunctionVersion
- func EnvAWSLambdaInitializationType() lambdaext.InitType
- func EnvAWSLambdaRuntimeAPI() lambdaext.AWSLambdaRuntimeAPI
- func EnvAWSRegion() string
- func EnvXAmznTraceID() lambdaext.TracingValue
- func Run(ctx context.Context, ext Extension, opts ...Option) error
- type Client
- func (c *Client) ExitError(ctx context.Context, errorType string, err error) (*ErrorResponse, error)
- func (c *Client) GetRegisterResponse() *RegisterResponse
- func (c *Client) InitError(ctx context.Context, errorType string, err error) (*ErrorResponse, error)
- func (c *Client) LogsSubscribe(ctx context.Context, subscribeReq *LogsSubscribeRequest) error (deprecated)
- func (c *Client) NextEvent(ctx context.Context) (*NextEventResponse, error)
- func (c *Client) TelemetrySubscribe(ctx context.Context, subscribeReq *TelemetrySubscribeRequest) error
- type ErrorResponse
- type EventType
- type Extension
- type LambdaAPIError
- type LogSubscriptionType
- type LogsBufferingCfg
- type LogsDestination
- type LogsHTTPEncoding
- type LogsHTTPMethod
- type LogsHTTPProtocol
- type LogsSchemaVersion
- type LogsSubscribeRequest
- type NextEventResponse
- type Option
- type RegisterRequest
- type RegisterResponse
- type ShutdownReason
- type TelemetryBufferingCfg
- type TelemetryDestination
- type TelemetrySchemaVersion
- type TelemetrySubscribeRequest
- type TelemetrySubscriptionType
- type Tracing
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EnvAWSLambdaFunctionMemorySizeMB ¶
func EnvAWSLambdaFunctionMemorySizeMB() int
EnvAWSLambdaFunctionMemorySizeMB returns the amount of memory available to the function in MB.
func EnvAWSLambdaFunctionName ¶
func EnvAWSLambdaFunctionName() string
EnvAWSLambdaFunctionName returns the name of the function.
func EnvAWSLambdaFunctionVersion ¶
func EnvAWSLambdaFunctionVersion() lambdaext.FunctionVersion
EnvAWSLambdaFunctionVersion returns the version of the function being executed.
func EnvAWSLambdaInitializationType ¶
func EnvAWSLambdaInitializationType() lambdaext.InitType
EnvAWSLambdaInitializationType returns the initialization type of the function, which is either on-demand or provisioned-concurrency. For more information, see Configuring provisioned concurrency.
func EnvAWSLambdaRuntimeAPI ¶
func EnvAWSLambdaRuntimeAPI() lambdaext.AWSLambdaRuntimeAPI
EnvAWSLambdaRuntimeAPI returns the host and port of the runtime API for custom runtime.
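The host and port value is what the Extensions API endpoints are built from. A minimal sketch using only the standard library, assuming the value comes from the `AWS_LAMBDA_RUNTIME_API` environment variable and that the API lives under the `2020-01-01/extension` path; the helper name `extensionsBaseURL` and the local fallback address are hypothetical and not part of this package:

```go
package main

import (
	"fmt"
	"os"
)

// extensionsBaseURL builds the Extensions API base URL from a host:port value.
// The "2020-01-01" segment is the Extensions API version path used by Lambda.
func extensionsBaseURL(hostPort string) string {
	return "http://" + hostPort + "/2020-01-01/extension"
}

func main() {
	// AWS_LAMBDA_RUNTIME_API is set by Lambda inside the execution environment.
	hostPort := os.Getenv("AWS_LAMBDA_RUNTIME_API")
	if hostPort == "" {
		hostPort = "127.0.0.1:9001" // hypothetical fallback for local runs only
	}
	fmt.Println(extensionsBaseURL(hostPort) + "/register")
}
```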
func EnvAWSRegion ¶
func EnvAWSRegion() string
EnvAWSRegion returns the AWS Region where the Lambda function is executed.
func EnvXAmznTraceID ¶
func EnvXAmznTraceID() lambdaext.TracingValue
EnvXAmznTraceID returns X-Ray tracing header.
func Run ¶
func Run(ctx context.Context, ext Extension, opts ...Option) error
Run runs the Extension. It blocks the current goroutine until the extension lifecycle finishes or an error occurs.
Example ¶
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

type Extension struct{}

func (ext *Extension) Init(ctx context.Context, client *extapi.Client) error {
	registerResp := client.GetRegisterResponse()
	log.Printf(
		"initializing extension for function %s(%s), handler %s, and accountID %s\n",
		registerResp.FunctionName,
		registerResp.FunctionVersion,
		registerResp.Handler,
		registerResp.AccountID,
	)

	return nil
}

func (ext *Extension) HandleInvokeEvent(ctx context.Context, event *extapi.NextEventResponse) error {
	b, err := json.Marshal(event)
	if err != nil {
		return err
	}
	log.Printf("received invocation event: %s\n", b)

	return nil
}

func (ext *Extension) Shutdown(ctx context.Context, reason extapi.ShutdownReason, err error) error {
	log.Printf("shutting down extension due to reason=%s error=%v\n", reason, err)

	return nil
}

func (ext *Extension) Err() <-chan error {
	return nil
}

func main() {
	if err := extapi.Run(context.Background(), &Extension{}); err != nil {
		log.Panic(err)
	}
}
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a low-level Lambda API client. In most situations it is better to use the high-level handlers extapi.Run and logsapi.Run.
Example ¶
End-to-end example of how to use Client to process events and handle errors. Consider using the Run function, which is a high-level wrapper over Client.
package main

import (
	"context"
	"log"
	"os"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

func main() {
	ctx := context.Background()

	// 1. register extension
	client, err := extapi.Register(ctx)
	if err != nil {
		log.Panic(err)
	}
	registerResp := client.GetRegisterResponse()
	log.Println(registerResp.FunctionName)
	log.Println(registerResp.FunctionVersion)
	log.Println(registerResp.Handler)
	log.Println(registerResp.AccountID)

	// 2. initialize the extension
	initFunc := func() error { return nil }
	if err := initFunc(); err != nil {
		// report error and exit if initialization failed
		_, _ = client.InitError(ctx, "ExtensionName.Reason", err)
		log.Panic(err)
	}

	// 3. start polling events
	// the first NextEvent call notifies Lambda that extension initialization has finished
	for {
		event, err := client.NextEvent(ctx)
		if err != nil {
			// report error and exit if polling failed
			_, _ = client.ExitError(ctx, "ExtensionName.Reason", err)
			log.Panic(err)
		}
		if event.EventType == extapi.Shutdown {
			log.Println(event.ShutdownReason)
			os.Exit(0)
		}

		processEventFunc := func(event *extapi.NextEventResponse) error { return nil }
		if err := processEventFunc(event); err != nil {
			// 4. report error and exit if event processing failed
			_, _ = client.ExitError(ctx, "ExtensionName.Reason", err)
			log.Panic(err)
		}
	}
}
func Register ¶
Register registers the extension with the Lambda Extensions API. This happens during extension Init. Each call must include the list of events in the body and the lambdaext.ExtensionName in the headers.
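On the wire, the registration described above is a single HTTP request. The following is a rough sketch using only the standard library, not this package's implementation. It assumes the AWS-documented path `/2020-01-01/extension/register` and the `Lambda-Extension-Name` and `Lambda-Extension-Identifier` headers; the helpers `register` and `fakeAPI` are hypothetical, and `fakeAPI` is a local stand-in for the real API with a made-up response:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// register sends a raw registration request and returns the extension
// identifier that the API hands back in a response header.
func register(baseURL, name string, events []string) (string, error) {
	body, err := json.Marshal(map[string][]string{"events": events})
	if err != nil {
		return "", err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/2020-01-01/extension/register", bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	req.Header.Set("Lambda-Extension-Name", name)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("register failed: %s", resp.Status)
	}
	return resp.Header.Get("Lambda-Extension-Identifier"), nil
}

// fakeAPI is a hypothetical stand-in for the Extensions API, for local experiments only.
func fakeAPI() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("Lambda-Extension-Name") == "" {
			http.Error(w, "missing extension name", http.StatusForbidden)
			return
		}
		w.Header().Set("Lambda-Extension-Identifier", "test-id")
		fmt.Fprint(w, `{"functionName":"demo","functionVersion":"$LATEST","handler":"main"}`)
	}))
}

func main() {
	srv := fakeAPI()
	defer srv.Close()
	id, err := register(srv.URL, "my-extension", []string{"INVOKE", "SHUTDOWN"})
	fmt.Println(id, err)
}
```

The returned identifier is what later requests to the API must carry, which is why Register hands back a Client rather than a plain response.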
Example ¶
Register supports optional arguments to override defaults.
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

func main() {
	ctx := context.Background()

	client, err := extapi.Register(
		ctx,
		extapi.WithEventTypes([]extapi.EventType{extapi.Shutdown}),
		extapi.WithExtensionName("binary_file_basename"),
		extapi.WithAWSLambdaRuntimeAPI("127.0.0.1:8080"),
		extapi.WithHTTPClient(http.DefaultClient),
	)
	if err != nil {
		log.Panic(err)
	}

	_ = client
}
func (*Client) ExitError ¶
func (c *Client) ExitError(ctx context.Context, errorType string, err error) (*ErrorResponse, error)
ExitError reports an error to the platform before exiting. Call it when you encounter an unexpected failure.
func (*Client) GetRegisterResponse ¶
func (c *Client) GetRegisterResponse() *RegisterResponse
func (*Client) InitError ¶
func (c *Client) InitError(ctx context.Context, errorType string, err error) (*ErrorResponse, error)
InitError reports an initialization error to the platform. Call it when you registered but failed to initialize.
Example ¶
Client.InitError accepts error details to send to Lambda.
package main

import (
	"context"
	"errors"
	"log"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

func main() {
	ctx := context.Background()

	client, err := extapi.Register(ctx)
	if err != nil {
		log.Panic(err)
	}

	// if something went wrong, call client.InitError and exit without calling client.NextEvent.
	errResp, err := client.InitError(ctx, "ExtensionName.Reason", errors.New("text description of the error"))
	if err != nil {
		// errResp may be nil when the call itself failed, so stop here
		log.Println(err)
		return
	}
	if errResp.Status != "OK" {
		log.Printf("unknown error response status: %s, want OK", errResp.Status)
	}
}
func (*Client) LogsSubscribe
deprecated
func (c *Client) LogsSubscribe(ctx context.Context, subscribeReq *LogsSubscribeRequest) error
LogsSubscribe subscribes to a log stream. Lambda streams the logs to the extension, and the extension can then process, filter, and send the logs to any preferred destination. Subscription should occur during the extension initialization phase.
Deprecated: The Lambda Telemetry API supersedes the Lambda Logs API. Use TelemetrySubscribe instead.
https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html
Example ¶
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/zakharovvi/aws-lambda-extensions/extapi"
)

func main() {
	ctx := context.Background()
	destinationHostPort := "sandbox.localdomain:8080"

	// 1. start log receiving server in the background
	srv := http.Server{
		Addr: destinationHostPort,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// process logs
		}),
	}
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Println(err)
		}
	}()

	// 2. register extension and subscribe only to shutdown events
	client, err := extapi.Register(ctx, extapi.WithEventTypes([]extapi.EventType{extapi.Shutdown}))
	if err != nil {
		log.Panic(err)
	}

	// 3. subscribe to logs api
	req := extapi.NewLogsSubscribeRequest("http://"+destinationHostPort, nil, nil)
	if err := client.LogsSubscribe(ctx, req); err != nil {
		// 4. report error and exit if subscription failed
		_, _ = client.InitError(ctx, "ExtensionName.Reason", err)
		log.Panic(err)
	}

	// 5. block till shutdown event
	_, _ = client.NextEvent(ctx)
}
func (*Client) NextEvent ¶
func (c *Client) NextEvent(ctx context.Context) (*NextEventResponse, error)
NextEvent blocks while long polling for the next Lambda invoke or shutdown event. By default, the Go HTTP client has no timeout, which is the desired behavior here because it enables long polling of the Extensions API.
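The long-polling behavior can be illustrated with the standard library alone. This is a sketch under assumptions rather than this package's code: the client leaves `Timeout` at zero so the request may block indefinitely, and cancellation is left to the context. The names `pollOnce`, `slowServer`, and `demo` are hypothetical, and `slowServer` only imitates an API that has no event to deliver yet:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// pollOnce issues one long-poll request. The client has no Timeout set,
// so only ctx bounds how long the call may block.
func pollOnce(ctx context.Context, url string) error {
	client := &http.Client{} // zero Timeout means no client-side deadline
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}

// slowServer is a hypothetical stand-in that holds each request open,
// like a long poll with no pending event.
func slowServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-r.Context().Done():
		case <-time.After(2 * time.Second):
		}
	}))
}

// demo reports whether the poll was ended by the context deadline.
func demo() bool {
	srv := slowServer()
	defer srv.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	return errors.Is(pollOnce(ctx, srv.URL), context.DeadlineExceeded)
}

func main() {
	fmt.Println(demo())
}
```

Leaving the deadline to the caller's context keeps the poll open for as long as the execution environment is idle, while still allowing a clean stop.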
func (*Client) TelemetrySubscribe ¶
func (c *Client) TelemetrySubscribe(ctx context.Context, subscribeReq *TelemetrySubscribeRequest) error
TelemetrySubscribe subscribes to a telemetry stream. Lambda streams the telemetry to the extension, and the extension can then process, filter, and send it to any preferred destination. Subscription should occur during the extension initialization phase.
https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html
type ErrorResponse ¶
type ErrorResponse struct {
Status string `json:"status"`
}
ErrorResponse is the body of the response for /init/error and /exit/error.
type EventType ¶
type EventType string
EventType represents the type of events received from /event/next.
type Extension ¶
type Extension interface {
	// Init is called after extension Register and before invoking lambda function.
	// It's the best place to make network connections, warm up caches, preallocate buffers, etc.
	Init(ctx context.Context, client *Client) error
	// HandleInvokeEvent is called after receiving Invoke event type from Lambda API.
	// Shutdown event type is handled inside Run internally and not exposed to the Extension.
	HandleInvokeEvent(ctx context.Context, event *NextEventResponse) error
	// Shutdown is called when Lambda API signals the extension to stop or in case of an error.
	// There will be no calls of HandleInvokeEvent after Shutdown was called.
	// Extension should flush all unsaved changes to persistent storage.
	// Run will return after calling the Shutdown and handling its result.
	Shutdown(ctx context.Context, reason ShutdownReason, err error) error
	// Err signals an error to the Run loop and stops the extension.
	// Only the first error is read from the channel. Consider using a non-blocking send to put errors into the channel.
	// The error channel can be nil.
	Err() <-chan error
}
Extension abstracts the extension logic from Lambda Extensions API. For Telemetry API extension, use telemetryapi.Processor and telemetryapi.Run.
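The Err method's contract (only the first error is read, so senders should not block) can be sketched as follows. This is an illustration, not code from this package; the `worker` type and its `report` method are hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

// worker is a hypothetical background component that reports failures.
type worker struct {
	errCh chan error
}

func newWorker() *worker {
	// Capacity 1 holds the first error until the run loop reads it.
	return &worker{errCh: make(chan error, 1)}
}

// report stores err without blocking. Later errors are dropped,
// because only the first one is ever read from the channel.
func (w *worker) report(err error) bool {
	select {
	case w.errCh <- err:
		return true
	default:
		return false
	}
}

// Err has the same shape as the Extension.Err method.
func (w *worker) Err() <-chan error { return w.errCh }

func main() {
	w := newWorker()
	fmt.Println(w.report(errors.New("first")))  // true
	fmt.Println(w.report(errors.New("second"))) // false
	fmt.Println(<-w.Err())                      // first
}
```

A buffered channel with a `select`/`default` send means a failing goroutine never stalls if the run loop has already stopped reading.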
type LambdaAPIError ¶
type LambdaAPIError struct {
	Type           string `json:"errorType"`
	Message        string `json:"errorMessage"`
	HTTPStatusCode int    `json:"-"`
}
func (LambdaAPIError) Error ¶
func (e LambdaAPIError) Error() string
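Because LambdaAPIError implements the error interface with a value receiver, callers can recover it from a wrapped error with `errors.As`. The sketch below mirrors the struct locally so that it runs without this package; the type `apiError`, the helpers `decodeAPIError` and `statusOf`, the sample payload, and the `Error` message format are all assumptions made for illustration:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// apiError mirrors the LambdaAPIError fields shown above.
type apiError struct {
	Type           string `json:"errorType"`
	Message        string `json:"errorMessage"`
	HTTPStatusCode int    `json:"-"`
}

// Error uses an assumed format; the package's real message may differ.
func (e apiError) Error() string {
	return fmt.Sprintf("%d %s: %s", e.HTTPStatusCode, e.Type, e.Message)
}

// decodeAPIError turns a response body and status code into an error value.
// The status code is set by hand because the field is excluded from JSON.
func decodeAPIError(status int, body []byte) error {
	e := apiError{HTTPStatusCode: status}
	if err := json.Unmarshal(body, &e); err != nil {
		return err
	}
	return e
}

// statusOf extracts the HTTP status from a wrapped apiError, or 0 if absent.
func statusOf(err error) int {
	var e apiError
	if errors.As(err, &e) {
		return e.HTTPStatusCode
	}
	return 0
}

func main() {
	// Made-up payload in the errorType/errorMessage shape.
	body := []byte(`{"errorType":"Extension.Sample","errorMessage":"bad event"}`)
	err := fmt.Errorf("register: %w", decodeAPIError(400, body))
	fmt.Println(statusOf(err), err)
}
```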
type LogSubscriptionType ¶
type LogSubscriptionType string
LogSubscriptionType represents the type of logs in Lambda.
const (
	// LogSubscriptionTypePlatform is to receive logs emitted by the platform.
	LogSubscriptionTypePlatform LogSubscriptionType = "platform"
	// LogSubscriptionTypeFunction is to receive logs emitted by the function.
	LogSubscriptionTypeFunction LogSubscriptionType = "function"
	// LogSubscriptionTypeExtension is to receive logs emitted by the extension.
	LogSubscriptionTypeExtension LogSubscriptionType = "extension"
)
type LogsBufferingCfg ¶
type LogsBufferingCfg struct {
	// MaxItems is the maximum number of events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
	MaxItems uint32 `json:"maxItems"`
	// MaxBytes is the maximum size in bytes of the logs to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576)
	MaxBytes uint32 `json:"maxBytes"`
	// TimeoutMS is the maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000)
	TimeoutMS uint32 `json:"timeoutMs"`
}
LogsBufferingCfg is the buffering configuration for receiving logs from the Logs API. A batch of logs is sent as soon as any one of the limits above is reached.
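The documented minimum and maximum for each field can be checked before subscribing. The following is a minimal sketch that mirrors the struct locally; the `bufferingCfg` type and the `validate` helper are hypothetical and not provided by this package, and the bounds are copied from the field comments above:

```go
package main

import "fmt"

// bufferingCfg mirrors the LogsBufferingCfg fields shown above.
type bufferingCfg struct {
	MaxItems  uint32
	MaxBytes  uint32
	TimeoutMS uint32
}

// validate checks each field against the documented minimum and maximum.
func validate(c bufferingCfg) error {
	switch {
	case c.MaxItems < 1000 || c.MaxItems > 10000:
		return fmt.Errorf("maxItems %d out of range [1000, 10000]", c.MaxItems)
	case c.MaxBytes < 262144 || c.MaxBytes > 1048576:
		return fmt.Errorf("maxBytes %d out of range [262144, 1048576]", c.MaxBytes)
	case c.TimeoutMS < 100 || c.TimeoutMS > 30000:
		return fmt.Errorf("timeoutMs %d out of range [100, 30000]", c.TimeoutMS)
	}
	return nil
}

func main() {
	defaults := bufferingCfg{MaxItems: 10000, MaxBytes: 262144, TimeoutMS: 1000}
	fmt.Println(validate(defaults))
	fmt.Println(validate(bufferingCfg{MaxItems: 10, MaxBytes: 262144, TimeoutMS: 1000}))
}
```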
type LogsDestination ¶
type LogsDestination struct {
	Protocol   LogsHTTPProtocol `json:"protocol"`
	URI        string           `json:"URI"`
	HTTPMethod LogsHTTPMethod   `json:"method,omitempty"`
	Encoding   LogsHTTPEncoding `json:"encoding,omitempty"`
}
LogsDestination is the configuration for listeners who would like to receive logs with HTTP.
type LogsHTTPEncoding ¶
type LogsHTTPEncoding string
LogsHTTPEncoding denotes what the content is encoded in.
const (
JSON LogsHTTPEncoding = "JSON"
)
type LogsHTTPMethod ¶
type LogsHTTPMethod string
LogsHTTPMethod represents the HTTP method used to receive logs from Logs API.
const (
	// HTTPPost is to receive logs through POST.
	HTTPPost LogsHTTPMethod = "POST"
	// HTTPPut is to receive logs through PUT.
	HTTPPut LogsHTTPMethod = "PUT"
)
type LogsHTTPProtocol ¶
type LogsHTTPProtocol string
LogsHTTPProtocol is used to specify the protocol when subscribing to Logs API for HTTP.
const (
HTTPProto LogsHTTPProtocol = "HTTP"
)
type LogsSchemaVersion ¶
type LogsSchemaVersion string
const (
LogsSchemaVersion20210318 LogsSchemaVersion = "2021-03-18"
)
type LogsSubscribeRequest ¶
type LogsSubscribeRequest struct {
	SchemaVersion LogsSchemaVersion     `json:"schemaVersion,omitempty"`
	LogTypes      []LogSubscriptionType `json:"types"`
	BufferingCfg  *LogsBufferingCfg     `json:"buffering,omitempty"`
	Destination   *LogsDestination      `json:"destination"`
}
LogsSubscribeRequest is the request body that is sent to Logs API on subscribe.
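To see what this request body looks like as JSON, the struct tags above can be reproduced on local types and marshaled. This is only a sketch: the local types use plain strings instead of the package's named types, the `encode` helper is hypothetical, and the chosen field values are examples rather than the defaults that NewLogsSubscribeRequest applies:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the request types shown above, with the same JSON tags.
type bufferingCfg struct {
	MaxItems  uint32 `json:"maxItems"`
	MaxBytes  uint32 `json:"maxBytes"`
	TimeoutMS uint32 `json:"timeoutMs"`
}

type destination struct {
	Protocol   string `json:"protocol"`
	URI        string `json:"URI"`
	HTTPMethod string `json:"method,omitempty"`
	Encoding   string `json:"encoding,omitempty"`
}

type subscribeRequest struct {
	SchemaVersion string        `json:"schemaVersion,omitempty"`
	LogTypes      []string      `json:"types"`
	BufferingCfg  *bufferingCfg `json:"buffering,omitempty"`
	Destination   *destination  `json:"destination"`
}

// encode builds a request without buffering settings and returns its JSON form.
func encode(uri string, types []string) (string, error) {
	req := subscribeRequest{
		SchemaVersion: "2021-03-18",
		LogTypes:      types,
		Destination:   &destination{Protocol: "HTTP", URI: uri, HTTPMethod: "POST", Encoding: "JSON"},
	}
	b, err := json.Marshal(req)
	return string(b), err
}

func main() {
	s, err := encode("http://sandbox.localdomain:8080", []string{"platform", "function"})
	fmt.Println(s, err)
}
```

Because `buffering` is tagged `omitempty` on a pointer field, leaving it nil drops the key from the body entirely.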
func NewLogsSubscribeRequest
deprecated
func NewLogsSubscribeRequest(url string, logTypes []LogSubscriptionType, bufferingCfg *LogsBufferingCfg) *LogsSubscribeRequest
NewLogsSubscribeRequest creates LogsSubscribeRequest with sensible defaults.
Deprecated: The Lambda Telemetry API supersedes the Lambda Logs API. Use NewTelemetrySubscribeRequest instead.
type NextEventResponse ¶
type NextEventResponse struct {
	// Either INVOKE or SHUTDOWN.
	EventType EventType `json:"eventType"`
	// The instant that the invocation times out, as epoch milliseconds.
	DeadlineMs int64 `json:"deadlineMs"`
	// The AWS Request ID, for INVOKE events.
	RequestID lambdaext.RequestID `json:"requestId"`
	// The ARN of the function being invoked, for INVOKE events.
	InvokedFunctionArn string `json:"invokedFunctionArn"`
	// X-Ray trace ID, for INVOKE events.
	Tracing Tracing `json:"tracing"`
	// The reason for termination, if this is a shutdown event.
	ShutdownReason ShutdownReason `json:"shutdownReason"`
}
NextEventResponse is the response for /event/next.
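Since DeadlineMs is expressed in epoch milliseconds, it usually needs converting before it can be compared with the current time. A small sketch, assuming a locally mirrored subset of the struct and a made-up sample payload; `nextEvent`, `parseEvent`, `remaining`, and `remainingMs` are hypothetical names:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// nextEvent mirrors a subset of the NextEventResponse fields shown above.
type nextEvent struct {
	EventType      string `json:"eventType"`
	DeadlineMs     int64  `json:"deadlineMs"`
	RequestID      string `json:"requestId"`
	ShutdownReason string `json:"shutdownReason"`
}

// parseEvent decodes an /event/next response body.
func parseEvent(body []byte) (nextEvent, error) {
	var ev nextEvent
	err := json.Unmarshal(body, &ev)
	return ev, err
}

// remaining reports how much time is left before the deadline, relative to now.
func remaining(ev nextEvent, now time.Time) time.Duration {
	return time.UnixMilli(ev.DeadlineMs).Sub(now)
}

// remainingMs is the same calculation for callers that track time as epoch milliseconds.
func remainingMs(ev nextEvent, nowMs int64) int64 {
	return remaining(ev, time.UnixMilli(nowMs)).Milliseconds()
}

func main() {
	// Made-up payload in the shape of an INVOKE event.
	body := []byte(`{"eventType":"INVOKE","deadlineMs":1700000003000,"requestId":"abc"}`)
	ev, err := parseEvent(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(ev.EventType, remaining(ev, time.UnixMilli(1700000000000))) // INVOKE 3s
}
```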
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithAWSLambdaRuntimeAPI ¶
func WithEventTypes ¶
func WithExtensionName ¶
func WithExtensionName(name lambdaext.ExtensionName) Option
func WithHTTPClient ¶
func WithLogger ¶
type RegisterRequest ¶
type RegisterRequest struct {
EventTypes []EventType `json:"events"`
}
type RegisterResponse ¶
type RegisterResponse struct {
	FunctionName    string                    `json:"functionName"`
	FunctionVersion lambdaext.FunctionVersion `json:"functionVersion"`
	Handler         string                    `json:"handler"`
	// AccountID returns the account ID associated with the Lambda function that you're registering the extension for.
	AccountID string `json:"accountId"`
}
RegisterResponse is the body of the response for /register.
type ShutdownReason ¶
type ShutdownReason string
ShutdownReason represents the reason for a shutdown event.
const (
	// Spindown is a normal end to a function.
	Spindown ShutdownReason = "spindown"
	// Timeout means the handler ran out of time.
	Timeout ShutdownReason = "timeout"
	// Failure is any other shutdown type, such as out-of-memory.
	Failure ShutdownReason = "failure"
	// ExtensionError is used when one of the Client or Extension methods returns an error. It is not returned by Lambda.
	ExtensionError ShutdownReason = "extension_error"
)
type TelemetryBufferingCfg ¶
type TelemetryBufferingCfg struct {
	// MaxItems is the maximum number of events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
	MaxItems uint32 `json:"maxItems"`
	// MaxBytes is the maximum size in bytes of data to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576)
	MaxBytes uint32 `json:"maxBytes"`
	// TimeoutMS is the maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000)
	TimeoutMS uint32 `json:"timeoutMs"`
}
TelemetryBufferingCfg is the buffering configuration for receiving events from the Telemetry API. A batch of events is sent as soon as any one of the limits above is reached.
type TelemetryDestination ¶
TelemetryDestination is the configuration settings that define the telemetry event destination and the protocol for event delivery.
type TelemetrySchemaVersion ¶
type TelemetrySchemaVersion string
const (
TelemetrySchemaVersion20220701 TelemetrySchemaVersion = "2022-07-01"
)
type TelemetrySubscribeRequest ¶
type TelemetrySubscribeRequest struct {
	SchemaVersion TelemetrySchemaVersion      `json:"schemaVersion,omitempty"`
	Types         []TelemetrySubscriptionType `json:"types"`
	BufferingCfg  *TelemetryBufferingCfg      `json:"buffering,omitempty"`
	Destination   *TelemetryDestination       `json:"destination"`
}
TelemetrySubscribeRequest is the request body that is sent to Telemetry API on subscribe.
func NewTelemetrySubscribeRequest ¶
func NewTelemetrySubscribeRequest(url string, types []TelemetrySubscriptionType, bufferingCfg *TelemetryBufferingCfg) *TelemetrySubscribeRequest
NewTelemetrySubscribeRequest creates TelemetrySubscribeRequest with sensible defaults.
type TelemetrySubscriptionType ¶
type TelemetrySubscriptionType string
TelemetrySubscriptionType represents the type of telemetry events in Lambda.
const (
	// TelemetrySubscriptionTypePlatform is logs, metrics, and traces, which describe events and errors
	// related to the execution environment runtime lifecycle, extension lifecycle, and function invocations.
	TelemetrySubscriptionTypePlatform TelemetrySubscriptionType = "platform"
	// TelemetrySubscriptionTypeFunction is custom logs that the Lambda function code generates.
	TelemetrySubscriptionTypeFunction TelemetrySubscriptionType = "function"
	// TelemetrySubscriptionTypeExtension is custom logs that the Lambda extension code generates.
	TelemetrySubscriptionTypeExtension TelemetrySubscriptionType = "extension"
)
type Tracing ¶
type Tracing struct {
	Type  lambdaext.TracingType  `json:"type"`
	Value lambdaext.TracingValue `json:"value"`
}
Tracing is part of the response for /event/next.