快乐学习
前程无忧、中华英才非你莫属!

深入hadoop的文件系统与数据流详解

学hadoop的童鞋注意了:(http://hadoop.apache.org/docs/r1.0.4/cn/)官方中文api文档。

一、Hadoop文件系统

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。Java抽象类 org.apache.hadoop.fs.FileSystem定义了Hadoop 中的一个文件系统接口,并且该抽象类有几个具体实现,如表3-1所示

文件系统

URI方案

Java实现(均包含在org.apache.hadoop包中)

描述

Local

file

fs.LocalFileSystem

使用了客户端校验和的本地磁盘文件系统。没有使用校验和的本地磁盘文件系统RawLocalFileSystem。详情参见4.1.2节

HDFS

hdfs

hdfs.DistributedFileSystem

Hadoop 的分布式文件系统。将HDFS设计成与MapReduce结合使用,可以实现高性能

HFTP

Hftp

hdfs.hftpFileSystem

一个在HTTP 上提供对HDFS 只读访问的文件系统(尽管名称为HFTP,但与FTP无关)。通常与distcp结合使用(参见3.8节),以实现在运行不同版本的HDFS的集群之间复制数据

HSFTP

hsftp

hdfs.HsftpFileSyste

在HTTPS 上提供对HDFS只读访问的文件系统(同上,与FTP 无关)

WebHDFS

Webhdfs

Hdfs.web.WebHdfsFileSystem

基于HTTP,对HDFS提供安全读写访问的文件系统。WebHDFS是为了替代HFTP和HSFTP而构建的

HAR

har

fs.HarFileSystem

一个构建在其他文件系统之上用于文件存档的文件系统。Hadoop存档文件系统通常用于需要将HDFS 中的文件进行存档时,以减少namenode内存的使用。参见3.9节

hfs(云存储)

kfs

fs.kfs.kosmosFileSystem

CloudStore(其前身为Kosmos文件系统)是类似于HDFS或是谷歌的GFS的文件系统,用C++写。详情参见http://kosmosfs.sourceforge.net/

FTP

ftp

fs.ftp.FTPFileSystem

由FTP 服务器支持的文件系统

S3(原生)

S3n

fs.s3native.NativeS3FileSystem

由Amazon S3 支持的文件系统。参见http://wiki.apache.org/hadoop/AmazonS3

S3(基于块)

S3

fs.sa.S3FileSystem

由Amazon S3 支持的文件系统,以块格式存储文件(与HDFS 很相似)以解决S3 的5 GB文件大小限制

分布式RAID

hdfs

hdfs.DistributedRaidFileSystem

RAID版本的HDFS是为了存档而设计的。针对HDFS中的每个文件,创建一个(更小的)校验文件,并允许HDFS中的数据副本由3降为2,由此可以减少25%~30%的存储空间,但是数据丢失的概率保持不变。分布式RAID模式需要在集群中运行一个RaidNode后台进程

View

viewfs

viewfs.ViewFileSystem

针对其他Hadoop文件系统挂载的客户端表。通常用于联邦namenode创建挂载点。详情参见3.2.3节。

Hadoop 对文件系统提供了许多接口,它一般使用URI 方案来选取合适的文件系统实例进行交互。举例来说,我们在前一小节中遇到的文件系统命令行解释器可以操作所有的Hadoop 文件系统命令。要想列出本地文件系统根目录下的文件,可以输入以下命令:

% hadoop fs -ls file:///

尽管运行的MapReduce程序可以访问任何文件系统(有时也很方便),但在处理大数据集时,建议你还是选择一个有数据本地优化的分布式文件系统,如HDFS。

二、HDFS源码分析

2.1 HDFS客户端的输入流类结构

当从HDFS集群中读取数据到客户端时,就要用到HDFS中的输入流类了,但是这里使用到了DFSInputSteam,FSInputStream,FSDataInputStream,DFSDataInputStream等这些类,如果不了解Java IO类的设计思想,这里很容易看迷糊,为什么用到类这么多*InputStream类?

装饰器模式与Java IO中的装饰器模式

Java IO中,也有很多输入流类,比如InputStream,FileInputStream,FilterInputStream,DataInputStream等等,那么这些类是如何组织的呢?什么时候使用FileInputStream,什么时候使用DataInputStream?这就要从装饰器模式开始学习。关于Java IO中的装饰器模式可以参考博文:http://www.cnblogs.com/zuoxiaolong/p/pattern11.html,这篇博文比较清晰的介绍类装饰器模式,以及Java IO的设计者如何将装饰器模式应用到Java IO类的设计中去的。

简单点理解就是,在Java IO相关的类中,InputStream是java IO中的顶层接口类,其他所有的有关IO的类都会直接或者间接用到这个类。FilterInputStream类中持有一个对IpnutStream子类对象的引用,如FileInputStream类的对象,而DataInputStream继承自FilterInputStream,在FilterInputStream类的基础上增加了读取基本的Java数据类型等功能,如使用DataInputStream可以读取一个int类型的数据,可以读取long类型的数据。

HDFS中输入流的装饰器模式

FSInputStream是Hadoop抽象文件系统规定的一个输入流接口,它继承自InputStream抽象类,FSDataInuptStream类是Hadoop抽象文件系统中的一个输入流接口,继承自java.io.DataInputStream,这两个类是Hadoop抽象文件系统规定的类,实现一个具体的Hadoop文件系统的输入流都要从这两个类继承,如HDFS中的DFSInputStream继承自FSInputStream,DFSDataInuptStream继承自FSDataInputStream。

DFSInputStream类是HDFS客户端的输入数据流类,其在HDFS客户端中的作用与InputStream类似类似,直接处理数据。

DFSDataInputStream是HDFS客户端输入流类,其在HDFS客户端中的作用与DataInput在Java IO中的作用类似,它通过持有一个DFSInputStream对象引用来处理数据流

可见,HDFS中的输入流的类结构与Java IO中类似,都使用类装饰器模式来设计输入流的结构。

此外在读取HDFS的的数据过程中DFSInputStream类负责从HDFS中读取数据,在这个过程中需要与远程的机器进行网络通信(暂不考虑本地读取)。这个过程中则使用org.apache.hadoop.hdfs.BlockReader接口的实现类org.apache.hadoop.hdfs.DFSClient.RemoteBlockReader.RemoteBlockReader来读取数据,RemoteBlockReader类继承自org.apache.hadoop.fs.FSInputChecker类,其中FSInputChecker类为数据输入提供了检验能力,RemoteBlockReader通过Socket与远程的机器通信,进而读取HDFS集群中的数据。

Reference

http://www.cnblogs.com/zuoxiaolong/p/pattern11.html

《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》


2.2 stream------Hadoop--FSDataInputStream和FSDataOutputStream

一、FSDataInputStream

   FileSystem中的open()方法实际上返回的是一个FSDataInputStream,而不是标准的java.io类。这个类是java.io.DataInputStream

   的一个子类,支持随机访问,这样就可以从流的任何位置读取数据了

  public class FSDataInputStream extends DataInputStream
       implements Seekable, PositionedReadable,
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
       HasEnhancedByteBufferAccess {。。。。。}

   Seekable接口允许在文件中定位,并提供一个查询方法,用于查询当前位置相对于文件开始处的偏移量(getpos())

  

 public interfence Seekable{
   void seek(long pos) throws IOException;
    long getPos() throws IOException;
    boolean seekToNewSource(long targetPos) throws IOException;
   }

   调用seek() 来定位大于文件长度的位置会导致IOException异常。与java.io.InputStream 中的skip() 不同,seek()并

  没有指出数据流当前位置之后的一点,它可以移到文件中任意一个绝对位置。

  应用程序开发人员并不常用seekToNewSource()方法。此方法一般倾向于切换到数据的另一个副本并在新的副本中寻找targetPos指定的位置。

  HDFS内部就采用这样的方法在数据节点故障时为客户端提供可靠的数据输入流。

  FSDataInputStream也实现了PositionedReadable接口,从一个指定位置读取一部分数据

二、FSDataOutputStream
Hadoop 的FileSystem中的create()方法返回了一个FSDataOutputStream,与FSDataInputStream类似,
它也有一个查询文件当前位置的方法:
public class FSDataOutputStream extends DataOutputStream
    implements Syncable, CanSetDropBehind {
............
...........
 public long getPos() throws IOException {
      return position;                            // return cached position
    }

}

但是,与FSDataInputStream不同,FSDataOutputStream不允许定位。这是因为HDFS只允许对一个打开的文件

顺序写入,或向一个已有文件添加。换句话说,它不支持文件尾部的其他位置的写入,这样一来,写入时的定位就没有什么意义。

 -------------------引自Hadoop权威指南第三版


参考文献:

【1】http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html

【2】http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html

【3】http://wiki.apache.org/hadoop/HadoopDfsReadWriteExample

【4】http://blog.csdn.net/gaoxingnengjisuan/article/details/11177049

hdfs源码分析第一弹http://www.cnblogs.com/davidwang456/p/4772728.html             

hdfs源码分析第二弹http://www.cnblogs.com/davidwang456/p/4778810.html?utm_source=tuicool


参考文献 《Hadoop技术内部–HDFS结构设计与实现原理》.蔡斌

Hadoop分布式文件系统--HDFS结构分析:http://blog.csdn.net/androidlushangderen/article/details/47377543

HDFS源码分析(一)-----INode文件节点 http://blog.csdn.net/Androidlushangderen/article/details/47427925           

HDFS源码分析(二)-----元数据备份机制: http://blog.csdn.net/androidlushangderen/article/details/47679977     

HDFS源码分析(三)-----数据块关系基本结构:http://blog.csdn.net/androidlushangderen/article/details/47734269   

HDFS源码分析(四)-----节点Decommission机制http://blog.csdn.net/androidlushangderen/article/details/47788227       、

HDFS源码分析(五)-----节点注册与心跳机制:   http://blog.csdn.net/androidlushangderen/article/details/47945597

HDFS源码分析(六)-----租约 : http://blog.csdn.net/androidlushangderen/article/details/48012001




三、Hadoop系统中数据流详解:

3.6  数据流 

3.6.1  剖析文件读取 

为了了解客户端及与之交互的HDFS、namenode和datanode之间的数据流是什么样的,我们可参考图3-2,该图显示了在读取文件时事件的发生顺序。

客户端通过调用FileSystem对象的open()方法来打开希望读取的文件,对于HDFS 来说,这个对象是分布式文件系统(图3-2 中的步骤1)的一个实例。DistributedFileSystem通过使用RPC(

1.1 RPC (remote procedure call)远程过程调用.
远程过程指的是不是同一个进程。
1.2 RPC至少有两个过程。调用方(client),被调用方(server)。
1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。
1.4 RPC是hadoop构建的基础。
2. 通过例子获得的认识?
2.1 RPC是一个远程过程调用。
2.2 客户端调用服务端的方法,意味着调用服务端的对象中的方法。
2.3 如果服务端的对象允许客户端调用,那么这个对象必须实现接口。
2.4 如果客户端能够调用到服务端对象的方法,那么这些方法一定位于对象的接口中。

)来调用namenode,以确定文件起始块的位置(步骤2)。对于每一个块,namenode返回存有该块副本的datanode地址。此外,这些datanode根据它们与客户端的距离来排序(根据集群的网络拓扑;参见3.6.1节的的补充材料“网络拓扑与Hadoop”)。如果该客户端本身就是一个datanode (比如,在一个MapReduce任务中),并保存有相应数据块的一个副本本时,该节点就会从本地datanode读取数据(参见图3-2)。

图3-2. 客户端读取HDFS中的数据 

DistributedFileSystem类返回一个

FSDataInputStream(http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/fs/FSDataInputStream.html

对象(一个支持文件定位的输入流)给客户端并读取数据。FSDataInputStream类转而封装DFSInputStream对象,该对象管理着datanode和namenode的I/O。

接着,客户端对这个输入流调用read()方法(步骤3)。存储着文件起始几个块的datanode地址的DFSInputStream随即连接距离最近的datanode。通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端(步骤4)。到达块的末端时,DFSInputStream关闭与该datanode的连接,然后寻找下一个块的最佳datanode(步骤5)。客户端只需要读取连续的流,并且对于客户端都是透明的。

客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的。它也会根据需要询问namenode来检索下一批数据块的datanode的位置。一旦客户端完成读取,就对FSDataInputStream调用close()方法(步骤6)。

在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错误,会尝试从这个块的另外一个最邻近datanode读取数据。它也记住那个故障datanode,以保证以后不会反复读取该节点上后续的块。DFSInputStream也会通过校验和确认从datanode发来的数据是否完整。如果发现有损坏的块,就在DFSInputStream试图从其他datanode读取其复本之前通知namenode。

这个设计的一个重点是,namenode告知客户端每个块中最佳的datanode,并让客户端直接连接到该datanode检索数据。由于数据流分散在集群中的所有datanode,所以这种设计能使HDFS可扩展到大量的并发客户端。同时,namenode只需要响应块位置的请求(这些信息存储在内存中,因而非常高效),无需响应数据请求,否则随着客户端数量的增长,namenode会很快成为瓶颈。

网络拓扑与Hadoop

在本地网络中,两个节点被称为“彼此近邻”是什么意思?在海量数据处理中,其主要限制因素是节点之间数据的传输速率——带宽很稀缺。这里的想法是将两个节点间的带宽作为距离的衡量标准。

不用衡量节点之间的带宽——实际上很难实现(它需要一个稳定的集群,并且在集群中两两节点对数量是节点数量的平方)——Hadoop为此采用一个简单的方法:把网络看作一棵树,两个节点间的距离是它们到最近共同祖先的距离总和。该树中的层次是没有预先设定的,但是相对于数据中心、机架和正在运行的节点,通常可以设定等级。具体想法是针对以下每个场景,可用带宽依次递减:

  • 同一节点上的进程

  • 同一机架上的不同节点

  • 同一数据中心中不同机架上的节点

  • 不同数据中心中的节点

例如,假设有数据中心d1机架r1中的节点n1。该节点可以表示为/d1/r1/n1。利用这种标记,这里给出四种距离描述:

  • distance(/d1/r1/n1, /d1/r1/n1)=0(同一节点上的进程)

  • distance(/d1/r1/n1, /d1/r1/n2)=2(同一机架上的不同节点)

  • distance(/d1/r1/n1, /d1/r2/n3)=4(同一数据中心中不同机架上的节点)

  • distance(/d1/r1/n1, /d2/r3/n4)=6(不同数据中心中的节点)

示意图参见图3-3(数学爱好者会注意到,这是一个测量距离的例子)。最后,我们必须意识到Hadoop无法自行定义网络拓扑结构。它需要我们能够理解并辅助定义,我们将在9.1.1节的“网络拓扑”中讨论如何配置网络拓扑。不过在默认情况下,假设网络是扁平化的只有一层——或换句话说,所有节点都在同一数据中心的同一机架上。规模小的集群可能如此,不需要进一步配置。

图3-3. Hadoop中的网络距离 

3.6.2  剖析文件写入 

接下来我们看看文件是如何写入HDFS 的。尽管比较详细,但对于理解数据流还是很有用的,因为它清楚地说明了HDFS 的一致模型。

我们要考虑的情况是如何新建一个文件,把数据写入该文件,最后关闭该文件。参见图3-4。

图3-4. 客户端将数据写入HDFS 

客户端通过对DistributedFileSystem对象调用create()函数来新建文件(图3-4中的步骤1)。

DistributedFileSystem对namenode创建一个RPC调用,在文件系统的命名空间中新建一个文件,此时该文件中还没有相应的数据块(步骤2)。namenode执行各种不同的检查以确保这个文件不存在以及客户端有新建该文件的权限。如果这些检查均通过,namenode就会为创建新文件记录一条记录;否则,文件创建失败并向客户端抛出一个IOException异常DistributedFileSystem向客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。就像读取事件一样,FSDataOutputStream封装一个DFSoutPutstream对象,该对象负责处理datanode和namenode之间的通信

在客户端写入数据时(步骤3),DFSOutputStream将它分成一个个的数据包,并写入内部队列,称为“数据队列”(data queue)。DataStreamer处理数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块来存储数据复本。这一组datanode构成一个管线——我们假设复本数为3,所以管线中有3个节点。DataStreamer将数据包流式传输到管线中第1个datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。同样,第2个datanode存储该数据包并且发送给管线中的第3个(也是最后一个)datanode (步骤4)。 

DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为“确认队列”(ack queue)。收到管道中所有datanode确认信息后,该数据包才会从确认队列删除(步骤5)。 

如果在数据写入期间datanode发生故障,则执行以下操作(对写入数据的客户端是透明的)。首先关闭管线,确认把队列中的所有数据包都添加回数据队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据包。为存储在另一正常datanode的当前数据块指定一个新的标识,并将该标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块。从管线中删除故障数据节点并且把余下的数据块写入管线中另外两个正常的的datanode。namenode注意到块复本量不足时,会在另一个节点上创建一个新的复本。后续的数据块继续正常接受处理。 

在一个块被写入期间可能会有多个datanode同时发生故障,但非常少见。只要写入了dfs.replication.min的复本数(默认为1),写操作就会成功,并且这个块可以在集群中异步复制,直到达到其目标复本数(dfs.replication的默认值为3)。 

客户端完成数据的写入后,对数据流调用close()方法(步骤6)。该操作将剩余的所有数据包写入datanode管线,并在联系到namenode且发送文件写入完成信号之前,等待确认(步骤7)。namenode已经知道文件由哪些块组成(通过Datastreamer请求分配数据块),所以它在返回成功前只需要等待数据块进行最小量的复制。 

复本怎么放

namenode如何选择在哪个datanode存储复本(replica)?这里需要对可靠性、写入带宽和读取带宽进行权衡。例如,把所有复本都存储在一个节点损失的写入带宽最小,因为复制管线都是在同一节点上运行,但这并不提供真实的冗余(如果节点发生故障,那么该块中的数据会丢失)。同时,同一机架上服务器间的读取带宽是很高的。另一个极端,把复本放在不同的数据中心可以最大限度地提高冗余,但带宽的损耗非常大。即使在同一数据中心(到目前为止,所有Hadoop 集群均运行在同一数据中心内),也有许多不同的数据布局策略。其实,在发布的Hadoop 0.17.0版中改变了数据布局策略来辅助保持数据块在集群内分布相对均匀(第350页的“均衡器”详细说明了如何保持集群的均衡)。在1.x之后的发行版本,可即时选择数据块的布局策略。

Hadoop 的默认布局策略是在运行客户端的节点上放第1个复本 (如果客户端运行在集群之外,就随机选择一个节点,不过系统会避免挑选那些存储太满或太忙的节点)。第2个复本放在与第一个不同且随机另外选择的机架中节点上(离架)。第3个复本与第2个复本放在同一个机架上,且随机选择另一个节点。其他复本放在集群中随机选择的节点上,不过系统会尽量避免在同一个的机架上放太多复本。

一旦选定复本的放置位置,就根据网络拓扑创建一个管线。如果复本数为3,则有图3-5所示的管线。

总的来说,这一方法不仅提供很好的稳定性(数据块存储在两个机架中)并实现很好的负载均衡,包括写入带宽(写入操作只需要遍历一个交换机)、读取性能(可以从两个机架中选择读取)和集群中块的均匀分布(客户端只在本地机架上写入一个块)。

图3-5. 一个典型的复本管线 

3.6.3  一致模型 

文件系统的一致模型(coherency model)描述了文件读/写的数据可见性。HDFS为性能牺牲了一些POSIX要求,因此一些操作与你期望的可能不同。

POSIX表示可移植操作系统接口(Portable Operating System Interface ,缩写为 POSIX ),POSIX标准定义了操作系统应该为应用程序提供的接口标准,是IEEE为要在各种UNIX操作系统上运行的软件而定义的一系列API标准的总称,其正式称呼为IEEE 1003,而国际标准名称为ISO/IEC 9945。
一个POSIX兼容的操作系统编写的程序,应该可以在任何其它的POSIX操作系统(即使是来自另一个厂商)上编译执行。

新建一个文件之后,它能在文件系统的命名空间中立即可见,如下所示:

  1. [js] view plaincopy
    Path p = new Path("p");    
    Fs.create(p);    
    assertThat(fs.exists(p),is(true));

但是,写入文件的内容并不保证能立即可见,即使数据流已经刷新并存储。所以文件长度显示为0: 

  1. Path p = new Path("p");    
    OutputStream out = fs.create(p);    
    out.write("content".getBytes("UTF-8"));    
    out.flush();    
    assertThat(fs.getFileStatus(p).getLen(),is(0L));

当写入的数据超过一个块后,第一个数据块对新的reader就是可见的。之后的块也不例外。总之,当前正在写入的块对其他reader不可见。 

HDFS提供一个方法来使所有缓存与数据节点强行同步,即对FSDataOutputStream调用sync()方法。当sync()方法返回成功后,对所有新的reader而言,HDFS能保证文件中到目前为止写入的数据均到达所有datanode的写入管道并且对所有新的reader均可见:[ 在Hadoop 1.x之后的版本,sync()方法被丢弃了,进而采用等价的hflush()方法。另外还增加了一个hsync()方法,确保操作系统刷新数据到磁盘(类似于POSIX的fsync方法)。但在本书写作期间,此方法还没有实现,只有hflush()方法。]

  1. Path p = new Path("p");    
    FSDataOutputStream out = fs.create(p);    
    out.write("content".getBytes("UTF-8"));    
    out.flush();    
    out.sync();    
    assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

该操作类似于POSIX中的fsync系统调用,该调用提交的是一个文件描述符的缓冲数据。例如,利用标准Java API将数据写入本地文件,我们能够在刷新数据流且同步之后看到文件内容: 

  1. FileOutputStream out = new FileOutputStream(localFile);    
    out.write("content".getBytes("UTF-8"));    
    out.flush(); // flush to operating system    
    out.getFD().sync(); // sync to disk    
    assertThat(localFile.length(), is(((long) "content".length())));

在HDFS中关闭文件其实还隐含执行sync()方法: 

  1. Path p = new Path("p");    
    OutputStream out = fs.create(p);    
    out.write("content".getBytes("UTF-8"));    
    out.close();

对应用设计的重要性 

这个一致模型和设计应用程序的具体方法息息相关。如果不调用sync()方法,就要准备好在客户端或系统发生故障时可能会丢失数据块。对很多应用来说,这是不可接受的,所以需要在适当的地方调用sync()方法,例如在写入一定的记录或字节之后。尽管sync()操作被设计成尽量减少HDFS负载,但它有许多额外的开销,所以在数据鲁棒性和吞吐量之间就会有所取舍。怎样权衡与具体的应用相关,通过设置不同调用sync()方法的频率来衡量应用程序的性能,最终找到一个合适的频率。 

打赏
赞(0) 打赏
未经允许不得转载:同乐学堂 » 深入hadoop的文件系统与数据流详解

特别的技术,给特别的你!

联系QQ:1071235258QQ群:710045715

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

error: Sorry,暂时内容不可复制!