MapReduce概述MapReduce定义
MapReduce是分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析APP应用”的核心框架。
MapReduce的核心功能是将用户编写的业务逻辑代码和默认组件集成后编写完整的分布式运算程序,并在Hadoop集群上运行。
MapReduce的优缺点
好处
1.MapReduce易于编程。
很容易实现几个接口,完成一个分布式程序,该分布式程序可以分布在大量廉价的PC机上执行。 也就是说,写分布式程序就像写简单的串行程序一样。 因为这个特征使MapReduce编程非常流行。
2 .良好的可扩展性
如果计算资源不满足,可以通过简单地增加机器来扩展计算能力。
3 .高容错能力
MapReduce旨在帮助您将程序部署到便宜的PC计算机上,并需要高容错能力。 例如,如果一台机器挂起,它可以将上面的计算任务移动到另一个节点上执行。 此任务不会失败。 此外,这个过程不需要手动参与,完全在Hadoop内部进行。
4 .适用于Pb级以上大容量数据的离线处理
可以同时运行上千台服务器的集群,提供数据处理能力。
缺点
1 .不擅长实时计算
MapReduce无法像MySql那样以毫秒或秒返回结果。
2 .不擅长流量计算
流计算的输入数据是动态的,但MapReduce的输入数据集是静态的,不能动态更改。 这是因为MapReduce自身的设计特征决定了数据源必须是静态的。
3 .不擅长Dag (有向图)计算
多个APP应用程序具有依赖关系,下一个APP应用程序的输入是上一个输出。 在这种情况下,MapReduce不可用,但如果使用,每个MapReduce作业的输出节将写入磁盘,从而导致大量磁盘IO,并且性能非常低。
MapReduce的核心思想
1 )分布式运算程序往往需要至少分为两个阶段。
2 )第一阶段的MapTask同时执行实例完全并行执行,互不相关。
3 )第二阶段的ReduceTask并发实例互不相关,但他们的数据依赖于前一阶段所有MapTask并发实例的输出。
4 ) MapReduce编程模型只包含一个Map阶段和Reduce阶段。 如果用户的业务逻辑非常复杂,则只有多个MapReduce程序会串行运行。
总结:分析WordCount数据流,深入理解MapReduce的核心思想。
映射流程
完整的MapReduce程序在分布式运行时有三种类型的实例进程。
1.MrAppMaster :负责整个程序的流程调度和状态协调。
2 .地图任务:负责地图阶段的整个数据处理过程。
3.reduce task :负责reduce阶段的整个数据处理流程。
官方WordCount源代码
在使用反编译工具反编译源代码时,您发现WordCount用例具有Map类、Reduce类和驱动程序类。 数据类型是Hadoop自身封装的序列化类型。
映射编程规范
用户编写的程序分为Mapper、Reducer、Driver三个部分。
1 .映射阶段
)1)用户定制的Mapper继承自己的父类
)2) Mapper的输入数据为KV对的形式( KV的类型可自定义) ) )。
)3) Mapper的商务逻辑写在map )方法中
)4) Mapper的输出数据为KV对的形式( KV的类型可以自定义) )。
(5) map ) )方法) MapTask进程)在每次调用时执行一次
2 .重做阶段
)1)用户自定义的Reducer继承自己的分类
)2)对应于Reducer的输入数据类型的Mapper的输出数据类型也是KV
)3) Reducer的业务逻辑写在Reduce ) )方法中
(4) ReduceTask进程按同一组调用1次reduce )方法
3 .驱动器阶段
与YARN群集相对应的客户端用于将我们的整个程序提交到YARN群集,并提交封装了MapReduce程序相关执行参数的作业对象。
WordCount案例实践
1 .需求
计数并输出指定文本文件中各单词出现的总次数
(1)输入数据,hello.txt
)2)要输出的数据
atguigu2ban zhing1cls2Hadoop1jiao1ss2Xue 12 .需求分析
根据MapReduce编程规范,分别创建映射器、记录器和驱动器。
3 .环境准备
(1)创建maven项目
)2)在pom.xml文件中添加以下依赖关系
unitjunitreleaseorg.Apache.logging.log4j log4j-core2.8.2org.Apache.Hadoop Hadoop-common2.7.2org.Apache dfe
log4j.rootLogger=INFO, TD out log4j.appender.stdout=org.appache.log4j.console appender log4j.appender.stdout.layout=org.Apache.loue – % m % n log4j.appender.log file=org.Apache.log4j.file appender log4j.appender.appender spring.log log4j.appender
(1)创建Mapper类
import java.io.IOException; importorg.Apache.Hadoop.io.int writable; importorg.Apache.Hadoop.io.long writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.mapper; publicclasswordcountmapperextendsmapper { textk=new text (; intwritablev=newintwritable(1; @ overrideprotectedvoidmap ( longwritablekey,Text value,Context context ) throws IOException,InterruptedException {//1 #039; ); //3输出for(stringword:words ) { k.set } word ); context.write(k,v ); } ) (2)创建Reducer类
import java.io.IOException; importorg.Apache.Hadoop.io.int writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.reducer; publicclasswordcountreducerextendsreducer { intsum; IntWritable v=new IntWritable (; @ overrideprotectedvoidreduce ( text key,Iterable values,Context context ) throws IOException,InterruptedException {//1累积添加//2输出v.set(sum ); context.write(key,v ); (3)创建驱动程序驱动系统
import java.io.IOException; importorg.Apache.Hadoop.conf.configuration; importorg.Apache.Hadoop.fs.path; importorg.Apache.Hadoop.io.int writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.job; importorg.Apache.Hadoop.MapReduce.lib.input.fileinputformat; importorg.Apache.Hadoop.MapReduce.lib.output.fileoutputformat; publicclasswordcountdriver { publicstaticvoidmain [ ] args ] throwsioexception,ClassNotFoundException,interupted exception job job=job.getinstance ( configuration ); //2设置jar加载路径job.setjarbyclass ( word count driver.class )//3map和reduce类job.setmapperclass ( word count mapper.class ) //4设置映射输出job.setmapoutputkeyclass ( text.class ); job.setmapoutputvalueclass ( int writable.class; //5设置最终输出kv类型job.setoutputkeyclass ( text.class ); job.setoutputvalueclass ( int writable.class; //6设置输入/输出路径fileinputformat.setinputpaths ( job,newpath(Args[0] ) ); fileoutputformat.setoutputpath ( job,newpath ) Args[1] ); //7提交布尔结果=job.wait for completion ( true ); system.exit(result0:1; (5.本地测试
)1)如果计算机系统为win7,则将win7Hadoop jar软件包解压缩为非中文路径,并在Windows环境中设置HADOOP_HOME环境变量。 如果是电脑的windows10OS,解压windows10的hadoop jar包,设定HADOOP_HOME环境变量。
注意: windows8电脑和windows8家庭操作系统可能有问题。 必须重新编译源代码或更改操作系统。
)2)在Eclipse/Idea上运行程序
6 .集群测试
)1)在maven中打开jar包需要额外的包插件依赖关系
注意:带有红色标记的部分必须替换为自己的工程主类
maven-compiler-plugin2.3. 21.81.8 maven-assembly-plugin jar-with-dependencies com.atguigu.Mr.wordcounted
)1)将程序打包为jar包,并将其复制到Hadoop群集
步骤详细信息:右键单击-运行最大安装。 编译完成后,将在项目的target文件夹中生成jar包。 如果看不见的话。 右键单击项目- "可以在Refresh中查看项目。 将无依赖关系的jar包重命名为wc.jar,并将该jar包复制到Hadoop群集。
)2)启动Hadoop集群
(3)运行WordCount程序
hadoopjarwc.jar com.atguigu.word count.word count driver/user/atguigu/input/user/atguigu/output Hadoop序列化概述
序列化是将内存中的对象转换为字节序列(或其他数据传输协议),以便于保存(永久)到磁盘和进行网络传输。
反向序列是将收到的字节序列(或其他数据传输协议)或磁盘持久性数据转换为内存中的对象。
为什么要序列化
一般来说,“活着”的对象只存在于内存中,关闭电源后就无法关闭电源。 此外,“活着”对象只能由本地进程使用,不能发送到网络上的其他计算机。 但是,序列化可以存储“活着的对象”并将“活着的”对象发送到远程计算机。
为什么不使用Java序列化
Java的序列化为重量级序列化框架,当一个对象被序列化时,会附带很多额外的信息(各种检查信息、Header、继承体系等),不容易在网络上有效地传输。 所以,Hadoop自己开发了序列化机制( Writable )。
Hadoop序列化特性:
)1)紧凑)高效使用存储空间
)2)高速)读写数据的开销小
)3)可扩展性:可随着通信协议的升级而升级
)4)支持多语言交互
定制bean对象实现序列化接口( Writable )
企业开发中常用的基本序列化类型不能满足所有需求。 例如,如果在Hadoop框架中传递bean对象,则该对象必须实现序列化接口。
具体的bean对象序列化过程有以下7个。
(1)必须实现可写接口
)反序列化时,需要递归调用空参构造函数,因此需要空参构造函数
public FlowBean ( ) {super ); (3)改写序列化方法
@ overridepublicvoidwrite ( dataoutputout ) throwsioexception ( out.write long ) upflow; out.writelong(downflow ); out.writelong(sumflow ); (4)重写反序列化方法
@ overridepublicvoidreadfields ( datain Putin ) throwsioexception ( up flow=in.read long ); downFlow=in.readLong (; sumFlow=in.readLong (; (5)注意反序列化的顺序和序列化的顺序完全一致
)6)要将结果显示在文件中,必须重写toString ( )。 ) ) t ) )分开,以便以后使用。
)7)如果需要将定制bean放入key中进行传输,则还必须实现Comparable接口。 这是因为MapReduce框的Shuffle流程必须能够对key进行排序。
@ overridepublicintcompareto ( flowbeano )//按相反顺序排列,从大到小依次为return this.sumFlow o.getSumFlow ) )-1 : 1; }序列化案例的实际技巧
1 .需求
统计每个手机号码消耗的总上行流量、下行流量、总流量
(1)输入数据
phone_data.txt
(2)输入数据格式:
71356043666120.196.100.99116954200 id手机号码网络ip上行业务下行业务网络状态码(3)输出数据格式最好
13560436666 1116 954 2070手机号码上行流量下行流量总流量2 .需求分析
编写MapReduce程序
(1)制作流量统计的Bean对象
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; importorg.Apache.Hadoop.io.writable; //1writable接口publicclassflowbeanimplementswritable { privatelongupflow; private long downFlow; private long sumFlow; //2反序列化时,需要通过反射调用null构造函数,因此必须使用public FlowBean ( ) ) {super; }publicflowbean(longupflow,long downFlow ) {super; this.upFlow=upFlow; this.downFlow=downFlow; this.sumFlow=upFlow downFlow; //3编写序列化方法@ overridepublicvoidwrite ( dataoutputout ) throwsioexception ( out.write long ) upflow ); out.writelong(downflow ); out.writelong(sumflow ); //4反序列化方法//5反序列化方法的读取顺序必须与写入序列化方法的写入顺序一致@ overridepublicvoidreadfields ( datain Putin ) throwsioexception this.sumFlow=in.readLong (; //6文本@ overridepublicstringtostring ( ( returnupflow ( t ) downflow )\t ) sumflow; }public long getUpFlow ( ) {return upFlow; } publicvoidsetupflow ( longupflow ) {this.upFlow=upFlow; }public long getDownFlow ( ) {return downFlow; } publicvoidsetdownflow ( longdownflow ) {this.downFlow=downFlow; }public long getSumFlow ( ) {return sumFlow; } publicvoidsetsumflow ( longsumflow ) {this.sumFlow=sumFlow; (2)创建Mapper类
unitjunitreleaseorg.Apache.logging.log4j log4j-core2.8.2org.Apache.Hadoop Hadoop-common2.7.2org.Apache
log4j.rootLogger=INFO, TD out log4j.appender.stdout=org.appache.log4j.console appender log4j.appender.stdout.layout=org.Apache.loue – % m % n log4j.appender.log file=org.Apache.log4j.file appender log4j.appender.appender spring.log log4j.appender
import java.io.IOException; importorg.Apache.Hadoop.conf.configuration; importorg.Apache.Hadoop.fs.path; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.job; importorg.Apache.Hadoop.MapReduce.lib.input.fileinputformat; importorg.Apache.Hadoop.MapReduce.lib.output.fileoutputformat; publicclassflowsumdriver { publicstaticvoidmain ( string [ ] args ) throws IllegalArgumentException,IOException,classnotfoutfoum #039; e:/input/inputflow&; #039;&; #039; e:/output1&; #039; }; //1获取配置信息,或指定作业对象实例configuration configuration=new configuration (; job job=job.getinstance ( configuration ); //6指定此程序的jar包所在的本地路径job.setjarbyclass ( flowsumdriver.class )。 //2指定本业务job使用的mapper/Reducer业务类job.setmapperclass ( flowcountmapper.class )。 job.setreducerclass ( flowcountreducer.class; //3指定mapper输出数据的kv类型job.setmapoutputkeyclass ( text.class )的job.setmapoutputvalueclass ( flow bean.class ); //4指定要最终输出的数据的kv类型job.setoutputkeyclass ( text.class )。 job.setoutputvalueclass ( flow bean.class; //5指定包含job的输入源文件的目录fileinputformat.setinputpaths ( job,newpath(Args[0] ) )。 fileoutputformat.setoutputpath ( job,newpath ) Args[1] ); 将包含//7job的相关参数和job使用的java类的jar包提交给yarn以执行布尔结果=job.wait for completion ( true )。 system.exit(result0:1; }}MapReduce框架原理输入格式数据输入
切片和映射任务的并行度决策机制
1 .引出问题
MapTask的并行度决定Map阶段任务处理的并行度,影响作业整体的处理速度。
思考: 1G数据,启动8个地图任务,可以实现提高集群并行处理能力。 那么,1K的数据在启动8个MapTask后也会成为提高集群性能吗? 地图任务并行任务越多越好吗? 影响地图任务并行度的因素是什么?
2 .地图任务并行度决策机制
数据块: Block是HDFS物理上将数据划分为一个块的产物。
数据切片—数据切片只是对输入进行逻辑切片,而不是在磁盘上进行切片和存储。
Job提交进程源和切片源的详细信息
1 .作业提交流程来源详细信息
import java.io.IOException; importorg.Apache.Hadoop.io.int writable; importorg.Apache.Hadoop.io.long writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.mapper; publicclasswordcountmapperextendsmapper { textk=new text (; intwritablev=newintwritable(1; @ overrideprotectedvoidmap ( longwritablekey,Text value,Context context ) throws IOException,InterruptedException {//1 #039; ); //3输出for(stringword:words ) { k.set } word ); context.write(k,v ); }}}FileInputFormat片的源代码分析( input.getsplits(job ) ) ) )
)1)程序首先查找你的数据存储的目录
)2)开始遍历目录下的每个文件) )。
)3)遍历第一个文件ss.txt
a )获取文件大小fs.sizeof(ss.txt )
b )切片大小计算
computesplitsize ( math.max ( math.mini ) maxsize,blocksize ) )=blocksize=128M
c )默认情况下,切片大小=blocksize
d )开始切割,形成
第一个切片: ss.txt——0:128M
第二个片ss.txt——128:256M
第三个片ss.txt——256M:300M
(每次切片时,判断切完的剩馀部分是否大于块的1.1倍,如果小于1.1倍,则分割切片。)
e )将切片信息写入切片规划文件
f )整个切片的核心过程是通过getSplit ) )方法完成的
g ) InputSplit只记录片的元数据信息,包括开始位置、长度和所在节点的列表。
)4)向YARN提交切片计划文件后,YARN上的MrAppMaster可以根据切片计划文件计算开放映射任务数。
文件格式切片机制
1 .切片机制
)1)简单地根据文件的内容长度进行切片
)2)片大小,缺省等于块大小
)3)不考虑整个数据集,对每个文件分别进行切片
2 .个案研究
(1)输入数据有两个文件。
)2)经过FIleIputFormat的切片机制运算,形成的切片信息如下。
FileInputFormat切片大小的参数化
(1)计算源代码中切片大小的公式
math.max(minsize,math.mini ) maxsize,blockSize );
MapReduce.input.fileinputformat.split.minisize=1缺省值为1
MapReduce.input.fileinputformat.split.maxsize=long.maxvalue默认值long.max value。 因此,默认情况下切片大小=blocksize。
)2)设置切片大小
如果“maxsize”(切片最大值)参数小于blockSize,切片将变小,并等于此参数设置的值。
通过使“切片最小值”( minsize )参数大于blockSize,可以使切片大于blockSize。
)2)获取切片信息API
import java.io.IOException; importorg.Apache.Hadoop.io.int writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.reducer; publicclasswordcountreducerextendsreducer { intsum; IntWritable v=new IntWritable (; @ overrideprotectedvoidreduce ( text key,Iterable values,Context context ) throws IOException,InterruptedException {//1累积添加//2输出v.set(sum ); context.write(key,v ); }}CombineTextInputFormat切片机制
框架的缺省TextInputFormat切片机制是为任务规划每个文件的切片。 无论文件多么小,它都将成为单独的切片,并传递给一个贴图任务。 因此,如果有大量的小文件,则会产生大量的映射任务,从而极大地降低处理效率。
1、应用场景:
CombineTextInputFormat用于包含太多小文件的场景。 可以将多个小文件逻辑地规划为一个片,从而可以将多个小文件传递到一个映射任务。
2、设置最大虚拟存储片
combinetextinputformat.setmaxinputsplitsize ( job,4194304; //4m
注意:关于虚拟存储片的最大值设置,建议根据实际的较小文件大小设置特定的值。
3、切片机制
生成切片的过程包括两个部分:虚拟存储过程和切片过程。
)1)虚拟存储过程:
将输入目录中的所有文件大小与设置的setMaxInputSplitSize值进行顺序比较,如果小于或等于设置的最大值,则在逻辑上将其拆分为一个块。 如果输入文件大于设置的最大值且大于两倍,则根据最大值剪切块。 如果剩馀数据大小超过设置的最大值且小于或等于最大值的两倍,请将文件分割为两个虚拟内存块。 这样可以防止太小的切片。
例如,如果setMaxInputSplitSize的值为4M,输入文件大小为8.02M,则首先在逻辑上分为4M。 剩下的大小为4.02M,用4M逻辑划分的话,会出现0.02M的小虚拟存储文件,所以把剩下的4.02M文件分成(2.01M和2.01M )两个文件。
)2)切片过程:
) a )确定虚拟存储的文件大小是否大于setMaxInputSplitSize值,如果大于,则单独分片。
) b )如果为以下,则与下一个虚拟存储文件合并,形成一个片。
( c )测试示例:如果有4个小文件: 1.7M、5.1M、3.4M和6.8M,则在虚拟存储之后将形成6个文件块,每个文件块的大小为:
1.7M、2.55M、2.55M )、3.4M及( 3.4M、3.4M )。
最终形成三个切片,大小为( 1.7 ) 2.55 ) m,) 2.55 ) 3.4 ) m,) 3.4 )
CombineTextInputFormat案例实践
1 .需求
将输入的大量小文件合并为一个片进行处理。
(1)输入数据
准备四个小文件
2 )期待
最好在一个片上处理四个文件
2 .实现进程
)1)不执行任何操作,运行1.6节的WordCount案例程序,观察片数为4。
)2)在WordcountDriver中添加以下代码,运行程序,观察运行的片数为3。
( a )在驱动系统中增加代码如下:
如果未设置输入格式,则缺省情况下将使用textinput format.class job.setinputformatclass ( combinetextinputformat.class )。 //虚拟存储片最大值4 mcombinetextinputformat.setmaxinputsplitsize ( job,4194304 ); ( b )如驾驶为3切片。
)3)在WordcountDriver中添加以下代码,运行程序,观察运行的片数为1。
( a )在驱动中增加代码如下:
如果未设置输入格式,则缺省情况下将使用textinput format.class job.setinputformatclass ( combinetextinputformat.class )。 //虚拟存储片最大值为20 mcombinetextinputformat.setmaxinputsplitsize ( job,20971520 ); ) b )运转为1片时。
FileInputFormat实现类
思考:运行MapReduce程序时,输入文件格式包括基于行的日志文件、二进制格式文件、数据库表等。 那么,MapReduce如何读取各种数据类型的数据呢?
FileInputFormat的典型接口实现类包括TextInputFormat、KeyValueInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat
1 .文本格式
TextInputFormat是默认的FileInputFormat实现类。 逐行读取各记录。 密钥是在整个文件中存储行的开始字节偏移,类型为长可写。 值就是这样的内容,不包括行的结尾(换行符和换行符)和Text类型。
以下为范例: 例如,一个分片包含以下四条文本记录:
每个记录表都是以下键/值对:
2.KeyValueTextInputFormat
每一行是一条记录,通过分隔符分割为key、value。 通过设置为驱动器类
conf.set ( keyvaluelinerecordreader.key _ value _ seperator,&; #039; /t&; #039; ); 框中,选择“默认值”。
默认分隔符是制表符( t )。
以下为范例: 输入是包含四条记录的分片。 其中—表示一个(水平方向的)选项卡。
每个记录由以下键/值对表示:
此时的关键是Text序列,每行在选项卡之前。
3.NLineInputFormat
使用NlineInputFormat时,表示每个映射进程处理的输入剥离将按NlineInputFormat指定的行数n进行拆分,而不是按块进行拆分。 也就是说,输入文件的总行数/N=片数,如果不能被整除,则片数=商1。
以下是一个例子,还是以上四行的输入为例。
例如,如果n是2,则每个输入标题包含两行。 打开两个地图任务。
此处的键和值与TextInputFormat生成的键和值相同。
使用KeyValueTextInputFormat的示例
1 .需求
计算与输入文件中每行第一个单词相同的行数。
(1)输入数据
bangnihaoxihuanhadoopbangbangbangnihaoxihuanhadoopbanching (2)期望结果数据
班张2 xiluan 22 .需求分析
3 .代码实现
(1)创建Mapper类
import java.io.IOException; importorg.Apache.Hadoop.io.long writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.mapper; publicclasskvtextmapperextendsmapper {//1valuelongwritablev=newlongwritable ( 1; @ overrideprotectedvoidmap ( text key,Text value,Context context ) throws IOException,interrupted exception/banzhangnihihion
import java.io.IOException; importorg.Apache.Hadoop.io.long writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.reducer; publicclasskvtextreducerextendsreducer { longwritablev=newlongwritable (; @ overrideprotectedvoidreduce ( text key,Iterable values,上下文) throws IOException,interrupted exception { long sum }v.set(sum ); //2输出context.write(key,v ); (3)创建驱动程序类
import java.io.IOException; importorg.Apache.Hadoop.conf.configuration; importorg.Apache.Hadoop.fs.path; importorg.Apache.Hadoop.io.long writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.job; importorg.Apache.Hadoop.MapReduce.lib.input.fileinputformat; importorg.Apache.Hadoop.MapReduce.lib.input.keyvaluelinerecordreader; importorg.Apache.Hadoop.MapReduce.lib.input.keyvaluetextinputformat; importorg.Apache.Hadoop.MapReduce.lib.output.fileoutputformat; publicclasskvtextdriver { publicstaticvoidmain ( string [ ] args ) throws IOException,ClassNotFoundException,interruptedexception //#039; &; #039; ); 获取//1job对象job=job.getinstance ( conf ); //2设置jar包的位置并将mapper与reducer job.setjarbyclass ( kvtextdriver.class )相关联job.setreducerclass ( kvtextreducer.class; 设置//3map输出kv类型job.setmapoutputkeyclass ( text.class ); job.setmapoutputvalueclass ( long writable.class; //4设置最终输出kv类型job.setoutputkeyclass ( text.class ); job.setoutputvalueclass ( long writable.class; //5设置输入/输出数据路径fileinputformat.setinputpaths ( job,newpath(Args[0] ) ); //输入格式job.setinputformatclass ( keyvaluetextinputformat.class ); //6输出数据路径fileoutputformat.setoutputpath ( job,newpath(Args[1] ) ); //7提交job job.wait for completion ( true ) }}NLineInputFormat用法示例
1 .需求
要求对每个单词统计个数,根据每个输入文件的行数规定输出几个切片。 在这个案例中,需要每三行放入一个切片。
(1)输入数据
bangnihaoxihuanhadoopbangbangbanchangnihaoxihuanhadoopbanchangnihaoxihuanhadoopbangnihaoxihuangnihaoxxihang 期待Haoxihuanhadoopbadoopban GihaoxihuanHadoopBangBangBangNihaoxihuanHadoopBanghang (2)输出数据
Number of splits:42 .需求分析
3 .代码实现
(1)创建Mapper类
import java.io.IOException; importorg.Apache.Hadoop.io.long writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.mapper; publicclassnlinemapperextendsmapper { privatetextk=new text (; privatelongwritablev=newlongwritable ( 1; @ overrideprotectedvoidmap ( longwritablekey,Text value,Context context ) throws IOException,InterruptedException { //1
e.toString(); // 2 切割 String[] splited = line.split(” “); // 3 循环写出 for (int i = 0; i < splited.length; i++) { k.set(splited[i]); context.write(k, v); } }}
(2)编写Reducer类
import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class NLineReducer extends Reducer{ LongWritable v = new LongWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long sum = 0l; // 1 汇总 for (LongWritable value : values) { sum += value.get(); } v.set(sum); // 2 输出 context.write(key, v); }}
(3)编写Driver类
import java.io.IOException;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class NLineDriver { public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { “e:/input/inputword”, “e:/output1” }; // 1 获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 7设置每个切片InputSplit中划分三条记录 NLineInputFormat.setNumLinesPerSplit(job, 3); // 8使用NLineInputFormat处理记录数 job.setInputFormatClass(NLineInputFormat.class); // 2设置jar包位置,关联mapper和reducer job.setJarByClass(NLineDriver.class); job.setMapperClass(NLineMapper.class); job.setReducerClass(NLineReducer.class); // 3设置map输出kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 4设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 5设置输入输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6提交job job.waitForCompletion(true); }}
4.测试
(1)输入数据
banzhang ni haoxihuan hadoop banzhangbanzhang ni haoxihuan hadoop banzhangbanzhang ni haoxihuan hadoop banzhangbanzhang ni haoxihuan hadoop banzhangbanzhang ni haoxihuan hadoop banzhang banzhang ni haoxihuan hadoop banzhang
(2)输出结果的切片数
自定义InputFormat
在企业开发中,Hadoop框架自带的InputFormat类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。
自定义InputFormat步骤如下:
(1)自定义一个类继承FileInputFormat
(2)改写RecordReader,实现一个读取一个完整文件封装为KV
(3)在输出时使用SequenceFileOutPutFormat输出合并文件。
自定义InputFormat案例实操
无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。
1.需求
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
(1)输入数据
(2)期望输出文件格式
2.需求分析
1.自定义一个类继承FileInputFormat
(1)重写isSplitable()方法,返回false不可切割
(2)重写createRecordReader(),创建自定义的RecordReader对象,并初始化
2.改写RecordReader,实现一次读取一个完整文件封装为KV
(1)采用IO流一次读取一个文件输出到value中,因为设置了不可切片,最终把所有文件封装了value中。
(2)获取文件路径信息+名称,并设置key
3.设置Driver
3.程序实现
(1)自定义InputFromat
import java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;// 定义类继承FileInputFormatpublic class WholeFileInputformat extends FileInputFormat{ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context); return recordReader; }}
(2)自定义RecordReader类
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class WholeRecordReader extends RecordReader{ private Configuration configuration; private FileSplit split; private boolean isProgress= true; private BytesWritable value = new BytesWritable(); private Text k = new Text(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit)split; configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (isProgress) { // 1 定义缓存区 byte[] contents = new byte[(int)split.getLength()]; FileSystem fs = null; FSDataInputStream fis = null; try { // 2 获取文件系统 Path path = split.getPath(); fs = path.getFileSystem(configuration); // 3 读取数据 fis = fs.open(path); // 4 读取文件内容 IOUtils.readFully(fis, contents, 0, contents.length); // 5 输出文件内容 value.set(contents, 0, contents.length);// 6 获取文件路径及名称String name = split.getPath().toString();// 7 设置输出的key值k.set(name); } catch (Exception e) { }finally { IOUtils.closeStream(fis); } isProgress = false; return true; } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return k; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { }}
(3)编写SequenceFileMapper类处理流程
import java.io.IOException;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class SequenceFileMapper extends Mapper{ @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(key, value); }}
(4)编写SequenceFileReducer类处理流程
import java.io.IOException;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class SequenceFileReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); }}
(5)编写SequenceFileDriver类处理流程
package com.atguigu.mapreduce.inputformat;import java.io.IOException;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class SequenceFileReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); }}(5)编写SequenceFileDriver类处理流程package com.atguigu.mapreduce.inputformat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;public class SequenceFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { “e:/input/inputinputformat”, “e:/output1” };// 1 获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf);// 2 设置jar包存储位置、关联自定义的mapper和reducer job.setJarByClass(SequenceFileDriver.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class);// 7设置输入的inputFormat job.setInputFormatClass(WholeFileInputformat.class);// 8设置输出的outputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class);// 3 设置map输出端的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); // 4 设置最终输出端的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class);// 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6 提交job boolean result = job.waitForCompletion(true); System.exit(result 0 : 1); }}3.2 MapReduce工作流程
MapReduce工作流程
1.流程示意图
2.流程详解
上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:
1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件
4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
3.注意
Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M。
4.源码解析流程
context.write(k, NullWritable.get());output.write(key, value);collector.collect(key, value,partitioner.getPartition(key, value, partitions)); HashPartitioner();collect() close() collect.flush()sortAndSpill() sort() QuickSortmergeParts(); //file.out //file.out.indexcollector.close();
Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
MapTask工作机制
1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
ReduceTask工作机制
1.ReduceTask工作机制
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
2.设置ReduceTask并行度(个数)
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
3.实验:测试ReduceTask多少合适
1)实验环境:1个Master节点,16个Slave节点:CPU:8GHZ,内存: 2G
(2)实验结论:
4.注意事项
(1)如果ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
(2)ReduceTask默认值就是1,所以输出文件个数为一个。
(3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜。
(4)ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。
(5)具体多少个ReduceTask,需要根据集群性能而定。
(6)如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。大于1肯定不执行。