您现在的位置是:主页 > news > python node 网站开发/制作网站
python node 网站开发/制作网站
admin2025/5/2 15:53:02【news】
简介python node 网站开发,制作网站,做网站的成本,济宁网站建设培训班MapReduce原理理解:看下图图示即可明白整体流程点击图片可看高清大图原型:在mapreduce中,map task调用map处理逻辑将处理后的key和value利用outputcollector.collect()放入一个环形缓冲区中,那么这个缓冲区是有一定大小的…
MapReduce原理理解:
看下图图示即可明白整体流程
点击图片可看高清大图
原型:
在mapreduce中,map task调用map处理逻辑将处理后的key和value利用outputcollector.collect()放入一个环形缓冲区中,那么这个缓冲区是有一定大小的,那么如果放入的内容很多很多的时候怎么办呢?其实hadoop里面有这么个机制,在缓冲区达到某一个值或者比率的时候,比如80%,那么hadoop会利用Spiller.spill()将这个80%的数据读出来按照HashPartitioner.getPartitioner()来进行分区并按照key进行快速排序,然后kv键值对会继续向缓冲区中存放,形成一个环形存储逻辑,只要达到80%就将这块取出来,那么缓冲区就会一直可用。
模拟实现:
定义了一个容量为100的字符串数组来模拟环形缓冲区。另外还定义了一个size用来决定原型中比率,默认值给的80.start用来记录这个80内容的起始位置,一个布尔类型的参数为true时意味着缓冲区中已经超过80,需要开始进行Spiller.spill()。
程序主要是两个进程,第一个进程是不断地将源文件按行读取,存入数组中,当数据达到80的时候,将布尔类型标记置为true,并设置起始位置,不需要考虑其他情况,当达到100的时候,会继续往数组中写入数据,从0开始,不断循环,知道文件读取完毕。
第二个进程会不断去验证标记位的值,如果为true,就会获得当前起始位置,将标记位置为false,然后遍历这80的数据,存入到某个编号文件中,读取完毕后会再次去嗅探标记位。
代码实现:
package com.season.spill;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
public class FileJobSubmitter {
public static final String FILE_READ ="F:\\io\\BufferInput\\season.txt";
public static String[] CIRCLE_BUFFER = new String[100];
public static int BUFFER_SIZE = 80;
public static int BUFFER_START = 0;
public static int BUFFER_COUNTS = 0;
public static boolean READ_FLAG = false;
public static int FILE_ID = 1000;
public static void main(String[] args) {
ReadFileToCircleBuffer rf = new ReadFileToCircleBuffer();
Thread rfThread = new Thread(rf);
WriteFileFromBuffer wf = new WriteFileFromBuffer();
Thread wfThread = new Thread(wf);
rfThread.start();
wfThread.start();
}
static class ReadFileToCircleBuffer implements Runnable {
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(FILE_READ))));
String lenTxt = null;
int i = 0 ;
while((lenTxt = br.readLine()) != null ){
CIRCLE_BUFFER[i] = lenTxt;
BUFFER_COUNTS++;
if(BUFFER_COUNTS == 80){
BUFFER_START = i < 79 ? 20 + i : i-79 ;
READ_FLAG = true ;
BUFFER_COUNTS = 0 ;
}
System.out.println("内容:"+lenTxt+" flag="+ READ_FLAG + " start="+ BUFFER_START+" 总数=" + BUFFER_COUNTS+" i = "+ i);
i = i==99?0:i+1;
Thread.sleep(1000);
}
BUFFER_START = i < BUFFER_COUNTS -1 ? 100 - BUFFER_COUNTS + i : i- BUFFER_COUNTS + 1 ;
READ_FLAG = true ;
BUFFER_SIZE = BUFFER_COUNTS;
br.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class WriteFileFromBuffer implements Runnable {
@Override
public void run() {
while (true) {
try {
if (READ_FLAG) {
READ_FLAG = false;
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File("F:\\io\\BufferOutput\\part-" + FILE_ID + "size80.txt"))));
// 将起点开始的80个记录存入文件中
int start = BUFFER_START;
for (int i = 0; i < BUFFER_SIZE; i++) {
System.out.println( "*******存入:"+CIRCLE_BUFFER[start]);
String temp = CIRCLE_BUFFER[start];
bw.write(temp);
bw.newLine();
Thread.sleep(500);
start = start==99?0:start+1;
}
bw.close();
FILE_ID++;
}
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
总结:
其中有些输出是为了查看当前的进度来验证程序的准确性。经过这次小小的练习,收获颇丰,又将一些很少用到的基本知识复习了一般,不乏有长时间不用忘记的,其实,应该多拿出一些时间来实现某些简单的小程序巩固自己的基础知识,拓实自己。