Java高并发编程:取消和关闭

Java没有提供任何机制来安全地终止线程,但是它提供了中断(Interruption).这是一种协作机制,能够使一个线程终止另一个线程当前的工作。

  • 在对一个线程对象调用Thread.interrupted()方法之后,一般情况下对这个线程不会产生任何影响。因为调用Thread.interrupted()方法只是将增线程的中断标志位设置为true。
  • 如果一个线程被调用Thread.interrupted()方法之后,如果它的状态是阻塞状态或者是等待状态,而且这个状态正是因为正在执行的wait、join、sleep线程造成的,那么是会改变运行的结果(抛出InterruptException异常)

1.线程终止

由于Java没有提供任何机制来安全地终止线程,那么我们应该如何终止线程呢?下面我们提供三种线程终止的方法:

  1. 使用退出标志,使线程正常退出,也就是当run方法完成后线程终止。
  2. 使用stop方法强行终止线程(这个方法不推荐使用,因为stop和suspend、resume一样,也可能发生不可预料的结果)。
  3. 使用interrupt方法中断线程。

1.1 使用退出标志

当run方法执行完后,线程就会退出。但有时run方法是永远不会结束的。如在服务端程序中使用线程进行监听客户端请求,或是其他的需要循环处理的任务。在这种情况下,一般是将这些任务放在一个循环中,如while循环。如果想让循环永远运行下去,可以使用while(true){……}来处理。但要想使while循环在某一特定条件下退出,最直接的方法就是设一个boolean类型的标志,并通过设置这个标志为true或false来控制while循环是否退出。

public class ThreadFlag implements Runnable{
    private volatile boolean exit=false;

    @Override
    public void run() {
        while (!exit){
            ///do something
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("-----------ThreadFlag shutdown----------");
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadFlag threadFlag=new ThreadFlag();
        Thread thread=new Thread(threadFlag);
        thread.start();
        Thread.sleep(3000);
        threadFlag.exit=true;
        thread.join();
        System.out.println("线程退出");
    }
}

上面代码使用了一个线程标志位来判断线程是否关闭.通过对线程标志位进行操作来决定线程是否关闭.

1.2 使用stop方法终止线程

使用stop方法可以强行终止正在运行或挂起的线程。我们可以使用如下的代码来终止线程:

public class ThreadStop implements Runnable {
    @Override
    public void run() {
        try {
            while (true){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            System.out.println("-----------ThreadStop shutdown----------");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadStop threadStop=new ThreadStop();
        Thread thread=new Thread(threadStop);
        thread.start();
        Thread.sleep(3000);
        thread.stop();
        System.out.println("线程退出");
    }
}

这种方法线程不安全,Java不建议使用这种stop方法关闭线程。

1.3 使用interrupt方法终止线程

使用interrupt方法来终端线程可分为两种情况:

(1)线程处于阻塞状态,如使用了sleep方法。

(2)使用while(!isInterrupted()){……}来判断线程是否被中断。

enum  FuntuionType{
    FunctionType1,
    FunctionType2,
}
public class ThreadInterrupt implements Runnable{
    private FuntuionType funtuionType;

    public ThreadInterrupt(FuntuionType funtuionType) {
        this.funtuionType = funtuionType;
    }

    @Override
    public void run() {
        switch (funtuionType){
            case FunctionType1:
                int i = 0;
                while (!Thread.interrupted()){
                    //do something
                    i++;
                }
                System.out.println("Thread.interrupted() shutdown");
                break;
            case FunctionType2:
                try {
                    Thread.sleep(50*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Sleep InterruptedException throws");
                break;
            default:
                break;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadInterrupt threadInterrupt=new ThreadInterrupt(FuntuionType.FunctionType2);
        Thread thread=new Thread(threadInterrupt);
        thread.start();
        Thread.sleep(2000);
        thread.interrupt();
        System.out.println("线程已经退出");
    }
}

2. 任务取消

Java中没有一种安全的抢占式方法来停止线程,因此就没有安全的抢占式方法来停止任务。下面我们就来介绍一中协作式的方式来取消一个任务。

2.1 取消标志位

第一种方式就是设置某个“已请求取消”的标志位,而任务周期性的查看这个标志位。如果设置了这个标志位,那么任务就提前结束。

public class PrimeGenerator implements Runnable{
    private final List<BigInteger> primes=new ArrayList<>();
    private volatile boolean cancelled = false;
    private volatile BigInteger p = BigInteger.ONE;
    @Override
    public void run() {

        while (!cancelled){
            //此方法返回一个整数大于该BigInteger的素数。
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel(){
        this.cancelled=true;
    }

    public synchronized List<BigInteger> get(){
        return new ArrayList<>(primes);
    }

    public static void main(String[] args) throws InterruptedException {
        PrimeGenerator primeGenerator=new PrimeGenerator();
        for (int i = 0; i < 10; i++) {
            Thread thread=new Thread(primeGenerator);
            thread.start();
        }

        Thread.sleep(2000);
        primeGenerator.cancel();
        for (BigInteger bigInteger : primeGenerator.get()) {
            System.out.println(bigInteger);
        }
    }
}

2.2 中断 Interrupt

由于PrimeGenerator中的取消机制最终会使得素数的任务进行退出。但是如果使用这个方法中的任务调用了一个阻塞方法,列如BlockingQueue.put,那么就会产生一个严重的问题————任务可能永远不会检查取消标志。因此永远不会结束。所以这个时候我们就采用中断Interrupt来取消任务。

public class PrimeProducer implements Runnable {
    private final BlockingQueue<BigInteger> bigIntegers;
    private Thread thread;

    public void setThread(Thread thread) {
        this.thread = thread;
    }

    public PrimeProducer(BlockingQueue<BigInteger> bigIntegers) {
        this.bigIntegers = bigIntegers;
    }

    private volatile BigInteger p = BigInteger.ONE;

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()){
                p = p.nextProbablePrime();
                bigIntegers.put(p.nextProbablePrime());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void cancel(){
        thread.interrupt();
    }

    public BlockingQueue<BigInteger> getBigIntegers() {
        return bigIntegers;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<BigInteger> bigIntegers=new LinkedBlockingQueue<>();
        PrimeProducer primeProducer=new PrimeProducer(bigIntegers);

        Thread thread=new Thread(primeProducer);
        primeProducer.setThread(thread);
        thread.start();
        
        Thread.sleep(1000);
        primeProducer.cancel();
        for (BigInteger bigInteger : primeProducer.getBigIntegers()) {
            System.out.println(bigInteger);
        }
    }
}

每个线程都有一个Boolean类型的中断状态。当中断线程发生的时候,这个线程就把这个中断状态设置为true。咋Thread中包含了中断线程以及查询线程中断状态的方法。

Thrad中的中断方法:

public class Thread{
    public void interrupt(){}
    public boolean isInterrupted(){}
    public static boolean interrupted(){}
}
  • interrupt()方法能够中断目标线程
  • isInterrupted方法能够返回目标线程的中断状态
  • interrupted静态方法能够将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法

阻塞库方法,比如Thread.sleep和Object.wait和Thread.join等,都会检查线程何时中断,并且在发生中断时提前返回。他们在响应中断时只需的操作包括:清除中断状态,抛出InterruptExecption异常,表示阻塞操作由于中断而提前结束。

当线程在非阻塞状态下中断时,它的中断状态将被设置为true,然后根据将取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作将变得有粘性————如果不触发InterruptException,那么中断将一直保持,直到明确的清除中断状态。

总结:
对中断的正确理解是:它并不会真正地中断在一个正在运行的线程,而是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些合适的时刻称为取消点)。有些方法,比如wait、slee和join等,将严格处理这种请求,当他们收到中断请求或者开始执行发现某个已经被设置好的中断状态的时候,将抛出一个异常。
中断策略

最合理的中断策略使某
种线程级(Thread-Level)取消操作或者服务级的取消操作:尽快退出,在必要时候进行清理,通知某个所有线程已经退出。当前检查到中断请求时候,任务不需要放弃所有的操作————它可以推迟处理中断请求,并找到合适的时刻。因此需要记住中断请求,并在完成当前任务之后抛出InterruptExeception或者表示已经收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会发生破坏。除了将InterruptException传递给调用者外还需要执行额外的操作,那么应该在捕获InterruptException之后恢复中断状态:

Thread.currentThread().interrupt();

响应中断

当中断异常发生的时候,我们有两种方式进行响应中断请求:

  • 传递异常(可能在执行某个特定于任务的清除操作之后):从而使你的方法也可以是中断的阻塞方法
  • 恢复中断状态:从而使调用占中的上层代码能对其进行处理。如果不想处理中断请求,一种标准的方法就是通过再次调用interrupt来恢复中断状态。你不能屏蔽中断状态,你只能恢复中断状态。

2.3 通过Future来实现取消

我们已经使用了一种抽象机制来管理任务的生命周期,处理异常,下面我们来介绍一种使用Future类来实现任务取消。

public class TimeRun {
    private static ExecutorService executorService= Executors.newFixedThreadPool(5);
    
    public static void timeRun(Runnable runnable, long timeout, TimeUnit timeUnit){
        Future<?> submit = executorService.submit(runnable);
        try {
            submit.get(timeout,timeUnit);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            //接下来任务将被取消
            e.printStackTrace();
        } finally {
            //如果任务已经结束,那么执行取消操作也不会带来任何影响
            //如果任务正在运行,那么将会被中断
            submit.cancel(true);
        }
    }
}

当Future.get抛出InterruptException或者TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

2.4 处理不可中断的阻塞

如果一个线程由于执行同步的Socket I/O 或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程。

public class ReaderThread extends Thread {
    private final Socket socket;
    private final InputStream inputStream;
    public static final int BUFSIZE=1024;
    
    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.inputStream=socket.getInputStream();
    }

    @Override
    public void interrupt() {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            super.interrupt();
        }
    }

    @Override
    public void run() {
        byte [] buf=new byte[BUFSIZE];
        try {
            while (true) {
                int count= inputStream.read(buf);
                //dosomething
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

通过重写Interrupt方法将非标准的取消操作封装到Thread中,实现中断功能

3. 停止线程的服务

正确的封装原则是:除非拥有某个线程,否则不能对线程进行操作。比如,中断线程,或者修改线程的优先级等等。

服务应该提供生命周期方法(Lifecycle Method)来关闭它自己以及它所拥有的线程。这样应用程序关闭服务的时候,服务就可以关闭所有线程了。对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,就应该提供生命周期方法。

3.1 关闭ExecutorService

关闭ExecutorService提供了两种关闭方法:shutdown和shutdownNow方法

强行关闭的速度更快,但是风险也更大,因为任务很可能执行到一半就结束
正常关闭的速度虽然慢,但是却更为安全,因为ExecutorService会一直等到队列中的所有任务都执行完成之后才关闭。

4. JVM关闭

JVM关闭应用程序可以分为两种方式:

正常关闭:当最后一个“正常(非守护)”线程结束的时候,或者调用了System.exit(0)时,或者通过其他特定于平台的方法关闭(比如发出了SIGNT信号或者Ctrl-C)
强行关闭:通过调用Runtime.halt或者在操作系统中“杀死”JVM进程来强行关闭JVM

4.1 关闭钩子

在正常关闭中,JVM首先调用以及注册的关闭钩子(shutdown Hook)。关闭钩子是指通过Runntime.addShutdwonHook注册的但是尚未开始的线程。JVM不能保证这些线程的执行顺序。在关闭应用程序线程时,如果有线程正在运行,那么这些线程接下来将于关闭进程并发执行。

public class JavaHook {
    private static class JavaTask implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----------JavaTask shutdown----------");
        }
    }

    public static void main(String[] args) {
        JavaTask javaTask=new JavaTask();
        Thread thread=new Thread(javaTask);
        thread.start();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("-----------JavaHook finish----------");
            }
        }));
        System.out.println("JVM finsih....");
    }
}

关闭钩子应该是线程安全的:他们在访问共享数据的时候必须使用同步机制,并且小心的避免发生死锁,这与其他并发代码的要求相同。而且关闭钩子不应该对应用程序的状态(比如其他服务是否已经关闭,后者所有的正常线是否已经执行完成)后者JVm的关闭原因作出任何假设。

5.总结

Java并没有提供某种抢占式的机制来取消或者终结线程。想法它提供一种协作式的中断机制来实现取消操作,但是这要依赖于如何构建取消操作的协议,以及能否遵循这些协议。

赞(52) 打赏
未经允许不得转载:优客志 » JAVA开发
分享到:

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏