Spring 定时器 Scheduling

启用Scheduling

@EnableScheduling

@SpringBootApplication
@EnableScheduling
public class AppMain {
	
    public static void main(String[] args) throws Exception {
        SpringApplication.run(AppMain.class, args);
    }
}

@Scheduled 注解

org.springframework.scheduling.annotation.Scheduled

/*
 * Copyright 2002-2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
package org.springframework.scheduling.annotation;
 
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
 
import org.springframework.aot.hint.annotation.Reflective;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
 
/**
 * Annotation that marks a method to be scheduled. For periodic tasks, exactly one
 * of the {@link #cron}, {@link #fixedDelay}, or {@link #fixedRate} attributes
 * must be specified, and additionally an optional {@link #initialDelay}.
 * For a one-time task, it is sufficient to just specify an {@link #initialDelay}.
 *
 * <p>The annotated method must not accept arguments. It will typically have
 * a {@code void} return type; if not, the returned value will be ignored
 * when called through the scheduler.
 *
 * <p>Methods that return a reactive {@code Publisher} or a type which can be adapted
 * to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported.
 * The {@code Publisher} must support multiple subsequent subscriptions. The returned
 * {@code Publisher} is only produced once, and the scheduling infrastructure then
 * periodically subscribes to it according to configuration. Values emitted by
 * the publisher are ignored. Errors are logged at {@code WARN} level, which
 * doesn't prevent further iterations. If a fixed delay is configured, the
 * subscription is blocked in order to respect the fixed delay semantics.
 *
 * <p>Kotlin suspending functions are also supported, provided the coroutine-reactor
 * bridge ({@code kotlinx.coroutine.reactor}) is present at runtime. This bridge is
 * used to adapt the suspending function to a {@code Publisher} which is treated
 * the same way as in the reactive method case (see above).
 *
 * <p>Processing of {@code @Scheduled} annotations is performed by registering a
 * {@link ScheduledAnnotationBeanPostProcessor}. This can be done manually or,
 * more conveniently, through the {@code <task:annotation-driven/>} XML element
 * or {@link EnableScheduling @EnableScheduling} annotation.
 *
 * <p>This annotation can be used as a <em>{@linkplain Repeatable repeatable}</em>
 * annotation. If several scheduled declarations are found on the same method,
 * each of them will be processed independently, with a separate trigger firing
 * for each of them. As a consequence, such co-located schedules may overlap
 * and execute multiple times in parallel or in immediate succession.
 *
 * <p>This annotation may be used as a <em>meta-annotation</em> to create custom
 * <em>composed annotations</em> with attribute overrides.
 *
 * @author Mark Fisher
 * @author Juergen Hoeller
 * @author Dave Syer
 * @author Chris Beams
 * @author Victor Brown
 * @author Sam Brannen
 * @since 3.0
 * @see EnableScheduling
 * @see ScheduledAnnotationBeanPostProcessor
 * @see Schedules
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
@Reflective
public @interface Scheduled {
 
	/**
	 * A special cron expression value that indicates a disabled trigger: {@value}.
	 * <p>This is primarily meant for use with <code>${...}</code> placeholders,
	 * allowing for external disabling of corresponding scheduled methods.
	 * @since 5.1
	 * @see ScheduledTaskRegistrar#CRON_DISABLED
	 */
	String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
 
 
	/**
	 * A cron-like expression, extending the usual UN*X definition to include triggers
	 * on the second, minute, hour, day of month, month, and day of week.
	 * <p>For example, {@code "0 * * * * MON-FRI"} means once per minute on weekdays
	 * (at the top of the minute - the 0th second).
	 * <p>The fields read from left to right are interpreted as follows.
	 * <ul>
	 * <li>second</li>
	 * <li>minute</li>
	 * <li>hour</li>
	 * <li>day of month</li>
	 * <li>month</li>
	 * <li>day of week</li>
	 * </ul>
	 * <p>The special value {@link #CRON_DISABLED "-"} indicates a disabled cron
	 * trigger, primarily meant for externally specified values resolved by a
	 * <code>${...}</code> placeholder.
	 *
	 * 使用 Cron 表达式定义执行计划
	 *
	 * @return an expression that can be parsed to a cron schedule
	 * @see org.springframework.scheduling.support.CronExpression#parse(String)
	 */
	String cron() default "";
 
	/**
	 * A time zone for which the cron expression will be resolved. By default, this
	 * attribute is the empty String (i.e. the server's local time zone will be used).
	 *
	 * 解析cron表达式所使用的时区。默认情况下,此属性是空字符串 (即使用服务器的本地时区)。
	 *
	 * @return a zone id accepted by {@link java.util.TimeZone#getTimeZone(String)},
	 * or an empty String to indicate the server's default time zone
	 * @since 4.0
	 * @see org.springframework.scheduling.support.CronTrigger#CronTrigger(String, java.util.TimeZone)
	 * @see java.util.TimeZone
	 */
	String zone() default "";
 
	/**
	 * Execute the annotated method with a fixed period between the end of the
	 * last invocation and the start of the next.
	 * 在上次调用结束和下一次调用开始之间的固定周期(会等待上一次调用完成)
	 *
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * @return the delay
	 */
	long fixedDelay() default -1;
 
	/**
	 * Execute the annotated method with a fixed period between the end of the
	 * last invocation and the start of the next.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * <p>This attribute variant supports Spring-style "${...}" placeholders
	 * as well as SpEL expressions.
	 * xxxString 带String 表示它支持变量表达式(EL), 换句话说每次可以动态的计算变量实现修改间隔时间
	 * @return the delay as a String value &mdash; for example, a placeholder
	 * or a {@link java.time.Duration#parse java.time.Duration} compliant value
	 * @since 3.2.2
	 * @see #fixedDelay()
	 */
	String fixedDelayString() default "";
 
	/**
	 * Execute the annotated method with a fixed period between invocations.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * 该属性的含义是**上一个调用开始**后再次调用的延时(不用等待上一次调用完成), 这可能会重复执行
	 * {@link #timeUnit}.
	 * @return the period
	 */
	long fixedRate() default -1;
 
	/**
	 * Execute the annotated method with a fixed period between invocations.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * <p>This attribute variant supports Spring-style "${...}" placeholders
	 * as well as SpEL expressions.
	 * @return the period as a String value &mdash; for example, a placeholder
	 * or a {@link java.time.Duration#parse java.time.Duration} compliant value
	 * @since 3.2.2
	 * @see #fixedRate()
	 */
	String fixedRateString() default "";
 
	/**
	 * Number of units of time to delay before the first execution of a
	 * {@link #fixedRate} or {@link #fixedDelay} task.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 *
	 * 该属性的作用是第一次执行延迟时间, 只是做延迟的设定, 并不会控制其他逻辑, 所以要配合fixedDelay或者fixedRate来使用
	 *
	 * @return the initial
	 * @since 3.2
	 */
	long initialDelay() default -1;
 
	/**
	 * Number of units of time to delay before the first execution of a
	 * {@link #fixedRate} or {@link #fixedDelay} task.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * <p>This attribute variant supports Spring-style "${...}" placeholders
	 * as well as SpEL expressions.
	 * @return the initial delay as a String value &mdash; for example, a placeholder
	 * or a {@link java.time.Duration#parse java.time.Duration} compliant value
	 * @since 3.2.2
	 * @see #initialDelay()
	 */
	String initialDelayString() default "";
 
	/**
	 * The {@link TimeUnit} to use for {@link #fixedDelay}, {@link #fixedDelayString},
	 * {@link #fixedRate}, {@link #fixedRateString}, {@link #initialDelay}, and
	 * {@link #initialDelayString}.
	 * <p>Defaults to {@link TimeUnit#MILLISECONDS}.
	 * <p>This attribute is ignored for {@linkplain #cron() cron expressions}
	 * and for {@link java.time.Duration} values supplied via {@link #fixedDelayString},
	 * {@link #fixedRateString}, or {@link #initialDelayString}.
	 * @return the {@code TimeUnit} to use
	 * @since 5.3.10
	 */
	TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
 
	/**
	 * A qualifier for determining a scheduler to run this scheduled method on.
	 * <p>Defaults to an empty String, suggesting the default scheduler.
	 * <p>May be used to determine the target scheduler to be used,
	 * matching the qualifier value (or the bean name) of a specific
	 * {@link org.springframework.scheduling.TaskScheduler} or
	 * {@link java.util.concurrent.ScheduledExecutorService} bean definition.
	 * @since 6.1
	 * @see org.springframework.scheduling.SchedulingAwareRunnable#getQualifier()
	 */
	String scheduler() default "";
 
}
 
  • cron:通过cron表达式来配置执行规则
  • zone:cron表达式解析时使用的时区
  • fixedDelay:上一次执行结束到下一次执行开始的间隔时间(单位:ms)
  • fixedDelayString:上一次任务执行结束到下一次执行开始的间隔时间,使用java.time.Duration#parse解析
  • fixedRate:以固定间隔执行任务,即上一次任务执行开始到下一次执行开始的间隔时间(单位:ms),若在调度任务执行时,上一次任务还未执行完毕,会加入worker队列,等待上一次执行完成后立即执行下一次任务
  • fixedRateString:与fixedRate逻辑一致,只是使用java.time.Duration#parse解析
  • initialDelay:首次任务执行的延迟时间
  • initialDelayString:首次任务执行的延迟时间,使用java.time.Duration#parse解析

在线 cron 表达式生成 - https://tool.lu/crontab/

动态任务调度调度器

人生不如意之事十有八九,写代码亦是如此,以上是基于注解,配置固定死了, 虽然xxxString可以支持部分动态,但没有办法动态的加载和卸载定时任务, 也没有cronString方法

在spring 中有个org.springframework.scheduling.TaskScheduler 接口, 它有一系列的 scheduleXXX 方法, 也有支持 cron 表达式的 Trigger

org.springframework.scheduling.TaskScheduler

/**
 * Schedule the given {@link Runnable}, invoking it whenever the trigger
 * indicates a next execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param trigger an implementation of the {@link Trigger} interface,
 * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
 * wrapping a cron expression
 * @return a {@link ScheduledFuture} representing pending completion of the task,
 * or {@code null} if the given Trigger object never fires (i.e. returns
 * {@code null} from {@link Trigger#nextExecutionTime})
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * @see org.springframework.scheduling.support.CronTrigger
 */
@Nullable
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

找一个线程池的实现 org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler 封装一下

 
@Component
public class DynamicTaskScheduler {
    private static final Logger log = LoggerFactory.getLogger(DynamicTaskScheduler.class);
 
    private final ThreadPoolTaskScheduler taskScheduler;
    private final Map<String, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>();
    
    public DynamicTaskScheduler() {
        this.taskScheduler = new ThreadPoolTaskScheduler();
        this.taskScheduler.setPoolSize(1); // 设置线程池大小
        this.taskScheduler.setThreadNamePrefix("dynamic-task-");
        this.taskScheduler.initialize();
    }
    
    /**
     * 添加/更新定时任务
     */
    public void scheduleTask(String taskId, String cronExpression, Runnable task) {
        log.info("添加/更新任务: {}", taskId);
        // 如果任务已存在,先取消
        if (taskMap.containsKey(taskId)) {
            log.info("任务: {} 已存在,取消任务", taskId);
            taskMap.get(taskId).cancel(true);
        }
        // 创建新的定时任务
        ScheduledFuture<?> future = taskScheduler.schedule(task, 
            new CronTrigger(cronExpression));
        
        taskMap.put(taskId, future);
    }
    
    /**
     * 取消定时任务
     */
    public void cancelTask(String taskId) {
        log.info("取消任务: {}", taskId);
        if (taskMap.containsKey(taskId)) {
            taskMap.get(taskId).cancel(true);
            taskMap.remove(taskId);
        }
    }
    
    /**
     * 获取所有任务ID
     */
    public Set<String> getTaskIds() {
        return taskMap.keySet();
    }
    
    /**
     * 检查任务是否存在
     */
    public boolean containsTask(String taskId) {
        return taskMap.containsKey(taskId);
    }
}

用法

final String cron = taskConfig.getCron();
String startdate = DateTime.now().toString();
dynamicTaskScheduler.scheduleTask(TASK_ID, cron, ()->{
	try {
		log.info("开始执行任务 {}", startdate);
		Thread.sleep(5000);
		log.info("任务执行完毕 {}", startdate);
	} catch (InterruptedException e) {
		throw new RuntimeException(e);
	}
});