一、Future的应用?
void eventLoopDemo() {
print('eventLoopDemo #1 of 2');
Future.microtask(() => print('microtask #1 of 3'));
//使用delay方式,是将此task放到queue的尾部,
//若前面有耗时操作,不一定能准时执行
new Future.delayed(
new Duration(seconds: 1), () => print('event #1 (delayed)'));
//使用then,是表示在此task执行后立刻执行
new Future(() => print('event #2 of 4'))
.then((_) => print('event #2a'))
.then((_) {
print('event #2b');
Future.microtask(() => print('microtask #0 (from event #2b)'));
}).then((_) => print('event #2c'));
Future.microtask(() => print('microtask #2 of 3'));
new Future(() => print('event #3 of 4'))
.then((_) => new Future(() => print('event #3a (a new event)')))
.then((_) => print('event #3b'));
new Future(() => print('event #4 of 4')).then((_) {
new Future(() => print('event #4a'));
}).then((_) => print('event #4b'));
Future.microtask(() => print('microtask #3 of 3'));
print('eventLoopDemo #2 of 2');
}
//打印结果
isolateDemo #1 of 2
isolateDemo #2 of 2
microtask #1 of 3
microtask #2 of 3
microtask #3 of 3
event #2 of 4
event #2a
event #2b
event #2c
microtask #0 (from event #2b)
event #3 of 4
event #4 of 4
event #4b
event #3a (a new event)
event #3b
event #4a
event #1 (delayed)
其中拆解Future
void eventLoopDemo() {
//使用delay方式,是将此task放到queue的尾部,
//若前面有耗时操作,不一定能准时执行
new Future(() => print('cxb #3 of 4'))
.then((_) => new Future(() => print('cxb #3a (a new cxb)')))
.then((_) => print('cxb #3b'));
new Future(() => print('cxb #4 of 4')).then((_) {
new Future(() => print('cxb #4a'));
}).then((_) => print('cxb #4b'));
}
打印:
flutter: cxb #3 of 4
flutter: cxb #4 of 4
flutter: cxb #4b
flutter: cxb #3a (a new cxb)
flutter: cxb #3b
flutter: cxb #4a
如果把? cxb?#4a 这一行换成
void eventLoopDemo() {
//使用delay方式,是将此task放到queue的尾部,
//若前面有耗时操作,不一定能准时执行
new Future(() => print('cxb #3 of 4'))
.then((_) => new Future(() => print('cxb #3a (a new cxb)')))
.then((_) => print('cxb #3b'));
new Future(() => print('cxb #4 of 4')).then((_) {
return new Future(() => print('cxb #4a'));
}).then((_) => print('cxb #4b'));
}
打印:
flutter: cxb #3 of 4
flutter: cxb #4 of 4
flutter: cxb #3a (a new cxb)
flutter: cxb #3b
flutter: cxb #4a
flutter: cxb #4b
可以看到由于return 一个Future,导致 cxb?#4a延后了。
得出结论
1、Future.delayed需要延迟执行的,是在延迟时间到了之后才将此task加到event queue的队尾,所以万一前面有很耗时的任务,那么你的延迟task不一定能准时运行。
2、Future.then每次都会返回一个Future,默认是其本身。如果在then中函数也返回一个新的Future,则新Future会重新加入到event queue中等待执行
3、一个event task运行完后,会先去查看Micro queue里有没有可以执行的micro task。没有的话,在执行下一个event task
------------------------------------------------------------------------------------------------------------------------------
类似于ios中dispatch_group效果组合
void futureTest() {
print("future start");
Future.wait([
// 2秒后返回结果
Future.delayed(new Duration(seconds: 5), () {
print("hello");
return "hello";
}).whenComplete(() => print('hello event 执行完成')),
// 4秒后返回结果
Future.delayed(new Duration(seconds: 6), () {
print("world");
return " world";
}).whenComplete(() => print('world event 执行完成')),
]).then((results) {
// 上面的两个任务执行完毕后进入
print("future finish $results");
}).catchError((e) {
// 执行失败会走到这里
print(e);
}).whenComplete(() {
// 无论成功或失败都会走到这里
print('hello world 执行完成');
});
}
flutter: hello
flutter: hello event 执行完成
flutter: world
flutter: world event 执行完成
flutter: future finish [hello, world]
flutter: hello world 执行完成
二、isolate
1、能互相通信
_testIsolate() async {
ReceivePort rp1 = new ReceivePort();
SendPort port1 = rp1.sendPort;
// 通过spawn新建一个isolate,并绑定静态方法
Isolate newIsolate = await Isolate.spawn(doWork, port1);
SendPort port2;
rp1.listen((message) {
print("rp1 收到消息: $message"); //2. 4. 7.rp1收到消息
if (message[0] == 0) {
port2 = message[1]; //得到rp2的发送器port2
} else {
if (port2 != null) {
print("port2 发送消息");
port2?.send([1, "这条信息是 port2 在main isolate中 发送的"]); // 8.port2发送消息
}
}
});
print("port1--main isolate发送消息");
port1.send([1, "这条信息是 port1 在main isolate中 发送的"]); //1.port1发送消息
// newIsolate.kill();
}
// 新的isolate中可以处理耗时任务
static void doWork(SendPort port1) {
ReceivePort rp2 = new ReceivePort();
SendPort port2 = rp2.sendPort;
rp2.listen((message) {
//9.10 rp2收到消息
print("rp2 收到消息: $message");
});
// 将新isolate中创建的SendPort发送到main isolate中用于通信
print("port1--new isolate发送消息");
port1.send([0, port2]); //3.port1发送消息,传递[0,rp2的发送器]
// 模拟耗时5秒
sleep(Duration(seconds: 5));
print("port1--new isolate发送消息");
port1.send([1, "这条信息是 port1 在new isolate中 发送的"]); //5.port1发送消息
print("port2--new isolate发送消息");
port2.send([1, "这条信息是 port2 在new isolate中 发送的"]); //6.port2发送消息
}
//I/flutter (14639): port1--main isolate发送消息
//I/flutter (14639): rp1 收到消息: [1, 这条信息是 port1 在main isolate中 发送的]
//I/flutter (14639): port1--new isolate发送消息
//I/flutter (14639): rp1 收到消息: [0, SendPort]
//I/flutter (14639): port1--new isolate发送消息
//I/flutter (14639): port2--new isolate发送消息
//I/flutter (14639): rp1 收到消息: [1, 这条信息是 port1 在new isolate中 发送的]
//I/flutter (14639): port2 发送消息
//I/flutter (14639): rp2 收到消息: [1, 这条信息是 port2 在new isolate中 发送的]
//I/flutter (14639): rp2 收到消息: [1, 这条信息是 port2 在main isolate中 发送的]
其实就是需要把rootIsolate和新建的newisolate各自的SendPort传递给对方,让双方都持有SendPort,这样才能通过自己的SendPort发送消息给对应的ReceivePort
2、dynamic result = await receivePort.first;?只能收第一条信息
_testIsolate() async {
ReceivePort rp1 = new ReceivePort();
SendPort port1 = rp1.sendPort;
// 通过spawn新建一个isolate,并绑定静态方法
Isolate newIsolate = await Isolate.spawn(doWork, port1);
SendPort port2;
dynamic receiveMsg = await rp1.first; //只拿到第一条收到结果
print('rp1 收到消息--$receiveMsg');
if (receiveMsg is SendPort) {
SendPort port2 = receiveMsg;
// print('rp1 收到消息--port2');
port2.send([1, "这条信息是 port2 在main isolate中 发送的"]);
}
// newIsolate.kill();
}
// 新的isolate中可以处理耗时任务
static void doWork(SendPort port1) {
ReceivePort rp2 = new ReceivePort();
SendPort port2 = rp2.sendPort;
rp2.listen((message) {
print("rp2 收到消息-- $message");
});
// 将新isolate中创建的SendPort发送到main isolate中用于通信
print("port1--new isolate发送消息--port2");
port1.send(port2);
// 模拟耗时5秒
sleep(Duration(seconds: 5));
print("port1--new isolate发送消息--啊哈哈");
port1.send("啊哈哈");
}
//flutter: port1--new isolate发送消息--port2
//flutter: rp1 收到消息--SendPort
//flutter: port1--new isolate发送消息--啊哈哈
//flutter: rp2 收到消息-- [1, 这条信息是 port2 在main isolate中 发送的]
dynamic receiveMsg = await rp1.first相当于一次性连接,后边的 port1.send("啊哈哈");其实并没有收到。
3、解决问题示例
1.创建isolate 2.打通两个isolate的通道(能互相发送消息) 3.main isolate将要计算的最大数传递给new isolate; newisolate计算,计算完成后,将结果发送回 main isolate
calculation(int n, Function(int result) success) async {
//创建一个ReceivePort
final receivePort1 = new ReceivePort();
//创建isolate
Isolate isolate = await Isolate.spawn(createIsolate, receivePort1.sendPort);
receivePort1.listen((message) {
if (message is SendPort) {
SendPort sendPort2 = message;
sendPort2.send(n);
} else {
print(message);
success(message);
}
});
}
//创建isolate必须要的参数
static void createIsolate(SendPort sendPort1) {
final receivePort2 = new ReceivePort();
//绑定
print("sendPort1发送消息--sendPort2");
sendPort1.send(receivePort2.sendPort);
//监听
receivePort2.listen((message) {
//获取数据并解析
print("receivePort2接收到消息--$message");
if (message is int) {
num result = summ(message);
sendPort1.send(result);
}
});
}
//计算0到 num 数值的总和
static num summ(int num) {
int count = 0;
while (num > 0) {
count = count + num;
num--;
}
return count;
}
4、receivePort.first只能接收第一次消息怎么办?
static Future<dynamic> calculation(int n) async {
//创建一个ReceivePort
final receivePort1 = new ReceivePort();
//创建isolate
Isolate isolate = await Isolate.spawn(createIsolate, receivePort1.sendPort);
//使用 receivePort1.first 获取sendPort1发送来的数据
final sendPort2 = await receivePort1.first as SendPort;
print("receivePort1接收到消息--sendPort2");
//接收消息的ReceivePort
final answerReceivePort = new ReceivePort();
print("sendPort2发送消息--[$n,answerSendPort]");
sendPort2.send([n, answerReceivePort.sendPort]);
//获得数据并返回
num result = await answerReceivePort.first;
print("answerReceivePort接收到消息--计算结果$result");
return result;
}
//创建isolate必须要的参数
static void createIsolate(SendPort sendPort1) {
final receivePort2 = new ReceivePort();
//绑定
print("sendPort1发送消息--sendPort2");
sendPort1.send(receivePort2.sendPort);
//监听
receivePort2.listen((message) {
//获取数据并解析
print("receivePort2接收到消息--$message");
final n = message[0] as num;
final send = message[1] as SendPort;
//返回结果
num result = summ(n);
print("answerSendPort发送消息--计算结果$result");
send.send(result);
});
}
//计算0到 num 数值的总和
static num summ(int num) {
int count = 0;
while (num > 0) {
count = count + num;
num--;
}
return count;
}
5、isolate的暂停、恢复、结束
//恢复 isolate 的使用
isolate.resume(isolate.pauseCapability);
//暂停 isolate 的使用
isolate.pause(isolate.pauseCapability);
//结束 isolate 的使用
isolate.kill(priority: Isolate.immediate);
//赋值为空 便于内存及时回收
isolate = null;
三、更简洁的创建isolate
import 'package:flutter/foundation.dart';
import 'dart:io';
// 创建一个新的Isolate,在其中运行任务doWork
create_new_task() async{
var str = "New Task";
var result = await compute(doWork, str);
print(result);
}
static String doWork(String value){
print("new isolate doWork start");
// 模拟耗时5秒
sleep(Duration(seconds:5));
print("new isolate doWork end");
return "complete:$value";
}
TextButton(
child: Text('flutter创建isolate'),
onPressed: () async {
num result = await compute(summ, 10000000000);
content = "计算结果$result";
setState(() {});
},
),
//计算0到 num 数值的总和
static num summ(int num) {
int count = 0;
while (num > 0) {
count = count + num;
num--;
}
return count;
}
四、封装管理
import 'dart:isolate';
typedef LikeCallback = void Function(Object value);
class ThreadManagement {
//entryPoint 必须是静态方法
static Future<Map> runTask (void entryPoint(SendPort message), LikeCallback(Object value),{Object parameter})async{
final response = ReceivePort();
Isolate d = await Isolate.spawn(entryPoint, response.sendPort);
// 调用sendReceive自定义方法
if(parameter!=null){
SendPort sendPort = await response.first;
ReceivePort receivePort = ReceivePort();
sendPort.send([parameter, receivePort.sendPort]);
receivePort.listen((value){
receivePort.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":receivePort,
};
}else{
response.listen((value){
response.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":response,
};
}
}
}
// 无参数的任务
static void getNoParamTask(SendPort port) async {
var c = await Future.delayed(Duration(seconds: 1), () {
return "banner data";
});
port.send(c);
}
// 需要参数的任务
static getParamsTask(SendPort port) async {
ReceivePort receivePort = ReceivePort();
port.send(receivePort.sendPort);
// 监听外界调用
await for (var msg in receivePort) {
Map requestURL =msg[0];
SendPort callbackPort =msg[1];
receivePort.close();
var res = await Future.delayed(Duration(seconds: 1), () {
var requestUrl = requestURL["type"];
var after = requestURL["after"];
return "url = $requestUrl, after = $after";
});
callbackPort.send(res);
}
}
调用
// 调用无参数的任务
ThreadManagement.runTask(API.getNoParamTask, (value){
if(value != null){
//业务逻辑
print(value);
}
});
//调用有参数的任务
ThreadManagement.runTask(API.getParamsTask, (value){
if(value != null){
//业务逻辑
print(value);
}
}, parameter: {
"type":"hot",
"after":"1"
});
五、官方提供的isolate库
isolate: ^2.0.3
我们可以通过?LoadBalancer ?创建出指定个数的 isolate。
Future<LoadBalancer> loadBalancer = LoadBalancer.create(2, IsolateRunner.spawn);
?这段代码将会创建出一个 isolate 线程池,并自动实现了负载均衡。
void testBalancer() async {
final lb = await loadBalancer;
int res = await lb.run(doWork, 110);
print(res);
}
int doWork(int value) {
// 模拟耗时5秒
print("new isolate doWork start");
sleep(Duration(seconds: 5));
return value;
}
//打印数据
new isolate doWork start
110
我们还是需要传入一个 function 在某个 isolate 中运行,并传入其参数 argument 。run 方法将会返回我们执行方法的返回值。 整体和 compute 使用上差不多,但是当我们多次使用额外的 isolate 的时候,不再需要重复创建了。 并且 LoadBalancer 还支持 runMultiple ,可以让一个方法在多线程中执行。
LoadBalancer 经过测试,它会在第一次使用其 isolate 的时候初始化线程池。 当应用打开后,即使我们在顶层函数中调用了 LoadBalancer.create ,但是还是只会有一个 Isolate。 当我们调用 run 方法时,才真正创建出了实际的 isolate。
|