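1. Thread pool types
There are four types:
- direct: the task runs directly on the calling thread
- fixed: a fixed number of threads
- fixed_auto_queue_size: a fixed number of threads, with a queue size that adjusts automatically
- scaling: the number of threads varies within [core, max]
They are defined by the enum ThreadPoolType: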
public enum ThreadPoolType {
    DIRECT("direct"),
    FIXED("fixed"),
    FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), // TODO: remove in 9.0
    SCALING("scaling");

    private final String type;

    public String getType() {
        return type;
    }

    ThreadPoolType(String type) {
        this.type = type;
    }

    private static final Map<String, ThreadPoolType> TYPE_MAP =
        Arrays.stream(ThreadPoolType.values()).collect(Collectors.toUnmodifiableMap(ThreadPoolType::getType, Function.identity()));

    public static ThreadPoolType fromType(String type) {
        ThreadPoolType threadPoolType = TYPE_MAP.get(type);
        if (threadPoolType == null) {
            throw new IllegalArgumentException("no ThreadPoolType for " + type);
        }
        return threadPoolType;
    }
}
2. Thread pools in ES
ES assigns different kinds of tasks to different thread pools. The supported pools are:
| Name | Type |
|---|---|
| same | DIRECT |
| generic | SCALING |
| get | FIXED |
| analyze | FIXED |
| write | FIXED |
| search | FIXED |
| search_throttled | FIXED |
| management | SCALING |
| flush | SCALING |
| refresh | SCALING |
| warmer | SCALING |
| snapshot | SCALING |
| force_merge | FIXED |
| fetch_shard_started | SCALING |
| fetch_shard_store | SCALING |
| system_read | FIXED |
| system_write | FIXED |
Their names are defined as follows:
public static class Names {
    public static final String SAME = "same";
    public static final String GENERIC = "generic";
    public static final String GET = "get";
    public static final String ANALYZE = "analyze";
    public static final String WRITE = "write";
    public static final String SEARCH = "search";
    public static final String SEARCH_THROTTLED = "search_throttled";
    public static final String MANAGEMENT = "management";
    public static final String FLUSH = "flush";
    public static final String REFRESH = "refresh";
    public static final String WARMER = "warmer";
    public static final String SNAPSHOT = "snapshot";
    public static final String FORCE_MERGE = "force_merge";
    public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
    public static final String FETCH_SHARD_STORE = "fetch_shard_store";
    public static final String SYSTEM_READ = "system_read";
    public static final String SYSTEM_WRITE = "system_write";
}
The mapping from pool name to pool type is defined as follows:
public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES = Map.ofEntries(
    entry(Names.SAME, ThreadPoolType.DIRECT),
    entry(Names.GENERIC, ThreadPoolType.SCALING),
    entry(Names.GET, ThreadPoolType.FIXED),
    entry(Names.ANALYZE, ThreadPoolType.FIXED),
    entry(Names.WRITE, ThreadPoolType.FIXED),
    entry(Names.SEARCH, ThreadPoolType.FIXED),
    entry(Names.MANAGEMENT, ThreadPoolType.SCALING),
    entry(Names.FLUSH, ThreadPoolType.SCALING),
    entry(Names.REFRESH, ThreadPoolType.SCALING),
    entry(Names.WARMER, ThreadPoolType.SCALING),
    entry(Names.SNAPSHOT, ThreadPoolType.SCALING),
    entry(Names.FORCE_MERGE, ThreadPoolType.FIXED),
    entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING),
    entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
    entry(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED),
    entry(Names.SYSTEM_READ, ThreadPoolType.FIXED),
    entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED));
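3. ES thread pool implementation
The main members and methods of ThreadPool are covered in the subsections below.
3.1 Thread pool initialization
The thread pool is created when the node starts.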
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
The thread pool is then passed to the modules that need it.
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
The ThreadPool constructor initializes all of the individual pools.
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    assert Node.NODE_NAME_SETTING.exists(settings);

    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
    final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
    final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
    final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
    builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000, false));
    builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000, false));
    builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));
    builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, true));
    builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));
    builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
    builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FETCH_SHARD_STARTED,
        new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
    builders.put(Names.FETCH_SHARD_STORE,
        new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
    builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false));
    builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false));

    for (final ExecutorBuilder<?> builder : customBuilders) {
        if (builders.containsKey(builder.name())) {
            throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
        }
        builders.put(builder.name(), builder);
    }
    this.builders = Collections.unmodifiableMap(builders);

    threadContext = new ThreadContext(settings);

    final Map<String, ExecutorHolder> executors = new HashMap<>();
    for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
        final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
        final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        if (executors.containsKey(executorHolder.info.getName())) {
            throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
        }
        logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
        executors.put(entry.getKey(), executorHolder);
    }

    executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
    this.executors = unmodifiableMap(executors);

    final List<Info> infos =
        executors
            .values()
            .stream()
            .filter(holder -> holder.info.getName().equals("same") == false)
            .map(holder -> holder.info)
            .collect(Collectors.toList());
    this.threadPoolInfo = new ThreadPoolInfo(infos);
    this.scheduler = Scheduler.initScheduler(settings, "scheduler");
    TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
    this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
    this.cachedTimeThread.start();
}
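First, the mapping from each pool name to its builder, together with the custom builders registered by plugins, is put into a Map. The constructor then iterates over that Map, calls each builder's build method to create an ExecutorHolder, and stores the result in executors, the map from pool name to ExecutorHolder. Finally, the SAME entry is added to executors.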
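The registration flow described above can be sketched with plain JDK types. This is only an illustration under stated assumptions: PoolRegistrySketch, its Supplier-based "builders", and the pool name "my_plugin" are hypothetical stand-ins, not Elasticsearch classes or settings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in for the constructor's flow; not Elasticsearch code.
class PoolRegistrySketch {
    private final Map<String, Executor> executors;

    PoolRegistrySketch(Map<String, Supplier<ExecutorService>> builtIn,
                       Map<String, Supplier<ExecutorService>> custom) {
        // Step 1: collect built-in builders, then custom ones, rejecting name clashes.
        Map<String, Supplier<ExecutorService>> builders = new HashMap<>(builtIn);
        for (Map.Entry<String, Supplier<ExecutorService>> e : custom.entrySet()) {
            if (builders.containsKey(e.getKey())) {
                throw new IllegalArgumentException("builder with name [" + e.getKey() + "] already exists");
            }
            builders.put(e.getKey(), e.getValue());
        }
        // Step 2: build one executor per registered name.
        Map<String, Executor> built = new HashMap<>();
        builders.forEach((name, builder) -> built.put(name, builder.get()));
        // Step 3: add the "same" entry, which runs tasks on the calling thread.
        Executor direct = Runnable::run;
        built.put("same", direct);
        this.executors = Collections.unmodifiableMap(built);
    }

    Executor executor(String name) {
        Executor e = executors.get(name);
        if (e == null) {
            throw new IllegalArgumentException("no executor found for [" + name + "]");
        }
        return e;
    }

    int size() {
        return executors.size();
    }

    void shutdown() {
        for (Executor e : executors.values()) {
            if (e instanceof ExecutorService) {
                ((ExecutorService) e).shutdown();
            }
        }
    }

    // Builds a registry with one built-in pool and one custom (plugin-style) pool.
    static PoolRegistrySketch example() {
        Supplier<ExecutorService> generic = Executors::newCachedThreadPool;
        Supplier<ExecutorService> plugin = () -> Executors.newFixedThreadPool(1);
        return new PoolRegistrySketch(Map.of("generic", generic), Map.of("my_plugin", plugin));
    }

    // Registering a custom builder under an existing name must fail.
    static boolean rejectsDuplicateName() {
        Supplier<ExecutorService> generic = Executors::newCachedThreadPool;
        Map<String, Supplier<ExecutorService>> same = Map.of("generic", generic);
        try {
            new PoolRegistrySketch(same, same);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }
}
```

Because the duplicate check runs before any builder is invoked, a name clash fails fast without creating threads.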
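3.2 ExecutorBuilder
ExecutorBuilder is the abstract base class for the builders that create executors. Its subclasses include FixedExecutorBuilder and ScalingExecutorBuilder.
3.3 Creating fixed thread pools
FixedExecutorBuilder creates thread pools of type fixed, using EsExecutors.newFixed.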
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
    int size = settings.size;
    int queueSize = settings.queueSize;
    final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
    final ExecutorService executor =
        EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext, trackEWMA);
    final ThreadPool.Info info =
        new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
    return new ThreadPool.ExecutorHolder(executor, info);
}

public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
                                            ThreadFactory threadFactory, ThreadContext contextHolder, boolean trackEWMA) {
    BlockingQueue<Runnable> queue;
    if (queueCapacity < 0) {
        queue = ConcurrentCollections.newBlockingQueue();
    } else {
        queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
    }
    if (trackEWMA) {
        return new EWMATrackingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
            queue, TimedRunnable::new, threadFactory, new EsAbortPolicy(), contextHolder);
    } else {
        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
            queue, threadFactory, new EsAbortPolicy(), contextHolder);
    }
}
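To make the behavior of a fixed pool concrete, here is a rough JDK-only analogue: core size equals max size, the queue is bounded (or unbounded when the capacity is negative), and tasks are rejected once both the threads and the queue are full. FixedPoolSketch and its helper methods are illustrative names, and ArrayBlockingQueue plus ThreadPoolExecutor.AbortPolicy merely stand in for the SizeBlockingQueue and EsAbortPolicy used by Elasticsearch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogue of a "fixed" pool; not Elasticsearch code.
class FixedPoolSketch {
    // In this sketch a non-negative capacity must be at least 1 (ArrayBlockingQueue requirement).
    static ThreadPoolExecutor newFixed(int size, int queueCapacity) {
        BlockingQueue<Runnable> queue = queueCapacity < 0
            ? new LinkedBlockingQueue<>()              // negative capacity: unbounded queue
            : new ArrayBlockingQueue<>(queueCapacity); // otherwise: bounded queue
        // core == max, so the thread count never changes; AbortPolicy throws on overflow.
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
            queue, new ThreadPoolExecutor.AbortPolicy());
    }

    // Submits `tasks` blocking tasks and counts how many are rejected.
    static int countRejections(int size, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = newFixed(size, queueCapacity);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }
}
```

With 2 threads and a queue of 3, ten blocking tasks leave 2 running, 3 queued, and the remaining 5 rejected; with a negative capacity nothing is rejected.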
3.4 Creating scaling thread pools
ScalingExecutorBuilder creates thread pools of type scaling, using EsExecutors.newScaling.
ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final ThreadContext threadContext) {
    TimeValue keepAlive = settings.keepAlive;
    int core = settings.core;
    int max = settings.max;
    final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
    final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
    final ExecutorService executor =
        EsExecutors.newScaling(
            settings.nodeName + "/" + name(),
            core,
            max,
            keepAlive.millis(),
            TimeUnit.MILLISECONDS,
            threadFactory,
            threadContext);
    return new ThreadPool.ExecutorHolder(executor, info);
}

public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit,
                                              ThreadFactory threadFactory, ThreadContext contextHolder) {
    ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
    EsThreadPoolExecutor executor =
        new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
    queue.executor = executor;
    return executor;
}
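A standard ThreadPoolExecutor only grows beyond its core size when the work queue refuses a task, so an unbounded queue would normally keep the pool at core size. The wiring above (a queue that holds a reference to its executor, plus a policy named ForceQueuePolicy) suggests the usual workaround: the queue declines tasks while the pool can still grow, and the rejection handler puts the task into the queue once the pool is at max. The following is a simplified sketch of that general technique using JDK classes only; ScalingPoolSketch, ScalingQueue, and forceOffer are hypothetical names, and the details are not claimed to match the Elasticsearch implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a pool that scales between core and max; not Elasticsearch code.
class ScalingPoolSketch {
    static final class ScalingQueue extends LinkedTransferQueue<Runnable> {
        volatile ThreadPoolExecutor executor;

        @Override
        public boolean offer(Runnable r) {
            if (tryTransfer(r)) {
                return true;  // an idle worker took the task directly
            }
            if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
                return false; // refuse, so the executor starts another thread
            }
            return super.offer(r); // pool is at max: queue the task
        }

        // Enqueues unconditionally, bypassing the scaling check in offer().
        void forceOffer(Runnable r) {
            super.offer(r);
        }
    }

    static ThreadPoolExecutor newScaling(int core, int max, long keepAliveMillis) {
        ScalingQueue queue = new ScalingQueue();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(core, max, keepAliveMillis, TimeUnit.MILLISECONDS, queue,
            (task, pool) -> {
                if (pool.isShutdown()) {
                    throw new RejectedExecutionException("executor has been shut down");
                }
                // The pool could not grow (for example, a race at max size): queue the task.
                ((ScalingQueue) pool.getQueue()).forceOffer(task);
            });
        queue.executor = executor;
        return executor;
    }

    // Runs 5 blocking tasks on a pool with core=1, max=3.
    // Returns {pool size while blocked, queued tasks while blocked, completed tasks}.
    static int[] demo() {
        ThreadPoolExecutor pool = newScaling(1, 3, 1000L);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        int poolSize = pool.getPoolSize();
        int queued = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { poolSize, queued, completed.get() };
    }
}
```

In the demo the pool grows to its maximum of 3 threads before anything is queued, and the remaining 2 tasks wait in the queue until a worker is free.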
3.5 Creating the direct thread pool
It is created through EsExecutors.newDirectExecutorService.
public static ExecutorService newDirectExecutorService() {
    return DIRECT_EXECUTOR_SERVICE;
}

private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new DirectExecutorService();

private static final class DirectExecutorService extends AbstractExecutorService {

    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    DirectExecutorService() {
        super();
    }

    @Override
    public void shutdown() {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<Runnable> shutdownNow() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isShutdown() {
        return false;
    }

    @Override
    public boolean isTerminated() {
        return false;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void execute(Runnable command) {
        command.run();
        rethrowErrors(command);
    }
}
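The key property of the direct executor is that execute runs the task synchronously on the caller's thread, with no hand-off. A minimal JDK-only sketch of that property follows; DirectExecutorSketch is an illustrative name and omits the error rethrowing done by the code above.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal direct executor; not Elasticsearch code.
class DirectExecutorSketch {
    // execute(task) simply calls task.run() on the current thread.
    static final Executor DIRECT = Runnable::run;

    // Returns true if the task observed the same thread as the caller.
    static boolean runsOnCallingThread() {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        DIRECT.execute(() -> ranOn.set(Thread.currentThread()));
        // By the time execute returns, the task has already finished.
        return ranOn.get() == Thread.currentThread();
    }
}
```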
3.6 Thread pool with priority support
PrioritizedEsThreadPoolExecutor is created through EsExecutors.newSinglePrioritizing. It extends EsThreadPoolExecutor and uses a PriorityBlockingQueue as its work queue.
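The effect of a PriorityBlockingQueue as the work queue can be shown with a single-threaded JDK executor: queued tasks run in priority order rather than submission order. This is a sketch of the general idea only; PrioritySketch and PrioritizedTask are hypothetical names, and lower numbers are assumed here to mean higher priority.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded prioritizing executor; not Elasticsearch code.
class PrioritySketch {
    // Tasks must be Comparable because the queue orders them by natural ordering.
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority; // assumption: lower value runs first
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Submits priorities 3, 1, 2 while the single worker is busy and returns the run order.
    static List<Integer> runOrder() {
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        // Occupy the only worker so the following tasks accumulate in the queue.
        pool.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        // Use execute(), not submit(): submit() wraps tasks in a non-Comparable FutureTask.
        for (int p : new int[] { 3, 1, 2 }) {
            pool.execute(new PrioritizedTask(p, () -> order.add(p)));
        }
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

A single worker thread is what makes the ordering observable: with several workers, tasks would still be dequeued by priority but could finish in any order.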