Quartz 定时任务使用 —— 定时任务持久化到数据库(十四)

用数据库存储定时任务信息

之前的文章所做的demo是将定时任务的信息保存在内存(RAM)中的,见以下配置

org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

如果用内存记录定时任务信息,应用重新启动后,定时任务信息将会丢失。比如,用户A通过系统设置1小时后执行Z操作,设置好后的,因系统重新启动,新启动的系统将会丢失“1小时后执行Z操作”的定时任务。

如果,我们需要在系统意外(或非意外)重新启动后,仍保留定时任务信息,可以使用数据库存储定时任务信息。

org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

默认情况下,Quartz使用RAMJobStore实现,它简单的把任务放在内存中。

其他可用实现是JobStoreCMT和JobStoreTX。这两个类都使用一个配置好的数据源来持久化任务细节,这就支持将任务的创建和修改作为事务的一部分。如果你要和你的应用容器一起管理,那你可以使用quartz的JobStoreCMT,quartz通过JobStoreCMT来的使用来让你的应用容器管理quartz的事务

Spring为JobStore提供了自己的LocalDataSourceJobStore实现,它可以参加Spring管理的事务。当我们讨论Spring对Quartz支持时我们会关注该实现。

以MySQL为例子,做个简单的DEMO

由于需要连接MySQL数据库,需要加上数据库的JDBC驱动,这里以pom形式下载,也可以直接引入包

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.35</version>
</dependency>

建立数据存储表

因为需要把quartz的数据保存到数据库,所以要建立相关的数据库。这个可以从下载到的quartz包里面找到对应的sql脚本,目前可以支持mysql,DB2,oracle等主流的数据库,自己可以根据项目需要选择合适的脚本运行。脚本文件在\docs\dbTables目录下

quartz数据表含义

各个表中的字段,约束,主外建关联含义,请阅读文章:Quartz 定时任务使用 —— 数据库各表字段的含义(十七)

表名 含义
qrtz_calendars 以 Blob 类型存储 Quartz 的 Calendar 信息
qrtz_cron_triggers 存储 Cron Trigger,包括Cron表达式和时区信息
qrtz_fired_triggers 存储与已触发的 Trigger 相关的状态信息,以及相联 Job的执行信息QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的 Trigger组的信息
qrtz_scheduler_state 存储少量的有关 Scheduler 的状态信息,和别的Scheduler实例(假如是用于一个集群中)
qrtz_locks 存储程序的悲观锁的信息(假如使用了悲观锁)
qrtz_job_details 存储每一个已配置的 Job 的详细信息
qrtz_simple_triggers 存储简单的Trigger,包括重复次数,间隔,以及已触的次数 
qrtz_simprop_triggers
qrtz_blob_triggers Trigger 作为 Blob 类型存储(用于 Quartz 用户用JDBC创建他们自己定制的 Trigger 类型,JobStore并不知道如何存储实例的时候) 
qrtz_triggers 触发器的基本信息 
qrtz_paused_trigger_graps 存放暂停掉的触发器。

主要的几张表:

qrtz_job_details:保存job详细信息,该表需要用户根据实际情况初始化 

  • job_name:集群中job的名字,可以随意定制

  • job_group:集群中job的所属组的名字,可以随意定制

  • job_class_name:集群中个notejob实现类的完全包名,quartz就是根据这个路径到classpath找到该job类 

  • is_durable:是否持久化,把该属性设置为1,quartz会把job持久化到数据库中 

  • job_data:一个blob字段,存放持久化job对象 

qrtz_triggers:保存trigger信息 

  • trigger_name:trigger的名字,可以随意定制

  • trigger_group:trigger所属组的名字,可以随意定制

  • job_name:qrtz_job_details表job_name的外键 

  • job_group:qrtz_job_details表job_group的外键 

  • trigger_state:当前trigger状态,设置为ACQUIRED,如果设置为WAITING,则job不会触发 

  • trigger_cron:触发器类型,使用cron表达式

qrtz_cron_triggers:存储cron表达式表 

  • trigger_name:qrtz_triggers表trigger_name的外键 

  • trigger_group:qrtz_triggers表trigger_group的外键 

  • cron_expression:cron表达式 

  • qrtz_scheduler_state:存储集群中note实例信息,quartz会定时读取该表的信息判断集群中每个实例的当前状态 

  • instance_name:之前配置文件中org.quartz.scheduler.instanceId配置的名字,就会写入该字段,如果设置为AUTO,quartz会根据物理机名和当前时间产生一个名字 

  • last_checkin_time:上次检查时间 

  • checkin_interval:检查间隔时间 

注:你可能也注意到了,这些表都是以QRTZ_为前缀的,这是默认的前缀。如果你需要用到其他前缀(个性化需求,或需要配置多个quartz实例),可以在以下项配置(在quartz.properties中)

org.quartz.jobStore.tablePrefix = QRTZ_

最主要的修改是quartz.properties

org.quartz.scheduler.instanceName = MyScheduler
org.quartz.threadPool.threadCount = 20

org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = myDS

org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/myDataBase?characterEncoding=utf-8
org.quartz.dataSource.myDS.user = root
org.quartz.dataSource.myDS.password = 123456
org.quartz.dataSource.myDS.maxConnections = 10

而org.quartz.jobStore.driverDelegateClass是根据选择的数据库不同而改变,mysql对应的是org.quartz.impl.jdbcjobstore.StdJDBCDelegate.

如果你使用其他数据库,可以选择不同的代理类(在org.quar.impl.jdbcjobstore package或者其子包中可以找到):包括DB2v6Delegate (for DB2 version 6 and earlier)、HSQLDBDelegate (for HSQLDB)、MSSQLDelegate (for Microsoft SQLServer)、PostgreSQLDelegate (for PostgreSQL)、WeblogicDelegate (for using JDBC drivers made by WebLogic)、OracleDelegate (for using Oracle)等等.

创建表后,在配置和启动JDBCJobStore之前,您还有一个重要的决定。 您需要确定应用程序需要哪种类型的事务。 如果您不需要将调度命令(例如添加和删除触发器)绑定到其他事务,那么可以通过使用JobStoreTX作为JobStore来管理事务(这是最常见的选择)。

如果您需要Quartz与其他事务(即在J2EE应用程序服务器中)一起工作,那么您应该使用JobStoreCMT - 在这种情况下,Quartz将让应用程序服务器容器管理事务。

最后一个难题是设置一个DataSource,JDBCJobStore可以从中获取与数据库的连接。 DataSources在Quartz属性中使用几种不同的方法之一进行定义。 一种方法是让Quartz创建和管理DataSource本身 - 通过提供数据库的所有连接信息。 另一种方法是让Quartz使用由Quartz正在运行的应用程序服务器管理的DataSource,通过提供JDBCJobStore DataSource的JNDI名称。 有关属性的详细信息,请参阅“docs / config”文件夹中的示例配置文件。

要使用JDBCJobStore(并假定您使用的是StdSchedulerFactory),首先需要将Quartz配置的JobStore类属性设置为org.quartz.impl.jdbcjobstore.JobStoreTX 或 org.quartz.impl.jdbcjobstore.JobStoreCMT

将“org.quartz.jobStore.useProperties”配置参数设置为“true”(默认为false),以表示JDBCJobStore将JobDataMaps中的所有值都作为字符串,因此可以作为名称 - 值对存储 而不是在BLOB列中以其序列化形式存储更多复杂的对象。 从长远来看,这是更安全的,因为您避免了将非String类序列化为BLOB的类版本问题。

完整代码示例

示例中涉及的文件不多,一个MyJob任务执行,SimpleExample测试,MyPoolingconnectionProvider数据源连接和quartz.properties配置文件。

数据源是自己定义的类,实现了quartz自带的ConnectionProvider类,如果不想使用它,你也可以选择其他数据源,比如Tomcat的DataSource,Spring的SimpleDriverDataSource等。自行选择。

org.quartz.dataSource.myDS.connectionProvider.class : com.anson.examples.jdbc_example.MyPoolingconnectionProvider
#org.quartz.dataSource.myDS.connectionProvider.class : org.springframework.jdbc.datasource.SimpleDriverDataSource
#org.quartz.dataSource.myDS.connectionProvider.class : org.apache.tomcat.jdbc.pool.DataSource

MyJob.java

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MyJob implements Job {

    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobKey jobKey = context.getJobDetail().getKey();

        System.out.println("\n任务key " + jobKey + "执行时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }

}

SimpleExample.java

mport org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.triggers.SimpleTriggerImpl;

import java.text.SimpleDateFormat;
import java.util.Date;

public class SimpleExample {

    public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        try {
            
            System.out.println("------- 初始化 ----------------------");

            //通过调度器工厂获取调度器,初始化工程时须指定其使用我们自己的配置文件
            SchedulerFactory sf = new StdSchedulerFactory("quartz.properties");
            Scheduler sched = sf.getScheduler();

            //这儿clear一下,因为使用数据库储存方式时,shutdown的时候没有清除,第二次运行会报Job is already exist
            sched.clear();

            System.out.println("------- 初始化完成 -----------");

            // 下一分钟开始执行
            Date runTime = DateBuilder.evenMinuteDate(new Date());

            System.out.println("------- Scheduling Job  -------------------");

            // 任务详情
            JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job1", "group1").build();

            // 触发器 重复5+1次 间隔15秒
            SimpleTriggerImpl trigger = (SimpleTriggerImpl) TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();
            trigger.setRepeatCount(5);
            trigger.setRepeatInterval(15000);

            System.out.println("------- 当前时间:" + sdf.format(new Date())+ " -----------------");

            //调度器、触发器、任务,三者关联
            sched.scheduleJob(job, trigger);

            System.out.println(job.getKey() + " 开始job运行时间:" + sdf.format(runTime));

            //调度启动
            sched.start();
            System.out.println("------- 开始调度器 Scheduler -----------------");

            System.out.println("------- 等待5分钟... -------------");
            try {
                Thread.sleep(5*60000L);
            } catch (Exception e) {
            }

            System.out.println("------- 关闭调度器 ---------------------");
            sched.shutdown(true);

            System.out.println("------- 关闭完成 -----------------");

            SchedulerMetaData metaData = sched.getMetaData();
            System.out.println("~~~~~~~~~~  执行了 " + metaData.getNumberOfJobsExecuted() + " 个 jobs.");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

MyPoolingconnectionProvider.java

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.quartz.utils.ConnectionProvider;

import java.sql.Connection;
import java.sql.SQLException;

public class MyPoolingconnectionProvider implements ConnectionProvider {

    private String driver;
    private String url;
    private String user;
    private String password;
    private int maxConnections;
    private ComboPooledDataSource datasource = new ComboPooledDataSource();

    /**
     * 连接
     *
     * @return
     * @throws SQLException
     */
    @Override
    public Connection getConnection() throws SQLException {
        System.out.println("获取连接");
        return datasource.getConnection();
    }

    /**
     * 关闭连接
     *
     * @throws SQLException
     */
    @Override
    public void shutdown() throws SQLException {
        System.out.println("关闭连接");
        datasource.close();
    }

    /**
     * 初始化
     *
     * @throws SQLException
     */
    @Override
    public void initialize() throws SQLException {
        try {
            System.out.println("初始化连接");

            datasource.setDriverClass(this.driver);
            datasource.setJdbcUrl(this.url);
            datasource.setUser(this.user);
            datasource.setPassword(this.password);
            datasource.setMaxPoolSize(this.maxConnections);
            datasource.setMinPoolSize(1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*-------------------------------------------------
     *
     * setters 如果有必要,你可以添加一些getter
     * ------------------------------------------------
     */
    public void setDriver(String driver) {
        this.driver = driver;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    public void setDatasource(ComboPooledDataSource datasource) {
        this.datasource = datasource;
    }
}

quartz.properties

############################################
#############    调度器配置      ############
############################################

org.quartz.scheduler.instanceName : MyQuartzScheduler
org.quartz.threadPool.threadCount : 25

org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true


############################################
#############    持久化配置      ############
############################################

#org.quartz.jobStore.class : org.quartz.simpl.RAMJobStore

org.quartz.jobStore.class : org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass : org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix : QRTZ_
org.quartz.jobStore.dataSource : myDS
org.quartz.jobStore.useProperties : true
org.quartz.jobStore.misfireThreshold: 60000


############################################
#############    数据库连接      ############
############################################

org.quartz.dataSource.myDS.connectionProvider.class : com.anson.examples.jdbc_example.MyPoolingconnectionProvider

org.quartz.dataSource.myDS.driver : com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.url : jdbc:mysql://localhost:3306/myDatabase?useUnicode=true&characterEncoding=utf-8
org.quartz.dataSource.myDS.user : chengxumiao
org.quartz.dataSource.myDS.password : chengxumiao
org.quartz.dataSource.myDS.maxConnections : 10

执行结果

[INFO] 14 九月 09:28:31.672 下午 main [org.quartz.impl.StdSchedulerFactory]
Quartz scheduler version: 2.2.3

获取连接
[INFO] 14 九月 09:28:31.731 下午 main [com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource]
Initializing c3p0 pool... com.mchange.v2.c3p0.ComboPooledDataSource [ acquireIncrement -> 3, acquireRetryAttempts -> 30, acquireRetryDelay -> 1000, autoCommitOnClose -> false, automaticTestTable -> null, breakAfterAcquireFailure -> false, checkoutTimeout -> 0, connectionCustomizerClassName -> null, connectionTesterClassName -> com.mchange.v2.c3p0.impl.DefaultConnectionTester, dataSourceName -> 1hge6xt9qzobdptp7huv|3abfe836, debugUnreturnedConnectionStackTraces -> false, description -> null, driverClass -> com.mysql.jdbc.Driver, factoryClassLocation -> null, forceIgnoreUnresolvedTransactions -> false, identityToken -> 1hge6xt9qzobdptp7huv|3abfe836, idleConnectionTestPeriod -> 0, initialPoolSize -> 3, jdbcUrl -> jdbc:mysql://localhost:3306/myDatabase?useUnicode=true&characterEncoding=utf-8, lastAcquisitionFailureDefaultUser -> null, maxAdministrativeTaskTime -> 0, maxConnectionAge -> 0, maxIdleTime -> 0, maxIdleTimeExcessConnections -> 0, maxPoolSize -> 10, maxStatements -> 0, maxStatementsPerConnection -> 0, minPoolSize -> 1, numHelperThreads -> 3, numThreadsAwaitingCheckoutDefaultUser -> 0, preferredTestQuery -> null, properties -> {user=******, password=******}, propertyCycle -> 0, testConnectionOnCheckin -> false, testConnectionOnCheckout -> false, unreturnedConnectionTimeout -> 0, usesTraditionalReflectiveProxies -> false ]

------- 初始化完成 -----------
------- Scheduling Job  -------------------
------- 当前时间:2017-09-14 21:28:32 -----------------
获取连接
group1.job1 开始job运行时间:2017-09-14 21:29:00
获取连接
[INFO] 14 九月 09:28:32.202 下午 main [org.quartz.impl.jdbcjobstore.JobStoreTX]
Freed 0 triggers from 'acquired' / 'blocked' state.

[INFO] 14 九月 09:28:32.204 下午 main [org.quartz.impl.jdbcjobstore.JobStoreTX]
Recovering 0 jobs that were in-progress at the time of the last shut-down.

[INFO] 14 九月 09:28:32.204 下午 main [org.quartz.impl.jdbcjobstore.JobStoreTX]
Recovery complete.

[INFO] 14 九月 09:28:32.205 下午 main [org.quartz.impl.jdbcjobstore.JobStoreTX]
Removed 0 'complete' triggers.

[INFO] 14 九月 09:28:32.206 下午 main [org.quartz.impl.jdbcjobstore.JobStoreTX]
Removed 0 stale fired job entries.

获取连接
[INFO] 14 九月 09:28:32.207 下午 main [org.quartz.core.QuartzScheduler]
Scheduler MyQuartzScheduler_$_NON_CLUSTERED started.

------- 开始调度器 Scheduler -----------------
------- 等待5分钟... -------------
获取连接
获取连接
获取连接

任务key group1.job1执行时间:2017-09-14 21:29:00
获取连接
获取连接
获取连接

任务key group1.job1执行时间:2017-09-14 21:29:15
获取连接
获取连接
获取连接

任务key group1.job1执行时间:2017-09-14 21:29:30
获取连接
获取连接
获取连接
获取连接

任务key group1.job1执行时间:2017-09-14 21:29:45
获取连接
获取连接
获取连接

任务key group1.job1执行时间:2017-09-14 21:30:00
获取连接
获取连接
获取连接

任务key group1.job1执行时间:2017-09-14 21:30:15
获取连接
获取连接
获取连接
获取连接
获取连接
获取连接
获取连接
获取连接
获取连接
获取连接
获取连接
------- 关闭调度器 ---------------------
[INFO] 14 九月 09:33:32.211 下午 main [org.quartz.core.QuartzScheduler]
Scheduler MyQuartzScheduler_$_NON_CLUSTERED shutting down.

[INFO] 14 九月 09:33:32.211 下午 main [org.quartz.core.QuartzScheduler]
Scheduler MyQuartzScheduler_$_NON_CLUSTERED paused.

获取连接
关闭连接
[INFO] 14 九月 09:33:32.618 下午 main [org.quartz.core.QuartzScheduler]
Scheduler MyQuartzScheduler_$_NON_CLUSTERED shutdown complete.
------- 关闭完成 -----------------
~~~~~~~~~~  执行了 6 个 jobs.

运行示例时,注意观察数据库表的变化,新创建好的11张表中是没有数据的。

当任务创建好之后,如果不能立即执行,那么会将此任务存储在表中,时间到后,任务执行过会将此任务的相关表数据删除。

简单的10个字概括就是:未执行,插入执行过,删除


赞(52) 打赏
未经允许不得转载:优客志 » JAVA开发
分享到:

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏