Как я построил serverless OCR-сервис на AWS Lambda и Amazon Bedrock

от автора

Недавно передо мной встала задача: организовать простое распознавание текста из загруженных документов (сканы PDF, изображения PNG/JPG) на ресурсах AWS. Классический подход – воспользоваться сервисом Amazon Textract или даже запустить Tesseract внутри Lambda. Однако, подход с использованием Amazon Bedrock мне показался более привлекательным и не требующим «танцев с бубном» вокруг структуры ответа Textract.

В этом посте я расскажу, как на практике за пару часов реализовал безсерверный (serverless) OCR-сервис на AWS, используя AWS Lambda и модель из Amazon Bedrock для извлечения текста. Статья ориентирована на опытных AWS-архитекторов, поэтому мы углубимся в архитектуру, покажу код (Terraform для инфраструктуры и Python для Lambda), обсудим масштабирование, ограничения и прикинем стоимость решения в регионе eu-central-1 (Франкфурт). Поехали!

Архитектура решения

Первоначальная идея заключалась в том, чтобы вызывать Lambda-функцию напрямую через S3 Event Notification при загрузке нового объекта. Но довольно быстро стало понятно: это может привести к серьёзным проблемам с троттлингом и превышением лимита одновременных вызовов Lambda (concurrent executions) на большом объеме данных.

Например, представим ситуацию: пользователь загружает пачку из 500 документов в S3. Это может вызвать 500 параллельных вызовов Lambda-функции. Если функция достаточно тяжёлая (например, делает вызов к Bedrock и работает по 5-10 секунд), то можно быстро упереться в лимит на одновременные вызовы (по умолчанию 1000 на аккаунт). К тому же, при резком всплеске входящего трафика S3 не имеет встроенного механизма контроля скорости (rate limiting) для событий – события просто полетят без удержания.

Как я решил эту проблему:

Я заменил прямой вызов Lambda с S3 на асинхронную буферизацию через EventBridge + SQS + SNS. Выглядит это так:

  1. Документ загружается в S3 – это триггерит стандартное событие в EventBridge. Многие не знают, но события S3 автоматически появляются в default event bus. Мы настраиваем EventBridge Rule, которое фильтрует нужные события (например, PutObject в нужном бакете с нужным префиксом uploads/).

  2. Это событие мы отправляем в SQS и далее уже в топик SNS:

    • отправляет одно сообщение в SQS – для буферизации и контроля потока;

    • вызывает Lambda-функцию, которая достаёт событие из топика SNS и запускает OCR-обработку.

Такой паттерн решает сразу несколько задач:

  • Позволяет отделить момент загрузки файла и начало его обработки.

  • Через SQS можно легко масштабировать обработку (например, если очередь растёт – можно добавить ещё одну Lambda или контейнер).

  • Можно делать дедупликацию, ретраи, или даже фильтровать ненужные события через EventBridge.

Вторая архитектурная проблема: размер сообщения в SQS

Изначально я хотел, чтобы результат OCR попадал сразу во вторую SQS из первой Lambda. Но тут мы наталкиваемся на суровое ограничение: размер одного сообщения в SQS — не более 256 КБ. Это может быть критичным, особенно если модель Bedrock вернёт длинный распознанный текст или structured output (например, JSON с кучей данных из PDF-документа).

Как обойти ограничение:

Я изменил паттерн следующим образом:

  • Результат работы Bedrock кладётся в S3 (например, в бакет ocr-results-bucket/uuid.json).

  • В SQS идёт только ссылка на файл результата: { "resultKey": "ocr-results/123.json", "sourceKey": "uploads/filename.png" }.

  • Lambda-подписчик, который читает из очереди, просто подтягивает JSON с результатом из S3 и пишет нужные поля в DynamoDB.

Это надёжно, масштабируемо и укладывается в лимиты. К тому же, если понадобятся ручные ретраи или проверка ошибок, удобно просто перечитать S3-файл.

Итоговая архитектура:

  1. Пользователь загружает файл в S3 в папку uploads/

  2. EventBridge ловит PutObject событие и отправляет его в SNS topic:

    • в очередь SQS (буфер);

    • и одновременно запускает Lambda InvokeBedrock (через SNS);

  3. Lambda получает путь к файлу, вызывает Bedrock, сохраняет результат OCR в S3 (в ocr-results/) и публикует сообщение в SQS c ссылкой на результат.

  4. Вторая Lambda SaveToDB обрабатывает очередь, подтягивает JSON с результатом из S3 и записывает в DynamoDB только ссылку на этот объект. Это позволяет избежать превышения лимитов размера записи в DynamoDB и снизить нагрузку при чтении, ведь полный результат можно получить по ссылке из S3.

Всё — мы минимизировали риски троттлинга, соблюли лимиты SQS и сохранили serverless-подход.

Terraform конфигурация

Теперь, когда архитектура проработана, можно описать всё это в Terraform. Нам понадобится:

  • S3-бакет для загрузки исходных документов

  • EventBridge Rule

  • SNS-топик и подписки (на Lambda и SQS)

  • Очередь SQS

  • Lambda-функции (InvokeBedrock и SaveToDB)

  • IAM роли и политики для Lambda

  • DynamoDB таблица

  • S3-бакет для хранения результатов OCR

Код ниже разворачивает инфраструктуру под нашу архитектуру и учитывает:

  • безопасную передачу данных через SQS

  • размещение результатов в S3

  • фан-аут через SNS

  • минимизацию рисков троттлинга

main.tf
provider "aws" {   region = "eu-central-1" }  # S3 buckets resource "aws_s3_bucket" "uploads" {   bucket = "ocr-input-documents" }  resource "aws_s3_bucket" "results" {   bucket = "ocr-processed-results" }  # DynamoDB для хранения метаданных (ключей) resource "aws_dynamodb_table" "ocr_index" {   name         = "OCRResultsIndex"   billing_mode = "PAY_PER_REQUEST"   hash_key     = "Id"   attribute {     name = "Id"     type = "S"   } }  # SNS Topic resource "aws_sns_topic" "ocr_topic" {   name = "ocr-trigger-topic" }  # SQS Queue resource "aws_sqs_queue" "ocr_queue" {   name = "ocr-processing-queue" }  # Подписка SQS к SNS resource "aws_sns_topic_subscription" "sqs_sub" {   topic_arn = aws_sns_topic.ocr_topic.arn   protocol  = "sqs"   endpoint  = aws_sqs_queue.ocr_queue.arn }  # Подписка Lambda на SNS (InvokeBedrock) resource "aws_lambda_permission" "allow_sns_invoke_lambda" {   statement_id  = "AllowExecutionFromSNS"   action        = "lambda:InvokeFunction"   function_name = aws_lambda_function.invoke_bedrock.function_name   principal     = "sns.amazonaws.com"   source_arn    = aws_sns_topic.ocr_topic.arn }  resource "aws_sns_topic_subscription" "lambda_sub" {   topic_arn = aws_sns_topic.ocr_topic.arn   protocol  = "lambda"   endpoint  = aws_lambda_function.invoke_bedrock.arn }  # EventBridge Rule: отлавливаем PutObject в бакете resource "aws_cloudwatch_event_rule" "s3_put" {   name        = "ocr-upload-trigger"   event_pattern = jsonencode({     "source": ["aws.s3"],     "detail-type": ["Object Created"],     "detail": {       "bucket": {"name": [aws_s3_bucket.uploads.bucket]},       "object": {"key": [{"prefix": "uploads/"}]}     }   }) }  # Target — SNS resource "aws_cloudwatch_event_target" "to_sns" {   rule      = aws_cloudwatch_event_rule.s3_put.name   target_id = "SendToSNS"   arn       = aws_sns_topic.ocr_topic.arn }  resource "aws_lambda_function" "invoke_bedrock" {   function_name = "InvokeBedrockOCR"   role          = aws_iam_role.lambda_role.arn   handler       = "invoke.lambda_handler"   runtime       = "python3.10"   filename      = "lambda_invoke.zip"    environment {     variables = {       RESULT_BUCKET = aws_s3_bucket.results.bucket       QUEUE_URL     = aws_sqs_queue.ocr_queue.id     }   } }  resource "aws_lambda_function" "save_result" {   function_name = "SaveOCRToDynamo"   role          = aws_iam_role.lambda_role.arn   handler       = "save.lambda_handler"   runtime       = "python3.10"   filename      = "lambda_save.zip"    environment {     variables = {       TABLE_NAME    = aws_dynamodb_table.ocr_index.name       RESULT_BUCKET = aws_s3_bucket.results.bucket     }   } }  # Привязка очереди к функции сохранения resource "aws_lambda_event_source_mapping" "sqs_trigger" {   event_source_arn = aws_sqs_queue.ocr_queue.arn   function_name    = aws_lambda_function.save_result.arn   batch_size       = 1 }  # IAM Role + Policy resource "aws_iam_role" "lambda_role" {   name = "ocr_lambda_role"   assume_role_policy = jsonencode({     Version = "2012-10-17",     Statement = [{       Effect = "Allow",       Principal = { Service = "lambda.amazonaws.com" },       Action = "sts:AssumeRole"     }]   }) }  resource "aws_iam_role_policy" "lambda_policy" {   name = "ocr_lambda_inline"   role = aws_iam_role.lambda_role.id   policy = jsonencode({     Version = "2012-10-17",     Statement = [       {         Effect = "Allow",         Action = ["s3:GetObject", "s3:PutObject"],         Resource = [           "${aws_s3_bucket.uploads.arn}/*",           "${aws_s3_bucket.results.arn}/*"         ]       },       {         Effect = "Allow",         Action = ["bedrock:InvokeModel"],         Resource = "*"       },       {         Effect = "Allow",         Action = ["dynamodb:PutItem"],         Resource = aws_dynamodb_table.ocr_index.arn       },       {         Effect = "Allow",         Action = ["sqs:SendMessage", "sqs:ReceiveMessage", "sqs:DeleteMessage"],         Resource = aws_sqs_queue.ocr_queue.arn       },       {         Effect = "Allow",         Action = [           "logs:CreateLogGroup",           "logs:CreateLogStream",           "logs:PutLogEvents"         ],         Resource = "arn:aws:logs:*:*:*"       }     ]   }) }

Lambda-функция: обработка PDF и вызов Bedrock

Теперь разберём первую Lambda-функцию (InvokeBedrock), которая:

  1. Получает уведомление из SNS о новом объекте в S3

  2. Определяет, является ли файл PDF или изображением

  3. Если это PDF — передаёт его содержимое напрямую в Bedrock, так как модели Claude 3 умеют обрабатывать PDF-файлы без предварительной конвертации; если это изображение (JPEG, PNG), оно кодируется в base64 и также передаётся модели

  4. Вызывает Amazon Bedrock с содержимым файла и ожидает ответ

  5. Сохраняет результат в S3 и публикует ссылку на него в SQS

lambda_invoke_bedrock.py
import os import json import base64 import boto3 import mimetypes from urllib.parse import unquote_plus from aws_lambda_powertools import Logger  logger = Logger(service="ocr-bedrock")  s3_client = boto3.client("s3") bedrock_client = boto3.client("bedrock-runtime") sqs_client = boto3.client("sqs")  RESULT_BUCKET = os.environ.get("RESULT_BUCKET") QUEUE_URL = os.environ.get("QUEUE_URL") MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"   @logger.inject_lambda_context def lambda_handler(event, context):  # noqa: E501     """Lambda function to process S3 events and invoke Bedrock model for OCR.      Args:         event (dict): The event data from S3.         context (LambdaContext): The context object for the Lambda function.      Raises:         Exception: If there is an error processing the file or invoking the model.     """     for record in event["Records"]:         try:             message = json.loads(record["Sns"]["Message"])             bucket = message["detail"]["bucket"]["name"]             key = unquote_plus(message["detail"]["object"]["key"])             logger.info(f"Processing file: s3://{bucket}/{key}")              obj = s3_client.get_object(Bucket=bucket, Key=key)             content = obj["Body"].read()             mime_type, _ = mimetypes.guess_type(key)              prompt_text = "Extract all the text from the document and return it as plain text."              if mime_type not in ["image/jpeg", "image/png", "application/pdf"]:                 logger.warning(f"Unsupported MIME type: {mime_type}")                 continue              encoded = base64.b64encode(content).decode("utf-8")              request_body = {                 "anthropic_version": "bedrock-2023-05-31",                 "messages": [                     {                         "role": "user",                         "content": [                             {"type": "text", "text": prompt_text},                             {                                 "type": "document" if mime_type in ["application/pdf"] else "image",                                 "source": {                                     "type": "base64",                                     "media_type": mime_type,                                     "data": encoded,                                 },                             },                         ],                     },                 ],             }              response = bedrock_client.invoke_model(                 modelId=MODEL_ID, body=json.dumps(request_body), contentType="application/json"             )              response_body = response["body"].read().decode("utf-8")             result_data = json.loads(response_body)              output_text = result_data.get("outputs", [{}])[0].get("content", [{}])[0].get("text", "")              result_key = key.replace("uploads/", "ocr-results/") + ".json"             s3_client.put_object(                 Bucket=RESULT_BUCKET,                 Key=result_key,                 Body=json.dumps({"text": output_text, "source": key}, ensure_ascii=False).encode("utf-8"),             )              sqs_client.send_message(                 QueueUrl=QUEUE_URL,                 MessageBody=json.dumps({"result_key": result_key, "source_key": key}),             )              logger.info(f"Result saved to s3://{RESULT_BUCKET}/{result_key} and queued in SQS")          except Exception as e:             logger.exception(f"Error processing file: {e}")             raise 

Общий смысл работы InvokeBedrock Lambda

Функция запускается при получении события из SNS, читает загруженный файл из S3 (PDF или изображение), передаёт его в Amazon Bedrock (модель Claude 3), получает текст, сохраняет его в S3, а ссылку на результат публикует в очередь SQS.

Подробное объяснение по шагам:

  1. Логгирование и инициализация

    from aws_lambda_powertools import Logger logger = Logger(service="ocr-bedrock")

    Используем aws-lambda-powertools для структурированного логгирования. Все логи автоматически маркируются ID вызова Lambda, и это удобно при отладке.

  2. Извлекаем путь к файлу из события

    message = json.loads(record["Sns"]["Message"]) bucket = message["detail"]["bucket"]["name"] key = unquote_plus(message["detail"]["object"]["key"])

    SNS сообщение содержит S3-событие. Мы достаём имя бакета и ключ файла.

  3. Получаем содержимое файла из S3

    obj = s3_client.get_object(Bucket=bucket, Key=key) content = obj["Body"].read()

    Загрузка файла в память для дальнейшей передачи в Bedrock.

  4. Определяем MIME-тип

    mime_type,  = mimetypes.guesstype(key)

    Это позволяет нам понять, что за тип файла — PDF или изображение.

  5. Кодируем содержимое

    encoded = base64.b64encode(content).decode("utf-8")

    Для моделей Bedrock мультимодальный ввод передаётся как base64.

  6. Формируем prompt и payload

    prompt_text = "Extract all the text ..." ...  request_body = {"messages": [...]}

    Мы передаём модели текстовую инструкцию + изображение/документ.

  7. Вызов Bedrock

    response = bedrock_client.invoke_model(...) response_body = response["body"].read().decode("utf-8")

    Модель обрабатывает документ и возвращает результат (текст в JSON).

  8. Парсинг результата

    result_data = json.loads(response_body) output_text = result_data.get("outputs", ...).get("text", "")

    Мы пытаемся достать текст из структуры ответа модели.

  9. Сохраняем результат в S3

    result_key = key.replace("uploads/", "ocr-results/") + ".json" s3_client.put_object(..., Key=result_key, Body=json.dumps(...))

    Распознанный текст сохраняется в S3 как JSON-файл.

  10. Публикуем ссылку на результат в SQS

    sqs_client.send_message(     QueueUrl=QUEUE_URL,     MessageBody=json.dumps({"result_key": ..., "source_key": ...}) )

    В очередь мы кладём только ссылку на результат, не сам результат — чтобы не превышать лимит в 256 КБ.

  11. Обработка ошибок

    except Exception as e:     logger.exception(...)

    Ошибки логируются в CloudWatch с полным stack trace.

Lambda-функция: сохранение ссылки в DynamoDB

Вторая функция (SaveOCRToDynamo) срабатывает при поступлении сообщения в очередь SQS. Она очень простая по логике:

  1. Получает сообщение из очереди — это JSON с полями result_key и source_key

  2. Сохраняет эту информацию в таблицу DynamoDB:

    • Id — UUID или messageId

    • SourceFile — путь к исходному файлу в S3

    • ResultFile — путь к JSON с результатом в S3

Пример кода:

lambda_savedb.py
import os import json import boto3 from aws_lambda_powertools import Logger  logger = Logger(service="ocr-save") dynamodb = boto3.resource("dynamodb") table = dynamodb.Table(os.environ.get("TABLE_NAME"))  @logger.inject_lambda_context def lambda_handler(event, context):     for record in event["Records"]:         try:             body = json.loads(record["body"])             result_key = body["result_key"]             source_key = body.get("source_key", "unknown")             record_id = record.get("messageId")              item = {                 "Id": record_id,                 "SourceFile": source_key,                 "ResultFile": result_key             }             table.put_item(Item=item)             logger.info(f"Saved result reference for {source_key} to DynamoDB")          except Exception as e:             logger.exception(f"Error saving result to DynamoDB: {e}")

Таким образом, мы сохраняем только ссылку на результат, а не весь текст, что помогает избежать лимитов DynamoDB на размер записи (400 КБ) и избыточного чтения. Доступ к полному результату можно получить по ключу ResultFile (s3://…).

Масштабирование и ограничения

Вся архитектура построена по принципу «event-driven» и полностью serverless, что даёт отличные возможности масштабирования:

  • Lambda: автоматически масштабируется. Если одновременно загружается 1000+ документов, AWS поднимает столько экземпляров, сколько нужно (в пределах лимита concurrent executions — по умолчанию 1000). При необходимости лимит можно поднять через запрос в Support.

  • SQS: разгружает нагрузку и гарантирует буферизацию. Даже если Lambda временно не справляется, сообщения надёжно лежат в очереди.

  • SNS: фан-аут позволяет подключить дополнительных подписчиков — например, для мониторинга, аналитики или триггеров на дообработку.

  • DynamoDB: режим On-Demand позволяет не думать о настройке throughput. Важно лишь не превышать размер записи и следить за partition key.

  • S3: отлично масштабируется и служит как для хранения оригиналов, так и результатов.

  • Bedrock: основное узкое место. Важно понимать:

    • Ограничение на размер запроса — ~5MB

    • Токенные лимиты у модели (например, max_tokens=4096 или 8192)

    • Возможна задержка при инференсе (особенно на больших PDF)

    • Не все модели доступны в каждом регионе — лучше использовать eu-central-1 или us-east-1

Если объём запросов становится стабильным и большим — можно рассмотреть Provisioned Throughput для Bedrock, чтобы избежать холодных стартов и задержек.

Дополнительные ограничения Amazon Bedrock по нагрузке

Amazon Bedrock, несмотря на всю магию, всё же не бесконечен. У него есть:

  • Лимиты по числу запросов в минуту (TPS) на конкретную модель — это может стать узким горлышком при массовой обработке;

  • Лимиты по токенам: как входным (prompt + media), так и выходным (генерация ответа). У Sonnet, например, ~200k токенов на сутки по умолчанию на аккаунт (может варьироваться).

Решения и обходные пути:

  1. Запрос увеличения квот через Service Quotas — первый и обязательный шаг, если вы видите throttling.

  2. Кросс-региональный инференс (inference profiles) — фича, которая позволяет вам в своём регионе (например, eu-central-1) вызывать модель, размещённую в другом регионе, где доступна нужная квота. Это может сильно разгрузить точку доступа и обойти временные ограничения.

    Подробнее: Using Amazon Bedrock inference profiles

    Пример: вы можете создать Bedrock inference profile, который направляет вызов на модель в us-east-1, даже если ваша Lambda работает в eu-central-1. Это даёт больше гибкости и повышает устойчивость под нагрузкой.

  3. Rate Limiting + очередь — если нагрузка слишком велика, вы можете либо замедлять поступление запросов через throttling на уровне приложения, либо использовать очереди с visibility timeout, чтобы выравнивать поток.

Пример расчета стоимости (eu-central-1)

Допустим, мы обрабатываем 1000 документов в месяц:

Компонент

Кол-во

Оценка стоимости

Bedrock (Claude Sonnet)

~4000 токенов (вход + выход) на 1 документ

~$0.036 × 1000 = $36.00

Lambda (Invoke + Save)

до 6 сек × 1000 вызовов

~$0.10

SQS

1000 сообщений

~$0.01 (или в пределах Free Tier)

DynamoDB

1000 записей

<$0.01 (On-Demand)

S3 (хранение + запросы)

~2 ГБ + операции

~$0.05

Итого

~$36.16 / месяц

Если заменить Sonnet на Claude Haiku, стоимость может упасть до $10–12 за тот же объём (но качество может быть чуть ниже). Также можно использовать batch-инференс или комбинировать документы.

Заключение

Мы построили полнофункциональный serverless-сервис для OCR на базе Amazon Bedrock. Архитектура получилась простой, надёжной и масштабируемой. Вместо традиционного OCR мы используем LLM, что позволяет не просто «считывать текст», а делать это гибко, контекстно, с возможностью уточнять, резюмировать или извлекать только нужные поля.

Фишки подхода:

  • минимальные затраты на поддержку инфраструктуры

  • масштабируемость «из коробки»

  • понятная схема работы, легко расширяемая под новые задачи

  • возможность дообработки, A/B тестирования моделей и фан-аута через SNS


⚠️ Важно: В своем решении я не претендую на уникальность. Также, код в статье адаптирован для демонстрации архитектуры и упрощён для читаемости. Некоторые фрагменты могут быть не полностью production-ready — например, типы ошибок Bedrock, исключения, retry-логика, валидация параметров и пр.

В продакшене обязательно:

  • убедитесь в правильной настройке IAM-ролей,

  • проверьте лимиты и квоты в Bedrock, Lambda и SQS,

  • добавьте retry/timeout/логгирование,

  • протестируйте код под реальную нагрузку.

Если будете использовать код как основу — тестируйте и дорабатывайте под свой проект. Всё, что здесь показано, не содержит чувствительных данных и подготовлено исключительно в ознакомительных целях.


Bonus: Step Functions

Все вышеперечисленное можно описать в одной простой State Machine:

Comment: >-   A Bedrock OCR state machine that invokes a Bedrock model and saves the   results to S3 and DynamoDB. StartAt: Prepare Bedrock payload States:   Prepare Bedrock payload:     Type: Pass     Next: Bedrock InvokeModel   Bedrock InvokeModel:     Type: Task     Resource: arn:aws:states:::bedrock:invokeModel     Parameters: {}     Next: Save to s3   Save to s3:     Type: Task     Resource: arn:aws:states:::lambda:invoke     OutputPath: $.Payload     Parameters:       Payload.$: $     Retry:       - ErrorEquals:           - Lambda.ServiceException           - Lambda.AWSLambdaException           - Lambda.SdkClientException           - Lambda.TooManyRequestsException         IntervalSeconds: 1         MaxAttempts: 3         BackoffRate: 2         JitterStrategy: FULL     Next: Parallel State   Parallel State:     Comment: >-       A Parallel state can be used to create parallel branches of execution in       your state machine.     Type: Parallel     Branches:       - StartAt: DynamoDB PutItem         States:           DynamoDB PutItem:             Type: Task             Resource: arn:aws:states:::dynamodb:putItem             Parameters:               TableName: MyDynamoDBTable               Item:                 Column:                   S: MyEntry             End: true       - StartAt: Do whatever else         States:           Do whatever else:             Type: Pass             End: true     End: true 


ссылка на оригинал статьи https://habr.com/ru/articles/906738/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *