Копирование элементов DynamoDB между таблицами с помощью Python
О чём эта статья
В этой статье вы найдёте:
- готовый скрипт на Python для копирования элементов между таблицами DynamoDB;
- инструкцию по запуску и объяснение вариантов поведения в четырёх основных сценариях;
- рекомендации по безопасности, тестированию и откату;
- чек‑листы для ролей (администратор, DevOps, разработчик).
Важно: предполагается базовое знание Python. Скрипт можно запускать на любой машине с Python и доступом в интернет. Оригинальный скрипт использует Python 2.7.16, но в разделе «Совместимость» описаны рекомендации для Python 3.
Предпосылки
- Базовое представление о Python.
- Установленный Python и пакет Boto3 (пример: pip install boto3).
- AWS‑аккаунт и учётные данные IAM (access_key, secret_key) с достаточными правами на DynamoDB.
- Доступ в нужный AWS‑регион (в коде указан eu-west-3 — при необходимости измените).
Проверки на машине:
- Проверка Python:
python --version- Проверка pip:
pip --version- Проверка Boto3:
pip show boto3Что мы сделаем
- Проверим предпосылки.
- Создадим файл со скриптом (ниже — полный код).
- Запустим скрипт с аргументами (пример синтаксиса ниже).
Полный код скрипта
Создайте файл copy-dynamodb-table.py и вставьте этот код. Код также доступен в репозитории автора на GitHub: https://github.com/shivalkarrahul/DevOps/blob/master/aws/python/aws-copy-dynamo-db-table/copy-dynamodb-table.py
import boto3
import os
import sys
import argparse
import datetime
global args
parser = argparse.ArgumentParser()
parser.add_argument('-sa', '--source_aws_access_key_id', required=True, action="store", dest="source_aws_access_key_id",
help="Source AWS Account aws_access_key_id", default=None)
parser.add_argument('-ss', '--source_aws_secret_access_key', required=True, action="store", dest="source_aws_secret_access_key",
help="Source AWS Account aws_secret_access_key", default=None)
parser.add_argument('-da', '--destination_aws_access_key_id', required=True, action="store", dest="destination_aws_access_key_id",
help="Destination AWS Account aws_access_key_id", default=None)
parser.add_argument('-ds', '--destination_aws_secret_access_key', required=True, action="store", dest="destination_aws_secret_access_key",
help="Destination AWS Account aws_secret_access_key", default=None)
parser.add_argument('-st', '--sourceTableName', required=True, action="store", dest="sourceTableName",
help="Source AWS Account DyanamoDB Table", default=None)
parser.add_argument('-dt', '--destinationTableName', required=True, action="store", dest="destinationTableName",
help="Destination AWS Account DyanamoDB Table", default=None)
args = parser.parse_args()
source_aws_access_key_id = args.source_aws_access_key_id
source_aws_secret_access_key = args.source_aws_secret_access_key
destination_aws_access_key_id = args.destination_aws_access_key_id
destination_aws_secret_access_key = args.destination_aws_secret_access_key
sourceTableName=args.sourceTableName
destinationTableName=args.destinationTableName
sourceTableExists = "false"
destinationTableExists = "false"
print("Printing values")
print("source_aws_access_key_id", source_aws_access_key_id)
print("source_aws_secret_access_key", source_aws_secret_access_key)
print("destination_aws_access_key_id", destination_aws_access_key_id)
print("destination_aws_secret_access_key", destination_aws_secret_access_key)
print("sourceTableName", sourceTableName)
print("destinationTableName", destinationTableName)
timeStamp = datetime.datetime.now()
backupName = destinationTableName + str(timeStamp.strftime("-%Y_%m_%d_%H_%M_%S"))
item_count = 1000 #Specify total number of items to be copied here, this helps when a specified number of items need to be copied
counter = 1 # Don't not change this
source_session = boto3.Session(region_name='eu-west-3', aws_access_key_id=source_aws_access_key_id, aws_secret_access_key=source_aws_secret_access_key)
source_dynamo_client = source_session.client('dynamodb')
target_session = boto3.Session(region_name='eu-west-3', aws_access_key_id=destination_aws_access_key_id, aws_secret_access_key=destination_aws_secret_access_key)
target_dynamodb = target_session.resource('dynamodb')
dynamoclient = boto3.client('dynamodb', region_name='eu-west-3', #Specify the region here
aws_access_key_id=source_aws_access_key_id, #Add you source account's access key here
aws_secret_access_key=source_aws_secret_access_key) #Add you source account's secret key here
dynamotargetclient = boto3.client('dynamodb', region_name='eu-west-3', #Specify the region here
aws_access_key_id=destination_aws_access_key_id, #Add you destination account's access key here
aws_secret_access_key=destination_aws_secret_access_key) #Add you destination account's secret key here
# response = dynamotargetclient.list_tables()
# print("List of tables", response)
dynamopaginator = dynamoclient.get_paginator('scan')
def validateTables(sourceTable, destinationTable):
print("Inside validateTables")
try:
dynamoclient.describe_table(TableName=sourceTable)
sourceTableExists = "true"
except dynamotargetclient.exceptions.ResourceNotFoundException:
sourceTableExists = "false"
try:
dynamotargetclient.describe_table(TableName=destinationTable)
destinationTableExists = "true"
except dynamotargetclient.exceptions.ResourceNotFoundException:
destinationTableExists = "false"
return {'sourceTableExists': sourceTableExists, 'destinationTableExists':destinationTableExists}
def copyTable(sourceTable, destinationTable,item_count,counter):
print("Inside copyTable")
print("Coping", sourceTable, "to", destinationTable)
print('Start Reading the Source Table')
try:
dynamoresponse = dynamopaginator.paginate(
TableName=sourceTable,
Select='ALL_ATTRIBUTES',
ReturnConsumedCapacity='NONE',
ConsistentRead=True
)
except dynamotargetclient.exceptions.ResourceNotFoundException:
print("Table does not exist")
print("Exiting")
sys.exit()
print('Finished Reading the Table')
print('Proceed with writing to the Destination Table')
print("Writing first", item_count , "items" )
print(dynamoresponse)
for page in dynamoresponse:
for item in page['Items']:
if (counter == item_count):
print("exiting")
sys.exit()
else:
print('writing item no', counter)
dynamotargetclient.put_item(
TableName=destinationTable,
Item=item
)
counter = counter + 1
def backupTable(destTableName, backupTimeStamp):
print("Inside backupTable")
print("Taking backup of = ", destTableName)
print("Backup Name = ", backupTimeStamp)
response = dynamotargetclient.create_backup(
TableName=destTableName,
BackupName=backupTimeStamp
)
print("Backup ARN =", response["BackupDetails"]["BackupArn"])
def deleteDestinationTable(destTableName):
print("Inside deleteDestinationTable")
try:
dynamotargetclient.delete_table(TableName=destTableName)
waiter = dynamotargetclient.get_waiter('table_not_exists')
waiter.wait(TableName=destTableName)
print("Table deleted")
except dynamotargetclient.exceptions.ResourceNotFoundException:
print("Table does not exist")
def doesNotExist():
print("Inside doesNotExist")
print("Destination table does not exist ")
print("Exiting the execution")
# sys.exit()
def createDestinationTable(sourceTable):
print("Inside createDestinationTable")
source_table = source_session.resource('dynamodb').Table(sourceTable)
target_table = target_dynamodb.create_table(
TableName=destinationTableName,
KeySchema=source_table.key_schema,
AttributeDefinitions=source_table.attribute_definitions,
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
})
target_table.wait_until_exists()
target_table.reload()
result = validateTables(sourceTableName, destinationTableName)
print("value of sourceTableExists = ", result['sourceTableExists'])
print("value of destinationTableExists = ", result['destinationTableExists'])
if (result['sourceTableExists'] == "false" ) and (result['destinationTableExists'] == "false" ):
print("Both the tables do not exist")
elif (result['sourceTableExists'] == "false" ) and (result['destinationTableExists'] == "true" ):
print("Source Table does not exist")
elif (result['sourceTableExists'] == "true" ) and (result['destinationTableExists'] == "false" ):
createDestinationTable(sourceTableName)
copyTable(sourceTableName, destinationTableName, item_count, counter)
elif (result['sourceTableExists'] == "true" ) and (result['destinationTableExists'] == "true" ):
backupTable(destinationTableName, backupName)
deleteDestinationTable(destinationTableName)
createDestinationTable(sourceTableName)
copyTable(sourceTableName, destinationTableName, item_count, counter)
else:
print("Something is wrong")Синтаксис запуска
Пример запуска из командной строки:
python copy-dynamodb-table.py -sa -ss -da -ds -st -dt Пример с подставленными значениями (только пример — не используйте чужие ключи в проде):
python copy-dynamodb-table.py -sa AKI12345IA5XJXFLMTQR -ss ihiHd8+NzLJ567890z4i6EwcN6hbV2A5cMfurscg -da AKI12345IA5XJXFLMTQR -ds ihiHd8+NzLJ567890z4i6EwcN6hbV2A5cMfurscg -st my-source-table -dt my-destination-tableЗдесь:
- -sa: access key источника
- -ss: secret key источника
- -da: access key назначения
- -ds: secret key назначения
- -st: имя исходной таблицы
- -dt: имя целевой таблицы
Используйте свои ключи и не храните секреты в репозиториях.
Разбор сценариев использования
Скрипт реализует четыре основных сценария:
- Оба таблицы не существуют. Скрипт выйдет с сообщением “Both the tables do not exist”.
- Исходная таблица не существует, а целевая — существует. Скрипт выйдет с сообщением “Source Table does not exist”.
- Исходная таблица существует, а целевая — не существует. Скрипт создаст целевую таблицу, скопирует в неё элементы.
- Оба таблицы существуют. Скрипт создаёт резервную копию целевой таблицы, удаляет её, создаёт заново с той же схемой и копирует данные из источника.
Use-case 1: оба таблицы не существуют

Use-case 2: исходная таблица не существует

Use-case 3: исходная существует, целевая не существует — скрипт создаёт целевую и копирует

Use-case 4: обе таблицы существуют — делается резервная копия и копирование

Как это работает (коротко)
- С помощью Boto3 скрипт проверяет наличие таблиц в исходном и целевом аккаунтах.
- Если нужно — создаёт целевую таблицу на основе схемы исходной (ключи и определения атрибутов).
- Читает элементы страницами через paginator.scan и делает put_item в целевую таблицу.
- Перед перезаписью существующей целевой таблицы создаётся бэкап через create_backup.
Советы по безопасности и надежности
- Никогда не храните AWS секреты в явном виде в репозитории. Используйте роли (IAM Role) или профили AWS CLI.
- Для копирования между аккаунтами предпочтительнее настраивать временные креденшелы через STS или IAM Role с доверительными отношениями.
- При больших объёмах данных используйте потоковое считывание и пакетные операции (BatchWriteItem) для эффективности.
- Мониторьте потребление RCU/WCU и задействуйте экспоненциальное увеличение пропускной способности при необходимости.
- Включите шифрование на уровне таблицы (SSE) если требуется защита данных в покое.
- Для приватного трафика используйте VPC Endpoint для DynamoDB.
Совместимость и улучшения (версия Python)
Исходный скрипт тестировался с Python 2.7.16. Для работы на Python 3:
- Убедитесь, что используете pip для установки boto3 в среде Python 3: pip3 install boto3.
- Проверьте print() — в коде выше используются скобки, совместимые с Python 3.
- Рекомендуется запускать на Python 3.8+ — поддержка Python 2 прекращена.
Для производительных переносов данных замените последовательные put_item на BatchWriteItem с обработкой unprocessed items.
План тестирования и критерии приёмки
Критерии приёмки:
- Скрипт запускается с корректными ключами и аргументами.
- При пустом источнике не создаются лишние элементы в целевой таблице.
- После успешного выполнения количество элементов в целевой таблице равняется количеству скопированных элементов (или ожидаемому лимиту item_count).
- В режиме, когда целевая таблица существовала, перед созданием нового состояния создан бэкап (проверяем наличие BackupArn).
Тестовые сценарии:
- Тест 1: обоих таблиц нет — ожидается корректный выход с сообщением.
- Тест 2: исходной нет, целевая есть — выход с сообщением.
- Тест 3: исходная есть, целевая нет — создаётся целевая, данные скопированы.
- Тест 4: обе есть — создаётся бэкап, таблица удаляется и восстанавливается, данные скопированы.
Чек‑листы по ролям
Администратор:
- Убедиться, что IAM‑учётные записи/роли имеют права dynamodb:DescribeTable, dynamodb:CreateBackup, dynamodb:DeleteTable, dynamodb:CreateTable, dynamodb:PutItem, dynamodb:Scan.
- Проверить лимиты и бюджеты AWS для операций.
DevOps:
- Подготовить окружение (виртуальная машина или CI) с Python и boto3.
- Настроить переменные окружения или профиль AWS, чтобы не передавать ключи в командной строке.
- Настроить CloudWatch для контроля ошибок и показателей.
Разработчик:
- Протестировать скрипт на тестовой таблице с репрезентативными данными.
- Проверить корректность схемы ключей после копирования.
Процедура отката и восстановление (инцидент‑ранбук)
- Если была удалена целевая таблица, найдите BackupArn, созданный перед удалением.
- Через консоль AWS или API выполните restore_table_from_backup с BackupArn.
- Проверьте целостность данных и схему ключей.
- Если восстановление невозможно, используйте экспорт/импорт через S3 (если был настроен экспорт).
Альтернативы и когда этот подход не подходит
Альтернативы:
- AWS Data Pipeline или AWS Glue — подходят для ETL, трансформации и больших объёмов.
- DynamoDB Streams + Lambda — для постоянной репликации изменений в реальном времени.
- AWS DMS (Database Migration Service) — для более сложных сценариев миграции.
Когда скрипт не подходит:
- Очень большие таблицы (миллионы записей) — используйте пакетную обработку (BatchWriteItem) и распределённые задачи.
- Необходимость трансформации данных при копировании — выполняйте ETL через Glue или кастомные Lambda.
Мини‑методология переноса (быстрая инструкция)
- Подготовьте тестовую таблицу и тестовый аккаунт.
- Настройте IAM‑роли/профили для доступа без явных ключей.
- Запустите скрипт с небольшим item_count для проверки.
- Проверьте целостность и производительность.
- Запустите перенос на полную таблицу с мониторингом.
Модель принятия решений (Mermaid)
flowchart TD
A[Начало] --> B{Существуют ли таблицы?}
B -->|Нет/Нет| C[Выход: обе таблицы отсутствуют]
B -->|Нет/Да| D[Выход: исходная отсутствует]
B -->|Да/Нет| E[Создать целевую таблицу и копировать]
B -->|Да/Да| F[Создать бэкап → Удалить целевую → Создать → Копировать]
E --> G[Завершено]
F --> GРиски и меры смягчения
- Риск: потеря данных при удалении целевой таблицы. Мера: всегда проверять создание бэкапа и его успешное завершение перед удалением.
- Риск: превышение RCU/WCU. Мера: увеличивать пропускную способность или использовать экспоненциальную задержку.
- Риск: утечка ключей. Мера: использовать IAM роли и временные креденшелы.
Примечания по GDPR и приватности
Если в таблицах содержатся персональные данные, убедитесь, что:
- шифрование данных включено (SSE);
- доступ ограничен минимально необходимыми правами;
- логирование и аудит включены для отслеживания операций копирования.
Заключение
В статье приведён рабочий скрипт на Python для копирования элементов из одной таблицы DynamoDB в другую, рассмотрены возможные сценарии, рекомендации по безопасности и тестированию, а также варианты улучшений для больших объёмов. Используйте этот скрипт как основу и адаптируйте под свои требования: добавьте BatchWriteItem, обработку ошибок и временные роли для безопасной работы.
Похожие материалы
RDP: полный гид по настройке и безопасности
Android как клавиатура и трекпад для Windows
Советы и приёмы для работы с PDF
Calibration в Lightroom Classic: как и когда использовать
Отключить Siri Suggestions на iPhone