Amazon Rekognition - Deep Dive.
Scope:
- Intro,
- Key Features and Use Cases,
- Link to official documentation,
- The concept: Amazon Rekognition,
- Core Features (Deep Dive),
- Architecture & Workflow,
- Integration with Other AWS Services,
- Pricing Model,
- Security & Compliance,
- Common Use Cases,
- Strengths & Limitations,
- Alternatives & When to Use Them,
- Rule of Thumb,
- Best Practices,
- Insights.
Intro:
- Amazon Rekognition is a deep learning-powered cloud service from Amazon Web Services (AWS).
- Amazon Rekognition enables developers to add image and video analysis to their applications through simple API calls.
- Amazon Rekognition can be used to identify objects, scenes, faces, celebrities, and inappropriate content, as well as to extract text from images and videos.
- Object, Scene, and Concept Detection: Identify thousands of objects, scenes, and concepts in images and videos (e.g., "car," "beach," "sunset").
- Content Moderation: Automatically detect inappropriate, unwanted, or offensive content (images and videos) to create a safer user experience and ensure brand safety.
- Facial Analysis and Recognition: Detect faces in images and videos, analyze facial attributes (like age range, emotions, and presence of glasses), compare faces for similarity, and search for specific faces within a large collection.
- Identity Verification: Use face comparison and liveness detection for user onboarding and authentication workflows to remotely verify the identity of users.
- Celebrity Recognition: Identify celebrities in images and videos.
- Text Detection: Extract text from images and video streams, which can be useful for use cases like license plate reading or analyzing social media posts.
- Custom Labels: Train custom machine learning models to detect specific objects or images tailored to business needs without requiring prior machine learning expertise.
- Personal Protective Equipment (PPE) Detection: Detect if people in an image are wearing safety gear like face masks, gloves, or hard hats.
- Link to official documentation: Amazon Rekognition AWS page.
1. The concept: Amazon Rekognition
- Amazon Rekognition is a fully managed computer vision service provided by AWS.
- Amazon Rekognition uses deep learning models to analyze images and videos, enabling developers to build applications that can recognize objects, people, text, activities, and inappropriate content without requiring machine learning expertise.
- Amazon Rekognition abstracts away the heavy lifting of training models and provides APIs for quick integration.
2. Core Features
- Rekognition provides both image APIs and video APIs (real-time or batch).
A. Image Analysis
- Object & Scene Detection
Detects thousands of objects (car, phone, dog, tree, etc.) and scenes (beach, city, office).
- Face Detection & Analysis
- Bounding box for faces.
- Facial landmarks (eyes, nose, mouth).
- Attributes like emotions (happy, sad, surprised), gender, age range, beard, eyeglasses.
- Face Comparison
Compares a face in an image with one or more faces (useful for 1:1 verification).
- Face Search (Collection-based)
- Stores indexed faces in a Face Collection.
- Performs 1:N matching to find similar faces (like searching through a database).
- Celebrity Recognition
Detects well-known personalities.
- Text in Image (OCR)
Detects printed and handwritten text.
- Unsafe Content Detection
Identifies explicit, violent, or suggestive content (often used for moderation).
B. Video Analysis
- Object & activity tracking in real time.
- Face detection, recognition, and tracking across frames.
- Person pathing (tracking movements).
- Celebrity recognition in video.
- Content moderation at scale.
Rekognition video analysis is asynchronous:
- twtech uploads video to S3.
- Call API (e.g., StartLabelDetection).
- Rekognition processes and sends results via Amazon SNS or stores them in Amazon S3 (a short Python sketch follows this list).
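A minimal sketch of kicking off an asynchronous label-detection job with boto3, assuming the bucket, key, SNS topic, and role ARN already exist (all names are placeholders):
# Python: start an async video job and fetch results (illustrative)
import boto3

rek = boto3.client("rekognition")

resp = rek.start_label_detection(
    Video={"S3Object": {"Bucket": "twtech-s3bucket", "Name": "videos/clip.mp4"}},
    MinConfidence=70.0,
    NotificationChannel={                     # optional: publish completion to SNS
        "SNSTopicArn": "arn:aws:sns:us-east-2:accountID:twtech-Topic",
        "RoleArn": "arn:aws:iam::accountID:role/RekognitionSNSRole",
    },
)
job_id = resp["JobId"]

# Later (typically in an SNS-triggered Lambda), fetch the results by JobId:
result = rek.get_label_detection(JobId=job_id, SortBy="TIMESTAMP")
print(result["JobStatus"], len(result.get("Labels", [])))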
3. Architecture & Workflow
Typical Flow:
- Image/Video Ingestion
Upload to Amazon S3 or stream via Kinesis Video Streams.
- Analysis
Use Rekognition APIs (sync for images, async for videos).
- Result Delivery
- API response (JSON).
- SNS topic for async jobs.
- Can store in DynamoDB, Elasticsearch, or S3 for querying.
- Integration
Connect to downstream applications (security dashboards, moderation pipelines, etc.).
4. Integration with Other AWS Services
- Amazon S3 → storage for images/videos.
- Kinesis Video Streams → real-time video input.
- Lambda → trigger-based automation after analysis.
- DynamoDB → store indexed face metadata.
- ElasticSearch / OpenSearch → search across results.
- SNS / SQS → notification and queue handling.
- Step Functions → orchestration of multi-step workflows.
5. Pricing Model
- Pricing is pay-as-you-go, with free tier options. The figures below are approximate (as of 2025), vary by region, and should be checked against the AWS Pricing page.
- Image APIs: ~$0.001 per image for label detection.
- Face Comparison: ~$0.001 per comparison.
- Face Search: ~$0.01 per 1,000 images.
- Text Detection: ~$0.0015 per image.
- Unsafe Content Detection: ~$0.001 per image.
- Video APIs: Charged per minute of video processed (varies by feature, ~$0.10–$0.12/minute).
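As a rough worked example using the approximate rates above: running label detection on 500,000 images in a month would cost on the order of 500,000 × $0.001 = $500, and 1,000 minutes of video label detection roughly 1,000 × $0.10–$0.12 = $100–$120, before free-tier credits and regional price differences.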
Optimization Tip:
- Use bounding box cropping and batching to reduce costs.
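A minimal sketch of the cropping idea, assuming Pillow is available; the bounding-box math follows Rekognition's ratio-based BoundingBox format, and the file path is a placeholder:
# Python: crop a face bounding box before a follow-up call (illustrative)
import io
import boto3
from PIL import Image  # assumed dependency: Pillow

rek = boto3.client("rekognition")

def crop_first_face(path):
    img = Image.open(path)
    with open(path, "rb") as f:
        faces = rek.detect_faces(Image={"Bytes": f.read()})["FaceDetails"]
    if not faces:
        return None
    box = faces[0]["BoundingBox"]  # ratios of image width/height
    w, h = img.size
    left, top = int(box["Left"] * w), int(box["Top"] * h)
    right, bottom = left + int(box["Width"] * w), top + int(box["Height"] * h)
    cropped = img.crop((left, top, right, bottom))
    buf = io.BytesIO()
    cropped.save(buf, format="JPEG")
    return buf.getvalue()  # smaller payload for a follow-up call such as CompareFaces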
6. Security & Compliance
- Encryption: Supports KMS for at-rest data.
- IAM Integration: Fine-grained permissions.
- Data Privacy: AWS claims images/videos are not stored beyond processing unless explicitly requested (e.g., face collections).
- Compliance: HIPAA eligible, GDPR aligned (but responsibility for usage is customer’s).
7. Common Use Cases
- Security & Surveillance: Identity verification, intruder detection.
- User Authentication: KYC (Know your Customer) processes, mobile app logins.
- Content Moderation: Social media, dating apps, marketplaces.
- Retail Analytics: Foot traffic analysis, customer demographics.
- Media & Entertainment: Celebrity recognition, automatic metadata tagging.
- Law Enforcement: Missing persons, criminal investigations (controversial, requires ethical considerations).
8. Strengths & Limitations
✅ Strengths
- No ML expertise required.
- Scales easily (serverless).
- Pre-trained on massive datasets.
- Broad feature set.
⚠️ Limitations
- Accuracy may vary by demographic groups (bias concerns).
- Costs can grow quickly at scale.
- Limited customization (compared to custom models in Amazon SageMaker).
- Some privacy/ethical concerns (e.g., facial recognition misuse).
9. Alternatives & When to Use Them
- Amazon SageMaker + CV Models → If twtech needs custom-trained models.
- Open-source (YOLO, OpenCV, Detectron2) → If twtech wants full control, but must manage infra.
Rule of Thumb:
- Use Rekognition for fast prototyping, scalability, and common CV tasks.
- Use SageMaker or custom ML when domain-specific accuracy is critical.
10. Best Practices
- Store results in a database (don't just rely on API calls).
- Use confidence thresholds (e.g., only accept label detection >90%); a small filtering snippet follows this list.
- Consider batching requests to reduce costs.
- Regularly evaluate fairness & bias.
- For compliance-heavy apps, maintain an audit trail of decisions.
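A minimal sketch of post-filtering by confidence, assuming a DetectLabels response shape; the 90% cutoff is illustrative:
# Python: confidence-threshold post-filter (illustrative)
MIN_CONFIDENCE = 90.0

def filter_labels(detect_labels_response, min_confidence=MIN_CONFIDENCE):
    # keep only labels at or above the threshold before persisting or acting on them
    return [
        {"Name": lbl["Name"], "Confidence": lbl["Confidence"]}
        for lbl in detect_labels_response.get("Labels", [])
        if lbl["Confidence"] >= min_confidence
    ]

# Usage: labels = filter_labels(rek.detect_labels(Image=image, MaxLabels=50))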
Insights
Scope:
- A practical implementation guide:
- Sample architecture,
- IAM bits,
- Step-by-step flows for image (sync) and video (async),
- Use cases,
- Working code examples in Python (boto3),
- Working code examples in JavaScript (AWS SDK v3),
- Plus notes on testing,
- Retries,
- Scaling,
- Cost-control.
A. Sample architecture (text diagram)
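A rough text sketch of the image (sync) and video (async) flows detailed in sections C and E below:

[Client] --upload--> [Amazon S3] --S3 event--> [Lambda] --sync APIs--> [Amazon Rekognition]
                                                   |                          |
                                                   v                          v
                                    [DynamoDB / S3 (results)]        (video: Start* async job
                                                                       -> SNS on completion
                                                                       -> Lambda calls Get* APIs)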
B. Key AWS resources & permissions (high level)
- S3 buckets for images/videos (separate production/test buckets recommended).
- IAM role for Lambdas: rekognition:* (or scoped actions), s3:GetObject, s3:PutObject, sns:Publish, dynamodb:PutItem (if storing results), kms:Decrypt if using KMS.
- SNS topic for Rekognition asynchronous job completion.
- (Optional) Kinesis Video Streams + IAM for streaming pipelines.
- Face Collection(s) for 1:N face search.
# Minimal IAM policy snippet (attach to Lambda role)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "rekognition:DetectLabels",
        "rekognition:DetectFaces",
        "rekognition:CompareFaces",
        "rekognition:IndexFaces",
        "rekognition:CreateCollection",
        "rekognition:SearchFacesByImage",
        "rekognition:StartLabelDetection",
        "rekognition:GetLabelDetection",
        "rekognition:StartFaceSearch",
        "rekognition:GetFaceSearch"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": "arn:aws:s3:::twtech-s3bucket-name/*"
    },
    {
      "Effect": "Allow",
      "Action": ["sns:Publish", "sns:Subscribe", "sns:ListSubscriptionsByTopic"],
      "Resource": "arn:aws:sns:us-east-2:accountID:twtech-Topic"
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:PutItem", "dynamodb:GetItem", "dynamodb:Query"],
      "Resource": "arn:aws:dynamodb:us-east-2:accountID:table/twtech-Table"
    }
  ]
}
# NB:
- The above policy should be scoped by resource and region in production.
C. Image pipeline (sync) — Flow
1. Client uploads image to S3 (pre-signed URL or direct from client).
2. Lambda triggered (S3 event) or client calls backend API directly.
3. Call Rekognition sync APIs (e.g., DetectLabels, DetectFaces, IndexFaces, DetectText).
4. Store JSON results in DynamoDB / S3.
5. Use confidence thresholds and logic in Lambda to act (moderation, notifications, auth).
# Python (boto3) — detect labels + faces
# requirements: boto3
import json
import boto3
from botocore.exceptions import ClientError

rek = boto3.client('rekognition')
s3 = boto3.client('s3')

def analyze_image_from_s3(bucket, key):
    image = {'S3Object': {'Bucket': bucket, 'Name': key}}
    # Detect labels
    labels_resp = rek.detect_labels(Image=image, MaxLabels=20, MinConfidence=75.0)
    # Detect faces
    faces_resp = rek.detect_faces(Image=image, Attributes=['ALL'])
    return {'Labels': labels_resp.get('Labels', []),
            'Faces': faces_resp.get('FaceDetails', [])}

# Sample usage (could be called inside a Lambda handler)
if __name__ == "__main__":
    bucket = "twtech-s3bucket"
    key = "uploads/photo.jpg"
    try:
        result = analyze_image_from_s3(bucket, key)
        print(json.dumps(result, indent=2))
    except ClientError as e:
        print("Error:", e)
# Nodejs (AWS SDK v3) — detect labels
// package: @aws-sdk/client-rekognition
import { RekognitionClient, DetectLabelsCommand } from "@aws-sdk/client-rekognition";

const client = new RekognitionClient({ region: "us-east-2" });

export async function detectLabelsFromS3(bucket, key) {
  const params = {
    Image: { S3Object: { Bucket: bucket, Name: key } },
    MaxLabels: 20,
    MinConfidence: 75.0
  };
  const cmd = new DetectLabelsCommand(params);
  const resp = await client.send(cmd);
  return resp.Labels;
}

// Sample usage
(async () => {
  const labels = await detectLabelsFromS3("twtech-s3bucket", "uploads/photo.jpg");
  console.log(JSON.stringify(labels, null, 2));
})();
D. Face collections (indexing + searching)
- Create a collection (CreateCollection).
- IndexFaces to store face metadata (recommended: store ExternalImageId).
- SearchFacesByImage (or SearchFaces) for match retrieval.
# Python: create collection, index face, search
def create_collection(collection_id):
    try:
        rek.create_collection(CollectionId=collection_id)
    except ClientError as e:
        if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
            raise

def index_face(bucket, key, collection_id, external_id=None):
    image = {'S3Object': {'Bucket': bucket, 'Name': key}}
    resp = rek.index_faces(Image=image, CollectionId=collection_id,
                           ExternalImageId=external_id or key,
                           DetectionAttributes=['DEFAULT'])
    return resp.get('FaceRecords', [])

def search_face_by_image(bucket, key, collection_id, threshold=90, max_faces=5):
    image = {'S3Object': {'Bucket': bucket, 'Name': key}}
    resp = rek.search_faces_by_image(CollectionId=collection_id, Image=image,
                                     FaceMatchThreshold=threshold, MaxFaces=max_faces)
    return resp.get('FaceMatches', [])
E. Video pipeline (async) — Flow
1. Upload video to S3.
2. Start async job (e.g., StartLabelDetection, StartFaceSearch, StartFaceDetection) — pass NotificationChannel (SNS topic ARN + role ARN).
3. Rekognition processes the video (minutes depend on video length).
4. Rekognition publishes a job-completion message to SNS.
5. SNS triggers a Lambda that calls GetLabelDetection / GetFaceSearch with the JobId to fetch results, then persists them.
Key: SNS notification format
- SNS message contains JobId and Status.
- Always check Status == "SUCCEEDED" before calling Get*.
# Node: start label detection (example)
import { RekognitionClient, StartLabelDetectionCommand, GetLabelDetectionCommand } from "@aws-sdk/client-rekognition";

const client = new RekognitionClient({ region: "us-east-2" });

export async function startLabelDetection(bucket, key, snsTopicArn, roleArn) {
  const params = {
    Video: { S3Object: { Bucket: bucket, Name: key } },
    NotificationChannel: { SNSTopicArn: snsTopicArn, RoleArn: roleArn },
    MinConfidence: 70.0,
  };
  const cmd = new StartLabelDetectionCommand(params);
  const resp = await client.send(cmd);
  return resp.JobId; // Save this JobId if you want to poll
}

// In the SNS-triggered Lambda handler: call GetLabelDetection with JobId
export async function getLabelsForJob(jobId, maxResults = 1000) {
  const cmd = new GetLabelDetectionCommand({ JobId: jobId, MaxResults: maxResults, SortBy: "TIMESTAMP" });
  const resp = await client.send(cmd);
  return resp; // contains Labels[] with Timestamp & Label properties
}
# Python: SNS handler skeleton for job completion
import json
import boto3

rek = boto3.client('rekognition')

def lambda_handler(event, context):
    # event is SNS -> message is a JSON string
    for record in event['Records']:
        sns_msg = record['Sns']['Message']
        msg = json.loads(sns_msg)
        job_id = msg.get('JobId') or msg.get('jobId')
        status = msg.get('Status') or msg.get('status')
        if status != 'SUCCEEDED':
            print("Job failed or in progress:", msg)
            continue
        # example for label detection
        resp = rek.get_label_detection(JobId=job_id, SortBy='TIMESTAMP')
        # Persist resp to S3/DynamoDB
        print("Got label detection:", resp.get('Labels', [])[:3])
F. Real-time streaming (Kinesis Video Streams + Rekognition Streaming)
- Use Kinesis Video Streams to push live RTSP/WebRTC streams.
- Rekognition supports real-time face recognition and label detection via its streaming API (requires different setup and long-running consumers).
- Typical pattern: Kinesis -> Rekognition streaming -> process with WebSocket or Lambda consumer.
G. Error handling, retries, and throttling
- Use exponential backoff and jitter on retryable errors (HTTP 5xx, 429); a retry-config sketch follows this list.
- Respect Rekognition quotas (requests/sec and minutes for video). Catch ProvisionedThroughputExceededException or ThrottlingException.
- For large batches, shard work across Lambdas and/or use Step Functions.
- Use idempotency where possible (store JobId and job state).
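A minimal sketch of letting the SDK handle backoff, assuming boto3's standard/adaptive retry modes; the attempt count is illustrative:
# Python: boto3 retry configuration (illustrative)
import boto3
from botocore.config import Config

retry_config = Config(
    retries={
        "max_attempts": 10,   # total attempts including the first call
        "mode": "adaptive",   # adds client-side rate limiting on throttling errors
    }
)

rek = boto3.client("rekognition", config=retry_config)

# Calls made with this client (e.g., detect_labels) are retried automatically
# on throttling and 5xx errors; wrap in try/except for non-retryable failures.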
H. Testing & validation tips
- Start with a small dataset and confidence thresholds tuned to twtech needs.
- Use MinConfidence or twtech's own post-filtering (e.g., only accept Label.Confidence > 90).
- Run fairness / bias checks with a representative dataset.
- For face matching, evaluate false acceptance rate (FAR) and false rejection rate (FRR) on your dataset (a small computation sketch follows this list).
- Log input metadata (image size, source) to troubleshoot edge cases.
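A minimal sketch of computing FAR and FRR from labeled comparison scores; the score lists and thresholds are illustrative, not part of any Rekognition API:
# Python: FAR / FRR at a candidate threshold (illustrative)
def far_frr(genuine_scores, impostor_scores, threshold):
    # genuine_scores: similarity scores for same-person pairs
    # impostor_scores: similarity scores for different-person pairs
    false_rejects = sum(1 for s in genuine_scores if s < threshold)
    false_accepts = sum(1 for s in impostor_scores if s >= threshold)
    frr = false_rejects / len(genuine_scores) if genuine_scores else 0.0
    far = false_accepts / len(impostor_scores) if impostor_scores else 0.0
    return far, frr

# Example: sweep thresholds to pick an operating point for twtech's dataset
genuine = [98.2, 95.7, 99.1, 88.4]     # illustrative similarity scores
impostor = [42.0, 75.3, 91.2, 30.8]
for t in (80, 90, 95):
    far, frr = far_frr(genuine, impostor, t)
    print(f"threshold={t}: FAR={far:.2f}, FRR={frr:.2f}")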
I. Cost & performance suggestions
- Use cropped images or reduced resolution for non-critical tasks to lower cost & latency.
- For large-volume jobs, consider batching (images) or using async video jobs instead of frame-by-frame sync processing.
- Cache results for repeated queries (don’t call Rekognition for the same image multiple times).
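A minimal caching sketch, assuming results are keyed by a content hash of the image bytes; the DynamoDB table name is a hypothetical placeholder:
# Python: content-hash cache in front of Rekognition (illustrative)
import hashlib
import json
import boto3

rek = boto3.client("rekognition")
table = boto3.resource("dynamodb").Table("twtech-rekognition-cache")  # assumed table

def detect_labels_cached(image_bytes):
    digest = hashlib.sha256(image_bytes).hexdigest()
    cached = table.get_item(Key={"id": digest}).get("Item")
    if cached:
        return json.loads(cached["labels"])
    labels = rek.detect_labels(Image={"Bytes": image_bytes}, MaxLabels=20)["Labels"]
    # store as a JSON string (DynamoDB rejects native floats such as Confidence)
    table.put_item(Item={"id": digest, "labels": json.dumps(labels)})
    return labels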
J. Sample full Lambda (image -> moderate content action) Python script
import json, os
import boto3

rek = boto3.client('rekognition')
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
TABLE_NAME = os.environ.get('RESULTS_TABLE')

def lambda_handler(event, context):
    # Expect S3 put event
    for rec in event['Records']:
        bucket = rec['s3']['bucket']['name']
        key = rec['s3']['object']['key']
        image = {'S3Object': {'Bucket': bucket, 'Name': key}}
        labels = rek.detect_labels(Image=image, MaxLabels=50, MinConfidence=50.0).get('Labels', [])
        unsafe = rek.detect_moderation_labels(Image=image, MinConfidence=60.0).get('ModerationLabels', [])
        item = {
            'id': f"{bucket}/{key}",
            'labels': json.dumps(labels),        # serialize: DynamoDB rejects native floats
            'moderation': json.dumps(unsafe)
        }
        # write to DynamoDB for later UI consumption
        table = dynamodb.Table(TABLE_NAME)
        table.put_item(Item=item)
    return {"status": "ok"}
K. Useful production practices
- Enable server-side encryption for S3 and use KMS.
- Audit logs: enable CloudTrail for Rekognition usage.
- Version twtech Face Collections naming (e.g., employees-v1) and rotate/cleanup stale faces.
- Implement a data retention policy (delete faces/collections if required by privacy rules).
- Provide an opt-out / human review path for sensitive decisions.
L. Quick checklist for deploying a simple prototype
1. Create S3 bucket(s).
2. Create an SNS topic and subscription (Lambda).
3. Create Lambda functions: image handler, video job starter, SNS job handler.
4. Attach IAM roles (scoped).
5. Create Rekognition collection (if using faces).
6. Test with example images and a short video.
7. Inspect results in CloudWatch logs, DynamoDB, or S3.
Step-by-step streaming implementation using Kinesis Video Streams + Amazon Rekognition Stream Processors.
- Plus a clear integration diagram (ASCII)
Scope:
- High-level integration diagram
- Components & responsibilities
- Infra / IAM minimal snippets
- Producer options (GStreamer example + notes)
- Python (boto3) code to create/start/stop a Rekognition Stream Processor and to create/prepare the collection
- Node (AWS SDK v3) equivalents for the same management actions
- Sample consumer: Lambda that reads Rekognition streaming results from the Kinesis Data Stream and processes them
- Operational notes (latency, scaling, costs, testing)
Architecture (ASCII diagram)
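A rough text sketch of the flow, matching the components described below:

[Camera / RTSP source]
        |
        v  (GStreamer kvssink / KVS Producer SDK / WebRTC)
[Kinesis Video Streams (KVS)]
        |
        v
[Rekognition Stream Processor] <---- [Face Collection (optional)]
        |
        v  (JSON results)
[Kinesis Data Stream (KDS)]
        |
        v  (event source mapping)
[Lambda consumer] --> [DynamoDB / S3 / SNS alerts]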
Components & responsibilities (short)
- Edge producer: pushes camera RTSP frames to Kinesis Video Streams (KVS). Can be GStreamer, the KVS Producer SDK, or WebRTC.
- Rekognition Stream Processor: long-running resource that analyzes the live video from KVS and writes structured results to a Kinesis Data Stream (KDS). Configured for face search (1:N) or labels.
- Consumer (Lambda / KDS consumer): reads Rekognition events from KDS, applies business logic, persists to DB, triggers alerts, etc.
- Face Collection (optional): store indexed faces to use with face search in the stream processor.
Minimal IAM snippets (principle of least privilege — tighten for prod)
1) Role for Rekognition stream processor (allow Rekognition to read KVS and write to KDS)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect":"Allow",
"Action":[
"kinesisvideo:GetDataEndpoint",
"kinesisvideo:ListFragments",
"kinesisvideo:ListStreams",
"kinesisvideo:GetMedia"
],
"Resource":"arn:aws:kinesisvideo:us-east-2:accountID:stream/twtech-KvsStreamName/*"
},
{
"Effect":"Allow",
"Action":[
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource":"arn:aws:kinesis:us-east-2:accountID:stream/twtech-rekognition-output-stream"
},
{
"Effect":"Allow",
"Action":[
"rekognition:*"
],
"Resource":"*"
}
]
}
2) Lambda role needs:
- kinesis:GetRecords, kinesis:GetShardIterator, kinesis:DescribeStream for the KDS (a policy sketch follows this list)
- dynamodb:PutItem (or S3 PutObject) to persist results
- cloudwatch:PutMetricData etc.
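A hedged sketch of that consumer-Lambda policy, reusing the placeholder account, region, and resource names from this post; kinesis:ListShards is added here as an assumption, since Lambda's Kinesis event source mapping commonly needs it:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListShards"],
      "Resource": "arn:aws:kinesis:us-east-2:accountID:stream/twtech-rekognition-output-stream"
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:PutItem"],
      "Resource": "arn:aws:dynamodb:us-east-2:accountID:table/twtech-rekognition-stream-results"
    },
    {
      "Effect": "Allow",
      "Action": ["cloudwatch:PutMetricData"],
      "Resource": "*"
    }
  ]
}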
Producer: simplest option — GStreamer to Kinesis Video Streams
- If twtech has a camera URL (rtsp://...) or a local device, GStreamer can stream to KVS using the kvssink plugin from the KVS Producer package.
# Example command (Linux):
# stream RTSP -> KVS using gstreamer (kvssink)
# Assumes twtech installed the KVS GStreamer plugins and exported AWS creds / role
gst-launch-1.0 rtspsrc location="rtsp://camera.local/stream" ! rtph264depay ! h264parse ! \
  kvssink stream-name="twtech-camera-stream" storage-size=512
Notes:
- On EC2 or an edge device, it's typical to use the Kinesis Video Producer SDK (or the Dockerized GStreamer with kvssink).
- For browsers or mobile, consider KVS WebRTC (different flow) — Rekognition supports either KVS or WebRTC, but stream processor creation uses a KVS stream ARN.
Rekognition Stream Processor — Python (boto3) management
This code:
- creates a collection (for face search),
- creates a Kinesis Data Stream for outputs,
- creates the Rekognition Stream Processor,
- starts it.
- It also shows how to stop and delete the stream processor.
Assumes AWS credentials are configured and roles (role_arn) exist with the permissions above.
# file: rek_stream_processor.py
import boto3
import time
import json

rek = boto3.client('rekognition')
kinesis = boto3.client('kinesis')
kvs = boto3.client('kinesisvideo')

# CONFIG - change these
REGION = "us-east-2"
KVS_STREAM_NAME = "twtech-camera-stream"
KDS_OUTPUT_NAME = "twtech-rekognition-output-stream"
REK_ROLE_ARN = "arn:aws:iam::accountID:role/RekognitionStreamRole"  # role that Rekognition will assume
COLLECTION_ID = "twtech-face-collection"
SP_NAME = "my-stream-processor"

def create_kds_stream(name, shard_count=1):
    try:
        kinesis.create_stream(StreamName=name, ShardCount=shard_count)
        # wait until active
        while True:
            desc = kinesis.describe_stream(StreamName=name)
            status = desc['StreamDescription']['StreamStatus']
            if status == 'ACTIVE':
                break
            print("Waiting for KDS to become ACTIVE...")
            time.sleep(2)
        print("Created KDS:", name)
    except kinesis.exceptions.ResourceInUseException:
        print("KDS already exists:", name)

def create_collection(collection_id):
    try:
        rek.create_collection(CollectionId=collection_id)
        print("Created collection:", collection_id)
    except rek.exceptions.ResourceAlreadyExistsException:
        print("Collection already exists:", collection_id)

def get_kvs_stream_arn(stream_name):
    resp = kvs.describe_stream(StreamName=stream_name)
    return resp['StreamInfo']['StreamARN']

def create_stream_processor(sp_name, kvs_stream_arn, kds_stream_arn, role_arn, collection_id):
    # Settings for face searching
    settings = {
        "FaceSearch": {
            "CollectionId": collection_id,
            "FaceMatchThreshold": 90.0
        }
        # For label detection, use "ConnectedHome" or "FaceSearch", not both.
    }
    try:
        resp = rek.create_stream_processor(
            Input={'KinesisVideoStream': {'Arn': kvs_stream_arn}},
            Output={'KinesisDataStream': {'Arn': kds_stream_arn}},
            Name=sp_name,
            RoleArn=role_arn,
            Settings=settings
        )
        print("Created Stream Processor:", resp['StreamProcessorArn'])
        return resp['StreamProcessorArn']
    except rek.exceptions.ResourceAlreadyExistsException:
        print("Stream processor already exists:", sp_name)
        # look up the existing processor's ARN
        return rek.describe_stream_processor(Name=sp_name)['StreamProcessorArn']

def start_stream_processor(sp_name):
    # Start/Stop/Delete take the stream processor Name (CreateStreamProcessor returns the ARN)
    rek.start_stream_processor(Name=sp_name)
    print("Started stream processor:", sp_name)

def stop_stream_processor(sp_name):
    rek.stop_stream_processor(Name=sp_name)
    print("Stopped stream processor:", sp_name)

def delete_stream_processor(sp_name):
    try:
        rek.delete_stream_processor(Name=sp_name)
        print("Deleted stream processor:", sp_name)
    except Exception as e:
        print("Delete error:", e)

if __name__ == "__main__":
    # create resources
    create_kds_stream(KDS_OUTPUT_NAME)
    create_collection(COLLECTION_ID)
    # get ARNs
    kvs_arn = get_kvs_stream_arn(KVS_STREAM_NAME)
    kds_arn = kinesis.describe_stream(StreamName=KDS_OUTPUT_NAME)['StreamDescription']['StreamARN']
    sp_arn = create_stream_processor(SP_NAME, kvs_arn, kds_arn, REK_ROLE_ARN, COLLECTION_ID)
    # start it
    start_stream_processor(SP_NAME)
    print("Processor started. To stop: call stop_stream_processor(SP_NAME)")
Notes:
- Settings change depending on whether twtech configures face search or connected-home labels; the example above uses FaceSearch (1:N) referencing a collection.
- CreateStreamProcessor returns the stream processor ARN, while StartStreamProcessor, StopStreamProcessor, and DeleteStreamProcessor take the processor Name.
- When twtech creates a Rekognition stream processor with a Kinesis Data Stream as output, Rekognition will write JSON records to that KDS.
- Then, twtech attaches a Lambda as an event source mapping to that KDS to process incoming events.
Here’s a Lambda handler (Python) that decodes Kinesis records and does simple business logic (store to DynamoDB / trigger alert):
# file: twtech-kds_consumer_lambda.py
import base64
import json
import boto3
import os
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
TABLE = os.environ.get('RESULTS_TABLE', 'twtech-rekognition-stream-results')
table = dynamodb.Table(TABLE)

def process_rekognition_record(record_json):
    # Rekognition stream processor output schema can include FaceSearchResponse or LabelDetection etc.
    # Just store the raw JSON for inspection; adapt to parse specifics as needed.
    # Sample keys: 'FaceSearchResponse', 'LabelDetection'
    item = {
        'id': record_json.get('EventId') or record_json.get('JobId') or str(record_json.get('Timestamp', 'ts')),
        'payload': json.dumps(record_json)  # store as a string (DynamoDB rejects native floats)
    }
    return item

def lambda_handler(event, context):
    for rec in event['Records']:
        # Kinesis record data is base64 encoded
        payload_b64 = rec['kinesis']['data']
        payload_bytes = base64.b64decode(payload_b64)
        try:
            payload_json = json.loads(payload_bytes)
        except Exception as e:
            print("Failed to parse payload:", e)
            continue
        print("Received Rekognition event:", json.dumps(payload_json)[:2000])
        item = process_rekognition_record(payload_json)
        try:
            table.put_item(Item=item)
        except ClientError as e:
            print("DynamoDB put failed:", e)
    return {'status': 'ok'}
- Lambda event source mapping should be created with the Kinesis Data Stream ARN and a reasonable batch size (e.g., 100) and retry behavior configured.
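A minimal sketch of creating that mapping with boto3, assuming hypothetical function and stream names:
# Python: create the Kinesis event source mapping (illustrative)
import boto3

lambda_client = boto3.client("lambda")

lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:us-east-2:accountID:stream/twtech-rekognition-output-stream",
    FunctionName="twtech-kds-consumer",   # assumed Lambda function name
    BatchSize=100,
    StartingPosition="LATEST",
    MaximumRetryAttempts=3,               # tune retry behavior as needed
)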
Node (AWS SDK v3) — create and start Stream Processor (equivalent)
// file: twtech-rek_stream_processor.mjs
import { RekognitionClient, CreateStreamProcessorCommand, StartStreamProcessorCommand } from "@aws-sdk/client-rekognition";
import { KinesisClient, CreateStreamCommand, DescribeStreamCommand } from "@aws-sdk/client-kinesis";
import { KinesisVideoClient, DescribeStreamCommand as DescribeKvsStream } from "@aws-sdk/client-kinesis-video";

const rek = new RekognitionClient({});
const kinesis = new KinesisClient({});
const kvs = new KinesisVideoClient({});

const KVS_STREAM_NAME = "twtech-camera-stream";
const KDS_OUTPUT_NAME = "twtech-rekognition-output-stream";
const REK_ROLE_ARN = "arn:aws:iam::accountID:role/twtech-RekognitionStreamRole";
const COLLECTION_ID = "twtech-face-collection";
const SP_NAME = "twtech-stream-processor";

async function createKds(name) {
  try {
    await kinesis.send(new CreateStreamCommand({ StreamName: name, ShardCount: 1 }));
    // wait until the stream is ACTIVE
    while (true) {
      const desc = await kinesis.send(new DescribeStreamCommand({ StreamName: name }));
      if (desc.StreamDescription.StreamStatus === "ACTIVE") break;
      console.log("Waiting for KDS ACTIVE...");
      await new Promise(r => setTimeout(r, 2000));
    }
  } catch (e) {
    if (e.name === "ResourceInUseException") console.log("KDS exists");
    else throw e;
  }
}

async function getKvsArn(streamName) {
  const resp = await kvs.send(new DescribeKvsStream({ StreamName: streamName }));
  return resp.StreamInfo.StreamARN;
}

async function createStreamProcessor(name, kvsArn, kdsArn, roleArn, collectionId) {
  const settings = {
    FaceSearch: {
      CollectionId: collectionId,
      FaceMatchThreshold: 90.0
    }
  };
  const cmd = new CreateStreamProcessorCommand({
    Input: { KinesisVideoStream: { Arn: kvsArn } },
    Output: { KinesisDataStream: { Arn: kdsArn } },
    Name: name,
    RoleArn: roleArn,
    Settings: settings
  });
  const resp = await rek.send(cmd);
  console.log("Created SP ARN:", resp.StreamProcessorArn);
  return resp.StreamProcessorArn;
}

async function startSP(spName) {
  // StartStreamProcessor takes the processor Name
  await rek.send(new StartStreamProcessorCommand({ Name: spName }));
  console.log("Started SP:", spName);
}

(async () => {
  await createKds(KDS_OUTPUT_NAME);
  // ensure the face collection is created separately
  const kvsArn = await getKvsArn(KVS_STREAM_NAME);
  const kdsDesc = await kinesis.send(new DescribeStreamCommand({ StreamName: KDS_OUTPUT_NAME }));
  const kdsArn = kdsDesc.StreamDescription.StreamARN;
  await createStreamProcessor(SP_NAME, kvsArn, kdsArn, REK_ROLE_ARN, COLLECTION_ID);
  await startSP(SP_NAME);
})();
# End-to-end testing checklist
- Create KVS stream and confirm the producer is successfully sending frames (use the KVS console or GetDataEndpoint + GetMedia tool).
- Create KDS and confirm it's ACTIVE.
- Create Face Collection and IndexFaces several sample faces (ExternalImageId helps map to your users).
- Create Rekognition Stream Processor with FaceSearch settings (or Label detection) referencing KVS and KDS ARNs, and the IAM role.
- Start the stream processor. Watch CloudWatch logs for Rekognition stream processor errors (the service logs to CloudWatch); a quick status check is sketched after this checklist.
- Attach Lambda to KDS as event source mapping and inspect DynamoDB / S3 outputs for records from Rekognition.
- Tune thresholds: FaceMatchThreshold, min confidence, etc.
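A quick hedged way to confirm the processor state (and see failure details) with boto3; the processor name is the one created above:
# Python: check stream processor status (illustrative)
import boto3

rek = boto3.client("rekognition")

desc = rek.describe_stream_processor(Name="twtech-stream-processor")
print(desc["Status"])                  # e.g., STARTING / RUNNING / FAILED / STOPPED
print(desc.get("StatusMessage", ""))   # failure details, if any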
Operational notes & best practices
- Latency: Rekognition stream processors operate in near-real-time but add processing delay (hundreds of ms to a few seconds); test to measure this for the twtech use case.
- Throughput & shards: Kinesis Data Stream shard count determines how many parallel consumers you can scale to.
- Increase shards for higher throughput.
- Collection maintenance: Reindex collections as needed and purge stale faces according to privacy policies.
- Monitoring: Enable CloudWatch Metrics and Alarms for Rekognition stream processors, KVS and KDS errors, Lambda failures.
- Security & privacy: Use KMS + SSE for S3 and ensure twtech has proper legal basis for face recognition in the jurisdiction.
- Implement human-in-the-loop for high-risk decisions.
- Cost: Stream processing charges are per minute of video processed. KVS, KDS, and Lambda also incur costs.
- Estimate based on expected concurrent streams and minutes processed.