Гид по технологиям

Агрегационный pipeline в MongoDB: руководство по стадиям и оптимизации

8 min read Базы данных Обновлено 24 Dec 2025
Агрегационный pipeline в MongoDB — руководство
Агрегационный pipeline в MongoDB — руководство

Иконка базы данных и обработки с логотипом MongoDB

Что такое агрегация в MongoDB и как она работает

Схема процесса агрегационного конвейера

Агрегационный pipeline — это последовательность стадий (stages), через которые проходит набор документов. Каждая стадия принимает документы на входе и отдаёт один или несколько преобразованных документов на выходе. Результат одной стадии может быть входом для следующей.

Ключевая идея: представьте pipeline как фабрику обработки данных — на каждом конвейере выполняется одно логическое действие (фильтрация, группировка, разворачивание массива, сортировка и т. д.). Такая модульность упрощает отладку, повторное использование и оптимизацию.

Определения в одну строку:

  • Документ — BSON-объект (аналог JSON) в коллекции.
  • Стадия — отдельный шаг в массиве агрегатных операций.
  • Pipeline — массив стадий, переданный в db.collection.aggregate([…]).

Основные стадии агрегации

Ниже — описание самых распространённых стадий с примерами и рекомендациями, когда их применять.

$match — фильтрация входного набора

$match отбирает только те документы, которые соответствуют заданному условию. Используйте $match как можно раньше, чтобы уменьшить объём данных для последующих стадий.

Пример:

{ $match: { Sold: { $gte: 5 } } }

Важно: если возможно — применяйте фильтр до разворачивания массивов ($unwind) и до тяжёлых группировок.

$group — агрегация по ключам

$group группирует документы по значению поля (или комбинации) и вычисляет агрегаты ($sum, $avg, $min, $max и т. д.). Поле _id в $group становится ключом группы.

Пример исходных данных (sales):

Пример таблицы sales (образец данных)

Пример группировки, подсчитывающей суммарные продажи и максимальную сумму для каждой секции:

{
  $group: {
    _id: "$Section",
    total_sales_count: { $sum: "$Sold" },
    top_sales: { $max: "$Amount" }
  }
}

Пояснение: _id: “$Section” создаёт одну запись на каждую уникальную секцию. Новые поля total_sales_count и top_sales — результат применения агрегаторов.

$skip — пропуск документов

$skip пропускает указанное количество документов в результатах. Часто используется совместно с $sort и $limit для постраничной навигации.

Пример:

{ $skip: 1 }

$sort — сортировка

$sort упорядочивает набор результатов по указанным полям в порядке возрастания (1) или убывания (-1). Учтите, что сортировка большого объёма данных требует выделенной памяти и/или подходящих индексов.

Пример сортировки по полю top_sales по убыванию:

{ $sort: { top_sales: -1 } }

$limit — ограничение числа результатов

$limit сокращает выходной набор до заданного числа документов. Часто используется после $sort, чтобы вернуть, например, топ-1 или топ-10.

Пример:

{ $limit: 1 }

$project — формирование выводимого документа

$project позволяет выбрать поля для итогового документа и при необходимости дать им новые имена или вычислить выражения.

Пример: переименование и скрытие _id:

{
  $project: {
    _id: 0,
    Section: "$_id",
    TotalSold: "$total_sales_count",
    TopSale: "$top_sales"
  }
}

Без $project выход будет «сырым» — с полями, которые образовались в $group. $project делает представление удобным для конечного пользователя.

Неотформатированные результаты агрегации (пример)

Отформатированный итог агрегации — разделы и продажи

$unwind — разворачивание массивов в документы

$unwind разбивает массив внутри документа на несколько документов — по одному на элемент массива. Это полезно для агрегирования по элементам массива.

Пример данных Orders:

Пример данных Orders с массивом items

Пример: посчитать суммарный доход по каждому товару, разворачивая items:

db.Orders.aggregate([
  { $unwind: "$items" },
  {
    $group: {
      _id: "$items.product",
      total_revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } }
    }
  },
  { $sort: { total_revenue: -1 } },
  {
    $project: {
      _id: 0,
      Product: "$_id",
      TotalRevenue: "$total_revenue"
    }
  }
])

Результат — набор документов с Product и TotalRevenue.

Результат агрегации после $unwind — доход по продуктам

Полный пример конвейера на одном наборе данных

Ниже показан целый pipeline, комбинирующий $match, $group, $sort, $skip и $project, использующий пример sales:

db.sales.aggregate([
  { $match: { Sold: { $gte: 5 } } },
  {
    $group: {
      _id: "$Section",
      total_sales_count: { $sum: "$Sold" },
      top_sales: { $max: "$Amount" }
    }
  },
  { $sort: { top_sales: -1 } },
  { $skip: 0 },
  {
    $project: {
      _id: 0,
      Section: "$_id",
      TotalSold: "$total_sales_count",
      TopSale: "$top_sales"
    }
  }
])

Этот конвейер сначала фильтрует строки с Sold >= 5, затем группирует по Section, вычисляет сумму и максимум, сортирует по max, пропускает указанные документы и формирует окончательный вывод.

Сравнение: Aggregation Pipeline vs MapReduce

MapReduce исторически использовался для агрегирования в MongoDB, но начиная с версий 4.x–5.x было рекомендовано переходить на pipeline. Причины:

  • Pipeline встроен в MongoDB и оптимизируется движком запросов.
  • Pipeline проще писать и читабельнее (JSON-подобная нотация стадий).
  • MapReduce требует написания JavaScript-функций для map и reduce и часто требует внешних средств для интеграции.

Когда MapReduce всё ещё имеет смысл:

  • Если логика агрегации существенно сложнее того, что удобно выражать в pipeline, и её проще описать в коде. Тем не менее, такие случаи редки.

Ментальные модели и эвристики для проектирования pipeline

  • Push filters down — ставьте $match как можно раньше.
  • Reduce early — группируйте и агрегируйте, чтобы уменьшить объём данных.
  • Avoid large in-memory sorts — обеспечьте подходящие индексы или выполняйте $sort после уменьшения набора.
  • Keep stages simple — каждая стадия решает одну задачу.
  • Prefer built-in operators — выражения типа $sum/$avg быстрее, чем манипуляции в JavaScript.

Методика проектирования: шаг за шагом

  1. Определите конечный формат вывода (поля и агрегаты).
  2. Перечислите необходимые преобразования: фильтрация, разворачивание массивов, вычисления, сортировка.
  3. Расположите $match и индексируемые операции как можно раньше.
  4. Сгруппируйте и вычислите агрегаты ($group).
  5. Отсортируйте и ограничьте результаты ($sort, $limit).
  6. Выполните $project для формирования чистого вывода.
  7. Тестируйте на подвыборке и измеряйте время.

Check-листы по ролям

Разработчик:

  • Проверить корректность логики $group и $project.
  • Убедиться, что $match выполняет минимально необходимые фильтры.
  • Написать unit-тесты для ключевых выражений агрегирования.

DBA/оператор:

  • Проверить индексы, покрывающие поля $match и $sort.
  • Оценить использование памяти при $sort (параметр allowDiskUse).
  • Настроить мониторинг операций длительных агрегатов.

Аналитик данных:

  • Подтвердить, что итоговые поля соответствуют требованиям отчёта.
  • Проверить выборку на граничных кейсах (пустые массивы, NULL).

Критерии приёмки

  • Функциональные: результаты агрегации совпадают с ожидаемыми для контрольных наборов.
  • Производительность: время выполнения в тестовой среде уложено в приемлемый SLA.
  • Надёжность: pipeline корректно обрабатывает пустые и частично заполненные документы.
  • Безопасность: нет утечек чувствительных полей в финальном выводе.

Руководство по отладки и оперативный план отката

  1. Запустите pipeline на небольшом подмножестве (limit/seed).
  2. Добавляйте стадии по одной и проверяйте промежуточный результат.
  3. Используйте explain() для оценки плана выполнения: db.collection.aggregate(pipeline).explain().
  4. Если производительность падает после изменения, откатите изменения и сравните explain до и после.
  5. Для срочного отката — восстановите предыдущую версию pipeline из системы контроля версий и примените её.

Оптимизация производительности

  • Индексируйте поля, используемые в $match и $sort. Если $match по нескольким полям — рассмотрите составные индексы.
  • Используйте allowDiskUse: true для больших сортировок, если невозможно поддержать индексом.
  • Старайтесь уменьшать объём данных до тяжёлых стадий ($group, $sort) — фильтруйте и проецируйте впереди.
  • Избегайте $project с вычислениями перед $group, если вычисления не нужны до группировки.
  • Наблюдайте за значениями memoryUsageBytes и stage details в explain().

Пример вызова с разрешением использования диска:

db.collection.aggregate(pipeline, { allowDiskUse: true })

Советы по миграции с MapReduce на pipeline

  1. Проанализируйте текущие map и reduce-функции и вытяните логику в последовательность стадий ($group, $project, $unwind и т. п.).
  2. Замените побочные операции JavaScript на встроенные выражения ($multiply, $sum и т. д.).
  3. Тестируйте корректность на выборке и сравнивайте результаты MapReduce и pipeline.
  4. Оцените производительность: pipeline, как правило, быстрее и требует меньше внешних зависимостей.

Безопасность и защита данных

  • Не возвращайте чувствительные поля в окончательном $project.
  • Ограничьте права доступа к коллекциям и операциям агрегирования с помощью ролей MongoDB.
  • При обработке персональных данных соблюдайте требования локального законодательства (например, применяйте псевдонимизацию или удаление идентификаторов, если это необходимо).

Типичные ошибки и как их избежать

  • Слишком позднее применение $match — приводит к лишним вычислениям.
  • Сортировка без индекса на большом наборе — приводит к использованию памяти и замедлению.
  • Неправильное использование $unwind (без проверки пустых массивов) — может увеличить число документов непредсказуемо.
  • Переусложнение этапов — лучше разделить большие трансформации на несколько простых шагов.

Формулы и короткая шпаргалка операторов

  • $sum — суммирование значений.
  • $avg — среднее значение.
  • $min / $max — минимум/максимум.
  • $multiply — умножение выражений.
  • $addFields / $set — добавить или изменить поле.
  • $replaceRoot / $replaceWith — заменить корень документа.

Примеры тест-кейсов и критериев приёмки

  1. Контрольный набор из 100 документов: результаты pipeline совпадают с ожидаемыми по каждому полю.
  2. Пустой набор: pipeline возвращает пустой массив без ошибок.
  3. Массивы с нулевой длинной: $unwind корректно обрабатывает (использовать preserveNullAndEmptyArrays при необходимости).
  4. Стресс-тест на 1M документов: время выполнения в пределах заданного порога.

Decision flowchart для выбора стратегии (Mermaid)

flowchart TD
  A[Нужно агрегировать данные?] -->|Нет| B[Использовать простые запросы]
  A -->|Да| C{Есть сложная JS-логика?}
  C -->|Нет| D[Использовать Aggregation Pipeline]
  C -->|Да| E{Можно ли выразить в stages?}
  E -->|Да| D
  E -->|Нет| F[Оставить MapReduce или вынести логику в ETL]
  D --> G[Оптимизировать: push $match, добавить индексы]
  G --> H[Тестировать и деплоить]

Локальные особенности и советы для русскоязычной инфраструктуры

  • При работе в облачных окружениях проверьте настройки регионов и соответствие требованиям локального хранения персональных данных.
  • Если отчёты требуют локализованных форматов дат/валют, формируйте представление в $project и применяйте локализацию на уровне приложения.

Краткий глоссарий

  • Pipeline — последовательность стадий агрегирования.
  • Stage — отдельный шаг в pipeline (например, $match, $group).
  • Document — BSON-объект в коллекции.

Заключение

Агрегационный pipeline — основной инструмент для сложных запросов в MongoDB. Он предоставляет модульный и эффективный подход для обработки и преобразования данных. Чтобы получить лучшую производительность, проектируйте pipeline с учётом индексов, минимизируйте объём данных на ранних стадиях и тестируйте с explain(). При переходе с MapReduce переносите логику в соответствующие стадии и проверяйте результаты на тестовых наборах.

Важно: начните с небольших шагов — спроектируйте минимальный pipeline, проверьте производительность, затем добавляйте оптимизации.

Сводка и следующие шаги:

  • Определите ожидаемый формат вывода.
  • Расположите $match и индексы в начале.
  • Тестируйте и измеряйте через explain().
  • Документируйте pipeline и храните версии в системе контроля.
Поделиться: X/Twitter Facebook LinkedIn Telegram
Автор
Редакция

Похожие материалы

Как анализировать BSOD с WhoCrashed
Windows

Как анализировать BSOD с WhoCrashed

Скачать фото iCloud в полном разрешении
Фото

Скачать фото iCloud в полном разрешении

Ветвление в Microsoft Forms — настройка и советы
Инструкции

Ветвление в Microsoft Forms — настройка и советы

Валидация ответов в Google Forms
Google Forms

Валидация ответов в Google Forms

Стереопара Amazon Echo — инструкция
How-to

Стереопара Amazon Echo — инструкция

7 способов улучшить Spotify на телефоне
Музыка

7 способов улучшить Spotify на телефоне