Гид по технологиям

Копирование элементов DynamoDB между таблицами с помощью Python

6 min read AWS Обновлено 25 Nov 2025
Копирование таблиц DynamoDB на Python
Копирование таблиц DynamoDB на Python

О чём эта статья

В этой статье вы найдёте:

  • готовый скрипт на Python для копирования элементов между таблицами DynamoDB;
  • инструкцию по запуску и объяснение вариантов поведения в четырёх основных сценариях;
  • рекомендации по безопасности, тестированию и откату;
  • чек‑листы для ролей (администратор, DevOps, разработчик).

Важно: предполагается базовое знание Python. Скрипт можно запускать на любой машине с Python и доступом в интернет. Оригинальный скрипт использует Python 2.7.16, но в разделе «Совместимость» описаны рекомендации для Python 3.

Предпосылки

  1. Базовое представление о Python.
  2. Установленный Python и пакет Boto3 (пример: pip install boto3).
  3. AWS‑аккаунт и учётные данные IAM (access_key, secret_key) с достаточными правами на DynamoDB.
  4. Доступ в нужный AWS‑регион (в коде указан eu-west-3 — при необходимости измените).

Проверки на машине:

  • Проверка Python:
python --version
  • Проверка pip:
pip --version
  • Проверка Boto3:
pip show boto3

Что мы сделаем

  1. Проверим предпосылки.
  2. Создадим файл со скриптом (ниже — полный код).
  3. Запустим скрипт с аргументами (пример синтаксиса ниже).

Полный код скрипта

Создайте файл copy-dynamodb-table.py и вставьте этот код. Код также доступен в репозитории автора на GitHub: https://github.com/shivalkarrahul/DevOps/blob/master/aws/python/aws-copy-dynamo-db-table/copy-dynamodb-table.py

import boto3
import os
import sys
import argparse
import datetime


global args
parser = argparse.ArgumentParser()

parser.add_argument('-sa', '--source_aws_access_key_id', required=True, action="store", dest="source_aws_access_key_id",
                    help="Source AWS Account aws_access_key_id", default=None)
parser.add_argument('-ss', '--source_aws_secret_access_key', required=True, action="store", dest="source_aws_secret_access_key",
                    help="Source AWS Account aws_secret_access_key", default=None)
parser.add_argument('-da', '--destination_aws_access_key_id', required=True, action="store", dest="destination_aws_access_key_id",
                    help="Destination AWS Account aws_access_key_id", default=None)
parser.add_argument('-ds', '--destination_aws_secret_access_key', required=True, action="store", dest="destination_aws_secret_access_key",
                    help="Destination AWS Account aws_secret_access_key", default=None)
parser.add_argument('-st', '--sourceTableName', required=True, action="store", dest="sourceTableName",
                    help="Source AWS Account DyanamoDB Table", default=None)
parser.add_argument('-dt', '--destinationTableName', required=True, action="store", dest="destinationTableName",
                    help="Destination AWS Account DyanamoDB Table", default=None) 
args = parser.parse_args()                                                                                                                       

source_aws_access_key_id = args.source_aws_access_key_id
source_aws_secret_access_key = args.source_aws_secret_access_key

destination_aws_access_key_id = args.destination_aws_access_key_id
destination_aws_secret_access_key = args.destination_aws_secret_access_key


sourceTableName=args.sourceTableName 
destinationTableName=args.destinationTableName 

sourceTableExists = "false" 
destinationTableExists = "false" 

print("Printing values")
print("source_aws_access_key_id", source_aws_access_key_id)
print("source_aws_secret_access_key", source_aws_secret_access_key)
print("destination_aws_access_key_id", destination_aws_access_key_id)
print("destination_aws_secret_access_key", destination_aws_secret_access_key)
print("sourceTableName", sourceTableName)
print("destinationTableName", destinationTableName)


timeStamp = datetime.datetime.now()
backupName = destinationTableName + str(timeStamp.strftime("-%Y_%m_%d_%H_%M_%S"))

item_count = 1000 #Specify total number of items to be copied here, this helps when a specified number of items need to be copied
counter = 1 # Don't not change this

source_session = boto3.Session(region_name='eu-west-3', aws_access_key_id=source_aws_access_key_id, aws_secret_access_key=source_aws_secret_access_key)
source_dynamo_client = source_session.client('dynamodb')

target_session = boto3.Session(region_name='eu-west-3', aws_access_key_id=destination_aws_access_key_id, aws_secret_access_key=destination_aws_secret_access_key)
target_dynamodb = target_session.resource('dynamodb')


dynamoclient = boto3.client('dynamodb', region_name='eu-west-3', #Specify the region here
    aws_access_key_id=source_aws_access_key_id,  #Add you source account's access key here
    aws_secret_access_key=source_aws_secret_access_key) #Add you source account's secret key here

dynamotargetclient = boto3.client('dynamodb', region_name='eu-west-3', #Specify the region here
    aws_access_key_id=destination_aws_access_key_id, #Add you destination account's access key here
    aws_secret_access_key=destination_aws_secret_access_key) #Add you destination account's secret key here
# response = dynamotargetclient.list_tables()
# print("List of tables", response)

dynamopaginator = dynamoclient.get_paginator('scan')

def validateTables(sourceTable, destinationTable):
    print("Inside validateTables")
    try:
        dynamoclient.describe_table(TableName=sourceTable)
        sourceTableExists = "true"
    except dynamotargetclient.exceptions.ResourceNotFoundException:
        sourceTableExists = "false"


    try:
        dynamotargetclient.describe_table(TableName=destinationTable)
        destinationTableExists = "true"
    except dynamotargetclient.exceptions.ResourceNotFoundException:
        destinationTableExists = "false"
    
    return {'sourceTableExists': sourceTableExists, 'destinationTableExists':destinationTableExists}        



def copyTable(sourceTable, destinationTable,item_count,counter):
    
    print("Inside copyTable")
    print("Coping", sourceTable, "to", destinationTable)

    print('Start Reading the Source Table')
    try:
            dynamoresponse = dynamopaginator.paginate(
            TableName=sourceTable,
            Select='ALL_ATTRIBUTES',
            ReturnConsumedCapacity='NONE',
            ConsistentRead=True
        )
    except dynamotargetclient.exceptions.ResourceNotFoundException:
        print("Table does not exist")
        print("Exiting")
        sys.exit()

    print('Finished Reading the Table')
    print('Proceed with writing to the Destination Table')
    print("Writing first", item_count , "items" )
    print(dynamoresponse)
    for page in dynamoresponse:
        for item in page['Items']:
            if (counter ==  item_count):
                print("exiting")
                sys.exit()
            else:      
                print('writing item no', counter)
                dynamotargetclient.put_item(
                    TableName=destinationTable,
                    Item=item
                    )   
            counter = counter + 1

def backupTable(destTableName, backupTimeStamp):
    print("Inside backupTable")
    print("Taking backup of = ", destTableName)
    print("Backup Name = ", backupTimeStamp)

    response = dynamotargetclient.create_backup(
        TableName=destTableName,
        BackupName=backupTimeStamp
    )
    print("Backup ARN =", response["BackupDetails"]["BackupArn"])

def deleteDestinationTable(destTableName):
    print("Inside deleteDestinationTable")
    try:
        dynamotargetclient.delete_table(TableName=destTableName)
        waiter = dynamotargetclient.get_waiter('table_not_exists')
        waiter.wait(TableName=destTableName)
        print("Table deleted")
    except dynamotargetclient.exceptions.ResourceNotFoundException:
        print("Table does not exist")


def doesNotExist():
    print("Inside doesNotExist")
    print("Destination table does not exist ")
    print("Exiting the execution")
    # sys.exit()

def createDestinationTable(sourceTable):
    print("Inside createDestinationTable")
    source_table = source_session.resource('dynamodb').Table(sourceTable)

    target_table = target_dynamodb.create_table(
    TableName=destinationTableName,
    KeySchema=source_table.key_schema,
    AttributeDefinitions=source_table.attribute_definitions,
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    })

    target_table.wait_until_exists()
    target_table.reload()


result = validateTables(sourceTableName, destinationTableName)
print("value of sourceTableExists = ", result['sourceTableExists'])
print("value of destinationTableExists = ", result['destinationTableExists'])

if (result['sourceTableExists'] == "false" ) and (result['destinationTableExists'] == "false" ):
    print("Both the tables do not exist")

elif (result['sourceTableExists'] == "false" ) and (result['destinationTableExists'] == "true" ):
    print("Source Table does not exist")

elif (result['sourceTableExists'] == "true" ) and (result['destinationTableExists'] == "false" ):
    createDestinationTable(sourceTableName)
    copyTable(sourceTableName, destinationTableName, item_count, counter)

elif (result['sourceTableExists'] == "true" ) and (result['destinationTableExists'] == "true" ):
    backupTable(destinationTableName, backupName)
    deleteDestinationTable(destinationTableName)

    createDestinationTable(sourceTableName)
    copyTable(sourceTableName, destinationTableName, item_count, counter)

else:
    print("Something is wrong")

Синтаксис запуска

Пример запуска из командной строки:

python copy-dynamodb-table.py -sa  -ss  -da  -ds  -st  -dt 

Пример с подставленными значениями (только пример — не используйте чужие ключи в проде):

python copy-dynamodb-table.py -sa AKI12345IA5XJXFLMTQR -ss ihiHd8+NzLJ567890z4i6EwcN6hbV2A5cMfurscg -da AKI12345IA5XJXFLMTQR -ds ihiHd8+NzLJ567890z4i6EwcN6hbV2A5cMfurscg -st my-source-table -dt my-destination-table

Здесь:

  • -sa: access key источника
  • -ss: secret key источника
  • -da: access key назначения
  • -ds: secret key назначения
  • -st: имя исходной таблицы
  • -dt: имя целевой таблицы

Используйте свои ключи и не храните секреты в репозиториях.

Разбор сценариев использования

Скрипт реализует четыре основных сценария:

  1. Оба таблицы не существуют. Скрипт выйдет с сообщением “Both the tables do not exist”.
  2. Исходная таблица не существует, а целевая — существует. Скрипт выйдет с сообщением “Source Table does not exist”.
  3. Исходная таблица существует, а целевая — не существует. Скрипт создаст целевую таблицу, скопирует в неё элементы.
  4. Оба таблицы существуют. Скрипт создаёт резервную копию целевой таблицы, удаляет её, создаёт заново с той же схемой и копирует данные из источника.

Use-case 1: оба таблицы не существуют

Оба таблицы не существуют

Use-case 2: исходная таблица не существует

Исходная таблица не существует

Use-case 3: исходная существует, целевая не существует — скрипт создаёт целевую и копирует

Целевая таблица не существует — создаётся и копируется

Use-case 4: обе таблицы существуют — делается резервная копия и копирование

Обе таблицы существуют — резерв и копирование

Как это работает (коротко)

  • С помощью Boto3 скрипт проверяет наличие таблиц в исходном и целевом аккаунтах.
  • Если нужно — создаёт целевую таблицу на основе схемы исходной (ключи и определения атрибутов).
  • Читает элементы страницами через paginator.scan и делает put_item в целевую таблицу.
  • Перед перезаписью существующей целевой таблицы создаётся бэкап через create_backup.

Советы по безопасности и надежности

  • Никогда не храните AWS секреты в явном виде в репозитории. Используйте роли (IAM Role) или профили AWS CLI.
  • Для копирования между аккаунтами предпочтительнее настраивать временные креденшелы через STS или IAM Role с доверительными отношениями.
  • При больших объёмах данных используйте потоковое считывание и пакетные операции (BatchWriteItem) для эффективности.
  • Мониторьте потребление RCU/WCU и задействуйте экспоненциальное увеличение пропускной способности при необходимости.
  • Включите шифрование на уровне таблицы (SSE) если требуется защита данных в покое.
  • Для приватного трафика используйте VPC Endpoint для DynamoDB.

Совместимость и улучшения (версия Python)

Исходный скрипт тестировался с Python 2.7.16. Для работы на Python 3:

  • Убедитесь, что используете pip для установки boto3 в среде Python 3: pip3 install boto3.
  • Проверьте print() — в коде выше используются скобки, совместимые с Python 3.
  • Рекомендуется запускать на Python 3.8+ — поддержка Python 2 прекращена.

Для производительных переносов данных замените последовательные put_item на BatchWriteItem с обработкой unprocessed items.

План тестирования и критерии приёмки

Критерии приёмки:

  1. Скрипт запускается с корректными ключами и аргументами.
  2. При пустом источнике не создаются лишние элементы в целевой таблице.
  3. После успешного выполнения количество элементов в целевой таблице равняется количеству скопированных элементов (или ожидаемому лимиту item_count).
  4. В режиме, когда целевая таблица существовала, перед созданием нового состояния создан бэкап (проверяем наличие BackupArn).

Тестовые сценарии:

  • Тест 1: обоих таблиц нет — ожидается корректный выход с сообщением.
  • Тест 2: исходной нет, целевая есть — выход с сообщением.
  • Тест 3: исходная есть, целевая нет — создаётся целевая, данные скопированы.
  • Тест 4: обе есть — создаётся бэкап, таблица удаляется и восстанавливается, данные скопированы.

Чек‑листы по ролям

Администратор:

  • Убедиться, что IAM‑учётные записи/роли имеют права dynamodb:DescribeTable, dynamodb:CreateBackup, dynamodb:DeleteTable, dynamodb:CreateTable, dynamodb:PutItem, dynamodb:Scan.
  • Проверить лимиты и бюджеты AWS для операций.

DevOps:

  • Подготовить окружение (виртуальная машина или CI) с Python и boto3.
  • Настроить переменные окружения или профиль AWS, чтобы не передавать ключи в командной строке.
  • Настроить CloudWatch для контроля ошибок и показателей.

Разработчик:

  • Протестировать скрипт на тестовой таблице с репрезентативными данными.
  • Проверить корректность схемы ключей после копирования.

Процедура отката и восстановление (инцидент‑ранбук)

  1. Если была удалена целевая таблица, найдите BackupArn, созданный перед удалением.
  2. Через консоль AWS или API выполните restore_table_from_backup с BackupArn.
  3. Проверьте целостность данных и схему ключей.
  4. Если восстановление невозможно, используйте экспорт/импорт через S3 (если был настроен экспорт).

Альтернативы и когда этот подход не подходит

Альтернативы:

  • AWS Data Pipeline или AWS Glue — подходят для ETL, трансформации и больших объёмов.
  • DynamoDB Streams + Lambda — для постоянной репликации изменений в реальном времени.
  • AWS DMS (Database Migration Service) — для более сложных сценариев миграции.

Когда скрипт не подходит:

  • Очень большие таблицы (миллионы записей) — используйте пакетную обработку (BatchWriteItem) и распределённые задачи.
  • Необходимость трансформации данных при копировании — выполняйте ETL через Glue или кастомные Lambda.

Мини‑методология переноса (быстрая инструкция)

  1. Подготовьте тестовую таблицу и тестовый аккаунт.
  2. Настройте IAM‑роли/профили для доступа без явных ключей.
  3. Запустите скрипт с небольшим item_count для проверки.
  4. Проверьте целостность и производительность.
  5. Запустите перенос на полную таблицу с мониторингом.

Модель принятия решений (Mermaid)

flowchart TD
  A[Начало] --> B{Существуют ли таблицы?}
  B -->|Нет/Нет| C[Выход: обе таблицы отсутствуют]
  B -->|Нет/Да| D[Выход: исходная отсутствует]
  B -->|Да/Нет| E[Создать целевую таблицу и копировать]
  B -->|Да/Да| F[Создать бэкап → Удалить целевую → Создать → Копировать]
  E --> G[Завершено]
  F --> G

Риски и меры смягчения

  • Риск: потеря данных при удалении целевой таблицы. Мера: всегда проверять создание бэкапа и его успешное завершение перед удалением.
  • Риск: превышение RCU/WCU. Мера: увеличивать пропускную способность или использовать экспоненциальную задержку.
  • Риск: утечка ключей. Мера: использовать IAM роли и временные креденшелы.

Примечания по GDPR и приватности

Если в таблицах содержатся персональные данные, убедитесь, что:

  • шифрование данных включено (SSE);
  • доступ ограничен минимально необходимыми правами;
  • логирование и аудит включены для отслеживания операций копирования.

Заключение

В статье приведён рабочий скрипт на Python для копирования элементов из одной таблицы DynamoDB в другую, рассмотрены возможные сценарии, рекомендации по безопасности и тестированию, а также варианты улучшений для больших объёмов. Используйте этот скрипт как основу и адаптируйте под свои требования: добавьте BatchWriteItem, обработку ошибок и временные роли для безопасной работы.

Поделиться: X/Twitter Facebook LinkedIn Telegram
Автор
Редакция

Похожие материалы

RDP: полный гид по настройке и безопасности
Инфраструктура

RDP: полный гид по настройке и безопасности

Android как клавиатура и трекпад для Windows
Гайды

Android как клавиатура и трекпад для Windows

Советы и приёмы для работы с PDF
Документы

Советы и приёмы для работы с PDF

Calibration в Lightroom Classic: как и когда использовать
Фото

Calibration в Lightroom Classic: как и когда использовать

Отключить Siri Suggestions на iPhone
iOS

Отключить Siri Suggestions на iPhone

Рисование таблиц в Microsoft Word — руководство
Office

Рисование таблиц в Microsoft Word — руководство