Skip to content

Commit 15a95d7

Browse files
committed
Hadoop—MR程序
0 parents  commit 15a95d7

File tree

38 files changed

+1240
-0
lines changed

38 files changed

+1240
-0
lines changed

.classpath

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<classpath>
3+
<classpathentry kind="src" path="src"/>
4+
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
5+
<classpathentry kind="con" path="org.eclipse.jdt.USER_LIBRARY/hadoop-jar-2.4.1"/>
6+
<classpathentry kind="output" path="bin"/>
7+
</classpath>

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/bin/

.project

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<projectDescription>
3+
<name>Hadoop-MR</name>
4+
<comment></comment>
5+
<projects>
6+
</projects>
7+
<buildSpec>
8+
<buildCommand>
9+
<name>org.eclipse.jdt.core.javabuilder</name>
10+
<arguments>
11+
</arguments>
12+
</buildCommand>
13+
</buildSpec>
14+
<natures>
15+
<nature>org.eclipse.jdt.core.javanature</nature>
16+
</natures>
17+
</projectDescription>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
eclipse.preferences.version=1
2+
encoding//input/emp=GBK

.settings/org.eclipse.jdt.core.prefs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
eclipse.preferences.version=1
2+
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
3+
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
4+
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
5+
org.eclipse.jdt.core.compiler.compliance=1.7
6+
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
7+
org.eclipse.jdt.core.compiler.debug.localVariable=generate
8+
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
9+
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
10+
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
11+
org.eclipse.jdt.core.compiler.source=1.7
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

input/dept

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
10,ACCOUNTING,NEW YORK
2+
20,RESEARCH,DALLAS
3+
30,SALES,CHICAGO
4+
40,OPERATIONS,BOSTON

input/emp

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
7369,SMITH,CLERK,7902,17-12月-80,800,,20
2+
7499,ALLEN,SALESMAN,7698,20-2月 -81,1600,300,30
3+
7521,WARD,SALESMAN,7698,22-2月 -81,1250,500,30
4+
7566,JONES,MANAGER,7839,02-4月 -81,2975,,20
5+
7654,MARTIN,SALESMAN,7698,28-9月 -81,1250,1400,30
6+
7698,BLAKE,MANAGER,7839,01-5月 -81,2850,,30
7+
7782,CLARK,MANAGER,7839,09-6月 -81,2450,,10
8+
7839,KING,PRESIDENT,,17-11月-81,5000,,10
9+
7844,TURNER,SALESMAN,7698,08-9月 -81,1500,0,30
10+
7900,JAMES,CLERK,7698,03-12月-81,950,,30
11+
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
12+
7934,MILLER,CLERK,7782,23-1月 -82,1300,,10

output/Q2/._SUCCESS.crc

8 Bytes
Binary file not shown.

output/Q2/.part-r-00000.crc

8 Bytes
Binary file not shown.

output/Q2/_SUCCESS

Whitespace-only changes.

output/Q2/part-r-00000

Whitespace-only changes.

src/README.md

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# MapReduce应用案例
2+
3+
## 环境说明
4+
Hadoop搭建环境:
5+
6+
| 虚拟机操作系统: CentOS6.3 64位,单核,1G内存
7+
| JDK:1.7.0_60 64位
8+
| Hadoop:2.4.1
9+
10+
MR程序编译环境:
11+
12+
| Eclipse IDE
13+
| mapred.LocalJobRunner本地运行模式
14+
15+
## 准备测试数据
16+
17+
测试数据包括两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:
18+
19+
20+
dept文件内容:
21+
22+
10,ACCOUNTING,NEW YORK
23+
20,RESEARCH,DALLAS
24+
30,SALES,CHICAGO
25+
40,OPERATIONS,BOSTON
26+
emp文件内容:
27+
28+
7369,SMITH,CLERK,7902,17-12月-80,800,,20
29+
7499,ALLEN,SALESMAN,7698,20-2月 -81,1600,300,30
30+
7521,WARD,SALESMAN,7698,22-2月 -81,1250,500,30
31+
7566,JONES,MANAGER,7839,02-4月 -81,2975,,20
32+
7654,MARTIN,SALESMAN,7698,28-9月 -81,1250,1400,30
33+
7698,BLAKE,MANAGER,7839,01-5月 -81,2850,,30
34+
7782,CLARK,MANAGER,7839,09-6月 -81,2450,,10
35+
7839,KING,PRESIDENT,,17-11月-81,5000,,10
36+
7844,TURNER,SALESMAN,7698,08-9月 -81,1500,0,30
37+
7900,JAMES,CLERK,7698,03-12月-81,950,,30
38+
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
39+
7934,MILLER,CLERK,7782,23-1月 -82,1300,,10
40+
41+
## 应用案例
42+
### 例子1:求各个部门的总工资
43+
#### 问题分析
44+
MapReduce中的join分为好几种,比如有最常见的 reduce side join、map side join和semi join 等。reduce join 在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join 在处理多个小表关联大表时非常有用 。
45+
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
46+
47+
(1)用户使用静态方法`DistributedCache.addCacheFile()`指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:`hdfs://jobtracker:50030/home/XXX/file`)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
48+
49+
(2)用户使用:在分布式环境`DistributedCache.getLocalCacheFiles()`/在伪分布式环境`DistributedCache.getCacheFiles()`方法获取文件目录,并使用标准的文件读写API读取相应的文件。
50+
在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。
51+
52+
#### 处理流程图
53+
![求各个部门的总工资处理流程图](https://i.imgur.com/XpWCrvb.jpg)
54+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package com.elon33.mr1;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
import org.apache.hadoop.conf.Configuration;
10+
import org.apache.hadoop.conf.Configured;
11+
import org.apache.hadoop.fs.Path;
12+
import org.apache.hadoop.io.IntWritable;
13+
import org.apache.hadoop.io.LongWritable;
14+
import org.apache.hadoop.io.NullWritable;
15+
import org.apache.hadoop.io.Text;
16+
import org.apache.hadoop.mapreduce.Job;
17+
import org.apache.hadoop.mapreduce.Mapper;
18+
import org.apache.hadoop.mapreduce.Reducer;
19+
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
20+
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
21+
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
22+
import org.apache.hadoop.util.GenericOptionsParser;
23+
import org.apache.hadoop.util.Tool;
24+
import org.apache.hadoop.util.ToolRunner;
25+
26+
public class Q10MiddlePersonsCountForComm extends Configured implements Tool {
27+
28+
public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
29+
30+
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
31+
32+
String[] kv = value.toString().split(",");
33+
34+
context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3]) ? " " : kv[3])));
35+
}
36+
}
37+
38+
public static class Reduce extends Reducer<IntWritable, Text, NullWritable, Text> {
39+
40+
List<String> employeeList = new ArrayList<String>();
41+
Map<String, String> employeeToManagerMap = new HashMap<String, String>();
42+
43+
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
44+
45+
for (Text value : values) {
46+
employeeList.add(value.toString().split(",")[0].trim());
47+
employeeToManagerMap.put(value.toString().split(",")[0].trim(), value.toString().split(",")[1].trim());
48+
}
49+
}
50+
51+
@Override
52+
protected void cleanup(Context context) throws IOException, InterruptedException {
53+
int totalEmployee = employeeList.size();
54+
int i, j;
55+
int distance;
56+
System.out.println(employeeList);
57+
System.out.println(employeeToManagerMap);
58+
59+
for (i = 0; i < (totalEmployee - 1); i++) {
60+
for (j = (i + 1); j < totalEmployee; j++) {
61+
distance = calculateDistance(i, j);
62+
String value = employeeList.get(i) + " and " + employeeList.get(j) + " = " + distance;
63+
context.write(NullWritable.get(), new Text(value));
64+
}
65+
}
66+
}
67+
68+
private int calculateDistance(int i, int j) {
69+
String employeeA = employeeList.get(i);
70+
String employeeB = employeeList.get(j);
71+
int distance = 0;
72+
73+
if (employeeToManagerMap.get(employeeA).equals(employeeB) || employeeToManagerMap.get(employeeB).equals(employeeA)) {
74+
distance = 0;
75+
}
76+
else if (employeeToManagerMap.get(employeeA).equals(employeeToManagerMap.get(employeeB))) {
77+
distance = 0;
78+
} else {
79+
List<String> employeeA_ManagerList = new ArrayList<String>();
80+
List<String> employeeB_ManagerList = new ArrayList<String>();
81+
82+
employeeA_ManagerList.add(employeeA);
83+
String current = employeeA;
84+
while (false == employeeToManagerMap.get(current).isEmpty()) {
85+
current = employeeToManagerMap.get(current);
86+
employeeA_ManagerList.add(current);
87+
}
88+
89+
employeeB_ManagerList.add(employeeB);
90+
current = employeeB;
91+
while (false == employeeToManagerMap.get(current).isEmpty()) {
92+
current = employeeToManagerMap.get(current);
93+
employeeB_ManagerList.add(current);
94+
}
95+
96+
int ii = 0, jj = 0;
97+
String currentA_manager, currentB_manager;
98+
boolean found = false;
99+
100+
for (ii = 0; ii < employeeA_ManagerList.size(); ii++) {
101+
currentA_manager = employeeA_ManagerList.get(ii);
102+
for (jj = 0; jj < employeeB_ManagerList.size(); jj++) {
103+
currentB_manager = employeeB_ManagerList.get(jj);
104+
if (currentA_manager.equals(currentB_manager)) {
105+
found = true;
106+
break;
107+
}
108+
}
109+
110+
if (found) {
111+
break;
112+
}
113+
}
114+
115+
distance = ii + jj - 1;
116+
}
117+
118+
return distance;
119+
}
120+
}
121+
122+
@Override
123+
public int run(String[] args) throws Exception {
124+
125+
Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm");
126+
job.setJobName("Q10MiddlePersonsCountForComm");
127+
128+
job.setJarByClass(Q10MiddlePersonsCountForComm.class);
129+
job.setMapperClass(MapClass.class);
130+
job.setReducerClass(Reduce.class);
131+
132+
job.setMapOutputKeyClass(IntWritable.class);
133+
job.setMapOutputValueClass(Text.class);
134+
135+
job.setOutputFormatClass(TextOutputFormat.class);
136+
job.setOutputKeyClass(NullWritable.class);
137+
job.setOutputValueClass(Text.class);
138+
139+
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
140+
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
141+
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
142+
143+
job.waitForCompletion(true);
144+
return job.isSuccessful() ? 0 : 1;
145+
}
146+
147+
public static void main(String[] args) throws Exception {
148+
int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args);
149+
System.exit(res);
150+
}
151+
}

0 commit comments

Comments
 (0)