This handler automatically moderates uploaded image content to detect policy violations. For each incoming SQS record it extracts the bucket and key from the embedded S3 event, as shown below.
# NOTE(review): this is an excerpt from a larger handler. `event`, `results`,
# `article_id`, and the helpers (moderate_image, mark_article_as_approved,
# forward_to_next_queue, handle_moderation_failure, send_admin_notification)
# are defined outside this view, and the `except` clauses for both `try`
# blocks below are also not visible here. Indentation was reconstructed from
# the flattened source.
#
# Each SQS record wraps an S3 event notification in its JSON body; each S3
# record inside it identifies one uploaded image to moderate.
for sqs_record in event.get('Records', []):
    try:
        # Parse S3 event from SQS body
        s3_event = json.loads(sqs_record['body'])
        # Process each S3 record
        for s3_record in s3_event.get('Records', []):
            try:
                bucket = s3_record['s3']['bucket']['name']
                # NOTE(review): S3 event notification keys are URL-encoded;
                # presumably unquote_plus is applied inside moderate_image or
                # elsewhere — TODO confirm, otherwise keys with spaces/'+'
                # will fail lookups.
                key = s3_record['s3']['object']['key']
                # Run moderation (per the surrounding docs, AWS Rekognition
                # analyzes the image and returns violation labels).
                moderation_result = moderate_image(bucket, key)
                # Helper signals failure via an 'error' key rather than
                # raising; count it and move on to the next record.
                if 'error' in moderation_result:
                    print(f"Moderation failed: {moderation_result['error']}")
                    results['errors'] += 1
                    continue
                results['processed'] += 1
                if moderation_result['passed']:
                    # Approved path: mark the article and forward downstream.
                    # NOTE(review): `article_id` is not assigned anywhere in
                    # this excerpt — presumably derived from `key` earlier in
                    # the full function; verify.
                    results['approved'] += 1
                    mark_article_as_approved(article_id)
                    # Only forward to next queue if approved
                    final_status = {
                        'moderationStatus': 'approved',
                        'processed': True
                    }
                    forward_to_next_queue(bucket, key, article_id, final_status)
                else:
                    # Rejected path: apply the configured failure action
                    # (e.g. delete/quarantine) and tally it.
                    results['rejected'] += 1
                    action_result, owner_id = handle_moderation_failure(
                        bucket, key, article_id, moderation_result
                    )
                    if action_result in results['actions']:
                        results['actions'][action_result] += 1
                    # If content violates policies, notify an administrator.
                    # Always send admin notification for deleted/quarantined content
                    if action_result in ['deleted', 'quarantined']:
                        print(f"📧 Sending admin notification for {action_result} content")
                        send_admin_notification(article_id, key, moderation_result, owner_id)
                    # Otherwise still notify when severity is critical/high.
                    elif moderation_result['maxSeverity'] in ['critical', 'high']:
                        print(f"📧 Sending admin notification for {moderation_result['maxSeverity']} severity")
                        send_admin_notification(article_id, key, moderation_result, owner_id)
                    # DO NOT forward to next queue if rejected/deleted
                    # Pipeline stops here, user already received deletion email
                    print(f"⚠️ Pipeline stopped for rejected image: {key}")
                    print(f"   User notification already sent via send_user_deletion_email()")
When content is rejected (deleted or quarantined, or flagged at critical/high severity), the system automatically sends an email notification to the administrators; the user is notified separately at deletion time.
