當前位置:
首頁 > 知識 > 如何恢復未釋放租約的HDFS文件

如何恢復未釋放租約的HDFS文件

之前有文章介紹過HDFS租約帶來的問題,導致spark應用無法正常讀取文件,只能將異常文件找出並且刪除後,任務才能繼續執行。

但是刪除文件實在是下下策,而且文件本身其實並未損壞,只是因為已經close的客戶端沒有及時的釋放租約導致。

按照Hadoop官網的說法,HDFS會啟動一個單獨的線程,專門處理未及時釋放的租約,自動釋放超過「硬超時」(默認1小時)仍未釋放的租約,但是從問題的現象上來看,這個線程並沒有正常的工作,甚至懷疑這個線程是否沒有啟動,我使用的是CDH集群,可能與相關的設置有關,這一點需要確認。

如果Hadoop沒有自動清理租約,我們有辦法手動的刷新租約嗎?答案是肯定的。

在網上查看資料時,發現HDFS源碼中的DistributedFileSystem類提供了一個叫做recoverLease的方法,可以主動的刷新租約。但是非常奇怪,既然已經為外界提供了這個介面,為什麼不提供shell指令給用戶使用呢?為什麼只能通過代碼的方式調用呢?我使用的是hadoop-2.6.0,也許後期的版本有所更新,這一點也需要求證。

下面看一下這個方法的源碼:

/**
* Start the lease recovery of a file
*
* @param f a file
* @return true if the file is already closed
* @throws IOException if an error occurs
*/
public boolean recoverLease(final Path f) throws IOException {
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<Boolean> {
@Override
public Boolean doCall(final Path p)
throws IOException, UnresolvedLinkException {
return dfs.recoverLease(getPathName(p));
}
@Override
public Boolean next(final FileSystem fs, final Path p)
throws IOException {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem myDfs = (DistributedFileSystem)fs;
return myDfs.recoverLease(p);
}
throw new UnsupportedOperationException("Cannot recoverLease through" +
" a symlink to a non-DistributedFileSystem: " + f + " -> " + p);
}
}.resolve(this, absF);
}

有興趣的朋友可以下載hadoop源碼來仔細推敲一下內部的實現原理,這裡我們只說如何調用,解決我們的問題:

public static void recoverLease(String path) throws IOException {
DistributedFileSystem fs = new DistributedFileSystem;
Configuration conf = new Configuration;
fs.initialize(URI.create(path), conf);
fs.recoverLease(new Path(path));
fs.close;
}

這是我編寫的一個調用改介面的簡單的封裝方法,需要注意的是,此處傳入的path,必須是包含文件系統以及namenode和埠號的全路徑,比如:

hdfs://namenode1:9000/xxx/xxx.log

如果只需要恢復單個文件,調用上述方法即可,但是通常情況下,我們需要對一個目錄進行遞歸的處理,即恢復指定目錄下所有租約異常的文件。

這個時候,我們需要先找出指定目錄下所有租約異常的文件,形成一個Set或者List,然後再遍歷這個容器,對每個文件進行恢復。

尋找文件列表的方法如下:

public static Set<String> getOpenforwriteFileList(String dir) throws IOException {
/*拼接URL地址,發送給namenode監聽的dfs.namenode.http-address埠,獲取所需數據*/
StringBuilder url = new StringBuilder;
url.append("/fsck?ugi=").append("dev");
url.append("&openforwrite=1");

/*獲得namenode的主機名以及dfs.namenode.http-address監聽埠,例如:http://hadoopnode1:50070*/
Path dirpath;
URI namenodeAddress;
dirpath = HDFSUtil.getResolvedPath(dir);
namenodeAddress = HDFSUtil.getDFSHttpAddress(dirpath);

url.insert(0, namenodeAddress);
try {
url.append("&path=").append(URLEncoder.encode(
Path.getPathWithoutSchemeAndAuthority(new Path(dir)).toString, "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace;
}

Configuration conf = new Configuration;
URLConnectionFactory connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf);
URL path = null;
try {
path = new URL(url.toString);
} catch (MalformedURLException e) {
e.printStackTrace;
}

URLConnection connection;
BufferedReader input = null;
try {
connection = connectionFactory.openConnection(path, UserGroupInformation.isSecurityEnabled);
InputStream stream = connection.getInputStream;
input = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
} catch (IOException | AuthenticationException e) {
e.printStackTrace;
}

if (input == null) {
System.err.println("Cannot get response from namenode, url = " + url);
return null;
}

String line;
Set<String> resultSet = new HashSet<>;
try {
while ((line = input.readLine) != null) {
if (line.contains("MISSING") || line.contains("OPENFORWRITE")) {
String regEx = "/[^ ]*";
Pattern pattern = Pattern.compile(regEx);
Matcher matcher = pattern.matcher(line);
while (matcher.find) {
resultSet.add(matcher.group.replaceAll(":", ""));
}
}
}
} catch (IOException e) {
e.printStackTrace;
} finally {
input.close;
}

return resultSet;

}

其實獲取租約異常列表的方法是我從HDFS源碼的org.apache.hadoop.hdfs.tools.DFSck中仿照而來的,通過向NameNode的dfs.namenode.http-address埠通信,獲取openforwrite狀態的文件列表,然後通過正則匹配以及字元串切割,獲取所需的內容。

順便提一句,由於此代碼是Java代碼,並且返回的Set類型為java.util.Set,如果在Scala代碼中調用,則需要將Set類型轉化為scala.collection.immutable.Set,具體方法如下:

/*獲取需要被恢復租約的文件列表,返回類型為java.util.Set*/
val javaFilesSet = HDFSUtil.getOpenforwriteFileList(hdfsPrefix + recoverDirPath)
if (null == javaFilesSet || javaFilesSet.isEmpty) {
println("No files need to recover lease : " + hdfsPrefix + recoverDirPath)
return
}

/*將java.util.Set轉換成scala.collection.immutable.Set*/
import scala.collection.JavaConverters._
val filesSet = javaFilesSet.asScala.toSet

至此,利用以上兩個方法,即可獲取指定目錄下的所有租約異常的文件列表,然後遍歷調用租約恢復介面,即可實現批量恢復。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 科技優家 的精彩文章:

Windows10系統PHP開發環境配置
從 Vue 1.x 遷移—Vue.js
初識 tk.mybatis.mapper 通用mapper

TAG:科技優家 |

您可能感興趣

Drake 未釋出的 OVO x Air Jordan 14 PE 現身網路!
未釋出NIKE LeBron 15 「Purple Rain」曝光!你期待它市售嗎?
未釋出的 OVO x Air Jordan 4 「Splatter」 Sample
如果在跳蚤市場看到未釋出的 Air Jordan 1,你會買嗎?
$25,000美金!Drake 未釋出的 OVO x AJ14 天價登場!
安倍勝券在握:日本目前並未釋放實力,一旦出手簡直小菜一碟!
陸學者:蔡英文未釋放善意 兩岸關係找不到原點只能兜圈子
楊冪至今仍未釋懷,同台正眼不看他一眼
為什麼說應該遠離市場持幣過節:還有這些風險尚未釋放
珈偉股份再公告與「投之家」無關,但謎團仍未釋清
每日好詩|浮荷,尚未釋放出要找到我的那個濕潤的詞
台媒曝知情人透露范冰冰並未釋放,且一直關押在北京,連媽媽也被迫回國接受談話!
馬伊琍從未釋懷文章出軌姚笛?港媒曝馬伊琍心中如有根刺
洪荒之力尚未釋放,洪荒少女受傷,傷病之後選擇堅持
梅西仍未釋懷歐冠出局!缺席西甲頒獎典禮 和家人待在一起療傷