AWS DynamoDB ↔ S3 Integration
View:
- What DynamoDB ↔ S3 integration is,
- How to use DynamoDB ↔ S3 integration,
- Why to use DynamoDB ↔ S3 integration,
- Gotchas (what to watch out for) in the architectural decisions of DynamoDB ↔ S3 integration.
1. Integration Overview
DynamoDB and S3 don’t integrate "natively" in the same way that S3 integrates with Glacier. However, AWS provides several official integration patterns for data export, import, backup, analytics, and archival.
The key integration points are:
Use Case | AWS Service / Feature | Direction
Full table export without performance impact | DynamoDB Export to S3 (point-in-time recovery snapshot) | DynamoDB → S3
Continuous stream processing to S3 for analytics or ETL | DynamoDB Streams → Kinesis Data Firehose → S3 | DynamoDB → S3
Ad-hoc data extraction | AWS Glue / Data Pipeline | DynamoDB → S3
Backup for compliance or DR | AWS Backup (supports DynamoDB to S3 storage backend) | DynamoDB → S3
Restore from S3 into DynamoDB | AWS Glue / Data Pipeline / Custom Lambda Loader | S3 → DynamoDB
Nearline cold storage migration | Custom Lambda/Kinesis pipeline | DynamoDB ↔ S3
2. Exporting DynamoDB to S3
2.1 Native Export to S3
- Feature: DynamoDB Export to S3
- Type: Serverless snapshot-based export
- Data format: Compressed, partitioned DynamoDB JSON or Amazon Ion
- Source: Any Point-in-Time Recovery (PITR) timestamp
- Performance: No impact on the live table
- Trigger: On-demand
- S3 Path: s3://twtech-s3bucket/AWSDynamoDB/YYYY-MM-DDTHH-MM-SSZ/
- Cost: Based on the amount of data exported (per GB) + S3 storage
Flow:
- Enable PITR on twtech DynamoDB table.
- Run “Export to S3” via console, CLI, or SDK.
- AWS creates multiple export files in S3 (partitioned for parallel processing in Athena/Glue).
Best for:
- Full backups
- Analytics ingestion into Athena/Redshift
- Archiving the state of a table at a point in time
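As a concrete trigger for the flow above, here is a minimal boto3 sketch of an on-demand export; the table ARN, bucket, and prefix are illustrative placeholders, and PITR must already be enabled on the table.

# Minimal sketch, assuming PITR is already enabled; names/ARNs are illustrative.
import boto3

ddb = boto3.client("dynamodb", region_name="us-east-2")

response = ddb.export_table_to_point_in_time(
    TableArn="arn:aws:dynamodb:us-east-2:123456789012:table/Orders",
    S3Bucket="twtech-s3bucket",
    S3Prefix="AWSDynamoDB/",
    ExportFormat="DYNAMODB_JSON",  # or "ION"
)
# Export runs asynchronously; poll the status until it completes
print(response["ExportDescription"]["ExportStatus"])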
2.2 DynamoDB Streams + Kinesis Firehose
If twtech needs continuous export instead of point-in-time snapshots:
- DynamoDB Streams captures item-level changes (INSERT, MODIFY, REMOVE).
- Kinesis Data Firehose batches and delivers changes to S3 in near real-time.
- Output format can be JSON, CSV, or Parquet.
Architecture:
DynamoDB → DynamoDB Streams → Lambda / Firehose → S3
Pros(Benefits):
- Continuous, near-real-time pipeline
- Can transform data before storing in S3
- Supports partitioned storage for Athena queries
Cons(Limitations):
- Stream retention = 24 hours (need to consume promptly)
- Requires custom transform logic for upserts/deletes in S3
2.3 AWS Glue or Data Pipeline
- Glue: can crawl DynamoDB, extract data, transform, and write to S3 in various formats (Parquet, ORC, Avro, CSV, JSON).
- AWS Data Pipeline: (older service) supports periodic export jobs from DynamoDB to S3.
3. Importing from S3 into DynamoDB
The newer DynamoDB Import from S3 feature can create a new table from S3 data, but there is still no one-click import into an existing table the way export works, so twtech has several patterns:
- AWS Glue ETL Job
- Read S3 data → Transform → Write to DynamoDB
- Good for bulk historical loads
- AWS Data Pipeline
- Predefined template for S3 → DynamoDB ingestion
- More old-school, less common now
- Custom Lambda/Kinesis Ingestion
- Use S3 event notifications to trigger Lambda, parse the new object, and batch-write to DynamoDB (see the sketch after this list)
- EMR/Spark
- If twtech has large-scale batch data and needs complex transformations before ingest
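A minimal sketch of the Custom Lambda ingestion pattern above, assuming the source objects are newline-delimited JSON; the bucket layout and target table name (Orders) are illustrative assumptions.

# Minimal sketch of the S3-event → Lambda → DynamoDB pattern.
import json
import boto3

s3 = boto3.client("s3")
table = boto3.resource("dynamodb").Table("Orders")  # hypothetical target table

def handler(event, context):
    for record in event["Records"]:  # S3 event notification records
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

        # batch_writer handles chunking into BatchWriteItem calls and retries
        with table.batch_writer() as batch:
            for line in body.splitlines():
                if line.strip():
                    batch.put_item(Item=json.loads(line))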
4. Real-World Integration Patterns
4.1 Analytics Pipeline
- Goal:
Query DynamoDB data with SQL
- Pattern:
- DynamoDB Export to S3 (JSON/Parquet)
- S3 as source for Athena or Redshift Spectrum
- Benefits:
- Avoids expensive scans in DynamoDB
- Athena queries directly on S3 data at low cost
4.2 Archiving Old Items
- Goal:
Keep DynamoDB lean and cost-effective
- Pattern:
- TTL on items → Streams → Lambda → S3 (see the sketch below)
- Store in compressed Parquet for cheap storage
- Benefit:
- DynamoDB remains small = cheaper provisioned capacity
- Historical data kept in S3 for compliance
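A minimal sketch of the TTL-archival Lambda in the pattern above; the archive bucket and key layout are illustrative assumptions. TTL deletions surface on the stream as REMOVE events initiated by the DynamoDB service principal.

# Streams-triggered Lambda that archives TTL-expired items to S3.
import json
import boto3

s3 = boto3.client("s3")
BUCKET = "twtech-s3bucket"  # hypothetical archive bucket

def handler(event, context):
    for record in event["Records"]:
        # Only archive deletions performed by the TTL service, not user deletes
        if record["eventName"] != "REMOVE":
            continue
        if record.get("userIdentity", {}).get("principalId") != "dynamodb.amazonaws.com":
            continue
        old_image = record["dynamodb"]["OldImage"]  # DynamoDB JSON of the expired item
        key = f"archive/{record['eventID']}.json"
        s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(old_image).encode("utf-8"))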
4.3 Near Real-Time Event Storage
- Goal:
Keep append-only event log in S3
- Pattern:
- DynamoDB Streams → Firehose → S3 partitioned by year/month/day/hour
- Benefit:
- Perfect for replay or reprocessing events later
5. Cost and Performance Considerations
- Export to S3 = billed per GB exported (S3 storage billed separately)
- Streams + Firehose = billed for stream reads, Firehose delivery, S3 storage
- Glue = billed per Data Processing Unit (DPU) hour
- Athena = billed per data scanned (optimize with Parquet)
- S3 storage tiers can be leveraged for cost optimization (Standard → Glacier)
6. Security & Compliance
- Always enable S3 server-side encryption (SSE-KMS) for exports
- Use VPC Endpoints to avoid public traffic
- IAM policies should restrict access to the specific S3 bucket/prefix
- Consider Object Lock for immutability (WORM compliance)
7. Comparison Table
Method |
Real-time? |
Transformable? |
Performance impact on DynamoDB? |
Best Use Case |
Export to S3 |
No |
No |
None |
Backups, analytics |
Streams + Firehose |
Yes |
Yes (via Lambda) |
Low |
Event-driven ETL, CDC pipelines |
Glue |
No |
Yes |
Medium |
Large batch ETL |
Data Pipeline |
No |
Limited |
Medium |
Legacy batch jobs |
Custom Lambda |
Yes |
Yes |
Low |
Small-scale ingestion/export |
Insights:
Continuous Stream Processing from DynamoDB to Amazon S3 for analytics or ETL.
View:
- Architecture,
- Implementation,
- Tips.
1. Why use Continuous Stream Processing (benefits)
DynamoDB is fantastic for operational workloads but not optimized for:
- Ad-hoc analytical queries
- Historical data storage
- Complex transformations
The idea:
twtech uses DynamoDB Streams → Kinesis Data Firehose → S3 to continuously capture every change, transform it, and store it in an analytics-friendly format for services like Athena, Glue, or Redshift Spectrum.
2. Core Architecture
DynamoDB → DynamoDB Streams → (Lambda transform optional) → Kinesis Data Firehose → S3 → (Athena/Glue/Redshift)
2.1 Component Roles
- DynamoDB Streams
  Captures INSERT, MODIFY, and REMOVE events for 24 hours after they occur. Stream data is immutable and ordered per partition key.
- Lambda (optional)
  Intercepts stream records and performs:
  - Data enrichment
  - Field filtering/mapping
  - Format conversion (e.g., DynamoDB JSON → flat JSON/Parquet)
- Kinesis Data Firehose
  Buffers, batches, and delivers transformed data to S3.
  - Buffer size: 5 MB by default (configurable)
  - Buffer interval: up to 900 seconds
  - Optional format conversion: JSON → Parquet/ORC
  - Compression: GZIP, Snappy, ZIP
- Amazon S3
  Destination storage, typically partitioned:
  s3://twtech-analyticsbucket/dynamodb/twtech_dynamoDB_table/year=YYYY/month=MM/day=DD/hour=HH/
3. Step-by-Step Setup
Step 1: Enable DynamoDB Streams
- Console → Table → Exports and Streams → Enable Streams
- Choose New and old images if you want full before/after snapshots.
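The same step can be done programmatically; a minimal boto3 sketch (the table name is illustrative):

import boto3

ddb = boto3.client("dynamodb", region_name="us-east-2")

ddb.update_table(
    TableName="Orders",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_AND_OLD_IMAGES",  # full before/after images
    },
)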
Step 2: Create a Firehose Delivery Stream
- Source: Direct PUT
- Destination: S3
- Enable data transformation if Lambda is used (see the sketch after this step).
- Enable format conversion to Parquet for Athena optimization.
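If data transformation is enabled, Firehose invokes a Lambda that must echo each recordId with a result status and base64-encoded data. A minimal sketch; the field list kept in the flattening step is an illustrative assumption.

import base64
import json

def handler(event, context):
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Example transform: keep a flat subset of fields
        flat = {k: payload.get(k) for k in ("pk", "op", "ts", "amount")}
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode((json.dumps(flat) + "\n").encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}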
Step 3: Lambda Stream Consumer (Optional)
- Trigger: DynamoDB Streams event
- Transform records (flatten, rename keys, mask sensitive fields)
- Push transformed data to Firehose via PutRecordBatch
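A minimal sketch of such a consumer, assuming a hypothetical delivery stream name and forwarding the raw DynamoDB JSON image:

import json
import boto3

firehose = boto3.client("firehose")
STREAM = "twtech-ddb-changes-firehose"  # hypothetical delivery stream name

def handler(event, context):
    records = []
    for rec in event["Records"]:
        image = rec["dynamodb"].get("NewImage") or rec["dynamodb"].get("OldImage", {})
        payload = {
            "op": rec["eventName"],           # INSERT / MODIFY / REMOVE
            "keys": rec["dynamodb"]["Keys"],
            "image": image,                   # still DynamoDB JSON; flatten here if desired
        }
        records.append({"Data": (json.dumps(payload) + "\n").encode("utf-8")})

    # PutRecordBatch accepts up to 500 records / 4 MB per call
    for i in range(0, len(records), 500):
        firehose.put_record_batch(DeliveryStreamName=STREAM, Records=records[i:i + 500])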
Step 4: S3 Partitioning
- Configure Firehose S3 prefix:
  dynamodb/twtech_dynamoDB_table/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
4. Querying the Data
Once data is in S3:
- Use AWS Glue Crawler to create the schema
- Query with Amazon Athena:
SELECT *
FROM dynamodb_changes
WHERE year = '2025' AND month = '08';
- Or load into Redshift Spectrum for joins with other datasets
5. Best Practices
- Buffering: larger batch sizes = lower cost but higher latency.
- Format: use Parquet for up to 90% cost reduction in Athena.
- Security:
  - S3 bucket policy locked to the Firehose role
  - SSE-KMS encryption
  - VPC endpoints to avoid the public internet
- Error Handling:
  - Firehose automatically retries failed deliveries
  - Failed records go to an S3 error prefix for later replay
- Retention: DynamoDB Streams keeps data for 24 hours; ensure the consumer processes within this window.
6. Example Use Cases
- Real-Time Analytics: Clickstream → DynamoDB → S3 → Athena dashboard refresh every few minutes.
- CDC (Change Data Capture): Replicate DynamoDB changes to a data lake for ML training.
- Archiving for Compliance: Store all modifications in immutable S3 storage with Object Lock.
Using Amazon EMR / Apache Spark with DynamoDB ↔ S3. I’ll cover the main patterns, exact connectors, runnable snippets, and the knobs that matter for scale, cost, and correctness.
When EMR/Spark is the right tool
Use EMR/Spark when twtech needs any of the following:
- Large-scale batch exports from DynamoDB to S3 in columnar formats (Parquet/ORC) with heavy transforms or joins.
- Bulk backfills from S3 into DynamoDB with dedupe/merge logic.
- Enrichment: joining DynamoDB data with other lake datasets (S3) before publishing curated tables.
- Streaming ETL at scale (Structured Streaming) when Kinesis → Firehose isn’t flexible enough.
Architecture patterns
1. Batch export: DynamoDB → Spark on EMR → S3 (Parquet)
DynamoDB table ──(dynamodb-hadoop/spark-dynamodb)──> Spark jobs ──> S3 (Parquet, partitioned) ──> Athena/Redshift
- Read with the EMR DynamoDB Hadoop connector or the spark-dynamodb library.
- Write partitioned by year/month/day (and a domain partition such as region/tenant).
2. Bulk import: S3 (Parquet/JSON) → Spark → DynamoDB
Raw S3 data ──> Spark transform/validate ──(batch write)──> DynamoDB
- Use conditional writes / idempotency keys to avoid duplicates.
3. Streaming CDC enrichment (optional)
DynamoDB Streams → Kinesis → Spark Structured Streaming on EMR → S3 curated
- Spark handles upserts & late data; S3 holds the gold tables.
Connectors to use
- EMR DynamoDB Hadoop Connector (built into EMR on EC2 & EKS):
  - Read: org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
  - Write: org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
  - Works well for large scans and batch writes; integrates with EMRFS.
- spark-dynamodb (community lib; ergonomic DataFrame API):
  - Coordinates: com.github.traviscrawford:spark-dynamodb_2.12:<version> (or the org.apache.spark fork).
  - Lets you use spark.read.format("dynamodb").
For most EMR on EC2/EKS stacks, the EMR DynamoDB connector is the safer default; use spark-dynamodb if twtech prefers DataFrame-first ergonomics.
Cluster choices
- EMR Serverless: no cluster mgmt, great for bursty batch. Simplest ops.
- EMR on EC2: best for full control, spot fleets, HDFS cache.
- EMR on EKS: if twtech already operates Kubernetes and wants shared pools.
Rule of thumb: start with EMR Serverless for batch ETL, then move to EC2/EKS if you need fine-grained tuning or very long jobs.
Minimal, runnable examples
A) Batch export DynamoDB → Parquet on S3 (PySpark on EMR)
Using the EMR DynamoDB Hadoop connector

# pyspark script: ddb_export_to_parquet.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, year, month, dayofmonth

spark = (SparkSession.builder
         .appName("DDBExportToParquet")
         .getOrCreate())

table = "Orders"
region = "us-east-2"
s3_out = "s3://twtech-analytics-bucket/dynamodb/orders/"

# Read via Hadoop input format -> RDD of (key, item) pairs
items_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    "org.apache.hadoop.io.Text",
    "org.apache.hadoop.io.Text",
    conf={
        "dynamodb.region": region,
        "dynamodb.input.tableName": table,
        # Optional parallelism controls
        "dynamodb.throughput.read.percent": "0.5",  # use 50% of table RCUs
        "dynamodb.export.format": "dynamodb",       # preserves types in JSON
    })

# Convert to DataFrame (values are DynamoDB JSON strings)
df = spark.read.json(items_rdd.map(lambda kv: kv[1]), multiLine=False)

# Optional transforms: select/rename/flatten, cast types, add load_date
curated = (df
           .withColumn("load_date", current_date())
           .withColumn("year", year("load_date"))
           .withColumn("month", month("load_date"))
           .withColumn("day", dayofmonth("load_date")))

(curated
 .repartition("year", "month", "day")
 .write.mode("overwrite")
 .partitionBy("year", "month", "day")
 .parquet(s3_out))
Submit on EMR Serverless (example job payload):

{
  "name": "ddb-export-parquet",
  "applicationId": "YOUR_APP_ID",
  "executionRoleArn": "arn:aws:iam::1234567xxxx:role/EMRServerlessJobRole",
  "jobDriver": {
    "sparkSubmit": {
      "entryPoint": "s3://twtech-code/ddb_export_to_parquet.py",
      "sparkSubmitParameters": "--conf spark.sql.sources.partitionOverwriteMode=dynamic --conf spark.hadoop.dynamodb.throughput.read.percent=0.5"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [{
      "classification": "spark-defaults",
      "properties": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
      }
    }]
  }
}
B) Bulk import S3 → DynamoDB (PySpark)
# pyspark script: s3_to_ddb_bulk_import.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, concat_ws

spark = SparkSession.builder.appName("S3ToDDB").getOrCreate()

df = spark.read.parquet("s3://twtech-landing/orders_2025/*")

# Build idempotency key (twtech-key)
df = df.withColumn("idempotency_key",
                   sha2(concat_ws("|", "order_id", "updated_at"), 256))

# Convert to the DynamoDB JSON format expected by the Hadoop OutputFormat
def to_ddb_item(row):
    # Minimal example for a couple of fields
    return {
        "order_id": {"S": str(row["order_id"])},
        "status": {"S": str(row["status"])},
        "amount": {"N": str(row["amount"])},
        "updated_at": {"S": str(row["updated_at"])},
        "idem": {"S": row["idempotency_key"]},
    }

items = df.rdd.map(lambda r: ("", to_ddb_item(r)))

# saveAsNewAPIHadoopDataset takes the output format and key/value classes via the Hadoop conf
items.saveAsNewAPIHadoopDataset(conf={
    "dynamodb.output.tableName": "Orders",
    "dynamodb.region": "us-east-2",
    # throttle to protect WCUs
    "dynamodb.throughput.write.percent": "0.5",
    "mapreduce.job.outputformat.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.io.Text",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
})
For DataFrame-native writes, the spark-dynamodb library lets twtech call df.write.format("dynamodb").option("tableName", "Orders").save(). Ensure twtech bundles the right jar for its Spark/Scala version.
C) Streaming (Kinesis) → Spark Structured Streaming → S3 Parquet

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, year, month, dayofmonth, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

spark = SparkSession.builder.appName("DDBStreamToS3").getOrCreate()

schema = StructType([
    StructField("pk", StringType()), StructField("sk", StringType()),
    StructField("op", StringType()),   # INSERT/MODIFY/REMOVE
    StructField("ts", LongType()),
    StructField("amount", DoubleType())
])

raw = (spark.readStream
       .format("kinesis")
       .option("streamName", "ddb-changes")
       .option("region", "us-east-2")
       .option("startingposition", "LATEST")
       .load())

parsed = (raw.selectExpr("CAST(data AS STRING) json")
          .select(from_json(col("json"), schema).alias("r"))
          .select("r.*")
          .withColumn("event_time", to_timestamp((col("ts") / 1000).cast("timestamp")))
          .withColumn("year", year("event_time"))
          .withColumn("month", month("event_time"))
          .withColumn("day", dayofmonth("event_time")))

query = (parsed.writeStream
         .format("parquet")
         .option("checkpointLocation", "s3://twtech-analytics-bucket/_chk/ddb/")
         .option("path", "s3://twtech-analytics-bucket/curated/ddb/")
         .partitionBy("year", "month", "day")
         .outputMode("append")
         .start())

query.awaitTermination()
Throughput & performance tuning (Concerns)
DynamoDB scan/write throttling
- dynamodb.throughput.read.percent and dynamodb.throughput.write.percent (Hadoop connector) are your guardrails. Start at 0.3–0.5 to share capacity with prod.
- Prefer On-Demand tables during massive one-time backfills to avoid manual capacity planning.
Parallelism
- Export scans parallelize by segments. Use dynamodb.max.map.tasks or Spark spark.default.parallelism so each worker processes a segment.
- Coalesce small files with spark.sql.files.maxRecordsPerFile or post-write OPTIMIZE/compaction (e.g., Delta/Iceberg).
Data formats
- Write Parquet with sensible types; avoid everything-as-string.
- Partition by low-cardinality columns (year/month/day) plus a domain split (e.g., tenant_id) if queries benefit.
Idempotency / upserts
- For S3→DDB imports, use a deterministic PK and optionally a conditional expression (e.g., attribute_not_exists(pk) or updated_at >= :ts) via a small Lambda consumer for strict control, or implement merge logic in Spark then write.
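A minimal boto3 sketch of the conditional-write approach; the table, key, and attribute names (Orders, pk, updated_at) are illustrative assumptions.

import boto3
from botocore.exceptions import ClientError

table = boto3.resource("dynamodb").Table("Orders")

def put_if_newer(item):
    try:
        table.put_item(
            Item=item,
            # Accept the write only if the item is new or the stored version is not newer
            ConditionExpression="attribute_not_exists(pk) OR updated_at <= :ts",
            ExpressionAttributeValues={":ts": item["updated_at"]},
        )
    except ClientError as e:
        # Stale duplicates are silently skipped; real errors are re-raised
        if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
            raise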
Fault tolerance
- For streaming: set a durable checkpointLocation in S3 and enable exactly-once file sinks (Spark handles commit protocols).
- For batch: make jobs re-runnable (write to a temp path, then atomic rename).
Costs
- Prefer EMR Serverless for burst batch; Spot for EMR on EC2.
- Parquet + partition pruning cuts Athena/Redshift Spectrum scan costs dramatically.
Security
- Use SSE-KMS on S3 outputs; narrow the KMS key policy to EMR roles.
- Private traffic via Gateway/Interface VPC Endpoints (S3, DynamoDB, Kinesis).
- Lock bucket prefixes with least-privilege IAM.
Operational templates (copy/paste)
spark-submit (EMR on EC2)

spark-submit \
  --deploy-mode cluster \
  --conf spark.hadoop.dynamodb.region=us-east-2 \
  --conf spark.hadoop.dynamodb.throughput.read.percent=0.5 \
  s3://twtech-code/ddb_export_to_parquet.py

EMR Serverless application (Spark)
- Create once, then submit jobs with different entrypoints.
- Size with max DPUs ~ # of input segments / desired runtime.
Validation & publishing
1. Row counts: compare DDB ScanCount (or DescribeTable item count) vs. Parquet row count (see the sketch after this list).
2. Schema registry: Glue Crawler -> Glue Catalog table for Athena.
3. Data quality: add Great Expectations/Deequ step in the EMR job.
4. Promotion: write to s3://…/staging/ then move to …/curated/ on success.
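A minimal sketch of the step-1 row-count check, assuming the export path from the earlier example; note that DescribeTable's ItemCount is refreshed only about every six hours, so treat it as approximate.

import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExportValidation").getOrCreate()

# Approximate live count from DynamoDB metadata
ddb_count = boto3.client("dynamodb", region_name="us-east-2") \
    .describe_table(TableName="Orders")["Table"]["ItemCount"]

# Exact count of what landed in S3
parquet_count = spark.read.parquet("s3://twtech-analytics-bucket/dynamodb/orders/").count()

print(f"DynamoDB ItemCount={ddb_count}, Parquet rows={parquet_count}")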
Parquet / ORC:
In the context of DynamoDB ↔ S3 integration with EMR/Spark, covering:
- Performance and cost implications for analytics and ETL,
- Data lake workflows.
1. Why use Parquet or ORC instead of JSON/CSV
DynamoDB exports (native or via Streams) default to JSON format, which is:
- Human readable but storage-heavy (lots of repeated field names)
- Expensive to query in Athena / Redshift Spectrum (no column pruning, no predicate pushdown)
- Slow to scan because you must read all columns
Parquet and ORC are columnar storage formats that:
- Store values column-by-column instead of row-by-row
- Compress better (especially for repeated values)
- Allow column pruning and predicate pushdown (Athena reads only the necessary columns and row groups)
- Support schema evolution (adding columns over time)
- Enable vectorized reads in Spark
Typical cost savings:
Querying a 100 GB JSON dataset in Athena could scan all 100 GB, whereas a well-partitioned Parquet dataset might scan only 2–5 GB.
2. Parquet vs ORC — Quick Comparison
Feature |
Parquet |
ORC |
Best with |
Wide schemas, many numeric/string
cols |
Highly repetitive data, fewer
updates |
Compression default |
Snappy (fast), GZIP (smaller) |
Zlib (smaller), Zstd (newer,
smaller+fast) |
Predicate pushdown |
Yes |
Yes |
Athena support |
Yes |
Yes |
Redshift Spectrum |
Yes |
Yes |
Schema evolution |
Add columns easily |
Add columns easily |
File size sweet spot |
128–512 MB |
128–512 MB |
Rule of thumb:
- Use Parquet if you expect a lot of Spark/Athena/Presto queries
- Use ORC if your workloads are Hive-heavy or highly repetitive text that benefits from dictionary encoding
3. Converting DynamoDB Data to Parquet/ORC with Spark
3.1 From DynamoDB to Parquet
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, current_date

spark = SparkSession.builder.appName("DDBToParquet").getOrCreate()

# Read from DynamoDB via the Hadoop connector
items_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    "org.apache.hadoop.io.Text",
    "org.apache.hadoop.io.Text",
    conf={
        "dynamodb.region": "us-east-2",
        "dynamodb.input.tableName": "Orders",
        "dynamodb.throughput.read.percent": "0.5"
    }
)

df = spark.read.json(items_rdd.map(lambda kv: kv[1]))

df_curated = (
    df.withColumn("load_date", current_date())
      .withColumn("year", year("load_date"))
      .withColumn("month", month("load_date"))
      .withColumn("day", dayofmonth("load_date"))
)

(df_curated
 .repartition("year", "month", "day")
 .write.mode("overwrite")
 .partitionBy("year", "month", "day")
 .parquet("s3://twtech-s3bucket/dynamodb/orders/"))
3.2 From DynamoDB to ORC
(df_curated
 .repartition("year", "month", "day")
 .write.mode("overwrite")
 .option("orc.compress", "ZSTD")  # Smaller + faster
 .partitionBy("year", "month", "day")
 .orc("s3://twtech-s3bucket/dynamodb/orders_orc/"))
4. Partitioning Strategy
To maximize performance:
- Partition by date-based columns (e.g., year/month/day)
- Add domain-based partitions if queries are filtered by a specific key (e.g., region, tenant_id)
- Avoid too many small partitions (<128 MB files)
S3 Layout Example:
s3://twtech-s3bucket/dynamodb/orders/
year=2025/month=08/day=14/part-0000.parquet
year=2025/month=08/day=14/part-0001.parquet
5. Athena/Glue Integration
Once data is in Parquet/ORC:
- Create/Update the Glue table with partition metadata:

CREATE EXTERNAL TABLE orders (
  order_id string,
  status string,
  amount double,
  load_date date
)
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET
LOCATION 's3://twtech-s3bucket/dynamodb/orders/';

- Run MSCK REPAIR TABLE orders; to pick up partitions
- Athena queries will now only scan the needed partitions & columns
6. Best Practices
- File size: aim for 128–512 MB to optimize scan throughput
- Compression:
  - Parquet → snappy for speed, gzip for max compression
  - ORC → zstd for balanced performance
- Schema evolution: always add columns at the end; avoid renaming
- Compaction: for streaming pipelines, periodically compact small files into larger ones
- Data lake governance: consider Delta Lake or Apache Iceberg if twtech needs ACID updates
ACID updates in the DynamoDB → S3 (Parquet/ORC) world.
This is where things get tricky, because S3 alone is not ACID, but a layer of technology can be added on top to make it behave like a transactional table store.
Step-by-step.
1. The Problem
When exporting or streaming DynamoDB data into S3 in Parquet/ORC format:
- S3 objects are immutable; twtech can only overwrite entire objects, without isolation guarantees
- Multiple writers can create inconsistent views if updates overlap
- Deletes/updates require rewriting whole files (columnar formats aren’t row-updatable)
- Athena/Presto/Redshift Spectrum will see “half-written” data if queries overlap updates
So, twtech needs a table format that adds transaction logs and schema evolution.
2. Solutions for ACID in S3
Three leading open formats:
Format | ACID Support | Time Travel | Popular Engines
Apache Hudi | Yes | Yes | Spark, Hive, Flink, Presto
Delta Lake | Yes | Yes | Spark, Presto, Trino, Athena (via connectors)
Apache Iceberg | Yes | Yes | Spark, Flink, Trino, Athena, Snowflake
2.1 How it works
These formats:
- Store data files (Parquet/ORC) in S3
- Maintain a transaction log (JSON/Parquet metadata) in the same bucket
- Writers append new files and update the log atomically
- Queries read the log to get a consistent snapshot
Update/Delete workflow in ACID table formats:
- Write changed rows to new Parquet/ORC files
- Update the metadata log to mark old files as obsolete
- Readers see only the “committed” snapshot
3. DynamoDB → ACID Table on S3
A practical pattern looks like:
DynamoDB Streams
  ↓
Spark Structured Streaming (EMR/EKS/Serverless)
  ↓
Delta Lake / Hudi / Iceberg table in S3
  ↓
Athena / Presto / Redshift Spectrum for queries
4. Example: Delta Lake Upserts from DynamoDB
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (SparkSession.builder
         .appName("DDBToDeltaUpsert")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

# Read change records from DynamoDB Streams/Kinesis (example)
df_changes = (spark.readStream.format("kinesis")
              .option("streamName", "twtech-ddb-changes")
              .option("region", "us-east-2")
              .option("startingPosition", "TRIM_HORIZON")
              .load())

# Deserialize JSON into structured fields
# Assume schema has: pk, updated_at, amount, status, op (I/M/D)

delta_path = "s3://twtech-lakehouse/orders_delta/"

# MERGE cannot run directly on a streaming DataFrame, so apply it per micro-batch
def upsert_to_delta(batch_df, batch_id):
    # Initialize the table on first use
    if not DeltaTable.isDeltaTable(spark, delta_path):
        batch_df.write.format("delta").save(delta_path)
        return
    delta_table = DeltaTable.forPath(spark, delta_path)
    # Merge logic (Upsert + Delete)
    (delta_table.alias("t")
     .merge(batch_df.alias("s"), "t.pk = s.pk")
     .whenMatchedUpdateAll(condition="s.op IN ('I','M')")
     .whenMatchedDelete(condition="s.op = 'D'")
     .whenNotMatchedInsertAll(condition="s.op IN ('I')")
     .execute())

(df_changes.writeStream
 .foreachBatch(upsert_to_delta)
 .option("checkpointLocation", "s3://twtech-lakehouse/_chk/orders_delta/")
 .start()
 .awaitTermination())
5. Why This Matters
Without ACID:
- Deletes leave “zombie” rows
- Updates create multiple versions that analytics might double-count
- Streaming pipelines risk partial results mid-write
With ACID (Delta/Hudi/Iceberg):
- Upserts & deletes are safe
- Readers see consistent snapshots
- twtech gets time travel to replay the table state at any timestamp
- ETL jobs can run incrementally without scanning all files
6. Best Practices for ACID Tables in S3
- Partition wisely: still use date + domain key to limit file rewrites
- Compaction: run periodic optimize jobs to merge small files (see the sketch after this list)
- Schema evolution: add columns with nullable defaults
- Vacuum: clean obsolete files after the retention period
- Concurrency: use a single job per table for writes to avoid conflicts
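For the compaction and vacuum items above, a minimal Delta Lake maintenance sketch (Hudi and Iceberg have their own equivalents); the table path follows the earlier example and the 168-hour retention is an assumption.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("DeltaMaintenance")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

dt = DeltaTable.forPath(spark, "s3://twtech-lakehouse/orders_delta/")

dt.optimize().executeCompaction()  # merge small files into larger ones
dt.vacuum(168)                     # remove unreferenced files older than 7 days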
An AWS architecture diagram would show: DynamoDB Streams → Spark → Delta Lake/Hudi/Iceberg in S3 → Athena/Redshift.