`
brandNewUser
  • 浏览: 447291 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java多线程简单实现取消和进度

阅读更多

Java中简单实现多线程调度时的可取消和显示进度

 

一个简单的多线程调度实现,统一开始,为了使得所有线程统一开始,类似运动员在听到发令枪时一起进行,使用了CountDownLatch进行控制。

CountDownLatch beginLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(personCount);

主线程建立线程池,并进行调度,由于要在最后进行汇总结果,使用了FutureTask

List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>();
for (int i = 0; i < personCount; i++) {
	futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch,i)));
}

ExecutorService execService = Executors.newFixedThreadPool(threadCount);
	for (FutureTask<String> futureTask : futureTaskList) {
		execService.execute(futureTask);
	}
beginLatch.countDown();

 这样所有线程就会统一开始执行,执行完成后,汇总结果,并关闭线程池。

 

endLatch.await();
System.out.println("--------------");
for (FutureTask<String> futureTask : futureTaskList) {
	System.out.println(futureTask.get());
}
execService.shutdown();

 对于每个线程的执行,都需要共享变量beginLatch和endLatch,各线程代码:

public class ExecuteCallable implements Callable<String> {

	private int id;
	private CountDownLatch beginLatch;
	private CountDownLatch endLatch;

	public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch,
			Exchanger<Integer> exchanger, int id,
			ConcurrentTaskExecutor concurrentTaskExecutor) {
		this.beginLatch = beginLatch;
		this.endLatch = endLatch;
		this.id = id;
	}

	@Override
	public String call() throws Exception {
		beginLatch.await();
		long millis = (long) (Math.random() * 10 * 1000);
		String result = String.format("Player :%s arrived, use %s millis", id, millis);
		Thread.sleep(millis);
		System.out.println(result);
		endLatch.countDown();
		return result;
	}

}

 每个线程在开始等待发令枪(beginLatch),随机等待一段时间(模拟执行时间),最后通知endLatch减一(执行完毕通知),并返回结果。

到这里只是一个简单的实现,我们并不能在主线程中实时了解各线程的执行情况,除非到了所有线程执行完毕(endLatch解除阻塞状态)。这时候我们使用Exchanger机制来进行线程之间数据的交换,在每个线程执行完成后,将其完成的数据量传给主线程进行刷新(模拟进度条工作)。

主线程ConcurrentTaskExecutor类中:

Exchanger<Integer> exchanger = new Exchanger<Integer>();
beginLatch.countDown();

Integer totalResult = Integer.valueOf(0);
for (int i = 0; i < personCount; i++) {
	Integer partialResult = exchanger.exchange(Integer.valueOf(0));
        if(partialResult != 0){
	        totalResult = totalResult + partialResult;
		System.out.println(String.format("Progress: %s/%s", totalResult, personCount));
	}
}

endLatch.await();

 线程类ExecuteCallable构造函数加入exchanger

@Override
public String call() throws Exception {
        beginLatch.await();
	long millis = (long) (Math.random() * 10 * 1000);
	String result = String.format("Player :%s arrived, use %s millis", id, millis);
	Thread.sleep(millis);
	System.out.println(result);
	exchanger.exchange(1);
	endLatch.countDown();
	return result;
}

 在执行完成进行数据交换,返回本次执行进度给主线程(当前默认设置成1,可修改),主线程在所有线程执行完成前,endLatch.await()必定是阻塞状态的,这样主线程就能实时拿到子线程执行完成的进度数据。

 

下面我们再加入一个可以取消的功能,加入系统随机在某个时间点进行取消操作,那么开始执行的线程是无法进行实时响应了,只能等待当前操作执行完毕;如果线程还没有开始执行,那么就取消其行为。

更改的ExecuteCallable执行方法如下:

@Override
public String call() throws Exception {
	beginLatch.await();
	if(concurrentTaskExecutor.isCanceled()){
		endLatch.countDown();
		exchanger.exchange(0);
		return String.format("Player :%s is given up", id);
	}
	long millis = (long) (Math.random() * 10 * 1000);
	String result = String.format("Player :%s arrived, use %s millis", id, millis);
	Thread.sleep(millis);
	System.out.println(result);
	exchanger.exchange(1);
	endLatch.countDown();
	return result;
}

 其中concurrentTaskExecutor类中加入一个类型为boolean的canceled变量,注意这个变量必须是volatile的,以便能够在线程间共享数据,并且该变量的setter和getter方法也是原子性的。

我们的取消操作不能放在主线程中操作,需要额外建立一个线程,并且这个线程也不能通过线程池进行调度,新建的InterruptRunnable类:

public class InterruptRunnable implements Runnable {

	private CountDownLatch beginLatch;
	private ConcurrentTaskExecutor concurrentTaskExecutor;

	public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
		this.beginLatch = beginLatch;
		this.concurrentTaskExecutor = currConcurrentTaskExecutor;
	}

	@Override
	public void run() {
		try {
			beginLatch.await();
			long millis = (long) (Math.random() * 10 * 1000);
			System.out.println(String.format("System need sleep %s millis", millis));
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		concurrentTaskExecutor.setCanceled(true);
	}

}

 更改后的ConcurrentTaskExecutor,在执行发令前,先让该中断线程启动,以便一起等待开始命令:

new Thread(new InterruptRunnable(this, beginLatch)).start();

beginLatch.countDown();

 最后执行结果(取决于中断线程的随机时间长短):

System need sleep 2920 millis
Player :4 arrived, use 917 millis
Progress: 1/10
Player :5 arrived, use 1076 millis
Progress: 2/10
Player :3 arrived, use 2718 millis
Progress: 3/10
Player :1 arrived, use 4013 millis
Progress: 4/10
Player :0 arrived, use 8541 millis
Progress: 5/10
Player :2 arrived, use 8570 millis
Progress: 6/10
Player :6 arrived, use 7261 millis
Progress: 7/10
Player :7 arrived, use 7015 millis
Progress: 8/10
--------------
Player :0 arrived, use 8541 millis
Player :1 arrived, use 4013 millis
Player :2 arrived, use 8570 millis
Player :3 arrived, use 2718 millis
Player :4 arrived, use 917 millis
Player :5 arrived, use 1076 millis
Player :6 arrived, use 7261 millis
Player :7 arrived, use 7015 millis
Player :8 is given up
Player :9 is given up

 

最后,附上最终的程序代码

ConcurrentTaskExecutor:

public class ConcurrentTaskExecutor {

	private volatile boolean canceled = false;

	public void executeTask() throws Exception {
		int personCount = 10;
		int threadCount = 5;

		CountDownLatch beginLatch = new CountDownLatch(1);
		CountDownLatch endLatch = new CountDownLatch(personCount);
		Exchanger<Integer> exchanger = new Exchanger<Integer>();

		List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>();
		for (int i = 0; i < personCount; i++) {
			futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch, exchanger, i, this)));
		}

		ExecutorService execService = Executors.newFixedThreadPool(threadCount);
		for (FutureTask<String> futureTask : futureTaskList) {
			execService.execute(futureTask);
		}
		
		new Thread(new InterruptRunnable(this, beginLatch)).start();

		beginLatch.countDown();

		Integer totalResult = Integer.valueOf(0);
		for (int i = 0; i < personCount; i++) {
			Integer partialResult = exchanger.exchange(Integer.valueOf(0));
			if(partialResult != 0){
				totalResult = totalResult + partialResult;
				System.out.println(String.format("Progress: %s/%s", totalResult, personCount));
			}
		}

		endLatch.await();
		System.out.println("--------------");
		for (FutureTask<String> futureTask : futureTaskList) {
			System.out.println(futureTask.get());
		}
		execService.shutdown();
	}

	public boolean isCanceled() {
		return canceled;
	}
	
	public void setCanceled(boolean canceled){
		this.canceled = canceled;
	}

	public static void main(String[] args) throws Exception {
		ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
		executor.executeTask();
	}

}

 ExecuteCallable

public class ExecuteCallable implements Callable<String> {

	private int id;
	private CountDownLatch beginLatch;
	private CountDownLatch endLatch;
	private Exchanger<Integer> exchanger;
	private ConcurrentTaskExecutor concurrentTaskExecutor;

	public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch,
			Exchanger<Integer> exchanger, int id,
			ConcurrentTaskExecutor concurrentTaskExecutor) {
		this.beginLatch = beginLatch;
		this.endLatch = endLatch;
		this.exchanger = exchanger;
		this.id = id;
		this.concurrentTaskExecutor = concurrentTaskExecutor;
	}

	@Override
	public String call() throws Exception {
		beginLatch.await();
		if(concurrentTaskExecutor.isCanceled()){
			endLatch.countDown();
			exchanger.exchange(0);
			return String.format("Player :%s is given up", id);
		}
		long millis = (long) (Math.random() * 10 * 1000);
		String result = String.format("Player :%s arrived, use %s millis", id, millis);
		Thread.sleep(millis);
		System.out.println(result);
		exchanger.exchange(1);
		endLatch.countDown();
		return result;
	}

}

 InterruptRunnable

public class InterruptRunnable implements Runnable {

	private CountDownLatch beginLatch;
	private ConcurrentTaskExecutor concurrentTaskExecutor;

	public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
		this.beginLatch = beginLatch;
		this.concurrentTaskExecutor = currConcurrentTaskExecutor;
	}

	@Override
	public void run() {
		try {
			beginLatch.await();
			long millis = (long) (Math.random() * 10 * 1000);
			System.out.println(String.format("System need sleep %s millis", millis));
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		concurrentTaskExecutor.setCanceled(true);
	}

}

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Java并发编程实战

    本书深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险、构造线程安全的类及验证线程安全的规则...

    Java开发实战1200例(第1卷).(清华出版.李钟尉.陈丹丹).part3

    本书是第II卷,以开发人员在项目开发中经常遇到的问题和必须掌握的技术为中心,介绍了应用Java进行桌面程序开发各个方面的知识和技巧,主要包括Java语法与面向对象技术、Java高级应用、窗体与控件应用、文件操作...

    android_external_koush_ion

    Android 异步网络和图像加载 下载 特征 异步下载: 转换为 ImageViews 或 Bitmaps(也支持动画 GIF)...请求的分组和取消 下载进度回调 支持 file:/、http(s):/ 和 content:/ URI 请求级别的日志记录和分析 支持Charl

    asp.net知识库

    利用反射实现ASP.NET控件和数据实体之间的双向绑定,并且在客户端自动验证输入的内容是否合法 asp.net报表解决方法 SQLDMO类的使用 SQL过程自动C#封装,支持从表到基本存储过程生成 使用SQLDMO控制 SQL Server 使用SQL...

    Eclipse权威开发指南2.pdf

    5.5.2 取消修改:使用替换和比较操作..... 152 5.5.3 通过建立分支来进行版本维护和新版本开发..... 152 5.6 其他功能...... 152 5.6.1 编辑器中的快速差别功能对CVS的支持..... 153 5.6.2 补丁程序:快速而又...

    Eclipse权威开发指南3.pdf

    5.5.2 取消修改:使用替换和比较操作..... 152 5.5.3 通过建立分支来进行版本维护和新版本开发..... 152 5.6 其他功能...... 152 5.6.1 编辑器中的快速差别功能对CVS的支持..... 153 5.6.2 补丁程序:...

    Eclipse权威开发指南1.pdf

    5.5.2 取消修改:使用替换和比较操作..... 152 5.5.3 通过建立分支来进行版本维护和新版本开发..... 152 5.6 其他功能...... 152 5.6.1 编辑器中的快速差别功能对CVS的支持..... 153 5.6.2 补丁程序:...

    易语言程序免安装版下载

    修改BUG:办公组件支持库打印进度对话框的标题和用户设置的内容不一致。 18. 修改BUG:办公组件静态编译后无法正常销毁。 19. 修改BUG:应用接口支持库“取内存容量信息()”命令不能正常处理大于2G的内存。 20. ...

    手机 pdf 阅读器

    MOTO-LINUX平台的手机上JAVA无法实现背景常亮功能,启用该功能后会闪屏,请勿再询问此问题了。 允许用户设置阅读时背景灯的亮度(对大部分NOKIA S40、SE、SAMSUNG手机有效,MOTO上无法实现) 修改跳转界面,左/右键...

Global site tag (gtag.js) - Google Analytics