cflog2otel

Version: v0.6.0

Published: Sep 10, 2024 License: MIT Imports: 39 Imported by: 0

README

cflog2otel

cflog2otel is an AWS Lambda function that aggregates Amazon CloudFront access logs and exports the resulting metrics using the OpenTelemetry (Otel) protocol. When CloudFront logs are uploaded to S3, an S3 Notification triggers the Lambda, which aggregates the configured metrics from each log file as it arrives.

Features

  • CloudFront Log S3 Event Handling
    • Automatically processes CloudFront logs stored in S3 whenever an S3 Notification Event is triggered.
  • Operates as a Lambda Function
    • Runs on AWS Lambda, allowing for easy serverless scaling.
  • OpenTelemetry Compatible
    • Aggregated metrics are exported using the OpenTelemetry protocol, compatible with various monitoring tools and platforms.
  • Flexible Configuration
    • Customize the log fields to be aggregated, such as request status codes or byte counts.

Installation

Pre-built release binaries are provided. You can deploy the binary as a Lambda function to aggregate CloudFront logs in real-time. This binary acts as the bootstrap for the Lambda.

1. Download the Release Binary

Download the latest binary from the Releases page.

2. Create the Lambda Function
  1. Create the Lambda Function:

    • Use the AWS Lambda console to create a new Lambda function and select a custom runtime.
    • Upload the downloaded binary as the bootstrap for the Lambda function.
  2. Set Up S3 Notification Trigger:

    • Configure an S3 Notification on the S3 bucket where CloudFront logs are stored to trigger the Lambda function.

Workflow
  1. S3 Notification Event: When CloudFront logs are uploaded to S3, an S3 Notification Event triggers the Lambda function.
  2. Lambda Function Execution: The cflog2otel Lambda function is invoked by the event.
  3. Log Download and Aggregation: The Lambda function downloads the log file from S3 and aggregates the metrics based on the specified configuration.
  4. Metrics Export: The aggregated metrics are exported to the specified OpenTelemetry endpoint.
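
Steps 1 to 3 all start from the S3 notification payload. As a minimal sketch (not the cflog2otel implementation), the following Go program decodes a trimmed-down S3 event and extracts the bucket name and object key. The s3Event type and the objectLocations helper are hypothetical names introduced for illustration, and only the Records[].s3.bucket.name and Records[].s3.object.key fields of the S3 notification format are modeled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// s3Event models only the fields this sketch needs from an S3 notification.
// It is a hypothetical type, not part of cflog2otel.
type s3Event struct {
	Records []struct {
		S3 struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// objectLocations returns one "s3://bucket/key" string per event record.
func objectLocations(payload []byte) ([]string, error) {
	var ev s3Event
	if err := json.Unmarshal(payload, &ev); err != nil {
		return nil, fmt.Errorf("decode S3 event: %w", err)
	}
	locs := make([]string, 0, len(ev.Records))
	for _, r := range ev.Records {
		locs = append(locs, "s3://"+r.S3.Bucket.Name+"/"+r.S3.Object.Key)
	}
	return locs, nil
}

func main() {
	payload := []byte(`{"Records":[{"s3":{"bucket":{"name":"example-bucket"},` +
		`"object":{"key":"example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz"}}}]}`)
	locs, err := objectLocations(payload)
	if err != nil {
		panic(err)
	}
	for _, l := range locs {
		fmt.Println(l)
	}
}
```

The bucket name example-bucket is made up for the example. Object keys in real S3 events are URL-encoded, so a production handler would decode them before downloading the object.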

Configuration

Below is an example of the configuration file used to parse CloudFront logs and export metrics when the Lambda function is triggered by an S3 event.

local cel = std.native('cel');

{
  otel: {
    endpoint: 'http://localhost:4317/',
    gzip: true,
  },
  resource_attributes: [
    {
      key: 'service.name',
      value: 'Amazon CloudFront',
    },
    {
      key: 'aws.cloudfront.distribution_id',
      value: cel('cloudfront.distributionId'),
    },
  ],
  scope: {
    name: 'cflog2otel',
    version: '0.0.0',
    schema_url: 'https://example.com/schemas/1.0.0',
  },
  metrics: [
    {
      name: 'http.server.requests',
      description: 'The number of HTTP requests',
      type: 'Count',
      attributes: [
        {
          key: 'http.status_code',
          value: cel('log.scStatusCategory'),
        },
      ],
    },
  ],
}

Example of Metrics Aggregation for CloudFront Logs

Consider a CloudFront log file, such as the one described in the AWS CloudFront Developer Guide, with the following entries:

Log File Example
Path: example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz

#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2019-12-04	21:02:31	LAX1	392	192.0.2.100	GET	d111111abcdef8.cloudfront.net	/index.html	200	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Hit	SOX4xwn4XV6Q4rgb7XiVGOHms_BGlTAC4KyHmureZmBNrjGdRLiNIQ==	d111111abcdef8.cloudfront.net	https	23	0.001	-	TLSv1.2	ECDHE-RSA-AES128-GCM-SHA256	Hit	HTTP/2.0	-	-	11040	0.001	Hit	text/html	78	-	-
2019-12-04	21:02:31	LAX1	392	192.0.2.100	GET	d111111abcdef8.cloudfront.net	/index.html	200	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Hit	k6WGMNkEzR5BEM_SaF47gjtX9zBDO2m349OY2an0QPEaUum1ZOLrow==	d111111abcdef8.cloudfront.net	https	23	0.000	-	TLSv1.2	ECDHE-RSA-AES128-GCM-SHA256	Hit	HTTP/2.0	-	-	11040	0.000	Hit	text/html	78	-	-
2019-12-04	21:02:31	LAX1	392	192.0.2.100	GET	d111111abcdef8.cloudfront.net	/index.html	200	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Hit	f37nTMVvnKvV2ZSvEsivup_c2kZ7VXzYdjC-GUQZ5qNs-89BlWazbw==	d111111abcdef8.cloudfront.net	https	23	0.001	-	TLSv1.2	ECDHE-RSA-AES128-GCM-SHA256	Hit	HTTP/2.0	-	-	11040	0.001	Hit	text/html	78	-	-
2019-12-13	22:36:27	SEA19-C1	900	192.0.2.200	GET	d111111abcdef8.cloudfront.net	/favicon.ico	502	http://www.example.com/	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Error	1pkpNfBQ39sYMnjjUQjmH2w1wdJnbHYTbag21o_3OfcQgPzdL2RSSQ==	www.example.com	http	675	0.102	-	-	-	Error	HTTP/1.1	-	-	25260	0.102	OriginDnsError	text/html	507	-	-
2019-12-13	22:36:26	SEA19-C1	900	192.0.2.200	GET	d111111abcdef8.cloudfront.net	/	502	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Error	3AqrZGCnF_g0-5KOvfA7c9XLcf4YGvMFSeFdIetR1N_2y8jSis8Zxg==	www.example.com	http	735	0.107	-	-	-	Error	HTTP/1.1	-	-	3802	0.107	OriginDnsError	text/html	507	-	-
2019-12-13	22:37:02	SEA19-C2	900	192.0.2.200	GET	d111111abcdef8.cloudfront.net	/	502	-	curl/7.55.1	-	-	Error	kBkDzGnceVtWHqSCqBUqtA_cEs2T3tFUBbnBNkB9El_uVRhHgcZfcw==	www.example.com	http	387	0.103	-	-	-	Error	HTTP/1.1	-	-	12644	0.103	OriginDnsError	text/html	507	-	-

For the above log file, the following metrics aggregation result will be exported:

Metrics Aggregation Result Example

{
  "resource_metrics": [
    {
      "resource": {
        "attributes": [
          {
            "key": "aws.cloudfront.distribution_id",
            "value": {
              "Value": {
                "StringValue": "EMLARXS9EXAMPLE"
              }
            }
          },
          {
            "key": "service.name",
            "value": {
              "Value": {
                "StringValue": "Amazon CloudFront"
              }
            }
          }
        ]
      },
      "scope_metrics": [
        {
          "scope": {
            "name": "cflog2otel",
            "version": "0.0.0"
          },
          "metrics": [
            {
              "name": "http.server.requests",
              "description": "The number of HTTP requests",
              "Data": {
                "Sum": {
                  "data_points": [
                    {
                      "attributes": [
                        {
                          "key": "http.status_code",
                          "value": {
                            "Value": {
                              "StringValue": "2xx"
                            }
                          }
                        }
                      ],
                      "start_time_unix_nano": 1575493320000000000,
                      "Value": {
                        "AsInt": 3
                      }
                    },
                    {
                      "attributes": [
                        {
                          "key": "http.status_code",
                          "value": {
                            "Value": {
                              "StringValue": "5xx"
                            }
                          }
                        }
                      ],
                      "start_time_unix_nano": 1576276560000000000,
                      "Value": {
                        "AsInt": 2
                      }
                    },
                    {
                      "attributes": [
                        {
                          "key": "http.status_code",
                          "value": {
                            "Value": {
                              "StringValue": "5xx"
                            }
                          }
                        }
                      ],
                      "start_time_unix_nano": 1576276620000000000,
                      "Value": {
                        "AsInt": 1
                      }
                    }
                  ],
                  "aggregation_temporality": 2,
                  "is_monotonic": true
                }
              }
            }
          ],
          "schema_url": "https://example.com/schemas/1.0.0"
        }
      ]
    }
  ]
}

In this example, the HTTP requests are aggregated by their status code categories (e.g., 2xx, 5xx) and the results are exported as metrics.

OpenTelemetry Provider Export Configuration

The otel field is used to specify settings related to exporting metrics to an OpenTelemetry provider.

Field Descriptions
  • endpoint (string, optional):
    • Specifies the endpoint URL where the metrics data should be sent.
  • gzip (bool, optional):
    • Indicates whether to enable GZip compression when exporting metrics data.
  • headers (map[string]string, optional):
    • A map of HTTP headers used when sending metrics data. This can include headers such as Authorization.

Example Using ssm

The configuration can use ssm-lookup (https://github.com/fujiwara/ssm-lookup) to read values from AWS Systems Manager (SSM) Parameter Store.
The following example securely retrieves an API key from Parameter Store and sets it as the Authorization header.

local ssm = std.native('ssm');

{
  otel: {
    endpoint: 'https://otel-collector.example.com/',
    gzip: true,
    headers: {
      "Authorization": 'Bearer '+ssm("/path/to/api-key")
    },
  },
}

Backfill Export

Use this option if your metrics backend uses Delta temporality but can handle updates for the same timestamp and attributes. With backfill enabled, cflog2otel also reads earlier log files for the same period, so that the exported aggregation is complete.

With this configuration, when an event is triggered for an object such as example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz, cflog2otel executes ListObjects for example-prefix/EMLARXS9EXAMPLE.2019-11-14-20*. Any logs whose last-modified time is within the specified time_tolerance period from the trigger time are also used for metrics aggregation.

{
  otel: {
    endpoint: 'http://localhost:4317/',
    gzip: true,
  },
  backfill: {
    enabled: true,
    time_tolerance: '1h',
  },
  // ...
}

OpenTelemetry Metrics Aggregation Settings

The resource_attributes, scope, and metrics fields are used to configure how metrics are aggregated and exported to an OpenTelemetry provider.

cel and switch Jsonnet Native Functions

The cel and switch Jsonnet native functions allow you to define custom expressions for filtering and calculating metrics. These functions use CEL (Common Expression Language) to define logic based on the log data, enabling precise control over which metrics are collected and how they are calculated.

For example, cel('log.scStatusCategory == "5xx"') is a custom expression that filters logs based on the HTTP status code category.

The switch function is syntactic sugar for a switch-case statement in CEL. The example below assumes that cel and switch have been bound with std.native('cel') and std.native('switch'):

switch([
    {
        case: cel('log.scStatusCategory == "2xx"'),
        value: 'success',
    },
    {
        case: cel('log.scStatusCategory == "4xx"'),
        value: 'client_error',
    },
    {
        case: cel('log.scStatusCategory == "5xx"'),
        value: 'server_error',
    },
    {
        default: 'other',
    },
])
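
To make the intended semantics concrete, here is a small Go sketch of switch-case evaluation: cases are tried in order, the first matching case supplies the value, and the default is used when nothing matches. First-match-wins ordering is an assumption based on ordinary switch-case behavior. The switchCase type and the evalSwitch and equals helpers are hypothetical, and plain Go predicates stand in for CEL expressions.

```go
package main

import "fmt"

// switchCase pairs a predicate (standing in for a CEL expression)
// with the value to return when the predicate matches.
type switchCase struct {
	Match func(statusCategory string) bool
	Value string
}

// evalSwitch returns the value of the first matching case,
// or defaultValue when no case matches.
func evalSwitch(cases []switchCase, defaultValue, statusCategory string) string {
	for _, c := range cases {
		if c.Match(statusCategory) {
			return c.Value
		}
	}
	return defaultValue
}

// equals builds a predicate equivalent to `log.scStatusCategory == want`.
func equals(want string) func(string) bool {
	return func(got string) bool { return got == want }
}

func main() {
	cases := []switchCase{
		{equals("2xx"), "success"},
		{equals("4xx"), "client_error"},
		{equals("5xx"), "server_error"},
	}
	for _, category := range []string{"2xx", "3xx", "5xx"} {
		fmt.Println(category, "=>", evalSwitch(cases, "other", category))
	}
}
```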

CEL Variables

CEL variables have the following object structure:

| Variable Name | Type | Description |
|---|---|---|
| bucket | object | Contains information about the S3 bucket provided in the Event Notification. |
| object | object | Contains information about the S3 object provided in the Event Notification. |
| cloudfront | object | Contains information about the CloudFront distribution related to the log. |
| log | object | Contains a single line of CloudFront log information. Each field in the log object is a lowerCamelCase version of the corresponding CloudFront log field name. |

Below are the specific variables that are commonly used in CEL expressions:

| Variable Name | Type | Description |
|---|---|---|
| bucket.name | string | The name of the S3 bucket where the event occurred. |
| object.key | string | The key (path) of the object within the S3 bucket. |
| cloudfront.distributionId | string | The ID of the CloudFront distribution. |
| log.xEdgeLocation | nullable string | The AWS Edge location that served the request. |
| log.csMethod | nullable string | The HTTP method used in the request (e.g., GET, POST). |
| log.csHost | nullable string | The cs(Host) log field: the domain name of the CloudFront distribution (e.g., d111111abcdef8.cloudfront.net). |
| log.csUriStem | nullable string | The URI stem of the request (the path to the resource). |
| log.scStatus | nullable int | The HTTP status code returned by CloudFront (e.g., 200, 404). |
| log.scStatusCategory | nullable string | The category of the HTTP status code (e.g., 2xx, 4xx, 5xx). |

Usage of cel and switch

The cel and switch functions can be used in the following configuration fields:

  • resource_attributes[*].value
  • metrics[*].attributes[*].value
  • metrics[*].filter
  • metrics[*].value

metrics[*].type is Aggregation Type

The aggregation type is one of the following:

  • Count (default): Count the number of log lines that match the filter.
  • Sum: Sum the value of the specified field in the log lines that match the filter.
  • Histogram: Calculate the histogram of the specified field in the log lines that match the filter.
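
The difference between Count and Sum can be sketched in a few lines of Go. This is an illustration only: logLine, countLines, and sumLines are hypothetical names, a Go predicate stands in for the CEL filter, and a nil filter is assumed to match every line.

```go
package main

import "fmt"

// logLine holds the two log fields used in this sketch.
type logLine struct {
	Status  int
	ScBytes int64
}

// countLines counts the lines that match filter (Count aggregation).
func countLines(lines []logLine, filter func(logLine) bool) int64 {
	var n int64
	for _, l := range lines {
		if filter == nil || filter(l) {
			n++
		}
	}
	return n
}

// sumLines adds up value(l) over the lines that match filter (Sum aggregation).
func sumLines(lines []logLine, filter func(logLine) bool, value func(logLine) float64) float64 {
	var total float64
	for _, l := range lines {
		if filter == nil || filter(l) {
			total += value(l)
		}
	}
	return total
}

func main() {
	// Status and sc-bytes values taken from the log file example.
	lines := []logLine{
		{200, 392}, {200, 392}, {200, 392},
		{502, 900}, {502, 900}, {502, 900},
	}
	is5xx := func(l logLine) bool { return l.Status/100 == 5 }
	bytes := func(l logLine) float64 { return float64(l.ScBytes) }

	fmt.Println(countLines(lines, is5xx))      // number of 5xx lines
	fmt.Println(sumLines(lines, is5xx, bytes)) // bytes sent for 5xx lines
	fmt.Println(sumLines(lines, nil, bytes))   // bytes sent for all lines
}
```

A Histogram uses the same filter and value inputs but records each value into a bucket instead of adding it to a running total.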

Example of Count Aggregation

Count rows that match the filter.

local cel = std.native('cel');

{
  otel: {
    endpoint: 'http://localhost:4317/',
    gzip: true,
  },
  metrics: [
    {
      name: 'http.server.requests',
      description: 'The number of HTTP requests',
      type: 'Count',
      attributes: [
        {
          key: 'http.status_code',
          value: cel('log.scStatusCategory'),
        },
      ],
    },
  ],
}

Required fields are name and type. Optional fields are description, attributes, filter, and unit. Metric temporality defaults to Delta; if is_cumulative is true, it is Cumulative.

Example of Sum Aggregation

Sum values of the specified field in the log lines that match the filter.

local cel = std.native('cel');

{
  otel: {
    endpoint: 'http://localhost:4317/',
    gzip: true,
  },
  metrics: [
    {
      name: 'http.server.total_bytes',
      description: 'The total number of bytes sent by the server',
      type: 'Sum',
      unit: 'Byte',
      attributes: [
        {
          key: 'http.status_code',
          value: cel('log.scStatusCategory'),
        },
      ],
      value: cel('double(log.scBytes)'),
      is_monotonic: true,
    },
  ],
}

Required fields are name, type, and value. Optional fields are description, attributes, filter, unit, and is_monotonic. Metric temporality defaults to Delta; if is_cumulative is true, it is Cumulative. The metric is Non-Monotonic by default; if is_monotonic is true, it is Monotonic.

Example of Histogram Aggregation

Calculate the histogram of the specified field in the log lines that match the filter.

local cel = std.native('cel');

{
  otel: {
    endpoint: 'http://localhost:4317/',
    gzip: true,
  },
  metrics: [
    {
      name: 'http.server.request_time',
      description: 'The request time of HTTP requests',
      type: 'Histogram',
      unit: 'ms',
      value: cel('log.timeTaken * 1000.0'),
    },
  ],
}

Required fields are name, type, and value. Optional fields are description, attributes, filter, unit, boundaries, and no_min_max. Metric temporality defaults to Delta; if is_cumulative is true, it is Cumulative.

Boundaries default to [0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000] (see DefaultHistogramBoundaries); if boundaries is specified, it is used instead. For example, boundaries of [0.0, 0.5, 1.0, 2.5, 5.0] produce the histogram buckets (-inf, 0.0], (0.0, 0.5], (0.5, 1.0], (1.0, 2.5], (2.5, 5.0], and (5.0, +inf). If no_min_max is true, the minimum and maximum values of the histogram are not calculated.
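
The bucket layout described above, with inclusive upper bounds, can be illustrated with a short Go sketch. The bucketIndex and bucketCounts functions are hypothetical helpers written for this example. They rely on sort.SearchFloat64s, which returns the smallest index whose boundary is greater than or equal to the value.

```go
package main

import (
	"fmt"
	"sort"
)

// bucketIndex returns the index of the bucket that v falls into.
// Bucket i covers (boundaries[i-1], boundaries[i]]; the last bucket
// (index len(boundaries)) covers everything above the final boundary.
// boundaries must be sorted in ascending order.
func bucketIndex(boundaries []float64, v float64) int {
	return sort.SearchFloat64s(boundaries, v)
}

// bucketCounts tallies values into len(boundaries)+1 buckets.
func bucketCounts(boundaries, values []float64) []uint64 {
	counts := make([]uint64, len(boundaries)+1)
	for _, v := range values {
		counts[bucketIndex(boundaries, v)]++
	}
	return counts
}

func main() {
	boundaries := []float64{0.0, 0.5, 1.0, 2.5, 5.0}
	values := []float64{0.0, 0.3, 0.5, 2.0, 7.0}
	fmt.Println(bucketCounts(boundaries, values))
}
```

Here 0.0 lands in (-inf, 0.0], both 0.3 and 0.5 land in (0.0, 0.5], 2.0 lands in (1.0, 2.5], and 7.0 lands in (5.0, +inf).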

Emit Zero Configuration

The emit_zero configuration is used to ensure that metrics are emitted with a value of zero for specified attribute combinations, even if no data points are recorded for those combinations during the reporting period. This is useful for maintaining consistent time series data and avoiding gaps in monitoring dashboards.

Example

In the following example, the emit_zero configuration is used to emit zero values for HTTP status code categories 2xx, 3xx, 4xx, and 5xx:

local cel = std.native('cel');

{
  otel: {
    endpoint: 'http://localhost:4317/',
    gzip: true,
  },
  resource_attributes: [
    {
      key: 'service.name',
      value: 'Amazon CloudFront',
    },
    {
      key: 'aws.cloudfront.distribution_id',
      value: cel('cloudfront.distributionId'),
    },
  ],
  scope: {
    name: 'test',
    version: '1.0.0',
    schema_url: 'https://example.com/schemas/1.0.0',
  },
  metrics: [
    {
      name: 'http.server.requests',
      description: 'The number of HTTP requests',
      type: 'Count',
      attributes: [
        {
          key: 'http.status_code',
          value: cel('log.scStatusCategory'),
        },
      ],
      emit_zero: [
        ['2xx'],
        ['3xx'],
        ['4xx'],
        ['5xx'],
      ],
    },
  ],
}

In this configuration, even if there are no HTTP requests with status codes in the 2xx, 3xx, 4xx, or 5xx categories during the reporting period, the metrics system will still emit zero values for these categories. This helps maintain a complete and consistent set of metrics data.
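
The effect of emit_zero can be sketched in Go as a post-processing step over the aggregated counts. This is a simplified illustration, not the cflog2otel implementation: fillZero is a hypothetical helper, and joining attribute values with "," to form a map key is a choice made only for this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// fillZero returns a copy of counts in which every attribute-value
// combination listed in emitZero is present, using 0 for combinations
// that had no matching log lines.
func fillZero(counts map[string]int64, emitZero [][]string) map[string]int64 {
	out := make(map[string]int64, len(counts)+len(emitZero))
	for k, v := range counts {
		out[k] = v
	}
	for _, combo := range emitZero {
		key := strings.Join(combo, ",")
		if _, ok := out[key]; !ok {
			out[key] = 0
		}
	}
	return out
}

func main() {
	// Counts per status category from the log file example.
	counts := map[string]int64{"2xx": 3, "5xx": 3}
	filled := fillZero(counts, [][]string{{"2xx"}, {"3xx"}, {"4xx"}, {"5xx"}})

	keys := make([]string, 0, len(filled))
	for k := range filled {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s=%d\n", k, filled[k])
	}
}
```

Existing counts are left untouched; only the missing 3xx and 4xx series are added with a value of zero.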

Example of Mackerel Labeled Metrics

See the lambda/mackerel directory for more details. It includes Terraform code and a lambroll configuration.

For Local Configuration Testing

$ cflog2otel --s3-url s3://bucket-name/path/to/logs.gz --config config.jsonnet --local-collector

This command outputs the metrics to stdout.

License

This project is licensed under the MIT License. See the LICENSE file for more details.

Documentation

Variables

var Base64EncodeNativeFunction = &jsonnet.NativeFunction{
	Name:   "base64_encode",
	Params: []ast.Identifier{"data"},
	Func: func(args []interface{}) (interface{}, error) {
		if len(args) != 1 {
			return nil, oops.Errorf("base64_encode: invalid arguments length expected 1 got %d", len(args))
		}
		var data []byte
		str, ok := args[0].(string)
		if ok {
			data = []byte(str)
		} else {
			data, ok = args[0].([]byte)
			if !ok {
				return nil, oops.Errorf("base64_encode: invalid arguments, expected string or []byte got %T", args[0])
			}
		}
		return base64.StdEncoding.EncodeToString(data), nil
	},
}
var CELCapableNativeFunction = &jsonnet.NativeFunction{
	Name:   "cel",
	Params: []ast.Identifier{"expr"},
	Func: func(args []interface{}) (interface{}, error) {
		if len(args) != 1 {
			return nil, oops.Errorf("cel: invalid arguments length expected 1 got %d", len(args))
		}
		str, ok := args[0].(string)
		if !ok {
			return nil, oops.Errorf("cel: invalid arguments, expected string got %T", args[0])
		}
		return map[string]interface{}{
			"expr": str,
		}, nil
	},
}
var DefaultHistogramBoundaries = []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}
var EnvNativeFunction = &jsonnet.NativeFunction{
	Name:   "env",
	Params: []ast.Identifier{"name", "default"},
	Func: func(args []interface{}) (interface{}, error) {
		if len(args) != 2 {
			return nil, oops.Errorf("env: invalid arguments length expected 2 got %d", len(args))
		}
		key, ok := args[0].(string)
		if !ok {
			return nil, oops.Errorf("env: invalid 1st arguments, expected string got %T", args[0])
		}
		val := os.Getenv(key)
		if val == "" {
			return args[1], nil
		}
		return val, nil
	},
}
var JsonescapeNativeFunction = &jsonnet.NativeFunction{
	Name:   "json_escape",
	Params: []ast.Identifier{"str"},
	Func: func(args []interface{}) (interface{}, error) {
		if len(args) != 1 {
			return nil, oops.Errorf("jsonescape: invalid arguments length expected 1 got %d", len(args))
		}
		str, ok := args[0].(string)
		if !ok {
			return nil, oops.Errorf("jsonescape: invalid arguments, expected string got %T", args[0])
		}
		bs, err := json.Marshal(str)
		if err != nil {
			return nil, oops.Wrapf(err, "jsonescape")
		}
		return string(bs), nil
	},
}
var MustEnvNativeFunction = &jsonnet.NativeFunction{
	Name:   "must_env",
	Params: []ast.Identifier{"name"},
	Func: func(args []interface{}) (interface{}, error) {
		if len(args) != 1 {
			return nil, oops.Errorf("must_env: invalid arguments length expected 1 got %d", len(args))
		}
		key, ok := args[0].(string)
		if !ok {
			return nil, oops.Errorf("must_env: invalid arguments, expected string got %T", args[0])
		}
		val, ok := os.LookupEnv(key)
		if !ok {
			return nil, oops.Errorf("must_env: %s not set", key)
		}
		return val, nil
	},
}
var SwitchNativeFunction = &jsonnet.NativeFunction{
	Name:   "switch",
	Params: []ast.Identifier{"cases"},
	Func: func(args []interface{}) (interface{}, error) {
		if len(args) != 1 {
			return nil, oops.Errorf("switch: invalid arguments length expected 1 got %d", len(args))
		}
		cases, ok := args[0].([]any)
		if !ok {
			return nil, oops.Errorf("switch: invalid arguments, expected string got %T", args[0])
		}
		defaultCount := 0
		for i, c := range cases {
			v, ok := c.(map[string]any)
			if !ok {
				return nil, oops.Errorf("switch: invalid arguments, expected map[string]interface{} got %T", c)
			}
			caseField, ok := v["case"]
			if !ok {
				defaultField, ok := v["default"]
				if !ok {
					return nil, oops.Errorf("switch: invalid arguments, expected string case")
				}
				defaultCount++
				if defaultExpr, ok := castCELExpr(defaultField); ok {
					cases[i] = map[string]any{
						"default_expr": defaultExpr,
					}
				}
				continue
			}
			caseExpr, ok := castCELExpr(caseField)
			if !ok {
				return nil, oops.Errorf("switch: case must be a CEL expression")
			}
			valueField, ok := v["value"]
			if !ok {
				return nil, oops.Errorf("cel: invalid arguments, need value")
			}

			if valueExpr, ok := castCELExpr(valueField); ok {
				cases[i] = map[string]any{
					"case":       caseExpr,
					"value_expr": valueExpr,
				}
				continue
			}
			cases[i] = map[string]any{
				"case":  caseExpr,
				"value": valueField,
			}
		}
		if defaultCount > 1 {
			return nil, oops.Errorf("cel: multiple default values in switch")
		}
		return map[string]interface{}{
			"switch": cases,
		}, nil
	},
}
var Version = "v0.6.0"

Functions

func Aggregate

func Aggregate(ctx context.Context, cfg *Config, celVariables *CELVariables, logs []CELVariablesLog) ([]*metricdata.ResourceMetrics, error)

func AggregationTypeStrings

func AggregationTypeStrings() []string

AggregationTypeStrings returns a slice of all String values of the enum

func AppendValueToHistogramDataPoint

func AppendValueToHistogramDataPoint[N int64 | float64](value N, dp metricdata.HistogramDataPoint[N], noMinMax bool) metricdata.HistogramDataPoint[N]

func IsGzipped

func IsGzipped(data []byte) bool

func LenDataPoints

func LenDataPoints(data metricdata.Aggregation) int

func MakeVM

func MakeVM(opts ...JsonnetOption) *jsonnet.VM

func NewS3ObjectReader added in v0.2.0

func NewS3ObjectReader(ctx context.Context, downloader *manager.Downloader, bucket, key string) (io.Reader, error)

func ParseCFStandardLogObjectKey

func ParseCFStandardLogObjectKey(str string) (string, string, string, string, error)

func ToAttribute added in v0.3.0

func ToAttribute(ctx context.Context, key string, value any) (attribute.KeyValue, bool)

func ToAttributes

func ToAttributes(ctx context.Context, cfgs []AttributeConfig, celVariables *CELVariables) ([]attribute.KeyValue, error)

func UnwrapEvent

func UnwrapEvent(ctx context.Context, event json.RawMessage) func(yield func(json.RawMessage) bool)

Types

type AggregationType

type AggregationType int
const (
	AggregationTypeCount AggregationType = iota
	AggregationTypeSum
	AggregationTypeHistogram
)

func AggregationTypeString

func AggregationTypeString(s string) (AggregationType, error)

AggregationTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func AggregationTypeValues

func AggregationTypeValues() []AggregationType

AggregationTypeValues returns all values of the enum

func (AggregationType) IsAAggregationType

func (i AggregationType) IsAAggregationType() bool

IsAAggregationType returns "true" if the value is listed in the enum definition. "false" otherwise

func (AggregationType) MarshalJSON

func (i AggregationType) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface for AggregationType

func (AggregationType) MarshalText

func (i AggregationType) MarshalText() ([]byte, error)

MarshalText implements the encoding.TextMarshaler interface for AggregationType

func (AggregationType) String

func (i AggregationType) String() string

func (*AggregationType) UnmarshalJSON

func (i *AggregationType) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the json.Unmarshaler interface for AggregationType

func (*AggregationType) UnmarshalText

func (i *AggregationType) UnmarshalText(text []byte) error

UnmarshalText implements the encoding.TextUnmarshaler interface for AggregationType

type App

type App struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, cfg *Config) (*App, error)

func NewWithClient

func NewWithClient(cfg *Config, client S3APIClient) (*App, error)

func (*App) GetVariablesAndLogs added in v0.2.0

func (app *App) GetVariablesAndLogs(ctx context.Context, notification events.S3EventRecord) (*CELVariables, []CELVariablesLog, error)

func (*App) Invoke

func (app *App) Invoke(ctx context.Context, event json.RawMessage) (any, error)

func (*App) Process

func (app *App) Process(ctx context.Context, notifications []events.S3EventRecord) error

type AttributeConfig

type AttributeConfig struct {
	Key   string           `json:"key,omitempty"`
	Value *CELCapable[any] `json:"value,omitempty"`
}

func (*AttributeConfig) UnmarshalJSON added in v0.3.0

func (c *AttributeConfig) UnmarshalJSON(data []byte) error

func (*AttributeConfig) Validate

func (c *AttributeConfig) Validate() error

type BackfillConfig added in v0.2.0

type BackfillConfig struct {
	Enabled       bool   `json:"enabled,omitempty"`
	TimeTolerance string `json:"time_tolerance,omitempty"`
	// contains filtered or unexported fields
}

func (*BackfillConfig) TimeToleranceDuration added in v0.2.0

func (c *BackfillConfig) TimeToleranceDuration() time.Duration

func (*BackfillConfig) UnmarshalJSON added in v0.3.0

func (c *BackfillConfig) UnmarshalJSON(data []byte) error

func (*BackfillConfig) Validate added in v0.2.0

func (c *BackfillConfig) Validate() error

type CELCapable

type CELCapable[T any] struct {
	// contains filtered or unexported fields
}

func (*CELCapable[T]) Eval

func (expr *CELCapable[T]) Eval(ctx context.Context, vars *CELVariables) (T, error)

func (*CELCapable[T]) MarshalJSON

func (expr *CELCapable[T]) MarshalJSON() ([]byte, error)

func (*CELCapable[T]) UnmarshalJSON

func (expr *CELCapable[T]) UnmarshalJSON(data []byte) error

type CELVariables

type CELVariables struct {
	Bucket     CELVariablesS3Bucket   `json:"bucket" cel:"bucket"`
	Object     CELVariablesS3Object   `json:"object" cel:"object"`
	CloudFront CELVariablesCloudFront `json:"cloudfront" cel:"cloudfront"`
	Log        CELVariablesLog        `json:"log" cel:"log"`
}

func NewCELVariables

func NewCELVariables(record events.S3EventRecord, distributionID string) *CELVariables

func (*CELVariables) MarshalMap

func (v *CELVariables) MarshalMap() map[string]interface{}

func (*CELVariables) SetLogLine added in v0.2.0

func (v *CELVariables) SetLogLine(log CELVariablesLog)

type CELVariablesCloudFront

type CELVariablesCloudFront struct {
	DistributionID string `json:"distributionId" cel:"distributionId"`
}

type CELVariablesLog

type CELVariablesLog struct {
	Type                   string    `json:"type" cel:"type"`
	Date                   string    `json:"date" cel:"date"`
	Time                   string    `json:"time" cel:"time"`
	Timestamp              time.Time `json:"timestamp" cel:"timestamp"`
	EdgeLocation           *string   `json:"xEdgeLocation" cel:"xEdgeLocation"`
	ScBytes                *int      `json:"scBytes" cel:"scBytes"`
	ClientIP               *string   `json:"clientIp" cel:"clientIp"`
	CsMethod               *string   `json:"csMethod" cel:"csMethod"`
	CsHost                 *string   `json:"csHost" cel:"csHost"`
	CsURIStem              *string   `json:"csUriStem" cel:"csUriStem"`
	ScStatus               *int      `json:"scStatus" cel:"scStatus"`
	ScStatusCategory       *string   `json:"scStatusCategory" cel:"scStatusCategory"`
	CsReferer              *string   `json:"csReferer" cel:"csReferer"`
	CsUserAgent            *string   `json:"csUserAgent" cel:"csUserAgent"`
	CsURIQuery             *string   `json:"csUriQuery" cel:"csUriQuery"`
	CsCookie               *string   `json:"csCookie" cel:"csCookie"`
	EdgeResultType         *string   `json:"xEdgeResultType" cel:"xEdgeResultType"`
	EdgeRequestID          *string   `json:"xEdgeRequestId" cel:"xEdgeRequestId"`
	HostHeader             *string   `json:"xHostHeader" cel:"xHostHeader"`
	CsProtocol             *string   `json:"csProtocol" cel:"csProtocol"`
	CsBytes                *int      `json:"csBytes" cel:"csBytes"`
	TimeTaken              *float64  `json:"timeTaken" cel:"timeTaken"`
	XForwardedFor          *string   `json:"xForwardedFor" cel:"xForwardedFor"`
	SslProtocol            *string   `json:"sslProtocol" cel:"sslProtocol"`
	SslCipher              *string   `json:"sslCipher" cel:"sslCipher"`
	EdgeResponseResultType *string   `json:"edgeResponseResultType" cel:"edgeResponseResultType"`
	CsProtocolVersion      *string   `json:"csProtocolVersion" cel:"csProtocolVersion"`
	FleStatus              *string   `json:"fleStatus" cel:"fleStatus"`
	FleEncryptedFields     *int      `json:"fleEncryptedFields" cel:"fleEncryptedFields"`
	CPort                  *int      `json:"cPort" cel:"cPort"`
	TimeToFirstByte        *float64  `json:"timeToFirstByte" cel:"timeToFirstByte"`
	EdgeDetailedResultType *string   `json:"xEdgeDetailedResultType" cel:"xEdgeDetailedResultType"`
	ScContentType          *string   `json:"scContentType" cel:"scContentType"`
	ScContentLen           *int      `json:"scContentLen" cel:"scContentLen"`
	ScRangeStart           *string   `json:"scRangeStart" cel:"scRangeStart"`
	ScRangeEnd             *string   `json:"scRangeEnd" cel:"scRangeEnd"`
}

func ParseCloudFrontLog

func ParseCloudFrontLog(ctx context.Context, r io.Reader) ([]CELVariablesLog, error)

func (*CELVariablesLog) CloudFrontStandardLogFieldSetters

func (l *CELVariablesLog) CloudFrontStandardLogFieldSetters() map[string]func(string) error

type CELVariablesS3Bucket

type CELVariablesS3Bucket struct {
	Name          string                     `json:"name" cel:"name"`
	OwnerIdentity CELVariablesS3UserIdentity `json:"ownerIdentity" cel:"ownerIdentity"`
	Arn           string                     `json:"arn" cel:"arn"`
}

type CELVariablesS3Object

type CELVariablesS3Object struct {
	Key       string `json:"key" cel:"key"`
	Size      int64  `json:"size" cel:"size"`
	ETag      string `json:"eTag" cel:"eTag"`
	VersionID string `json:"versionId" cel:"versionId"`
	Sequencer string `json:"sequencer" cel:"sequencer"`
}

type CELVariablesS3UserIdentity

type CELVariablesS3UserIdentity struct {
	PrincipalID string `json:"principalId" cel:"principalId"`
}

type CannotUseCELNativeFunctionError added in v0.3.0

type CannotUseCELNativeFunctionError struct {
	Field string
}

func IsCannotUseCELNativeFunction added in v0.3.0

func IsCannotUseCELNativeFunction(err error, bs []byte, allowColumns []string) (*CannotUseCELNativeFunctionError, bool)

func (*CannotUseCELNativeFunctionError) Error added in v0.3.0

type Config

type Config struct {
	Otel               OtelConfig        `json:"otel,omitempty"`
	ResourceAttributes []AttributeConfig `json:"resource_attributes,omitempty"`
	Scope              ScopeConfig       `json:"scope,omitempty"`
	Metrics            []MetricsConfig   `json:"metrics,omitempty"`
	Backfill           BackfillConfig    `json:"backfill,omitempty"`
	NoSkip             bool              `json:"no_skip,omitempty"`
}
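
The JSON tags above determine the key names a configuration document uses. The sketch below decodes a small document into hypothetical mirror types (`configSketch`, `otelSketch`, `scopeSketch`) that copy only the plain string, bool, and map fields and their tags. It is an illustration of the key layout, not a substitute for `Config.Load`, which takes Jsonnet options and may apply defaults and validation that this sketch omits. The endpoint, header, and scope values are placeholders.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// otelSketch mirrors the exported fields and tags of OtelConfig (hypothetical type).
type otelSketch struct {
	Headers  map[string]string `json:"headers,omitempty"`
	Endpoint string            `json:"endpoint,omitempty"`
	GZip     bool              `json:"gzip,omitempty"`
}

// scopeSketch mirrors the fields and tags of ScopeConfig (hypothetical type).
type scopeSketch struct {
	Name      string `json:"name"`
	Version   string `json:"version,omitempty"`
	SchemaURL string `json:"schema_url,omitempty"`
}

// configSketch mirrors a subset of Config (hypothetical type).
type configSketch struct {
	Otel   otelSketch  `json:"otel,omitempty"`
	Scope  scopeSketch `json:"scope,omitempty"`
	NoSkip bool        `json:"no_skip,omitempty"`
}

// sample uses placeholder values only.
const sample = `{
  "otel": {
    "endpoint": "http://localhost:4317",
    "gzip": true,
    "headers": {"x-example-header": "placeholder"}
  },
  "scope": {"name": "example-scope", "version": "0.0.0"},
  "no_skip": true
}`

// decodeSample unmarshals a JSON document into the mirror struct.
func decodeSample(raw string) (configSketch, error) {
	var c configSketch
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return configSketch{}, fmt.Errorf("decode config: %w", err)
	}
	return c, nil
}

func main() {
	c, err := decodeSample(sample)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```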

func DefaultConfig

func DefaultConfig() *Config

func (*Config) Load

func (c *Config) Load(path string, opts ...JsonnetOption) error

func (*Config) Validate

func (c *Config) Validate() error

type JsonnetOption

type JsonnetOption func(*jsonnetOptions)

func WithAWSConfig

func WithAWSConfig(cfg aws.Config) JsonnetOption

func WithCache

func WithCache(cache *sync.Map) JsonnetOption

func WithContext

func WithContext(ctx context.Context) JsonnetOption

type MetricsConfig

type MetricsConfig struct {
	Name         string               `json:"name,omitempty"`
	Description  string               `json:"description,omitempty"`
	Interval     string               `json:"interval,omitempty"`
	Unit         string               `json:"unit,omitempty"`
	Type         AggregationType      `json:"type,omitempty"`
	Attributes   []AttributeConfig    `json:"attributes,omitempty"`
	Filter       *CELCapable[bool]    `json:"filter,omitempty"`
	Value        *CELCapable[float64] `json:"value,omitempty"`
	IsMonotonic  bool                 `json:"is_monotonic,omitempty"`
	IsCumulative bool                 `json:"is_cumulative,omitempty"`
	Boundaries   []float64            `json:"boundaries,omitempty"`
	NoMinMax     bool                 `json:"no_min_max,omitempty"`
	EmitZero     [][]any              `json:"emit_zero,omitempty"`
	// contains filtered or unexported fields
}

func (*MetricsConfig) AggregateInterval

func (c *MetricsConfig) AggregateInterval() time.Duration
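
`Interval` is stored as a string while `AggregateInterval` returns a `time.Duration`, so some conversion takes place. As a sketch of how such an interval could be used to group log timestamps into aggregation windows, the hypothetical helper `bucketStart` below assumes the string is in Go duration syntax (for example `"5m"`) and truncates a timestamp to the start of its window. Neither the accepted format nor the bucketing rule is confirmed by this page.

```go
package main

import (
	"fmt"
	"time"
)

// exampleTime is an arbitrary timestamp used for the demonstration.
var exampleTime = time.Date(2024, time.September, 10, 12, 34, 56, 0, time.UTC)

// bucketStart parses interval as a Go duration string and returns the start
// of the window that contains ts. Assumption: the interval is positive and
// windows are aligned with time.Time.Truncate.
func bucketStart(interval string, ts time.Time) (time.Time, error) {
	d, err := time.ParseDuration(interval)
	if err != nil {
		return time.Time{}, fmt.Errorf("parse interval %q: %w", interval, err)
	}
	if d <= 0 {
		return time.Time{}, fmt.Errorf("interval %q must be positive", interval)
	}
	return ts.UTC().Truncate(d), nil
}

func main() {
	start, err := bucketStart("5m", exampleTime)
	if err != nil {
		panic(err)
	}
	fmt.Println(start.Format(time.RFC3339))
}
```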

func (*MetricsConfig) UnmarshalJSON added in v0.3.0

func (c *MetricsConfig) UnmarshalJSON(data []byte) error

func (*MetricsConfig) Validate

func (c *MetricsConfig) Validate() error

type OtelConfig

type OtelConfig struct {
	Headers  map[string]string `json:"headers,omitempty"`
	Endpoint string            `json:"endpoint,omitempty"`
	GZip     bool              `json:"gzip,omitempty"`
	// contains filtered or unexported fields
}

func (*OtelConfig) EndpointURL

func (c *OtelConfig) EndpointURL() *url.URL

func (*OtelConfig) SetEndpointURL

func (c *OtelConfig) SetEndpointURL(endpoint string) error

func (*OtelConfig) UnmarshalJSON added in v0.3.0

func (c *OtelConfig) UnmarshalJSON(data []byte) error

func (*OtelConfig) Validate

func (c *OtelConfig) Validate() error

type S3APIClient added in v0.2.0

type S3APIClient interface {
	manager.DownloadAPIClient
	s3.ListObjectsV2APIClient
}

type ScopeConfig

type ScopeConfig struct {
	Name      string `json:"name"`
	Version   string `json:"version,omitempty"`
	SchemaURL string `json:"schema_url,omitempty"`
}

func (*ScopeConfig) Validate

func (c *ScopeConfig) Validate() error

type WriteAtBuffer

type WriteAtBuffer struct {
	// contains filtered or unexported fields
}

WriteAtBuffer is an in-memory buffer implementing io.WriterAt.

func NewWriteAtBuffer

func NewWriteAtBuffer() *WriteAtBuffer

NewWriteAtBuffer creates a new WriteAtBuffer.

func (*WriteAtBuffer) Bytes

func (w *WriteAtBuffer) Bytes() []byte

Bytes returns the contents of the buffer.

func (*WriteAtBuffer) WriteAt

func (w *WriteAtBuffer) WriteAt(p []byte, off int64) (n int, err error)

WriteAt writes bytes to the buffer at the specified offset.
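
An `io.WriterAt` target is what the S3 download manager writes to, and ranged downloads can deliver parts out of order, so the buffer has to accept a write at any offset. The sketch below shows one way such a buffer can work. `memBuffer` is a hypothetical type, not the package's `WriteAtBuffer`, and the mutex and grow-on-demand strategy are assumptions.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// memBuffer is a hypothetical in-memory io.WriterAt.
type memBuffer struct {
	mu  sync.Mutex
	buf []byte
}

// Compile-time check that memBuffer satisfies io.WriterAt.
var _ io.WriterAt = (*memBuffer)(nil)

// WriteAt copies p into the buffer at offset off, growing the buffer with
// zero bytes when the write extends past the current length.
func (b *memBuffer) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, fmt.Errorf("negative offset %d", off)
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	end := int(off) + len(p)
	if end > len(b.buf) {
		grown := make([]byte, end)
		copy(grown, b.buf)
		b.buf = grown
	}
	copy(b.buf[off:], p)
	return len(p), nil
}

// Bytes returns a copy of the current contents.
func (b *memBuffer) Bytes() []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := make([]byte, len(b.buf))
	copy(out, b.buf)
	return out
}

func main() {
	b := &memBuffer{}
	// Parts may arrive out of order; the offsets put them in place.
	if _, err := b.WriteAt([]byte("world"), 6); err != nil {
		panic(err)
	}
	if _, err := b.WriteAt([]byte("hello "), 0); err != nil {
		panic(err)
	}
	fmt.Println(string(b.Bytes()))
}
```

Returning a copy from `Bytes` keeps callers from mutating the internal slice; a real implementation might return the slice directly to avoid the allocation.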

Directories

Path Synopsis
cmd
