DynamoDB Integration - Deep Dive.
Scope:
- Intro,
- AWS DynamoDB ↔ S3 integration,
- The key integration points,
- Exporting DynamoDB to S3,
- Architecture Flow,
- Real-World Integration Patterns,
- Cost and Performance Considerations,
- Security & Compliance,
- Integration Table,
- Architecture of integration with S3,
- Insights,
- Architecture diagram of DynamoDB Streams → Spark → Delta Lake/Hudi/Iceberg in S3 → Athena/Redshift.
Intro:
- DynamoDB integrates with S3 and other AWS services for data management, analytics, and storage strategies.
- The primary methods are direct import and export using managed AWS features, streaming data changes, and offloading large attributes to S3 under appropriately scoped IAM policies.
1. Integration Overview
- DynamoDB and S3 don’t integrate natively in the same way that, say, S3 integrates with Glacier via lifecycle policies.
- However, AWS provides several official integration patterns for data export, import, backup, analytics, and archival.
The key integration points:
| Use Case | AWS Service / Feature | Direction |
| Full table export without performance impact | DynamoDB Export to S3 (point-in-time recovery snapshot) | DynamoDB → S3 |
| Continuous stream processing to S3 for analytics or ETL | DynamoDB Streams → Kinesis Data Firehose → S3 | DynamoDB → S3 |
| Ad-hoc data extraction | AWS Glue / Data Pipeline | DynamoDB → S3 |
| Backup for compliance or DR | AWS Backup (supports DynamoDB with S3 storage backend) | DynamoDB → S3 |
| Restore from S3 into DynamoDB | AWS Glue / Data Pipeline / custom Lambda loader | S3 → DynamoDB |
| Nearline cold storage migration | Custom Lambda/Kinesis pipeline | DynamoDB ↔ S3 |
2. Exporting DynamoDB to S3
2.1 Native Export to S3
- Feature:
DynamoDB Export to S3
- Type:
Serverless snapshot-based export
- Data format:
Compressed, partitioned DynamoDB JSON or Amazon Ion
- Source:
Any Point-in-Time Recovery (PITR) timestamp
- Performance:
No impact on the live table
- Trigger:
On-demand
- S3 Path:
s3://twtech-s3bucket/AWSDynamoDB/YYYY-MM-DDTHH-MM-SSZ/
- Cost:
Based on the amount of data exported (per GB), plus S3 storage
Flow:
- Enable PITR on the twtech DynamoDB table.
- Run “Export to S3” via the console, CLI, or SDK (see the sketch below).
- AWS creates multiple export files in S3 (partitioned for parallel processing in Athena/Glue).
Best for:
- Full backups
- Analytics ingestion into Athena/Redshift
- Archiving state of table at a certain time
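A minimal sketch of the export step above using boto3; the table ARN, bucket, prefix, and export time are placeholders, not the actual twtech resources:
import boto3
from datetime import datetime, timezone

ddb = boto3.client("dynamodb", region_name="us-east-2")

# Placeholder ARN/bucket/prefix -- substitute the real twtech resources.
response = ddb.export_table_to_point_in_time(
    TableArn="arn:aws:dynamodb:us-east-2:123456789012:table/twtechOrders",
    S3Bucket="twtech-s3bucket",
    S3Prefix="AWSDynamoDB/",
    ExportFormat="DYNAMODB_JSON",                            # or "ION"
    ExportTime=datetime(2025, 8, 14, tzinfo=timezone.utc),   # any timestamp within the PITR window
)
print(response["ExportDescription"]["ExportArn"])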
2.2 DynamoDB Streams + Kinesis Firehose
If twtech needs continuous
export instead of point-in-time:
- DynamoDB Streams
captures item-level changes (INSERT,
MODIFY, REMOVE).
- Kinesis Data Firehose
batches and delivers changes to S3 in near real-time.
- Output format can be JSON, CSV, or Parquet.
Architecture Flow:
DynamoDB
→ DynamoDB Streams → Lambda / Firehose → S3
Pros(Benefits):
- Continuous, near-real-time pipeline
- Can transform data before storing in S3
- Supports partitioned storage for Athena queries
Cons(Limitations):
- Stream retention = 24 hours (need to consume promptly)
- Requires custom transform logic for upserts/deletes in
S3
2.3 AWS Glue or Data Pipeline
- Glue:
Can crawl DynamoDB, extract data, transform, and write to S3 in various formats (Parquet, ORC, Avro, CSV, JSON); see the sketch after this list.
- AWS Data Pipeline:
(older service) supports periodic export jobs from DynamoDB to S3.
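For the Glue path above, a minimal PySpark Glue job sketch; the table name, bucket, and the read-percent/splits values are assumptions:
# Glue ETL job (PySpark): DynamoDB -> Parquet on S3
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_ctx = GlueContext(SparkContext.getOrCreate())

# Read the table through the Glue DynamoDB connector, throttled to 50% of RCUs
dyf = glue_ctx.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "twtechOrders",        # placeholder table
        "dynamodb.throughput.read.percent": "0.5",
        "dynamodb.splits": "8",                            # parallel scan segments
    },
)

# Write as Parquet for Athena
glue_ctx.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="s3",
    connection_options={"path": "s3://twtech-s3bucket/dynamodb/twtechorders/"},
    format="parquet",
)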
3. Importing from S3 into DynamoDB
DynamoDB now offers a native “Import from S3” feature, but it loads data only into a new table; for loading into an existing table, twtech has several patterns:
- AWS Glue ETL Job
- Read S3 data → Transform → Write to DynamoDB
- Good for bulk historical loads
- AWS Data Pipeline
- Predefined template for S3 → DynamoDB ingestion
- More old-school, less common now
- Custom Lambda/Kinesis Ingestion
- Use S3 event notifications to trigger Lambda, parse the new object, and batch-write to DynamoDB (see the sketch after this list)
- EMR/Spark
- If twtech has large-scale batch data and needs complex transformations before ingest
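A minimal sketch of the custom Lambda ingestion above, assuming the S3 objects contain newline-delimited JSON whose fields already match the table schema; the bucket, table name, and event wiring are placeholders:
import json
import boto3

s3 = boto3.client("s3")
table = boto3.resource("dynamodb").Table("twtechOrders")   # placeholder table

def handler(event, context):
    # Triggered by an S3 ObjectCreated event notification
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

        # batch_writer handles batching (25 items/request) and retrying unprocessed items
        with table.batch_writer() as writer:
            for line in body.splitlines():
                if line.strip():
                    writer.put_item(Item=json.loads(line))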
4. Real-World Integration Patterns
4.1 Analytics Pipeline
- Goal:
Query DynamoDB data with SQL
- Pattern:
- DynamoDB Export to S3 (JSON/Parquet)
- S3 as source for Athena or Redshift Spectrum
- Benefits:
- Avoids expensive scans in DynamoDB
- Athena queries directly on S3 data at low cost
4.2 Archiving Old Items
- Goal:
Keep DynamoDB lean and cost-effective
- Pattern:
- TTL on items → Streams → Lambda → S3 (see the sketch after this list)
- Store in compressed Parquet for cheap storage
- Benefit:
- DynamoDB remains small = cheaper provisioned capacity
- Historical data kept in S3 for compliance
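A minimal sketch of the archival Lambda for the TTL pattern above. For simplicity it writes newline-delimited JSON rather than compressed Parquet; the bucket and key layout are placeholders. TTL deletions arrive on the stream as REMOVE events whose userIdentity principalId is dynamodb.amazonaws.com:
import json
import boto3
from datetime import datetime, timezone

s3 = boto3.client("s3")
BUCKET = "twtech-s3bucket"   # placeholder archive bucket

def handler(event, context):
    expired = []
    for record in event["Records"]:
        # Only archive records deleted by the TTL process, not user deletes
        if (record["eventName"] == "REMOVE"
                and record.get("userIdentity", {}).get("principalId") == "dynamodb.amazonaws.com"):
            expired.append(record["dynamodb"]["OldImage"])   # requires old images on the stream

    if expired:
        now = datetime.now(timezone.utc)
        key = f"archive/year={now:%Y}/month={now:%m}/day={now:%d}/{context.aws_request_id}.json"
        s3.put_object(Bucket=BUCKET, Key=key,
                      Body="\n".join(json.dumps(item) for item in expired).encode("utf-8"))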
4.3 Near Real-Time Event Storage
- Goal:
Keep append-only event log in S3
- Pattern:
- DynamoDB Streams → Firehose → S3 partitioned by year/month/day/hour
- Benefit:
- Perfect for replay or reprocessing events later
5. Cost and Performance Considerations
- Export to S3 = billed per GB of data exported (S3 storage billed separately)
- Streams + Firehose = billed for stream reads, Firehose delivery, and S3 storage
- Glue = billed per Data Processing Unit (DPU) hour
- Athena = billed per amount of data scanned (optimize with Parquet)
- S3 storage tiers can be leveraged for cost optimization (Standard → Glacier)
6. Security & Compliance
- Always enable S3 server-side encryption (SSE-KMS)
for exports
- Use VPC Endpoints to avoid public traffic
- IAM policies should restrict access to the specific S3
bucket/prefix
- Consider Object Lock for immutability (WORM
compliance)
7. Integration Table
| Method | Real-time? | Transformable? | Performance impact on DynamoDB? | Best Use Case |
| Export to S3 | No | No | None | Backups, analytics |
| Streams + Firehose | Yes | Yes (via Lambda) | Low | Event-driven ETL, CDC pipelines |
| Glue | No | Yes | Medium | Large batch ETL |
| Data Pipeline | No | Limited | Medium | Legacy batch jobs |
| Custom Lambda | Yes | Yes | Low | Small-scale ingestion/export |
twtech-Insights:
- Continuous stream processing from DynamoDB to Amazon S3 supports analytics and ETL. Extract, transform, and load (ETL) is the process of combining data from multiple sources into a large, central repository called a data warehouse.
- ETL uses a set of business rules to clean and organize raw data and prepare it for storage, data analytics, and machine learning (ML).
1. Why use Continuous Stream Processing (benefits)
DynamoDB is fantastic for operational workloads but not optimized for:
- Ad-hoc analytical queries
- Historical data storage
- Complex transformations
The idea behind it:
twtech uses DynamoDB Streams → Kinesis Data Firehose → S3 to continuously capture every change, transform it, and store it in an analytics-friendly format for services like Athena, Glue, or Redshift Spectrum.
2. Core Architecture
DynamoDB
→ DynamoDB Streams → (Lambda transform optional) → Kinesis Data Firehose → S3 →
(Athena/Glue/Redshift)
2.1 Component Roles
- DynamoDB Streams
Captures INSERT, MODIFY, and REMOVE events for 24 hours after they occur. Stream data is immutable and ordered per partition key.
- Lambda (optional)
Intercepts stream records and performs:
  - Data enrichment
  - Field filtering/mapping
  - Format conversion (e.g., DynamoDB JSON → flat JSON/Parquet)
- Kinesis Data Firehose
Buffers, batches, and delivers transformed data to S3.
  - Batch size: up to 5 MB
  - Buffer interval: up to 900 seconds
  - Optional format conversion: JSON → Parquet/ORC
  - Compression: GZIP, Snappy, ZIP
- Amazon S3
Destination storage, typically partitioned:
s3://twtech-analyticsbucket/dynamodb/twtech_dynamoDB_table/year=YYYY/month=MM/day=DD/hour=HH/
3. Step-by-Step Setup
Step 1: Enable DynamoDB Streams
- Console → Table → Exports and Streams → Enable Streams
- Choose New and old images if you want full before/after snapshots.
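The same step via boto3, as a minimal sketch (the table name is a placeholder):
import boto3

ddb = boto3.client("dynamodb", region_name="us-east-2")

# Enable streams with both old and new item images on the (placeholder) table
ddb.update_table(
    TableName="twtechOrders",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
)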
Step 2: Create a Firehose Delivery Stream
- Source: Direct PUT
- Destination: S3
- Enable data transformation if Lambda is used.
- Enable format conversion to Parquet for Athena
optimization.
Step 3: Lambda Stream Consumer (Optional)
- Trigger: DynamoDB Streams event
- Transform records (flatten, rename keys, mask sensitive
fields)
- Push transformed data to Firehose via PutRecordBatch
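A minimal sketch of such a consumer, assuming a Firehose delivery stream named twtech-ddb-changes; the flattening logic is illustrative only:
import json
import boto3

firehose = boto3.client("firehose")
STREAM = "twtech-ddb-changes"   # placeholder delivery stream

def flatten(image):
    # Very simplified DynamoDB JSON -> flat JSON (keeps the raw attribute value)
    return {k: list(v.values())[0] for k, v in image.items()}

def handler(event, context):
    records = []
    for rec in event["Records"]:
        image = rec["dynamodb"].get("NewImage") or rec["dynamodb"].get("OldImage", {})
        payload = {"op": rec["eventName"], **flatten(image)}
        records.append({"Data": (json.dumps(payload) + "\n").encode("utf-8")})

    if records:
        # PutRecordBatch accepts up to 500 records per call
        firehose.put_record_batch(DeliveryStreamName=STREAM, Records=records)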
Step 4: S3 Partitioning
- Configure the Firehose S3 prefix so objects land partitioned by year/month/day/hour, as in the layout shown in section 2.1.
4. Querying the Data (Once data is in S3):
- Use AWS Glue Crawler to create schema
- Query with Amazon Athena:
SELECT *
FROM dynamodb_changes
WHERE year = '2025' AND month = '08';
- Or load into Redshift Spectrum for joins with other datasets
5. Best Practices
- Buffering:
Larger batch sizes = lower cost but higher latency.
- Format:
Use Parquet for up to 90% cost reduction in Athena.
- Security:
- S3 bucket policy locked to Firehose role
- SSE-KMS encryption
- VPC endpoints to avoid public internet
- Error Handling:
- Firehose automatically retries failed deliveries
- Failed records go to an S3 error prefix for
later replay
- Retention:
DynamoDB Streams keep data for 24 hours; ensure the consumer processes records within this window.
6. Example Use Cases
- Real-Time Analytics:
Clickstream → DynamoDB → S3 → Athena dashboard refresh every few minutes.
- CDC (Change Data Capture): Replicate DynamoDB changes to a data lake for ML
training.
- Archiving for Compliance: Store all modifications in immutable S3 storage with
Object Lock.
Using Amazon EMR (Elastic MapReduce) / Apache Spark with DynamoDB ↔ S3.
- This covers the main patterns, exact connectors, runnable snippets, and the knobs that matter for scale, cost, and correctness.
When EMR/Spark is the right tool
Use EMR/Spark when twtech needs any of the following:
- Large-scale batch exports from DynamoDB to S3 in columnar formats (Parquet/ORC) with heavy transforms or joins.
- Bulk backfills from S3 into DynamoDB with dedupe/merge logic.
- Enrichment: joining DynamoDB data with other lake datasets (S3) before publishing curated tables.
- Streaming ETL at scale (Structured Streaming) when Kinesis → Firehose isn’t flexible enough.
Architecture patterns
1. Batch export: DynamoDB → Spark on EMR →
S3 (Parquet)
DynamoDB table ──(dynamodb-hadoop/spark-dynamodb)──> Spark jobs ──> S3 (Parquet, partitioned) ──> Athena/Redshift
- Read with the EMR DynamoDB Hadoop connector or spark-dynamodb library.
- Write partitioned by year/month/day (and a domain partition such as region/tenant).
2. Bulk import: S3 (Parquet/JSON) → Spark
→ DynamoDB
Raw S3 data ──> Spark transform/validate ──(batch write)──> DynamoDB
Use conditional writes / idempotency keys to avoid duplicates (see the sketch below).
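A minimal sketch of the conditional-write idea using boto3 directly; the key names and the “newer wins” condition are illustrative assumptions:
import boto3
from botocore.exceptions import ClientError

table = boto3.resource("dynamodb").Table("twtechOrders")   # placeholder table

def upsert_if_newer(item):
    """Write only if the row is new or strictly newer than what is stored."""
    try:
        table.put_item(
            Item=item,
            ConditionExpression="attribute_not_exists(pk) OR updated_at < :ts",
            ExpressionAttributeValues={":ts": item["updated_at"]},
        )
    except ClientError as err:
        if err.response["Error"]["Code"] != "ConditionalCheckFailedException":
            raise   # re-raise anything that is not a lost-to-newer-write race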
3. Streaming CDC enrichment (optional)
DynamoDB Streams → Kinesis → Spark Structured Streaming on EMR → S3 curated
- Spark handles upserts & late data; S3 holds the gold tables.
Connectors to use
EMR DynamoDB Hadoop Connector (built into EMR on EC2 & EKS):
- Read: org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
- Write: org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
- Works well for large scans and batch writes; integrates with EMRFS.
Spark-dynamodb (community lib; ergonomic DataFrame API):
- Coordinates: com.github.traviscrawford:spark-dynamodb_2.12:<version> (or the org.apache.spark fork).
- Lets twtech use spark.read.format("dynamodb").
NB:
- For most EMR (Amazon Elastic MapReduce) on EC2/EKS stacks, the EMR DynamoDB connector is the safer default;
- Use spark-dynamodb if twtech prefers DataFrame-first ergonomics.
Cluster choices
- EMR Serverless: no cluster mgmt, great for bursty batch. Simplest ops.
- EMR on EC2: best for full control, spot fleets, HDFS cache.
- EMR on EKS: if twtech already operate Kubernetes and want shared pools.
Rule of thumb:
- start with EMR Serverless for batch ETL, then move to EC2/EKS if twtech needs fine-grained tuning or very long jobs.
Minimal, runnable examples
A) Batch export DynamoDB → Parquet on S3 (PySpark on EMR)
- Using the EMR DynamoDB Hadoop connector
# pyspark script: ddb_export_to_parquet.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, year, month, dayofmonth

spark = (SparkSession.builder
    .appName("DDBExportToParquet")
    .getOrCreate())

table = "twtechOrders"
region = "us-east-2"
s3_out = "s3://twtech-analytics-bucket/dynamodb/twtechorders/"

# Read via Hadoop input format -> RDD of (key, DynamoDB JSON string)
items_rdd = (spark.sparkContext
    .newAPIHadoopRDD(
        "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
        "org.apache.hadoop.io.Text",
        "org.apache.hadoop.io.Text",
        conf={
            "dynamodb.region": region,
            "dynamodb.input.tableName": table,
            # Optional parallelism controls
            "dynamodb.throughput.read.percent": "0.5",  # use 50% of table RCUs
            "dynamodb.export.format": "dynamodb"        # preserves types in JSON
        }))

# Convert to DataFrame (values are DynamoDB JSON strings)
df = spark.read.json(items_rdd.map(lambda kv: kv[1]))

# Optional transforms: select/rename/flatten, cast types, add load_date
curated = (df
    .withColumn("load_date", current_date())
    .withColumn("year", year("load_date"))
    .withColumn("month", month("load_date"))
    .withColumn("day", dayofmonth("load_date")))

(curated
    .repartition("year", "month", "day")
    .write.mode("overwrite")
    .partitionBy("year", "month", "day")
    .parquet(s3_out))
Submit on
EMR Serverless (Sample job payload):
{ "name": "ddb-export-parquet", "applicationId": "twtech_APP_ID", "executionRoleArn": "arn:aws:iam::accountID:role/EMRServerlessJobRole", "jobDriver": { "sparkSubmit": { "entryPoint": "s3://twtech-code/ddb_export_to_parquet.py", "sparkSubmitParameters": "--conf spark.sql.sources.partitionOverwriteMode=dynamic \--conf spark.hadoop.dynamodb.throughput.read.percent=0.5" } }, "configurationOverrides": { "applicationConfiguration": [{ "classification": "spark-defaults", "properties": { "spark.serializer": "org.apache.spark.serializer.KryoSerializer" } }] }}
B) Bulk import S3 → DynamoDB (PySpark)
# pyspark script: s3_to_ddb_bulk_import.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, concat_ws

spark = SparkSession.builder.appName("S3ToDDB").getOrCreate()

df = spark.read.parquet("s3://twtech-landing/twtechorders_2025/*")

# Build idempotency key (twtech-key)
df = df.withColumn("idempotency_key", sha2(concat_ws("|", "order_id", "updated_at"), 256))

# Convert to the DynamoDB JSON item shape expected by the Hadoop OutputFormat
def to_ddb_item(row):
    # Minimal example for a couple of fields
    return {
        "order_id": {"S": str(row["order_id"])},
        "status": {"S": str(row["status"])},
        "amount": {"N": str(row["amount"])},
        "updated_at": {"S": str(row["updated_at"])},
        "idem": {"S": row["idempotency_key"]}
    }

items = df.rdd.map(lambda r: ("", to_ddb_item(r)))

# Note: depending on the EMR setup, a value converter to DynamoDBItemWritable
# may be required; the Hadoop job settings are passed through the conf dict.
items.saveAsNewAPIHadoopDataset(conf={
    "dynamodb.output.tableName": "twtechOrders",
    "dynamodb.region": "us-east-2",
    # throttle to protect WCUs
    "dynamodb.throughput.write.percent": "0.5",
    "mapreduce.job.outputformat.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.io.Text",
    "mapreduce.job.output.value.class": "org.apache.hadoop.dynamodb.DynamoDBItemWritable"
})
For DataFrame-native writes, the spark-dynamodb library lets twtech call df.write.format("dynamodb") with the target table name passed as an option, then .save().
Ensure twtech bundles the right jar for its Spark/Scala version.
C) Streaming (Kinesis) → Spark Structured Streaming
→ S3 parquet
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, year, month, dayofmonth
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

spark = SparkSession.builder.appName("DDBStreamToS3").getOrCreate()

schema = StructType([
    StructField("pk", StringType()),
    StructField("sk", StringType()),
    StructField("op", StringType()),   # INSERT/MODIFY/REMOVE
    StructField("ts", LongType()),
    StructField("amount", DoubleType())
])

raw = (spark.readStream
    .format("kinesis")
    .option("streamName", "twtech-ddb-changes")
    .option("region", "us-east-2")
    .option("startingPosition", "LATEST")
    .load())

parsed = (raw.selectExpr("CAST(data AS STRING) json")
    .select(from_json(col("json"), schema).alias("r"))
    .select("r.*")
    .withColumn("event_time", (col("ts") / 1000).cast("timestamp"))
    .withColumn("year", year("event_time"))
    .withColumn("month", month("event_time"))
    .withColumn("day", dayofmonth("event_time")))

query = (parsed.writeStream
    .format("parquet")
    .option("checkpointLocation", "s3://twtech-analytics-bucket/_chk/ddb/")
    .option("path", "s3://twtech-analytics-bucket/curated/ddb/")
    .partitionBy("year", "month", "day")
    .outputMode("append")
    .start())

query.awaitTermination()
Throughput & performance tuning (Concerns)
DynamoDB scan/write throttling
- dynamodb.throughput.read.percent and dynamodb.throughput.write.percent (Hadoop connector) are twtech guardrails. Start at 0.3–0.5 to share capacity with prod.
- Prefer On-Demand tables during massive one-time backfills to avoid manual capacity planning.
Parallelism
- Export scans parallelize by segments. Use dynamodb.max.map.tasks or Spark's spark.default.parallelism so each worker processes a segment.
- Coalesce small files with spark.sql.files.maxRecordsPerFile or post-write OPTIMIZE/compaction (e.g., Delta/Iceberg).
Data formats
- Write Parquet with sensible types; avoid everything-as-string.
- Partition by low-cardinality columns (year/month/day) plus a domain split (e.g., tenant_id) if queries benefit.
Idempotency / upserts
- For S3 → DDB imports, use a deterministic PK and optionally a conditional expression (e.g., attribute_not_exists(pk) or updated_at >= :ts) via a small Lambda consumer for strict control, or implement merge logic in Spark then write.
Fault tolerance
- For streaming: set a durable checkpointLocation in S3 and enable exactly-once file sinks (Spark handles commit protocols).
- For batch: make jobs re-runnable (write to a temp path, then atomic rename).
Costs
- Prefer EMR Serverless for burst batch; Spot for EMR on EC2.
- Parquet + partition pruning cuts Athena/Redshift Spectrum scan costs dramatically.
Security
- Use SSE-KMS on S3 outputs; narrow KMS key policy to EMR roles.
- Private traffic via Gateway/Interface VPC Endpoints (S3, DynamoDB, Kinesis).
- Lock bucket prefixes with least-privilege IAM.
Operational templates (copy/paste)
spark-submit (EMR on EC2)
spark-submit \
  --deploy-mode cluster \
  --conf spark.hadoop.dynamodb.region=us-east-2 \
  --conf spark.hadoop.dynamodb.throughput.read.percent=0.5 \
  s3://twtech-code/ddb_export_to_parquet.py
EMR Serverless application (Spark)
- Create once, then submit jobs with different entrypoints.
- Size with max DPUs ~ # of input segments / desired runtime.
Validation & publishing
1. Row counts: compare the DDB ScanCount (or DescribeTable item count) vs. the Parquet row count (see the sketch after this list).
2. Schema registry: Glue Crawler -> Glue Catalog table for Athena.
3. Data quality: add a Great Expectations/Deequ step in the EMR job.
4. Promotion: write to s3://…/staging/ then move to …/curated/ on success.
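A minimal sketch of the row-count check in step 1 above, assuming the Parquet output from example A; DescribeTable item counts refresh roughly every six hours, so treat this as a sanity check (the 1% threshold is an assumption):
import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DDBExportValidation").getOrCreate()

# Approximate item count from table metadata (refreshed roughly every 6 hours)
ddb = boto3.client("dynamodb", region_name="us-east-2")
ddb_count = ddb.describe_table(TableName="twtechOrders")["Table"]["ItemCount"]

# Exact row count of the exported Parquet dataset
parquet_count = spark.read.parquet(
    "s3://twtech-analytics-bucket/dynamodb/twtechorders/").count()

drift = abs(ddb_count - parquet_count) / max(ddb_count, 1)
print(f"DynamoDB≈{ddb_count}, Parquet={parquet_count}, drift={drift:.2%}")

# Fail the job if the counts diverge beyond the tolerance before promoting to curated/
assert drift < 0.01, "Row-count drift exceeds 1%; investigate before promotion"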
Parquet / ORC:
In the context of DynamoDB ↔ S3 integration with EMR/Spark, the file format has big performance and cost implications for analytics, ETL, and data lake workflows.
1. Why use Parquet or ORC instead of JSON/CSV
NB:
DynamoDB exports (native or via Streams) default to JSON format, which is:
- Human readable but storage-heavy (lots of repeated field names)
- Expensive to query in Athena / Redshift Spectrum (no column pruning, no predicate pushdown)
- Slow to scan because twtech must read all columns
Parquet and ORC are columnar storage formats that:
- Store values column-by-column instead of row-by-row
- Compress better (especially for repeated values)
- Allow predicate pushdown (Athena reads only necessary columns)
- Support schema evolution (adding columns over time)
- Enable vectorized reads in Spark
Typical cost savings:
- Querying a 100 GB JSON dataset in Athena could scan all 100 GB,
- whereas a well-partitioned Parquet dataset might scan only 2–5 GB.
2. Parquet vs ORC — Quick Comparison
| Feature | Parquet | ORC |
| Best with | Wide schemas, many numeric/string cols | Highly repetitive data, fewer updates |
| Compression default | Snappy (fast), GZIP (smaller) | Zlib (smaller), Zstd (newer, smaller+fast) |
| Predicate pushdown | Yes | Yes |
| Athena support | Yes | Yes |
| Redshift Spectrum | Yes | Yes |
| Schema evolution | Add columns easily | Add columns easily |
| File size sweet spot | 128–512 MB | 128–512 MB |
Rule of thumb:
- Use Parquet if twtech expects a lot of Spark/Athena/Presto queries
- Use ORC if twtech workloads are Hive-heavy or contain highly repetitive text that benefits from dictionary encoding
3. Converting DynamoDB Data to Parquet/ORC with Spark
3.1 From DynamoDB to Parquet
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, current_date

spark = SparkSession.builder.appName("DDBToParquet").getOrCreate()

# Read from DynamoDB via Hadoop connector
items_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    "org.apache.hadoop.io.Text",
    "org.apache.hadoop.io.Text",
    conf={
        "dynamodb.region": "us-east-2",
        "dynamodb.input.tableName": "twtechOrders",
        "dynamodb.throughput.read.percent": "0.5"
    }
)

df = spark.read.json(items_rdd.map(lambda kv: kv[1]))

df_curated = (
    df.withColumn("load_date", current_date())
      .withColumn("year", year("load_date"))
      .withColumn("month", month("load_date"))
      .withColumn("day", dayofmonth("load_date"))
)

(df_curated
    .repartition("year", "month", "day")
    .write.mode("overwrite")
    .partitionBy("year", "month", "day")
    .parquet("s3://twtech-s3bucket/dynamodb/twtechorders/"))
3.2 From DynamoDB to ORC
(df_curated
    .repartition("year", "month", "day")
    .write.mode("overwrite")
    .option("orc.compress", "ZSTD")   # Smaller + faster
    .partitionBy("year", "month", "day")
    .orc("s3://twtech-s3bucket/dynamodb/twtechorders_orc/"))
4. Partitioning Strategy
To maximize performance:
- Partition by date-based columns (e.g., year/month/day)
- Add domain-based partitions if queries are
filtered by a specific key (e.g., region, tenant_id)
- Avoid too many small partitions (<128 MB files)
S3 Layout Example:
s3://twtech-s3bucket/dynamodb/orders/
year=2025/month=08/day=14/part-0000.parquet
year=2025/month=08/day=14/part-0001.parquet
5. Athena/Glue Integration
Once data is in
Parquet/ORC:
- Create/Update
Glue table with partition metadata:
CREATE EXTERNAL TABLE twtechorders (
  order_id string,
  status string,
  amount double,
  load_date date
)
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET
LOCATION 's3://twtech-s3bucket/dynamodb/twtechorders/';
- Run MSCK REPAIR TABLE twtechorders; to pick up partitions
- Athena queries will now scan only the needed partitions and columns
6. Best Practices
- File size:
Aim for 128–512 MB to optimize scan throughput
- Compression:
- Parquet → snappy for speed, gzip for max compression
- ORC → zstd for balanced performance
- Schema evolution:
Always add columns at the end; avoid renaming
- Compaction:
For streaming pipelines, periodically compact small files into larger ones
- Data lake governance:
Consider Delta Lake or Apache Iceberg if twtech needs ACID updates
NB:
- ACID updates in the DynamoDB → S3 world of Parquet (Apache Parquet) / ORC (Optimized Row Columnar) are where things get tricky, because S3 alone is not ACID; a layer of technology can be created on top to make it behave like a transactional table store.
Step-by-step.
1. The Problem
When exporting or streaming DynamoDB data into S3 in Parquet/ORC format:
- S3 objects are immutable; twtech can only overwrite entire objects, with no isolation guarantees
- Multiple writers can create inconsistent views if updates overlap
- Deletes/updates require rewriting whole files (columnar formats aren’t row-updatable)
- Athena/Presto/Redshift Spectrum will see “half-written” data if queries overlap updates
NB:
- twtech needs a table format that adds transaction logs and schema evolution.
2. Solutions for ACID in S3
Three leading open table formats address this:
| Format | ACID Support | Time Travel | Popular Engines |
| Apache Hudi | Yes | Yes | Spark, Hive, Flink, Presto |
| Delta Lake | Yes | Yes | Spark, Presto, Trino, Athena (via connectors) |
| Apache Iceberg | Yes | Yes | Spark, Flink, Trino, Athena, Snowflake |
2.1 How it works
These formats:
- Store data files (Parquet/ORC) in S3
- Maintain a transaction log (JSON/Parquet metadata) in the same bucket
- Writers append new files and update the log atomically
- Queries read the log to get a consistent snapshot
Update/Delete workflow in ACID table formats:
- Write changed rows to new Parquet/ORC files
- Update the metadata log to mark old files as obsolete
- Readers see only the “committed” snapshot
3. DynamoDB → ACID Table on S3
A practical pattern looks like:
DynamoDB → DynamoDB Streams → Kinesis → Spark on EMR → Delta Lake/Hudi/Iceberg tables in S3 → Athena/Redshift Spectrum
4. Example: Delta Lake Upserts from DynamoDB
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (SparkSession.builder
    .appName("DDBToDeltaUpsert")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

# Read change records from DynamoDB Streams/Kinesis (example)
df_changes = (spark.readStream.format("kinesis")
    .option("streamName", "twtech-ddb-changes")
    .option("region", "us-east-2")
    .option("startingPosition", "TRIM_HORIZON")
    .load())

# Deserialize JSON into structured fields here.
# Assume the parsed schema has: pk, updated_at, amount, status, op (I/M/D)

delta_path = "s3://twtech-lakehouse/twtechorders_delta/"

# Merge logic (Upsert + Delete), applied per micro-batch via foreachBatch
def upsert_to_delta(batch_df, batch_id):
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # Initialize the table from the first micro-batch if needed
        batch_df.write.format("delta").save(delta_path)
        return
    delta_table = DeltaTable.forPath(spark, delta_path)
    (delta_table.alias("t")
        .merge(batch_df.alias("s"), "t.pk = s.pk")
        .whenMatchedUpdateAll(condition="s.op IN ('I','M')")
        .whenMatchedDelete(condition="s.op = 'D'")
        .whenNotMatchedInsertAll(condition="s.op IN ('I')")
        .execute())

(df_changes.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "s3://twtech-lakehouse/_chk/twtechorders_delta/")  # placeholder path
    .start()
    .awaitTermination())
5. Why This Matters
Without ACID:
- Deletes leave “zombie” rows
- Updates create multiple versions that analytics might
double-count
- Streaming pipelines risk partial results mid-write
With ACID (Delta/Hudi/Iceberg):
- Upserts & deletes are safe
- Readers see consistent snapshots
- twtech gets time travel to replay the table state at any timestamp (see the sketch after this list)
- ETL jobs can run incrementally without scanning all files
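A minimal sketch of Delta Lake time travel for the point above; the version number, timestamp, and path are illustrative placeholders:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeltaTimeTravel").getOrCreate()

delta_path = "s3://twtech-lakehouse/twtechorders_delta/"   # placeholder path

# Read the table as of an earlier commit version...
v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

# ...or as of a wall-clock timestamp
aug14 = (spark.read.format("delta")
    .option("timestampAsOf", "2025-08-14 00:00:00")
    .load(delta_path))

print(v0.count(), aug14.count())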
6. Best Practices for ACID Tables in S3
- Partition wisely:
still use date + domain key to limit file rewrites
- Compaction:
run periodic optimize jobs to merge small files (see the sketch after this list)
- Schema evolution:
Add columns with nullable defaults
- Vacuum:
clean obsolete files after retention period
- Concurrency:
use a single job per table for writes to avoid conflicts.
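For the compaction and vacuum practices above, a minimal sketch using the Delta Lake Python API (requires delta-spark on the cluster); the table path and the 168-hour retention window are assumptions:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (SparkSession.builder
    .appName("DeltaMaintenance")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

table = DeltaTable.forPath(spark, "s3://twtech-lakehouse/twtechorders_delta/")

# Compaction: rewrite small files into larger ones (bin-packing)
table.optimize().executeCompaction()

# Vacuum: physically remove files no longer referenced by the transaction log,
# keeping 168 hours (7 days) of history for time travel / concurrent readers
table.vacuum(168)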
AWS architecture diagram showing DynamoDB Streams → Spark → Delta Lake/Hudi/Iceberg in S3 → Athena/Redshift.