
AWS Kinesis Firehose

Deploys an Amazon Kinesis Data Firehose delivery stream that captures, transforms, and delivers streaming data to S3, OpenSearch, HTTP endpoints, or Redshift. The component supports Direct PUT and Kinesis Data Stream sources, optional Lambda transformation, dynamic partitioning, and Parquet/ORC format conversion.

What Gets Created

When you deploy an AwsKinesisFirehose resource, OpenMCF provisions:

  • Kinesis Firehose Delivery Stream — the core aws_kinesis_firehose_delivery_stream resource configured with the selected destination type
  • Kinesis source configuration — created only when kinesisStreamSource is set, configures Firehose to consume from an existing Kinesis Data Stream with automatic checkpointing and retry
  • Server-side encryption — created only when sseEnabled is true, encrypts data at rest in the delivery stream buffer using AWS-owned or customer-managed KMS keys (Direct PUT sources only)
  • Extended S3 destination — primary S3 delivery with optional GZIP/Snappy compression, Lambda processing, dynamic partitioning, and Parquet/ORC format conversion via AWS Glue Data Catalog
  • OpenSearch destination — direct indexing into an OpenSearch domain with configurable index rotation, VPC delivery, and S3 backup for failed documents
  • HTTP endpoint destination — HTTPS delivery to any endpoint (Datadog, New Relic, Sumo Logic, custom APIs) with S3 backup for failed records
  • Redshift destination — S3 staging followed by a Redshift COPY command for bulk data warehouse loading

Prerequisites

  • AWS credentials configured via environment variables or OpenMCF provider config
  • A destination resource — an S3 bucket, OpenSearch domain, HTTPS endpoint, or Redshift cluster depending on the chosen destination type
  • An IAM role with permissions appropriate for the destination (S3 write, OpenSearch index, Redshift COPY, etc.)
  • A Kinesis Data Stream if using Kinesis source mode instead of Direct PUT
  • An AWS Glue Data Catalog database and table if enabling Parquet/ORC data format conversion
  • VPC subnets and security groups if delivering to a VPC-deployed OpenSearch domain

Quick Start

Create a file named firehose.yaml:

apiVersion: aws.openmcf.org/v1
kind: AwsKinesisFirehose
metadata:
  name: my-firehose
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: dev.AwsKinesisFirehose.my-firehose
spec:
  region: us-east-1
  extendedS3:
    bucketArn: arn:aws:s3:::my-data-bucket
    roleArn: arn:aws:iam::123456789012:role/firehose-s3-role

Deploy:

openmcf apply -f firehose.yaml

This creates a Direct PUT delivery stream that writes raw records to S3 with no compression or transformation.

Configuration Reference

Required Fields

Field | Type | Description | Validation
region | string | AWS region where the Firehose delivery stream will be created (e.g., us-west-2, eu-west-1). | Required; non-empty

Exactly one destination must be configured. The destination type is ForceNew — changing it requires replacing the delivery stream.

Field | Type | Description
extendedS3 | object | Extended S3 destination for data lake storage with compression, partitioning, and format conversion
opensearch | object | OpenSearch destination for direct indexing with S3 backup
httpEndpoint | object | HTTP endpoint destination for HTTPS delivery with S3 backup
redshift | object | Redshift destination for S3 staging + COPY command

Optional Fields

Field | Type | Default | Description
kinesisStreamSource | object | — | Kinesis Data Stream source configuration. When absent, the delivery stream uses Direct PUT. ForceNew.
kinesisStreamSource.streamArn | string | — | ARN of the Kinesis Data Stream to consume from. Required when kinesisStreamSource is set. Can reference an AwsKinesisStream resource via valueFrom.
kinesisStreamSource.roleArn | string | — | IAM role ARN granting Firehose read access to the Kinesis stream. Required when kinesisStreamSource is set. Can reference an AwsIamRole resource via valueFrom.
sseEnabled | bool | false | Enables server-side encryption for data at rest in the delivery stream buffer. Only valid for Direct PUT sources.
sseKmsKeyArn | string | — | Customer-managed KMS key ARN for SSE. When absent, uses the AWS-owned CMK. Requires sseEnabled to be true. Can reference an AwsKmsKey resource via valueFrom.
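The sseEnabled and sseKmsKeyArn fields apply only to Direct PUT streams, and no later example covers them, so here is a minimal sketch of an encrypted Direct PUT stream with a customer-managed key (the ARNs, key ID, bucket, and role names are illustrative placeholders):

```yaml
apiVersion: aws.openmcf.org/v1
kind: AwsKinesisFirehose
metadata:
  name: encrypted-firehose
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: dev.AwsKinesisFirehose.encrypted-firehose
spec:
  region: us-east-1
  # SSE is only valid for Direct PUT sources, so kinesisStreamSource is omitted.
  sseEnabled: true
  # Illustrative key ARN; omit sseKmsKeyArn to fall back to the AWS-owned CMK.
  sseKmsKeyArn: arn:aws:kms:us-east-1:123456789012:key/example-key-id
  extendedS3:
    bucketArn: arn:aws:s3:::my-encrypted-bucket
    roleArn: arn:aws:iam::123456789012:role/firehose-s3-role
```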

Extended S3 Destination Fields

Field | Type | Default | Description
extendedS3.bucketArn | string | — | (Required) S3 bucket ARN for delivery. Can reference an AwsS3Bucket resource via valueFrom.
extendedS3.roleArn | string | — | (Required) IAM role ARN granting Firehose write access to S3, KMS, Lambda, and Glue as needed. Can reference an AwsIamRole resource via valueFrom.
extendedS3.prefix | string | — | S3 key prefix. Supports Firehose expression syntax (e.g., year=!{timestamp:yyyy}/).
extendedS3.errorOutputPrefix | string | — | S3 key prefix for records that fail transformation or delivery.
extendedS3.compressionFormat | string | UNCOMPRESSED | Compression applied before writing. Valid: UNCOMPRESSED, GZIP, ZIP, Snappy, HADOOP_SNAPPY.
extendedS3.kmsKeyArn | string | — | KMS key ARN for S3 server-side encryption (SSE-KMS). Can reference an AwsKmsKey resource via valueFrom.
extendedS3.buffering | object | 300s / 5 MiB | Buffering hints: intervalInSeconds (0–900) and sizeInMbs (1–128).
extendedS3.customTimeZone | string | UTC | IANA time zone for S3 prefix timestamp expressions.
extendedS3.fileExtension | string | — | File extension appended to delivered objects (e.g., .json, .parquet). Must start with a period.
extendedS3.s3BackupMode | string | Disabled | When Enabled, a copy of pre-transformation records is written to s3Backup.
extendedS3.s3Backup | object | — | S3 configuration for source record backup. Required when s3BackupMode is Enabled.
extendedS3.processing | object | — | Lambda-based record transformation. Set enabled, lambdaArn, and optional buffer/retry settings.
extendedS3.logging | object | — | CloudWatch error logging. Set enabled, logGroupName, and logStreamName.
extendedS3.dynamicPartitioning | object | — | Dynamic partitioning by record fields for efficient querying with Athena/Spark. ForceNew.
extendedS3.dataFormatConversion | object | — | JSON-to-Parquet/ORC conversion via AWS Glue Data Catalog schema.
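The processing and logging objects can be combined on a single destination. A hedged spec fragment showing Lambda transformation with CloudWatch error logging (the ARNs, function name, and log group/stream names are illustrative placeholders; the nested field names follow the table above):

```yaml
spec:
  region: us-east-1
  extendedS3:
    bucketArn: arn:aws:s3:::my-transformed-bucket
    roleArn: arn:aws:iam::123456789012:role/firehose-lambda-s3-role
    errorOutputPrefix: errors/
    processing:
      enabled: true
      # Illustrative transformation function ARN.
      lambdaArn: arn:aws:lambda:us-east-1:123456789012:function:record-transformer
    logging:
      enabled: true
      logGroupName: /aws/kinesisfirehose/my-firehose
      logStreamName: S3Delivery
```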

OpenSearch Destination Fields

Field | Type | Default | Description
opensearch.domainArn | string | — | ARN of the OpenSearch domain. Mutually exclusive with clusterEndpoint. Can reference an AwsOpenSearchDomain resource via valueFrom.
opensearch.clusterEndpoint | string | — | OpenSearch cluster endpoint URL. Mutually exclusive with domainArn.
opensearch.indexName | string | — | (Required) Index name (or prefix when rotation is enabled).
opensearch.roleArn | string | — | (Required) IAM role ARN with es:ESHttpPut and es:ESHttpGet permissions. Can reference an AwsIamRole resource via valueFrom.
opensearch.s3Config | object | — | (Required) S3 configuration for backing up failed or all documents.
opensearch.indexRotationPeriod | string | OneDay | Index rotation period. Valid: NoRotation, OneHour, OneDay, OneWeek, OneMonth.
opensearch.typeName | string | — | Document type name. Only relevant for Elasticsearch 6.x and earlier.
opensearch.buffering | object | 300s / 5 MiB | Buffering hints. Max size: 100 MiB for OpenSearch destinations.
opensearch.retryDurationInSeconds | int | 300 | Retry duration for failed index requests. Range: 0–7200.
opensearch.s3BackupMode | string | FailedDocumentsOnly | Valid: FailedDocumentsOnly, AllDocuments.
opensearch.processing | object | — | Lambda-based record transformation before indexing.
opensearch.logging | object | — | CloudWatch error logging for delivery failures.
opensearch.vpcConfig | object | — | VPC configuration for VPC-deployed OpenSearch domains. ForceNew.

HTTP Endpoint Destination Fields

Field | Type | Default | Description
httpEndpoint.url | string | — | (Required) HTTPS URL of the destination endpoint. Must start with https://.
httpEndpoint.s3Config | object | — | (Required) S3 configuration for backing up failed or all records.
httpEndpoint.name | string | — | Human-readable endpoint name for the AWS Console and CloudWatch metrics.
httpEndpoint.accessKey | string | — | Access key sent in the X-Amz-Firehose-Access-Key header. Sensitive.
httpEndpoint.roleArn | string | — | IAM role ARN for delivery and S3 backup. Can reference an AwsIamRole resource via valueFrom.
httpEndpoint.buffering | object | 300s / 5 MiB | Buffering hints for HTTP delivery.
httpEndpoint.retryDurationInSeconds | int | 300 | Retry duration for non-2xx responses. Range: 0–7200.
httpEndpoint.s3BackupMode | string | FailedDataOnly | Valid: FailedDataOnly, AllData.
httpEndpoint.processing | object | — | Lambda-based record transformation before HTTP delivery.
httpEndpoint.logging | object | — | CloudWatch error logging for delivery failures.
httpEndpoint.requestConfig | object | — | Request customization: contentEncoding (NONE, GZIP) and commonAttributes (key-value headers).
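No HTTP endpoint example appears in the Examples section, so here is a hedged sketch that delivers GZIP-encoded records to a generic HTTPS collector with failed-record backup to S3 (the URL, ARNs, and attribute values are illustrative placeholders, and the commonAttributes shape is assumed here to be a simple key-value map):

```yaml
spec:
  region: us-east-1
  httpEndpoint:
    url: https://collector.example.com/ingest
    name: example-collector
    roleArn: arn:aws:iam::123456789012:role/firehose-http-role
    retryDurationInSeconds: 600
    s3BackupMode: FailedDataOnly
    requestConfig:
      contentEncoding: GZIP
      commonAttributes:
        environment: prod
    s3Config:
      bucketArn: arn:aws:s3:::my-firehose-backup-bucket
      roleArn: arn:aws:iam::123456789012:role/firehose-s3-backup-role
      prefix: http-backup/failed/
```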

Redshift Destination Fields

Field | Type | Default | Description
redshift.clusterJdbcurl | string | — | (Required) JDBC URL of the Redshift cluster (e.g., jdbc:redshift://host:5439/db).
redshift.roleArn | string | — | (Required) IAM role ARN for S3 read and Redshift COPY. Can reference an AwsIamRole resource via valueFrom.
redshift.dataTableName | string | — | (Required) Target Redshift table for the COPY command.
redshift.s3Config | object | — | (Required) S3 configuration for intermediate staging. Firehose writes here before issuing COPY.
redshift.dataTableColumns | string | — | Comma-separated column names for the COPY command. When absent, COPY loads all columns in table order.
redshift.copyOptions | string | — | Additional COPY command options (e.g., JSON 'auto', GZIP, DELIMITER ',').
redshift.username | string | — | Redshift database username.
redshift.password | string | — | Redshift database password. Sensitive.
redshift.retryDurationInSeconds | int | 3600 | Retry duration for failed COPY commands. Range: 0–7200.
redshift.s3BackupMode | string | Disabled | When Enabled, original records are backed up to s3Backup.
redshift.s3Backup | object | — | S3 configuration for source record backup. Required when s3BackupMode is Enabled.
redshift.processing | object | — | Lambda-based record transformation before staging.
redshift.logging | object | — | CloudWatch error logging for COPY failures.
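The Examples section covers only S3 and OpenSearch destinations, so here is a hedged Redshift sketch: Firehose stages GZIP-compressed JSON in S3, then issues a COPY into the target table (the JDBC URL, ARNs, table name, columns, and credentials are illustrative placeholders; because the staged objects are GZIP-compressed, the COPY options include GZIP):

```yaml
spec:
  region: us-east-1
  redshift:
    clusterJdbcurl: jdbc:redshift://my-cluster.example.us-east-1.redshift.amazonaws.com:5439/analytics
    roleArn: arn:aws:iam::123456789012:role/firehose-redshift-role
    dataTableName: events
    dataTableColumns: event_id,event_time,payload
    copyOptions: JSON 'auto' GZIP
    username: firehose_loader
    password: example-password  # illustrative; keep real credentials in a secret store
    s3Config:
      bucketArn: arn:aws:s3:::my-redshift-staging-bucket
      roleArn: arn:aws:iam::123456789012:role/firehose-s3-staging-role
      prefix: redshift-staging/
      compressionFormat: GZIP
```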

Examples

Extended S3 Data Lake

GZIP-compressed delivery to S3 with timestamp-based prefixes and buffering tuned for throughput:

apiVersion: aws.openmcf.org/v1
kind: AwsKinesisFirehose
metadata:
  name: data-lake-firehose
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: prod.AwsKinesisFirehose.data-lake-firehose
spec:
  region: us-east-1
  extendedS3:
    bucketArn: arn:aws:s3:::my-data-lake-bucket
    roleArn: arn:aws:iam::123456789012:role/firehose-s3-delivery-role
    prefix: events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
    compressionFormat: GZIP
    fileExtension: .json.gz
    buffering:
      intervalInSeconds: 120
      sizeInMbs: 64

OpenSearch Log Analytics

Indexes application logs into an OpenSearch domain with daily index rotation and S3 backup for failed documents. References the OpenSearch domain via valueFrom:

apiVersion: aws.openmcf.org/v1
kind: AwsKinesisFirehose
metadata:
  name: log-analytics-firehose
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: prod.AwsKinesisFirehose.log-analytics-firehose
spec:
  region: us-east-1
  opensearch:
    domainArn:
      valueFrom:
        kind: AwsOpenSearchDomain
        name: my-log-domain
        fieldPath: status.outputs.domain_arn
    indexName: application-logs
    roleArn: arn:aws:iam::123456789012:role/firehose-opensearch-role
    indexRotationPeriod: OneDay
    s3BackupMode: FailedDocumentsOnly
    buffering:
      intervalInSeconds: 60
      sizeInMbs: 5
    s3Config:
      bucketArn: arn:aws:s3:::my-firehose-backup-bucket
      roleArn: arn:aws:iam::123456789012:role/firehose-s3-backup-role
      prefix: opensearch-backup/failed/
      compressionFormat: GZIP

Production S3 with Kinesis Source and Parquet Conversion

Consumes from an existing Kinesis Data Stream, converts JSON to Parquet via AWS Glue Data Catalog, and writes columnar files to a partitioned S3 data lake:

apiVersion: aws.openmcf.org/v1
kind: AwsKinesisFirehose
metadata:
  name: analytics-parquet-firehose
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: prod.AwsKinesisFirehose.analytics-parquet-firehose
spec:
  region: us-east-1
  kinesisStreamSource:
    streamArn:
      valueFrom:
        kind: AwsKinesisStream
        name: my-events-stream
        fieldPath: status.outputs.stream_arn
    roleArn:
      valueFrom:
        kind: AwsIamRole
        name: firehose-kinesis-consumer-role
        fieldPath: status.outputs.role_arn
  extendedS3:
    bucketArn: arn:aws:s3:::my-analytics-data-lake
    roleArn: arn:aws:iam::123456789012:role/firehose-glue-s3-role
    prefix: analytics/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
    compressionFormat: UNCOMPRESSED
    fileExtension: .parquet
    buffering:
      intervalInSeconds: 60
      sizeInMbs: 64
    dynamicPartitioning:
      enabled: true
      retryDurationInSeconds: 300
    dataFormatConversion:
      enabled: true
      inputFormat: OPENX_JSON
      outputFormat: PARQUET
      parquetCompression: SNAPPY
      schema:
        databaseName: analytics_db
        tableName: events
        roleArn: arn:aws:iam::123456789012:role/firehose-glue-access-role

Stack Outputs

After deployment, the following outputs are available in status.outputs:

Output | Type | Description
delivery_stream_arn | string | ARN of the Kinesis Data Firehose delivery stream
delivery_stream_name | string | Name of the delivery stream, unique within the AWS account and region
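Downstream resources can consume these outputs with the same valueFrom pattern the examples above use for inputs. A hypothetical fragment (the consuming field name is illustrative; any spec string field that accepts a delivery stream ARN can be wired this way):

```yaml
someStreamArnField:
  valueFrom:
    kind: AwsKinesisFirehose
    name: my-firehose
    fieldPath: status.outputs.delivery_stream_arn
```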

Related Components

  • AwsKinesisStream — provides a Kinesis Data Stream as the source for the delivery stream
  • AwsS3Bucket — serves as the delivery destination, backup target, or Redshift staging area
  • AwsOpenSearchDomain — serves as the indexing destination for log and search workloads
  • AwsIamRole — provides the permissions Firehose needs for source, destination, and transformation access
  • AwsLambda — provides the Lambda function for record transformation before delivery
