MongoDB 中mapreduce 的 分组, 去重(转)

mongodb distinct 去重 mapreduce it

文档结构如下:

1 Spcode
2 Spname
3 Consignid
4 Consname
5 Region
6 Regionname
7 Serviceid
8 Servicename
9 Srctermid

一个月数据量大约1000w左右。

要实现任意字段的组合, 分组,  对Srctermid做去重操作。

MySQL:

1 SELECT Spcode, Spname, Consignid, Consname, COUNT(DISTINCT(Srctermid))
2 FROM mt_log_201208 GROUP BY Spcode, Spname, Consignid, Consname

mapreduce: 

01 res = db.runCommand({
02     mapreduce:'mo_log_201208',
03     query:{Logtime:{$gte:'20120801', $lte:'20120821'}},
04     map:function() {
05         emit({Spcode:this.Spcode, Spname:this.Spname,
06         Consignid:this.Consignid, Consname:this.Consname},
07         {"data":[{"Srctermid":this.Srctermid}]});
08     },
09     reduce:function(key, value) {
10         var ret = {data:[]};
11         var srctermid = {};
12         for(var i in value) {
13             var ia = value[i];         
14             for(var j in ia.data) {
15                 if(!srctermid[ia.data[j].Srctermid]) {
16                     srctermid[ia.data[j].Srctermid] = true;
17                     ret.data.push(ia.data[j]);
18                 }
19             }
20         }
21         return ret;
22     },
23     finalize:function(key, values){
24         return {count:values.data.length};
25     },
26     out:'tmp_mo_spcode_consignid_1',
27     verbose:true
28 })
01 > db[res.result].findOne();
02 {
03     "_id" : {
04         "Spcode" : "20017",
05         "Spname" : "es",
06         "Consignid" : "000000",
07         "Consname" : "pa"
08     },
09     "value" : {
10         "count" : 978
11     }
12 }

 因为是对任意字段的组合分组, 对Srctermid字段去重, 为了降低每次查询扫描的记录数, 可以按照完整条件, 生成中间结果集。

mapreduce:

01 res = db.runCommand({
02     mapreduce:'mo_log_201208',
03     query:{Logtime:{$gte:'20120801', $lte:'20120821'}},
04     map:function() {
05         emit({Spcode:this.Spcode, Spname:this.Spname,
06                         Consignid:this.Consignid, Consname:this.Consname,
07                         Region:this.Region, Regionname:this.Regionname,
08                         Serviceid:this.Serviceid, Servicename:this.Servicename},
09                         {"data":[{"Srctermid":this.Srctermid}]});
10     },
11     reduce:function(key, value) {
12         var ret = {data:[]};
13         var srctermid = {};
14         for(var i in value) {
15             var ia = value[i];         
16             for(var j in ia.data) {
17                 if(!srctermid[ia.data[j].Srctermid]) {
18                     srctermid[ia.data[j].Srctermid] = true;
19                     ret.data.push(ia.data[j]);
20                 }
21             }
22         }
23         return ret;
24     },
25     out:'tmp_mo_spcode_consignid_region_serviceid_1',
26     verbose:true
27 })
01 > db.tmp_mo_spcode_consignid_region_serviceid_20120819_1.findOne();
02 {
03     "_id" : {
04         "Spcode" : "20017",
05         "Spname" : "es",
06         "Consignid" : "000000",
07         "Consname" : "pa",
08         "Region" : "0000",
09         "Regionname" : "sa",
10         "Serviceid" : "BZ",
11         "Servicename" : "aa"
12     },
13     "value" : {
14         "data" : [
15             {
16                 "Srctermid" : "1864741xxxx"
17             },
18             {
19                 "Srctermid" : "1862370xxxx"
20             },
21             {
22                 "Srctermid" : "1862061xxxx"
23             },
24         ]
25     }
26 }
01 res = db.runCommand({
02     mapreduce:'tmp_mo_spcode_consignid_region_serviceid_20120819_1',
03     map:function() {
04         emit({Spcode:this._id.Spcode, Spname:this._id.Spname,
05                         Consignid:this._id.Consignid, Consname:this._id.Consname},
06                         {"data":this.value.data});
07     },
08     reduce:function(key, value) {
09         var ret = {data:[]};
10         var srctermid = {};
11         for(var i in value) {
12             var ia = value[i];         
13             for(var j in ia.data) {
14                 if(!srctermid[ia.data[j].Srctermid]) {
15                     srctermid[ia.data[j].Srctermid] = true;
16                     ret.data.push(ia.data[j]);
17                 }
18             }
19         }
20         return ret;
21     },
22     finalize:function(key, values){
23         return {count:values.data.length};
24     },
25     verbose:true
26 })
01 > db[res.result].findOne();
02 {
03     "_id" : {
04         "Spcode" : "20017",
05         "Spname" : "es",
06         "Consignid" : "000000",
07         "Consname" : "pa"
08     },
09     "value" : {
10         "count" : 978
11     }
12 }

mongodb对单个文档大小的限制是>v1.8版本的16MB,

mongodb 单表最大索引数为64

无索引排序的最大数据量为4M, 超过则报错退出。

在上面的操作中, 如果reduce的values超过这个限制会报错退出, 严重的mongodb服务直接dbexit

1 {
2     "assertion" : "invoke failed: JS Error: out of memory nofile_b:5",
3     "assertionCode" : 9004,
4     "errmsg" : "db assertion failure",
5     "ok" : 0
6 }
那么我们采用下面的方式来实现:先去重->再分组
01 res = db.runCommand({
02     mapreduce:'mo_log_201208',
03     map:function() {
04         emit({Spcode:this.Spcode, Spname:this.Spname,
05                         Consignid:this.Consignid, Consname:this.Consname,
06                         Srctermid:this.Srctermid}, {count:1});
07     },
08     reduce:function(key, value) {
09         var ret = {count:0};
10         ret.count++;
11         return ret;
12     },
13     out:'tmp_mo_spcode_consignid_region_serviceid_1',
14     verbose:true
15 })
01 res = db.runCommand({
02     mapreduce:'tmp_spcode_consignid_region_serviceid_1',
03     map:function() {
04         emit({Spcode:this._id.Spcode, Spname:this._id.Spname,       
05                         Consignid:this._id.Consignid, Srct:this._id.Consname},
06                         {count:this.value.count});
07     },
08     reduce:function(key, value) {
09         var ret = {count:0};
10         for(var i in value) {
11             ret.count += value[i].count;
12         }
13         return ret;
14     },
15     verbose:true
16 })
同样为了减少每次查询扫描的记录数量, 可以生成中间结果集
01 res = db.runCommand({
02     mapreduce:'mo_log_201208',
03     map:function() {
04         emit({Spcode:this.Spcode, Spname:this.Spname,
05                         Consignid:this.Consignid, Consname:this.Consname,
06                         Region:this.Region, Regionname:this.Regionname,
07                         Serviceid:this.Serviceid, Servicename:this.Servicename,
08                         Srctermid:this.Srctermid}, {count:1});
09     },
10     reduce:function(key, value) {
11         var ret = {count:0};
12         ret.count++;
13         return ret;
14     },
15     out:'tmp_mo_spcode_consignid_region_serviceid_1',
16     verbose:true
17 })
01 res = db.runCommand({
02     mapreduce:'tmp_mo_spcode_consignid_region_serviceid_1',
03     map:function() {
04         emit({Spcode:this._id.Spcode, Spname:this._id.Spname,
05                         Consignid:this._id.Consignid, Consname:this._id.Consname,
06                         Srctermid:this._id.Srctermid}, {count:1});
07     },
08     reduce:function(key, value) {
09         var ret = {count:0};
10         ret.count++;
11         return ret;
12     },
13     out:'tmp_mo_spcode_consignid_region_serviceid_2',
14     verbose:true
15 })
01 res = db.runCommand({
02     mapreduce:'tmp_spcode_consignid_region_serviceid_2',
03     map:function() {
04         emit({Spcode:this._id.Spcode, Spname:this._id.Spname,       
05                         Consignid:this._id.Consignid, Srct:this._id.Consname},
06                         {count:this.value.count});
07     },
08     reduce:function(key, value) {
09         var ret = {count:0};
10         for(var i in value) {
11             ret.count += value[i].count;
12         }
13         return ret;
14     },
15     verbose:true
16 })
01 > db[res.result].findOne();
02 {
03     "_id" : {
04         "Spcode" : "20017",
05         "Spname" : "es",
06         "Consignid" : "000000",
07         "Consname" : "pa"
08     },
09     "value" : {
10         "count" : 978
11     }
12 }

 总结:mongodb 对map/reduce 的支持是单线程的,   灰常消耗系统资源, 避免在前台使用。


相关推荐

  • Hadoop MapReduce 去重复行实现 使用Hadoop去重复行的简单方法1.使用TextInputFormat直接读取文本文件2.在map阶段直接将文本行输出给reduce(及文本行作为reduce的key)3.reduce直接输出key,value不输出JavaCode123
  • ThinkPHP 项目分组配置 项目分组是ThinkPHP一个重要机制,项目分组功能可以把以往的多项目合并到一个项目中去,对于公共的文件可以重用,但每个分组又可以有自己独立的配置文件、公共文件、语言包等。以一个普通的网站为例,如果不采用项目分组,那么通常是将网站前台(提供
  • ThinkPHP 中的分组模式 详见Thinkphp手册从2.1版本开始,ThinkPHP支持分组的二级域名部署,该功能可以使项目中的多个分组呈现为二级域名的形式,例如经过配置二级域名部署,可以把:HTUhttp://domain.cc/index.php/Admin/U
  • 合作学习在分组实验中树立环保意识 合作学习在分组实验中树立环保意识生物分组实验中蕴含了许多环保教育的分组实验,如观察根毛、光合作用产生的气体是氧气、观察校园内植物、测定植物的蒸腾作用等分组实验都与环境有关,教师应以高度的环保责任感,充分利用分组实验,联系实际,把环保教育贯穿
  • 不规则分组 - 制表助手 Web报表设计器 集深数据系统 不规则分组通过不规则分组向导可以快速设计出不完全分组、归并分组、重叠分组、条件分组、按段分组多种类型的分组报表。使用时可选中需要分组的单元格,点击菜单栏中的【插入->不规则分组】,在弹出的不规则分组向导中完成分组设置。例如选中某个单元
  • Internet分组交换/顺序分组交换IPX/SPX(Internetwork Packet Exchange/Sequences Packe Internet分组交换/顺序分组交换IPX/SPX(InternetworkPacketExchange/SequencesPacketExchange)是Novell公司的通信协议集。与NetBEUI形成鲜明区别的是IPX/SPX比较庞
  • Classes of WTAC—WTAC的分组方式 上次做了TA的简介,聊的比较散。这次我们侧重聊聊赛事的分组规则,有助于大家了解各组别的要求,这样后期介绍赛况或者是赛车都会更有针对性。以下是几个极具代表性的TA赛事分组情况,级别由低到高排列。WTAC分组:Clubsprint、OPEN、P
  • Day11:UITableView(单分组, 多分组, 左滑删除, 插入和移动) UITableView(表格视图)UITableView是用一种表格的形式来显示一组或者多组数据UITableView继承于UIScrollView,UITableView默认值水平方向不能滚动,在垂直方向可以滚动一、单分组的UITable
  • 数据透视表的项目分组(数据透视表中级班课程2小结) 数据透视表的项目分组目的:在数据透视表中存在的项目进行进一步的分组,从而进一步进行数据挖掘分析。针对不同数据类型的字段进行项目组合有不同的方法。本课程中分别对日期型、数值型和文本型的数据类型展示如何进行项目分组组合及取消组合的相关操作。虽然
  • 分组交换VS电路交换 1,分组交换:分组交换也称包交换,它是将用户传送的数据划分成多个更小的等长部分,每个部分叫做一个数据段。在每个数据段的前面加上一些必要的控制信息组成的首部,就构成了一个分组。首部用以指明该分组发往何地址,然后由交换机根据每个分组的地址标志,
  • 中 期 评 估B(现代密码学分组) 中期评估B(现代密码学分组)董天谧这一个多学期来,我们现代密码学研究分组的成员们经过不懈努力,已经将书本上的知识通过不断的讨论实践与请教,综合转化成了一些有用的算法与相应的伪代码,甚至写出了一些程序。(见博客相册与博客视频)我们已经能够基本
  • oracle 分组、排序函数 分析函数用于计算基于组的某种聚合值,它和聚合函数的不同之处是对于每个组返回多行,而聚合函数对于每个组只返回一行。一、rank函数—排名函数rank函数最常用的是求某一个数值在某一区域内的排名。返回结果集分区内指定字段的值的排名,指定字段的值

你的评论

就没有什么想说的吗?

最新博客

关于我们 加入传客 媒体报道 帮助中心 传客活动 免责声明 联系我们 移动版 移动应用

©2017传客网    琼ICP备15003173号-2    

本站部分文章来源于互联网,版权归属于原作者。
本站所有转载文章言论不代表本站观点,如是侵犯了原作者的权利请发邮件联系站长(weishubao@126.com),我们收到后立即删除。
站内所有资源仅供学习与参考,请勿用于商业用途,否则产生的一切后果将由您自己承担!

X