【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1 【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2 【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part3
11:Backend
// ---- Store names ----
public static final String EDGESTORE_NAME = "edgestore";
public static final String INDEXSTORE_NAME = "graphindex";
// ---- Names used when registering metrics ----
// METRICS_STOREMANAGER_NAME and METRICS_MERGED_STORE are passed to
// MetricInstrumentedStoreManager by the constructor when BASIC_METRICS is enabled.
public static final String METRICS_STOREMANAGER_NAME = "storeManager";
public static final String METRICS_MERGED_STORE = "stores";
public static final String METRICS_MERGED_CACHE = "caches";
public static final String METRICS_CACHE_SUFFIX = ".cache";
// Suffix handed to ExpectedValueCheckingStoreManager when the store has no native locking.
public static final String LOCK_STORE_SUFFIX = "_lock_";
public static final String SYSTEM_TX_LOG_NAME = "txlog";
public static final String SYSTEM_MGMT_LOG_NAME = "systemlog";
// The two cache fractions add up to 1.0.
// NOTE(review): presumably the share of the cache given to each store - confirm at the usage site.
public static final double EDGESTORE_CACHE_PERCENT = 0.8;
public static final double INDEXSTORE_CACHE_PERCENT = 0.2;
// 1000 * 3600 * 24 * 365 * 200, computed as a long: 200 years of 365 days
// if the unit is milliseconds (unit not shown here - confirm at the usage site).
private static final long ETERNAL_CACHE_EXPIRATION = 1000L *3600*24*365*200;
// Multiplied by the number of available processors to size the backend thread pool.
public static final int THREAD_POOL_SIZE_SCALE_FACTOR = 2;
// ---- Store managers (assigned once in the constructor) ----
private final KeyColumnValueStoreManager storeManager;
// Either storeManager itself or an ExpectedValueCheckingStoreManager wrapping it.
private final KeyColumnValueStoreManager storeManagerLocking;
private final StoreFeatures storeFeatures;
// ---- Non-final state: not assigned by the constructor shown below ----
private KCVSCache edgeStore;
private KCVSCache indexStore;
private KCVSCache txLogStore;
private IDAuthority idAuthority;
private KCVSConfiguration systemConfig;
private KCVSConfiguration userConfig;
private boolean hasAttemptedClose;
// ---- Collaborators created in the constructor ----
private final StandardScanner scanner;
private final KCVSLogManager managementLogManager;
private final KCVSLogManager txLogManager;
private final LogManager userLogManager;
private final Map<String, IndexProvider> indexes;
// ---- Settings read from the configuration ----
// Integer.MAX_VALUE when the store does not support batch mutation, otherwise BUFFER_SIZE.
private final int bufferSize;
private final Duration maxWriteTime;
private final Duration maxReadTime;
// True only when STORAGE_BATCH is off and DB_CACHE is on.
private final boolean cacheEnabled;
// Null unless PARALLEL_BACKEND_OPS is enabled.
private final ExecutorService threadPool;
// Factory looked up in REGISTERED_LOCKERS under the configured LOCK_BACKEND name.
private final Function<String, Locker> lockerCreator;
private final ConcurrentHashMap<String, Locker> lockers = new ConcurrentHashMap<>();
private final Configuration configuration;
/**
 * Wires up the backend from the supplied configuration: the (optionally
 * metrics-instrumented) store manager, index providers, log managers,
 * buffer and timeout settings, the locking store manager, the optional
 * thread pool, the locker factory and the scanner.
 *
 * @param configuration the configuration all settings are read from
 * @throws JanusGraphConfigurationException if the configured lock backend
 *         name is not present in REGISTERED_LOCKERS
 */
public Backend(Configuration configuration) {
    this.configuration = configuration;

    // Wrap the raw store manager with metrics instrumentation only when requested.
    final KeyColumnValueStoreManager rawManager = getStorageManager(configuration);
    storeManager = configuration.get(BASIC_METRICS)
            ? new MetricInstrumentedStoreManager(rawManager, METRICS_STOREMANAGER_NAME,
                    configuration.get(METRICS_MERGE_STORES), METRICS_MERGED_STORE)
            : rawManager;

    indexes = getIndexes(configuration);
    storeFeatures = storeManager.getFeatures();

    managementLogManager = getKCVSLogManager(MANAGEMENT_LOG);
    txLogManager = getKCVSLogManager(TRANSACTION_LOG);
    userLogManager = getLogManager(USER_LOG);

    cacheEnabled = !configuration.get(STORAGE_BATCH) && configuration.get(DB_CACHE);

    // The configured buffer size is validated even if it ends up unused.
    final int configuredBufferSize = configuration.get(BUFFER_SIZE);
    Preconditions.checkArgument(configuredBufferSize > 0, "Buffer size must be positive");
    bufferSize = storeFeatures.hasBatchMutation() ? configuredBufferSize : Integer.MAX_VALUE;

    maxWriteTime = configuration.get(STORAGE_WRITE_WAITTIME);
    maxReadTime = configuration.get(STORAGE_READ_WAITTIME);

    if (storeFeatures.hasLocking()) {
        storeManagerLocking = storeManager;
    } else {
        // No native locking: require key consistency and wrap the store manager.
        // NOTE: 'this' is handed out here before the constructor has finished.
        Preconditions.checkArgument(storeFeatures.isKeyConsistent(), "Store needs to support some form of locking");
        storeManagerLocking = new ExpectedValueCheckingStoreManager(storeManager, LOCK_STORE_SUFFIX, this, maxReadTime);
    }

    if (!configuration.get(PARALLEL_BACKEND_OPS)) {
        threadPool = null;
    } else {
        final int poolSize = Runtime.getRuntime().availableProcessors() * THREAD_POOL_SIZE_SCALE_FACTOR;
        threadPool = Executors.newFixedThreadPool(poolSize);
        log.info("Initiated backend operations thread pool of size {}", poolSize);
    }

    // Resolve the locker factory; an unregistered name is a configuration error.
    final String lockBackendName = configuration.get(LOCK_BACKEND);
    if (!REGISTERED_LOCKERS.containsKey(lockBackendName)) {
        throw new JanusGraphConfigurationException("Unknown lock backend \"" +
                lockBackendName + "\". Known lock backends: " +
                Joiner.on(", ").join(REGISTERED_LOCKERS.keySet()) + ".");
    }
    lockerCreator = REGISTERED_LOCKERS.get(lockBackendName);
    Preconditions.checkNotNull(lockerCreator);

    scanner = new StandardScanner(storeManager);
}
12:BypassMergeSortShuffleWriter
/**
 * Writes all records to one temporary file per partition, then hands a
 * combined temporary file and the per-partition lengths to the shuffle
 * block resolver, and finally records the resulting map status.
 *
 * @param records the key/value records to write
 * @throws IOException propagated from the underlying file operations
 */
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);

    // No input: commit an index of all-zero lengths with no data file and stop.
    if (!records.hasNext()) {
        partitionLengths = new long[numPartitions];
        shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
        mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
        return;
    }

    // Phase 1: open one disk writer per partition; the time spent is added to the write time.
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartNanos = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int p = 0; p < numPartitions; ++p) {
        final Tuple2<TempShuffleBlockId, File> tempBlock =
            blockManager.diskBlockManager().createTempShuffleBlock();
        final BlockId tempBlockId = tempBlock._1();
        final File tempFile = tempBlock._2();
        partitionWriters[p] =
            blockManager.getDiskWriter(tempBlockId, tempFile, serInstance, fileBufferSize, writeMetrics);
    }
    writeMetrics.incWriteTime(System.nanoTime() - openStartNanos);

    // Phase 2: route each record to the writer of the partition chosen for its key.
    while (records.hasNext()) {
        final Product2<K, V> entry = records.next();
        final K key = entry._1();
        final int target = partitioner.getPartition(key);
        partitionWriters[target].write(key, entry._2());
    }

    // Phase 3: commit every writer, remember its file segment, then close it.
    for (int p = 0; p < numPartitions; ++p) {
        final DiskBlockObjectWriter partitionWriter = partitionWriters[p];
        partitionWriterSegments[p] = partitionWriter.commitAndGet();
        partitionWriter.close();
    }

    // Phase 4: build the combined file under a temporary name and commit it with its index.
    final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    final File combinedTmp = Utils.tempFileWith(dataFile);
    try {
        partitionLengths = writePartitionedFile(combinedTmp);
        shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, combinedTmp);
    } finally {
        // If the temporary file is still present, remove it; log when removal fails.
        if (combinedTmp.exists() && !combinedTmp.delete()) {
            logger.error("Error while deleting temp file {}", combinedTmp.getAbsolutePath());
        }
    }

    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
13:DiskBlockObjectWriter
/**
 * Flushes and closes the open output stream and returns the file segment
 * covering everything written since the previous committed position.
 * When no stream is open, returns a zero-length segment at the current
 * committed position and changes nothing.
 *
 * @return the segment of `file` committed by this call
 */
def commitAndGet(): FileSegment = {
  if (!streamOpen) {
    // Nothing to commit: empty segment at the last committed offset.
    new FileSegment(file, committedPosition, 0)
  } else {
    // Flush the object stream and the stream named `bs` before closing.
    objOut.flush()
    bs.flush()
    objOut.close()
    streamOpen = false

    if (syncWrites) {
      // Sync the file descriptor and count the time spent as write time.
      val syncStartNs = System.nanoTime()
      fos.getFD.sync()
      writeMetrics.incWriteTime(System.nanoTime() - syncStartNs)
    }

    val endPosition = channel.position()
    val segment = new FileSegment(file, committedPosition, endPosition - committedPosition)
    committedPosition = endPosition
    // Report any bytes not yet accounted for, then bring the reported position up to date.
    writeMetrics.incBytesWritten(committedPosition - reportedPosition)
    reportedPosition = committedPosition
    numRecordsWritten = 0
    segment
  }
}
14:IndexShuffleBlockResolver
/**
 * Writes the index file for one map output and moves the temporary data file
 * into its final place, or reuses an existing index/data pair when
 * `checkIndexAndDataFile` returns a non-null result for it.
 *
 * The index file holds `lengths.length + 1` long values: a leading 0 followed
 * by the running sum of `lengths`.
 *
 * @param shuffleId shuffle the output belongs to
 * @param mapId map task the output belongs to
 * @param lengths per-block lengths; OVERWRITTEN in place with the existing
 *                lengths when an existing index/data pair is reused
 * @param dataTmp temporary data file to move into place; may be null
 * @throws IOException if a temporary file cannot be renamed to its final name
 */
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val dataFile = getDataFile(shuffleId, mapId)
// The check and the commit run under this object's monitor.
synchronized {
// NOTE(review): presumably returns the stored lengths when a consistent
// index/data pair already exists, and null otherwise - confirm in checkIndexAndDataFile.
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Reuse the existing output: hand its lengths back through the caller's
// array and discard the new temporary data file.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
} else {
// No reusable output: write the new index to a temporary file first.
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// Write the cumulative offsets, starting from 0.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}
// Remove any existing final files before renaming the temporaries over them.
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
// A null or missing dataTmp is tolerated; only a failed rename is an error.
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
} finally {
// The temporary index file still exists if it was never renamed; clean it up.
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
}
}
}
15:Executor
/**
 * Kills the task and then polls until it finishes or the kill timeout
 * elapses. While the task is still running a warning is logged on every
 * poll, with a thread dump when `takeThreadDump` is set. If the task
 * has not finished when the timeout is exceeded, an error is logged in local
 * mode and a SparkException is thrown otherwise. On exit, this reaper removes
 * itself from `taskReaperForTask` if it is the registered entry.
 *
 * A non-positive `killTimeoutMs` disables the timeout.
 */
override def run(): Unit = {
  // Measure elapsed time with the monotonic clock: System.currentTimeMillis()
  // follows wall-clock adjustments, which could trip the timeout early or
  // keep it from ever firing.
  val startTimeNs = System.nanoTime()
  def elapsedTimeMs = (System.nanoTime() - startTimeNs) / 1000000L
  def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs
  try {
    taskRunner.kill(interruptThread = interruptThread, reason = reason)
    var finished: Boolean = false
    while (!finished && !timeoutExceeded()) {
      // Wait on the task runner's monitor for at most one polling interval.
      taskRunner.synchronized {
        if (taskRunner.isFinished) {
          finished = true
        } else {
          taskRunner.wait(killPollingIntervalMs)
        }
      }
      // Re-check after waiting: the wait may have returned for either reason.
      if (taskRunner.isFinished) {
        finished = true
      } else {
        logWarning(s"Killed task $taskId is still running after $elapsedTimeMs ms")
        if (takeThreadDump) {
          try {
            Utils.getThreadDumpForThread(taskRunner.getThreadId).foreach { thread =>
              // Only log the dump when the thread name still matches the task runner's.
              if (thread.threadName == taskRunner.threadName) {
                logWarning(s"Thread dump from task $taskId:\n${thread.stackTrace}")
              }
            }
          } catch {
            case NonFatal(e) =>
              logWarning("Exception thrown while obtaining thread dump: ", e)
          }
        }
      }
    }
    if (!taskRunner.isFinished && timeoutExceeded()) {
      if (isLocal) {
        logError(s"Killed task $taskId could not be stopped within $killTimeoutMs ms; " +
          "not killing JVM because we are running in local mode.")
      } else {
        // NOTE(review): presumably an uncaught-exception handler turns this
        // into a JVM exit - confirm where this runnable is scheduled.
        throw new SparkException(
          s"Killing executor JVM because killed task $taskId could not be stopped within " +
            s"$killTimeoutMs ms.")
      }
    }
  } finally {
    // Deregister under the map's lock, but only if the entry is this reaper.
    taskReaperForTask.synchronized {
      taskReaperForTask.get(taskId).foreach { taskReaperInMap =>
        if (taskReaperInMap eq this) {
          taskReaperForTask.remove(taskId)
        } else {
          // A different reaper is registered for this task; leave its entry alone.
        }
      }
    }
  }
}
}
|