Large-File-Processing

问题:

有一个 100GB 的文件,里面内容是文本
要求:

  • 找出第一个不重复的词
  • 只允许扫一遍原文件
  • 尽量少的 IO
  • 内存限制 16G
  • 随机字符串,每行一个字符串 (长度范围从 0-100)。

思路:

  1. 100G字符串,0-100字节随机,最后换行占两个字节
  2. 每行是一个byte数组,长度1-100不等(不加上换行符),一个字节8位,所以共有2^800种组合
  3. 所以整个文件不可能直接存到内存中,最坏情况,100G中,每个字符串都不同,第一个字符串就是要寻找的目标字符串
  4. 100G 数据最坏情况下有多少行? 决定用 int 还是 long 表示字符串出现频率和字符串第一次出现的位置,假设每行一个字符,行数100 * 1024 * 1024 * 1024 / 3 >2^31-1,超出int范围,若都是相同字符串,则字符串频率也会超出,java中 long最大为2^63 -1 > 100 * 1024 * 1024 *1024, 所以用long统计足够。
  5. 按照最坏情况,设要把大文件拆分成 x 份,每份文件中要记录每行的字符串内容以及在源文件中第一次出现的位置,需要一个long数据转化成字符串 根据最大值 1024 * 1024 * 1024 * 00/3=35791394133.33333, 需要11个字节,再加一个分隔符占一个字节,总共需要12个字节,此文件读到内存中要把原来的String类型的统计字符串位置索引的内容转化成long,每行最多扩大8(long字节数)-1(字符串位置索引最小字节)=7个字节,(其实扩大不了这么多,因为有的是从12个字节减小到8个字节)
  6. 源文件切割份数计算方式如下图
  7. 接下来就是如何切割使得尽量均匀达到我们设置的内存,最重要的一点是相同字符串要在同一个文件中,这是保证分布运算的关键,所以就要用到Hash函数,相同字符串的hash函数值是相同的(我用的java自带的计算String的hashcode)。但是由于2^800 是一个很大的种类数,还是存在极特殊情况使得小文件分布不均匀,遍历文件对一次分割文件变大的文件按照所占内存大小重新分割。
  8. 寻找被切割后的尽量少的文件数是为了尽量减少IO
  9. 切割完成后的对每个文件进行处理的算法就比较简单了,读文件把其存到内存中,统计每个字符串其出现频率和第一次出现的位置。每个文件保存一个结果,即频率为1且最早出现的字符串信息,以后遍历的每个文件中若有频率为1且更早索引位置的,将原有结果替换。若文本中无结果,返回字符串"全文无非重复字符串"
  10. 维护一张hashmap在读取的时候统计词频,在内存范围内,若有词超过两个,就不读入小文件,控制哈希表在14G范围内,多了就不增加写入小文件。
  11. 维护一张bitmap,对每个字符串构建hash函数,14G*8=112G的数值范围已经确保bitmap足够大,100G字符串平均长度50,只有2G的种类数,112G种对比2G种,不同字符串hash冲突的概率极小,极大概率保证字符串hash值不冲突,【然后从尾到头读文件】,字符串计算hash值,查bitmap表,若为0,则置为1,加入候选解,若为1,则删除候选解。再为0变1,则替换候选解。这种效率很高,查找解的速度就是磁盘读取速度。(但是有错误概率,因为没有维护候选解,从头到尾只有一个候选解字符串,另外就是只能从大概率上保证不同字符串的hash值不冲突)
  12. 终极方案,11和12同时进行,同时维护一张bitmap和一张hashmap,hashmap可以作为bitmap的候选解hash表。

可改进的地方

  1. 算法可以优化查询速度,维护一颗树或者堆
  2. 读写内容时,buffer内存效率值也可以改进,目前根据经验设置为1M
  3. 考虑双线程进行读写操作,一边读一遍处理数据,这个提升了改进buffer的空间,也能提升整个的查询效率。

使用和运行

新建Project将3个java文件拷入即可,记得修改首行包名

主程序函数入口:FindFisrtX.main

创建文本测试用例main函数入口:FileIO.main


程序github地址


主程序:FindFisrtX.java

import java.io.*;
import java.util.HashMap;
import java.util.Map;


/** * 定义字符串信息 */
class WordsInfo {
    String word;
    long firstApperIndex;
    long frequency;

    public WordsInfo(long firstApperIndex, long frequency) {
        this.firstApperIndex = firstApperIndex;
        this.frequency = frequency;
    }

    public WordsInfo(long frequency) {
        this.frequency = frequency;
    }

    public WordsInfo(String word, long firstApperIndex, long frequency) {
        this.word = word;
        this.firstApperIndex = firstApperIndex;
        this.frequency = frequency;
    }
}

public class FindFirstX {
    /** * 主函数入口 * * @param args * @throws IOException */
    public static void main(String[] args) throws IOException {
        long startTime = System.currentTimeMillis();
        int num_files = 5;// 被分割文件数量
        String sourceFilePath = "G:/wordTest710.txt"; // 100G大文件路径
        // String sourceFilePath = "D:/面试/pingCAP/test.txt"; // 100G大文件路径
        // String sourceFilePath = "D:/面试/pingCAP/test.txt"; // 100G大文件路径
        FileIO.delAllFile("G:/PingCAP");
        String desFolderPath = "G:/PingCAP"; //切割后的小文件存放路径
        String fileName = "wordShow"; // 小目标文件标准名称
        String[] strTemp; // 存放字符串与出现位置的数组
        String result = "全文无非重复字符串"; // 保存最终结果
        WordsInfo wordsInfo; //存放每个小文件中最有可能的目标解信息
        Long firstApperIndex = Long.MAX_VALUE;
        FileInputStream inputStream = null;
        BufferedInputStream bis = null;
        BufferedReader reader = null;
        FileIO.cutLargeFile(num_files, sourceFilePath, desFolderPath, fileName, 1024 * 1024 * 40); //按照内存限制切割小文件
        File dirFile = new File(desFolderPath);
        String[] fileList = dirFile.list();
        for (String s : fileList) {
            System.out.println(s);
        }

        for (String fileName_re : fileList) {

            Map<String, WordsInfo> wordsMap = new HashMap<>(); //存单词的容器
            try {
                inputStream = new FileInputStream(desFolderPath + "/" + fileName_re);
                bis = new BufferedInputStream(inputStream); //带缓冲数组的输入流
                reader = new BufferedReader(new InputStreamReader(bis, "utf-8"), 1 * 1024 * 1024);
                String line;
                while ((line = reader.readLine()) != null) {

                    strTemp = line.trim().split("分");
                    KeepWordsToMap(wordsMap, strTemp[0], Long.valueOf(strTemp[1])); // 保存到容器
                }

            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (inputStream != null) {
                    inputStream.close();
                }

                if (reader != null) {
                    reader.close();
                }
                if (bis != null) {
                    bis.close();
                }
            }
            wordsInfo = FindFirstSingleX(wordsMap);
            if (wordsInfo.frequency == 1 && wordsInfo.firstApperIndex < firstApperIndex) {
                firstApperIndex = wordsInfo.firstApperIndex;
                result = wordsInfo.word;
            }
        }
        System.out.println("第一个不重复的字符串为: " + result); // 输出结果
        long endTime = System.currentTimeMillis();
        System.out.println("程序总运行时间:" + (endTime - startTime) + "ms"); //输出程序运行时间
    }

    /** * method :把每个字符串存进当前map,并记录其,第一次出现的位置以及出现频率 * * @param wordsMap * @param s * @param countIndex */
    public static void KeepWordsToMap(Map<String, WordsInfo> wordsMap, String s, Long countIndex) {

        if (wordsMap.get(s) != null)
            wordsMap.replace(s.trim(), new WordsInfo(wordsMap.get(s.trim()).firstApperIndex, wordsMap.get(s.trim()).frequency + 1L));
        else wordsMap.put(s, new WordsInfo(countIndex, 1L));
    }

    /** * method:遍历map,得到第一次出现未重复的解,若无返回默认解 * * @param wordsMap * @return */
    public static WordsInfo FindFirstSingleX(Map<String, WordsInfo> wordsMap) {
        String result = "";
        long minFirstApperIndex = Long.MAX_VALUE;
        long frequency = 2;
        WordsInfo wordsInfo = new WordsInfo(result, minFirstApperIndex, frequency);
        for (String s : wordsMap.keySet()) {
            if (wordsMap.get(s).frequency == 1 && wordsMap.get(s).firstApperIndex < minFirstApperIndex) {
                wordsInfo.word = s;
                wordsInfo.firstApperIndex = wordsMap.get(s).firstApperIndex;
                minFirstApperIndex = wordsMap.get(s).firstApperIndex;
                wordsInfo.frequency = 1;
            }
        }

        return wordsInfo;
    }

}

FileIO 操作文件的一些方法,以及切割小文件的方法

import java.io.*;
import java.util.HashMap;

public class FileIO {
    // 用于创建测试用例
    public static void main(String[] args) throws IOException {
        long startTime = System.currentTimeMillis();    //获取开始时间
        float a = 0.15F; // 随机输入一个不重复数据
        String str = "";
        File f = new File("G:/wordTest710.txt");
        FileOutputStream fop = new FileOutputStream(f, false);
        OutputStreamWriter writer = new OutputStreamWriter(fop, "UTF-8");
        BufferedWriter bw = new BufferedWriter(writer, 1 * 1024 * 1024);
        for (long i = 0L; i < 800000L; i++) {
            if (a < Math.random()) {
                bw.append("TWODOG");
                bw.append("\r\n");
                a = 2.0F;
            }
            str = Utils.creatWord(1, 100);
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
        }
        bw.append("xiaoxinniubi");
        bw.append("\r\n");

        writer.flush();
        bw.flush();
        fop.flush();
        writer.close();
        bw.close();
        fop.close();

        System.out.println("完成");
        long endTime = System.currentTimeMillis();    //获取结束时间
        System.out.println("创建测试用例程序运行时间:" + (endTime - startTime) + "ms");    //输出程序运行时间

    }

    /** * 方法:把字符串写入文件 * * @param line * @param ch * @param Index :大文件里出现位置的索引 */
    public static void WriteToFile(String line, char ch, Long Index, BufferedWriter bw) throws IOException {

        bw.append(line + ch + Index + "\r\n");
    }

    public static void WriteToFile(String line, BufferedWriter bw) throws IOException {

        bw.append(line + "\r\n");
    }

    /** * 方法: 把大文件切割成小文件 * * @param num_file 分割后的小文件数量 * @param sourceFilePath 被分割源文件路径 * @param desFolderPath 存放分割后目标文件夹路径 * @param fileName 小目标文件标准名称 * @param smallFileMem 小文件内存限制 * @throws IOException */
    public static void cutLargeFile(int num_file, String sourceFilePath, String desFolderPath, String fileName, long smallFileMem) throws IOException {
        long hashMapMem = 0L;// 定义读取文件时候存储的hashmap空间
        final long tempMapMemLimit = 1024L * 1024L * 1024L * 14L;
        HashMap<String, Long> tempHashMap = new HashMap<>(); //维护一个减少小文件写入的hash表
        long startTime = System.currentTimeMillis();    //获取开始时间
        FileInputStream inputStream = null;
        BufferedInputStream bis = null;
        BufferedReader reader = null;
        // int num_file = 26;
        File[] files = new File[num_file];
        FileOutputStream[] fops = new FileOutputStream[num_file];
        OutputStreamWriter[] writers = new OutputStreamWriter[num_file];
        BufferedWriter[] bws = new BufferedWriter[num_file];
        for (int i = 0; i < num_file; i++) {
            files[i] = new File(desFolderPath + "/" + fileName + i + ".txt");
            fops[i] = new FileOutputStream(files[i], true);
            writers[i] = new OutputStreamWriter(fops[i], "UTF-8");
            bws[i] = new BufferedWriter(writers[i], 1 * 1024 * 1024);
        }

        try {
            Long index = 0L; //统计字符串在源文件中的位置
            inputStream = new FileInputStream(sourceFilePath);
            bis = new BufferedInputStream(inputStream); //带缓冲数组的输入流
            reader = new BufferedReader(new InputStreamReader(bis, "utf-8"), 1 * 1024 * 1024);
            String line;

            while ((line = reader.readLine()) != null) {
                String trueLine = line.trim();

              /* System.out.println("tempHashMap.get(trueLine) " + tempHashMap.get(trueLine)); System.out.println("hashMapMem < (long)(1024 * 1024 * 1024 * 14) " + (hashMapMem < tempMapMemLimit)); System.out.println("真假: " + tempHashMap.get(trueLine) == null && hashMapMem < tempMapMemLimit);*/
                if (tempHashMap.get(trueLine) == null && hashMapMem < tempMapMemLimit) {
                    tempHashMap.put(trueLine, 1L);
                    hashMapMem += (8L + 4L + (long) trueLine.length()); // hashcode占4字节,频率占8字节,字符串占 trueLine.length() 字节
                } else if (tempHashMap.get(trueLine) != null && hashMapMem < tempMapMemLimit) {
                    tempHashMap.put(line.trim(), tempHashMap.get(trueLine) + 1L);
                    hashMapMem += (8L + 4L + (long) trueLine.length());
                }

                if (tempHashMap.get(trueLine) < 2 || (tempHashMap.get(trueLine) == null && hashMapMem > (long) (1024 * 1024 * 1024 * 14))) {
                    int type = trueLine.hashCode() % num_file > 0 ? trueLine.hashCode() % num_file : -trueLine.hashCode() % num_file;
                    //System.out.println("type: " + type);
                    // System.out.println("line.trim().hashCode: " + line.trim().hashCode());
                    FileIO.WriteToFile(trueLine, '分', index, bws[type]);
                    index++;
                }

            }
            for (int i = 0; i < num_file; i++) {
                fops[i].flush();
                writers[i].flush();
                bws[i].flush();
                fops[i].close();
                writers[i].close();
                bws[i].close();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (inputStream != null) {
                inputStream.close();
            }

            if (reader != null) {
                reader.close();
            }
            if (bis != null) {
                bis.close();
            }
        }
        for (File file : files) {
            FileInputStream inputStream_re = null;
            BufferedInputStream bis_re = null;
            BufferedReader reader_re = null;
            System.out.println(file.length());
            if (file.length() > smallFileMem) {
                int copies = (int) (Math.ceil((double) file.length()) / (double) smallFileMem); // 分成copies份
                //int copies = 2; // 分成copies份
                File[] files_re = new File[copies];
                FileOutputStream[] fops_re = new FileOutputStream[copies];
                OutputStreamWriter[] writers_re = new OutputStreamWriter[copies];
                BufferedWriter[] bws_re = new BufferedWriter[copies];
                for (int i = 0; i < copies; i++) {
                    int fileIndex = i + num_file;
                    files_re[i] = new File(desFolderPath + "/" + fileName + (fileIndex) + ".txt");
                    fops_re[i] = new FileOutputStream(files_re[i], true);
                    writers_re[i] = new OutputStreamWriter(fops_re[i], "UTF-8");
                    bws_re[i] = new BufferedWriter(writers_re[i], 1 * 1024 * 1024);
                }
                try {
                    inputStream_re = new FileInputStream(file.getAbsoluteFile());
                    bis_re = new BufferedInputStream(inputStream_re); //带缓冲数组的输入流
                    reader_re = new BufferedReader(new InputStreamReader(bis_re, "utf-8"), 1 * 1024 * 1024);
                    String line;
                    String[] trueStr;//文本中真实字符串

                    while ((line = reader_re.readLine()) != null) {
                        trueStr = line.trim().split("分");
                        int type = Utils.APHash(trueStr[0]) % copies > 0 ? Utils.APHash(trueStr[0]) % copies : -Utils.APHash(trueStr[0]) % copies;
                        FileIO.WriteToFile(line.trim(), bws_re[type]);

                    }
                    for (int i = 0; i < copies; i++) {
                        fops_re[i].flush();
                        writers_re[i].flush();
                        bws_re[i].flush();
                        fops_re[i].close();
                        writers_re[i].close();
                        bws_re[i].close();
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (inputStream_re != null) {
                        inputStream_re.close();
                    }

                    if (reader_re != null) {
                        reader_re.close();
                    }
                    if (bis_re != null) {
                        bis_re.close();
                    }
                }
                num_file = num_file + copies;
                file.delete();
            }
        }

        long endTime = System.currentTimeMillis();
        System.out.println("大文件分成小文件程序运行时间:" + (endTime - startTime) + "ms");

    }


    /** * 清空文件夹 * * @param folderPath */
    public static void delFolder(String folderPath) {
        try {
            delAllFile(folderPath); //删除完里面所有内容
            String filePath = folderPath;
            filePath = filePath.toString();
            java.io.File myFilePath = new java.io.File(filePath);
            myFilePath.delete(); //删除空文件夹
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** * 删除指定文件夹下所有文件 * * @param path 文件夹完整绝对路径 * @return */
    public static boolean delAllFile(String path) {
        boolean flag = false;
        File file = new File(path);
        if (!file.exists()) {
            return flag;
        }
        if (!file.isDirectory()) {
            return flag;
        }
        String[] tempList = file.list();
        File temp = null;
        for (int i = 0; i < tempList.length; i++) {
            if (path.endsWith(File.separator)) {
                temp = new File(path + tempList[i]);
            } else {
                temp = new File(path + File.separator + tempList[i]);
            }
            if (temp.isFile()) {
                temp.delete();
            }
            if (temp.isDirectory()) {
                delAllFile(path + "/" + tempList[i]);//先删除文件夹里面的文件
                delFolder(path + "/" + tempList[i]);//再删除空文件夹
                flag = true;
            }
        }
        return flag;
    }
}


其它一些小工具常见hash函数等 Utils.java

package xin.twodog.PingCAP;

import java.io.File;

public class Utils {

    /** * 随机生成单词 * * @param min 最小长度 * @param max 最大长度 * @return */

    public static String creatWord(int min, int max) {
        int count = (int) (Math.random() * (max - min + 1)) + min;
        String str = "";
        for (int i = 0; i < count; i++) {
            str += (char) ((int) (Math.random() * 26) + 'a');
        }
        return str;
    }

    /** * 返回文件内存大小 * * @param filePath * @return * @throws Exception */
    public static Long getFileMem(String filePath) {
        File localFile = new File(filePath);
        return localFile.length();
    }


    /** * 删除文件 * * @param filePath */
    public static void delFile(String filePath) {
        File localFile = new File(filePath);
        localFile.delete();
    }


    /** * DEKHash算法 * * @param str * @return */
    public static int DEKHash(String str) {
        int hash = str.length();
        for (int i = 0; i < str.length(); i++) {
            hash = ((hash << 5) ^ (hash >> 27)) ^ str.charAt(i);
        }
        return (hash & 0x7FFFFFFF);
    }

    /** * APHash算法 * * @param str * @return */
    public static int APHash(String str) {
        int hash = 0;
        for (int i = 0; i < str.length(); i++) {
            hash ^= ((i & 1) == 0) ? ((hash << 7) ^ str.charAt(i) ^ (hash >> 3)) :
                    (~((hash << 11) ^ str.charAt(i) ^ (hash >> 5)));
        }
        return hash;
    }

    /** * 改进的32位FNV算法1 * * @param data 字符串 * @param data * @return int值 */
    public static int FNVHash1(String data) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash;
    }

    /** * JS hash 算法 * * @param str * @return */
    public static int JSHash(String str) {
        int hash = 1315423911;
        for (int i = 0; i < str.length(); i++) {
            hash ^= ((hash << 5) + str.charAt(i) + (hash >> 2));
        }
        return (hash & 0x7FFFFFFF);
    }
}