MapReduce Program Development
Internet Precision Ad Targeting Algorithm
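The three chained MapReduce jobs built in this exercise compute a TF-IDF-style weight for every (word, weibo) pair. As an orientation, this is simply the formula the code implements later (the code uses Java's Math.log, i.e. the natural logarithm):

$$
W(w, d) = \mathrm{TF}(w, d) \times \log\frac{N}{\mathrm{DF}(w)}
$$

where N is the total number of weibos, TF(w, d) is how many times word w occurs in weibo d, and DF(w) is the number of weibos that contain w at least once. A high weight means a word is frequent in that particular weibo but rare across the corpus, which is what makes it a useful targeting keyword.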
Prepare the Data
Create the /data/mydata directory, switch into it, and use wget to download http://59.64.78.41:60000/allfiles/mr_sf/tj_data.txt:

```
mkdir -p /data/mydata    # create the /data/mydata directory on the local Linux filesystem
cd /data/mydata          # switch to the /data/mydata directory
wget http://59.64.78.41:60000/allfiles/mr_sf/tj_data.txt    # download the tj_data.txt data file
```

Use wget to download the http://59.64.78.41:60000/allfiles/mr_sf/IKAnalyzer2012_u6.jar package and the http://59.64.78.41:60000/allfiles/mapreduce1/hadoop2lib.tar.gz package, then extract hadoop2lib.tar.gz into the current directory:

```
wget http://59.64.78.41:60000/allfiles/mr_sf/IKAnalyzer2012_u6.jar      # download the IK Analyzer jar
wget http://59.64.78.41:60000/allfiles/mapreduce1/hadoop2lib.tar.gz     # download the Hadoop dependency jars
tar zxvf hadoop2lib.tar.gz    # extract hadoop2lib.tar.gz into the current directory
```

Switch to the /apps/hadoop/sbin directory and start the Hadoop processes (start-all.sh):
```
cd /apps/hadoop/sbin    # switch to the /apps/hadoop/sbin directory
./start-all.sh          # start Hadoop
```

Run jps to check whether the relevant processes have started:

```
jps    # check whether the relevant processes are running
```
Create a /tj/input directory under the HDFS root (hint: -mkdir), and upload the tj_data.txt file from /data/mydata to the /tj/input folder on HDFS (hint: -put):

```
hadoop fs -mkdir /tj          # create a /tj directory under the HDFS root
hadoop fs -mkdir /tj/input    # create the /tj/input directory
hadoop fs -put /data/mydata/tj_data.txt /tj/input    # upload tj_data.txt from /data/mydata to /tj/input on HDFS
```

Open Eclipse and create a Java project named mr_sf.
In this project, create a package named mr_tj.
Create a folder named libs to hold the jar packages the project needs.
Copy the IKAnalyzer2012_u6.jar package and all of the jars in hadoop2lib from the /data/mydata folder into Eclipse, importing them into the libs folder.
Right-click all of the imported jars under the libs directory and choose Build Path => Add to Build Path.
In the mr_tj package, create a Paths class that defines the input and output directories:
```
package mr_tj;

public class Paths {
    public static final String TJ_INPUT = "hdfs://localhost:9000/tj/input";
    public static final String TJ_OUTPUT1 = "hdfs://localhost:9000/tj/output1";
    public static final String TJ_OUTPUT2 = "hdfs://localhost:9000/tj/output2";
    public static final String TJ_OUTPUT3 = "hdfs://localhost:9000/tj/output3";
}
```
Create the first MapReduce job.
- Create a class named FirstMapper, which counts how many times each word occurs in a given weibo (the TF value) and also counts the total number of weibos (the N value). The complete code:
```
package mr_tj;

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split("\t"); //split the input line on the tab character
        if (line.length >= 2) {
            String id = line[0].trim();      //the weibo ID
            String content = line[1].trim(); //the weibo content
            StringReader sr = new StringReader(content);
            IKSegmenter iks = new IKSegmenter(sr, true); //use IK Analyzer in smart-segmentation mode to split the content into words
            Lexeme lexeme = null;
            while ((lexeme = iks.next()) != null) {
                String word = lexeme.getLexemeText(); //word is one segmented term
                context.write(new Text(word + "_" + id), new IntWritable(1)); //emit the key "word_weiboID" with a count of 1
            }
            sr.close();
            context.write(new Text("count"), new IntWritable(1)); //emit ("count", 1) once per weibo so the total number of weibos (N) can be summed
        } else {
            System.err.println("error:" + value.toString() + "-----------------------");
        }
    }
}
```

- Next, create a FirstReducer class, which merges the values that share the same key and outputs TF and N.
The complete code:

```
package mr_tj;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override //text is the key produced by the mapper, either "word_weiboID" or "count"
    protected void reduce(Text text, Iterable<IntWritable> iterable, Context context) throws IOException, InterruptedException {
        int sum = 0; //initialize the counter
        for (IntWritable intWritable : iterable) {
            sum += intWritable.get(); //intWritable.get() returns the count stored in this value
        } //for the "count" key this sums up the total number of weibos; for any other key it is that word's TF in one weibo
        if ("count".equals(text.toString())) { //compare the string value (Text.equals(String) would always be false)
            System.out.println(text.toString() + "==" + sum);
        }
        context.write(text, new IntWritable(sum)); //output the key and its total
    }
}
```
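Before moving on, it helps to see the record format this first job writes, because the later jobs parse it apart again. The sketch below is not part of the lab code; with made-up values it shows how a line of the form word_weiboID<TAB>TF from /tj/output1 is split back into its pieces, mirroring what TwoMapper and LastMapper do later:

```
package mr_tj;

//Illustrative only, not part of the lab: parses a record written by the first job
//("word_weiboID<TAB>TF"), the same way TwoMapper and LastMapper do. Sample values are made up.
public class RecordFormatDemo {
    public static void main(String[] args) {
        String record = "手机_1234567890\t3";      //hypothetical line from /tj/output1
        String[] kv = record.split("\t");          //kv[0] = "手机_1234567890", kv[1] = "3"
        String[] wordAndId = kv[0].split("_");     //wordAndId[0] = the word, wordAndId[1] = the weibo ID
        int tf = Integer.parseInt(kv[1].trim());   //TF of the word within that weibo
        System.out.println("word=" + wordAndId[0] + ", id=" + wordAndId[1] + ", TF=" + tf);
    }
}
```

Gluing the word and the weibo ID into a single key with "_" lets the first job compute TF per (word, weibo) pair while still allowing later jobs to recover both parts.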
- Create a FirstPartition class, which handles partitioning: if the key is count, the record goes into a partition of its own; all other keys are spread evenly across the other three partitions. The complete code:

```
package mr_tj;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class FirstPartition extends HashPartitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        //if the key is "count", return partition 3; all other keys are spread evenly over the remaining three partitions
        if (key.equals(new Text("count"))) {
            return 3;
        } else {
            return super.getPartition(key, value, numReduceTasks - 1);
            //numReduceTasks - 1: there are 4 reducers in total, and one of them is reserved for the "count" key,
            //so the remaining data can only be distributed over the other three partitions
            //super calls the hash-based getPartition of the parent HashPartitioner
        }
    }
}
```
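The partitioning logic can be checked outside of a running job, since HashPartitioner.getPartition only hashes the key. The small sketch below is not part of the lab; it feeds a few keys (the word_weiboID samples are made up) through FirstPartition with 4 reduce tasks and prints where each would land — "count" always goes to partition 3, everything else to partitions 0-2:

```
package mr_tj;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

//Illustrative only: prints which of the 4 reduce partitions a few sample keys
//would be sent to by FirstPartition. Requires the Hadoop jars already in libs.
public class PartitionDemo {
    public static void main(String[] args) {
        FirstPartition partition = new FirstPartition();
        String[] keys = {"count", "手机_1234567890", "广告_1234567890"}; //made-up sample keys
        for (String k : keys) {
            int p = partition.getPartition(new Text(k), new IntWritable(1), 4);
            System.out.println(k + " -> partition " + p);
        }
    }
}
```

This is also why FirstJob below sets job.setNumReduceTasks(4): three partitions for the word_weiboID keys, plus one dedicated partition (part-r-00003) that ends up holding only the total weibo count.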
- Create a FirstJob class, which runs the computation and produces TF and N:

```
package mr_tj;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FirstJob {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("yarn.resourcemanager.hostname", "zhangyu@d2e674ec1e78");
        try {
            Job job = Job.getInstance(conf, "weibo1");
            job.setJarByClass(FirstJob.class);
            //set the output key and value types of the map task
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //use 4 reducers
            job.setNumReduceTasks(4);
            //set the partitioner that decides which partition each record goes to
            job.setPartitionerClass(FirstPartition.class);
            job.setMapperClass(FirstMapper.class);
            job.setCombinerClass(FirstReducer.class);
            job.setReducerClass(FirstReducer.class);
            //set the directory the job reads its data from and the directory it writes its output to
            FileInputFormat.addInputPath(job, new Path(Paths.TJ_INPUT));
            FileOutputFormat.setOutputPath(job, new Path(Paths.TJ_OUTPUT1));
            if (job.waitForCompletion(true)) {
                System.out.println("FirstJob finished");
                TwoJob.mainJob();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Create the second MapReduce job.
Create a TwoMapper class, which counts the DF of each word.
The complete code:

```
package mr_tj;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileSplit fs = (FileSplit) context.getInputSplit();
        //get the name of the file this input split comes from
        if (!fs.getPath().getName().contains("part-r-00003")) {
            //this is one of the TF result files (part-r-00003 only holds the "count" total)
            String[] line = value.toString().trim().split("\t"); //split on the tab character
            if (line.length >= 2) {
                String[] ss = line[0].split("_"); //split the key on "_" into word and weibo ID
                if (ss.length >= 2) {
                    String w = ss[0];
                    //count DF: the number of weibos the word appears in; even if a word occurs twice in one weibo, that weibo counts once
                    context.write(new Text(w), new IntWritable(1)); //emit (word, 1), once per weibo that contains the word
                }
            } else {
                System.out.println("error:" + value.toString() + "-------------");
            }
        }
    }
}
```

Create a TwoReducer class, which sums up the DF value for each word.
The complete code:

```
package mr_tj;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key, Iterable<IntWritable> arg1, Context context) throws IOException, InterruptedException {
        int sum = 0; //initialize the counter
        for (IntWritable i : arg1) {
            sum = sum + i.get(); //accumulate the counts
        }
        context.write(key, new IntWritable(sum)); //output the word and its DF
    }
}
```

Create a TwoJob class, which runs the computation and produces DF.
The complete code:

```
package mr_tj;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoJob {
    public static void mainJob() {
        Configuration config = new Configuration();
        config.set("yarn.resourcemanager.hostname", "zhangyu@d2e674ec1e78");
        try {
            Job job = Job.getInstance(config, "weibo2");
            job.setJarByClass(TwoJob.class);
            //set the output key and value types of the map task
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(TwoMapper.class);
            job.setCombinerClass(TwoReducer.class);
            job.setReducerClass(TwoReducer.class);
            //set the input and output directories; the input here is the output of the previous MapReduce job
            FileInputFormat.addInputPath(job, new Path(Paths.TJ_OUTPUT1));
            FileOutputFormat.setOutputPath(job, new Path(Paths.TJ_OUTPUT2));
            if (job.waitForCompletion(true)) {
                System.out.println("TwoJob finished");
                LastJob.mainJob();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Create the third MapReduce job.
Create a LastMapper class, which performs the W = TF * log(N/DF) computation.
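As a quick sanity check of the formula with made-up numbers (not taken from the dataset): suppose the corpus contains N = 1000 weibos, a word occurs in DF = 10 of them, and it appears TF = 3 times in the current weibo. Since the code below uses Math.log (the natural logarithm), the weight would be

$$
W = 3 \times \ln\frac{1000}{10} = 3 \times \ln 100 \approx 13.82
$$

Words that appear in almost every weibo (DF close to N) get weights near zero, while words concentrated in a few weibos are weighted up.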
The complete code:

```
package mr_tj;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
    public static Map<String, Integer> cmap = null; //cmap holds the "count" value (N)
    public static Map<String, Integer> df = null;   //df holds the DF value of each word
    //setup() runs once before map()
    protected void setup(Context context) throws IOException, InterruptedException {
        if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
            URI[] ss = context.getCacheFiles();
            if (ss != null) {
                for (int i = 0; i < ss.length; i++) {
                    URI uri = ss[i];
                    //if the cached file is part-r-00003, it is the count file; load its value into cmap
                    if (uri.getPath().endsWith("part-r-00003")) {
                        Path path = new Path(uri.getPath());
                        BufferedReader br = new BufferedReader(new FileReader(path.getName()));
                        String line = br.readLine();
                        if (line.startsWith("count")) {
                            String[] ls = line.split("\t"); //split on the tab character
                            cmap = new HashMap<String, Integer>();
                            cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
                        }
                        br.close();
                    } else {
                        //any other cached file is treated as the DF file; load its contents into df
                        df = new HashMap<String, Integer>();
                        Path path = new Path(uri.getPath());
                        BufferedReader br = new BufferedReader(new FileReader(path.getName()));
                        String line;
                        while ((line = br.readLine()) != null) {
                            String[] ls = line.split("\t"); //split on the tab character
                            df.put(ls[0], Integer.parseInt(ls[1].trim()));
                            //the df map uses the word as its key and the word's DF value as its value
                        }
                        br.close();
                    }
                }
            }
        }
    }
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileSplit fs = (FileSplit) context.getInputSplit();
        if (!fs.getPath().getName().contains("part-r-00003")) {
            String[] v = value.toString().trim().split("\t"); //split on the tab character
            if (v.length >= 2) {
                int tf = Integer.parseInt(v[1].trim());
                String[] ss = v[0].split("_"); //split the key on "_" into word and weibo ID
                if (ss.length >= 2) {
                    String w = ss[0];
                    String id = ss[1];
                    //compute W = TF * log(N/DF); multiply by 1.0 so the division is done in floating point
                    double s = tf * Math.log(1.0 * cmap.get("count") / df.get(w));
                    //format the weight, keeping at most five decimal places
                    NumberFormat nf = NumberFormat.getInstance();
                    nf.setMaximumFractionDigits(5);
                    //emit the weibo ID as the key and "word:weight" as the value
                    context.write(new Text(id), new Text(w + ":" + nf.format(s)));
                }
            } else {
                System.out.println(value.toString() + "-------------");
            }
        }
    }
}
```

Create a LastReducer class, which concatenates all of a weibo's word:weight pairs into one line.
The complete code:

```
package mr_tj;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LastReducer extends Reducer<Text, Text, Text, Text> {
    protected void reduce(Text key, Iterable<Text> arg1, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();
        for (Text i : arg1) {
            sb.append(i.toString() + "\t"); //append each word:weight pair, separated by tabs
        }
        context.write(key, new Text(sb.toString())); //output the weibo ID and all of its word:weight pairs
    }
}
```

Create a LastJob class, which runs the third MapReduce job.
The complete code:

```
package mr_tj;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LastJob {
    public static void mainJob() {
        Configuration config = new Configuration();
        config.set("yarn.resourcemanager.hostname", "zhangyu@d2e674ec1e78");
        try {
            Job job = Job.getInstance(config, "weibo3");
            job.setJarByClass(LastJob.class);
            //make the count file from the first job and the DF file from the second job available to LastMapper via the distributed cache
            job.addCacheFile(new Path(Paths.TJ_OUTPUT1 + "/part-r-00003").toUri());
            job.addCacheFile(new Path(Paths.TJ_OUTPUT2 + "/part-r-00000").toUri());
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(LastMapper.class);
            job.setCombinerClass(LastReducer.class);
            job.setReducerClass(LastReducer.class);
            FileInputFormat.addInputPath(job, new Path(Paths.TJ_OUTPUT1));
            FileOutputFormat.setOutputPath(job, new Path(Paths.TJ_OUTPUT3));
            if (job.waitForCompletion(true)) {
                System.out.println("LastJob finished");
                System.out.println("All jobs finished");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

Right-click the FirstJob class and choose Run As => Run on Hadoop.
You can watch the execution progress in the Console view. Then check the result data on HDFS (-ls -R):

```
hadoop fs -ls -R /tj    # list the result data recursively
```
Check the weight of each keyword (hint: -cat; the keyword weights are stored in /tj/output3/part-r-00000 on HDFS). Once we have the keyword weights, we know which users are the high-quality users interested in Huawei phones; pushing Huawei phone ads to those users achieves the precise ad-targeting effect.

```
hadoop fs -cat /tj/output3/part-r-00000    # view the weight of each keyword, stored in /tj/output3/part-r-00000 on HDFS
```

Create the output1, output2, and output3 subdirectories under /data/mydata and change their access permissions, following the example below (output1 is shown; handle the other two the same way):
```
# create the output1, output2, and output3 subdirectories and change their permissions
sudo mkdir /data/mydata/output1
sudo chmod 777 /data/mydata/output1    # use chmod to give /data/mydata/output1 777 permissions
sudo mkdir /data/mydata/output2        # create the output2 subdirectory
sudo chmod 777 /data/mydata/output2
sudo mkdir /data/mydata/output3        # create the output3 subdirectory
sudo chmod 777 /data/mydata/output3
```
* Store the 6 files /tj/output1/part-r-00000, /tj/output1/part-r-00001, /tj/output1/part-r-00002, /tj/output1/part-r-00003, /tj/output2/part-r-00000, and /tj/output3/part-r-00000 from HDFS into the corresponding output1/2/3 subdirectories under /data/mydata/ on the local Linux filesystem. If the system reports a "permission denied" error here, change the access permissions of the /tj directory on HDFS. Code:

```
hadoop fs -get /tj/output1/part-r-00000 /data/mydata/output1
hadoop fs -get /tj/output1/part-r-00001 /data/mydata/output1
hadoop fs -get /tj/output1/part-r-00002 /data/mydata/output1
hadoop fs -get /tj/output1/part-r-00003 /data/mydata/output1
hadoop fs -get /tj/output2/part-r-00000 /data/mydata/output2
hadoop fs -get /tj/output3/part-r-00000 /data/mydata/output3
```
Coding takes determination and patience: when you run into a problem, clarify your approach, pinpoint the root cause, and work through the fix step by step with clear, logical thinking.