Java中简单实现多线程调度时的可取消和显示进度
一个简单的多线程调度实现,统一开始,为了使得所有线程统一开始,类似运动员在听到发令枪时一起进行,使用了CountDownLatch进行控制。
CountDownLatch beginLatch = new CountDownLatch(1); CountDownLatch endLatch = new CountDownLatch(personCount);
主线程建立线程池,并进行调度,由于要在最后进行汇总结果,使用了FutureTask
List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>(); for (int i = 0; i < personCount; i++) { futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch,i))); } ExecutorService execService = Executors.newFixedThreadPool(threadCount); for (FutureTask<String> futureTask : futureTaskList) { execService.execute(futureTask); } beginLatch.countDown();
这样所有线程就会统一开始执行,执行完成后,汇总结果,并关闭线程池。
endLatch.await(); System.out.println("--------------"); for (FutureTask<String> futureTask : futureTaskList) { System.out.println(futureTask.get()); } execService.shutdown();
对于每个线程的执行,都需要共享变量beginLatch和endLatch,各线程代码:
public class ExecuteCallable implements Callable<String> { private int id; private CountDownLatch beginLatch; private CountDownLatch endLatch; public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch, Exchanger<Integer> exchanger, int id, ConcurrentTaskExecutor concurrentTaskExecutor) { this.beginLatch = beginLatch; this.endLatch = endLatch; this.id = id; } @Override public String call() throws Exception { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); endLatch.countDown(); return result; } }
每个线程在开始等待发令枪(beginLatch),随机等待一段时间(模拟执行时间),最后通知endLatch减一(执行完毕通知),并返回结果。
到这里只是一个简单的实现,我们并不能在主线程中实时了解各线程的执行情况,除非到了所有线程执行完毕(endLatch解除阻塞状态)。这时候我们使用Exchanger机制来进行线程之间数据的交换,在每个线程执行完成后,将其完成的数据量传给主线程进行刷新(模拟进度条工作)。
主线程ConcurrentTaskExecutor类中:
Exchanger<Integer> exchanger = new Exchanger<Integer>(); beginLatch.countDown(); Integer totalResult = Integer.valueOf(0); for (int i = 0; i < personCount; i++) { Integer partialResult = exchanger.exchange(Integer.valueOf(0)); if(partialResult != 0){ totalResult = totalResult + partialResult; System.out.println(String.format("Progress: %s/%s", totalResult, personCount)); } } endLatch.await();
线程类ExecuteCallable构造函数加入exchanger
@Override public String call() throws Exception { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); exchanger.exchange(1); endLatch.countDown(); return result; }
在执行完成进行数据交换,返回本次执行进度给主线程(当前默认设置成1,可修改),主线程在所有线程执行完成前,endLatch.await()必定是阻塞状态的,这样主线程就能实时拿到子线程执行完成的进度数据。
下面我们再加入一个可以取消的功能,加入系统随机在某个时间点进行取消操作,那么开始执行的线程是无法进行实时响应了,只能等待当前操作执行完毕;如果线程还没有开始执行,那么就取消其行为。
更改的ExecuteCallable执行方法如下:
@Override public String call() throws Exception { beginLatch.await(); if(concurrentTaskExecutor.isCanceled()){ endLatch.countDown(); exchanger.exchange(0); return String.format("Player :%s is given up", id); } long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); exchanger.exchange(1); endLatch.countDown(); return result; }
其中concurrentTaskExecutor类中加入一个类型为boolean的canceled变量,注意这个变量必须是volatile的,以便能够在线程间共享数据,并且该变量的setter和getter方法也是原子性的。
我们的取消操作不能放在主线程中操作,需要额外建立一个线程,并且这个线程也不能通过线程池进行调度,新建的InterruptRunnable类:
public class InterruptRunnable implements Runnable { private CountDownLatch beginLatch; private ConcurrentTaskExecutor concurrentTaskExecutor; public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) { this.beginLatch = beginLatch; this.concurrentTaskExecutor = currConcurrentTaskExecutor; } @Override public void run() { try { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); System.out.println(String.format("System need sleep %s millis", millis)); Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } concurrentTaskExecutor.setCanceled(true); } }
更改后的ConcurrentTaskExecutor,在执行发令前,先让该中断线程启动,以便一起等待开始命令:
new Thread(new InterruptRunnable(this, beginLatch)).start(); beginLatch.countDown();
最后执行结果(取决于中断线程的随机时间长短):
System need sleep 2920 millis Player :4 arrived, use 917 millis Progress: 1/10 Player :5 arrived, use 1076 millis Progress: 2/10 Player :3 arrived, use 2718 millis Progress: 3/10 Player :1 arrived, use 4013 millis Progress: 4/10 Player :0 arrived, use 8541 millis Progress: 5/10 Player :2 arrived, use 8570 millis Progress: 6/10 Player :6 arrived, use 7261 millis Progress: 7/10 Player :7 arrived, use 7015 millis Progress: 8/10 -------------- Player :0 arrived, use 8541 millis Player :1 arrived, use 4013 millis Player :2 arrived, use 8570 millis Player :3 arrived, use 2718 millis Player :4 arrived, use 917 millis Player :5 arrived, use 1076 millis Player :6 arrived, use 7261 millis Player :7 arrived, use 7015 millis Player :8 is given up Player :9 is given up
最后,附上最终的程序代码
ConcurrentTaskExecutor:
public class ConcurrentTaskExecutor { private volatile boolean canceled = false; public void executeTask() throws Exception { int personCount = 10; int threadCount = 5; CountDownLatch beginLatch = new CountDownLatch(1); CountDownLatch endLatch = new CountDownLatch(personCount); Exchanger<Integer> exchanger = new Exchanger<Integer>(); List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>(); for (int i = 0; i < personCount; i++) { futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch, exchanger, i, this))); } ExecutorService execService = Executors.newFixedThreadPool(threadCount); for (FutureTask<String> futureTask : futureTaskList) { execService.execute(futureTask); } new Thread(new InterruptRunnable(this, beginLatch)).start(); beginLatch.countDown(); Integer totalResult = Integer.valueOf(0); for (int i = 0; i < personCount; i++) { Integer partialResult = exchanger.exchange(Integer.valueOf(0)); if(partialResult != 0){ totalResult = totalResult + partialResult; System.out.println(String.format("Progress: %s/%s", totalResult, personCount)); } } endLatch.await(); System.out.println("--------------"); for (FutureTask<String> futureTask : futureTaskList) { System.out.println(futureTask.get()); } execService.shutdown(); } public boolean isCanceled() { return canceled; } public void setCanceled(boolean canceled){ this.canceled = canceled; } public static void main(String[] args) throws Exception { ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(); executor.executeTask(); } }
ExecuteCallable
public class ExecuteCallable implements Callable<String> { private int id; private CountDownLatch beginLatch; private CountDownLatch endLatch; private Exchanger<Integer> exchanger; private ConcurrentTaskExecutor concurrentTaskExecutor; public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch, Exchanger<Integer> exchanger, int id, ConcurrentTaskExecutor concurrentTaskExecutor) { this.beginLatch = beginLatch; this.endLatch = endLatch; this.exchanger = exchanger; this.id = id; this.concurrentTaskExecutor = concurrentTaskExecutor; } @Override public String call() throws Exception { beginLatch.await(); if(concurrentTaskExecutor.isCanceled()){ endLatch.countDown(); exchanger.exchange(0); return String.format("Player :%s is given up", id); } long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); exchanger.exchange(1); endLatch.countDown(); return result; } }
InterruptRunnable
public class InterruptRunnable implements Runnable { private CountDownLatch beginLatch; private ConcurrentTaskExecutor concurrentTaskExecutor; public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) { this.beginLatch = beginLatch; this.concurrentTaskExecutor = currConcurrentTaskExecutor; } @Override public void run() { try { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); System.out.println(String.format("System need sleep %s millis", millis)); Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } concurrentTaskExecutor.setCanceled(true); } }
相关推荐
本书深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险、构造线程安全的类及验证线程安全的规则...
本书是第II卷,以开发人员在项目开发中经常遇到的问题和必须掌握的技术为中心,介绍了应用Java进行桌面程序开发各个方面的知识和技巧,主要包括Java语法与面向对象技术、Java高级应用、窗体与控件应用、文件操作...
Android 异步网络和图像加载 下载 特征 异步下载: 转换为 ImageViews 或 Bitmaps(也支持动画 GIF)...请求的分组和取消 下载进度回调 支持 file:/、http(s):/ 和 content:/ URI 请求级别的日志记录和分析 支持Charl
利用反射实现ASP.NET控件和数据实体之间的双向绑定,并且在客户端自动验证输入的内容是否合法 asp.net报表解决方法 SQLDMO类的使用 SQL过程自动C#封装,支持从表到基本存储过程生成 使用SQLDMO控制 SQL Server 使用SQL...
5.5.2 取消修改:使用替换和比较操作..... 152 5.5.3 通过建立分支来进行版本维护和新版本开发..... 152 5.6 其他功能...... 152 5.6.1 编辑器中的快速差别功能对CVS的支持..... 153 5.6.2 补丁程序:快速而又...
5.5.2 取消修改:使用替换和比较操作..... 152 5.5.3 通过建立分支来进行版本维护和新版本开发..... 152 5.6 其他功能...... 152 5.6.1 编辑器中的快速差别功能对CVS的支持..... 153 5.6.2 补丁程序:...
5.5.2 取消修改:使用替换和比较操作..... 152 5.5.3 通过建立分支来进行版本维护和新版本开发..... 152 5.6 其他功能...... 152 5.6.1 编辑器中的快速差别功能对CVS的支持..... 153 5.6.2 补丁程序:...
修改BUG:办公组件支持库打印进度对话框的标题和用户设置的内容不一致。 18. 修改BUG:办公组件静态编译后无法正常销毁。 19. 修改BUG:应用接口支持库“取内存容量信息()”命令不能正常处理大于2G的内存。 20. ...
MOTO-LINUX平台的手机上JAVA无法实现背景常亮功能,启用该功能后会闪屏,请勿再询问此问题了。 允许用户设置阅读时背景灯的亮度(对大部分NOKIA S40、SE、SAMSUNG手机有效,MOTO上无法实现) 修改跳转界面,左/右键...