Introduction
https://docs.paralleluniverse.co/quasar/
Environment Setup
Quasar fibers rely on bytecode instrumentation. This can be done at classloading time via a Java Agent, or at compilation time with an Ant task.
Here we take the compile-time route: the maven-antrun-plugin runs Quasar's Ant instrumentation task over our compiled classes.
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-antrun-plugin</artifactId>
    <version>1.8</version>
    <executions>
        <execution>
            <id>instrument-classes</id>
            <phase>compile</phase>
            <configuration>
                <tasks>
                    <property name="ant_classpath" refid="maven.dependency.classpath"/>
                    <taskdef name="instrumentationTask"
                             classname="co.paralleluniverse.fibers.instrument.InstrumentationTask"
                             classpath="${ant_classpath}"/>
                    <instrumentationTask allowMonitors="true" allowBlocking="true" check="true"
                                         verbose="true" debug="true">
                        <fileset dir="${project.build.directory}/classes/" includes="**/*"/>
                    </instrumentationTask>
                </tasks>
            </configuration>
            <goals>
                <goal>run</goal>
            </goals>
        </execution>
    </executions>
</plugin>
Hello World
https://www.cnblogs.com/fnlingnzb-learner/p/11012736.html
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.SuspendableRunnable;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SandBox {
    // Suspendable: annotated with @Suspendable and also declares SuspendExecution.
    @Suspendable
    static void m1() throws InterruptedException, SuspendExecution {
        String m = "m1";
        m = m2();
    }

    // Suspendable because it declares throws SuspendExecution.
    static String m2() throws SuspendExecution, InterruptedException {
        String m = m3();
        Strand.sleep(1000); // parks the fiber, or sleeps the thread when not called from a fiber
        return m;
    }

    @Suspendable
    static String m3() {
        List<Integer> l = Stream.of(1, 2, 3).filter(i -> i % 2 == 0).collect(Collectors.toList());
        return l.toString();
    }

    static public void main(String[] args) throws ExecutionException, InterruptedException {
        int count = 10000;
        testThreadpool(count);
        testFiber(count);
    }
    // Runs m1() count times on a fixed pool of 200 threads and reports total time and mean latency.
    static void testThreadpool(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        ExecutorService es = Executors.newFixedThreadPool(200);
        LongAdder latency = new LongAdder();
        long t = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            es.submit(() -> {
                long start = System.currentTimeMillis();
                try {
                    m1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (SuspendExecution suspendExecution) {
                    suspendExecution.printStackTrace();
                }
                start = System.currentTimeMillis() - start;
                latency.add(start);
                latch.countDown();
            });
        }
        latch.await();
        t = System.currentTimeMillis() - t;
        long l = latency.longValue() / count;
        System.out.println("thread pool took: " + t + ", latency: " + l + " ms");
        es.shutdownNow();
    }
    // Runs m1() in count fibers, one fiber per call, and reports total time and mean latency.
    static void testFiber(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        LongAdder latency = new LongAdder();
        long t = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            new Fiber<Void>("Caller", new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    long start = System.currentTimeMillis();
                    m1();
                    start = System.currentTimeMillis() - start;
                    latency.add(start);
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        t = System.currentTimeMillis() - t;
        long l = latency.longValue() / count;
        System.out.println("fiber took: " + t + ", latency: " + l + " ms");
    }
}
Output
thread pool took: 50283, latency: 1004 ms
fiber took: 1273, latency: 0 ms
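Fibers win by a wide margin here. The 200 pool threads can only sleep 200 tasks at a time, while all 10000 fibers can be parked at once, so the saving comes from not tying up, and switching between, OS threads that are merely waiting. Note that the reported fiber latency of 0 ms is inconsistent with the 1000 ms sleep in m2 and is probably a measurement artifact.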
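The two totals can be sanity-checked with simple arithmetic. The sketch below is not part of the original benchmark: the class and method names are hypothetical, and it assumes each task does nothing except block for a fixed time.

```java
public class BatchEstimate {
    // Lower bound on wall-clock time when `tasks` tasks, each blocking for
    // `sleepMs`, share `workers` threads: they run in ceil(tasks / workers) waves.
    static long estimateMs(int tasks, int workers, long sleepMs) {
        long waves = (tasks + workers - 1) / workers; // ceiling division
        return waves * sleepMs;
    }

    public static void main(String[] args) {
        // 10000 tasks on 200 threads, 1000 ms sleep each.
        System.out.println(estimateMs(10000, 200, 1000));
        // One "worker" per task, which is roughly how 10000 parked fibers behave.
        System.out.println(estimateMs(10000, 10000, 1000));
    }
}
```

The estimates of 50000 ms and 1000 ms line up with the measured 50283 ms and 1273 ms.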
How It Works
Fiber Internals
We will now cover in some depth the inner workings of Quasar fibers. You should read this section if you’d like to annotate suspendable methods with the @Suspendable annotation rather than by declaring throws SuspendExecution, or if you’re just curious.
Internally, a fiber is a continuation which is then scheduled in a scheduler. A continuation captures the instantaneous state of a computation, and allows it to be suspended and then resumed at a later time from the point where it was suspended. Quasar creates continuations by instrumenting (at the bytecode level) suspendable methods. For scheduling, Quasar uses ForkJoinPool, which is a very efficient, work-stealing, multi-threaded scheduler.
Whenever a class is loaded, Quasar’s instrumentation module (usually run as a Java agent) scans it for suspendable methods. Every suspendable method f is then instrumented in the following way: It is scanned for calls to other suspendable methods. For every call to a suspendable method g, some code is inserted before (and after) the call to g that saves (and restores) the state of f’s local variables to the fiber’s stack (a fiber manages its own stack), and records the fact that this (i.e. the call to g) is a possible suspension point. At the end of this “suspendable function chain”, we’ll find a call to Fiber.park. park suspends the fiber by throwing a SuspendExecution exception (which the instrumentation prevents you from catching, even if your method contains a catch(Throwable t) block).
If g indeed blocks, the SuspendExecution exception will be caught by the Fiber class. When the fiber is awakened (with unpark), method f will be called, and then the execution record will show that we’re blocked at the call to g, so we’ll immediately jump to the line in f where g is called, and call it. Finally, we’ll reach the actual suspension point (the call to park), where we’ll resume execution immediately following the call. When g returns, the code inserted in f will restore f’s local variables from the fiber stack.
This process sounds complicated, but it incurs a performance overhead of no more than 3%-5%.
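The save, jump and restore steps described above can be imitated by hand in plain Java. This is only a toy sketch and not Quasar's generated code: ToyFiber, Suspend, resumePoint and savedX are hypothetical names, the f → g → park chain is collapsed into a single method, and the saved state lives in fields rather than on a per-fiber stack.

```java
import java.util.ArrayList;
import java.util.List;

public class ToyFiber {
    // Thrown to unwind the Java stack at a suspension point (stands in for SuspendExecution).
    static final class Suspend extends RuntimeException {
        Suspend() { super(null, null, false, false); } // no message, no stack trace
    }

    final List<String> log = new ArrayList<>();
    private int resumePoint = 0; // which suspension point to jump back to
    private int savedX = 0;      // saved copy of the local variable x

    // Hand-"instrumented" version of:
    //   int x = 1; log("before park, x=" + x); park(); x += 41; log("after park, x=" + x);
    private void f() {
        int x;
        switch (resumePoint) {
            case 0:
                x = 1;
                log.add("before park, x=" + x);
                savedX = x;          // save locals before the possible suspension
                resumePoint = 1;     // record where to resume
                throw new Suspend(); // stands in for the call to park
            case 1:
                x = savedX;          // restore locals after resuming
                x += 41;
                log.add("after park, x=" + x);
                return;
            default:
                throw new IllegalStateException("unknown resume point " + resumePoint);
        }
    }

    // Runs the fiber until it parks or finishes; returns true once it has finished.
    boolean run() {
        try {
            f();
            return true;
        } catch (Suspend s) {
            return false; // parked: the calling thread is free to do other work
        }
    }

    public static void main(String[] args) {
        ToyFiber fiber = new ToyFiber();
        fiber.run();                                 // runs up to the suspension point
        fiber.log.add("scheduler: fiber is parked");
        fiber.run();                                 // "unpark": f is called again and jumps ahead
        System.out.println(fiber.log);
    }
}
```

Calling run() a second time re-enters f from the top, which mirrors the description of f being called again on unpark and jumping straight to the recorded suspension point.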
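In short: Quasar first instruments the code. If a method is marked @Suspendable (or declares throws SuspendExecution), it is rewritten so that execution can switch away around each call that may suspend; the local variables are saved so that the fiber can be resumed later. When the fiber would block inside such a method (that is, when it is parked), Quasar makes it throw SuspendExecution. There is no need to worry about catching this exception yourself by accident; Quasar has already taken care of that. Once Quasar receives the exception, the underlying thread goes off to run something else, until the fiber is unblocked and continues from the point where it actually blocked.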
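It follows that Quasar is not a good fit for CPU-bound work. It suits systems whose threads spend much of their time waiting, where it can greatly reduce the cost of thread context switches.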
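Why waiting-heavy workloads benefit can be seen with a plain-JDK analogy. The sketch below does not use Quasar: it assumes a ScheduledExecutorService as a stand-in for the fiber scheduler, and WaitMultiplexing and runNonBlocking are hypothetical names. A single thread services many timed waits because none of them blocks it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WaitMultiplexing {
    // Performs `tasks` waits of `delayMs` each on ONE thread without blocking it,
    // and returns the elapsed wall-clock time in milliseconds.
    static long runNonBlocking(int tasks, long delayMs) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(tasks);
        Runnable done = latch::countDown;
        long start = System.nanoTime();
        try {
            for (int i = 0; i < tasks; i++) {
                // Register the wait and return immediately; the thread is not held.
                timer.schedule(done, delayMs, TimeUnit.MILLISECONDS);
            }
            latch.await();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int tasks = 50;
        long delayMs = 100;
        long elapsed = runNonBlocking(tasks, delayMs);
        // A single thread that blocked on each wait in turn would need tasks * delayMs.
        System.out.println("non-blocking: " + elapsed + " ms, blocking would need at least "
                + (tasks * delayMs) + " ms");
    }
}
```

Fibers give the same multiplexing while letting the code stay sequential: the wait is written as an ordinary call such as Strand.sleep instead of a scheduled callback.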