Tuesday, September 9, 2025


Amazon Rekognition - Deep Dive.

Scope:

  • Intro,
  • Key Features and Use Cases,
  • Link to official documentation,
  • The concept: Amazon Rekognition,
  • Core Features (Deep Dive),
  • Architecture & Workflow,
  • Integration with Other AWS Services,
  • Pricing Model,
  • Security & Compliance,
  • Common Use Cases,
  • Strengths & Limitations,
  • Alternatives & When to Use Them,
  • Rule of Thumb,
  • Best Practices,
  • Insights.

Intro:

    • Amazon Rekognition is a deep learning-powered cloud service from Amazon Web Services (AWS).
    • Amazon Rekognition enables developers to add image and video analysis to their applications through simple API calls.
    • Amazon Rekognition can be used to identify objects, scenes, faces, celebrities, and inappropriate content, as well as to extract text from images and videos. 
Key Features and Use Cases
    • Object, Scene, and Concept Detection: Identify thousands of objects, scenes, and concepts in images and videos (e.g., "car," "beach," "sunset").
    • Content Moderation: Automatically detect inappropriate, unwanted, or offensive content (images and videos) to create a safer user experience and ensure brand safety.
    • Facial Analysis and Recognition: Detect faces in images and videos, analyze facial attributes (like age range, emotions, and presence of glasses), compare faces for similarity, and search for specific faces within a large collection.
    • Identity Verification: Use face comparison and liveness detection for user onboarding and authentication workflows to remotely verify the identity of users.
    • Celebrity Recognition: Identify celebrities in images and videos.
    • Text Detection: Extract text from images and video streams, which can be useful for use cases like license plate reading or analyzing social media posts.
    • Custom Labels: Train custom machine learning models to detect specific objects or images tailored to business needs without requiring prior machine learning expertise.
    • Personal Protective Equipment (PPE) Detection: Detect if people in an image are wearing safety gear like face masks, gloves, or hard hats.
NB:
https://aws.amazon.com/rekognition/

1. The concept: Amazon Rekognition

    • Amazon Rekognition is a fully managed computer vision service provided by AWS.
    • Amazon Rekognition uses deep learning models to analyze images and videos, enabling developers to build applications that can recognize objects, people, text, activities, and inappropriate content without requiring machine learning expertise.
    • Amazon Rekognition abstracts away the heavy lifting of training models and provides APIs for quick integration.

2. Core Features (Deep Dive)

    • Rekognition provides both image APIs and video APIs (real-time or batch).

A. Image Analysis

  • Object & Scene Detection
    Detects thousands of objects (car, phone, dog, tree, etc.) and scenes (beach, city, office).
  • Face Detection & Analysis
    • Bounding box for faces.
    • Facial landmarks (eyes, nose, mouth).
    • Attributes like emotions (happy, sad, surprised), gender, age range, beard, eyeglasses.
  • Face Comparison
    Compares a face in an image with one or more faces (useful for 1:1 verification).
  • Face Search (Collection-based)
    • Stores indexed faces in a Face Collection.
    • Performs 1:N matching to find similar faces (like searching through a database).
  • Celebrity Recognition
    Detects well-known personalities.
  • Text in Image (OCR)
    Detects printed and handwritten text.
  • Unsafe Content Detection
    Identifies explicit, violent, or suggestive content (often used for moderation).

B. Video Analysis

    • Object & activity tracking in real time.
    • Face detection, recognition, and tracking across frames.
    • Person pathing (tracking movements).
    • Celebrity recognition in video.
    • Content moderation at scale.

Rekognition video analysis is asynchronous:

    • twtech uploads video to S3.
    • Call API (e.g., StartLabelDetection).
    • Rekognition processes and sends results via Amazon SNS or stores them in Amazon S3.
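
A minimal sketch of this flow in Python (boto3), assuming the SNS topic and the IAM role that Rekognition assumes to publish to it already exist (both ARNs below are placeholders):

# Sketch: start an async label-detection job for a video in S3
import boto3

rek = boto3.client('rekognition')

resp = rek.start_label_detection(
    Video={'S3Object': {'Bucket': 'twtech-s3bucket', 'Name': 'uploads/video.mp4'}},
    MinConfidence=70.0,
    NotificationChannel={
        'SNSTopicArn': 'arn:aws:sns:us-east-2:accountID:twtech-Topic',
        'RoleArn': 'arn:aws:iam::accountID:role/twtech-RekognitionSnsRole'
    }
)
print("JobId:", resp['JobId'])  # fetch results later with get_label_detection(JobId=...)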

3. Architecture & Workflow

Typical Flow:

  1. Image/Video Ingestion
    Upload to Amazon S3 or stream via Kinesis Video Streams.
  2. Analysis
    Use Rekognition APIs (sync for images, async for videos).
  3. Result Delivery
    • API response (JSON).
    • SNS topic for async jobs.
    • Can store in DynamoDB, Elasticsearch, or S3 for querying.
  4. Integration
    Connect to downstream applications (security dashboards, moderation pipelines, etc.).

4. Integration with Other AWS Services

    • Amazon S3: storage for images/videos.
    • Kinesis Video Streams: real-time video input.
    • Lambda: trigger-based automation after analysis.
    • DynamoDB: store indexed face metadata.
    • Elasticsearch / OpenSearch: search across results.
    • SNS / SQS: notification and queue handling.
    • Step Functions: orchestration of multi-step workflows.

5. Pricing Model

  • Pricing is pay-as-you-go, with free-tier options. The figures below are approximate (as of 2025), vary by region, and may change; check the AWS pricing page.

    • Image APIs: ~$0.001 per image for label detection.
    • Face Comparison: ~$0.001 per comparison.
    • Face Search: ~$0.01 per 1,000 images.
    • Text Detection: ~$0.0015 per image.
    • Unsafe Content Detection: ~$0.001 per image.
    • Video APIs: Charged per minute of video processed (varies by feature, ~$0.10–$0.12/minute).

 Optimization Tip: 

  • Use bounding box cropping and batching to reduce costs.
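
One way to apply this tip, as a sketch (assumes Pillow is installed; bucket/key handling is left out): detect faces once, crop locally to the face bounding box, and send only the smaller crop to any follow-up calls.

# Sketch: crop to the first face bounding box before further Rekognition calls
import io
import boto3
from PIL import Image

rek = boto3.client('rekognition')

def crop_first_face(image_bytes):
    faces = rek.detect_faces(Image={'Bytes': image_bytes})['FaceDetails']
    if not faces:
        return image_bytes
    box = faces[0]['BoundingBox']  # Left/Top/Width/Height as ratios of image size
    img = Image.open(io.BytesIO(image_bytes))
    w, h = img.size
    left, top = int(box['Left'] * w), int(box['Top'] * h)
    right, bottom = int(left + box['Width'] * w), int(top + box['Height'] * h)
    buf = io.BytesIO()
    img.crop((left, top, right, bottom)).save(buf, format='JPEG')
    return buf.getvalue()  # smaller payload for follow-up calls (e.g., SearchFacesByImage)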

6. Security & Compliance

    • Encryption: Supports KMS for at-rest data.
    • IAM Integration: Fine-grained permissions.
    • Data Privacy: AWS claims images/videos are not stored beyond processing unless explicitly requested (e.g., face collections).
    • Compliance: HIPAA eligible, GDPR aligned (but responsibility for usage is customer’s).

7. Common Use Cases

    • Security & Surveillance: Identity verification, intruder detection.
    • User Authentication: KYC (Know Your Customer) processes, mobile app logins.
    • Content Moderation: Social media, dating apps, marketplaces.
    • Retail Analytics: Foot traffic analysis, customer demographics.
    • Media & Entertainment: Celebrity recognition, automatic metadata tagging.
    • Law Enforcement: Missing persons, criminal investigations (controversial, requires ethical considerations).

8. Strengths & Limitations

Strengths


    • No ML expertise required.
    • Scales easily (serverless).
    • Pre-trained on massive datasets.
    • Broad feature set.

Limitations

    • Accuracy may vary by demographic groups (bias concerns).
    • Costs can grow quickly at scale.
    • Limited customization (compared to custom models in Amazon SageMaker).
    • Some privacy/ethical concerns (e.g., facial recognition misuse).

9. Alternatives & When to Use Them

    • Amazon SageMaker + CV models: if twtech needs custom-trained models.
    • Open-source (YOLO, OpenCV, Detectron2): if twtech wants full control, but must manage the infrastructure.

Rule of Thumb:

    • Use Rekognition for fast prototyping, scalability, and common CV tasks.
    • Use SageMaker or custom ML when domain-specific accuracy is critical.

10. Best Practices

    • Store results in a database (not just rely on API calls).
    • Use confidence thresholds (e.g., only accept label detection >90%).
    • Consider batching requests to reduce costs.
    • Regularly evaluate fairness & bias.
    • For compliance-heavy apps, maintain an audit trail of decisions.

Insights

Scope:

    • A practical implementation guide:
    • Sample architecture,
    • IAM bits,
    • Step-by-step flows for image (sync) and video (async),
    • Use cases,
    • Working code examples in Python (boto3),
    • Working code examples in JavaScript (AWS SDK v3)
    • Plus notes on testing,
    • Retries,
    • Scaling,
    • Cost-control.

A.  Sample architecture (text diagram)

B.  Key AWS resources & permissions (high level)

    •  S3 buckets for images/videos (separate production/test buckets recommended).
    •  IAM role for Lambdas:
      • rekognition:* (or scoped actions)
      • s3:GetObject
      • s3:PutObject
      • sns:Publish
      • dynamodb:PutItem (if storing results)
      • kms:Decrypt (if using KMS)
    •  SNS topic for Rekognition asynchronous job completion.
    •  (Optional) Kinesis Video Streams + IAM for streaming pipelines.
    •  Face Collection(s) for 1:N face search.

# Minimal IAM policy snippet (attach to Lambda role)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "rekognition:DetectLabels",
        "rekognition:DetectFaces",
        "rekognition:CompareFaces",
        "rekognition:IndexFaces",
        "rekognition:CreateCollection",
        "rekognition:SearchFacesByImage",
        "rekognition:StartLabelDetection",
        "rekognition:GetLabelDetection",
        "rekognition:StartFaceSearch",
        "rekognition:GetFaceSearch"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::twtech-s3bucket-name/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish",
        "sns:Subscribe",
        "sns:ListSubscriptionsByTopic"
      ],
      "Resource": "arn:aws:sns:us-east-2:accountID:twtech-Topic"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem",
        "dynamodb:GetItem",
        "dynamodb:Query"
      ],
      "Resource": "arn:aws:dynamodb:us-east-2:accountID:table/twtech-Table"
    }
  ]
}

# NB:

  • The above policy should be scoped by resource and region in production.

C.  Image pipeline (sync) — Flow

    1. Client uploads image to S3 (pre-signed URL or direct from client).
    2. Lambda triggered (S3 event) or client calls backend API directly.
    3. Call Rekognition sync APIs (e.g., DetectLabels, DetectFaces, IndexFaces, DetectText).
    4. Store JSON results in DynamoDB / S3.
    5. Use confidence thresholds and logic in Lambda to act (moderation, notifications, auth).

# Python (boto3) — detect labels + faces

# requirements: boto3
import json
import boto3
from botocore.exceptions import ClientError 
rek = boto3.client('rekognition')
s3 = boto3.client('s3')
def analyze_image_from_s3(bucket, key):
    image = {'S3Object': {'Bucket': bucket, 'Name': key}}
    # Detect labels
    labels_resp = rek.detect_labels(Image=image, MaxLabels=20, MinConfidence=75.0)
    # Detect faces
    faces_resp = rek.detect_faces(Image=image, Attributes=['ALL'])
    return {'Labels': labels_resp.get('Labels',[]), 'Faces': faces_resp.get('FaceDetails',[])}
# Sample usage (could be called inside a Lambda handler)
if __name__ == "__main__":
    bucket = "twtech-s3bucket"
    key = "uploads/photo.jpg"
    try:
        result = analyze_image_from_s3(bucket, key)
        print(json.dumps(result, indent=2))
    except ClientError as e:
        print("Error:", e)

# Nodejs (AWS SDK v3) — detect labels

// package: @aws-sdk/client-rekognition
import { RekognitionClient, DetectLabelsCommand } from "@aws-sdk/client-rekognition";
const client = new RekognitionClient({ region: "us-east-2" });
export async function detectLabelsFromS3(bucket, key) {
  const params = {
    Image: { S3Object: { Bucket: bucket, Name: key } },
    MaxLabels: 20,
    MinConfidence: 75.0
  };
  const cmd = new DetectLabelsCommand(params);
  const resp = await client.send(cmd);
  return resp.Labels;
}
// Sample usage
(async () => {
  const labels = await detectLabelsFromS3("twtech-s3bucket", "uploads/photo.jpg");
  console.log(JSON.stringify(labels, null, 2));
})();

D.  Face collections (indexing + searching)

    •  Create a collection (CreateCollection).
    •  IndexFaces to store face metadata (recommended: store externalImageId).
    •  SearchFacesByImage (search with an image) or SearchFaces (search with a FaceId) for match retrieval.

# Python: create collection, index face, search

def create_collection(collection_id):
    try:
        rek.create_collection(CollectionId=collection_id)
    except ClientError as e:
        if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
            raise
def index_face(bucket, key, collection_id, external_id=None):
    # note: ExternalImageId only allows [a-zA-Z0-9_.\-:], so keys containing '/' need sanitizing
    image = {'S3Object': {'Bucket': bucket, 'Name': key}}
    resp = rek.index_faces(Image=image, CollectionId=collection_id,
                           ExternalImageId=external_id or key, DetectionAttributes=['DEFAULT'])
    return resp.get('FaceRecords', [])

def search_face_by_image(bucket, key, collection_id, threshold=90, max_faces=5):
    image = {'S3Object': {'Bucket': bucket, 'Name': key}}
    resp = rek.search_faces_by_image(CollectionId=collection_id, Image=image,
                                     FaceMatchThreshold=threshold, MaxFaces=max_faces)
    return resp.get('FaceMatches', [])

E.  Video pipeline (async) — Flow

    1. Upload video to S3.
    2. Start async job (e.g., StartLabelDetection, StartFaceSearch, StartFaceDetection) — pass NotificationChannel (SNS topic ARN + role ARN).
    3. Rekognition processes video (minutes depend on video length).
    4. Rekognition publishes a job-completion message to SNS.
    5. SNS triggers a Lambda that calls GetLabelDetection / GetFaceSearch with JobId to fetch results, then persists them.

Key: SNS notification format

    • SNS message contains JobId and Status
    • Always check Status == "SUCCEEDED" before calling Get*.
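
For reference, the completion message that Rekognition publishes to the topic looks roughly like the JSON below (the field set shown is illustrative; parse defensively and key off JobId and Status):

{
  "JobId": "1234567890abcdef...",
  "Status": "SUCCEEDED",
  "API": "StartLabelDetection",
  "Timestamp": 1694250000000,
  "Video": {
    "S3ObjectName": "uploads/video.mp4",
    "S3Bucket": "twtech-s3bucket"
  }
}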

# Node: start label detection (example)

import {
  RekognitionClient,
  StartLabelDetectionCommand,
  GetLabelDetectionCommand
} from "@aws-sdk/client-rekognition";
const client = new RekognitionClient({ region: "us-east-2" });
export async function startLabelDetection(bucket, key, snsTopicArn, roleArn) {
  const params = {
    Video: { S3Object: { Bucket: bucket, Name: key } },
    NotificationChannel: { SNSTopicArn: snsTopicArn, RoleArn: roleArn },
    MinConfidence: 70.0,
  };
  const cmd = new StartLabelDetectionCommand(params);
  const resp = await client.send(cmd);
  return resp.JobId; // Save this JobId if you want to poll
}
//  In the SNS triggered Lambda handler: call GetLabelDetection with JobId
export async function getLabelsForJob(jobId, maxResults = 1000) {
  const cmd = new GetLabelDetectionCommand({
    JobId: jobId,
    MaxResults: maxResults,
    SortBy: "TIMESTAMP"
  });
  const resp = await client.send(cmd);
  // resp.Labels contains entries with Timestamp & Label properties
  return resp;
}

# Python: SNS handler skeleton for job completion

import json
import boto3
rek = boto3.client('rekognition')
def lambda_handler(event, context):
    # event is SNS -> message is a JSON string
    for record in event['Records']:
        sns_msg = record['Sns']['Message']
        msg = json.loads(sns_msg)
        job_id = msg.get('JobId') or msg.get('jobId')
        status = msg.get('Status') or msg.get('status')
        if status != 'SUCCEEDED':
            print("Job failed or in progress:", msg)
            continue
        # example for label detection
        resp = rek.get_label_detection(JobId=job_id, SortBy='TIMESTAMP')
        # Persist resp to S3/DynamoDB
        print("Got label detection:", resp.get('Labels',[])[:3])

F.  Real-time streaming (Kinesis Video Streams + Rekognition Streaming)

    •  Use Kinesis Video Streams to push live RTSP/WebRTC streams.
    •  Rekognition supports real-time face recognition and label detection via its streaming API (requires different setup and long-running consumers).
    •  Typical pattern: Kinesis -> Rekognition streaming -> process with WebSocket or Lambda consumer.

G.  Error handling, retries, and throttling

    •  Use exponential backoff and jitter on retryable errors (HTTP 5xx, 429); see the sketch after this list.
    •  Respect Rekognition quotas (requests/sec and minutes for video). Catch ProvisionedThroughputExceededException or ThrottlingException.
    •  For large batches, shard work across Lambdas and/or use Step Functions.
    •  Use idempotency where possible (store JobId and job state).
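
A backoff sketch for the first bullet above (boto3; the built-in adaptive retry mode is one option, and the manual loop shows the same idea explicitly):

# Sketch: exponential backoff with jitter around a Rekognition call
import random
import time
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

# Option 1: let botocore retry throttled/5xx calls with an adaptive token bucket
rek = boto3.client('rekognition', config=Config(retries={'max_attempts': 10, 'mode': 'adaptive'}))

# Option 2: explicit backoff with full jitter
def detect_labels_with_backoff(image, max_attempts=5):
    for attempt in range(max_attempts):
        try:
            return rek.detect_labels(Image=image, MaxLabels=20, MinConfidence=75.0)
        except ClientError as e:
            code = e.response['Error']['Code']
            if code not in ('ThrottlingException', 'ProvisionedThroughputExceededException'):
                raise
            time.sleep(random.uniform(0, 2 ** attempt))  # sleep up to 2^attempt seconds
    raise RuntimeError("detect_labels kept throttling after retries")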

H.  Testing & validation tips

    •  Start with a small dataset and confidence thresholds tuned to twtech's needs.
    •  Use MinConfidence or twtech's own post-filtering (e.g., only accept Label.Confidence > 90); see the sketch after this list.
    •  Run fairness / bias checks with a representative dataset.
    •  For face matching, evaluate false acceptance rate (FAR) and false rejection rate (FRR) on your dataset.
    •  Log input metadata (image size, source) to troubleshoot edge cases.
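
A small post-filtering sketch for the threshold bullet above (threshold values are examples, not recommendations):

# Sketch: apply twtech's own post-filtering on top of MinConfidence
def filter_labels(labels, min_confidence=90.0):
    # keep only labels Rekognition is highly confident about
    return [l for l in labels if l['Confidence'] > min_confidence]

def filter_face_matches(matches, min_similarity=95.0):
    # face search results carry Similarity rather than Confidence
    return [m for m in matches if m['Similarity'] >= min_similarity]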

I.  Cost & performance suggestions

    •  Use cropped images or reduced resolution for non-critical tasks to lower cost & latency.
    •  For large-volume jobs, consider batching (images) or using async video jobs instead of frame-by-frame sync processing.
    •  Cache results for repeated queries (don’t call Rekognition for the same image multiple times).
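
A caching sketch for the last bullet (the table name and key scheme are assumptions): key the cache on a hash of the image bytes so identical images are analyzed only once.

# Sketch: cache Rekognition results in DynamoDB keyed by image hash
import hashlib
import json
import boto3

rek = boto3.client('rekognition')
cache = boto3.resource('dynamodb').Table('twtech-rekognition-cache')  # assumed table, hash key 'id'

def detect_labels_cached(image_bytes):
    digest = hashlib.sha256(image_bytes).hexdigest()
    hit = cache.get_item(Key={'id': digest}).get('Item')
    if hit:
        return json.loads(hit['labels'])  # reuse the previous result
    labels = rek.detect_labels(Image={'Bytes': image_bytes}, MaxLabels=20)['Labels']
    cache.put_item(Item={'id': digest, 'labels': json.dumps(labels)})  # stored as a JSON string
    return labels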

J.  Sample full Lambda (image -> content moderation action), Python script

import json, os
import boto3
rek = boto3.client('rekognition')
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
TABLE_NAME = os.environ.get('RESULTS_TABLE')
def lambda_handler(event, context):
    # Expect S3 put event
    for rec in event['Records']:
        bucket = rec['s3']['bucket']['name']
        key = rec['s3']['object']['key']
        image = {'S3Object': {'Bucket': bucket, 'Name': key}}
        labels = rek.detect_labels(Image=image, MaxLabels=50, MinConfidence=50.0).get('Labels', [])
        unsafe = rek.detect_moderation_labels(Image=image, MinConfidence=60.0).get('ModerationLabels', [])
        item = {
            'id': f"{bucket}/{key}",
            # stored as JSON strings to avoid DynamoDB's restriction on float Confidence values
            'labels': json.dumps(labels),
            'moderation': json.dumps(unsafe)
        }
        # write to DynamoDB for later UI consumption
        table = dynamodb.Table(TABLE_NAME)
        table.put_item(Item=item)
    return {"status": "ok"}

K. Useful production practices

    •  Enable server-side encryption for S3 and use KMS.
    •  Audit logs: enable CloudTrail for Rekognition usage.
    •  Version twtech's Face Collection names (e.g., employees-v1) and rotate/clean up stale faces.
    •  Implement a data retention policy (delete faces/collections if required by privacy rules); see the sketch after this list.
    •  Provide an opt-out / human review path for sensitive decisions.
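
A retention clean-up sketch for the bullet above (collection IDs and face IDs are placeholders):

# Sketch: purge stale faces or an entire collection to honor retention policies
import boto3

rek = boto3.client('rekognition')

def purge_faces(collection_id, face_ids):
    # remove specific indexed faces (e.g., users who opted out or left)
    if face_ids:
        rek.delete_faces(CollectionId=collection_id, FaceIds=face_ids)

def purge_collection(collection_id):
    # drop a whole collection (e.g., employees-v0 after migrating to employees-v1)
    rek.delete_collection(CollectionId=collection_id)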

L.  Quick checklist for deploying a simple prototype

    1. Create S3 bucket(s).
    2. Create an SNS topic and subscription (Lambda).
    3. Create Lambda functions: image handler, video job starter, SNS job handler.
    4. Attach IAM roles (scoped).
    5. Create Rekognition collection (if using faces).
    6. Test with example images and a short video.
    7. Inspect results in CloudWatch logs, DynamoDB, or S3.


Step-by-step streaming implementation using Kinesis Video Streams + Amazon Rekognition Stream Processors. 

  • Plus a clear integration diagram (ASCII)

Scope:

    • High-level integration diagram
    • Components & responsibilities
    • Infra / IAM minimal snippets
    • Producer options (GStreamer example + notes)
    • Python (boto3) code to create/start/stop a Rekognition Stream Processor and to create/prepare the collection
    • Node (AWS SDK v3) equivalents for the same management actions
    • Sample consumer: Lambda that reads Rekognition streaming results from the Kinesis Data Stream and processes them
    • Operational notes (latency, scaling, costs, testing)

Architecture (ASCII diagram)

Components & responsibilities (short)

    • Edge producer: pushes camera RTSP frames to Kinesis Video Streams (KVS). Can be GStreamer, the KVS Producer SDK or WebRTC.
    • Rekognition Stream Processor: long-running resource that analyzes the live video from KVS and writes structured results to a Kinesis Data Stream (KDS). Configured for face search (1:N) or labels.
    • Consumer (Lambda / KDS consumer): reads Rekognition events from KDS, applies business logic, persists to DB, triggers alerts, etc.
    • Face Collection (optional): store indexed faces to use with face search in the stream processor.

Minimal IAM snippets (principle of least privilege — tighten for prod)

1) Role for Rekognition stream processor (allow Rekognition to read KVS and write to KDS)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesisvideo:GetDataEndpoint",
        "kinesisvideo:ListFragments",
        "kinesisvideo:ListStreams",
        "kinesisvideo:GetMedia"
      ],
      "Resource": "arn:aws:kinesisvideo:us-east-2:accountID:stream/twtech-KvsStreamName/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-2:accountID:stream/twtech-rekognition-output-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "rekognition:*"
      ],
      "Resource": "*"
    }
  ]
}

2) Lambda role needs:

    • kinesis:GetRecords, kinesis:GetShardIterator, kinesis:DescribeStream for the KDS
    • dynamodb:PutItem (or S3 PutObject) to persist results
    • cloudwatch:PutMetricData etc.

Producer: simplest option — GStreamer to Kinesis Video Streams

  • If twtech has a camera URL (rtsp://...) or a local device, GStreamer can stream to KVS using the kvssink plugin from the KVS Producer package.

 # Example command (Linux):

# stream RTSP -> KVS using gstreamer (kvssink)

# Assumes twtech installed the KVS GStreamer plugins and exported AWS creds / role

gst-launch-1.0 rtspsrc location="rtsp://camera.local/stream" ! rtph264depay ! h264parse ! \
  kvssink stream-name="twtech-camera-stream" storage-size=512

Notes:

    • On EC2 or edge device, it's typical to use the Kinesis Video Producer SDK (or the Dockerized GStreamer with kvssink).
    • For browsers or mobile, consider KVS WebRTC (different flow) — Rekognition supports either KVS or WebRTC, but stream processor creation uses a KVS stream ARN.

Rekognition Stream Processor — Python (boto3) management

This code:

    •  creates a collection (for face search), 
    • creates a Kinesis Data Stream for outputs, 
    • creates the Rekognition Stream Processor,  
    • starts it. 
    • It also shows how to stop and delete the stream processor.

Assumes AWS credentials are configured and roles (role_arn) exist with permissions above.

# file: rek_stream_processor.py
import boto3
import time
import json

rek = boto3.client('rekognition')
kinesis = boto3.client('kinesis')
kvs = boto3.client('kinesisvideo')

# CONFIG - change these
REGION = "us-east-2"
KVS_STREAM_NAME = "twtech-camera-stream"
KDS_OUTPUT_NAME = "twtech-rekognition-output-stream"
REK_ROLE_ARN = "arn:aws:iam::accountID:role/RekognitionStreamRole"  # role that Rekognition will assume
COLLECTION_ID = "twtech-face-collection"

def create_kds_stream(name, shard_count=1):
    try:
        kinesis.create_stream(StreamName=name, ShardCount=shard_count)
        # wait until active
        while True:
            desc = kinesis.describe_stream(StreamName=name)
            status = desc['StreamDescription']['StreamStatus']
            if status == 'ACTIVE':
                break
            print("Waiting for KDS to become ACTIVE...")
            time.sleep(2)
        print("Created KDS:", name)
    except kinesis.exceptions.ResourceInUseException:
        print("KDS already exists:", name)

def create_collection(collection_id):
    try:
        rek.create_collection(CollectionId=collection_id)
        print("Created collection:", collection_id)
    except rek.exceptions.ResourceAlreadyExistsException:
        print("Collection already exists:", collection_id)

def get_kvs_stream_arn(stream_name):
    resp = kvs.describe_stream(StreamName=stream_name)
    return resp['StreamInfo']['StreamARN']

def create_stream_processor(sp_name, kvs_stream_arn, kds_stream_arn, role_arn, collection_id):
    # Settings for face searching
    settings = {
        "FaceSearch": {
            "CollectionId": collection_id,
            "FaceMatchThreshold": 90.0
        }
        # For label detection, configure "ConnectedHome" settings instead of "FaceSearch" (not both).
    }
    try:
        resp = rek.create_stream_processor(
            Input={'KinesisVideoStream': {'Arn': kvs_stream_arn}},
            Output={'KinesisDataStream': {'Arn': kds_stream_arn}},
            Name=sp_name,
            RoleArn=role_arn,
            Settings=settings
        )
        print("Created Stream Processor:", resp['StreamProcessorArn'])
    except rek.exceptions.ResourceAlreadyExistsException:
        print("Stream processor already exists:", sp_name)
    return sp_name

def start_stream_processor(sp_name):
    # the start/stop/delete APIs are keyed by the stream processor Name
    rek.start_stream_processor(Name=sp_name)
    print("Started stream processor:", sp_name)

def stop_stream_processor(sp_name):
    rek.stop_stream_processor(Name=sp_name)
    print("Stopped stream processor:", sp_name)

def delete_stream_processor(sp_name):
    rek.delete_stream_processor(Name=sp_name)
    print("Deleted stream processor:", sp_name)

if __name__ == "__main__":
    # create resources
    create_kds_stream(KDS_OUTPUT_NAME)
    create_collection(COLLECTION_ID)
    # get ARNs
    kvs_arn = get_kvs_stream_arn(KVS_STREAM_NAME)
    kds_arn = kinesis.describe_stream(StreamName=KDS_OUTPUT_NAME)['StreamDescription']['StreamARN']
    sp_name = create_stream_processor("my-stream-processor", kvs_arn, kds_arn, REK_ROLE_ARN, COLLECTION_ID)
    # start it
    start_stream_processor(sp_name)
    print("Processor started. To stop: call stop_stream_processor(sp_name)")

Notes:


    • Settings change depending on whether twtech configures face search or connected-home label detection.
    • The example above uses FaceSearch (1:N) referencing a collection.
    • create_stream_processor creates the processor by Name; the start, stop, and delete APIs are also keyed by that Name, so the example passes the Name around rather than the ARN.

Consumer Lambda: read Rekognition streaming results from Kinesis Data Stream (Python)
    • When twtech creates a Rekognition stream processor with Kinesis Data Stream as output, Rekognition will write JSON records to KDS.
    • Then, twtech attaches a Lambda as an event source mapping to that KDS to process incoming events.

Here’s a Lambda handler (Python) that decodes Kinesis records and does simple business logic (store to DynamoDB / trigger alert):

# file: twtech-kds_consumer_lambda.py
import base64
import json
import boto3
import os
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
TABLE = os.environ.get('RESULTS_TABLE', 'twtech-rekognition-stream-results')
table = dynamodb.Table(TABLE)

def process_rekognition_record(record_json):
    # Rekognition stream processor output schema can include FaceSearchResponse or LabelDetection etc.
    # Just store the raw JSON for inspection; adapt to parse specifics as needed.
    # Sample keys: 'FaceSearchResponse', 'LabelDetection'
    item = {
        'id': record_json.get('EventId') or record_json.get('JobId') or str(record_json.get('Timestamp', 'ts')),
        # stored as a JSON string to avoid DynamoDB's restriction on float values
        'payload': json.dumps(record_json)
    }
    return item

def lambda_handler(event, context):
    for rec in event['Records']:
        # Kinesis record data is base64 encoded
        payload_b64 = rec['kinesis']['data']
        payload_bytes = base64.b64decode(payload_b64)
        try:
            payload_json = json.loads(payload_bytes)
        except Exception as e:
            print("Failed to parse payload:", e)
            continue
        print("Received Rekognition event:", json.dumps(payload_json)[:2000])
        item = process_rekognition_record(payload_json)
        try:
            table.put_item(Item=item)
        except ClientError as e:
            print("DynamoDB put failed:", e)
    return {'status': 'ok'}

    • Lambda event source mapping should be created with the Kinesis Data Stream ARN and a reasonable batch size (e.g., 100) and retry behavior configured.
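
A sketch of creating that event source mapping with boto3 (stream ARN and function name are placeholders):

# Sketch: attach the consumer Lambda to the Kinesis Data Stream
import boto3

lambda_client = boto3.client('lambda')

lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-2:accountID:stream/twtech-rekognition-output-stream',
    FunctionName='twtech-kds-consumer',
    StartingPosition='LATEST',
    BatchSize=100,
    MaximumRetryAttempts=3
)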

Node (AWS SDK v3) — create and start Stream Processor (equivalent)

// file: twtech-rek_stream_processor.mjs
import { RekognitionClient, CreateStreamProcessorCommand, StartStreamProcessorCommand } from "@aws-sdk/client-rekognition";
import { KinesisClient, CreateStreamCommand, DescribeStreamCommand } from "@aws-sdk/client-kinesis";
import { KinesisVideoClient, DescribeStreamCommand as DescribeKvsStream } from "@aws-sdk/client-kinesis-video";

const rek = new RekognitionClient({});
const kinesis = new KinesisClient({});
const kvs = new KinesisVideoClient({});

const KVS_STREAM_NAME = "twtech-camera-stream";
const KDS_OUTPUT_NAME = "twtech-rekognition-output-stream";
const REK_ROLE_ARN = "arn:aws:iam::accountID:role/twtech-RekognitionStreamRole";
const COLLECTION_ID = "twtech-face-collection";

async function createKds(name) {
  try {
    await kinesis.send(new CreateStreamCommand({ StreamName: name, ShardCount: 1 }));
    // wait actively for ACTIVE
    while (true) {
      const desc = await kinesis.send(new DescribeStreamCommand({ StreamName: name }));
      if (desc.StreamDescription.StreamStatus === "ACTIVE") break;
      console.log("Waiting for KDS ACTIVE...");
      await new Promise(r => setTimeout(r, 2000));
    }
  } catch (e) {
    if (e.name === "ResourceInUseException") console.log("KDS exists");
    else throw e;
  }
}

async function getKvsArn(streamName) {
  const resp = await kvs.send(new DescribeKvsStream({ StreamName: streamName }));
  return resp.StreamInfo.StreamARN;
}

async function createStreamProcessor(name, kvsArn, kdsArn, roleArn, collectionId) {
  const settings = {
    FaceSearch: {
      CollectionId: collectionId,
      FaceMatchThreshold: 90.0
    }
  };
  const cmd = new CreateStreamProcessorCommand({
    Input: { KinesisVideoStream: { Arn: kvsArn } },
    Output: { KinesisDataStream: { Arn: kdsArn } },
    Name: name,
    RoleArn: roleArn,
    Settings: settings
  });
  const resp = await rek.send(cmd);
  console.log("Created SP ARN:", resp.StreamProcessorArn);
  return resp.StreamProcessorArn;
}

async function startSP(spName) {
  // StartStreamProcessor is keyed by the processor Name
  await rek.send(new StartStreamProcessorCommand({ Name: spName }));
  console.log("Started SP:", spName);
}

(async () => {
  await createKds(KDS_OUTPUT_NAME);
  // ensure collection created separately
  const kvsArn = await getKvsArn(KVS_STREAM_NAME);
  const kdsDesc = await kinesis.send(new DescribeStreamCommand({ StreamName: KDS_OUTPUT_NAME }));
  const kdsArn = kdsDesc.StreamDescription.StreamARN;
  const spName = "twtech-stream-processor";
  await createStreamProcessor(spName, kvsArn, kdsArn, REK_ROLE_ARN, COLLECTION_ID);
  await startSP(spName);
})();

# End-to-end testing checklist

    1. Create KVS stream and confirm the producer is successfully sending frames (use the KVS console or GetDataEndpoint + GetMedia; see the sketch after this checklist).
    2. Create KDS and confirm it's ACTIVE.
    3. Create Face Collection and IndexFaces several sample faces (externalImageId helps map to your users).
    4. Create Rekognition Stream Processor with FaceSearch settings (or Label detection) referencing KVS and KDS ARNs, and the IAM role.
    5. Start the stream processor. Watch CloudWatch logs for Rekognition stream processor errors (the service logs to CloudWatch).
    6. Attach Lambda to KDS as event source mapping and inspect DynamoDB / S3 outputs for records from Rekognition.
    7. Tune thresholds: FaceMatchThreshold, min confidence, etc.
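
For step 1, a quick way to confirm frames are actually arriving, as a sketch (it only checks that media bytes can be read from the stream, it does not decode them):

# Sketch: confirm the KVS stream is receiving media from the producer
import boto3

kvs = boto3.client('kinesisvideo')
endpoint = kvs.get_data_endpoint(StreamName='twtech-camera-stream', APIName='GET_MEDIA')['DataEndpoint']

media = boto3.client('kinesis-video-media', endpoint_url=endpoint)
resp = media.get_media(
    StreamName='twtech-camera-stream',
    StartSelector={'StartSelectorType': 'NOW'}
)
chunk = resp['Payload'].read(1024)  # non-empty bytes indicate the producer is pushing frames
print("Received", len(chunk), "bytes from the stream")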

Operational notes & best practices

    • Latency: Rekognition stream processors operate in near-real-time but add processing delay (hundreds of ms to a few seconds). Test to measure it for the twtech use case.
    • Throughput & shards: Kinesis Data Stream shard count determines how many parallel consumers you can scale to. Increase shards for higher throughput.
    • Collection maintenance: Reindex collections as needed and purge stale faces according to privacy policies.
    • Monitoring: Enable CloudWatch metrics and alarms for Rekognition stream processors, KVS and KDS errors, and Lambda failures (see the sketch after this list).
    • Security & privacy: Use KMS + SSE for S3 and ensure twtech has a proper legal basis for face recognition in the jurisdiction. Implement human-in-the-loop review for high-risk decisions.
    • Cost: Stream processing charges are per minute of video processed. KVS, KDS, and Lambda also incur costs. Estimate based on expected concurrent streams and minutes processed.
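
A monitoring sketch for the Lambda-failure part of the list above (alarm name, function name, and topic ARN are placeholders):

# Sketch: alarm on consumer Lambda errors and notify an SNS topic
import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='twtech-kds-consumer-errors',
    Namespace='AWS/Lambda',
    MetricName='Errors',
    Dimensions=[{'Name': 'FunctionName', 'Value': 'twtech-kds-consumer'}],
    Statistic='Sum',
    Period=300,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    TreatMissingData='notBreaching',
    AlarmActions=['arn:aws:sns:us-east-2:accountID:twtech-Topic']
)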

