Mongo 聚合之管道

开启 MongoDB 学习新篇章

Posted by lichao modified on October 12, 2021

MongoDB 提供的聚合工具: 聚合框架 MapReduce 几个简单的聚合命令: count、distinct和group

聚合框架

聚合(aggregate)框架可以对集合中的文档进行变换和组合。基本上,可以用多个构件创建一个管道(Pipline),用于对一连串的文档进行处理。这些构件包括筛选(match)、投射(projecting)、分组(grouping)、排序(sorting)、限制(limiting)和跳过(skipping)。

1
2
3
4
5
6
7
8
9
10
// 这是创作文章数量前两名的作者
> db.blog.aggregate({"$project": {"author": 1}}, 
 {"$group": {"_id": "$author", "count": {"$sum": 1}}},
 {"$sort": {"count": -1}},
 {"$limit":2},
)


{ "_id" : null, "count" : 4 }
{ "_id" : "chao", "count" : 3 }

aggregate() 会返回一个文档数组。聚合的结果必须限制在 16 MB 以内(MongoDB支持的最大响应消息大小)

管道操作符

每个操作符都会接受一连串的文档,对这些文档做一些类型转换,最后将转换后的文档作为结果传递给下一个操作符。

应该尽量在管道开始阶段就将尽可能多的文档和字段过滤掉。管道如果不是直接从原先的集合中使用数据,那就无法在筛选和排序中使用索引。如果可能,聚合管道会尝试 对操作进行排序,以便能够有效地使用索引。

MongoDB 不允许单一的聚合操作占用过多的系统内存,如果 MongoDB 发现某个聚合操作占用了 20% 以上的内存,这个操作就会直接输出错误,允许将输出结果利用管道 放入到一个集合中是为了方便以后使用。

如果能够通过 “$match” 操作迅速减小结果集的大小,就可以使用管道进行实时聚合。

管道不适合实时操作

过滤

"$match" 用于对文档集合进行筛选,之后就可以在筛选得到的文档子集上做聚合。"$match" 可以使用所有常规的查询操作符(“$gt”,”$lt”,”$in”,”$ne”)。 "$match" 通常放在管道的前面位置,两个原因:

  • 可以快速将不需要的文档过滤掉,以减少管道的工作量。
  • 如果在投射和分组之前执行 "$match",查询可能使用索引。
1
2
> db.blog.aggregate("$match": {"author":{"$ne": null}}})

投射

投射可以从子文档中提取字段,可以重命名字段等。

"$project" 投射可以通过指定 {"fieldname":1} 选择需要投射的字段,或者通过制定 {"fieldname":0} 排除不需要的字段。结果存储在内存中

1
2
> db.blog.aggregate({"$project": {"author": 1, "_id": 0}})

默认情况下,如果文档中存在 “_id” 字段,这个字段就会被返回。

也可以将投射过的字段进行重命名。例如将 author重命名为 author_new, $fieldname 为了在聚合框架中引用 fieldname 字段

1
2
3
4
5
6
7
> db.blog.aggregate({"$project": {"author_new": "$author"}})


{ "_id" : ObjectId("6145a77ada8f2fe712df07f6"), "author_new" : "chao" }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f7"), "author_new" : "chao" }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f8"), "author_new" : "chao" }
{ "_id" : ObjectId("6145b4a4f8cf6c4bea05c0fe"), "author_new" : null }

重命名之后无法使用索引,应该在修改字段名称之前使用索引。

$project 还有一些更强大的选项,可以使用表达式将多个字面量和变量组合在一个值中使用。

数学表达式: 可用于操作数值:

  • "$add" :[ expr1 [, expr2, ..., exprN]]: 这个操作符接受一个或多个表达式作为参数,将表达式相加。
  • "$substract" :[ expr1, expr2]: 这个操作符接受两个表达式作为参数,将表达式相减。
  • "$multiply" :[ expr1 [, expr2, ..., exprN]]: 这个操作符接受一个或多个表达式作为参数,将表达式相乘。
  • "$divide" :[ expr1, expr2]: 这个操作符接受两个表达式作为参数,将表达式相除。
  • "$mod" :[ expr1, expr2]: 这个操作符接受两个表达式作为参数,将表达式取余。
1
2
3
4
5
6
7
8
9
10
11
12
> db.blog.aggregate({"$project": {
 "double_votes": {
  "$add": ["$votes", "$votes", 10]
 }

}})


{ "_id" : ObjectId("6145a77ada8f2fe712df07f6"), "double_votes" : 14 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f7"), "double_votes" : 10 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f8"), "double_votes" : 10 }
{ "_id" : ObjectId("6145b4a4f8cf6c4bea05c0fe"), "double_votes" : 16 }

可以将多个表达式嵌套在一起组成更复杂的表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
> db.blog.aggregate({"$project": {
 "double_votes": {
  "$subtract": [
   {"$add": ["$votes", "$votes", 10]}, 
   9
  ]
  
 }
}})


{ "_id" : ObjectId("6145a77ada8f2fe712df07f6"), "double_votes" : 5 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f7"), "double_votes" : 1 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f8"), "double_votes" : 1 }
{ "_id" : ObjectId("6145b4a4f8cf6c4bea05c0fe"), "double_votes" : 7 }

日期表达式: 可用于操作日期类型的字段,返回一个数值。聚合框架中包含了一些用于提取日期信息的表达式

  • $year
  • $week
  • $dayOfMonth
  • $dayOfWeek
  • $dayOfYear
  • $hour
  • $minute
  • $second
1
2
3
4
5
6
7
8
9
10
11
> db.blog.aggregate({"$project": {
 "time_minute": {
  "$subtract": [ {"$week": new Date()}, {"$week": "$createdAt"}]
 }
}})


{ "_id" : ObjectId("6145a77ada8f2fe712df07f6"), "time_minute" : 4 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f7"), "time_minute" : 4 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f8"), "time_minute" : 4 }
{ "_id" : ObjectId("6145b4a4f8cf6c4bea05c0fe"), "time_minute" : 4 }

字符串表达式: 可用于操作字符串类型的字段:

  • "$substr" :[ expr, startOffset, numToReturn]: 这个操作符用于截取 字符串 expr 的子串(从第 startOffset 字节开始的 numToReturn 字节 ),参数 expr 必须是字符串。
  • "$concat" :[ expr1 [, expr2, ..., exprN]]: 将制定表达式(或字符串)连接在一起作为返回结果。
  • "$toLower" 返回字符串 expr 的小写形式
  • "$toUpper" 返回字符串 expr 的大写形式
1
2
3
4
5
6
7
8
9
10
> db.blog.aggregate({"$project": {
 "describe": {
  "$concat": [ "title:", {"$substr": ["$titile", 0, 6]},"...", "author:", "$author", "..."]
 }
}})

{ "_id" : ObjectId("6145a77ada8f2fe712df07f6"), "time_minute" : 4 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f7"), "time_minute" : 4 }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f8"), "time_minute" : 4 }
{ "_id" : ObjectId("6145b4a4f8cf6c4bea05c0fe"), "time_minute" : 4 }

逻辑表达式: 可以用于控制语句。 比较表达式:

  • "$cmp" : [ expr1, expr2]: 比较 expr1和expr2,如果 expr1 等于 expr2 ,返回 0, 如果 expr1大于expr2,返回一个正数。如果 expr1 小于 expr2,返回一个负数。
  • "$strcasecmp" :[ string1, string2]: 比较 string1 和 string2。如果 string1 等于 string2 ,返回 0, 如果 string1 大于 string2,返回一个正数。如果 string1 小于 string2,返回一个负数。
  • "$eq"\"$ne"\"$gt"\"$gte"\"$lt"\"$lte": [ expr1, expr2]: 比较操作,返回 true 或 false。 布尔表达式:
  • "$and"\"$or" :[ expr1 [, expr2, ..., exprN]]: 逻辑操作,返回 true 或 false。
  • "$and"\"$or" : expr1:取反,返回 true 或 false。 控制语句:
  • "$cond" : [booleanExpr , trueExpr, falseExpr]: booleanExpr为 true,返回 trueExpr ,否则返回 falseExpr。
  • "$ifNull" : [expr , replaceExpr]: expr 为 null,返回 replaceExpr ,否则返回 expr。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> db.blog.aggregate({"$project": {
 "desc": {
  "$cond": [
   {
    "$or": [
      { "$gt": ["$votes", 3]},
      { "$eq": [ { "$strcasecmp": ["$title", "my blog title"]}, 0]}
     ]
   },
   {"$concat": [ "title:", {"$substr": ["$title", 0, 6]},"..."]},
   {"$concat": [ "title:", {"$substr": ["$title", 0, 6]},"&&&"]},
  ]
 }

}})

{ "_id" : ObjectId("6145a77ada8f2fe712df07f6"), "desc" : "title:my blo..." }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f7"), "desc" : "title:my blo&&&" }
{ "_id" : ObjectId("6145a79cda8f2fe712df07f8"), "desc" : "title:my blo&&&" }

分组

"$group" 分组首先指定了需要进行分组的字段,这里由 "_id": "$fieldname" 指定的,表示这个操作的直接结果为每一个 fieldname 只对应一个结果文档,所以 fieldname 就成为了文档的唯一标识符(”_id”)。 "count": {"$sum": 1} 表示为分组内每个文档的 “count” 字段加 1。这是 "$group" 创建的一个新字段。

1
2
3
4
5
> db.blog.aggregate({"$group": {"_id": "$author", "count": {"$sum": 1}}})


{ "_id" : "chao", "count" : 3 }
{ "_id" : null, "count" : 4 }

排序

"$sort" 会对结果集中的文档根据 fieldname 字段进行降序排列。

1
2
> db.blog.aggregate({"$sort": {"count": -1}})

限制

"$limit" 会限制最终返回的结果为当前结果中的前 n 个。

1
2
> db.blog.aggregate({"$limit":2})

抽样

$sample 随机选择从其输入指定数量的文档。

输出

1
$out  必须为pipeline最后一个阶段管道,因为是将最后计算结果写入到指定的collection中 

索引

1
$indexStats 返回数据集合的每个索引的使用情况

MapReduce

MapReduce 使用 JavaScript 作为查询语言,因此它能够表达任意复杂的逻辑。这种强大的代价是MapReduce非常慢,不应该用在实时的数据分析中。

MapReduce 能够在多台服务器之前并行执行,它会将一个大问题分解成多个小问题,将各个小问题发送到不同的机器上,每台机器负责完成一部分工作。所有的机器都完成时,再将这些零碎的解决方案合并为一个完整的解决方案。

聚合命令

MongoDB 为在集合上执行基本的聚合任务提供了一些命令。这些命令在聚合框架出现之前就已经存在了,现在已经被聚合框架取代。

count

count 用于返回集合中的文档数量。

1
2
> db.blog.count()

无论集合有多大,count 都会很快返回总的文档数量。

1
2
> db.blog.count({"title": "my blog title"})

可以给 count 传递一个查询文档,MongoDB 会计算查询结果的数量。增加查询条件会使 count 变慢。

distinct

distinct 用来找出给定键的所有不同值,使用时必须指定集合和键。

1
2
3
4
> db.runCommand({"distinct": "blog", "key": "author"})


{ "values" : [ null, "chao" ], "ok" : 1 }

等价:

1
2
> db.blog.distinct("author")

group

group 命令在 MongoDB 3.4 中已弃用。在 MongoDB 4.2 中,该函数不再存在。

group 可以执行更复杂的聚合。先选定分组所依据的键,而后 MongoDB 就会将集合依据选定键的不同值分成若干组。然后可以对每一个分组内的文档进行聚合,得到一个结果文档。