awskinesisfirehosedestinations

package
v1.181.1-devpreview Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

README

Amazon Kinesis Data Firehose Destinations Library

This library provides constructs for adding destinations to an Amazon Kinesis Data Firehose delivery stream. Destinations can be added by specifying the destinations property when defining a delivery stream.

See the Amazon Kinesis Data Firehose module README for usage examples.

import destinations "github.com/aws/aws-cdk-go/awscdk"

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewS3Bucket_Override

func NewS3Bucket_Override(s S3Bucket, bucket awss3.IBucket, props *S3BucketProps)

Experimental.

Types

type BackupMode

type BackupMode string

Options for S3 record backup of a delivery stream.

Example:

import path "github.com/aws-samples/dummy/path"
import firehose "github.com/aws/aws-cdk-go/awscdk"
import kms "github.com/aws/aws-cdk-go/awscdk"
import lambdanodejs "github.com/aws/aws-cdk-go/awscdk"
import logs "github.com/aws/aws-cdk-go/awscdk"
import s3 "github.com/aws/aws-cdk-go/awscdk"
import cdk "github.com/aws/aws-cdk-go/awscdk"
import destinations "github.com/aws/aws-cdk-go/awscdk"

app := cdk.NewApp()

stack := cdk.NewStack(app, jsii.String("aws-cdk-firehose-delivery-stream-s3-all-properties"))

bucket := s3.NewBucket(stack, jsii.String("Bucket"), &bucketProps{
	removalPolicy: cdk.removalPolicy_DESTROY,
	autoDeleteObjects: jsii.Boolean(true),
})

backupBucket := s3.NewBucket(stack, jsii.String("BackupBucket"), &bucketProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
	autoDeleteObjects: jsii.Boolean(true),
})
logGroup := logs.NewLogGroup(stack, jsii.String("LogGroup"), &logGroupProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

dataProcessorFunction := lambdanodejs.NewNodejsFunction(stack, jsii.String("DataProcessorFunction"), &nodejsFunctionProps{
	entry: path.join(__dirname, jsii.String("lambda-data-processor.js")),
	timeout: cdk.duration.minutes(jsii.Number(1)),
})

processor := firehose.NewLambdaFunctionProcessor(dataProcessorFunction, &dataProcessorProps{
	bufferInterval: cdk.*duration.seconds(jsii.Number(60)),
	bufferSize: cdk.size.mebibytes(jsii.Number(1)),
	retries: jsii.Number(1),
})

key := kms.NewKey(stack, jsii.String("Key"), &keyProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

backupKey := kms.NewKey(stack, jsii.String("BackupKey"), &keyProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

firehose.NewDeliveryStream(stack, jsii.String("Delivery Stream"), &deliveryStreamProps{
	destinations: []iDestination{
		destinations.NewS3Bucket(bucket, &s3BucketProps{
			logging: jsii.Boolean(true),
			logGroup: logGroup,
			processor: processor,
			compression: destinations.compression_GZIP(),
			dataOutputPrefix: jsii.String("regularPrefix"),
			errorOutputPrefix: jsii.String("errorPrefix"),
			bufferingInterval: cdk.*duration.seconds(jsii.Number(60)),
			bufferingSize: cdk.*size.mebibytes(jsii.Number(1)),
			encryptionKey: key,
			s3Backup: &destinationS3BackupProps{
				mode: destinations.backupMode_ALL,
				bucket: backupBucket,
				compression: destinations.*compression_ZIP(),
				dataOutputPrefix: jsii.String("backupPrefix"),
				errorOutputPrefix: jsii.String("backupErrorPrefix"),
				bufferingInterval: cdk.*duration.seconds(jsii.Number(60)),
				bufferingSize: cdk.*size.mebibytes(jsii.Number(1)),
				encryptionKey: backupKey,
			},
		}),
	},
})

app.synth()

Experimental.

// Values of BackupMode, the options for S3 record backup of a delivery stream.
const (
	// BackupMode_ALL backs up all records.
	//
	// Experimental.
	BackupMode_ALL BackupMode = "ALL"
	// BackupMode_FAILED backs up only the records that failed to deliver or
	// transform.
	//
	// Experimental.
	BackupMode_FAILED BackupMode = "FAILED"
)

type CommonDestinationProps

// CommonDestinationProps holds the generic properties for defining a delivery
// stream destination.
//
// Every field is tagged `field:"optional"`.
//
// Experimental.
type CommonDestinationProps struct {
	// Logging, if true, logs errors when data transformation or data delivery fails.
	//
	// If `logGroup` is provided, this will be implicitly set to `true`.
	//
	// Experimental.
	Logging *bool `field:"optional" json:"logging" yaml:"logging"`
	// LogGroup is the CloudWatch log group where log streams will be created to
	// hold error logs.
	//
	// Experimental.
	LogGroup awslogs.ILogGroup `field:"optional" json:"logGroup" yaml:"logGroup"`
	// Processor is the data transformation that should be performed on the data
	// before writing to the destination.
	//
	// Experimental.
	Processor awskinesisfirehose.IDataProcessor `field:"optional" json:"processor" yaml:"processor"`
	// Role is the IAM role associated with this destination.
	//
	// Assumed by Kinesis Data Firehose to invoke processors and write to destinations.
	//
	// Experimental.
	Role awsiam.IRole `field:"optional" json:"role" yaml:"role"`
	// S3Backup is the configuration for backing up source records to S3.
	//
	// Experimental.
	S3Backup *DestinationS3BackupProps `field:"optional" json:"s3Backup" yaml:"s3Backup"`
}

Generic properties for defining a delivery stream destination.

Example:

// The code below shows an example of how to instantiate this type.
// The values are placeholders you should change.
import monocdk "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"

var bucket bucket
var compression compression
var dataProcessor iDataProcessor
var duration duration
var key key
var logGroup logGroup
var role role
var size size

commonDestinationProps := &commonDestinationProps{
	logging: jsii.Boolean(false),
	logGroup: logGroup,
	processor: dataProcessor,
	role: role,
	s3Backup: &destinationS3BackupProps{
		bucket: bucket,
		bufferingInterval: duration,
		bufferingSize: size,
		compression: compression,
		dataOutputPrefix: jsii.String("dataOutputPrefix"),
		encryptionKey: key,
		errorOutputPrefix: jsii.String("errorOutputPrefix"),
		logging: jsii.Boolean(false),
		logGroup: logGroup,
		mode: awscdk.Aws_kinesisfirehose_destinations.backupMode_ALL,
	},
}

Experimental.

type CommonDestinationS3Props

// CommonDestinationS3Props holds the common properties for defining a backup,
// intermediary, or final S3 destination for a Kinesis Data Firehose delivery
// stream.
//
// Every field is tagged `field:"optional"`.
//
// Experimental.
type CommonDestinationS3Props struct {
	// BufferingInterval is the length of time that Firehose buffers incoming data
	// before delivering it to the S3 bucket.
	//
	// Minimum: Duration.seconds(60)
	// Maximum: Duration.seconds(900)
	//
	// Experimental.
	BufferingInterval awscdk.Duration `field:"optional" json:"bufferingInterval" yaml:"bufferingInterval"`
	// BufferingSize is the size of the buffer that Kinesis Data Firehose uses for
	// incoming data before delivering it to the S3 bucket.
	//
	// Minimum: Size.mebibytes(1)
	// Maximum: Size.mebibytes(128)
	//
	// Experimental.
	BufferingSize awscdk.Size `field:"optional" json:"bufferingSize" yaml:"bufferingSize"`
	// Compression is the type of compression that Kinesis Data Firehose uses to
	// compress the data that it delivers to the Amazon S3 bucket.
	//
	// The compression formats SNAPPY or ZIP cannot be specified for Amazon Redshift
	// destinations because they are not supported by the Amazon Redshift COPY operation
	// that reads from the S3 bucket.
	//
	// Experimental.
	Compression Compression `field:"optional" json:"compression" yaml:"compression"`
	// DataOutputPrefix is a prefix that Kinesis Data Firehose evaluates and adds
	// to records before writing them to S3.
	//
	// This prefix appears immediately following the bucket name.
	// See: https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
	//
	// Experimental.
	DataOutputPrefix *string `field:"optional" json:"dataOutputPrefix" yaml:"dataOutputPrefix"`
	// EncryptionKey is the AWS KMS key used to encrypt the data that is delivered
	// to your Amazon S3 bucket.
	//
	// Experimental.
	EncryptionKey awskms.IKey `field:"optional" json:"encryptionKey" yaml:"encryptionKey"`
	// ErrorOutputPrefix is a prefix that Kinesis Data Firehose evaluates and adds
	// to failed records before writing them to S3.
	//
	// This prefix appears immediately following the bucket name.
	// See: https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
	//
	// Experimental.
	ErrorOutputPrefix *string `field:"optional" json:"errorOutputPrefix" yaml:"errorOutputPrefix"`
}

Common properties for defining a backup, intermediary, or final S3 destination for a Kinesis Data Firehose delivery stream.

Example:

// The code below shows an example of how to instantiate this type.
// The values are placeholders you should change.
import monocdk "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"
import "github.com/aws/aws-cdk-go/awscdk"

var compression compression
var duration duration
var key key
var size size

commonDestinationS3Props := &commonDestinationS3Props{
	bufferingInterval: duration,
	bufferingSize: size,
	compression: compression,
	dataOutputPrefix: jsii.String("dataOutputPrefix"),
	encryptionKey: key,
	errorOutputPrefix: jsii.String("errorOutputPrefix"),
}

Experimental.

type Compression

// Compression represents the possible compression options Kinesis Data
// Firehose can use to compress data on delivery.
//
// Experimental.
type Compression interface {
	// Value returns the string value of the Compression.
	//
	// Experimental.
	Value() *string
}

Possible compression options Kinesis Data Firehose can use to compress data on delivery.

Example:

import path "github.com/aws-samples/dummy/path"
import firehose "github.com/aws/aws-cdk-go/awscdk"
import kms "github.com/aws/aws-cdk-go/awscdk"
import lambdanodejs "github.com/aws/aws-cdk-go/awscdk"
import logs "github.com/aws/aws-cdk-go/awscdk"
import s3 "github.com/aws/aws-cdk-go/awscdk"
import cdk "github.com/aws/aws-cdk-go/awscdk"
import destinations "github.com/aws/aws-cdk-go/awscdk"

app := cdk.NewApp()

stack := cdk.NewStack(app, jsii.String("aws-cdk-firehose-delivery-stream-s3-all-properties"))

bucket := s3.NewBucket(stack, jsii.String("Bucket"), &bucketProps{
	removalPolicy: cdk.removalPolicy_DESTROY,
	autoDeleteObjects: jsii.Boolean(true),
})

backupBucket := s3.NewBucket(stack, jsii.String("BackupBucket"), &bucketProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
	autoDeleteObjects: jsii.Boolean(true),
})
logGroup := logs.NewLogGroup(stack, jsii.String("LogGroup"), &logGroupProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

dataProcessorFunction := lambdanodejs.NewNodejsFunction(stack, jsii.String("DataProcessorFunction"), &nodejsFunctionProps{
	entry: path.join(__dirname, jsii.String("lambda-data-processor.js")),
	timeout: cdk.duration.minutes(jsii.Number(1)),
})

processor := firehose.NewLambdaFunctionProcessor(dataProcessorFunction, &dataProcessorProps{
	bufferInterval: cdk.*duration.seconds(jsii.Number(60)),
	bufferSize: cdk.size.mebibytes(jsii.Number(1)),
	retries: jsii.Number(1),
})

key := kms.NewKey(stack, jsii.String("Key"), &keyProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

backupKey := kms.NewKey(stack, jsii.String("BackupKey"), &keyProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

firehose.NewDeliveryStream(stack, jsii.String("Delivery Stream"), &deliveryStreamProps{
	destinations: []iDestination{
		destinations.NewS3Bucket(bucket, &s3BucketProps{
			logging: jsii.Boolean(true),
			logGroup: logGroup,
			processor: processor,
			compression: destinations.compression_GZIP(),
			dataOutputPrefix: jsii.String("regularPrefix"),
			errorOutputPrefix: jsii.String("errorPrefix"),
			bufferingInterval: cdk.*duration.seconds(jsii.Number(60)),
			bufferingSize: cdk.*size.mebibytes(jsii.Number(1)),
			encryptionKey: key,
			s3Backup: &destinationS3BackupProps{
				mode: destinations.backupMode_ALL,
				bucket: backupBucket,
				compression: destinations.*compression_ZIP(),
				dataOutputPrefix: jsii.String("backupPrefix"),
				errorOutputPrefix: jsii.String("backupErrorPrefix"),
				bufferingInterval: cdk.*duration.seconds(jsii.Number(60)),
				bufferingSize: cdk.*size.mebibytes(jsii.Number(1)),
				encryptionKey: backupKey,
			},
		}),
	},
})

app.synth()

Experimental.

func Compression_GZIP

func Compression_GZIP() Compression

func Compression_HADOOP_SNAPPY

func Compression_HADOOP_SNAPPY() Compression

func Compression_Of

func Compression_Of(value *string) Compression

Creates a new Compression instance with a custom value. Experimental.

func Compression_SNAPPY

func Compression_SNAPPY() Compression

func Compression_ZIP

func Compression_ZIP() Compression

type DestinationS3BackupProps

// DestinationS3BackupProps holds the properties for defining an S3 backup
// destination.
//
// S3 backup is available for all destinations, regardless of whether the final
// destination is S3 or not.
//
// Every field is tagged `field:"optional"`. The first six fields have the same
// names, types and tags as the fields of CommonDestinationS3Props.
//
// Experimental.
type DestinationS3BackupProps struct {
	// BufferingInterval is the length of time that Firehose buffers incoming data
	// before delivering it to the S3 bucket.
	//
	// Minimum: Duration.seconds(60)
	// Maximum: Duration.seconds(900)
	//
	// Experimental.
	BufferingInterval awscdk.Duration `field:"optional" json:"bufferingInterval" yaml:"bufferingInterval"`
	// BufferingSize is the size of the buffer that Kinesis Data Firehose uses for
	// incoming data before delivering it to the S3 bucket.
	//
	// Minimum: Size.mebibytes(1)
	// Maximum: Size.mebibytes(128)
	//
	// Experimental.
	BufferingSize awscdk.Size `field:"optional" json:"bufferingSize" yaml:"bufferingSize"`
	// Compression is the type of compression that Kinesis Data Firehose uses to
	// compress the data that it delivers to the Amazon S3 bucket.
	//
	// The compression formats SNAPPY or ZIP cannot be specified for Amazon Redshift
	// destinations because they are not supported by the Amazon Redshift COPY operation
	// that reads from the S3 bucket.
	//
	// Experimental.
	Compression Compression `field:"optional" json:"compression" yaml:"compression"`
	// DataOutputPrefix is a prefix that Kinesis Data Firehose evaluates and adds
	// to records before writing them to S3.
	//
	// This prefix appears immediately following the bucket name.
	// See: https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
	//
	// Experimental.
	DataOutputPrefix *string `field:"optional" json:"dataOutputPrefix" yaml:"dataOutputPrefix"`
	// EncryptionKey is the AWS KMS key used to encrypt the data that is delivered
	// to your Amazon S3 bucket.
	//
	// Experimental.
	EncryptionKey awskms.IKey `field:"optional" json:"encryptionKey" yaml:"encryptionKey"`
	// ErrorOutputPrefix is a prefix that Kinesis Data Firehose evaluates and adds
	// to failed records before writing them to S3.
	//
	// This prefix appears immediately following the bucket name.
	// See: https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
	//
	// Experimental.
	ErrorOutputPrefix *string `field:"optional" json:"errorOutputPrefix" yaml:"errorOutputPrefix"`
	// Bucket is the S3 bucket that will store data and failed records.
	//
	// Experimental.
	Bucket awss3.IBucket `field:"optional" json:"bucket" yaml:"bucket"`
	// Logging, if true, logs errors when data transformation or data delivery fails.
	//
	// If `logGroup` is provided, this will be implicitly set to `true`.
	//
	// Experimental.
	Logging *bool `field:"optional" json:"logging" yaml:"logging"`
	// LogGroup is the CloudWatch log group where log streams will be created to
	// hold error logs.
	//
	// Experimental.
	LogGroup awslogs.ILogGroup `field:"optional" json:"logGroup" yaml:"logGroup"`
	// Mode indicates the mode by which incoming records should be backed up to
	// S3, if any.
	//
	// If `bucket` is provided, this will be implicitly set to `BackupMode.ALL`
	// (the Go constant BackupMode_ALL).
	//
	// Experimental.
	Mode BackupMode `field:"optional" json:"mode" yaml:"mode"`
}

Properties for defining an S3 backup destination.

S3 backup is available for all destinations, regardless of whether the final destination is S3 or not.

Example:

import path "github.com/aws-samples/dummy/path"
import firehose "github.com/aws/aws-cdk-go/awscdk"
import kms "github.com/aws/aws-cdk-go/awscdk"
import lambdanodejs "github.com/aws/aws-cdk-go/awscdk"
import logs "github.com/aws/aws-cdk-go/awscdk"
import s3 "github.com/aws/aws-cdk-go/awscdk"
import cdk "github.com/aws/aws-cdk-go/awscdk"
import destinations "github.com/aws/aws-cdk-go/awscdk"

app := cdk.NewApp()

stack := cdk.NewStack(app, jsii.String("aws-cdk-firehose-delivery-stream-s3-all-properties"))

bucket := s3.NewBucket(stack, jsii.String("Bucket"), &bucketProps{
	removalPolicy: cdk.removalPolicy_DESTROY,
	autoDeleteObjects: jsii.Boolean(true),
})

backupBucket := s3.NewBucket(stack, jsii.String("BackupBucket"), &bucketProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
	autoDeleteObjects: jsii.Boolean(true),
})
logGroup := logs.NewLogGroup(stack, jsii.String("LogGroup"), &logGroupProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

dataProcessorFunction := lambdanodejs.NewNodejsFunction(stack, jsii.String("DataProcessorFunction"), &nodejsFunctionProps{
	entry: path.join(__dirname, jsii.String("lambda-data-processor.js")),
	timeout: cdk.duration.minutes(jsii.Number(1)),
})

processor := firehose.NewLambdaFunctionProcessor(dataProcessorFunction, &dataProcessorProps{
	bufferInterval: cdk.*duration.seconds(jsii.Number(60)),
	bufferSize: cdk.size.mebibytes(jsii.Number(1)),
	retries: jsii.Number(1),
})

key := kms.NewKey(stack, jsii.String("Key"), &keyProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

backupKey := kms.NewKey(stack, jsii.String("BackupKey"), &keyProps{
	removalPolicy: cdk.*removalPolicy_DESTROY,
})

firehose.NewDeliveryStream(stack, jsii.String("Delivery Stream"), &deliveryStreamProps{
	destinations: []iDestination{
		destinations.NewS3Bucket(bucket, &s3BucketProps{
			logging: jsii.Boolean(true),
			logGroup: logGroup,
			processor: processor,
			compression: destinations.compression_GZIP(),
			dataOutputPrefix: jsii.String("regularPrefix"),
			errorOutputPrefix: jsii.String("errorPrefix"),
			bufferingInterval: cdk.*duration.seconds(jsii.Number(60)),
			bufferingSize: cdk.*size.mebibytes(jsii.Number(1)),
			encryptionKey: key,
			s3Backup: &destinationS3BackupProps{
				mode: destinations.backupMode_ALL,
				bucket: backupBucket,
				compression: destinations.*compression_ZIP(),
				dataOutputPrefix: jsii.String("backupPrefix"),
				errorOutputPrefix: jsii.String("backupErrorPrefix"),
				bufferingInterval: cdk.*duration.seconds(jsii.Number(60)),
				bufferingSize: cdk.*size.mebibytes(jsii.Number(1)),
				encryptionKey: backupKey,
			},
		}),
	},
})

app.synth()

Experimental.

type S3Bucket

// S3Bucket is an S3 bucket destination for data from a Kinesis Data Firehose
// delivery stream.
//
// It embeds awskinesisfirehose.IDestination.
//
// Experimental.
type S3Bucket interface {
	awskinesisfirehose.IDestination
	// Bind binds this destination to the Kinesis Data Firehose delivery stream
	// and returns a *awskinesisfirehose.DestinationConfig.
	//
	// Implementers should use this method to bind resources to the stack and initialize values using the provided stream.
	//
	// Experimental.
	Bind(scope constructs.Construct, _options *awskinesisfirehose.DestinationBindOptions) *awskinesisfirehose.DestinationConfig
}

An S3 bucket destination for data from a Kinesis Data Firehose delivery stream.

Example:

var bucket bucket
// Provide a Lambda function that will transform records before delivery, with custom
// buffering and retry configuration
lambdaFunction := lambda.NewFunction(this, jsii.String("Processor"), &functionProps{
	runtime: lambda.runtime_NODEJS_14_X(),
	handler: jsii.String("index.handler"),
	code: lambda.code.fromAsset(path.join(__dirname, jsii.String("process-records"))),
})
lambdaProcessor := firehose.NewLambdaFunctionProcessor(lambdaFunction, &dataProcessorProps{
	bufferInterval: awscdk.Duration.minutes(jsii.Number(5)),
	bufferSize: awscdk.Size.mebibytes(jsii.Number(5)),
	retries: jsii.Number(5),
})
s3Destination := destinations.NewS3Bucket(bucket, &s3BucketProps{
	processor: lambdaProcessor,
})
firehose.NewDeliveryStream(this, jsii.String("Delivery Stream"), &deliveryStreamProps{
	destinations: []iDestination{
		s3Destination,
	},
})

Experimental.

func NewS3Bucket

func NewS3Bucket(bucket awss3.IBucket, props *S3BucketProps) S3Bucket

Experimental.

type S3BucketProps

// S3BucketProps holds the props for defining an S3 destination of a Kinesis
// Data Firehose delivery stream.
//
// Every field is tagged `field:"optional"`. The fields have the same names,
// types and tags as those of CommonDestinationS3Props followed by those of
// CommonDestinationProps.
//
// Experimental.
type S3BucketProps struct {
	// BufferingInterval is the length of time that Firehose buffers incoming data
	// before delivering it to the S3 bucket.
	//
	// Minimum: Duration.seconds(60)
	// Maximum: Duration.seconds(900)
	//
	// Experimental.
	BufferingInterval awscdk.Duration `field:"optional" json:"bufferingInterval" yaml:"bufferingInterval"`
	// BufferingSize is the size of the buffer that Kinesis Data Firehose uses for
	// incoming data before delivering it to the S3 bucket.
	//
	// Minimum: Size.mebibytes(1)
	// Maximum: Size.mebibytes(128)
	//
	// Experimental.
	BufferingSize awscdk.Size `field:"optional" json:"bufferingSize" yaml:"bufferingSize"`
	// Compression is the type of compression that Kinesis Data Firehose uses to
	// compress the data that it delivers to the Amazon S3 bucket.
	//
	// The compression formats SNAPPY or ZIP cannot be specified for Amazon Redshift
	// destinations because they are not supported by the Amazon Redshift COPY operation
	// that reads from the S3 bucket.
	//
	// Experimental.
	Compression Compression `field:"optional" json:"compression" yaml:"compression"`
	// DataOutputPrefix is a prefix that Kinesis Data Firehose evaluates and adds
	// to records before writing them to S3.
	//
	// This prefix appears immediately following the bucket name.
	// See: https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
	//
	// Experimental.
	DataOutputPrefix *string `field:"optional" json:"dataOutputPrefix" yaml:"dataOutputPrefix"`
	// EncryptionKey is the AWS KMS key used to encrypt the data that is delivered
	// to your Amazon S3 bucket.
	//
	// Experimental.
	EncryptionKey awskms.IKey `field:"optional" json:"encryptionKey" yaml:"encryptionKey"`
	// ErrorOutputPrefix is a prefix that Kinesis Data Firehose evaluates and adds
	// to failed records before writing them to S3.
	//
	// This prefix appears immediately following the bucket name.
	// See: https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
	//
	// Experimental.
	ErrorOutputPrefix *string `field:"optional" json:"errorOutputPrefix" yaml:"errorOutputPrefix"`
	// Logging, if true, logs errors when data transformation or data delivery fails.
	//
	// If `logGroup` is provided, this will be implicitly set to `true`.
	//
	// Experimental.
	Logging *bool `field:"optional" json:"logging" yaml:"logging"`
	// LogGroup is the CloudWatch log group where log streams will be created to
	// hold error logs.
	//
	// Experimental.
	LogGroup awslogs.ILogGroup `field:"optional" json:"logGroup" yaml:"logGroup"`
	// Processor is the data transformation that should be performed on the data
	// before writing to the destination.
	//
	// Experimental.
	Processor awskinesisfirehose.IDataProcessor `field:"optional" json:"processor" yaml:"processor"`
	// Role is the IAM role associated with this destination.
	//
	// Assumed by Kinesis Data Firehose to invoke processors and write to destinations.
	//
	// Experimental.
	Role awsiam.IRole `field:"optional" json:"role" yaml:"role"`
	// S3Backup is the configuration for backing up source records to S3.
	//
	// Experimental.
	S3Backup *DestinationS3BackupProps `field:"optional" json:"s3Backup" yaml:"s3Backup"`
}

Props for defining an S3 destination of a Kinesis Data Firehose delivery stream.

Example:

var bucket bucket
// Provide a Lambda function that will transform records before delivery, with custom
// buffering and retry configuration
lambdaFunction := lambda.NewFunction(this, jsii.String("Processor"), &functionProps{
	runtime: lambda.runtime_NODEJS_14_X(),
	handler: jsii.String("index.handler"),
	code: lambda.code.fromAsset(path.join(__dirname, jsii.String("process-records"))),
})
lambdaProcessor := firehose.NewLambdaFunctionProcessor(lambdaFunction, &dataProcessorProps{
	bufferInterval: awscdk.Duration.minutes(jsii.Number(5)),
	bufferSize: awscdk.Size.mebibytes(jsii.Number(5)),
	retries: jsii.Number(5),
})
s3Destination := destinations.NewS3Bucket(bucket, &s3BucketProps{
	processor: lambdaProcessor,
})
firehose.NewDeliveryStream(this, jsii.String("Delivery Stream"), &deliveryStreamProps{
	destinations: []iDestination{
		s3Destination,
	},
})

Experimental.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL