IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> java中的协程Quasar -> 正文阅读

[Java知识库]java中的协程Quasar

简单介绍

https://docs.paralleluniverse.co/quasar/

环境配置

Quasar fibers rely on bytecode instrumentation. This can be done at classloading time via a Java Agent, or at compilation time with an Ant task.

我们采取用maven插件的办法来对我们的代码进行instrumentation

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.8</version>
                <executions>
                    <execution>
                        <id>instrument-classes</id>
                        <phase>compile</phase>
                        <configuration>
                            <tasks>
                                <property name="ant_classpath" refid="maven.dependency.classpath"/>
                                <taskdef name="instrumentationTask"
                                         classname="co.paralleluniverse.fibers.instrument.InstrumentationTask"
                                         classpath="${ant_classpath}"/>
                                <instrumentationTask allowMonitors="true" allowBlocking="true" check="true"
                                                     verbose="true" debug="true">
                                    <fileset dir="${project.build.directory}/classes/" includes="**/*"/>
                                </instrumentationTask>
                            </tasks>
                        </configuration>
                        <goals>
                            <goal>run</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

HELLOWORLD

https://www.cnblogs.com/fnlingnzb-learner/p/11012736.html

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.SuspendableRunnable;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SandBox {
        @Suspendable
        static void m1() throws InterruptedException, SuspendExecution {
            String m = "m1";
            m = m2();
        }
        static String m2() throws SuspendExecution, InterruptedException {
            String m = m3();
            //注意这里不能用Thread.sleep.会导致无法正常运行。
            Strand.sleep(1000);
            return m;
        }

        @Suspendable
        static String m3() {
            List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList());
            return l.toString();
        }
        static public void main(String[] args) throws ExecutionException, InterruptedException {
            int count = 10000;
            testThreadpool(count);
            testFiber(count);
        }


        static void testThreadpool(int count) throws InterruptedException {
            final CountDownLatch latch = new CountDownLatch(count);
            ExecutorService es = Executors.newFixedThreadPool(200);
            LongAdder latency = new LongAdder();
            long t = System.currentTimeMillis();
            for (int i =0; i< count; i++) {
                es.submit(() -> {
                    long start = System.currentTimeMillis();
                    try {
                        m1();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (SuspendExecution suspendExecution) {
                        suspendExecution.printStackTrace();
                    }
                    start = System.currentTimeMillis() - start;
                    latency.add(start);
                    latch.countDown();
                });
            }
            latch.await();
            t = System.currentTimeMillis() - t;
            long l = latency.longValue() / count;
            System.out.println("thread pool took: " + t + ", latency: " + l + " ms");
            es.shutdownNow();
        }
        static void testFiber(int count) throws InterruptedException {
            final CountDownLatch latch = new CountDownLatch(count);
            LongAdder latency = new LongAdder();
            long t = System.currentTimeMillis();
            for (int i =0; i< count; i++) {
                new Fiber<Void>("Caller", new SuspendableRunnable() {
                    @Override
                    public void run() throws SuspendExecution, InterruptedException {
                        long start = System.currentTimeMillis();
                        m1();
                        start = System.currentTimeMillis() - start;
                        latency.add(start);
                        latch.countDown();
                    }
                }).start();
            }
            latch.await();
            t = System.currentTimeMillis() - t;
            long l = latency.longValue() / count;
            System.out.println("fiber took: " + t  + ", latency: " + l + " ms");
        }
    }



结果输出

thread pool took: 50283, latency: 1004 ms
fiber took: 1273, latency: 0 ms

可见fiber对于线程切换时间的节省是非常大的。

原理

Fiber Internals
We will now cover in some depth the inner workings of Quasar fibers. You should read this section if you’d like to annotate suspendable methods with the @Suspendable annotation rather than by declaring throws SuspendExecution, or if you’re just curious.

Internally, a fiber is a continuation which is then scheduled in a scheduler. A continuation captures the instantaneous state of a computation, and allows it to be suspended and then resumed at a later time from the point where it was suspended. Quasar creates continuations by instrumenting (at the bytecode level) suspendable methods. For scheduling, Quasar uses ForkJoinPool, which is a very efficient, work-stealing, multi-threaded scheduler.

Whenever a class is loaded, Quasar’s instrumentation module (usually run as a Java agent) scans it for suspendable methods. Every suspendable method f is then instrumented in the following way: It is scanned for calls to other suspendable methods. For every call to a suspendable method g, some code is inserted before (and after) the call to g that saves (and restores) the state of a local variables to the fiber’s stack (a fiber manages its own stack), and records the fact that this (i.e. the call to g) is a possible suspension point. At the end of this “suspendable function chain”, we’ll find a call to Fiber.park. park suspends the fiber by throwing a SuspendExecution exception (which the instrumentation prevents you from catching, even if your method contains a catch(Throwable t) block).

If g indeed blocks, the SuspendExecution exception will be caught by the Fiber class. When the fiber is awakened (with unpark), method f will be called, and then the execution record will show that we’re blocked at the call to g, so we’ll immediately jump to the line in f where g is called, and call it. Finally, we’ll reach the actual suspension point (the call to park), where we’ll resume execution immediately following the call. When g returns, the code inserted in f will restore f’s local variables from the fiber stack.

This process sounds complicated, but its incurs a performance overhead of no more than 3%-5%.

首先,quansar会对代码进行instrumentation, 如果一个方法被标记成 @Suspendable,那么就修改成这个代码前后是可以进行切换的(把本地变量保存起来,准备切换去别的地方执行)。如果一个这个方法内部,线程被Block了(被park),那么quansar让他就抛出SuspendExecution(不用担心这个异常被你自己捕获了,quansar已经考虑这个点了),那么当这个异常被quansar收到后,就会触发本线程到别的地方先去执行。一直到这个线程从Block中恢复,然后从实际Block的地方继续执行。

可以看来,quansar不适合CPU密集的线程,而是适合那些经常要线程等待的系统。可以极大的节省CPU切换的损耗。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-01-11 23:51:22  更:2022-01-11 23:52:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 9:03:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码