大家好,欢迎来到IT知识分享网。
LinkBlockingQueue改进
问题背景
https://github.com/apache/dubbo/pull/9722/files
使用线程池的同学对于标题中的队列想必都有过使用,但上述队列使用不当时则会造成程序OOM,那怎么来控制呢?
使用ArrayBlockingQueue?如何来评估长度?
是否有一个完美的解决方案呢,MemorySafeLinkedBlockingQueue则通过对内存的限制判断尽面控制队列的容量,完成解决了可能存在的OOM问题。
获取内存大小(注:单位大B;支持准实时更新):
Runtime.getRuntime().freeMemory()//JVM中已经申请到的堆内存中还未使用的大小 Runtime.getRuntime().maxMemory()// JVM可从操作系统申请到的最大内存值 -Xxm Runtime.getRuntime().totalMemory()// JVM已从操作系统申请到的内存大小 —Xxs可设置该值大小-初始堆的大小
线程池在excute任务时,放队列,放不进去,使用新线程运行任务。这个放不进行,是使用的offer??非阻塞方法吗?
参考:https://blog.csdn.net/weixin_/article/details/
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //拿到32位的int int c = ctl.get(); //工作线程数<核心线程数 if (workerCountOf(c) < corePoolSize) { //进入if,代表可以创建 核心 线程数 if (addWorker(command, true)) return; //如果没进入if,代表创建核心线程数失败,重新获取 ctl c = ctl.get(); } //判断线程池为Running状态,将任务添加入阻塞队列,使用offer if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次判断是否为Running状态,若不是Running状态,remove任务 if (! isRunning(recheck) && remove(command)) reject(command); //如果线程池在Running状态,线程池数量为0 else if (workerCountOf(recheck) == 0) //阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务 addWorker(null, false); } //阻塞队列已满,创建非核心线程,拒绝策略-addWorker中有判断核心线程数是否超过最大线程数 else if (!addWorker(command, false)) reject(command); }
空闲内存计算
package com.zte.sdn.oscp.queue; import cn.hutool.core.thread.NamedThreadFactory; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class MemoryLimitCalculator { private static volatile long maxAvailable; private static final AtomicBoolean refreshStarted = new AtomicBoolean(false); private static void refresh() { maxAvailable = Runtime.getRuntime().freeMemory(); } private static void checkAndScheduleRefresh() { if (!refreshStarted.get()) { // immediately refresh when first call to prevent maxAvailable from being 0 // to ensure that being refreshed before refreshStarted being set as true // notice: refresh may be called for more than once because there is no lock refresh(); if (refreshStarted.compareAndSet(false, true)) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator")); // check every 50 ms to improve performance scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS); Runtime.getRuntime().addShutdownHook(new Thread(() -> { refreshStarted.set(false); scheduledExecutorService.shutdown(); })); } } } / * Get the maximum available memory of the current JVM. * * @return maximum available memory */ public static long maxAvailable() { checkAndScheduleRefresh(); return maxAvailable; } / * Take the current JVM's maximum available memory * as a percentage of the result as the limit. * * @param percentage percentage * @return available memory */ public static long calculate(final float percentage) { if (percentage <= 0 || percentage > 1) { throw new IllegalArgumentException(); } checkAndScheduleRefresh(); return (long) (maxAvailable() * percentage); } / * By default, it takes 80% of the maximum available memory of the current JVM. * * @return available memory */ public static long defaultLimit() { checkAndScheduleRefresh(); return (long) (maxAvailable() * 0.8); } }
内存安全队列
package com.zte.sdn.oscp.queue; import java.util.Collection; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> { private static final long serialVersionUID = L; public static int THE_256_MB = 256 * 1024 * 1024; private int maxFreeMemory; private Rejector<E> rejector; public MemorySafeLinkedBlockingQueue() { this(THE_256_MB); } public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) { super(Integer.MAX_VALUE); this.maxFreeMemory = maxFreeMemory; //default as DiscardPolicy to ensure compatibility with the old version this.rejector = new DiscardPolicy<>(); } public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c, final int maxFreeMemory) { super(c); this.maxFreeMemory = maxFreeMemory; //default as DiscardPolicy to ensure compatibility with the old version this.rejector = new DiscardPolicy<>(); } / * set the max free memory. * * @param maxFreeMemory the max free memory */ public void setMaxFreeMemory(final int maxFreeMemory) { this.maxFreeMemory = maxFreeMemory; } / * get the max free memory. * * @return the max free memory limit */ public int getMaxFreeMemory() { return maxFreeMemory; } / * set the rejector. * * @param rejector the rejector */ public void setRejector(final Rejector<E> rejector) { this.rejector = rejector; } / * determine if there is any remaining free memory. * * @return true if has free memory */ public boolean hasRemainedMemory() { return MemoryLimitCalculator.maxAvailable() > maxFreeMemory; } @Override public void put(final E e) throws InterruptedException { if (hasRemainedMemory()) { super.put(e); } else { rejector.reject(e, this); } } @Override public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { if (!hasRemainedMemory()) { rejector.reject(e, this); return false; } return super.offer(e, timeout, unit); } @Override public boolean offer(final E e) { if (!hasRemainedMemory()) { rejector.reject(e, this); return false; } return super.offer(e); } }
拒绝策略
注意其中的rejector是拒绝策略,默认的DiscardPolicy什么也不处理;
而DiscardOldPolicy的处理逻辑很简单
public class DiscardOldestPolicy<E> implements Rejector<E> { @Override public void reject(final E e, final Queue<E> queue) { queue.poll(); queue.offer(e); } }
AbortPolicy则直接抛出异常
public class AbortPolicy<E> implements Rejector<E> { @Override public void reject(final E e, final Queue<E> queue) { throw new RejectException("no more memory can be used !"); } }
个人建议增加日志打印即可。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/160116.html