东's profile谭东的空间BlogGuestbook Tools Help

Blog


    May 19

    hadoop/mapred优化方法.V004

    欢迎拍砖.
     
    ***某些方法, 会导致程序可维护性会降低***
     
    从三个方面着手优化 :
    1. hadoop配置
    2. 设计mapred/job
    3. 代码级别.
    4. 改造hadoop
     
    一. conf/hadoop-site.xml配置.
    经验要求高, 特别需要结合实际情况.
    典型参数如
    复制因子,
    mapred.child.java.opts,
    mapred.tasktracker.map.tasks.maximum,
    mapred.tasktracker.reduce.tasks.maximum,
    mapred.map.tasks,
    mapred.reduce.tasks,
    fs.inmemory.size.mb,
    dfs.block.size
    等等
     
    二. 在同一个job内完成尽可能多的计算任务, 主要是设计key和自定义OutputFormat, 将能合并的计算任务合并.
    举例 : 用户访问行为(userid, ip, cookie), 分别统计每个用户的ip数和cookie数.
    最简单的设计, 是使用量个job, 分别计算ip数和cookie数.但是我们可以按照下面的思路, 在一个job中完成这两项计算 :
    (a). 把userid和字段存储到key中
    public class UserKey implements WritableComparable<UserKey>{
    int userId;//userid
    byte field;//0代表ip, 1代表cookie
    @Override
    public int compareTo(UserKey o) {
    if(userId > o.userId)return 1;
    if(userId < o.userId)return -1;
    if(field > o.field)return 1;
    if(field < o.field)return -1;
    return 0;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    }
    @Override
    public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    }
    }
    (b). 实现自定义的OutputFormat, 下面是两处关键代码如下 :
    (x).
    SequenceFile.Writer[] writers = new SequenceFile.Writer[2];
    writers[0] = SequenceFile.createWriter(FileSystem.get(conf), conf, "ip", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    writers[1] = SequenceFile.createWriter(FileSystem.get(conf), conf, "field", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    (xx).
    writers[key.field].append(key.userId, value.get());
     
    三. 避免不必要的reduce任务.
    (1). 假定要处理的数据是排序且已经分区的. 或者对于一份数据, 需要多次处理, 可以先排序分区.
    (2). 自定义InputSplit, 将单个分区作为单个mapred的输入.
    (3). 在map中处理数据, Reducer设置为空.
    这样, 既重用了已有的 "排序", 也避免了多余的reduce任务.
     
    四. 使用自定义的MapRunnable.
    hadoop自带了两个MapRunnable,
    (1). 一个是默认的单线程MapRunnable, org.apache.hadoop.mapred.MapRunner
    (2).另一个是多线程的, org.apache.hadoop.mapred.lib.MultithreadedMapRunner.
    根据特定情况, 可以自定义MapRunnable,
    (1). 启用多线程, 比如web爬行时, 可启用多线程抓取网页.
    (2). 避免map时, 单台tasktracker上辅助数据冗余, 比如在多模匹配时, 避免生成多份DFA.
     
    五. 在某些情况下, 利用数据分布特性设计PARTITIONER的分区算法, 避免单个mapred消耗时间过长.
    这跟木桶原理有些神似.
    比如处理大量字符串时,
    (1). 已知首字不同的字符串之间不存在任何关联关系
    (2). 原始数据在某些 "首字" 上分布密集, 另一些 "首字" 上分布稀疏.
    例如, 原始数据中, 1亿个以3开头, 1亿个以7开头, 3个以6开头.
    那么,
    (1). 如果以首字对4求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在同一分区.若hadoop群集只支持同时2个map任务, 则...
    (2). 如果以首字对3求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在不同分区.
     
    六. 最大限度地重用对象, 避免对象的生成/销毁开销.
    该点在hadoop自带的org.apache.hadoop.mapred.MapRunner中尤为突出,
    它使用同一个key对象和同一个value对象不停读取原始数据, 再将自身交给mapper处理.
    (此处注意, 若要保留该对象的即时状态, 需要clone, 深克隆或浅克隆.)
     
    七. 在逻辑意义上, 合并同一对象. 如dotnet和java中的字符串INTERN技术.
     
    八. 根据已有条件, 简化循环判定.
    比如, for(int i = 0; i < end && i < size; i++);
    可以改成 :
    end = end < size ? end : size;
    for(int i = 0; i < end; i++);
     
    九. 降低多线程数目, 而让固定数目的线程循环处理.
    比如, 一台机器8个CPU, 现在需要处理80亿个数据,
    那么下面两个方案 :
    (1). 启动800个线程, 每个线程处理80亿/800个数据.
    (2). 启动8个线程(注意, 此处是8个), 每个线程循环处理, 每次循环处理100万个.
    通常我个人选择方案(2).因为 :
    (1). 最大限度利用了CPU.
    (2). 避免了线程调度.
    (3). 在java中, 可以使用AtomicInteger控制线程循环, AtomicInteger的效率很高.
    (4). 有时, 还可以避免单个线程消耗时间过长.
     
    十. 使用位移替代浮点数计算. 比如用 100 >> 3替代100 * 0.125.
    (另外, 我们会需要将某个中间值乘以一个调节因子(经验值), 比如乘以0.12,
    如果乘以0.12和0.124 "差不多" 时, 可以考虑直接使用位移).
     
    十一. 避免循环体内不必要的判断逻辑, 与第八条不同.
    比如, 处理10亿个数据, 每遇到一个有效数据时, 需要同前一个有效数据进行关联处理(或与前一个中间值进行关联处理),
    for(int i = 0; i < size; i++)
    {
    //1. 判定是否存在前一个有效数据
    //2. 如果不存在前一个有效数据, 则continue;
    //3. 如果存在前一个有效数据, 则进行关联处理, 再continue.
    }
    通常在此种需求下, 一旦遇到一个有效数据, 必定会产生一个可供后续紧邻数据关联的值,
    那么 :
    int i = 0;
    for(int i = 0; i < size; i++)
    {
    //1. data[i]是否有效?
    //2. data[i]无效, continue;
    //3. data[i]有效, break;
    }
    for(; i < size; i++)
    {
    //与前一个有效数据进行关联处理, 再continue.
    }
     
    十二. 方法调用过程, 辅助数据尽量放在方法体内, 避免使用全局辅助数据, 一来节省内存, 二来提高对象可重用性.
     
    十三. 尽量不要生成转瞬即逝的对象, 或者专门构建专属对像来完成这一任务.
    比如 :
    1). 提供直接使用构造函数参数进行序列化的静态方法, 避免先使用参数构造对象再进行序列化.
    2). 参考上述第六点.
     
    十四. 利用-1 和 1的关联性, 减少内存使用量, 或携带更多的信息.
    比如java.util.Arrays.binarySearch方法的返回值.
     
    十五. 对于方方正正的多位数组Arr[d0][d1][d2]..[dn], 且di >> d(i+1)时, 可以考虑使用一维数组替代, 减少对象.
    这是因为java中多位数组实际上使用 "数组的数组" 实现的.
     
    十六. 尽量使key的WritableComparable性能最佳, 尽量使value的Writable性能最佳.
    比如使用掩码操作.
     
    十七. 尽早丢弃无关对象.
    见 "使用hadoop/mapred的典型计数问题".
     
    十八. 改造hadoop, 使merge过程更具弹性, 或更符合实际需求.
    比如 :
    1). 使reduce的<key, values>中的values按照顺序迭代.
    2). 见 "使用hadoop/mapred的典型计数问题".
     
    十九. 有效设计mapred中的combiner, 尽早降低I/O等操作.
    此过程中, 可以结合自定义OutputFormat, 使得同一个Recuder类可同时充当map->merge->reduce中的后两个过程.
    见 "使用hadoop/mapred的典型计数问题".
    May 18

    hadoop/mapred优化方法.V003

    个人总结了10个可以考虑优化的点, 供大家参考, 也想抛砖引玉, 要是最后能形成一个 "优化大全" 就非常nb了.
    欢迎拍砖.
     
    ***某些方法, 会导致程序可维护性会降低***
     
    从三个方面着手优化 :
    1. hadoop配置
    2. 设计mapred/job
    3. 代码级别.
     
    一. conf/hadoop-site.xml配置.
    经验要求高, 特别需要结合实际情况.
    典型参数如
    复制因子,
    mapred.child.java.opts,
    mapred.tasktracker.map.tasks.maximum,
    mapred.tasktracker.reduce.tasks.maximum,
    mapred.map.tasks,
    mapred.reduce.tasks,
    fs.inmemory.size.mb,
    dfs.block.size
    等等
     
    二. 在同一个job内完成尽可能多的计算任务, 主要是设计key和自定义OutputFormat, 将能合并的计算任务合并.
    举例 : 用户访问行为(userid, ip, cookie), 分别统计每个用户的ip数和cookie数.
    最简单的设计, 是使用量个job, 分别计算ip数和cookie数.但是我们可以按照下面的思路, 在一个job中完成这两项计算 :
    (a). 把userid和字段存储到key中
    public class UserKey implements WritableComparable<UserKey>{
    int userId;//userid
    byte field;//0代表ip, 1代表cookie
    @Override
    public int compareTo(UserKey o) {
    if(userId > o.userId)return 1;
    if(userId < o.userId)return -1;
    if(field > o.field)return 1;
    if(field < o.field)return -1;
    return 0;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    }
    @Override
    public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    }
    }
    (b). 实现自定义的OutputFormat, 下面是两处关键代码如下 :
    (x).
    SequenceFile.Writer[] writers = new SequenceFile.Writer[2];
    writers[0] = SequenceFile.createWriter(FileSystem.get(conf), conf, "ip", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    writers[1] = SequenceFile.createWriter(FileSystem.get(conf), conf, "field", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    (xx).
    writers[key.field].append(key.userId, value.get());
     
    三. 避免不必要的reduce任务.
    (1). 假定要处理的数据是排序且已经分区的. 或者对于一份数据, 需要多次处理, 可以先排序分区.
    (2). 自定义InputSplit, 将单个分区作为单个mapred的输入.
    (3). 在map中处理数据, Reducer设置为空.
    这样, 既重用了已有的 "排序", 也避免了多余的reduce任务.
     
    四. 使用自定义的MapRunnable.
    hadoop自带了两个MapRunnable,
    (1). 一个是默认的单线程MapRunnable, org.apache.hadoop.mapred.MapRunner
    (2).另一个是多线程的, org.apache.hadoop.mapred.lib.MultithreadedMapRunner.
    根据特定情况, 可以自定义MapRunnable,
    (1). 启用多线程, 比如web爬行时, 可启用多线程抓取网页.
    (2). 避免map时, 单台tasktracker上辅助数据冗余, 比如在多模匹配时, 避免生成多份DFA.
     
    五. 在某些情况下, 利用数据分布特性设计PARTITIONER的分区算法, 避免单个mapred消耗时间过长.
    这跟木桶原理有些神似.
    比如处理大量字符串时,
    (1). 已知首字不同的字符串之间不存在任何关联关系
    (2). 原始数据在某些 "首字" 上分布密集, 另一些 "首字" 上分布稀疏.
    例如, 原始数据中, 1亿个以3开头, 1亿个以7开头, 3个以6开头.
    那么,
    (1). 如果以首字对4求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在同一分区.若hadoop群集只支持同时2个map任务, 则...
    (2). 如果以首字对3求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在不同分区.
     
    六. 最大限度地重用对象, 避免对象的生成/销毁开销.
    该点在hadoop自带的org.apache.hadoop.mapred.MapRunner中尤为突出,
    它使用同一个key对象和同一个value对象不停读取原始数据, 再将自身交给mapper处理.
    (此处注意, 若要保留该对象的即时状态, 需要clone, 深克隆或浅克隆.)
     
    七. 在逻辑意义上, 合并同一对象. 如dotnet和java中的字符串INTERN技术.
     
    八. 根据已有条件, 简化循环判定.
    比如, for(int i = 0; i < end && i < size; i++);
    可以改成 :
    end = end < size ? end : size;
    for(int i = 0; i < end; i++);
     
    九. 降低多线程数目, 而让固定数目的线程循环处理.
    比如, 一台机器8个CPU, 现在需要处理80亿个数据,
    那么下面两个方案 :
    (1). 启动800个线程, 每个线程处理80亿/800个数据.
    (2). 启动8个线程(注意, 此处是8个), 每个线程循环处理, 每次循环处理100万个.
    通常我个人选择方案(2).因为 :
    (1). 最大限度利用了CPU.
    (2). 避免了线程调度.
    (3). 在java中, 可以使用AtomicInteger控制线程循环, AtomicInteger的效率很高.
    (4). 有时, 还可以避免单个线程消耗时间过长.
     
    十. 使用位移替代浮点数计算. 比如用 100 >> 3替代100 * 0.125.
    (另外, 我们会需要将某个中间值乘以一个调节因子(经验值), 比如乘以0.12,
    如果乘以0.12和0.124 "差不多" 时, 可以考虑直接使用位移).
     
    十一. 避免循环体内不必要的判断逻辑, 与第八条不同.
    比如, 处理10亿个数据, 每遇到一个有效数据时, 需要同前一个有效数据进行关联处理(或与前一个中间值进行关联处理),
    for(int i = 0; i < size; i++)
    {
    //1. 判定是否存在前一个有效数据
    //2. 如果不存在前一个有效数据, 则continue;
    //3. 如果存在前一个有效数据, 则进行关联处理, 再continue.
    }
    通常在此种需求下, 一旦遇到一个有效数据, 必定会产生一个可供后续紧邻数据关联的值,
    那么 :
    int i = 0;
    for(int i = 0; i < size; i++)
    {
    //1. data[i]是否有效?
    //2. data[i]无效, continue;
    //3. data[i]有效, break;
    }
    for(; i < size; i++)
    {
    //与前一个有效数据进行关联处理, 再continue.
    }
     
    十二. 方法调用过程, 辅助数据尽量放在方法体内, 避免使用全局辅助数据, 一来节省内存, 二来提高对象可重用性.
     
    十三. 尽量不要生成转瞬即逝的对象, 或者专门构建专属对性来完成这一任务. 比如提供直接使用构造函数参数进行序列化的静态方法, 避免先使用参数构造对象再进行序列化.
     
    十四. 利用-1 和 1的关联性, 减少内存使用量.
     
    十五. 对于方方正正的多位数组Arr[d0][d1][d2]..[dn], 且di >> d(i+1)时, 可以考虑使用一维数组替代.
    这是因为java中多位数组实际上使用 "数组的数组" 实现的.
     
    十六. 尽量使key的IO/WritableComparable性能最佳, 尽量使value的Writable性能最佳. 比如使用掩码操作.
     
    十七. 尽早丢弃无关对象. 见 "使用hadoop/mapred的典型计数问题"
    May 15

    一个文本的信息量到底有多大?

    给你一个文本, 怎么知道它到底有多少信息呢?

    (我们假定不考虑文字的先后顺序, 即给你一个字符集, 怎么知道它到底有多少信息?)

     

    1). 给定3种字符, A, B, C, 每种字符都有无穷多个, 分别用{A}, {B}, {C}表示.

    2). 现在用4A, 5B7C搭建成一个字符集X, 不考虑字符之间的顺序问题, X有多少信息量?

     

    我们来看一下 :

    0). 字符集X总共有16个字符.

    1). 在字符集X, 随机选择一个字符x, xA的概率是4/16.

    2). {A}抓到一个A字符的概率为1/2, 即为 "抓到" "没抓到".

    3). 我们发现A类字符的概率从1/2变成了4/16!

    4). 而在{A}, 抓到2A字符的概率为(1/2) * (1/2) = 1/4

    5). 从第四点得到, 字符集X中的4A, 其实可以映射为{A}中的2A, 这里的2也就是AX贡献的信息量.

    6). 很可惜, A为字符集X贡献的信息量, 只占字符集X4/16, 所以2还需要乘以一个4/16.

    7). 同理可得BC为字符集X贡献的信息量, 最终计算得到X的信息量 :

         4/16 * log(1/2, 4/16) +

         5/16 * log(1/2, 5/16) +

         7/16 * log(1/2, 7/16)

         这也就是一个entropy()公式.

     

    entropy()在维基百科上的解释 :

    热力学中熵表示的是“系统混乱状态”;信息论中信息熵表示的是信息量;生态学中熵表示的是生物多样性。

    May 14

    使用hadoop/mapred的典型计数问题

    引言 :
    这类问题, 比较常见, 时常要求性能. 个人也遇到过.
    现提供两个候选思路.
     
    模拟需求 :
    1). 每个record的格式 : (id, cookie), 即一个id对应一个cookie
    2). 一小时内产生上亿个record
    3). 要求计算每个id对应多少个不同的cookie, 要求速度尽量快.
     
    候选设计(一) :
    1). 将id + cookie联合做hadoop/mapred的Key(下文称该key为联合key), Value直接使用hadoop自带的NullWritable.
    2). partitioner的分区算法只针对联合key中的id计算.
    3). 在reducer中, 忽略values, 直接将联合key置入collector.
    4). 将上述reducer作为combiner.
    5). 自定义reduce的OutputFormat, 它的collect方法如下
    //用pre_c_key表示上一个联合key
    //用input_c_key表示输入的联合key
    if(input_c_key.id == prev_c_key.id)prev_c_key.count++;
    else
    {
    //1. 将prev_c_key.id和prev_c_key.count写入文件
    //2. prev_c_key.id = input_c_key.id
    //3. prev_c_key.count = 1;
    }
     
    影响该设计效率的因素 :
    一般思路, 可能考虑将id做key, 现在使用id + cookie做key, 将导致key膨胀.
    还好确认hadoop/mapred中的MERGE过程不会因此对该设计产生负面影响
    (个人感觉上, 认为这个设计比较可行)
     
    候选设计(二) :
    改造hadoop/mapred的中间merge过程, 使key与value都相同的record只保留一个.
    1). id做key, cookie做value
    2). 改造org.apache.hadoop.mapred.Merger.MergerQueue类中的lessThan方法, 使得当key相等时, 返回value的大小关系
    3). 改造org.apache.hadoop.mapred.Merger类的writeFile方法, 使它仅写入key与value与上一个相比皆不同的record.
     
    影响该设计效率的因素 :
    先期成本高.
    May 13

    写给小学生看的 : 何为entropy(熵)

    维基百科上的解释 :
    热力学中熵表示的是“系统混乱状态”;信息论中信息熵表示的是信息量;生态学中熵表示的是生物多样性。

    怎么解释? 示例如下 :
    1). 有3种元素A, B, C, 分别用{A}, {B}, {C}表示.
    2). 有信息集合X, X中含有4个A, 5个B, 1个C
    3). 使用上述3种元素, 构建信息集合X.

    那么 :
    1). 从{A}中取出一个A的概率为1/2, 即 "取出" 和 "不取出", 所以4个A为系统贡献的信息量为log(2, 4).
    2). 同理, 5个B为系统贡献的信息量为log(2, 5), 1个C为系统贡献的信息量为log(2, 1)
    3). 加权平均, 得到系统的 "混乱度" (注意本释义中 "信息量" 和 "混乱度" 这两个概念):
        4 * (4 + 5 + 1) * log(2, 4) + 5 * (4 + 5 + 1) * log(2, 5) + 1 * (4 + 5 + 1) * log(2, 1), 再减去一个log(2, 4 + 5 + 1)
        这个刚好是一个教科书上的entropy(熵)公式.
        太巧了  转动眼睛

    4). 您可以换个姿势, 再求一遍 : 若从{A}中取出一个A的概率为1/4呢? 在特定情况下, 是有可能的

    下期将解释为什么 选择log(2, 4), 而不是其他数据公式, 敬请关注.

    April 22

    hadoop/mapred优化方法.V002(草稿)

    自V001以来, 收到不少朋友的阅读, 与大家交流.
    现在有V002的初稿, 只是在V001的基础上改版, 并增加少量内容.
     
    个人总结了10个可以考虑优化的点, 供大家参考, 也想抛砖引玉, 要是最后能形成一个 "优化大全" 就非常nb了.
    欢迎拍砖.
     
    ***某些方法, 会导致程序可维护性会降低***
     
    从三个方面着手优化 :
    1. hadoop配置
    2. 设计mapred/job
    3. 代码级别.
     
    一. conf/hadoop-site.xml配置.
    经验要求高, 特别需要结合实际情况.
    典型参数如
    复制因子,
    mapred.child.java.opts,
    mapred.tasktracker.map.tasks.maximum,
    mapred.tasktracker.reduce.tasks.maximum,
    mapred.map.tasks,
    mapred.reduce.tasks,
    fs.inmemory.size.mb,
    dfs.block.size
    等等
     
    二. 在同一个job内完成尽可能多的计算任务, 主要是设计key和自定义OutputFormat, 将能合并的计算任务合并.
    举例 : 用户访问行为(userid, ip, cookie), 分别统计每个用户的ip数和cookie数.
    最简单的设计, 是使用量个job, 分别计算ip数和cookie数.但是我们可以按照下面的思路, 在一个job中完成这两项计算 :
    (a). 把userid和字段存储到key中
    public class UserKey implements WritableComparable<UserKey>{
    int userId;//userid
    byte field;//0代表ip, 1代表cookie
    @Override
    public int compareTo(UserKey o) {
    if(userId > o.userId)return 1;
    if(userId < o.userId)return -1;
    if(field > o.field)return 1;
    if(field < o.field)return -1;
    return 0;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    }
    @Override
    public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    }
    }
    (b). 实现自定义的OutputFormat, 下面是两处关键代码如下 :
    (x).
    SequenceFile.Writer[] writers = new SequenceFile.Writer[2];
    writers[0] = SequenceFile.createWriter(FileSystem.get(conf), conf, "ip", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    writers[1] = SequenceFile.createWriter(FileSystem.get(conf), conf, "field", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    (xx).
    writers[key.field].append(key.userId, value.get());
     
    三. 避免不必要的reduce任务.
    (1). 假定要处理的数据是排序且已经分区的. 或者对于一份数据, 需要多次处理, 可以先排序分区.
    (2). 自定义InputSplit, 将单个分区作为单个mapred的输入.
    (3). 在map中处理数据, Reducer设置为空.
    这样, 既重用了已有的 "排序", 也避免了多余的reduce任务.
     
    四. 使用自定义的MapRunnable.
    hadoop自带了两个MapRunnable,
    (1). 一个是默认的单线程MapRunnable, org.apache.hadoop.mapred.MapRunner
    (2).另一个是多线程的, org.apache.hadoop.mapred.lib.MultithreadedMapRunner.
    根据特定情况, 可以自定义MapRunnable,
    (1). 启用多线程, 比如web爬行时, 可启用多线程抓取网页.
    (2). 避免map时, 单台tasktracker上辅助数据冗余, 比如在多模匹配时, 避免生成多份DFA.
     
    五. 在某些情况下, 利用数据分布特性设计PARTITIONER的分区算法, 避免单个mapred消耗时间过长.
    这跟木桶原理有些神似.
    比如处理大量字符串时,
    (1). 已知首字不同的字符串之间不存在任何关联关系
    (2). 原始数据在某些 "首字" 上分布密集, 另一些 "首字" 上分布稀疏.
    例如, 原始数据中, 1亿个以3开头, 1亿个以7开头, 3个以6开头.
    那么,
    (1). 如果以首字对4求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在同一分区.若hadoop群集只支持同时2个map任务, 则...
    (2). 如果以首字对3求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在不同分区.
     
    六. 最大限度地重用对象, 避免对象的生成/销毁开销.
    该点在hadoop自带的org.apache.hadoop.mapred.MapRunner中尤为突出,
    它使用同一个key对象和同一个value对象不停读取原始数据, 再将自身交给mapper处理.
    (此处注意, 若要保留该对象的即时状态, 需要clone, 深克隆或浅克隆.)
     
    七. 在逻辑意义上, 合并同一对象. 如dotnet和java中的字符串INTERN技术.
     
    八. 根据已有条件, 简化循环判定.
    比如, for(int i = 0; i < end && i < size; i++);
    可以改成 :
    end = end < size ? end : size;
    for(int i = 0; i < end; i++);
     
    九. 降低多线程数目, 而让固定数目的线程循环处理.
    比如, 一台机器8个CPU, 现在需要处理80亿个数据,
    那么下面两个方案 :
    (1). 启动800个线程, 每个线程处理80亿/800个数据.
    (2). 启动8个线程(注意, 此处是8个), 每个线程循环处理, 每次循环处理100万个.
    通常我个人选择方案(2).因为 :
    (1). 最大限度利用了CPU.
    (2). 避免了线程调度.
    (3). 在java中, 可以使用AtomicInteger控制线程循环, AtomicInteger的效率很高.
    (4). 有时, 还可以避免单个线程消耗时间过长.
     
    十. 使用位移替代浮点数计算. 比如用 100 >> 3替代100 * 0.125.
    (另外, 我们会需要将某个中间值乘以一个调节因子(经验值), 比如乘以0.12,
    如果乘以0.12和0.124 "差不多" 时, 可以考虑直接使用位移).
     
    十一. 避免循环体内不必要的判断逻辑, 与第八条不同.
    比如, 处理10亿个数据, 每遇到一个有效数据时, 需要同前一个有效数据进行关联处理(或与前一个中间值进行关联处理),
    for(int i = 0; i < size; i++)
    {
    //1. 判定是否存在前一个有效数据
    //2. 如果不存在前一个有效数据, 则continue;
    //3. 如果存在前一个有效数据, 则进行关联处理, 再continue.
    }
    通常在此种需求下, 一旦遇到一个有效数据, 必定会产生一个可供后续紧邻数据关联的值,
    那么 :
    int i = 0;
    for(int i = 0; i < size; i++)
    {
    //1. data[i]是否有效?
    //2. data[i]无效, continue;
    //3. data[i]有效, break;
    }
    for(; i < size; i++)
    {
    //与前一个有效数据进行关联处理, 再continue.
    }
     
    十二. 方法调用过程, 辅助数据尽量放在方法体内, 避免使用全局辅助数据, 一来节省内存, 二来提高对象可重用性.
     
    十三. 尽量不要生成转瞬即逝的对象, 或者专门构建专属对性来完成这一任务. 比如提供直接使用构造函数参数进行序列化的静态方法, 避免先使用参数构造对象再进行序列化.
     
    十四. 利用-1 和 1的关联性, 减少内存使用量.
     
    十五. 对于方方正正的多位数组Arr[d0][d1][d2]..[dn], 且di >> d(i+1)时, 可以考虑使用一维数组替代.
    这是因为java中多位数组实际上使用 "数组的数组" 实现的.
    April 07

    hadoop/mapred优化方法.V001

    个人总结了10个可以考虑优化的点, 供大家参考, 也想抛砖引玉, 要是最后能形成一个 "优化大全" 就非常nb了.
    欢迎拍砖.
     
    作者 : 谭东
     
    一. conf/hadoop-site.xml配置, 略过.
     
    二. 注重job重用, 主要是设计key和自定义OutputFormat, 将能合并的mapred job合并.
    举例 : 用户访问行为(userid, ip, cookie), 分别统计每个用户的ip数和cookie数.
    (a). 把userid和字段存储到key中
    public class UserKey implements WritableComparable<UserKey>{
    int userId;//userid
    byte field;//0代表ip, 1代表cookie
    @Override
    public int compareTo(UserKey o) {
    if(userId > o.userId)return 1;
    if(userId < o.userId)return -1;
    if(field > o.field)return 1;
    if(field < o.field)return -1;
    return 0;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    }
    @Override
    public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    }
    }
    (b). 实现自定义的OutputFormat, 下面是两处关键代码如下 :
    (x).
    SequenceFile.Writer[] writers = new SequenceFile.Writer[2];
    writers[0] = SequenceFile.createWriter(FileSystem.get(conf), conf, "ip", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    writers[1] = SequenceFile.createWriter(FileSystem.get(conf), conf, "field", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    (xx).
    writers[key.field].append(key.userId, value.get());
     
    三. 避免不必要的reduce任务.
    (1). 假定要处理的数据是排序且已经分区的. 或者对于一份数据, 需要多次处理, 可以先排序分区.
    (2). 自定义InputSplit, 将单个分区作为单个mapred的输入.
    (3). 在map中处理数据, Reducer设置为空.
    这样, 既重用了已有的 "排序", 也避免了多余的reduce任务.
     
    四. 使用自定义的MapRunnable.
    hadoop自带了两个MapRunnable,
    (1). 一个是默认的单线程MapRunnable, org.apache.hadoop.mapred.MapRunner
    (2).另一个是多线程的, org.apache.hadoop.mapred.lib.MultithreadedMapRunner.
    根据特定情况, 可以自定义MapRunnable,
    (1). 启用多线程, 比如web爬行时, 可启用多线程抓取网页.
    (2). 避免map时, 单台tasktracker上辅助数据冗余, 比如在多模匹配时, 避免生成多份DFA.
     
    五. 在某些情况下, 利用数据分布特性设计PARTITIONER的分区算法, 避免单个mapred消耗时间过长.
    比如处理大量字符串时,
    (1). 已知首字不同的字符串之间不存在任何关联关系
    (2). 原始数据在某些 "首字" 上分布密集, 另一些 "首字" 上分布稀疏.
    例如, 原始数据中, 1亿个以3开头, 1亿个以7开头, 3个以6开头.
    那么,
    (1). 如果以首字对4求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在同一分区.若hadoop群集只支持同时2个map任务, 则...
    (2). 如果以首字对3求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在不同分区.
     
    六. 最大限度地重用对象, 避免对象的生成/销毁开销.
    该点在hadoop自带的org.apache.hadoop.mapred.MapRunner中尤为突出,
    它使用同一个key对象和同一个value对象不停读取原始数据, 再将自身交给mapper处理.
    (此处注意, 若要保留该对象的即时状态, 需要clone, 深克隆或浅克隆.)
     
    七. 在逻辑意义上, 合并同一对象. 如dotnet和java中的字符串INTERN技术.
     
    八. 根据已有条件, 简化循环判定.
    比如, for(int i = 0; i < end && i < size; i++);
    可以改成 :
    end = end < size ? end : size;
    for(int i = 0; i < end; i++);
     
    九. 降低多线程数目, 而让固定数目的线程循环处理.
    比如, 一台机器8个CPU, 现在需要处理80亿个数据,
    那么下面两个方案 :
    (1). 启动800个线程, 每个线程处理80亿/800个数据.
    (2). 启动8个线程(注意, 此处是8个), 每个线程循环处理, 每次循环处理100万个.
    通常我个人选择方案(2).因为 :
    (1). 最大限度利用了CPU.
    (2). 避免了线程调度.
    (3). 在java中, 可以使用AtomicInteger控制线程循环, AtomicInteger的效率很高.
     
    十. 使用位移替代浮点数计算. 比如用 100 >> 3替代100 * 0.125.
    (另外, 我们会需要将某个中间值乘以一个调节因子(经验值), 比如乘以0.12,
    如果乘以0.12和0.124 "差不多" 时, 可以考虑直接使用位移).
     
    十一. 避免循环体内不必要的判断逻辑, 与第八条不同.
    比如, 处理10亿个数据, 每遇到一个有效数据时, 需要同前一个有效数据进行关联处理(或与前一个中间值进行关联处理),
    for(int i = 0; i < size; i++)
    {
    //1. 判定是否存在前一个有效数据
    //2. 如果不存在前一个有效数据, 则continue;
    //3. 如果存在前一个有效数据, 则进行关联处理, 再continue.
    }
    通常在此种需求下, 一旦遇到一个有效数据, 必定会产生一个可供后续紧邻数据关联的值,
    那么 :
    int i = 0;
    for(int i = 0; i < size; i++)
    {
    //1. data[i]是否有效?
    //2. data[i]无效, continue;
    //3. data[i]有效, break;
    }
    for(; i < size; i++)
    {
    //与前一个有效数据进行关联处理, 再continue.
    }
    November 28

    谈论 情深不寿,慧极必伤,淡淡的,最好。

     

    引用

    情深不寿,慧极必伤,淡淡的,最好。

    有一个朋友,认识有快十年了吧,实在不经常见面。我有空的时候,他不再;他有空的时候,我又在出差,总之不断错过。电话,偶尔的MSN,却也不妨碍是静水深流。有些朋友,就是这样,不常见面却深深认同着对方正在做的事情。

    然后有这样一个下午,太阳很好,他就坐在我对面,很轻声的聊着天。很仔细的给我泡秋天的刚下来的铁观音,味道甘甜的。然后点一支烟,背景是崔健或窦唯或罗大佑,很轻声放着,我还第一次这么听摇滚乐。我们就这么聊音乐,聊文化,聊互联网,声音也是小小的。

    他知道我心高气傲,听不进去别人的批评,所以总拿自己举例子,呵呵,狡猾。“怒伤肝、喜伤心、思伤脾、忧伤肺、恐伤肾。器官主管情绪,大脑是用来理性思考的,但我们往往用脑思考的同时,把情绪带进去了,影响了自己的判断。你能做到只用脑,不动心么?”

    这时的夕阳很美,很久没关注傍晚的风景了。只用脑,不动心,我记下来了。