Hadoop汇总

文章目录

  • 1、集群搭建
    • 1.1、流程
    • 1.2、集群搭建这部分有两个面试题
      • 2.1、常用端口号的说明:
      • 2.2、常用配置文件
      • 1.3、集群时间同步(但没必要)
      • 2、HDFS
        • 2.1、HDFS的shell操作(开发重点)
        • 2.2、HDFS的客户端API
        • 2.3、面试重点:读写流程
        • 2.4、NN和2NN(了解)
        • 2.5、DataNode工作机制(了解)
        • 3、MapReduce
          • 3.1、序列化
            • 1、概念
            • 2、实现(举例)
            • 3.2、核心框架原理
              • 1、InputFormat
              • 2、Shuffle机制
              • 3、OutputFormat数据输出
              • 4、Reduce Join
              • 5、Map Join
                • 5.1、使用场景
                • 5.2、优点
                • 5.3、具体方法**:采用DistributedCache**
                • 5.4、思路流程
                • 6、数据清洗ETL
                • 3.3、MR总结
                  • 1、输入数据接口:InputFormat
                  • 2、逻辑处理接口:Mapper
                  • 3、Partitioner分区
                  • 4、Comparable排序
                  • 5、Combiner合并
                  • 6、逻辑处理接口:Reducer
                  • 7、输出数据接口:OutputFormat
                  • 4、Hadoop数据压缩
                  • 5、Yarn
                    • 5.1、基础架构 & 工作机制
                    • 5.2、Yarn调度器和调度算法【重点搞清原理】
                    • 5.3、Yarn命令
                    • 5.4、Yarn生产环境核心参数【非常重要】
                    • 5.5、容量调度器的多队列
                    • 5.6、公平调度器案例
                    • 5.7 Tool接口
                    • 5.8、总结

                      1、集群搭建

                      1.1、流程

                      1. 关闭防火墙【讲义第9页】

                      2. 在其中一台机器创建文件夹,并且修改所属主和所属组,在其上安装java,hadoop,然后克隆虚拟机,

                      3. scp:实现服务器与服务器之间的数据和配置拷贝,当时拷贝的安装包,java和hadoop

                      4. rsync:主要用于备份和镜像,只对差异文件做更新,而scp是把所有文件都复制过去

                      5. xsync:集群分发的脚本,循环复制文件到所有节点的相同目录下,可以给集群下的所有机器批量分发文件,用于将环境变量分发过去,需要注意的是环境变量中的文档是root用户才可以编写

                        所以我们需要使用sudo,但是直接sudo xsync是找不到的,因为加上sudo相当于是root用户了,但是我们把xsync建在了why用户下面,所以这个命令不是系统命令,没法直接sudo xsync使用

                        所以我们应该把命令的路径给他,sudo /home/why_hadoop/bin/xsync,其中这个命令我们是把他建在了主文件夹下面创建的bin中,这个bin不是home根目录下的bin

                      6. ssh免密:ssh可以登录另一台主机,通过 ssh ip地址

                      7. 集群部署计划:

                      1. 配置文件:

                        包括自定义配置文件和默认的四个配置文件

                      2. 配置集群:

                        配置上述说的几大配置文件(讲义中有详细讲解,23-24页)

                        流程就是,按照上述第六点的计划,也就是需求,来具体分配配置

                      3. 启动集群之前,需要配置一下workers,集群上有几个节点,就配置几个主机名称

                      第一次启动的话格式化namenode,之后启动 hdfs,启动yarn

                      注意是在hadoop103上启动yarn

                      通过jps命令检查节点名称和状态

                      1. hdfs的数据存储位置
                      1. 配置历史服务器【讲义27页】

                        查看jobhistory: hadoop102:19888

                      2. 配置日志的聚集:日志聚集概念:应用运行完成以后,将程序运行日志信息上传到 HDFS 系统上

                        日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试。 注意:开启日志聚集功能,需要重新启动 NodeManager 、ResourceManager 和 HistoryServer。

                        步骤:

                        • 配置 yarn-site.xml
                        • 分发配置
                        • 关闭 NodeManager 、ResourceManager 和 HistoryServer
                        • 启动 NodeManager 、ResourceManage 和 HistoryServer
                        • 删除 HDFS 上已经存在的输出文件
                        • 执行 WordCount 程序
                        • 查看日志
                        • 总结一下:

                          1. 在hadoop102上启动hdfs,和historyserver,在hadoop103上启动yarn

                          2. Web 端查看 HDFS 的 NameNod:hadoop102:9870

                          3. Web 端查看 YARN 的 ResourceManager是:hadoop103:8088

                          4. 一般动态的集群使用时候会单独的对组件进行开启和关闭,就是对集群中一台机器中的一个进程进行操作

                      5. **编写统一开启hdfs和yarn的脚本**,其中需要注意主机名和hadoop版本号:**之后可以直接myhadoop.sh start / stop**
                      6. **编写查看三台服务器 Java 进程脚本**:jpsall
                      7. 要进入why用户搭建集群,不要进入root用户中
                      

                      1.2、集群搭建这部分有两个面试题

                      2.1、常用端口号的说明:

                      2.2、常用配置文件

                      1.3、集群时间同步(但没必要)

                      如果服务器在公网环境(能连接外网),可以不采用集群时间同步,因为服务器会定期 和公网时间进行校准;

                      如果服务器在内网环境,必须要配置集群时间同步,否则时间久了,会产生时间偏差, 导致集群执行任务时间不同步。

                      2、HDFS

                      2.1、HDFS的shell操作(开发重点)

                      // 具体详细见讲义p4
                      

                      主要包括三大部分命令:上传、下载、Hdfs直接加载

                      2.2、HDFS的客户端API

                      这个主要是为了可以在windows中的HDFS远程客户端代码中,远程连接服务器集群,对其进行数据的增删改查

                      为了在windows中可以远程连接上这个集群,所以windows中也需要有hadoop的环境变量

                      // 客户端代码常用套路:
                      /**
                      1、获取一个客户端对象
                      2、执行相关的命令操作
                      3、关闭资源
                      **/
                      

                      注意在maven中写代码,要不记得加上@Test,使用的是Juint,在代码旁边点击按钮运行,要不就是写一个main函数

                      //参数优先级排序:
                      (1)客户端代码中设置的值(就是在java文件中定义的configuration) >(2)ClassPath下的用户自定义配置文件(就是在resource中自己创建的xml中定义的东西) >(3)然后是服务器的自定义配置(xxx-site.xml) >(4)服务器的默认配置(xxx-default.xml)
                      
                      // 这里需要注意的一点,是目标路径如果是win,中间需要转义字符“  \ \ ”,但是如果是hdfs中的则是“/ ”
                      
                      public class HdfsClient { private FileSystem fs;
                          @Before
                          public void init() throws URISyntaxException, IOException, InterruptedException { //连接集群的namenode地址
                              URI uri = new URI("hdfs://hadoop102:8020");
                              //创建一个配置文件
                              Configuration configuration = new Configuration();
                              configuration.set("dfs.replication","2");
                              //用户
                              String user = "why_hadoop";
                              //1、获取到了客户端对象
                              fs = FileSystem.get(uri,configuration,user);
                          }
                          @After
                          public void close() throws IOException { //3、关闭资源
                              fs.close();
                          }
                          //创建目录
                          @Test
                          public void testmkdir() throws URISyntaxException, IOException, InterruptedException { //2、创建一个文件夹
                              fs.mkdirs(new Path("/xiyou/huaguoshan"));
                          }
                          //上传
                          @Test
                          public void testPut() throws IOException { // 参数解读:是否删除原数据,是否覆盖,原数据路径win,目的路径HDFS
                              fs.copyFromLocalFile(false,true,new Path("E:\\Work\\sunwukong.txt"),new Path("/xiyou/huaguoshan"));
                          }
                          //下载
                          @Test
                          public void testGet() throws IOException { // 参数解读:原文件是否删除,源文件路径HDFS,目标地址路径win,false表示校验
                              fs.copyToLocalFile(true,new Path("/xiyou/huaguoshan"),new Path("E:\\Work"),true);
                          }
                          //删除
                          @Test
                          public void testRm() throws IOException { // 参数解读:要删除路径,是否递归删除,
                      //        fs.delete(new Path("/jdk-8u212-linux-x64.tar.gz"),false);
                              //删除空目录
                      //        fs.delete(new Path("/xiyou"),false);
                              //删除非空目录
                              //必须要递归删除
                              fs.delete(new Path("/jinguo"),true);
                          }
                          //文件的更名和移动
                          @Test
                          public void testmv() throws IOException { // 参数解读:原文件路径,目标文件路径,
                      //        fs.rename(new Path("/input/word.txt"),new Path("/input/song.txt"));
                              //文件的移动和更名
                      //        fs.rename(new Path("/input/song.txt"),new Path("/cls.txt"));
                              //目录的更名
                              fs.rename(new Path("/input"),new Path("/output1"));
                          }
                          //获取文件详细信息
                          @Test
                          public void fileDetail() throws IOException { // 参数解读:原文件路径,是否递归
                              //这里是一个迭代器,所以写好fs.listFiles后,后面“。var”一下补全代码
                              RemoteIterator listFiles = fs.listFiles(new Path("/"), true);
                              //遍历文件
                              //这么简写:listFiles.hasNext().while
                              while (listFiles.hasNext()) { LocatedFileStatus fileStatus = listFiles.next();
                                  System.out.println("======"+fileStatus.getPath()+"=======");
                                  System.out.println(fileStatus.getPermission());
                                  System.out.println(fileStatus.getGroup());
                                  System.out.println(fileStatus.getOwner());
                                  System.out.println(fileStatus.getLen());
                                  System.out.println(fileStatus.getBlockSize());
                                  System.out.println(fileStatus.getPath().getName());
                                  //获取块信息
                                  //fileStatus.getBlockLocations().var
                                  BlockLocation[] blockLocations = fileStatus.getBlockLocations();
                                  System.out.println(Arrays.toString(blockLocations));
                              }
                          }
                          //判断是文件夹还是文件
                          @Test
                          public void testFile() throws IOException { FileStatus[] listStatus = fs.listStatus(new Path("/"));
                              for (FileStatus status : listStatus) { if (status.isFile()) { System.out.println("这是一个文件,"+status.getPath().getName());
                                  }else{ System.out.println("这是一个目录,"+status.getPath().getName());
                                  }
                              }
                          }
                      }
                      

                      2.3、面试重点:读写流程

                      详细看讲义

                      对讲义进行了标注

                      2.4、NN和2NN(了解)

                      详细看讲义

                      对讲义进行了标注

                      2.5、DataNode工作机制(了解)

                      详细看讲义

                      对讲义进行了标注

                      3、MapReduce

                      1、mapreduce主要分为两部分:Map阶段和Reduce阶段
                      2、map阶段的并发MapTask,完全并行运行,互不相干
                      3、reduce阶段的并发Reduce Task,完全互不相干,但是他们的数据是依赖于上一阶段的所有Map Task并发实例的输出
                      4、MapReduce编程模型只能包含一个Map阶段和一 个Reduce阶段,如果用户的业务逻辑非常复杂,那 就只能多个MapReduce程序,串行运行
                      

                      下面是通常情况下的步骤

                      • map阶段的输入一个键值对,输出也是一个键值对
                      • map输出的键值对用作reduce阶段的输入键值对,最后还是输出一个键值对kv
                      • 第三个阶段,Driver阶段,相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是 封装了MapReduce程序相关运行参数的job对象

                      可以看讲义,里面有一个wordcount的案例

                      // 需要注意的是在idea中使用补全代码功能的时候,一定要看清导的包是什么,重名的很多
                      

                      3.1、序列化

                      1、概念

                      如果在传输过程中,key-value涉及到多个字段,一开始最简单的,key是偏移量,value就是简单的名称

                      但是经过map阶段后,输出的key-value,中value可能有多个字段

                      这种时候就需要自定义序列化

                      封装一个类,用来实现序列化

                      2、实现(举例)

                      就是多了自定义 bean 对象实现序列化接口(Writable)

                      3.2、核心框架原理

                      /*
                      正常来说map阶段输入进来的key是int类型的表示偏移量,v就是输入文件中一行一行的内容
                      然后reduce的输入就是经过map处理的,比如最简单的wordcount,记录次数,那么key就是数字名字,v就是一个集合<次数,次数>,就全部是1,因为它相当于在中间的shullfe阶段对其中相同key的值进行了一个统计,出现一次,就往其对应的v的集合中加一个1
                      所以在reduce阶段的代码中,如果对value进行for循环,意思是对key1对应的value集合先进行循环,再去看key2的
                      但是如果key1对应的value我们再map阶段设置成了NULLwritable,而且其次数还不唯一,那么本来假如是sun <1,1>,但是我们设置成null
                      就是sun ,如果在reduce阶段直接context.write,那么就会只记录其出现了一次
                      【具体的可以见讲义38页在讲output的时候遇到了】
                      */
                      

                      1、InputFormat

                      • 主要涉及的是切片和MapTask的并行度决定机制

                        一个切片对应一个MapTask,其中切片是在job提交的过程中完成的,需要注意的是切片的规划是对每一个文件规划一次,而不是考虑数据集整体

                        • Fileinputfromat 切片具体流程如下所示【但是还有别的切片方法和处理方式,比如combineTextinputformat等】
                          /** 
                          去看讲义的3.1部分 
                          **/
                          这里面还介绍了combineTextInputFormat的切片,就是引入一个虚拟存储过程
                          实际在代码中就是在driver中添加两行代码
                              
                          // 如果不设置InputFormat,它默认用的是TextInputFormat.class
                          job.setInputFormatClass(CombineTextInputFormat.class);
                          //虚拟存储切片最大值设置20m
                          CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
                          

                          2、Shuffle机制

                          需要注意的是,如果没有reducer就没有数据混洗的过程,就是下面说的shuffle机制

                          // 是在map之后,reduce之前的数据处理过程
                          // 在讲义的22页开始
                          

                          shuffle其实就是在mapper中出来,然后进入环形缓冲区中,对数据进一步的处理,比如排序,编号,从内存到硬盘,从硬盘到内存,合并,压缩等过程

                          1. 其中主要包括的是partitioner分区,就是选择具体划分给几个reducer  				//讲义23页-26页
                              这里主要就是需要单独写一个继承partitioner的类,内容就是重写getPartition
                              根据需求指定什么情况返回值是几,返回值是几就表明最后哪种情况给分配的reducer是第几个
                              然后最后在driver中set一下
                               //8 指定自定义分区器
                                  job.setPartitionerClass(ProvincePartitioner.class);
                               //9 同时指定相应数量的ReduceTask
                                  job.setNumReduceTasks(5);
                          2. 以及排序,对分区内的数据进行排序,这里是继承WritableComparable的接口,内部排序	   //讲义27页-33页
                              
                          3. Combiner合并操作														   //讲义34页-36页
                              因为reducer的个数相较mapper的个数要少很多,所以如果数据量很大的情况下
                              如果执行了combiner操作,那么在mapper阶段相当于对数据进行了整合,最后给reducer的数据就会相对少一些,可以减轻负担
                              其实其本质也是在mapper阶段,后面再写一个类,继承了reducer然后进行数据的一些整合,需要注意的就是数据的泛型定义【或者方法二,可以看讲义,其实就是利用之前的写好的thing reducer的类】
                              另外在最后的driver中加一行setcombine巴拉巴拉的就好
                              // 需要注意的是,combiner不能乱用,前提是不能影响业务逻辑,别如求和可以,但是求平均就不可以,因为数据总量会进行改变
                          

                          3、OutputFormat数据输出

                          1. 自定义一个outputformat类,继承Fileoutputformat
                          2. 里面内容是,重写recordWriter类型的getrecordWriter,在里面实现设置文件的输出流
                          3. 上面重写的那个方法,需要一个recordWriter类型的返回值,那么就需要再写一个类继承recordWriter
                          4. 在继承了recordWriter的类里面重写输出流,需要注意的是里面的抛异常,不要向上抛了,在内部抛异常即可,最后记得关流
                          5. 最后在driver驱动类中setOutputFormatClass

                          4、Reduce Join

                          Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

                          Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

                          // 这里在讲义里的第47页,举例子里面
                          之所以获取文件名字不是在map函数里面
                          单独写了一个setup函数,是因为每一个文件只需要获取一次的文件名
                          如果获取文件名的那个filespilt方法写在了map里面,那就是相当于在读取文件每一行的过程中都会获取一次文件名字
                          所以我们用的是setup方法【不过这里听疑惑的,估计是源码里知道有这个方法,但是我没注意,所以不知道为什么可以可以直接就去重写setup方法】
                              //算是一种优化
                              
                          需要注意的是在这里面,就是reduce部分,想要从value中取值,进行处理,需要注意,这里和java的迭代器不一样
                          如果直接从for循环中的value中取出来的值,将其赋值给类对象,hadoop这里进行了更改
                          只是赋给其地址,所以我们需要单独创建一个tmp,将values中for出来的值赋值给那个tmp,再将tmp赋值给最终需要整合的对象里面
                          

                          5、Map Join

                          如果合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

                          因为map task可以有多个,而且每一个map Task的大小都是128左右,但是可以有很多个map Task,所以我们还是希望join的操作是在map阶段

                          5.1、使用场景

                          ​ Map Join适用于一张表十分小、一张表很大的场景。

                          具体的意思是,Map Join适合处理两张表的工作,其中一张是比较小的 ,可以缓存至内存;另外一张特别大的可以用在map Task中

                          5.2、优点
                          • 在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
                            5.3、具体方法**:采用DistributedCache**

                            ​ (1)在Mapper的setup阶段,将文件读取到缓存集合中。

                            ​ (2)在Driver驱动类中加载缓存。

                            //缓存普通文件到Task运行节点。
                            job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
                            //如果是集群运行,需要设置HDFS路径
                            job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
                            

                            ====================== 那具体怎么做呢? =====================================================

                            5.4、思路流程
                            • 这里主要的问题是两个,一个是数据文件加载到内存,这个是写在驱动类中,就是上面5.3中介绍到的,分两种情况,写下其中一种代码。同时Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
                               job.setNumReduceTasks(0);
                              
                              • 另外一个就是在Mapper阶段怎么将文件,读取到缓存集合中,这里主要涉及到的是IO流的知识:
                                1. 通过context从cache中获取加载进去的文件,这里返回的是一个地址集合

                                  //通过缓存文件得到小表数据pd.txt
                                  URI[] cacheFiles = context.getCacheFiles();
                                  
                                2. 然后我们path一下,获取上面的地址集合中我们想要的,其实也就是path[0]

                                  Path path = new Path(cacheFiles[0]);
                                  
                                3. 下面需要考虑的就是我们怎么从地址中获取数据,那么就需要创建一个IO流

                                  //获取文件系统对象,并开流
                                  FileSystem fs = FileSystem.get(context.getConfiguration());
                                  FSDataInputStream fis = fs.open(path);
                                  
                                4. 获取到了流数据后,我们怎么读取呢,这里用到了BufferedReader转换一下

                                  //通过包装流转换为reader,方便按行读取
                                  BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"UTF-8"));
                                  
                                5. 之后就是逻辑操作

                                  //逐行读取,按行处理
                                  String line;
                                  while (StringUtils.isNotEmpty(line = reader.readLine())) {//切割一行
                                      //01	小米
                                      String[] split = line.split("\t");
                                      pdMap.put(split[0], split[1]);
                                  }
                                  
                                6. 最后记得关流

                                  IOUtils.closeStream(reader);
                                  

                                6、数据清洗ETL

                                ​ 在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

                                ​ 抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库

                                ​ hadoop只是ETL的工具之一

                                // 讲义第53页
                                

                                3.3、MR总结

                                1、输入数据接口:InputFormat

                                (1)默认使用的实现类是:TextInputFormat

                                (2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。

                                (3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。

                                2、逻辑处理接口:Mapper

                                用户根据业务需求实现其中三个方法:map() setup() cleanup ()

                                3、Partitioner分区

                                (1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces

                                (2)如果业务上有特别的需求,可以自定义分区。

                                4、Comparable排序

                                (1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。

                                (2)部分排序:对最终输出的每一个文件进行内部排序。

                                (3)全排序:对所有数据进行排序,通常只有一个Reduce。

                                (4)二次排序:排序的条件有两个。

                                5、Combiner合并

                                Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

                                6、逻辑处理接口:Reducer

                                用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

                                7、输出数据接口:OutputFormat

                                (1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。

                                (2)用户还可以自定义OutputFormat。

                                4、Hadoop数据压缩

                                // 重点看讲义56页之后
                                

                                主要涉及到的就是上面图中的三个位置

                                5、Yarn

                                YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成。

                                主要是负责资源调度的

                                5.1、基础架构 & 工作机制

                                5.2、Yarn调度器和调度算法【重点搞清原理】

                                ​ 目前,Apache Hadoop3.1.3默认的资源调度器是Capacity Scheduler(容量调度器)。

                                ​ Hadoop作业调度器主要有三种:

                                • FIFO
                                • 容量(Capacity Scheduler)
                                • 公平(Fair Scheduler)。

                                  CDH框架默认调度器是Fair Scheduler(公平调度器)。

                                  其中FIFO先进先出不适合大数据中的高并发,所以只是用作引入了解

                                  重点学习掌握容量和公平两个调度器

                                  // 重点看讲义第6页到第10页
                                  是面试重点,也是理论基础
                                      
                                  容量调度器的特点,公平调度器的特点
                                  两个共同之处是多队列、容量保证、灵活性、多租户
                                  //不同之处是容量调度器优先选择资源利用率第低的队列、但是公平调度器优先选择对资源缺额比较大【这里就需要重点看一下缺额是什么】
                                  //另外两种调度器每个队列的设置的资源分配方式也不一样,公平调度多了一个Fair策略
                                      
                                  除了特点需要明白,还需要明白两种调度器的:
                                      资源分配方式    //三个维度:针对队列、作业、容器
                                      资源分配算法
                                  

                                  5.3、Yarn命令

                                  // 主要见讲义的第10页到12页
                                  
                                  • yarn application查看任务
                                  • yarn logs查看日志
                                  • yarn applicationattempt查看尝试运行的任务
                                  • yarn container查看容器
                                  • yarn node查看节点状态
                                  • yarn rmadmin更新配置
                                  • yarn queue查看队列

                                    5.4、Yarn生产环境核心参数【非常重要】

                                    如果对并发度要求比较高,那么选择公平调度器

                                    如果并不是很高,那就是容量调度器,Apache默认就是容量调度器

                                    下图中的参数在实际生产中是必须要注意和配置的,因为默认的配置很有可能会与我们的集群设配不符合

                                    如果不进行配置,那么很容易造成集群崩掉

                                    // 也可以看讲义13页-16页的实例进行巩固理解
                                    这里的例子讲解了怎么根据实际的需求去具体的更改配置
                                    

                                    注:调整下列参数之前尽量拍摄Linux快照,否则后续的案例,还需要重写准备集群。

                                    5.5、容量调度器的多队列

                                    1)在生产环境怎么创建队列?

                                    • 调度器默认就1个default队列,不能满足生产要求。
                                    • 按照框架:hive /spark/ flink 每个框架的任务放入指定的队列(企业用的不是特别多)
                                    • 按照业务模块:登录注册、购物车、下单、业务部门1、业务部门2

                                      2)创建多队列的好处?

                                      • 因为担心员工不小心,写递归死循环代码,把所有资源全部耗尽。

                                      • 实现任务的降级使用,特殊时期保证重要的任务队列资源充足。11.11 6.18

                                      • 业务部门1(重要)=》业务部门2(比较重要)=》下单(一般)=》购物车(一般)=》登录注册(次要)

                                        // 具体怎么设置多队列以及配置使用哪个队列,因为默认是default队列
                                        // 可以见讲义17-19页
                                        
                                        // 优先级配置的例子在讲义第20页
                                        一般默认是没有设置优先级,都是0
                                        需要在yarn-site文件里面配置一下
                                        

                                        5.6、公平调度器案例

                                        // 一般用作中大型公司
                                        //见讲义21页例子
                                        

                                        5.7 Tool接口

                                        在想要给mapreduce程序分配队列时候,我们命令行里面并不是只有输入路径,输出路径

                                        还会有个选择哪个资源队列的参数

                                        这时候如果不设置Tool接口,那么系统会默认第一个参数,就是上面选择哪个资源队列的参数为输入路径了

                                        这就是问题所在

                                        1. 我们在写mapreduce的流程需要更改一些:写map和reduce的时候变成内部类

                                        2. 我们需要单独写一个类,实现了接口Tool,在里面重写run等三个核心驱动方法,并且map和reduce写在里面的内部类

                                          【就是public static class mapper ,reducer等】

                                          这里面的conf不是new出来的,是之后有一个专门的驱动类里面的Torunner给他传进来

                                          所以这里面我们只要写好getConf,setConf,和run,假设我们现在已经取得了conf信息了

                                          public class WordCount implements Tool { private Configuration conf;
                                              //核心驱动(conf需要传入)
                                              @Override
                                              public int run(String[] args) throws Exception { Job job = Job.getInstance(conf);
                                                  job.setJarByClass(WordCountDriver.class);
                                                  job.setMapperClass(WordCountMapper.class);
                                                  job.setReducerClass(WordCountReducer.class);
                                                  job.setMapOutputKeyClass(Text.class);
                                                  job.setMapOutputValueClass(IntWritable.class);
                                                  job.setOutputKeyClass(Text.class);
                                                  job.setOutputValueClass(IntWritable.class);
                                                  FileInputFormat.setInputPaths(job, new Path(args[0]));
                                                  FileOutputFormat.setOutputPath(job, new Path(args[1]));
                                                  return job.waitForCompletion(true) ? 0 : 1;
                                              }
                                              @Override
                                              public void setConf(Configuration configuration) { this.conf = conf;
                                              }
                                              @Override
                                              public Configuration getConf() { return conf;
                                              }
                                              //mapper
                                              ...
                                              //reducer
                                              ...
                                                  
                                           }
                                          }
                                          
                                        3. 另外驱动类也需要更改一下

                                          public class WordCountDriver { private static Tool tool;
                                              public static void main(String[] args) throws Exception { // 1. 创建配置文件
                                                  Configuration conf = new Configuration();
                                                  // 2. 判断是否有tool接口
                                                  switch (args[0]){ case "wordcount":
                                                          tool = new WordCount(); //这里是判断集群中的命令行第一个参数是什么,如果是yarn命令,		   							 				//就执行上面定义的方法
                                                          break;
                                                      default:
                                                          throw new RuntimeException(" No such tool: "+ args[0] );
                                                  }
                                                  // 3. 用Tool执行程序
                                                  // Arrays.copyOfRange 将老数组的元素放到新数组里面
                                                  int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
                                                  System.exit(run);
                                              }
                                          }
                                          

                                          5.8、总结

                                          // 1. 创建配置文件 Configuration conf = new Configuration();
                                           // 2. 判断是否有tool接口
                                              switch (args[0]){
                                                  case "wordcount":
                                                      tool = new WordCount(); //这里是判断集群中的命令行第一个参数是什么,如果是yarn命令,		   							 				//就执行上面定义的方法
                                                      break;
                                                  default:
                                                      throw new RuntimeException(" No such tool: "+ args[0] );
                                              }
                                              // 3. 用Tool执行程序
                                              // Arrays.copyOfRange 将老数组的元素放到新数组里面
                                              int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
                                              System.exit(run);
                                          }
                                          

                                          }

                                          ## 5.8、总结