AWS MSK Cluster

Deploys an Amazon MSK (Managed Streaming for Apache Kafka) cluster with configurable broker nodes, multi-method authentication (SASL/IAM, SASL/SCRAM, mTLS), encryption at rest and in transit, inline Kafka configuration management, and broker log delivery to CloudWatch Logs, Kinesis Data Firehose, and S3. The component creates a managed security group with Kafka and ZooKeeper port rules when ingress sources are specified.

What Gets Created

When you deploy an AwsMskCluster resource, OpenMCF provisions:

  • MSK Cluster — an aws_msk_cluster resource with the specified number of broker nodes distributed across subnets, configured with the requested Kafka version, instance type, authentication methods, encryption settings, and monitoring level
  • Security Group — created only when securityGroupIds or allowedCidrBlocks are provided; opens ports 9092-9098 (Kafka broker protocols) and 2181-2182 (ZooKeeper) for the specified source security groups and CIDR ranges, with unrestricted egress
  • MSK Configuration — created only when serverProperties is provided; holds Apache Kafka server.properties overrides (e.g., replication factor, min ISR, auto-create topics) and is associated with the cluster
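
The conditional creation rules above can be summarized in a few lines. The following is a minimal Python sketch, not OpenMCF's actual implementation: `planned_resources` is a hypothetical helper that takes the `spec` portion of a manifest as a plain dict and lists which of the three resources those rules imply.

```python
def planned_resources(spec: dict) -> list[str]:
    """Hypothetical helper: list the resources implied by a spec dict.

    Mirrors the rules described in this section; it does not call
    OpenMCF or AWS.
    """
    resources = ["MSK Cluster"]  # the cluster itself is always created

    # Managed security group: only when ingress sources are given.
    if spec.get("securityGroupIds") or spec.get("allowedCidrBlocks"):
        resources.append("Security Group")

    # MSK Configuration: only when inline server.properties overrides exist.
    if spec.get("serverProperties"):
        resources.append("MSK Configuration")

    return resources
```

With the Quick Start manifest on this page, which sets none of the three optional inputs, this sketch would report only the cluster.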

Prerequisites

  • AWS credentials configured via environment variables or OpenMCF provider config
  • At least one VPC subnet for broker placement; three subnets across distinct Availability Zones recommended for production
  • A VPC ID if specifying securityGroupIds or allowedCidrBlocks (required for managed security group creation)
  • A KMS key ARN if using customer-managed encryption at rest
  • An ACM Private CA ARN if enabling mutual TLS (mTLS) authentication
  • A CloudWatch Log Group if enabling CloudWatch broker log delivery
  • A Kinesis Data Firehose delivery stream if enabling Firehose broker log delivery
  • An S3 bucket if enabling S3 broker log delivery

Quick Start

Create a file msk.yaml:

apiVersion: aws.openmcf.org/v1
kind: AwsMskCluster
metadata:
  name: my-kafka
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: dev.AwsMskCluster.my-kafka
spec:
  region: us-west-2
  kafkaVersion: "3.6.0"
  numberOfBrokerNodes: 3
  instanceType: kafka.t3.small
  subnetIds:
    - subnet-0a1b2c3d4e5f00001
    - subnet-0a1b2c3d4e5f00002
    - subnet-0a1b2c3d4e5f00003
  authentication:
    saslIamEnabled: true

Deploy:

openmcf apply -f msk.yaml

This creates a 3-broker MSK cluster spread across three subnets with SASL/IAM authentication. Client-broker traffic is encrypted with TLS, which is the default for clientBrokerEncryption.

Configuration Reference

Required Fields

| Field | Type | Description | Validation |
| --- | --- | --- | --- |
| region | string | AWS region where the MSK cluster will be created (e.g., us-west-2, eu-west-1). | Required; non-empty |
| kafkaVersion | string | Apache Kafka version (e.g., "3.6.0", "3.5.1"). Downgrades force cluster replacement. | Required |
| numberOfBrokerNodes | int | Total broker nodes. Must be a multiple of the number of subnets for even AZ distribution. | Required, >= 1 |
| instanceType | string | Broker EC2 instance type (e.g., "kafka.m5.large", "kafka.m7g.xlarge", "kafka.t3.small"). | Required |
| subnetIds | StringValueOrRef[] | VPC subnets for broker placement. ForceNew. Can reference AwsVpc via valueFrom. | Minimum 1 item |
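
The multiple-of-subnets rule for numberOfBrokerNodes is easy to check before deploying. The sketch below is illustrative only: `brokers_per_subnet` is a hypothetical helper, not part of OpenMCF, and it assumes the subnet list has already been resolved to plain IDs.

```python
def brokers_per_subnet(number_of_broker_nodes: int, subnet_ids: list[str]) -> int:
    """Hypothetical pre-deploy check for the broker/subnet rule.

    Returns how many brokers land in each subnet, or raises ValueError
    when the documented constraints are not met.
    """
    if not subnet_ids:
        raise ValueError("subnetIds needs at least one subnet")
    if number_of_broker_nodes < 1:
        raise ValueError("numberOfBrokerNodes must be >= 1")
    if number_of_broker_nodes % len(subnet_ids) != 0:
        raise ValueError(
            f"numberOfBrokerNodes ({number_of_broker_nodes}) must be a "
            f"multiple of the subnet count ({len(subnet_ids)})"
        )
    return number_of_broker_nodes // len(subnet_ids)
```

For example, six brokers over three subnets gives two brokers per subnet, while four brokers over three subnets is rejected.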

Optional Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| securityGroupIds | StringValueOrRef[] | [] | Source security groups for managed SG ingress rules. Can reference AwsSecurityGroup via valueFrom. |
| allowedCidrBlocks | string[] | [] | IPv4 CIDR ranges for managed SG ingress rules. Must be valid CIDR notation. |
| associateSecurityGroupIds | StringValueOrRef[] | [] | Existing security groups attached directly to the cluster. ForceNew: changes force replacement. |
| vpcId | StringValueOrRef | - | VPC for managed security group creation. Required when securityGroupIds or allowedCidrBlocks are set. |
| ebsVolumeSizeGib | int | AWS default | EBS volume size per broker in GiB. Range: 1-16384. |
| provisionedThroughputEnabled | bool | false | Enable provisioned EBS throughput. Requires large instance types and ebsVolumeSizeGib >= 10. |
| provisionedThroughputMbs | int | - | Provisioned throughput in MiB/s per broker. Range: 250-2375. Required when provisionedThroughputEnabled is true. |
| storageMode | string | - | LOCAL or TIERED. Tiered offloads warm data to S3 for cost optimization. |
| kmsKeyArn | StringValueOrRef | AWS-managed key | KMS key for at-rest encryption. ForceNew. Can reference AwsKmsKey via valueFrom. |
| clientBrokerEncryption | string | TLS | Client-broker encryption: TLS, TLS_PLAINTEXT, or PLAINTEXT. |
| inClusterEncryption | bool | true | Inter-broker TLS encryption. ForceNew. |
| authentication | object | - | Client authentication configuration. See below. |
| authentication.saslIamEnabled | bool | false | Enable SASL/IAM authentication (port 9098). Recommended. |
| authentication.saslScramEnabled | bool | false | Enable SASL/SCRAM-SHA-512 authentication (port 9096). |
| authentication.tlsEnabled | bool | false | Enable mutual TLS authentication (port 9094). |
| authentication.tlsCertificateAuthorityArns | StringValueOrRef[] | [] | ACM Private CA ARNs for mTLS. Required when tlsEnabled is true. |
| authentication.unauthenticated | bool | false | Allow unauthenticated connections. Not recommended for production. |
| configurationArn | string | - | ARN of an external MSK Configuration. Mutually exclusive with serverProperties. |
| configurationRevision | int | - | Revision of external configuration. Required when configurationArn is set. >= 1. |
| serverProperties | map<string,string> | {} | Inline Kafka server.properties overrides. Creates an MSK Configuration resource. Mutually exclusive with configurationArn. |
| logging.cloudwatchLogs.enabled | bool | false | Enable CloudWatch Logs delivery. |
| logging.cloudwatchLogs.logGroup | StringValueOrRef | - | CloudWatch Log Group name. Required when enabled. Can reference AwsCloudwatchLogGroup via valueFrom. |
| logging.firehose.enabled | bool | false | Enable Firehose delivery. |
| logging.firehose.deliveryStream | StringValueOrRef | - | Firehose delivery stream name. Required when enabled. Can reference AwsKinesisFirehose via valueFrom. |
| logging.s3.enabled | bool | false | Enable S3 delivery. |
| logging.s3.bucket | StringValueOrRef | - | S3 bucket name. Required when enabled. Can reference AwsS3Bucket via valueFrom. |
| logging.s3.prefix | string | - | Optional S3 key prefix for log objects. |
| enhancedMonitoring | string | DEFAULT | CloudWatch metrics level: DEFAULT, PER_BROKER, PER_TOPIC_PER_BROKER, PER_TOPIC_PER_PARTITION. |
| jmxExporterEnabled | bool | false | Enable Prometheus JMX Exporter (port 11001). |
| nodeExporterEnabled | bool | false | Enable Prometheus Node Exporter (port 11002). |
| publicAccessType | string | - | DISABLED or SERVICE_PROVIDED_EIPS. Public access requires SASL/IAM or SASL/SCRAM with TLS. |
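
Several optional fields depend on each other (vpcId with the managed security group inputs, configurationArn versus serverProperties, and so on). As a rough illustration, the Python sketch below collects those documented cross-field rules into one check. `cross_field_errors` is a hypothetical helper operating on the `spec` as a plain dict; OpenMCF's own validation may differ in wording and coverage.

```python
def cross_field_errors(spec: dict) -> list[str]:
    """Hypothetical check of the cross-field rules listed in the table."""
    errors = []
    auth = spec.get("authentication") or {}

    # Managed security group inputs need a VPC to create the group in.
    if (spec.get("securityGroupIds") or spec.get("allowedCidrBlocks")) and not spec.get("vpcId"):
        errors.append("vpcId is required when securityGroupIds or allowedCidrBlocks are set")

    # External and inline Kafka configuration cannot be combined.
    if spec.get("configurationArn") and spec.get("serverProperties"):
        errors.append("configurationArn and serverProperties are mutually exclusive")

    # An external configuration must be pinned to a revision >= 1.
    if spec.get("configurationArn") and (spec.get("configurationRevision") or 0) < 1:
        errors.append("configurationRevision >= 1 is required when configurationArn is set")

    # Provisioned throughput needs a value inside the documented range.
    if spec.get("provisionedThroughputEnabled"):
        mbs = spec.get("provisionedThroughputMbs")
        if mbs is None or not 250 <= mbs <= 2375:
            errors.append("provisionedThroughputMbs must be 250-2375 when provisionedThroughputEnabled is true")

    # mTLS needs at least one ACM Private CA.
    if auth.get("tlsEnabled") and not auth.get("tlsCertificateAuthorityArns"):
        errors.append("tlsCertificateAuthorityArns is required when tlsEnabled is true")

    return errors
```

Returning a list rather than raising on the first problem lets a caller report every conflict in a manifest at once.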

Examples

Production Cluster with IAM Auth and KMS

A 6-broker cluster with customer-managed encryption, tiered storage, and CloudWatch monitoring:

apiVersion: aws.openmcf.org/v1
kind: AwsMskCluster
metadata:
  name: prod-kafka
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: prod.AwsMskCluster.prod-kafka
spec:
  region: us-east-1
  kafkaVersion: "3.6.0"
  numberOfBrokerNodes: 6
  instanceType: kafka.m7g.xlarge
  subnetIds:
    - subnet-az1
    - subnet-az2
    - subnet-az3
  ebsVolumeSizeGib: 1000
  storageMode: TIERED
  kmsKeyArn: arn:aws:kms:us-east-1:123456789012:key/mrk-abc123
  authentication:
    saslIamEnabled: true
  serverProperties:
    auto.create.topics.enable: "false"
    default.replication.factor: "3"
    min.insync.replicas: "2"
  logging:
    cloudwatchLogs:
      enabled: true
      logGroup: /aws/msk/prod-kafka
  enhancedMonitoring: PER_TOPIC_PER_BROKER
  jmxExporterEnabled: true
  nodeExporterEnabled: true

Multi-Authentication Cluster

All three authentication methods enabled for mixed client populations:

apiVersion: aws.openmcf.org/v1
kind: AwsMskCluster
metadata:
  name: multi-auth-kafka
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: prod.AwsMskCluster.multi-auth-kafka
spec:
  region: us-west-2
  kafkaVersion: "3.6.0"
  numberOfBrokerNodes: 3
  instanceType: kafka.m5.large
  subnetIds:
    - subnet-az1
    - subnet-az2
    - subnet-az3
  authentication:
    saslIamEnabled: true
    saslScramEnabled: true
    tlsEnabled: true
    tlsCertificateAuthorityArns:
      - arn:aws:acm-pca:us-west-2:123456789012:certificate-authority/abc-12345

Full Logging Configuration

Broker logs delivered to all three destinations simultaneously:

apiVersion: aws.openmcf.org/v1
kind: AwsMskCluster
metadata:
  name: logged-kafka
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: prod.AwsMskCluster.logged-kafka
spec:
  region: us-west-2
  kafkaVersion: "3.6.0"
  numberOfBrokerNodes: 3
  instanceType: kafka.m5.large
  subnetIds:
    - subnet-az1
    - subnet-az2
    - subnet-az3
  authentication:
    saslIamEnabled: true
  logging:
    cloudwatchLogs:
      enabled: true
      logGroup: /aws/msk/logged-kafka
    firehose:
      enabled: true
      deliveryStream: msk-logs-to-s3
    s3:
      enabled: true
      bucket: my-msk-audit-logs
      prefix: broker-logs/

Using Foreign Key References

Reference other OpenMCF-managed resources instead of hardcoding IDs:

apiVersion: aws.openmcf.org/v1
kind: AwsMskCluster
metadata:
  name: ref-kafka
  labels:
    openmcf.org/provisioner: pulumi
    pulumi.openmcf.org/organization: my-org
    pulumi.openmcf.org/project: my-project
    pulumi.openmcf.org/stack.name: prod.AwsMskCluster.ref-kafka
spec:
  region: us-east-1
  kafkaVersion: "3.6.0"
  numberOfBrokerNodes: 6
  instanceType: kafka.m7g.xlarge
  subnetIds:
    - valueFrom:
        kind: AwsVpc
        name: production-vpc
        fieldPath: status.outputs.private_subnets.[0].id
    - valueFrom:
        kind: AwsVpc
        name: production-vpc
        fieldPath: status.outputs.private_subnets.[1].id
    - valueFrom:
        kind: AwsVpc
        name: production-vpc
        fieldPath: status.outputs.private_subnets.[2].id
  vpcId:
    valueFrom:
      kind: AwsVpc
      name: production-vpc
      fieldPath: status.outputs.vpc_id
  securityGroupIds:
    - valueFrom:
        kind: AwsSecurityGroup
        name: kafka-clients
        fieldPath: status.outputs.security_group_id
  kmsKeyArn:
    valueFrom:
      kind: AwsKmsKey
      name: platform-key
      fieldPath: status.outputs.key_arn
  authentication:
    saslIamEnabled: true
  logging:
    cloudwatchLogs:
      enabled: true
      logGroup:
        valueFrom:
          kind: AwsCloudwatchLogGroup
          name: kafka-logs
          fieldPath: status.outputs.log_group_name

Stack Outputs

After deployment, the following outputs are available in status.outputs:

| Output | Type | Description |
| --- | --- | --- |
| cluster_arn | string | ARN of the MSK cluster, used in IAM policies and event source mappings |
| cluster_name | string | Name of the MSK cluster |
| cluster_uuid | string | UUID extracted from the cluster ARN |
| current_version | string | Cluster version string, required for update operations |
| bootstrap_brokers | string | Comma-separated plaintext broker endpoints (port 9092). Empty when clientBrokerEncryption is TLS. |
| bootstrap_brokers_tls | string | Comma-separated TLS broker endpoints (port 9094) |
| bootstrap_brokers_sasl_iam | string | Comma-separated SASL/IAM broker endpoints (port 9098). Populated when saslIamEnabled is true. |
| bootstrap_brokers_sasl_scram | string | Comma-separated SASL/SCRAM broker endpoints (port 9096). Populated when saslScramEnabled is true. |
| bootstrap_brokers_public_tls | string | Comma-separated public TLS endpoints. Populated when publicAccessType is SERVICE_PROVIDED_EIPS. |
| bootstrap_brokers_public_sasl_iam | string | Comma-separated public SASL/IAM endpoints |
| bootstrap_brokers_public_sasl_scram | string | Comma-separated public SASL/SCRAM endpoints |
| zookeeper_connect_string | string | Comma-separated ZooKeeper plaintext endpoints |
| zookeeper_connect_string_tls | string | Comma-separated ZooKeeper TLS endpoints |
| security_group_id | string | ID of the managed security group. Only set when securityGroupIds or allowedCidrBlocks are provided. |
| configuration_arn | string | ARN of the inline MSK Configuration. Only set when serverProperties is provided. |
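
Because the bootstrap_brokers_* outputs are comma-separated strings, clients often need to split them into individual endpoints. The following Python sketch shows one way to do that. `parse_bootstrap_brokers` is a hypothetical helper, and it assumes each entry has the form host:port; the hostnames in the usage note are placeholders, not real MSK endpoints.

```python
def parse_bootstrap_brokers(value: str) -> list[tuple[str, int]]:
    """Hypothetical helper: split a bootstrap_brokers_* output string.

    Assumes every non-empty entry looks like "host:port".
    """
    endpoints = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            # Skips empty outputs, e.g. bootstrap_brokers on a TLS-only cluster.
            continue
        host, _, port = item.rpartition(":")
        endpoints.append((host, int(port)))
    return endpoints
```

Given the placeholder value "b-1.example.com:9098,b-2.example.com:9098", the helper yields two (host, port) pairs on port 9098, and an empty output string yields an empty list.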

Related Components

  • AwsVpc — provides subnets for broker placement and VPC ID for managed security group
  • AwsSecurityGroup — controls network access to Kafka and ZooKeeper ports
  • AwsKmsKey — provides customer-managed encryption key for data at rest
  • AwsCloudwatchLogGroup — receives broker logs via CloudWatch Logs integration
  • AwsKinesisFirehose — receives broker logs for analytics pipeline delivery
