Thursday, August 14, 2025

AWS DynamoDB | Integration With S3 Bucket.

 

AWS DynamoDB ↔ S3 integration.

View:

What DynamoDB ↔ S3 integration is,

How to use DynamoDB ↔ S3 integration,

Why to use DynamoDB ↔ S3 integration,

Gotchas (what to watch out for) in the architectural decisions of DynamoDB ↔ S3 integration.

1. Integration Overview

DynamoDB and S3 don’t “natively” integrate in the way that, say, S3 integrates with Glacier. However, AWS provides several official integration patterns for data export, import, backup, analytics, and archival.

The key integration points are:

Use Case | AWS Service / Feature | Direction
Full table export without performance impact | DynamoDB Export to S3 (point-in-time recovery snapshot) | DynamoDB → S3
Continuous stream processing to S3 for analytics or ETL | DynamoDB Streams → Kinesis Data Firehose → S3 | DynamoDB → S3
Ad-hoc data extraction | AWS Glue / Data Pipeline | DynamoDB → S3
Backup for compliance or DR | AWS Backup (supports DynamoDB to S3 storage backend) | DynamoDB → S3
Restore from S3 into DynamoDB | AWS Glue / Data Pipeline / Custom Lambda Loader | S3 → DynamoDB
Nearline cold storage migration | Custom Lambda/Kinesis pipeline | DynamoDB ↔ S3

2. Exporting DynamoDB to S3

2.1 Native Export to S3

  • Feature: DynamoDB Export to S3
  • Type: Serverless snapshot-based export
  • Data format: Compressed, partitioned DynamoDB JSON or Amazon Ion
  • Source: Any Point-in-Time Recovery (PITR) timestamp
  • Performance: No impact on the live table
  • Trigger: On-demand
  • S3 Path: s3://twtech-s3bucket/AWSDynamoDB/YYYY-MM-DDTHH-MM-SSZ/
  • Cost: Based on the amount of data scanned (per GB) + S3 storage

Flow:

  1. Enable PITR on the twtech DynamoDB table.
  2. Run “Export to S3” via the console, CLI, or SDK (a boto3 sketch follows below).
  3. AWS creates multiple export files in S3 (partitioned for parallel processing in Athena/Glue).

Best for:

  • Full backups
  • Analytics ingestion into Athena/Redshift
  • Archiving the state of a table at a certain time
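For step 2 of the flow above, a minimal boto3 sketch (the table ARN, account ID, bucket, and prefix are hypothetical placeholders; PITR must already be enabled on the table):

import boto3
from datetime import datetime, timezone

ddb = boto3.client("dynamodb", region_name="us-east-2")

# Kick off a snapshot export from any point inside the PITR window
resp = ddb.export_table_to_point_in_time(
    TableArn="arn:aws:dynamodb:us-east-2:123456789012:table/Orders",
    ExportTime=datetime(2025, 8, 14, tzinfo=timezone.utc),  # defaults to "now" if omitted
    S3Bucket="twtech-s3bucket",
    S3Prefix="AWSDynamoDB/",
    ExportFormat="DYNAMODB_JSON",   # or "ION"
)

# Poll the export until it completes
export_arn = resp["ExportDescription"]["ExportArn"]
status = ddb.describe_export(ExportArn=export_arn)["ExportDescription"]["ExportStatus"]
print(export_arn, status)   # IN_PROGRESS -> COMPLETED / FAILED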

2.2 DynamoDB Streams + Kinesis Firehose

If twtech needs continuous export instead of point-in-time:

  • DynamoDB Streams captures item-level changes (INSERT, MODIFY, REMOVE).
  • Kinesis Data Firehose batches and delivers changes to S3 in near real-time.
  • Output format can be JSON, CSV, or Parquet.

Architecture:

DynamoDB → DynamoDB Streams → Lambda / Firehose → S3

Pros(Benefits):

  • Continuous, near-real-time pipeline
  • Can transform data before storing in S3
  • Supports partitioned storage for Athena queries

Cons(Limitations):

  • Stream retention = 24 hours (need to consume promptly)
  • Requires custom transform logic for upserts/deletes in S3

2.3 AWS Glue or Data Pipeline

  • Glue: Can crawl DynamoDB, extract data, transform, and write to S3 in various formats (Parquet, ORC, Avro, CSV, JSON).
  • AWS Data Pipeline: (older service) supports periodic export jobs from DynamoDB to S3.

3. Importing from S3 into DynamoDB

DynamoDB’s native Import from S3 feature only loads data into a new table, so for loading into an existing table twtech has several patterns:

  1. AWS Glue ETL Job
    • Read S3 data → Transform → Write to DynamoDB
    • Good for bulk historical loads
  2. AWS Data Pipeline
    • Predefined template for S3 → DynamoDB ingestion
    • More old-school, less common now
  3. Custom Lambda/Kinesis Ingestion
    • Use S3 event notifications to trigger Lambda, parse the new object, and batch-write to DynamoDB (see the sketch after this list)
  4. EMR/Spark
    • If twtech has large-scale batch data and needs complex transformations before ingest
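A minimal sketch of pattern 3 (custom Lambda ingestion), assuming the S3 objects are newline-delimited JSON whose fields match the table schema; the bucket, table, and order_id key are hypothetical placeholders:

import json
import boto3

s3 = boto3.client("s3")
table = boto3.resource("dynamodb").Table("Orders")

def handler(event, context):
    for record in event["Records"]:                     # S3 event notification batch
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        # batch_writer groups items into 25-item requests and retries unprocessed items
        with table.batch_writer(overwrite_by_pkeys=["order_id"]) as batch:
            for line in body.splitlines():
                if line.strip():
                    batch.put_item(Item=json.loads(line))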

4. Real-World Integration Patterns

4.1 Analytics Pipeline

  • Goal: Query DynamoDB data with SQL
  • Pattern:
    1. DynamoDB Export to S3 (JSON/Parquet)
    2. S3 as source for Athena or Redshift Spectrum
  • Benefits:
    • Avoids expensive scans in DynamoDB
    • Athena queries directly on S3 data at low cost

4.2 Archiving Old Items

  • Goal: Keep DynamoDB lean and cost-effective
  • Pattern:
    • TTL on items → Streams → Lambda → S3 (a Lambda sketch follows below)
    • Store in compressed Parquet for cheap storage
  • Benefit:
    • DynamoDB remains small = cheaper provisioned capacity
    • Historical data kept in S3 for compliance
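A minimal Lambda sketch for the TTL-archival pattern above, assuming the stream is configured with old images; it writes gzipped JSON for brevity (a Firehose or Glue step can convert to Parquet), and the bucket name is a placeholder. TTL deletions arrive as REMOVE events whose userIdentity is the DynamoDB service principal:

import json
import gzip
import boto3
from datetime import datetime, timezone

s3 = boto3.client("s3")
BUCKET = "twtech-s3bucket"

def handler(event, context):
    expired = [
        r["dynamodb"]["OldImage"]                      # full item as DynamoDB JSON
        for r in event["Records"]
        if r["eventName"] == "REMOVE"
        and r.get("userIdentity", {}).get("principalId") == "dynamodb.amazonaws.com"
    ]
    if not expired:
        return
    now = datetime.now(timezone.utc)
    key = f"archive/year={now:%Y}/month={now:%m}/day={now:%d}/{context.aws_request_id}.json.gz"
    body = gzip.compress("\n".join(json.dumps(i) for i in expired).encode("utf-8"))
    s3.put_object(Bucket=BUCKET, Key=key, Body=body)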

4.3 Near Real-Time Event Storage

  • Goal: Keep append-only event log in S3
  • Pattern:
    • DynamoDB Streams → Firehose → S3 partitioned by year/month/day/hour
  • Benefit:
    • Perfect for replay or reprocessing events later

5. Cost and Performance Considerations

  • Export to S3 = billed per GB scanned (storage in S3 separate)
  • Streams + Firehose = billed for stream reads, Firehose delivery, S3 storage
  • Glue = billed per Data Processing Unit (DPU) hour
  • Athena = billed per data scanned (optimize with Parquet)
  • S3 storage tiers can be leveraged for cost optimization (Standard → Glacier)

6. Security & Compliance

  • Always enable S3 server-side encryption (SSE-KMS) for exports
  • Use VPC Endpoints to avoid public traffic
  • IAM policies should restrict access to the specific S3 bucket/prefix
  • Consider Object Lock for immutability (WORM compliance)

7. Comparison Table

Method | Real-time? | Transformable? | Performance impact on DynamoDB? | Best Use Case
Export to S3 | No | No | None | Backups, analytics
Streams + Firehose | Yes | Yes (via Lambda) | Low | Event-driven ETL, CDC pipelines
Glue | No | Yes | Medium | Large batch ETL
Data Pipeline | No | Limited | Medium | Legacy batch jobs
Custom Lambda | Yes | Yes | Low | Small-scale ingestion/export

 

Insights:

Continuous Stream Processing from DynamoDB to Amazon S3 for analytics or ETL.

 View:

·       Architecture,

·       Implementation,

·       Tips.

1. Why use Continuous Stream Processing (benefits)

DynamoDB is fantastic for operational workloads but not optimized for:

  • Ad-hoc analytical queries
  • Historical data storage
  • Complex transformations

The idea behind it:

twtech uses DynamoDB Streams → Kinesis Data Firehose → S3 to continuously capture every change, transform it, and store it in an analytics-friendly format for services like Athena, Glue, or Redshift Spectrum.

2. Core Architecture

DynamoDB → DynamoDB Streams → (Lambda transform optional) → Kinesis Data Firehose → S3 → (Athena/Glue/Redshift)

2.1 Component Roles

  • DynamoDB Streams
    Captures INSERT, MODIFY, and REMOVE events for 24 hours after they occur.
    Streams data is immutable, ordered per partition key.
  • Lambda (optional)
    Intercepts stream records, performs:
    • Data enrichment
    • Field filtering/mapping
    • Format conversion (e.g., DynamoDB JSON → flat JSON/Parquet)
  • Kinesis Data Firehose
    Buffers, batches, and delivers transformed data to S3.
    • Buffer size: 1–128 MB (configurable)
    • Buffer interval: 60–900 seconds
    • Optional format conversion: JSON → Parquet/ORC
    • Can compress: GZIP, Snappy, Zip
  • Amazon S3
    Destination storage, typically partitioned:

 s3://twtech-analyticsbucket/dynamodb/twtech_dynamoDB_table/year=YYYY/month=MM/day=DD/hour=HH/

3. Step-by-Step Setup

Step 1: Enable DynamoDB Streams

  • Console → Table → Exports and Streams → Enable Streams
  • Choose New and old images if you want full before/after snapshots.
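The same Step 1 via boto3 instead of the console (the table name is a placeholder):

import boto3

ddb = boto3.client("dynamodb", region_name="us-east-2")
resp = ddb.update_table(
    TableName="Orders",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_AND_OLD_IMAGES",   # full before/after images
    },
)
print(resp["TableDescription"]["LatestStreamArn"])   # wire this ARN to the stream consumer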

Step 2: Create a Firehose Delivery Stream

  • Source: Direct PUT
  • Destination: S3
  • Enable data transformation if Lambda is used.
  • Enable format conversion to Parquet for Athena optimization.

Step 3: Lambda Stream Consumer (Optional)

  • Trigger: DynamoDB Streams event
  • Transform records (flatten, rename keys, mask sensitive fields)
  • Push transformed data to Firehose via PutRecordBatch (a minimal handler sketch follows below)
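A minimal sketch of this optional Lambda consumer (the delivery-stream name is a placeholder, and the flattening is deliberately naive):

import json
import boto3

firehose = boto3.client("firehose")
STREAM = "twtech-ddb-to-s3"

def handler(event, context):
    records = []
    for r in event["Records"]:                          # DynamoDB Streams batch
        image = r["dynamodb"].get("NewImage") or r["dynamodb"].get("OldImage", {})
        flat = {k: list(v.values())[0] for k, v in image.items()}   # naive un-typing of DynamoDB JSON
        flat["op"] = r["eventName"]                     # INSERT / MODIFY / REMOVE
        records.append({"Data": (json.dumps(flat) + "\n").encode("utf-8")})
    # PutRecordBatch accepts up to 500 records per call
    for i in range(0, len(records), 500):
        firehose.put_record_batch(DeliveryStreamName=STREAM, Records=records[i:i + 500])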

Step 4: S3 Partitioning

  • Configure Firehose S3 prefix:

dynamodb/twtech_dynamoDB_table/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/

4. Querying the Data

Once data is in S3:

  • Use AWS Glue Crawler to create schema
  • Query with Amazon Athena:

SELECT *
FROM dynamodb_changes
WHERE year = '2025' AND month = '08'

  • Or load into Redshift Spectrum for joins with other datasets
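The same Athena query driven from Python with boto3 (the database and result-bucket names are hypothetical placeholders):

import time
import boto3

athena = boto3.client("athena", region_name="us-east-2")

qid = athena.start_query_execution(
    QueryString="SELECT * FROM dynamodb_changes WHERE year = '2025' AND month = '08'",
    QueryExecutionContext={"Database": "twtech_analytics"},
    ResultConfiguration={"OutputLocation": "s3://twtech-athena-results/"},
)["QueryExecutionId"]

# Poll until the query finishes
while True:
    state = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)
print(state)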

5. Best Practices

  • Buffering: Larger batch sizes = lower cost but higher latency.
  • Format: Use Parquet for up to 90% cost reduction in Athena.
  • Security:
    • S3 bucket policy locked to Firehose role
    • SSE-KMS encryption
    • VPC endpoints to avoid public internet
  • Error Handling:
    • Firehose automatically retries failed deliveries
    • Failed records go to an S3 error prefix for later replay
  • Retention: DynamoDB Streams keep data for 24h — ensure consumer processes within this window.

6. Example Use Cases

  • Real-Time Analytics: Clickstream → DynamoDB → S3 → Athena dashboard refresh every few minutes.
  • CDC (Change Data Capture): Replicate DynamoDB changes to a data lake for ML training.
  • Archiving for Compliance: Store all modifications in immutable S3 storage with Object Lock.

Using Amazon EMR / Apache Spark with DynamoDB ↔ S3: the main patterns, the exact connectors, runnable snippets, and the knobs that matter for scale, cost, and correctness.

When EMR/Spark is the right tool

Use EMR/Spark when twtech needs any of the following:

·        Large-scale batch exports from DynamoDB to S3 in columnar formats (Parquet/ORC) with heavy transforms or joins.

·        Bulk backfills from S3 into DynamoDB with dedupe/merge logic.

·        Enrichment: joining DynamoDB data with other lake datasets (S3) before publishing curated tables.

·        Streaming ETL at scale (Structured Streaming) when Kinesis → Firehose isn’t flexible enough.

Architecture patterns

1.     Batch export: DynamoDB → Spark on EMR → S3 (Parquet)

DynamoDB table ──(dynamodb-hadoop/spark-dynamodb)──> Spark jobs ──> S3 (Parquet, partitioned) ──> Athena/Redshift

·        Read with the EMR DynamoDB Hadoop connector or spark-dynamodb library.

·        Write partitioned by year/month/day (and a domain partition such as region/tenant).

2.     Bulk import: S3 (Parquet/JSON) → Spark → DynamoDB

Raw S3 data ──> Spark transform/validate ──(batch write)──> DynamoDB

·        Use conditional writes / idempotency keys to avoid duplicates.

3.     Streaming CDC enrichment (optional)

DynamoDB Streams → Kinesis → Spark Structured Streaming on EMR → S3 curated

·        Spark handles upserts & late data; S3 holds the gold tables.

Connectors to use

·        EMR DynamoDB Hadoop Connector (built into EMR on EC2 & EKS):

o   Read: org.apache.hadoop.dynamodb.read.DynamoDBInputFormat

o   Write: org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat

o   Works well for large scans and batch writes; integrates with EMRFS.

·        spark-dynamodb (community lib; ergonomic DataFrame API):

o   Coordinates vary by fork, e.g. com.audienceproject:spark-dynamodb_2.12:<version>.

o   Lets twtech use spark.read.format("dynamodb") and df.write.format("dynamodb").

For most EMR on EC2/EKS stacks, the EMR DynamoDB connector is the safer default; use spark-dynamodb if twtech prefers DataFrame-first ergonomics.

Cluster choices

·        EMR Serverless: no cluster management, great for bursty batch. Simplest ops.

·        EMR on EC2: best for full control, spot fleets, HDFS cache.

·        EMR on EKS: if twtech already operates Kubernetes and wants shared pools.

Rule of thumb: start with EMR Serverless for batch ETL, then move to EC2/EKS if twtech needs fine-grained tuning or very long-running jobs.

Minimal, runnable examples

A) Batch export DynamoDB → Parquet on S3 (PySpark on EMR)

Using the EMR DynamoDB Hadoop connector

# pyspark script: ddb_export_to_parquet.py
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("DDBExportToParquet")
         .getOrCreate())
table = "Orders"
region = "us-east-2"
s3_out = "s3://twtech-analytics-bucket/dynamodb/orders/"
# Read via Hadoop input format -> DataFrame
items_rdd = (spark.sparkContext
    .newAPIHadoopRDD(
        "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
        "org.apache.hadoop.io.Text",
        "org.apache.hadoop.io.Text",
        conf={
            "dynamodb.region": region,
            "dynamodb.input.tableName": table,
            # Optional parallelism controls
            "dynamodb.throughput.read.percent": "0.5",   # use 50% of table RCUs
            "dynamodb.export.format": "dynamodb"         # preserves types in JSON
        }))
# Convert to DataFrame (values are DynamoDB JSON strings)
df = spark.read.json(items_rdd.map(lambda kv: kv[1]), multiLine=False)
# Optional transforms: select/rename/flatten, cast types, add load_date
from pyspark.sql.functions import current_date, year, month, dayofmonth
curated = (df
    .withColumn("load_date", current_date())
    .withColumn("year", year("load_date"))
    .withColumn("month", month("load_date"))
    .withColumn("day", dayofmonth("load_date")))
(curated
 .repartition("year","month","day")
 .write.mode("overwrite")
 .partitionBy("year","month","day")
 .parquet(s3_out))

Submit on EMR Serverless (example job payload):

{
  "name": "ddb-export-parquet",
  "applicationId": "YOUR_APP_ID",
  "executionRoleArn": "arn:aws:iam::1234567xxxx:role/EMRServerlessJobRole",
  "jobDriver": {
    "sparkSubmit": {
      "entryPoint": "s3://twtech-code/ddb_export_to_parquet.py",
      "sparkSubmitParameters": "--conf spark.sql.sources.partitionOverwriteMode=dynamic --conf spark.hadoop.dynamodb.throughput.read.percent=0.5"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [{
      "classification": "spark-defaults",
      "properties": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
      }
    }]
  }
}

B) Bulk import S3 → DynamoDB (PySpark)

# pyspark script: s3_to_ddb_bulk_import.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, concat_ws, col
spark = SparkSession.builder.appName("S3ToDDB").getOrCreate()
df = spark.read.parquet("s3://twtech-landing/orders_2025/*")
# Build idempotency key (twtech-key)
df = df.withColumn("idempotency_key", sha2(concat_ws("|", "order_id", "updated_at"), 256))
# Convert to DynamoDB JSON format expected by the Hadoop OutputFormat
def to_ddb_item(row):
    # Minimal example for a couple of fields
    return {
      "order_id": {"S": str(row["order_id"])},
      "status":   {"S": str(row["status"])},
      "amount":   {"N": str(row["amount"])},
      "updated_at": {"S": str(row["updated_at"])},
      "idem": {"S": row["idempotency_key"]}
    }
items = df.rdd.map(lambda r: ("", to_ddb_item(r)))
# saveAsNewAPIHadoopDataset takes the output format and key/value classes via the
# Hadoop conf; from PySpark a valueConverter that builds DynamoDBItemWritable
# objects is typically also needed (bundle it with the EMR DynamoDB connector jars).
items.saveAsNewAPIHadoopDataset(conf={
    "mapreduce.job.outputformat.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.io.Text",
    "mapreduce.job.output.value.class": "org.apache.hadoop.dynamodb.DynamoDBItemWritable",
    "dynamodb.output.tableName": "Orders",
    "dynamodb.region": "us-east-2",
    # throttle to protect WCUs
    "dynamodb.throughput.write.percent": "0.5"
})

For DataFrame-native writes, the spark-dynamodb library lets twtech call df.write.format("dynamodb").option("tableName","Orders").save(). Ensure twtech bundles the right JAR for its Spark/Scala version; a short sketch follows below.
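A DataFrame-first sketch with the community spark-dynamodb connector, assuming its JAR is on the classpath; the option names ("tableName", "region") follow that connector's conventions and may differ between forks, and the target table is hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkDynamoDBDemo").getOrCreate()

# Read the table as a DataFrame via the connector's data source
orders = (spark.read.format("dynamodb")
          .option("tableName", "Orders")
          .option("region", "us-east-2")
          .load())

# Filter and write the result back to another (hypothetical) table
(orders.filter("status = 'SHIPPED'")
       .write.format("dynamodb")
       .option("tableName", "ShippedOrders")
       .option("region", "us-east-2")
       .save())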

C) Streaming (Kinesis) → Spark Structured Streaming → S3 parquet

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, year, month, dayofmonth, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
spark = SparkSession.builder.appName("DDBStreamToS3").getOrCreate()
schema = StructType([
  StructField("pk", StringType()), StructField("sk", StringType()),
  StructField("op", StringType()), # INSERT/MODIFY/REMOVE
  StructField("ts", LongType()),
  StructField("amount", DoubleType())
])
raw = (spark.readStream
  .format("kinesis")
  .option("streamName", "ddb-changes")
  .option("region", "us-east-2")
  .option("startingposition", "LATEST")
  .load())
parsed = (raw.selectExpr("CAST(data AS STRING) json")
  .select(from_json(col("json"), schema).alias("r"))
  .select("r.*")
  .withColumn("event_time", to_timestamp((col("ts")/1000).cast("timestamp")))
  .withColumn("year", year("event_time"))
  .withColumn("month", month("event_time"))
  .withColumn("day", dayofmonth("event_time")))
query = (parsed.writeStream
  .format("parquet")
  .option("checkpointLocation", "s3://twtech-analytics-bucket/_chk/ddb/")
  .option("path", "s3://twtech-analytics-bucket/curated/ddb/")
  .partitionBy("year","month","day")
  .outputMode("append")
  .start())
query.awaitTermination()

Throughput & performance tuning (Concerns)

DynamoDB scan/write throttling

·        dynamodb.throughput.read.percent and dynamodb.throughput.write.percent (Hadoop connector) are your guardrails. Start at 0.3–0.5 to share capacity with prod.

·        Prefer On-Demand tables during massive one-time backfills to avoid manual capacity planning.

Parallelism

·        Export scans parallelize by segments. Use dynamodb.max.map.tasks or Spark spark.default.parallelism so each worker processes a segment.

·        Coalesce small files with spark.sql.files.maxRecordsPerFile or post-write OPTIMIZE/compaction (e.g., Delta/Iceberg).

Data formats

·        Write Parquet with sensible types; avoid everything-as-string.

·        Partition by low-cardinality columns (year/month/day) plus a domain split (e.g., tenant_id) if queries benefit.

Idempotency / upserts

·        For S3→DDB imports, use a deterministic PK and optionally a conditional expression (e.g., attribute_not_exists(pk) or updated_at <= :ts so only newer records win) via a small Lambda consumer for strict control, or implement merge logic in Spark and then write (a boto3 sketch follows below).
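A minimal boto3 sketch of that conditional-write idea, where only new or strictly newer records win (table and attribute names are hypothetical placeholders):

import boto3

table = boto3.resource("dynamodb").Table("Orders")

def upsert_if_newer(item):
    try:
        table.put_item(
            Item=item,
            # Write only if the item does not exist or the stored copy is older
            ConditionExpression="attribute_not_exists(pk) OR updated_at <= :ts",
            ExpressionAttributeValues={":ts": item["updated_at"]},
        )
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        pass  # a newer version is already stored; skip the stale record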

Fault tolerance

·        For streaming: set a durable checkpointLocation in S3 and enable exactly-once file sinks (Spark handles commit protocols).

·        For batch: make jobs re-runnable (write to a temp path, then atomic rename).

Costs

·        Prefer EMR Serverless for burst batch; Spot for EMR on EC2.

·        Parquet + partition pruning cuts Athena/Redshift Spectrum scan costs dramatically.

Security

·        Use SSE-KMS on S3 outputs; narrow KMS key policy to EMR roles.

·        Private traffic via Gateway/Interface VPC Endpoints (S3, DynamoDB, Kinesis).

·        Lock bucket prefixes with least-privilege IAM.

Operational templates (copy/paste)

spark-submit (EMR on EC2)

spark-submit \
  --deploy-mode cluster \
  --conf spark.hadoop.dynamodb.region=us-east-2 \
  --conf spark.hadoop.dynamodb.throughput.read.percent=0.5 \
  s3://twtech-code/ddb_export_to_parquet.py

EMR Serverless application (Spark)

·        Create once, then submit jobs with different entrypoints.

·        Size the application's maximum capacity (vCPU/memory) from the number of input segments and the desired runtime.

Validation & publishing

1.     Row counts: compare DDB ScanCount (or DescribeTable item count) vs. Parquet row count.

2.     Schema registry: Glue Crawler -> Glue Catalog table for Athena.

3.     Data quality: add Great Expectations/Deequ step in the EMR job.

4.     Promotion: write to s3://…/staging/ then move to …/curated/ on success.
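A quick sketch of the row-count check in step 1, keeping in mind that DescribeTable's ItemCount is approximate and refreshed only every few hours (paths and names are placeholders):

import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ValidateExport").getOrCreate()

# Approximate item count from DynamoDB metadata
ddb_count = boto3.client("dynamodb", region_name="us-east-2") \
    .describe_table(TableName="Orders")["Table"]["ItemCount"]

# Exact row count of the exported Parquet dataset
parquet_count = spark.read.parquet("s3://twtech-analytics-bucket/dynamodb/orders/").count()

print(f"DynamoDB ~{ddb_count} items vs Parquet {parquet_count} rows")
assert abs(ddb_count - parquet_count) / max(parquet_count, 1) < 0.01, "row counts diverge"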

Parquet / ORC:

In the context of DynamoDB ↔ S3 integration with EMR/Spark, this covers:

·       Performance,

·       Cost implications for analytics and ETL,

·       Data lake workflows.

1. Why use Parquet or ORC instead of JSON/CSV

DynamoDB exports (native or via Streams) default to JSON format, which is:

  • Human readable but storage-heavy (lots of repeated field names)
  • Expensive to query in Athena / Redshift Spectrum (no column pruning, no predicate pushdown)
  • Slow to scan because you must read all columns

Parquet and ORC are columnar storage formats that:

  • Store values column-by-column instead of row-by-row
  • Compress better (especially for repeated values)
  • Allow predicate pushdown and column pruning (Athena reads only the row groups and columns it needs)
  • Support schema evolution (adding columns over time)
  • Enable vectorized reads in Spark

 Typical cost savings:
Querying a 100 GB JSON dataset in Athena could scan all 100 GB,
whereas a well-partitioned Parquet dataset might scan only 2–5 GB.

2. Parquet vs ORC — Quick Comparison

Feature | Parquet | ORC
Best with | Wide schemas, many numeric/string cols | Highly repetitive data, fewer updates
Compression default | Snappy (fast), GZIP (smaller) | Zlib (smaller), Zstd (newer, smaller + fast)
Predicate pushdown | Yes | Yes
Athena support | Yes | Yes
Redshift Spectrum | Yes | Yes
Schema evolution | Add columns easily | Add columns easily
File size sweet spot | 128–512 MB | 128–512 MB

Rule of thumb:

  • Use Parquet if you expect a lot of Spark/Athena/Presto queries
  • Use ORC if your workloads are Hive-heavy or highly repetitive text that benefits from dictionary encoding

3. Converting DynamoDB Data to Parquet/ORC with Spark

3.1 From DynamoDB to Parquet

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, current_date

spark = SparkSession.builder.appName("DDBToParquet").getOrCreate()

# Read from DynamoDB via Hadoop connector
items_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    "org.apache.hadoop.io.Text",
    "org.apache.hadoop.io.Text",
    conf={
        "dynamodb.region": "us-east-2",
        "dynamodb.input.tableName": "Orders",
        "dynamodb.throughput.read.percent": "0.5"
    }
)

df = spark.read.json(items_rdd.map(lambda kv: kv[1]))

df_curated = (
    df.withColumn("load_date", current_date())
      .withColumn("year", year("load_date"))
      .withColumn("month", month("load_date"))
      .withColumn("day", dayofmonth("load_date"))
)

(df_curated
    .repartition("year", "month", "day")
    .write.mode("overwrite")
    .partitionBy("year", "month", "day")
    .parquet("s3://twtech-s3bucket/dynamodb/orders/"))

3.2 From DynamoDB to ORC

(df_curated
    .repartition("year", "month", "day")
    .write.mode("overwrite")
    .option("orc.compress", "ZSTD")   # smaller + faster
    .partitionBy("year", "month", "day")
    .orc("s3://twtech-s3bucket/dynamodb/orders_orc/"))

4. Partitioning Strategy

To maximize performance:

  • Partition by date-based columns (e.g., year/month/day)
  • Add domain-based partitions if queries are filtered by a specific key (e.g., region, tenant_id)
  • Avoid too many small partitions (<128 MB files)

S3 Layout Example:

s3://twtech-s3bucket/dynamodb/orders/
    year=2025/month=08/day=14/part-0000.parquet
    year=2025/month=08/day=14/part-0001.parquet

5. Athena/Glue Integration

Once data is in Parquet/ORC:

  1. Create/Update Glue table with partition metadata:

CREATE EXTERNAL TABLE orders (
    order_id string,
    status string,
    amount double,
    load_date date
)
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET
LOCATION 's3://twtech-s3bucket/dynamodb/orders/';

  2. Run MSCK REPAIR TABLE orders; to pick up partitions
  3. Athena queries will now only scan the needed partitions & columns

6. Best Practices

  • File size: Aim for 128–512 MB to optimize scan throughput
  • Compression:
    • Parquet → snappy for speed, gzip for max compression
    • ORC → zstd for balanced performance
  • Schema evolution: Always add columns at the end; avoid renaming
  • Compaction: For streaming pipelines, periodically compact small files into larger ones (a Spark sketch follows after this list)
  • Data lake governance: Consider Delta Lake or Apache Iceberg if twtech needs ACID updates
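A minimal compaction sketch for the bullet above: rewrite one day's partition of small Parquet files into a handful of larger ones (the paths and the target file count are placeholders). Spark cannot overwrite a path it is reading from, so the job writes to a staging prefix and swaps it in afterwards:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CompactOrders").getOrCreate()

src = "s3://twtech-s3bucket/dynamodb/orders/year=2025/month=08/day=14/"
staging = "s3://twtech-s3bucket/dynamodb/_compacted/orders/year=2025/month=08/day=14/"

# Target a few ~256 MB files; pick the count from the partition's total size
(spark.read.parquet(src)
      .coalesce(4)
      .write.mode("overwrite")
      .parquet(staging))
# Then swap staging into place (S3 copy/delete or a catalog location update)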

ACID updates in the DynamoDB → S3 (Parquet/ORC) world.
This is where things get tricky, because S3 alone is not ACID, but a table-format layer can be added on top to make it behave like a transactional table store.

Step-by-step.

1. The Problem

When exporting or streaming DynamoDB data into S3 in Parquet/ORC format:

  • S3 objects are immutable; twtech can only replace entire objects, with no isolation guarantees
  • Multiple writers can create inconsistent views if updates overlap
  • Deletes/updates require rewriting whole files (columnar formats aren’t row-updatable)
  • Athena/Presto/Redshift Spectrum will see “half-written” data if queries overlap updates

So, twtech needs a table format that adds transaction logs and schema evolution.

2. Solutions for ACID in S3

Three leading open table formats:

Format | ACID Support | Time Travel | Popular Engines
Apache Hudi | Yes | Yes | Spark, Hive, Flink, Presto
Delta Lake | Yes | Yes | Spark, Presto, Trino, Athena (via connectors)
Apache Iceberg | Yes | Yes | Spark, Flink, Trino, Athena, Snowflake

2.1 How it works

These formats:

  • Store data files (Parquet/ORC) in S3
  • Maintain a transaction log (JSON/Parquet metadata) in the same bucket
  • Writers append new files and update the log atomically
  • Queries read the log to get a consistent snapshot

Update/Delete workflow in ACID table formats:

  1. Write changed rows to new Parquet/ORC files
  2. Update the metadata log to mark old files as obsolete
  3. Readers see only the “committed” snapshot

3. DynamoDB → ACID Table on S3

A practical pattern looks like:

DynamoDB Streams
    ↓
Spark Structured Streaming (EMR/EKS/Serverless)
    ↓
Delta Lake / Hudi / Iceberg table in S3
    ↓
Athena / Presto / Redshift Spectrum for queries

4. Example: Delta Lake Upserts from DynamoDB

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (SparkSession.builder
    .appName("DDBToDeltaUpsert")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

# Read change records from DynamoDB Streams/Kinesis (example)
df_changes = (spark.readStream.format("kinesis")
    .option("streamName", "twtech-ddb-changes")
    .option("region", "us-east-2")
    .option("startingPosition", "TRIM_HORIZON")
    .load())

# Deserialize JSON into structured fields here (omitted)
# Assume the resulting schema has: pk, updated_at, amount, status, op (I/M/D)

delta_path = "s3://twtech-lakehouse/orders_delta/"

# Merge logic (upsert + delete), applied per micro-batch via foreachBatch
def upsert_to_delta(batch_df, batch_id):
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # Initialize the table from the first micro-batch
        batch_df.write.format("delta").save(delta_path)
        return
    delta_table = DeltaTable.forPath(spark, delta_path)
    (delta_table.alias("t")
        .merge(batch_df.alias("s"), "t.pk = s.pk")
        .whenMatchedUpdateAll(condition="s.op IN ('I','M')")
        .whenMatchedDelete(condition="s.op = 'D'")
        .whenNotMatchedInsertAll(condition="s.op IN ('I','M')")
        .execute())

query = (df_changes.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "s3://twtech-lakehouse/_chk/orders_delta/")
    .start())
query.awaitTermination()

5. Why This Matters

Without ACID:

  • Deletes leave “zombie” rows
  • Updates create multiple versions that analytics might double-count
  • Streaming pipelines risk partial results mid-write

With ACID (Delta/Hudi/Iceberg):

  • Upserts & deletes are safe
  • Readers see consistent snapshots
  • twtech gets time travel to replay the table state at any timestamp
  • ETL jobs can run incrementally without scanning all files

6. Best Practices for ACID Tables in S3

  • Partition wisely: still use date + domain key to limit file rewrites
  • Compaction: run periodic optimize jobs to merge small files (see the Delta sketch after this list)
  • Schema evolution: Add columns with nullable defaults
  • Vacuum: clean obsolete files after retention period
  • Concurrency: use a single job per table for writes to avoid conflicts
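A minimal Delta Lake maintenance sketch for the compaction and vacuum bullets above, assuming Delta Lake 2.x or later on the cluster (the path is the same placeholder as in the earlier example):

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (SparkSession.builder
    .appName("OrdersDeltaMaintenance")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

orders = DeltaTable.forPath(spark, "s3://twtech-lakehouse/orders_delta/")

orders.optimize().executeCompaction()   # merge small files into larger ones
orders.vacuum(168)                      # drop obsolete files older than the 7-day retention window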

An AWS architecture diagram showing DynamoDB Streams → Spark → Delta Lake/Hudi/Iceberg in S3 → Athena/Redshift.


