Aggregation Pipeline

Basic Aggregation Structure

Pipeline Syntax

// Basic aggregation pipeline
db.collection.aggregate([
    { stage1: { /* stage options */ } },
    { stage2: { /* stage options */ } },
    { stage3: { /* stage options */ } }
])

// Aggregation with options
db.collection.aggregate([
    { $match: { status: "active" } },
    { $group: { _id: "$category", count: { $sum: 1 } } }
], {
    allowDiskUse: true,
    cursor: { batchSize: 1000 }
})

Simple Examples

// Count documents by category
db.products.aggregate([
    { $group: { _id: "$category", count: { $sum: 1 } } }
])

// Average price by category
db.products.aggregate([
    { $group: { _id: "$category", avgPrice: { $avg: "$price" } } }
])

// Total sales by month
db.orders.aggregate([
    { $group: { 
        _id: { $dateToString: { format: "%Y-%m", date: "$orderDate" } },
        totalSales: { $sum: "$amount" }
    }}
])

Pipeline Stages

$match Stage

// Filter documents
{ $match: { status: "active" } }

// Multiple conditions
{ $match: { 
    age: { $gte: 18 },
    city: "New York",
    status: "active"
}}

// Using operators
{ $match: {
    $or: [
        { category: "electronics" },
        { price: { $gt: 100 } }
    ]
}}

// Date filtering
{ $match: {
    createdAt: {
        $gte: new Date("2023-01-01"),
        $lt: new Date("2023-12-31")
    }
}}

$group Stage

// Group by single field
{ $group: { _id: "$category", count: { $sum: 1 } } }

// Group by multiple fields
{ $group: { 
    _id: { 
        category: "$category",
        city: "$city"
    },
    count: { $sum: 1 }
}}

// Group by computed field
{ $group: {
    _id: { $dateToString: { format: "%Y-%m", date: "$orderDate" } },
    totalSales: { $sum: "$amount" }
}}

// Group by null (all documents)
{ $group: {
    _id: null,
    totalCount: { $sum: 1 },
    avgPrice: { $avg: "$price" }
}}

$sort Stage

// Sort by single field
{ $sort: { price: 1 } }  // ascending
{ $sort: { price: -1 } } // descending

// Sort by multiple fields
{ $sort: { category: 1, price: -1 } }

// Sort by computed field
{ $sort: { "totalSales": -1 } }

// Sort by nested field
{ $sort: { "user.profile.age": 1 } }

$limit Stage

// Limit results
{ $limit: 10 }

// Limit after grouping
{ $limit: 5 }

$skip Stage

// Skip documents
{ $skip: 20 }

// Pagination
{ $skip: (page - 1) * limit }

$project Stage

// Include specific fields
{ $project: { name: 1, price: 1, _id: 0 } }

// Exclude specific fields
{ $project: { password: 0, secretKey: 0 } }

// Rename fields
{ $project: { 
    productName: "$name",
    productPrice: "$price",
    _id: 0
}}

// Computed fields
{ $project: {
    name: 1,
    fullName: { $concat: ["$firstName", " ", "$lastName"] },
    age: { $subtract: [{ $year: new Date() }, { $year: "$birthDate" }] },
    _id: 0
}}

$lookup Stage (Join)

// Basic lookup
{
    $lookup: {
        from: "categories",
        localField: "categoryId",
        foreignField: "_id",
        as: "category"
    }
}

// Lookup with pipeline
{
    $lookup: {
        from: "orders",
        let: { userId: "$_id" },
        pipeline: [
            { $match: { $expr: { $eq: ["$userId", "$$userId"] } } },
            { $group: { _id: null, totalOrders: { $sum: 1 } } }
        ],
        as: "orderStats"
    }
}

// Unwind after lookup
{
    $lookup: {
        from: "categories",
        localField: "categoryId",
        foreignField: "_id",
        as: "category"
    }
},
{ $unwind: "$category" }

$unwind Stage

// Unwind array
{ $unwind: "$tags" }

// Unwind with options
{ $unwind: { 
    path: "$tags",
    includeArrayIndex: "tagIndex",
    preserveNullAndEmptyArrays: true
}}

// Unwind nested array
{ $unwind: "$profile.addresses" }

$addFields Stage

// Add computed fields
{ $addFields: {
    fullName: { $concat: ["$firstName", " ", "$lastName"] },
    age: { $subtract: [{ $year: new Date() }, { $year: "$birthDate" }] },
    isAdult: { $gte: [{ $subtract: [{ $year: new Date() }, { $year: "$birthDate" }] }, 18] }
}}

// Add conditional fields
{ $addFields: {
    status: {
        $cond: {
            if: { $gte: ["$age", 18] },
            then: "adult",
            else: "minor"
        }
    }
}}

$replaceRoot Stage

// Replace root with embedded document
{ $replaceRoot: { newRoot: "$profile" } }

// Replace root with computed document
{ $replaceRoot: {
    newRoot: {
        _id: "$_id",
        name: { $concat: ["$firstName", " ", "$lastName"] },
        age: "$age"
    }
}}

$facet Stage

// Multiple aggregations in one pipeline
{ $facet: {
    "totalSales": [
        { $group: { _id: null, total: { $sum: "$amount" } } }
    ],
    "salesByCategory": [
        { $group: { _id: "$category", total: { $sum: "$amount" } } }
    ],
    "topProducts": [
        { $sort: { amount: -1 } },
        { $limit: 5 }
    ]
}}

Aggregation Operators

Arithmetic Operators

// Addition
{ $add: ["$price", "$tax"] }

// Subtraction
{ $subtract: ["$price", "$discount"] }

// Multiplication
{ $multiply: ["$price", "$quantity"] }

// Division
{ $divide: ["$total", "$quantity"] }

// Modulo
{ $mod: ["$amount", 100] }

// Absolute value
{ $abs: { $subtract: ["$actual", "$expected"] } }

// Ceiling
{ $ceil: "$price" }

// Floor
{ $floor: "$price" }

// Round
{ $round: ["$price", 2] }

Comparison Operators

// Equal
{ $eq: ["$status", "active"] }

// Greater than
{ $gt: ["$price", 100] }

// Less than
{ $lt: ["$age", 18] }

// Greater than or equal
{ $gte: ["$score", 80] }

// Less than or equal
{ $lte: ["$quantity", 10] }

// Not equal
{ $ne: ["$status", "inactive"] }

Logical Operators

// AND
{ $and: [{ $gt: ["$age", 18] }, { $eq: ["$status", "active"] }] }

// OR
{ $or: [{ $eq: ["$category", "electronics"] }, { $gt: ["$price", 100] }] }

// NOT
{ $not: { $lt: ["$age", 18] } }

// NOR ($nor exists only as a query operator; in an expression, negate an $or)
{ $not: [{ $or: [{ $lt: ["$age", 18] }, { $eq: ["$status", "inactive"] }] }] }

String Operators

// Concatenate
{ $concat: ["$firstName", " ", "$lastName"] }

// Substring by code points ($substr is a deprecated alias of the byte-based $substrBytes)
{ $substrCP: ["$name", 0, 3] }

// To upper case
{ $toUpper: "$name" }

// To lower case
{ $toLower: "$email" }

// String length
{ $strLenCP: "$description" }

// Index of substring
{ $indexOfCP: ["$description", "important"] }

// Split string
{ $split: ["$tags", ","] }

// Trim
{ $trim: { input: "$name" } }

Array Operators

// Array length
{ $size: "$tags" }

// Array element at index
{ $arrayElemAt: ["$scores", 0] }

// First element
{ $first: "$scores" }

// Last element
{ $last: "$scores" }

// Slice array
{ $slice: ["$tags", 0, 3] }

// Filter array
{ $filter: {
    input: "$scores",
    cond: { $gte: ["$$this", 80] }
}}

// Map array
{ $map: {
    input: "$prices",
    as: "price",
    in: { $multiply: ["$$price", 1.1] }
}}

Date Operators

// Current date ($currentDate is an update operator; pipelines read the $$NOW system variable)
{ $addFields: { lastUpdated: "$$NOW" } }

// Date parts
{ $year: "$orderDate" }
{ $month: "$orderDate" }
{ $dayOfMonth: "$orderDate" }
{ $dayOfWeek: "$orderDate" }
{ $hour: "$orderDate" }
{ $minute: "$orderDate" }
{ $second: "$orderDate" }

// Date arithmetic
{ $add: ["$orderDate", { $multiply: [24, 60, 60, 1000] }] }

// Date to string
{ $dateToString: { 
    format: "%Y-%m-%d", 
    date: "$orderDate" 
}}

// String to date
{ $dateFromString: { 
    dateString: "$dateString",
    format: "%Y-%m-%d"
}}

Conditional Operators

// Simple conditional
{ $cond: {
    if: { $gte: ["$age", 18] },
    then: "adult",
    else: "minor"
}}

// Switch statement
{ $switch: {
    branches: [
        { case: { $lt: ["$score", 60] }, then: "F" },
        { case: { $lt: ["$score", 70] }, then: "D" },
        { case: { $lt: ["$score", 80] }, then: "C" },
        { case: { $lt: ["$score", 90] }, then: "B" }
    ],
    default: "A"
}}

// If null
{ $ifNull: ["$middleName", "N/A"] }

Advanced Aggregation Techniques

Window Functions

// Running total
{ $setWindowFields: {
    partitionBy: "$category",
    sortBy: { orderDate: 1 },
    output: {
        runningTotal: {
            $sum: "$amount",
            window: { documents: ["unbounded", "current"] }
        }
    }
}}

// Moving average
{ $setWindowFields: {
    partitionBy: "$category",
    sortBy: { orderDate: 1 },
    output: {
        movingAvg: {
            $avg: "$amount",
            window: { documents: [-2, 0] }
        }
    }
}}

Graph Lookup

// Recursive lookup
{ $graphLookup: {
    from: "employees",
    startWith: "$reportsTo",
    connectFromField: "reportsTo",
    connectToField: "_id",
    as: "hierarchy",
    maxDepth: 3
}}

Faceted Search

// Search with facets
{ $facet: {
    "metadata": [
        { $group: { _id: null, total: { $sum: 1 } } }
    ],
    "categories": [
        { $group: { _id: "$category", count: { $sum: 1 } } },
        { $sort: { count: -1 } }
    ],
    "priceRanges": [
        { $bucket: {
            groupBy: "$price",
            boundaries: [0, 50, 100, 200, 500],
            default: "500+"
        }}
    ]
}}

Bucket Aggregation

// Auto bucket
{ $bucketAuto: {
    groupBy: "$price",
    buckets: 5,
    output: {
        count: { $sum: 1 },
        avgPrice: { $avg: "$price" }
    }
}}

// Manual bucket
{ $bucket: {
    groupBy: "$age",
    boundaries: [0, 18, 25, 35, 50, 65],
    default: "65+",
    output: {
        count: { $sum: 1 },
        names: { $push: "$name" }
    }
}}

Sample Aggregation

// Random sample
{ $sample: { size: 10 } }

// Sample with percentage (approximately 10% of the input documents)
// $sample.size must be a positive integer, not an expression, so keep each
// document with probability 0.1 using $rand instead
{ $match: { $expr: { $lt: [{ $rand: {} }, 0.1] } } }

Performance Optimization

Index Usage

// Use hint for specific index
db.collection.aggregate([
    { $match: { category: "electronics" } },
    { $group: { _id: "$brand", count: { $sum: 1 } } }
], { hint: { category: 1, brand: 1 } })

Memory Management

// Allow disk use for large datasets
db.collection.aggregate([
    { $group: { _id: "$category", data: { $push: "$$ROOT" } } }
], { allowDiskUse: true })

Cursor Options

// Set batch size
db.collection.aggregate([
    { $match: { status: "active" } }
], { cursor: { batchSize: 1000 } })

Common Aggregation Patterns

Sales Analysis

// Monthly sales with growth
// Emits one document per "YYYY-MM" month of completed orders with its total,
// order count, the previous month's total, and the month-over-month difference.
db.orders.aggregate([
    { $match: { status: "completed" } },
    { $group: {
        _id: { $dateToString: { format: "%Y-%m", date: "$orderDate" } },
        totalSales: { $sum: "$amount" },
        orderCount: { $sum: 1 }
    }},
    // A stage only sees the current document, so fetch the previous month's
    // total with a window function; the first month has no predecessor and
    // falls back to 0
    { $setWindowFields: {
        sortBy: { _id: 1 },
        output: {
            previousSales: {
                $shift: { output: "$totalSales", by: -1, default: 0 }
            }
        }
    }},
    { $addFields: {
        growth: { $subtract: ["$totalSales", "$previousSales"] }
    }},
    // "YYYY-MM" strings sort chronologically
    { $sort: { _id: 1 } }
])

User Analytics

// User engagement by cohort
// Emits one document per "YYYY-MM" month of createdAt with the number of users,
// how many of them were active in the last 30 days, and the ratio of the two.
db.users.aggregate([
    // Tag each user with its cohort and the largest value in activityDates
    // (assumes activityDates is an array of dates — confirm against the schema)
    { $addFields: {
        cohort: { $dateToString: { format: "%Y-%m", date: "$createdAt" } },
        lastActivity: { $max: "$activityDates" }
    }},
    { $group: {
        _id: "$cohort",
        totalUsers: { $sum: 1 },
        // Count 1 for every user whose lastActivity is within the last 30 days
        activeUsers: {
            $sum: {
                $cond: {
                    // new Date() is evaluated by the shell when the pipeline is built;
                    // 30 * 24 * 60 * 60 * 1000 is 30 days in milliseconds
                    if: { $gte: ["$lastActivity", { $subtract: [new Date(), 30 * 24 * 60 * 60 * 1000] }] },
                    then: 1,
                    else: 0
                }
            }
        }
    }},
    // Fraction of the cohort that is still active (between 0 and 1)
    { $addFields: {
        retentionRate: { $divide: ["$activeUsers", "$totalUsers"] }
    }}
])

Product Performance

// Product ranking by sales
// Returns the 10 products with the highest revenue across all order line items,
// enriched with the product's name and category.
db.orders.aggregate([
    // One document per line item in each order
    { $unwind: "$items" },
    { $group: {
        _id: "$items.productId",
        // Revenue per line item is price * quantity
        totalSales: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
        // NOTE(review): this counts line items, not distinct orders, and
        // avgOrderValue is the average line-item value — confirm that is intended
        totalOrders: { $sum: 1 },
        avgOrderValue: { $avg: { $multiply: ["$items.price", "$items.quantity"] } }
    }},
    // Join the product document whose _id equals the grouped productId
    { $lookup: {
        from: "products",
        localField: "_id",
        foreignField: "_id",
        as: "product"
    }},
    // Flatten the single-element join result; groups with no matching product are dropped
    { $unwind: "$product" },
    { $addFields: {
        productName: "$product.name",
        category: "$product.category"
    }},
    // Highest revenue first, keep the top 10
    { $sort: { totalSales: -1 } },
    { $limit: 10 }
])

Last updated