一 代码
1 ActiveDaemonThread
package concurrent.activeobject3;
/**
 * @className: ActiveDaemonThread
 * @description: Daemon worker thread that repeatedly takes an ActiveMessage from the queue and runs its execute method
 * @date: 2022/5/4
 * @author: cakin
 */
public class ActiveDaemonThread extends Thread {
    // Queue this worker drains
    private final ActiveMessageQueue queue;

    /**
     * Creates the worker thread, named "ActiveDaemonThread", and marks it as a daemon.
     *
     * @param queue the message queue to take messages from
     */
    public ActiveDaemonThread(ActiveMessageQueue queue) {
        super("ActiveDaemonThread");
        // Daemon status must be set before the thread is started
        setDaemon(true);
        this.queue = queue;
    }

    /**
     * Loops forever: take one message from the queue, then execute it.
     */
    @Override
    public void run() {
        for (;;) {
            ActiveMessage next = queue.take();
            next.execute();
        }
    }
}
2 ActiveMessage
package concurrent.activeobject3;
import concurrent.activeobject2.ActiveFuture;
import concurrent.future.Future;
import java.lang.reflect.Method;
/**
 * @className: ActiveMessage
 * @description: Generic message that captures one interface-method call (method, arguments, target
 * service and optional result future) so it can be executed later via reflection
 * @date: 2022/5/5
 * @author: 贝医
 */
class ActiveMessage {
    // Arguments of the interface method call
    private final Object[] objects;
    // The interface method to invoke
    private final Method method;
    // Future handed to the caller; null when no future was supplied to the builder
    private final ActiveFuture<Object> future;
    // The concrete service instance the method is invoked on
    private final Object service;

    /**
     * Copies all collected call information out of the builder.
     *
     * @param builder builder holding the method, arguments, service and optional future
     */
    public ActiveMessage(Builder builder) {
        this.service = builder.service;
        this.method = builder.method;
        this.objects = builder.objects;
        this.future = builder.future;
    }

    /**
     * Invokes the captured method reflectively on the service.
     * When a future is attached, the invocation result is treated as a Future, its value is
     * obtained with get(), and that value is passed to the attached future.
     * Any exception is swallowed; if a future is attached it is finished with null instead.
     */
    public void execute() {
        try {
            Object invoked = method.invoke(service, objects);
            if (future == null) {
                // No future attached: nothing to hand back
                return;
            }
            Future<?> delegate = (Future<?>) invoked;
            // Obtain the real result and publish it to the future held by the caller
            future.finish(delegate.get());
        } catch (Exception e) {
            // On failure a call with a future gets null as its result; otherwise the error is ignored
            if (future != null) {
                future.finish(null);
            }
        }
    }

    /**
     * Fluent builder collecting the pieces of an ActiveMessage.
     */
    static class Builder {
        // The concrete service instance
        private Object service;
        private Method method;
        private Object[] objects;
        // Optional future for calls that return a result
        private ActiveFuture<Object> future;

        /** Sets the method to invoke (name spelling kept as-is for existing callers). */
        public Builder uesMethod(Method method) {
            this.method = method;
            return this;
        }

        /** Sets the future that will receive the result. */
        public Builder returnFuture(ActiveFuture<Object> future) {
            this.future = future;
            return this;
        }

        /** Sets the call arguments. */
        public Builder withObjects(Object[] objects) {
            this.objects = objects;
            return this;
        }

        /** Sets the service instance the method is invoked on. */
        public Builder forService(Object service) {
            this.service = service;
            return this;
        }

        /** Builds the ActiveMessage from the values collected so far. */
        public ActiveMessage build() {
            return new ActiveMessage(this);
        }
    }
}
3 ActiveMessageQueue
package concurrent.activeobject3;
import java.util.LinkedList;
/**
 * FIFO queue of ActiveMessage instances guarded by this object's monitor.
 * Constructing the queue also starts the ActiveDaemonThread that consumes it.
 */
public class ActiveMessageQueue {
    // Holds the submitted messages in arrival order
    private final LinkedList<ActiveMessage> messages = new LinkedList<>();

    /**
     * Creates the queue and starts its worker thread.
     */
    public ActiveMessageQueue() {
        ActiveDaemonThread worker = new ActiveDaemonThread(this);
        worker.start();
    }

    /**
     * Appends a message to the tail of the queue and wakes one waiting thread.
     *
     * @param methodMessage the message to enqueue
     */
    public synchronized void offer(ActiveMessage methodMessage) {
        messages.addLast(methodMessage);
        // notify (not notifyAll) is used: the original author notes only one thread takes from the queue
        notify();
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     * An InterruptedException during the wait is printed and the wait is resumed.
     *
     * @return the oldest queued message
     */
    protected synchronized ActiveMessage take() {
        // Re-check the condition after every wake-up
        while (messages.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return messages.removeFirst();
    }
}
4 ActiveMethod
package concurrent.activeobject3;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * @className: ActiveMethod
 * @description: Marker annotation used to turn an arbitrary method into an active method.
 * It is retained at runtime and may only be placed on methods.
 * @date: 2022/5/5
 * @author: 贝医
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ActiveMethod {
    // Marker only: no elements
}
5 ActiveServiceFactory
package concurrent.activeobject3;
import concurrent.activeobject2.ActiveFuture;
import concurrent.future.Future;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
 * @className: ActiveServiceFactory
 * @description: Creates a dynamic proxy for a service and turns calls to @ActiveMethod methods
 * into ActiveMessage instances placed on a shared queue
 * @date: 2022/5/5
 * @author: 贝医
 */
public class ActiveServiceFactory {
    // Shared queue that stores every ActiveMessage produced by the proxies
    private final static ActiveMessageQueue queue = new ActiveMessageQueue();

    /**
     * Wraps the given instance in a JDK dynamic proxy implementing the interfaces declared
     * directly by the instance's class.
     *
     * @param instance the concrete service implementation
     * @param <T>      the type the caller views the service as
     * @return the proxy, cast to T
     */
    @SuppressWarnings("unchecked") // the proxy implements the instance's interfaces; cast mirrors the original API
    public static <T> T active(T instance) {
        Object proxy = Proxy.newProxyInstance(
                instance.getClass().getClassLoader(),
                instance.getClass().getInterfaces(),
                new ActiveInvocationHandler<>(instance));
        return (T) proxy;
    }

    // InvocationHandler implementation used when the proxy is generated
    private static class ActiveInvocationHandler<T> implements InvocationHandler {
        private final T instance;

        private ActiveInvocationHandler(T instance) {
            this.instance = instance;
        }

        /**
         * Dispatches a proxy call. Methods marked with @ActiveMethod are validated, packaged as an
         * ActiveMessage and queued; all other methods are invoked directly on the instance.
         *
         * @return a new ActiveFuture for queued Future-returning methods, null for queued void
         *         methods, or the direct invocation result for ordinary methods
         * @throws IllegalActiveMethod if an active method returns neither void nor Future
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (!this.isActiveMethod(method)) {
                // Ordinary method: execute it directly on the target
                return method.invoke(instance, args);
            }
            // Verify that the method follows the void/Future return-type rule
            this.checkMethod(method);
            ActiveMessage.Builder builder = new ActiveMessage.Builder();
            builder.uesMethod(method).withObjects(args).forService(instance);
            ActiveFuture<Object> future = null;
            if (this.isReturnFutureType(method)) {
                future = new ActiveFuture<>();
                builder.returnFuture(future);
            }
            // Put the ActiveMessage on the queue
            queue.offer(builder.build());
            return future;
        }

        /**
         * Tells whether the call should be handled as an active method.
         * The Method passed to an InvocationHandler is the interface method, and method annotations
         * are not inherited, so the annotation is looked for on the interface method first and then
         * on the same-signature public method of the implementation class.
         */
        private boolean isActiveMethod(Method method) {
            if (method.isAnnotationPresent(ActiveMethod.class)) {
                return true;
            }
            try {
                Method implementation =
                        instance.getClass().getMethod(method.getName(), method.getParameterTypes());
                return implementation.isAnnotationPresent(ActiveMethod.class);
            } catch (NoSuchMethodException ignored) {
                // No matching public method on the implementation: treat as an ordinary method
                return false;
            }
        }

        /**
         * Rejects active methods whose return type is neither void nor accepted by
         * isReturnFutureType.
         *
         * @throws IllegalActiveMethod if the return type is not allowed
         */
        private void checkMethod(Method method) throws IllegalActiveMethod {
            if (!isReturnVoidType(method) && !isReturnFutureType(method)) {
                throw new IllegalActiveMethod("the method [" + method.getName() + "] return type must be void/Future");
            }
        }

        // True when a Future can be assigned to the declared return type
        // (i.e. the declared type is Future or one of its supertypes)
        private boolean isReturnFutureType(Method method) {
            return method.getReturnType().isAssignableFrom(Future.class);
        }

        // True when the method is declared to return void
        private boolean isReturnVoidType(Method method) {
            return method.getReturnType().equals(Void.TYPE);
        }
    }
}
6 IllegalActiveMethod
package concurrent.activeobject3;
/**
 * Checked exception carrying a description of why a method cannot be used as an active method.
 */
public class IllegalActiveMethod extends Exception {

    /**
     * Creates the exception with the given detail message.
     *
     * @param message description of the violation
     */
    public IllegalActiveMethod(String message) {
        super(message);
    }
}
7 OrderServiceImpl
package concurrent.activeobject3;
import concurrent.activeobject2.OrderService;
import concurrent.future.Future;
import concurrent.future.FutureService;
import java.util.concurrent.TimeUnit;
/**
 * @className: OrderServiceImpl
 * @description: OrderService implementation whose methods are meant to run on the worker thread
 * @date: 2022/5/4
 * @author: cakin
 */
public class OrderServiceImpl implements OrderService {
    // Length of the simulated work in each method, in seconds
    private static final long SIMULATED_WORK_SECONDS = 10;

    /**
     * Submits a task to a new FutureService that sleeps, prints the order id and then yields a
     * fixed details string.
     * The third submit argument is passed as null (presumably an optional callback — confirm
     * against FutureService).
     *
     * @param orderId id of the order to look up
     * @return the Future returned by FutureService.submit
     */
    @ActiveMethod
    @Override
    public Future<String> findOrderDetails(long orderId) {
        return FutureService.<Long, String>newService().submit(ignoredInput -> {
            try {
                TimeUnit.SECONDS.sleep(SIMULATED_WORK_SECONDS);
                System.out.println("Process the orderId->" + orderId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "The order Details Information";
        }, orderId, null);
    }

    /**
     * Sleeps to simulate work and then prints the account and order id.
     * An interruption during the sleep is printed and the message is skipped.
     *
     * @param account account placing the order
     * @param orderId id of the order
     */
    @ActiveMethod
    @Override
    public void order(String account, long orderId) {
        try {
            TimeUnit.SECONDS.sleep(SIMULATED_WORK_SECONDS);
            System.out.println("Process the order for account " + account + ",orderId " + orderId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
8 Test
package concurrent.activeobject3;
import concurrent.activeobject2.OrderService;
import concurrent.activeobject2.OrderServiceFactory;
import concurrent.future.Future;
/**
 * Small driver: places an order, requests the order details and prints them.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        // The concrete OrderService implementation has to be supplied here.
        // NOTE(review): the active object is created through activeobject2's OrderServiceFactory,
        // not through ActiveServiceFactory from this package — confirm that this is intended.
        OrderService activeOrderService = OrderServiceFactory.toActiveObject(new OrderServiceImpl());
        activeOrderService.order("hello", 434543);
        Future<String> pendingDetails = activeOrderService.findOrderDetails(434543);
        // Obtain the result from the future and print it
        String details = pendingDetails.get();
        System.out.println(details);
    }
}
二 测试
Process the order for account hello,orderId 434543
Process the orderId->434543
The order Details Information
|