前边提到了单线程的实现,这里贴出多线程版,此处主要用多线程去处理hash后的小文件:
package com.kingdee.gmis.mass.data.ips;
import static com.kingdee.gmis.mass.data.ips.MassIP.K10;
import static com.kingdee.gmis.mass.data.ips.MassIP.getPartitionFile;
import static com.kingdee.gmis.mass.data.ips.MassIP.partationCount;
import static com.kingdee.gmis.mass.data.ips.MassIP.printResult;
import static com.kingdee.gmis.mass.data.ips.MassIP.*;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.kingdee.gmis.common.TopNHeap;
import com.kingdee.gmis.mass.data.ips.MassIP.IPInfo;
public class ConcurrentMassIP {
private static int workerCnt = 3;
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// generateMassIp("ip", "ips.txt", 100000000);
// generatePartitionFile("ip", "ips.txt", 100);
searchTopN(20);
}
private static AtomicInteger curIdx = new AtomicInteger(-1);
static File[] smallFiles;
static TopNHeap<IPInfo> destHeap;
/**
* 查找出现次数最多的K个ip地址
*
* @param count
* @throws Exception
*/
public static void searchTopN(int count) throws Exception {
Thread.sleep(10000);
long start = System.currentTimeMillis();
smallFiles = getPartitionFile("ip", partationCount);
destHeap = new TopNHeap<MassIP.IPInfo>(count);
int cnt = workerCnt;
synchronized (lock) {
for (int i = 0; i < cnt; i++) {
new Thread(new Worker(i, count)).start();
}
lock.wait();
printResult(destHeap);
}
System.out.println("Total spend " + (System.currentTimeMillis() - start) + " ms");
}
static Object lock = new Object();
public static synchronized void mergeToResult(TopNHeap<IPInfo> srcHeap) {
try {
destHeap.mergeHeap(srcHeap);
} finally {
if (--workerCnt == 0) {
synchronized (lock) {
lock.notify();
}
}
}
}
static class Worker implements Runnable {
private int topCount;
private int id;
public Worker(int id, int count) {
this.id = id;
this.topCount = count;
}
@Override
public void run() {
int curFileIdx;
DataInputStream dis = null;
Map<Integer, Integer> ipCountMap = new HashMap<Integer, Integer>();
TopNHeap<IPInfo> heap = new TopNHeap<IPInfo>(topCount);
int processCnt = 0;
while ((curFileIdx = curIdx.addAndGet(1)) < partationCount) {
processCnt++;
ipCountMap.clear();
try {
dis = new DataInputStream(new BufferedInputStream(
new FileInputStream(smallFiles[curFileIdx]), K10));
while (dis.available() > 0) {
int ip = dis.readInt();
Integer cnt = ipCountMap.get(ip);
ipCountMap.put(ip, cnt == null ? 1 : cnt + 1);
}
searchMostCountIps(ipCountMap, heap);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (dis != null) {
try {
dis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
System.out.println("Thread " + this.id + " process " + processCnt
+ " files.");
mergeToResult(heap);
}
}
}
测试了下,3线程是120s左右,串行是320s左右,的确提高了很多。测试机子是4核cpu,如果启4个线程,机器都变得很卡,cpu居高不下。另jvm启动参数为:
-server
-Xmx1024m
-Xms1024m
-Xmn600m
-XX:+UseConcMarkSweepGC
-XX:ParallelGCThreads=4
因为此处理过程中,大多数对象存活周期并不长,所以可以把新生代设置大一些。堆初始化设置大些,避免minor gc的时候才去扩展内存大小,因为可以预料到程序一旦启动,加载的内存的东西就会很多。
另外,此处垃圾收集器是用了cms,老生代内存回收就用了cms,新生代用了PurNew,并行收集的,设置gc线程数等于cpu内核数。当然这里也可以设置成-XX:+UseParallelGC、-XX:+UseParallelOldGC都可以,此处主要是新生代内存回收频繁,所以一定要把新生代设置成并发或并行版本的。
通过-server把虚拟机启动为server模式,这样运行时候会启用c2级别的JIT优化,能获得更高质量的编译代码。当然server模式下启动的jvm,默认使用的gc收集器跟-XX:UseParallelGC使用的一样
分享到:
相关推荐
NULL 博文链接:https://yueyemaitian.iteye.com/blog/1180299
1.提取出某日访问百度次数最多的那个IP 2.有一个1G大小的一个文件,里面每一行是一个词 3.给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url? 4.在2.5亿个整数中找出...
1. 给定a、b两个文件,各存放50亿个url,每个url...4. 海量日志数据,提取出某日访问百度次数最多的那个IP。(利用hash分而治之,然后上归并,堆) 5. 在2.5亿个整数中找出不重复的整数,内存不足以容纳这2.5亿个整数。
第一部分、十道海量数据处理面试题 1、海量日志数据,提取出某日访问百度次数最多的那个IP。 此题,在我之前的一篇文章算法里头有所提到,当时给出的方案是:IP的数目还是有限的,最多2^32个,所以可以考虑使用hash...
公元1年1月1日为星期一给定字符串,找出出现次数最多的字符,并且计算次数。给定字符串,找出出现次数最多的字符,并且计算次数。编程:编写一个截取字符串的函数,输入为一个字符串和字节数,输出为按字节截取的...
4、海量日志数据,提取出某日访问百度次数最多的那个IP。 方案1:首先是这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。大数据面试题(2)全文共26页,当前为第3页。大数据面试题(2)全文共26页...
计算某年、某月、某日和某年、某月、某日之间的天数间隔。要求年、月、日通过main方法的参数传递到程序中。
输入公历的某年某月某日,相应的计算出这一天是星期几。
访问SOHU 163主页变为2008年某日主页面故障分析报告--最终版
不通过判断闰年的方法计算距离未来某天或者距离某纪念日过去多少天
由用户输入一个日期,年月日形式,计算输入的某年某月某日是该年的第几天
本程序是用C语言编写的,主要是为了查询某年某月某日是星期几而设计的,该程序经过本人测试,运行成功!
c++ vs2008 计算某日为星期几 swictch case 语句 跟 getweek()
PLC判断距某年某月某日天数
计算某年某月某日是星期几
2. 题目:输入某年某月某日,判断这一天是这一年的第几天?
C语言编程,用来计算输入日期是当年的具体第多少天,考虑闰年
易语言取某日天干地支计算源码。@易语言入门教程。
用c#编写的一个计算某日是该年第多少天的程序,包含三种方法,适合刚开始学习c#的新手参考!