/**
 * Filter that limits how many invocations of a method may be active at once.
 *
 * <p>The limit is read from the {@code ACTIVES_KEY} method parameter of the invoker URL.
 * When no slot can be obtained, the caller waits on the method's {@link RpcStatus} monitor
 * for up to the {@code TIMEOUT_KEY} method parameter (milliseconds) and fails with a
 * limit-exceeded {@link RpcException} once that time has elapsed. The listener callbacks
 * release the slot and wake up waiting callers.
 */
public class ActiveLimitFilter implements Filter, Filter.Listener {

    /** Invocation attribute key under which the invocation start time (ms) is stored. */
    private static final String ACTIVE_LIMIT_FILTER_START_TIME = "active_limit_filter_start_time";

    /**
     * Acquires a concurrency slot for the invoked method, waiting if necessary, then
     * delegates to the next invoker.
     *
     * @param invoker    the invoker to delegate to
     * @param invocation the current invocation
     * @return the result of the delegated invocation
     * @throws RpcException with code {@code LIMIT_EXCEEDED_EXCEPTION} if no slot could be
     *                      obtained before the wait time ran out
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        // Maximum number of concurrent invocations configured for this method.
        int max = url.getMethodParameter(methodName, ACTIVES_KEY, 0);
        final RpcStatus rpcStatus = RpcStatus.getStatus(url, methodName);
        // Try to increment the active counter.
        if (!RpcStatus.beginCount(url, methodName, max)) {
            // No slot available: wait for a running invocation to finish.
            long timeout = url.getMethodParameter(methodName, TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            // Remember an interrupt instead of discarding it; the flag is restored once
            // waiting is over so that code further up the stack can still observe it.
            boolean interrupted = false;
            try {
                synchronized (rpcStatus) {
                    while (!RpcStatus.beginCount(url, methodName, max)) {
                        try {
                            // NOTE(review): with timeout == 0 this is wait(0), which blocks
                            // until notified; the check below then fails the call right
                            // after the first wake-up. Confirm that this is intended.
                            rpcStatus.wait(remain);
                        } catch (InterruptedException e) {
                            // Keep waiting as before, but do not lose the interrupt request.
                            interrupted = true;
                        }
                        long elapsed = System.currentTimeMillis() - start;
                        // Time left to wait.
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            // Waiting timed out.
                            throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                                    "Waiting concurrent invoke timeout in client-side for service: " +
                                            invoker.getInterface().getName() + ", method: " + methodName +
                                            ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +
                                            rpcStatus.getActive() + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        invocation.put(ACTIVE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
        return invoker.invoke(invocation);
    }

    /**
     * Releases the slot of a completed invocation (recorded as succeeded) and wakes up
     * callers waiting for a slot.
     */
    @Override
    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        String methodName = invocation.getMethodName();
        URL url = invoker.getUrl();
        int max = url.getMethodParameter(methodName, ACTIVES_KEY, 0);
        // The counter must be decremented before other threads are woken up.
        RpcStatus.endCount(url, methodName, getElapsed(invocation), true);
        notifyFinish(RpcStatus.getStatus(url, methodName), max);
    }

    /**
     * Releases the slot of a failed invocation (recorded as failed) and wakes up callers
     * waiting for a slot. A limit-exceeded {@link RpcException} is skipped: on that path
     * {@code invoke} threw before it could increment the counter, so there is nothing
     * to release.
     */
    @Override
    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
        String methodName = invocation.getMethodName();
        URL url = invoker.getUrl();
        int max = url.getMethodParameter(methodName, ACTIVES_KEY, 0);
        if (t instanceof RpcException) {
            RpcException rpcException = (RpcException) t;
            if (rpcException.isLimitExceed()) {
                return;
            }
        }
        RpcStatus.endCount(url, methodName, getElapsed(invocation), false);
        notifyFinish(RpcStatus.getStatus(url, methodName), max);
    }

    /**
     * Returns the milliseconds elapsed since the start time stored on the invocation,
     * or 0 when no start time was stored.
     */
    private long getElapsed(Invocation invocation) {
        Object beginTime = invocation.get(ACTIVE_LIMIT_FILTER_START_TIME);
        return beginTime != null ? System.currentTimeMillis() - (Long) beginTime : 0;
    }

    /**
     * Wakes up every thread waiting on the given status monitor. Nothing is done when
     * {@code max <= 0}.
     */
    private void notifyFinish(final RpcStatus rpcStatus, int max) {
        if (max > 0) {
            synchronized (rpcStatus) {
                rpcStatus.notifyAll();
            }
        }
    }
}
public class RpcStatus {
/**
* @param url
*/
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.get() == Integer.MAX_VALUE) {
return false;
}
????????// 这里的CAS写法值得学习
for (int i; ; ) {
i = methodStatus.active.get();
if (i == Integer.MAX_VALUE || i + 1 > max) {
return false;
}
if (methodStatus.active.compareAndSet(i, i + 1)) {
break;
}
}
appStatus.active.incrementAndGet();
return true;
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
status.active.decrementAndGet();
status.total.incrementAndGet();
status.totalElapsed.addAndGet(elapsed);
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
if (succeeded) {
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
status.failed.incrementAndGet();
status.failedElapsed.addAndGet(elapsed);
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}
}
|