1、定時(shí)任務(wù)
在公司做項(xiàng)目時(shí),經(jīng)常遇到要用到定時(shí)任務(wù)的事情,但是對定時(shí)任務(wù)不熟練的時(shí)候會(huì)出現(xiàn)重復(fù)任務(wù)的情況,不深入,這次將定時(shí)任務(wù)好好學(xué)習(xí)分析一下
定時(shí)任務(wù)的原理
假如我將一個(gè)現(xiàn)場通過不斷輪詢的方式去判斷,就能實(shí)現(xiàn)定時(shí)任務(wù)功能
public class Task {
public static void main(String[] args) {
// run in a second
final long timeInterval = 1000;
Runnable runnable = new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("Hello !!");
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread = new Thread(runnable);
thread.start();
}
}
這里解釋下InterruptedException異常
Thrown when a thread is waiting, sleeping, or otherwise occupied, and the thread is interrupted,
either before or during the activity. Occasionally a method may wish to test whether the current
thread has been interrupted, and if so, to immediately throw this exception.
The following code can be used to achieve this effect:
簡單來說就是當(dāng)阻塞方法收到中斷請求的時(shí)候就會(huì)拋出InterruptedException異常,當(dāng)一個(gè)方法后面聲明可能會(huì)拋出InterruptedException 異常時(shí),說明該方法是可能會(huì)花一點(diǎn)時(shí)間,但是可以取消的方法。
拋InterruptedException的代表方法有:sleep(),wait(),join()
執(zhí)行wait方法的線程,會(huì)進(jìn)入等待區(qū)等待被notify/notify All。在等待期間,線程不會(huì)活動(dòng)。
執(zhí)行sleep方法的線程,會(huì)暫停執(zhí)行參數(shù)內(nèi)所設(shè)置的時(shí)間。
執(zhí)行join方法的線程,會(huì)等待到指定的線程結(jié)束為止。
因此,上面的方法都是需要花點(diǎn)時(shí)間的方法。這三個(gè)方法在執(zhí)行過程中會(huì)不斷輪詢中斷狀態(tài)(interrupted方法),從而自己拋出InterruptedException。
interrupt方法其實(shí)只是改變了中斷狀態(tài)而已。
所以,如果在線程進(jìn)行其他處理時(shí),調(diào)用了它的interrupt方法,線程也不會(huì)拋出InterruptedException的,只有當(dāng)線程走到了sleep, wait, join這些方法的時(shí)候,才會(huì)拋出InterruptedException。若是沒有調(diào)用sleep, wait, join這些方法,或者沒有在線程里自己檢查中斷狀態(tài),自己拋出InterruptedException,那InterruptedException是不會(huì)拋出來的。
Timer實(shí)現(xiàn)
Java在1.3版本引入了Timer工具類,它是一個(gè)古老的定時(shí)器,搭配TimerTask和TaskQueue一起使用,示例
public class TimeTaskTest {
public static void main(String[] args) {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("hell world");
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 10, 3000);
}
}
//Timer類
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");
// Constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");
synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
queue.add(task);
if (queue.getMin() == task)
queue.notify();
}
}
imer中用到的主要是兩個(gè)成員變量:
1、TaskQueue:一個(gè)按照時(shí)間優(yōu)先排序的隊(duì)列,這里的時(shí)間是每個(gè)定時(shí)任務(wù)下一次執(zhí)行的毫秒數(shù)(相對于1970年1月1日而言)
2、TimerThread:對TaskQueue里面的定時(shí)任務(wù)進(jìn)行編排和觸發(fā)執(zhí)行,它是一個(gè)內(nèi)部無限循環(huán)的線程。
主要方法:
// 在指定延遲時(shí)間后執(zhí)行指定的任務(wù)(只執(zhí)行一次)
schedule(TimerTask task,long delay);
// 在指定時(shí)間執(zhí)行指定的任務(wù)。(只執(zhí)行一次)
schedule(TimerTask task, Date time);
// 延遲指定時(shí)間(delay)之后,開始以指定的間隔(period)重復(fù)執(zhí)行指定的任務(wù)
schedule(TimerTask task,long delay,long period);
// 在指定的時(shí)間開始按照指定的間隔(period)重復(fù)執(zhí)行指定的任務(wù)
schedule(TimerTask task, Date firstTime , long period);
// 在指定的時(shí)間開始進(jìn)行重復(fù)的固定速率執(zhí)行任務(wù)
scheduleAtFixedRate(TimerTask task,Date firstTime,long period);
// 在指定的延遲后開始進(jìn)行重復(fù)的固定速率執(zhí)行任務(wù)
scheduleAtFixedRate(TimerTask task,long delay,long period);
// 終止此計(jì)時(shí)器,丟棄所有當(dāng)前已安排的任務(wù)。
cancal();
// 從此計(jì)時(shí)器的任務(wù)隊(duì)列中移除所有已取消的任務(wù)。
purge();
先說Fixed Delay模式
//從當(dāng)前時(shí)間開始delay個(gè)毫秒數(shù)開始定期執(zhí)行,周期是period個(gè)毫秒數(shù)
public void schedule(TimerTask task, long delay, long period) {...}
從指定的firstTime開始定期執(zhí)行,往后每次執(zhí)行的周期是period個(gè)毫秒數(shù)
public void schedule(TimerTask task, Date firstTime, long period){...}
它的工作方式是:
第一次執(zhí)行的時(shí)間將按照指定的時(shí)間點(diǎn)執(zhí)行(如果此時(shí)TimerThread不在執(zhí)行其他任務(wù)),如有其他任務(wù)在執(zhí)行,那就需要等到其他任務(wù)執(zhí)行完成才能執(zhí)行。
從第二次開始,每次任務(wù)的執(zhí)行時(shí)間是上一次任務(wù)開始執(zhí)行的時(shí)間加上指定的period毫秒數(shù)。
如何理解呢,我們還是看代碼
public static void main(String[] args) {
TimerTask task1 = new DemoTimerTask("Task1");
TimerTask task2 = new DemoTimerTask("Task2");
Timer timer = new Timer();
timer.schedule(task1, 1000, 5000);
timer.schedule(task2, 1000, 5000);
}
static class DemoTimerTask extends TimerTask {
private String taskName;
private DateFormat df = new SimpleDateFormat("HH:mm:ss---");
public DemoTimerTask(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println(df.format(new Date()) + taskName + " is working.");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(df.format(new Date()) + taskName + " finished work.");
}
}
task1和task2是幾乎同時(shí)執(zhí)行的兩個(gè)任務(wù),而且執(zhí)行時(shí)長都是2秒鐘,如果此時(shí)我們把第六行注掉不執(zhí)行,我們將得到如下結(jié)果(和第三種Fixed Rate模式結(jié)果相同):
13:42:58---Task1 is working.
13:43:00---Task1 finished work.
13:43:03---Task1 is working.
13:43:05---Task1 finished work.
13:43:08---Task1 is working.
13:43:10---Task1 finished work.
如果打開第六行,我們再看下兩個(gè)任務(wù)的執(zhí)行情況。我們是期望兩個(gè)任務(wù)能夠同時(shí)執(zhí)行,但是Task2是在Task1執(zhí)行完成后才開始執(zhí)行(原因是TimerThread是單線程的,每個(gè)定時(shí)任務(wù)的執(zhí)行也在該線程內(nèi)完成,當(dāng)多個(gè)任務(wù)同時(shí)需要執(zhí)行時(shí),只能是阻塞了),從而導(dǎo)致Task2第二次執(zhí)行的時(shí)間是它上一次執(zhí)行的時(shí)間(13:43:57)加上5秒鐘(13:44:02)。
13:43:55---Task1 is working.
13:43:57---Task1 finished work.
13:43:57---Task2 is working.
13:43:59---Task2 finished work.
13:44:00---Task1 is working.
13:44:02---Task1 finished work.
13:44:02---Task2 is working.
13:44:04---Task2 finished work.
那如果此時(shí)還有個(gè)Task3也是同樣的時(shí)間點(diǎn)和間隔執(zhí)行會(huì)怎么樣呢?
結(jié)論是:也將依次排隊(duì),執(zhí)行的時(shí)間依賴兩個(gè)因素:
1.上次執(zhí)行的時(shí)間
2.期望執(zhí)行的時(shí)間點(diǎn)上有沒有其他任務(wù)在執(zhí)行,有則只能排隊(duì)了
Fixed Rate模式
public static void main(String[] args) {
TimerTask task1 = new DemoTimerTask("Task1");
TimerTask task2 = new DemoTimerTask("Task2");
Timer timer = new Timer();
timer.scheduleAtFixedRate(task1, 1000, 5000);
timer.scheduleAtFixedRate(task2, 1000, 5000);
}
static class DemoTimerTask extends TimerTask {
private String taskName;
private DateFormat df = new SimpleDateFormat("HH:mm:ss---");
public DemoTimerTask(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println(df.format(new Date()) + taskName + " is working.");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(df.format(new Date()) + taskName + " finished work.");
}
}
Task1和Task2還是在相同的時(shí)間點(diǎn),按照相同的周期定時(shí)執(zhí)行任務(wù),我們期望Task1能夠每5秒定時(shí)執(zhí)行任務(wù),期望的時(shí)間點(diǎn)是:14:21:47-14:21:52-14:21:57-14:22:02-14:22:07,實(shí)際上它能夠交替著定期執(zhí)行,原因是Task2也會(huì)定期執(zhí)行,并且對TaskQueue的鎖他們是交替著拿的(這個(gè)在下面分析TimerThread源碼的時(shí)候會(huì)講到)
14:21:47---Task1 is working.
14:21:49---Task1 finished work.
14:21:49---Task2 is working.
14:21:51---Task2 finished work.
14:21:52---Task2 is working.
14:21:54---Task2 finished work.
14:21:54---Task1 is working.
14:21:56---Task1 finished work.
14:21:57---Task1 is working.
14:21:59---Task1 finished work.
14:21:59---Task2 is working.
14:22:01---Task2 finished work.
TimerThread
上面我們主要講了Timer的一些主要源碼及定時(shí)模式,下面我們來分析下支撐Timer的定時(shí)任務(wù)線程TimerThread。
TimerThread大概流程圖如下:
源碼如下
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// 如果queue里面沒有要執(zhí)行的任務(wù),則掛起TimerThread線程
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
// 如果TimerThread被激活,queue里面還是沒有任務(wù),則介紹該線程的無限循環(huán),不再接受新任務(wù)
if (queue.isEmpty())
break;
long currentTime, executionTime;
// 獲取queue隊(duì)列里面下一個(gè)要執(zhí)行的任務(wù)(根據(jù)時(shí)間排序,也就是接下來最近要執(zhí)行的任務(wù))
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
// taskFired表示是否需要立刻執(zhí)行線程,當(dāng)task的下次執(zhí)行時(shí)間到達(dá)當(dāng)前時(shí)間點(diǎn)時(shí)為true
if (taskFired = (executionTime<=currentTime)) {
//task.period==0表示這個(gè)任務(wù)只需要執(zhí)行一次,這里就從queue里面刪掉了
if (task.period == 0) {
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
//針對task.period不等于0的任務(wù),則計(jì)算它的下次執(zhí)行時(shí)間點(diǎn)
//task.period<0表示是fixed delay模式的任務(wù)
//task.period>0表示是fixed rate模式的任務(wù)
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
// 如果任務(wù)的下次執(zhí)行時(shí)間還沒有到達(dá),則掛起TimerThread線程executionTime - currentTime毫秒數(shù),到達(dá)執(zhí)行時(shí)間點(diǎn)再自動(dòng)激活
if (!taskFired)
queue.wait(executionTime - currentTime);
}
// 如果任務(wù)的下次執(zhí)行時(shí)間到了,則執(zhí)行任務(wù)
// 注意:這里任務(wù)執(zhí)行沒有另起線程,還是在TimerThread線程執(zhí)行的,所以當(dāng)有任務(wù)在同時(shí)執(zhí)行時(shí)會(huì)出現(xiàn)阻塞
if (taskFired)
// 這里沒有try catch異常,當(dāng)TimerTask拋出異常會(huì)導(dǎo)致整個(gè)TimerThread跳出循環(huán),從而導(dǎo)致Timer失效
task.run();
} catch(InterruptedException e) {
}
}
}
TimerThread中并沒有處理好任務(wù)的異常,因此每個(gè)TimerTask的實(shí)現(xiàn)必須自己try catch防止異常拋出,導(dǎo)致Timer整體失效。同時(shí),已經(jīng)被安排單尚未執(zhí)行的TimerTask也不會(huì)再執(zhí)行了,新的任務(wù)也不能被調(diào)度。故如果TimerTask拋出未檢查的異常,Timer將會(huì)產(chǎn)生無法預(yù)料的行為。
schedule與scheduleAtFixedRate區(qū)別
在了解schedule與scheduleAtFixedRate方法的區(qū)別之前,先看看它們的相同點(diǎn):
任務(wù)執(zhí)行未超時(shí),下次執(zhí)行時(shí)間 = 上次執(zhí)行開始時(shí)間 + period; 任務(wù)執(zhí)行超時(shí),下次執(zhí)行時(shí)間 = 上次執(zhí)行結(jié)束時(shí)間;
在任務(wù)執(zhí)行未超時(shí)時(shí),它們都是上次執(zhí)行時(shí)間加上間隔時(shí)間,來執(zhí)行下一次任務(wù)。而執(zhí)行超時(shí)時(shí),都是立馬執(zhí)行。
它們的不同點(diǎn)在于側(cè)重點(diǎn)不同,schedule方法側(cè)重保持間隔時(shí)間的穩(wěn)定,而scheduleAtFixedRate方法更加側(cè)重于保持執(zhí)行頻率的穩(wěn)定。
schedule側(cè)重保持間隔時(shí)間的穩(wěn)定
schedule方法會(huì)因?yàn)榍耙粋€(gè)任務(wù)的延遲而導(dǎo)致其后面的定時(shí)任務(wù)延時(shí)。計(jì)算公式為scheduledExecutionTime(第n+1次) = realExecutionTime(第n次) + periodTime。
也就是說如果第n次執(zhí)行task時(shí),由于某種原因這次執(zhí)行時(shí)間過長,執(zhí)行完后的systemCurrentTime>= scheduledExecutionTime(第n+1次),則此時(shí)不做時(shí)隔等待,立即執(zhí)行第n+1次task。
而接下來的第n+2次task的scheduledExecutionTime(第n+2次)就隨著變成了realExecutionTime(第n+1次)+periodTime。這個(gè)方法更注重保持間隔時(shí)間的穩(wěn)定。
scheduleAtFixedRate保持執(zhí)行頻率的穩(wěn)定
scheduleAtFixedRate在反復(fù)執(zhí)行一個(gè)task的計(jì)劃時(shí),每一次執(zhí)行這個(gè)task的計(jì)劃執(zhí)行時(shí)間在最初就被定下來了,也就是scheduledExecutionTime(第n次)=firstExecuteTime +n*periodTime。
如果第n次執(zhí)行task時(shí),由于某種原因這次執(zhí)行時(shí)間過長,執(zhí)行完后的systemCurrentTime>= scheduledExecutionTime(第n+1次),則此時(shí)不做period間隔等待,立即執(zhí)行第n+1次task。
接下來的第n+2次的task的scheduledExecutionTime(第n+2次)依然還是firstExecuteTime+(n+2)*periodTime這在第一次執(zhí)行task就定下來了。說白了,這個(gè)方法更注重保持執(zhí)行頻率的穩(wěn)定。
如果用一句話來描述任務(wù)執(zhí)行超時(shí)之后schedule和scheduleAtFixedRate的區(qū)別就是:schedule的策略是錯(cuò)過了就錯(cuò)過了,后續(xù)按照新的節(jié)奏來走;scheduleAtFixedRate的策略是如果錯(cuò)過了,就努力追上原來的節(jié)奏(制定好的節(jié)奏)。
ScheduledExecutorService
基于線程池設(shè)計(jì)的定時(shí)任務(wù)解決方案,每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程去執(zhí)行,解決 Timer 定時(shí)器無法并發(fā)執(zhí)行的問題,支持 fixedRate 和 fixedDelay。
ScheduledExecutorService是JAVA 1.5后新增的定時(shí)任務(wù)接口,它是基于線程池設(shè)計(jì)的定時(shí)任務(wù)類,每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程去執(zhí)行。也就是說,任務(wù)是并發(fā)執(zhí)行,互不影響。
需要注意:只有當(dāng)執(zhí)行調(diào)度任務(wù)時(shí),ScheduledExecutorService才會(huì)真正啟動(dòng)一個(gè)線程,其余時(shí)間ScheduledExecutorService都是出于輪詢?nèi)蝿?wù)的狀態(tài)。
ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnitunit);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnitunit);
其中scheduleAtFixedRate和scheduleWithFixedDelay在實(shí)現(xiàn)定時(shí)程序時(shí)比較方便,運(yùn)用的也比較多。
ScheduledExecutorService中定義的這四個(gè)接口方法和Timer中對應(yīng)的方法幾乎一樣,只不過Timer的scheduled方法需要在外部傳入一個(gè)TimerTask的抽象任務(wù)。
而ScheduledExecutorService封裝的更加細(xì)致了,傳Runnable或Callable內(nèi)部都會(huì)做一層封裝,封裝一個(gè)類似TimerTask的抽象任務(wù)類(ScheduledFutureTask)。然后傳入線程池,啟動(dòng)線程去執(zhí)行該任務(wù)。
public class ScheduleAtFixedRateDemo implements Runnable{
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(
new ScheduleAtFixedRateDemo(),
0,
1000,
TimeUnit.MILLISECONDS);
}
@Override
public void run() {
System.out.println(new Date() + " : 任務(wù)「ScheduleAtFixedRateDemo」被執(zhí)行。");
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上面是scheduleAtFixedRate方法的基本使用方式,但當(dāng)執(zhí)行程序時(shí)會(huì)發(fā)現(xiàn)它并不是間隔1秒執(zhí)行的,而是間隔2秒執(zhí)行。
這是因?yàn)?,scheduleAtFixedRate是以period為間隔來執(zhí)行任務(wù)的,如果任務(wù)執(zhí)行時(shí)間小于period,則上次任務(wù)執(zhí)行完成后會(huì)間隔period后再去執(zhí)行下一次任務(wù);但如果任務(wù)執(zhí)行時(shí)間大于period,則上次任務(wù)執(zhí)行完畢后會(huì)不間隔的立即開始下次任務(wù)。