面对现实,超越自己
逆水行舟,不进则退
posts - 269,comments - 32,trackbacks - 0
1、NameNode启动加载元数据情景分析
  • NameNode函数里调用FSNamesystemm读取dfs.namenode.name.dir和dfs.namenode.edits.dir构建FSDirectory。
  • FSImage类recoverTransitionRead和saveNameSpace分别实现了元数据的检查、加载、内存合并和元数据的持久化存储。
  • saveNameSpace将元数据写入到磁盘,具体操作步骤:首先将current目录重命名为lastcheckpoint.tmp;然后在创建新的current目录,并保存文件;最后将lastcheckpoint.tmp重命名为privios.checkpoint.
  • checkPoint的过程:Secondary NameNode会通知nameNode产生一个edit log文件edits.new,之后所有的日志操作写入到edits.new文件中。接下来Secondary NameNode会从namenode下载fsimage和edits文件,进行合并产生新的fsimage.ckpt;然后Secondary会将fsimage.ckpt文件上传到namenode。最后namenode会重命名fsimage.ckpt为fsimage,edtis.new为edits;
2、元数据更新及日志写入情景分析
以mkdir为例:
logSync代码分析:
代码:
  1. public void logSync () throws IOException {  
  2. ArrayList<EditLogOutputStream > errorStreams = null ;  
  3. long syncStart = 0;  
  4.   
  5. // Fetch the transactionId of this thread.  
  6. long mytxid = myTransactionId .get (). txid;  
  7. EditLogOutputStream streams[] = null;  
  8. boolean sync = false;  
  9. try {  
  10. synchronized (this) {  
  11. assert editStreams. size() > 0 : "no editlog streams" ;  
  12. printStatistics (false);  
  13. // if somebody is already syncing, then wait  
  14. while (mytxid > synctxid && isSyncRunning) {  
  15. try {  
  16. wait (1000 );  
  17. catch (InterruptedException ie ) {  
  18. }  
  19. }  
  20. //  
  21. // If this transaction was already flushed, then nothing to do  
  22. //  
  23. if (mytxid <= synctxid ) {  
  24. numTransactionsBatchedInSync ++;  
  25. if (metrics != null// Metrics is non-null only when used inside name node  
  26. metrics .transactionsBatchedInSync .inc ();  
  27. return;  
  28. }  
  29. // now, this thread will do the sync  
  30. syncStart = txid ;  
  31. isSyncRunning = true;  
  32. sync = true;  
  33. // swap buffers  
  34. for( EditLogOutputStream eStream : editStreams ) {  
  35. eStream .setReadyToFlush ();  
  36. }  
  37. streams =  
  38. editStreams .toArray (new EditLogOutputStream[editStreams. size()]) ;  
  39. }  
  40. // do the sync  
  41. long start = FSNamesystem.now();  
  42. for (int idx = 0; idx < streams. length; idx++ ) {  
  43. EditLogOutputStream eStream = streams [idx ];  
  44. try {  
  45. eStream .flush ();  
  46. catch (IOException ie ) {  
  47. FSNamesystem .LOG .error ("Unable to sync edit log." , ie );  
  48. //  
  49. // remember the streams that encountered an error.  
  50. //  
  51. if (errorStreams == null) {  
  52. errorStreams = new ArrayList <EditLogOutputStream >( 1) ;  
  53. }  
  54. errorStreams .add (eStream );  
  55. }  
  56. }  
  57. long elapsed = FSNamesystem.now() - start ;  
  58. processIOError (errorStreams , true);  
  59. if (metrics != null// Metrics non-null only when used inside name node  
  60. metrics .syncs .inc (elapsed );  
  61. finally {  
  62. synchronized (this) {  
  63. synctxid = syncStart ;  
  64. if (sync ) {  
  65. isSyncRunning = false;  
  66. }  
  67. this.notifyAll ();  
  68. }  
  69. }  
  70. }  

3、Backup Node 的checkpoint的过程分析:
  1. /** 
  2. * Create a new checkpoint 
  3. */  
  4. void doCheckpoint() throws IOException {  
  5. long startTime = FSNamesystem.now ();  
  6. NamenodeCommand cmd =  
  7. getNamenode().startCheckpoint( backupNode. getRegistration());  
  8. CheckpointCommand cpCmd = null;  
  9. switch( cmd. getAction()) {  
  10. case NamenodeProtocol .ACT_SHUTDOWN :  
  11. shutdown() ;  
  12. throw new IOException ("Name-node " + backupNode .nnRpcAddress  
  13. " requested shutdown.");  
  14. case NamenodeProtocol .ACT_CHECKPOINT :  
  15. cpCmd = (CheckpointCommand )cmd ;  
  16. break;  
  17. default:  
  18. throw new IOException ("Unsupported NamenodeCommand: "+cmd.getAction()) ;  
  19. }  
  20.   
  21. CheckpointSignature sig = cpCmd. getSignature();  
  22. assert FSConstants.LAYOUT_VERSION == sig .getLayoutVersion () :  
  23. "Signature should have current layout version. Expected: "  
  24. + FSConstants.LAYOUT_VERSION + " actual " + sig. getLayoutVersion();  
  25. assert !backupNode .isRole (NamenodeRole .CHECKPOINT ) ||  
  26. cpCmd. isImageObsolete() : "checkpoint node should always download image.";  
  27. backupNode. setCheckpointState(CheckpointStates .UPLOAD_START );  
  28. if( cpCmd. isImageObsolete()) {  
  29. // First reset storage on disk and memory state  
  30. backupNode. resetNamespace();  
  31. downloadCheckpoint(sig);  
  32. }  
  33.   
  34. BackupStorage bnImage = getFSImage() ;  
  35. bnImage. loadCheckpoint(sig);  
  36. sig.validateStorageInfo( bnImage) ;  
  37. bnImage. saveCheckpoint();  
  38.   
  39. if( cpCmd. needToReturnImage())  
  40. uploadCheckpoint(sig);  
  41.   
  42. getNamenode() .endCheckpoint (backupNode .getRegistration (), sig );  
  43.   
  44. bnImage. convergeJournalSpool();  
  45. backupNode. setRegistration(); // keep registration up to date  
  46. if( backupNode. isRole( NamenodeRole.CHECKPOINT ))  
  47. getFSImage() .getEditLog (). close() ;  
  48. LOG. info( "Checkpoint completed in "  
  49. + (FSNamesystem .now() - startTime )/ 1000 + " seconds."  
  50. " New Image Size: " + bnImage .getFsImageName (). length()) ;  
  51. }  
  52. }  

4、元数据可靠性机制。
  • 配置多个备份路径。NameNode在更新日志或进行Checkpoint的过程,会将元数据放在多个目录下。
  • 对于没一个需要保存的元数据文件,都创建一个输出流,对访问过程中出现的异常输出流进行处理,将其移除。并再合适的时机再次检查移除的数据量是否恢复正常。有效的保证了备份输出流的异常问题。
  • 采用了多种机制来保证元数据的可靠性。例如在checkpoint的过程中,分为几个阶段,通过不同的文件名来标识当前所处的状态。为存储失败后进行恢复提供了可能。
5、元数据的一致性机制。
  • 首先从NameNode启动时,对每个备份目录是否格式化、目录元数据文件名是否正确等进行检查,确保元数据文件间的状态一致性,然后选取最新的加载到内存,这样可以确保HDFS当前状态和最后一次关闭时的状态一致性。
  • 其次,通过异常输出流的处理,可以确保正常输出流数据的一致性。
  • 运用同步机制,确保了输出流一致性问题。

    本文转自:
    http://blog.csdn.net/kntao/article/details/7770597
posted on 2013-05-24 15:29 王海光 阅读(462) 评论(0)  编辑 收藏 引用 所属分类: Linux

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理