Overview
PurgeMonitor
PurgeMonitor deletes parity files and directories that have become obsolete. Its core job is deciding what to purge: it walks the parity tree with DirectoryTraversal and supplies a check method that returns the elements matching the purge criteria. Directories are purged first, then parity files; while checking parity files it also decides whether their blocks should be moved (parity blocks are spread out as much as possible to avoid data loss when a single node goes down).
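At a high level the thread's work loop looks roughly like the sketch below (a simplified reconstruction, not the verbatim source; the field names running and purgePeriod are assumptions):

public void run() {
  while (running) {
    try {
      // purge obsolete parity data for every erasure code type in use
      for (ErasureCodeType code : ErasureCodeType.values()) {
        purgeCode(code);
      }
    } catch (Exception e) {
      LOG.error("PurgeMonitor encountered an error", e);
    }
    try {
      Thread.sleep(purgePeriod);   // assumed pause between purge passes
    } catch (InterruptedException e) {
      // woken up early; just re-check the running flag
    }
  }
}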
The PlacementMonitor thread
Internally it relies mainly on the BlockMover thread. Working together with PurgeMonitor, it pulls the datanode topology and, over a socket, asks a DataNode to move a block to another DataNode.
Implementation details
Thread creation and startup
The threads are created and started in the RaidNode.initialize method.
// TriggerMonitor periodically scans the raid policies and triggers raiding jobs
this.triggerThread = new Daemon(new TriggerMonitor());
this.triggerThread.setName("Trigger Thread");
this.triggerThread.start();

// PlacementMonitor checks and fixes the placement of raided blocks
this.placementMonitor = new PlacementMonitor(conf);
this.placementMonitor.start();
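The PurgeMonitor thread is started in the same method. The snippet below is reconstructed from the same pattern; the exact field names may differ slightly from the source:

// start the purge thread (sketch; field names are assumptions)
this.purgeMonitor = new PurgeMonitor(conf, placementMonitor);
this.purgeThread = new Daemon(purgeMonitor);
this.purgeThread.setName("Purge Thread");
this.purgeThread.start();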
How the PlacementMonitor thread runs
PlacementMonitor -> BlockMover -> ClusterInfo: the thread that actually runs is ClusterInfo. It periodically fetches the live DataNodes from the cluster and refreshes the network topology. ClusterInfo.run method:
class ClusterInfo implements Runnable {
  NetworkTopology topology = new NetworkTopology();
  DatanodeInfo liveNodes[];
  static final long UPDATE_PERIOD = 60000L;
  volatile boolean running = true;
  long lastUpdate = -1L;

  @Override
  public void run() {
    while (running) {
      try {
        long now = System.currentTimeMillis();
        if (now - lastUpdate > UPDATE_PERIOD) {
          lastUpdate = now;
          synchronized (this) {
            // ask the NameNode for the current live datanodes and
            // rebuild the network topology from them
            DFSClient client = new DFSClient(conf);
            liveNodes =
                client.getNamenode().getDatanodeReport(DatanodeReportType.LIVE);
            for (DatanodeInfo n : liveNodes) {
              topology.add(n);
            }
          }
          LOG.info("The time is " + (System.currentTimeMillis() - lastUpdate));
        }
        Thread.sleep(UPDATE_PERIOD / 10);
      } catch (InterruptedException e) {
        // interrupted: fall through and re-check the running flag
      } catch (IOException e) {
        LOG.warn("Failed to refresh the datanode topology", e);
      }
    }
  }
  // other members (e.g. a stop() that clears the running flag) are omitted here
}
How the PurgeMonitor thread runs
PurgeMonitor.purgeCode: the main logic is to create a DirectoryTraversal, which walks the raid-related paths and places the files to be deleted (parity files whose source data has already been removed) into the queue inside obsoleteParityFileRetriever; purgeCode then drains that queue and deletes each entry.
void purgeCode(ErasureCodeType code) throws IOException {
  Path parityPath = RaidNode.getDestinationPath(code, conf);
  FileSystem parityFs = parityPath.getFileSystem(conf);
  // purge parity directories whose source directories are gone
  purgeDirectories(parityFs, parityPath);
  FileSystem srcFs = parityFs;
  FileStatus stat = null;
  try {
    stat = parityFs.getFileStatus(parityPath);
  } catch (FileNotFoundException e) {}
  if (stat == null) return;
  LOG.info("Purging obsolete parity files for " + parityPath);
  // walk the parity tree and collect obsolete parity files
  DirectoryTraversal obsoleteParityFileRetriever =
      new DirectoryTraversal(
        "Purge File ",
        Collections.singletonList(parityPath),
        parityFs,
        new PurgeParityFileFilter(conf, code, srcFs, parityFs,
          parityPath.toUri().getPath(), placementMonitor, entriesProcessed),
        directoryTraversalThreads,
        directoryTraversalShuffle);
  FileStatus obsolete = null;
  while ((obsolete = obsoleteParityFileRetriever.next()) !=
         DirectoryTraversal.FINISH_TOKEN) {
    performDelete(parityFs, obsolete.getPath(), false);
  }
  // do the same for obsolete parity HAR archives
  DirectoryTraversal obsoleteParityHarRetriever =
      new DirectoryTraversal(
        "Purge HAR ",
        Collections.singletonList(parityPath),
        parityFs,
        new PurgeHarFilter(conf, code, srcFs, parityFs,
          parityPath.toUri().getPath(), placementMonitor, entriesProcessed),
        directoryTraversalThreads,
        directoryTraversalShuffle);
  while ((obsolete = obsoleteParityHarRetriever.next()) !=
         DirectoryTraversal.FINISH_TOKEN) {
    performDelete(parityFs, obsolete.getPath(), true);
  }
}
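performDelete is not shown in this excerpt. A minimal sketch of what it presumably does (a recursive delete plus logging; the real method may additionally update metrics or go through the trash):

void performDelete(FileSystem fs, Path p, boolean isHar)
    throws IOException {
  // HAR parity is a directory, so delete recursively in both cases
  boolean removed = fs.delete(p, true);
  if (removed) {
    LOG.info("Purged " + (isHar ? "HAR " : "file ") + p);
  } else {
    LOG.warn("Unable to purge " + p);
  }
}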
DirectoryTraversal: the constructor pushes the root directories onto the directories deque and creates Processor threads, which then perform a depth-first search over the subdirectories and files below the roots and do the actual work.
public DirectoryTraversal(String friendlyName, Collection<Path> roots,
    FileSystem fs, Filter filter, int numThreads, boolean doShuffle,
    boolean allowUseStandby)
    throws IOException {
  this.output = new ArrayBlockingQueue<FileStatus>(OUTPUT_QUEUE_SIZE);
  this.directories = new LinkedBlockingDeque<Path>();
  this.fs = fs;
  this.filter = filter;
  this.totalDirectories = new AtomicInteger(roots.size());
  this.processors = new Processor[numThreads];
  this.activeThreads = new AtomicInteger(numThreads);
  this.doShuffle = doShuffle;
  if (doShuffle) {
    List<Path> toShuffleAndAdd = new ArrayList<Path>();
    toShuffleAndAdd.addAll(roots);
    Collections.shuffle(toShuffleAndAdd);
    this.directories.addAll(toShuffleAndAdd);
  } else {
    this.directories.addAll(roots);
  }
  LOG.info("Starting with directories:" + roots.toString() +
           " numThreads:" + numThreads);
  for (int i = 0; i < processors.length; ++i) {
    processors[i] = new Processor();
    processors[i].setName(friendlyName + i);
  }
  for (int i = 0; i < processors.length; ++i) {
    processors[i].start();
  }
}
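The caller consumes results through next(), as seen in purgeCode above. A sketch of the idea (reconstructed, not the verbatim source): elements accepted by the filter are taken from the output queue, and when the last Processor thread exits it enqueues FINISH_TOKEN so that the caller knows the traversal is complete.

public FileStatus next() throws IOException {
  while (true) {
    try {
      // blocks until a filtered element or the FINISH_TOKEN sentinel is available
      return output.take();
    } catch (InterruptedException e) {
      // retry; completion is always signalled through FINISH_TOKEN
    }
  }
}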
Processor.run
public void run() {
  this.cache = PlacementMonitor.locatedFileStatusCache.get();
  List<Path> subDirs = new ArrayList<Path>();
  List<FileStatus> filtered = new ArrayList<FileStatus>();
  try {
    while (!finished && totalDirectories.get() > 0) {
      Path dir = null;
      try {
        // take the next directory to examine; time out so the loop
        // condition is re-checked periodically
        dir = directories.poll(1000L, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        continue;
      }
      if (dir == null) {
        continue;
      }
      try {
        filterDirectory(dir, subDirs, filtered);
      } catch (Throwable ex) {
        LOG.error(getName() + " throws Throwable. Skip " + dir, ex);
        totalDirectories.decrementAndGet();
        continue;
      }
      // one directory is done, but its subdirectories are still pending
      int numOfDirectoriesChanged = -1 + subDirs.size();
      if (totalDirectories.addAndGet(numOfDirectoriesChanged) == 0) {
        interruptProcessors();
      }
      submitOutputs(filtered, subDirs);
    }
  } finally {
    // the last thread to exit unblocks the consumer with the FINISH_TOKEN sentinel
    if (activeThreads.decrementAndGet() == 0) {
      while (!finished) {
        try {
          output.put(FINISH_TOKEN);
          finished = true;
        } catch (InterruptedException e) {
        }
      }
    }
  }
}
The filterDirectory method does the per-entry judgment of files and directories (this is the key step); for parity files it ultimately calls PurgeMonitor's check method to decide whether the file should be deleted.
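filterDirectory itself is not included in this excerpt. A minimal sketch of what it does, inferred from how it is called above (list the directory once, collect subdirectories for further traversal, and keep the entries accepted by filter.check); the real implementation may differ in details:

private void filterDirectory(Path dir, List<Path> subDirs,
    List<FileStatus> filtered) throws IOException {
  subDirs.clear();
  filtered.clear();
  if (dir == null) {
    return;
  }
  FileStatus[] elements = fs.listStatus(dir);   // one listing RPC per directory
  if (elements == null) {
    return;
  }
  for (FileStatus element : elements) {
    if (element.isDir()) {
      subDirs.add(element.getPath());           // traversed later, depth-first
    }
    if (filter.check(element)) {                // e.g. PurgeParityFileFilter.check
      filtered.add(element);                    // ends up in the output queue
    }
  }
}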
The submitOutputs method: subDirs are the subdirectories that still have to be searched and are pushed back onto the deque (putFirst keeps the search depth-first); filtered are the files to be deleted and are put into the output queue.
private void submitOutputs(List<FileStatus> filtered, List<Path> subDirs) {
  if (doShuffle) {
    Collections.shuffle(subDirs);
  }
  for (Path subDir : subDirs) {
    while (!finished) {
      try {
        directories.putFirst(subDir);
        break;
      } catch (InterruptedException e) {
      }
    }
  }
  for (FileStatus out : filtered) {
    while (!finished) {
      try {
        output.put(out);
        break;
      } catch (InterruptedException e) {
      }
    }
  }
}
}
The PurgeMonitor check method decides whether a parity file should be deleted, i.e. whether it no longer corresponds to an existing data file.
public boolean check(FileStatus f) throws IOException {
  if (f.isDir()) return false;
  String pathStr = f.getPath().toUri().getPath();
  // only plain parity files (not HAR archives) are considered here
  if (!pathStr.startsWith(parityPrefix)) return false;
  if (pathStr.indexOf(RaidNode.HAR_SUFFIX) != -1) return false;
  counter.incrementAndGet();
  // map the parity path back to the source path it protects
  String src = pathStr.replaceFirst(parityPrefix, "");
  Path srcPath = new Path(src);
  boolean shouldDelete = false;
  FileStatus srcStat = null;
  try {
    srcStat = srcFs.getFileStatus(srcPath);
  } catch (FileNotFoundException e) {
    // the source file is gone, so its parity is obsolete
    shouldDelete = true;
  }
  if (!shouldDelete) {
    try {
      ParityFilePair ppair =
          ParityFilePair.getParityFile(code, srcPath, conf);
      if ( ppair == null ||
           !parityFs.equals(ppair.getFileSystem()) ||
           !pathStr.equals(ppair.getPath().toUri().getPath())) {
        // this parity file is not the one the source currently maps to
        shouldDelete = true;
      } else {
        if (placementMonitor != null) {
          // parity and source match; check block placement for the pair
          placementMonitor.checkFile(srcFs, srcStat,
              ppair.getFileSystem(), ppair.getFileStatus(), code);
        }
      }
    } catch (IOException e) {
      // on error be conservative and do not delete
      LOG.warn("Error while checking " + pathStr, e);
    }
  }
  return shouldDelete;
}
}
If the parity file does correspond to a data file, checkFile is called to examine, and if necessary spread out, the blocks of the raided file on the relevant nodes. The real work happens in checkBlockLocations. The file is divided into stripes according to the configured stripe length and parity length; for example, with a (5,3) erasure code, 5 data blocks plus 3 parity blocks form one stripe. For each stripe it gathers the stripe's blocks, counts how those blocks are distributed over the DataNodes, logs an error when more than one block of a stripe sits on the same DataNode, and submits block moves for the blocks that are co-located on a single node.
void checkBlockLocations(List<BlockInfo> srcBlocks,
    List<BlockInfo> parityBlocks, ErasureCodeType code,
    FileStatus srcFile, BlockAndDatanodeResolver resolver) throws IOException {
  if (srcBlocks == null || parityBlocks == null) {
    return;
  }
  int stripeLength = RaidNode.getStripeLength(conf);
  int parityLength = code == ErasureCodeType.XOR ?
      1 : RaidNode.rsParityLength(conf);
  int numBlocks = (int)Math.ceil(1D * srcFile.getLen() /
      srcFile.getBlockSize());
  int numStripes = (int)Math.ceil(1D * (numBlocks) / stripeLength);

  Map<String, Integer> nodeToNumBlocks = new HashMap<String, Integer>();
  Set<String> nodesInThisStripe = new HashSet<String>();

  for (int stripeIndex = 0; stripeIndex < numStripes; ++stripeIndex) {
    List<BlockInfo> stripeBlocks = getStripeBlocks(
        stripeIndex, srcBlocks, stripeLength, parityBlocks, parityLength);
    countBlocksOnEachNode(stripeBlocks, nodeToNumBlocks, nodesInThisStripe, srcFile);
    logBadFile(nodeToNumBlocks, parityLength, srcFile);
    updateBlockPlacementHistogram(nodeToNumBlocks, blockHistograms.get(code));
    submitBlockMoves(
        nodeToNumBlocks, stripeBlocks, nodesInThisStripe, resolver);
  }
}
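To make the stripe arithmetic concrete: with stripeLength = 5 and parityLength = 3, a 12-block source file gives numStripes = ceil(12/5) = 3, and stripe i consists of source blocks [5i, 5i+5) plus parity blocks [3i, 3i+3). getStripeBlocks is not shown above; a sketch of that index arithmetic (an illustration, not the verbatim source):

List<BlockInfo> getStripeBlocks(int stripeIndex, List<BlockInfo> srcBlocks,
    int stripeLength, List<BlockInfo> parityBlocks, int parityLength) {
  List<BlockInfo> stripe = new ArrayList<BlockInfo>();
  // source blocks of this stripe (the last stripe may be shorter)
  int srcStart = stripeIndex * stripeLength;
  int srcEnd = Math.min(srcStart + stripeLength, srcBlocks.size());
  stripe.addAll(srcBlocks.subList(srcStart, srcEnd));
  // parity blocks of this stripe
  int parityStart = stripeIndex * parityLength;
  int parityEnd = Math.min(parityStart + parityLength, parityBlocks.size());
  stripe.addAll(parityBlocks.subList(parityStart, parityEnd));
  return stripe;
}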
The run method in BlockMover.java performs the actual block move: it opens a socket to the target DataNode, sends a replace-block request (sendRequest, carrying the block, its access token, the source datanode and the proxy source) and then waits for the response (receiveResponse).
public void run() {
  Socket sock = null;
  DataOutputStream out = null;
  DataInputStream in = null;
  try {
    chooseNodes();
    if (simulate) {
      LOG.debug("Simulate mode. Skip move target:" + target +
          " source:" + source + " proxySource:" + proxySource);
      metrics.blockMove.inc();
      return;
    }
    sock = new Socket();
    sock.connect(NetUtils.createSocketAddr(
        target.getName()), HdfsServerConstants.READ_TIMEOUT);
    sock.setKeepAlive(true);
    sock.setSoTimeout(3600000);
    out = new DataOutputStream(new BufferedOutputStream(
        sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Start moving block " + block.getBlock().getBlockId() +
          " from " + source.getName() +
          " to " + target.getName() +
          " through " + proxySource.getName());
    }
    ExtendedBlock eb = block.getBlock();
    final KeyManager km = nnc.getKeyManager();
    Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
    sendRequest(out, eb, accessToken, source.getDatanodeUuid(), proxySource);
    in = new DataInputStream(new BufferedInputStream(
        sock.getInputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
    receiveResponse(in);
    metrics.blockMove.inc();
    LOG.info("Moving block " + block.getBlock().getBlockId() +
        " from " + source.getName() +
        " to " + target.getName() +
        " through " + proxySource.getName() + " succeed.");
  } catch (Exception e) {
    LOG.warn("Error moving block " + block.getBlock().getBlockId() +
        " from " + ((source == null) ? "null" : source.getName()) + " to " +
        target.getName() + " through " + proxySource.getName(), e);
    if (e instanceof EOFException) {
      LOG.warn("Moving block " + block.getBlock().getBlockId() +
          " was cancelled because the time exceeded the limit");
    }
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(sock);
  }
}
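sendRequest and receiveResponse are not shown; they follow the same pattern as the HDFS Balancer's block mover. A rough sketch of sendRequest, assuming a Hadoop 2.x-era DataTransferProtocol (the exact Sender.replaceBlock signature varies between versions, so treat this as an approximation):

private void sendRequest(DataOutputStream out, ExtendedBlock eb,
    Token<BlockTokenIdentifier> accessToken, String sourceUuid,
    DatanodeInfo proxySource) throws IOException {
  // ask the target datanode to copy the block from proxySource, then report
  // the source replica (sourceUuid) as a deletion hint to the NameNode
  new Sender(out).replaceBlock(eb, StorageType.DEFAULT, accessToken,
      sourceUuid, proxySource);
  out.flush();
}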