Calculating the number of mappers

How to calculate number of mappers in Hadoop?

  1. jainamisha

    How do we set the number of mappers for a MapReduce job?
    When we submit a MapReduce job, how many map tasks run in Hadoop?
    How many mappers run for a MapReduce job in Hadoop?

  2. yadavmanoj

    First, let us understand what a block is.
    Blocks are the physical units in which data is stored: the data is split into blocks of equal size. By default each block is 128 MB (64 MB in older versions), and the blocks are distributed across different DataNodes, which is what allows Hadoop to process the data blocks in parallel.

    Now let us talk about the Mapper. The Mapper performs the processing logic defined by the client on the data. Because data in HDFS is split into blocks and stored in a distributed way, each mapper works on a portion of the data, i.e. on individual blocks, and the map tasks run in parallel over these distributed blocks. Simply put, the number of map tasks equals the number of blocks: number of map tasks = data size / block size.

    For example, suppose there is 1 GB (1024 MB) of data to store and process. Hadoop stores it in fairly small chunks; assume the default block size of 128 MB. The 1 GB of data then occupies 8 blocks (1024 / 128 = 8), so 8 mappers are needed.
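    As a quick illustration of this arithmetic, here is a minimal Java sketch (the 1 GB input size and 128 MB block size are just the example values from above; real block sizes come from the cluster configuration):

    ```java
    public class MapperCountEstimate {
        public static void main(String[] args) {
            long inputSize = 1024L * 1024 * 1024;  // 1 GB of input data
            long blockSize = 128L * 1024 * 1024;   // default HDFS block size of 128 MB

            // Number of blocks, and hence map tasks, rounding up for a partial last block
            long mappers = (inputSize + blockSize - 1) / blockSize;
            System.out.println("Estimated number of mappers: " + mappers);  // prints 8
        }
    }
    ```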

     

  3. krishnangopi

     

    How to calculate the right number of mappers

    First of all, it depends on whether the file can be split by Hadoop. Most files are splittable, but some compressed file formats cannot be split.

    Splittable files:

    1) Calculate the total size of the input by adding up the sizes of all the input files.

    2) Number of mappers = total size / input split size defined in Hadoop

    Total size calculated = 1GB (1024MB)
    
    Input split size = 128MB
    
    No of Mappers = 8 (1024 / 128)

     

    FileInputFormat is the base class for InputFormat implementations that use files as their data source.

    The properties that control the split size are:

    mapreduce.input.fileinputformat.split.minsize – default value: 1 byte

    mapreduce.input.fileinputformat.split.maxsize – default value: 8192 PB (petabytes)

    dfs.blocksize – default value: 128 MB (megabytes)

    splitSize = max(minimumSize, min(maximumSize, blockSize))

    Generally speaking, minsize < blocksize < maxsize, so splitsize = blocksize.

    1 byte < 128MB < 8192PB =====> Splitsize = 128MB

    For example, if maxsize = 64 MB and blocksize = 128 MB, the split size is capped by maxsize: since minsize < maxsize < blocksize, splitsize = maxsize.

    1 < 64MB < 128MB =====> Splitsize = 64MB
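    The same rule can be sketched in Java. The helper below simply mirrors the formula, and the two FileInputFormat setters are the new-API way to adjust the split bounds for a job (the 64 MB value is only an example):

    ```java
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeDemo {
        // Mirrors: splitsize = max(minimumSize, min(maximumSize, blockSize))
        static long computeSplitSize(long blockSize, long minSize, long maxSize) {
            return Math.max(minSize, Math.min(maxSize, blockSize));
        }

        public static void main(String[] args) throws Exception {
            long blockSize = 128L * 1024 * 1024;  // dfs.blocksize
            System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));     // 128 MB
            System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));  // capped at 64 MB

            // The same bounds can be set per job through FileInputFormat
            Job job = Job.getInstance();
            FileInputFormat.setMinInputSplitSize(job, 1L);
            FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        }
    }
    ```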

    Non-splittable files: number of mappers = number of input files

    If such a file is very large, it becomes the performance bottleneck of the whole MapReduce job.

    Alternatively, splitting of files in Hadoop can also be disabled explicitly, as sketched below.
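    A common way to do that with the new mapreduce API is to subclass an input format and override isSplitable (a minimal sketch; the class name is illustrative):

    ```java
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // Every input file becomes a single split, so one mapper per file
    public class NonSplittableTextInputFormat extends TextInputFormat {
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
    }
    ```

    The job would then use it via job.setInputFormatClass(NonSplittableTextInputFormat.class).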

     

The ideal number of mappers


  1. When I try to increase the number of mappers from 2, performance increases linearly with the number of mappers. But when the mappers are increased beyond a certain limit, performance starts degrading again.

    To get the optimum performance of a MapReduce job, how many mappers should be configured in the Hadoop cluster?

  2. Ashok Vengala

    You cannot change the number of mappers via the new Java API, because we use the Job class for the MapReduce job configuration. In the old (deprecated) API, we could set the number of mappers using the setNumMapTasks(int n) method on the JobConf object. Ideally, this is not the best way to set or change the number of mappers anyway.

    By default, the number of mappers on each slave node is 2. We can change this value with the mapreduce.tasktracker.map.tasks.maximum parameter, which is set in the mapred-site.xml file. We should not simply pick a random value for the number of mappers.

    Ideally, an independent mapper (a map container) is invoked for each logical InputSplit. In the default case, the NodeManager on a particular slave node can run only two mappers (map containers) in parallel, irrespective of the number of logical input splits. Initially, two input splits are assigned to the two map containers on slave-node 1, and the remaining input splits wait in a queue. In some cases, a waiting input split may be shipped to another slave node (say SN2) that has an idle map container, and the mapper on that node (SN2) processes the shipped input split.

    Even if you specify the value 2 (number of mappers) in the configuration file, the NodeManager does not necessarily invoke all mappers in parallel. That decision is made by the ResourceManager based on the input split(s) available on a particular slave node; the slave node can simply run at most 2 map containers in parallel.

    The following explains how to choose the maximum number of mappers to configure per slave node in order to get an optimal setup.

    When you are setting up the cluster, that is when you should decide the maximum number of mappers that should be configured to run in parallel on the slave nodes. Basically, the number of mappers is decided by the following two factors:

    1) Number of cores
    2) RAM memory

    Let's say we have 10 cores on the system. We can have 10 mappers (one mapper = one core) if we go with one core per mapper; each mapper (map container) then runs on one core. This may not hold in all cases.

    Let's say your slave node has 10 cores and 25 GB of RAM, and your job needs 5 GB of memory, i.e. every map task requires 5 GB of RAM. Then you can run at most 5 mappers in parallel: the slave node does not have enough memory to run more than 5 mappers at the same time, even though more cores are available. In this case the maximum number of mappers is limited by the amount of RAM in the system, not by the number of cores.

    If your job requires every map task to be loaded with 5 GB of memory, then you are wasting cores if each slave node has 10 of them: only 5 cores per slave node are used and the remaining 5 sit idle. Either go with “10 cores with 50GB memory” or “5 cores with 25GB of ram memory”; that gives the optimal usage of resources.

    In general, we allocate 1 to 1.5 cores per mapper. If the processing is very light, go with 1 core per map container; if it is very heavy, go with 1.5 cores per map container. Keep the two factors above in mind to arrive at an optimized setup; a small sketch of the sizing logic follows.
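    The sizing logic above can be summarized in a small illustrative sketch (the numbers simply restate the 10-core / 25 GB example; adjust them to your own cluster):

    ```java
    public class MapperSlotEstimate {
        public static void main(String[] args) {
            int cores = 10;                // cores on the slave node
            double coresPerMapper = 1.0;   // 1 to 1.5 depending on how heavy the processing is
            int nodeMemoryGb = 25;         // RAM on the slave node
            int memoryPerMapperGb = 5;     // memory each map task needs

            int byCpu = (int) (cores / coresPerMapper);       // 10
            int byMemory = nodeMemoryGb / memoryPerMapperGb;  // 5

            // The scarcer resource wins: here memory limits the node to 5 parallel mappers
            int maxParallelMappers = Math.min(byCpu, byMemory);
            System.out.println("Max mappers per node: " + maxParallelMappers);
        }
    }
    ```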


How Hadoop MapReduce Works

Contents

  • 1. Objective
  • 2. What is MapReduce?
  • 3. How Hadoop MapReduce Works?
  • 4. MapReduce Flow Chart
    • 4.1. Input Files
    • 4.2. InputFormat
    • 4.3. InputSplits
    • 4.4. RecordReader
    • 4.5. Mapper
    • 4.6. Combiner
    • 4.7. Partitioner
    • 4.8. Shuffling and Sorting
    • 4.9. Reducer
    • 4.10. RecordWriter
    • 4.11. OutputFormat
  • 5. Conclusion

1. Objective

MapReduce is the core component of Hadoop; it splits a big task into a set of independent smaller tasks so that the data can be processed in parallel. This article walks through each step of the MapReduce data flow from Mapper to Reducer, covering Input Files, InputFormat in Hadoop, InputSplits, RecordReader, Mapper, Combiner, Partitioner, Shuffling and Sorting, Reducer, RecordWriter and OutputFormat.

Let us learn how Hadoop MapReduce works internally.

2. What is MapReduce?

MapReduce is the software framework on the Hadoop platform for processing the structured and unstructured data stored in HDFS; it splits a big task into a set of independent smaller tasks that are processed in parallel. You develop your custom business logic in MapReduce, and the MapReduce engine takes care of everything else.

3. How Does Hadoop MapReduce Work?

MapReduce works in two phases: Map and Reduce.

In the Map phase, developers implement the business logic and rules; the Reduce phase then performs a lightweight aggregation and summarization of the Map output.

4. MapReduce Flow Chart

4.1. Input Files

The input data for MapReduce comes from input files stored on HDFS. The files can be in any format, including binary files and log files.

4.2. InputFormat

The InputFormat defines how the input files are split up and read. The InputFormat creates the InputSplits.
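As a small sketch of how this looks in driver code (new mapreduce API; TextInputFormat is already the default and is set here only for illustration, and the input path is a placeholder):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputFormatWiring {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "input-format-demo");

        // TextInputFormat reads files line by line and is the default InputFormat
        job.setInputFormatClass(TextInputFormat.class);

        // HDFS file or directory the job should read (placeholder path)
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input"));
    }
}
```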

4.3. InputSplits

InputSplits are created by the InputFormat. Logically, an InputSplit represents the data that one Mapper will process; one map task is launched for each split, so the number of splits equals the number of map tasks.

4.4. RecordReader

The RecordReader communicates with the InputSplit and converts the data into key-value pairs that the Mapper can read, then passes them to the Mapper for processing. By default, TextInputFormat is used for this conversion.

4.5. Mapper

The Mapper processes the records coming from the RecordReader and generates new key-value pairs (completely different from the input pairs). Its output is written to the local disk as intermediate key-value data; the Mapper's intermediate output is not stored on HDFS, mainly because HDFS would create unnecessary replicas and has high latency.
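A minimal Mapper sketch in the new mapreduce API (a word-count style mapper; the class name is illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: byte offset and line of text; output: (word, 1) pairs
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);  // intermediate pair, spilled to local disk
            }
        }
    }
}
```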

4.6. Combiner

The Combiner, also known as a 'mini-reducer', performs local aggregation of the Mapper output in order to minimize the data transferred between the mapper and the reducer.

4.7. Partitioner

If more than one Reducer is used, the Partitioner comes into play. The Partitioner partitions the Mapper output: partitioning is done on the key, and the output is then sorted. The partition is derived by applying a hash function to the key (or a subset of the key).

Records with the same key end up in the same partition, and each partition is sent to one reducer.
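The default HashPartitioner already applies this hash-of-key rule; a custom Partitioner can be sketched like this (illustrative class, new mapreduce API):

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Assigns each record to a reducer based on the hash of its key,
// so records with the same key always land in the same partition
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

It would be wired in the driver with job.setPartitionerClass(WordPartitioner.class), and it only matters when the job runs more than one reducer.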

4.8. Shuffling and Sorting

The output of the previous step is sent to the reducer nodes (these are ordinary slave nodes, but because the reduce phase runs on them they are called reducer nodes). Shuffling is the physical movement of this data over the network: once all the mappers have finished, their intermediate output is merged and sorted and then transferred to the reducer nodes.

4.9. Reducer

The reducer function runs on each group of the shuffled and sorted output, and the final result is written to HDFS.
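A matching Reducer sketch (it sums the counts emitted by the mapper sketched above; the class name is illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input: (word, [1, 1, ...]); output: (word, total count)
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
```

Because its input and output types match, the same class could also be registered as the combiner from section 4.6 via job.setCombinerClass(WordCountReducer.class).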

4.10. RecordWriter

It writes the output key-value pairs from the Reducer phase to the output files.

4.11. OutputFormat

The way these output key-value pairs are written to the output files by the RecordWriter is determined by the OutputFormat. The OutputFormat instances provided by Hadoop are used to write files to HDFS or to the local disk; thus the final output of the reducer is written to HDFS by an OutputFormat instance.
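A sketch of the corresponding driver wiring (TextOutputFormat is already the default and is shown only for illustration; the output path is a placeholder):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputFormatWiring {
    static void configureOutput(Job job) {
        // TextOutputFormat writes each key-value pair as a tab-separated line
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // HDFS directory where the reducer output lands (placeholder path)
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output"));
    }
}
```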

Hence, in this manner, Hadoop MapReduce works over the cluster.

5. Conclusion

In conclusion, we can say that the data flow in MapReduce is the combination of different processing phases such as Input Files, InputFormat in Hadoop, InputSplits, RecordReader, Mapper, Combiner, Partitioner, Shuffling and Sorting, Reducer, RecordWriter, and OutputFormat. All of these components play an important role in the working of Hadoop MapReduce.

Now that you understand the end-to-end MapReduce job flow, test your knowledge by taking a Hadoop quiz.