聊聊flink的ScheduledExecutor

聊聊flink的ScheduledExecutor序本文主要研究一下flink的ScheduledExecutorExecutorjava.base/java/util/concurrent/E

大家好,欢迎来到IT知识分享网。

本文主要研究一下flink的ScheduledExecutor

聊聊flink的ScheduledExecutor

Executor

java.base/java/util/concurrent/Executor.java

 public interface Executor { ​ /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); } 
  • jdk的Executor接口定义了execute方法,接收参数类型为Runnable

ScheduledExecutor

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java

 public interface ScheduledExecutor extends Executor { ​ /** * Executes the given command after the given delay. * * @param command the task to execute in the future * @param delay the time from now to delay the execution * @param unit the time unit of the delay parameter * @return a ScheduledFuture representing the completion of the scheduled task */ ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); ​ /** * Executes the given callable after the given delay. The result of the callable is returned * as a {@link ScheduledFuture}. * * @param callable the callable to execute * @param delay the time from now to delay the execution * @param unit the time unit of the delay parameter * @param <V> result type of the callable * @return a ScheduledFuture which holds the future value of the given callable */ <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); ​ /** * Executes the given command periodically. The first execution is started after the * {@code initialDelay}, the second execution is started after {@code initialDelay + period}, * the third after {@code initialDelay + 2*period} and so on. * The task is executed until either an execution fails, or the returned {@link ScheduledFuture} * is cancelled. * * @param command the task to be executed periodically * @param initialDelay the time from now until the first execution is triggered * @param period the time after which the next execution is triggered * @param unit the time unit of the delay and period parameter * @return a ScheduledFuture representing the periodic task. This future never completes * unless an execution of the given task fails or if the future is cancelled */ ScheduledFuture<?> scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit); ​ /** * Executed the given command repeatedly with the given delay between the end of an execution * and the start of the next execution. * The task is executed repeatedly until either an exception occurs or if the returned * {@link ScheduledFuture} is cancelled. * * @param command the task to execute repeatedly * @param initialDelay the time from now until the first execution is triggered * @param delay the time between the end of the current and the start of the next execution * @param unit the time unit of the initial delay and the delay parameter * @return a ScheduledFuture representing the repeatedly executed task. This future never * completes unless the execution of the given task fails or if the future is cancelled */ ScheduledFuture<?> scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit); } 
  • ScheduledExecutor接口继承了Executor,它定义了schedule、scheduleAtFixedRate、scheduleWithFixedDelay方法,其中schedule方法可以接收Runnable或者Callable,这些方法返回的都是ScheduledFuture;该接口有两个实现类,分别是ScheduledExecutorServiceAdapter及ActorSystemScheduledExecutorAdapter

ScheduledExecutorServiceAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java

 public class ScheduledExecutorServiceAdapter implements ScheduledExecutor { ​ private final ScheduledExecutorService scheduledExecutorService; ​ public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) { this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); } ​ @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return scheduledExecutorService.schedule(command, delay, unit); } ​ @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { return scheduledExecutorService.schedule(callable, delay, unit); } ​ @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit); } ​ @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit); } ​ @Override public void execute(Runnable command) { scheduledExecutorService.execute(command); } } 
  • ScheduledExecutorServiceAdapter实现了ScheduledExecutor接口,它使用的是jdk的ScheduledExecutorService来实现,使用了scheduledExecutorService的schedule、scheduleAtFixedRate、
  • scheduleWithFixedDelay、execute方法

ActorSystemScheduledExecutorAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java

 public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor { ​ private final ActorSystem actorSystem; ​ public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) { this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService"); } ​ @Override @Nonnull public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) { ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L); ​ Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); ​ scheduledFutureTask.setCancellable(cancellable); ​ return scheduledFutureTask; } ​ @Override @Nonnull public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) { ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L); ​ Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); ​ scheduledFutureTask.setCancellable(cancellable); ​ return scheduledFutureTask; } ​ @Override @Nonnull public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) { ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>( command, triggerTime(unit.toNanos(initialDelay)), unit.toNanos(period)); ​ Cancellable cancellable = actorSystem.scheduler().schedule( new FiniteDuration(initialDelay, unit), new FiniteDuration(period, unit), scheduledFutureTask, actorSystem.dispatcher()); ​ scheduledFutureTask.setCancellable(cancellable); ​ return scheduledFutureTask; } ​ @Override @Nonnull public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) { ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>( command, triggerTime(unit.toNanos(initialDelay)), unit.toNanos(-delay)); ​ Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit); ​ scheduledFutureTask.setCancellable(cancellable); ​ return scheduledFutureTask; } ​ @Override public void execute(@Nonnull Runnable command) { actorSystem.dispatcher().execute(command); } ​ private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) { return actorSystem.scheduler().scheduleOnce( new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); } ​ private long now() { return System.nanoTime(); } ​ private long triggerTime(long delay) { return now() + delay; } ​ private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { ​ private long time; ​ private final long period; ​ private volatile Cancellable cancellable; ​ ScheduledFutureTask(Callable<V> callable, long time, long period) { super(callable); this.time = time; this.period = period; } ​ ScheduledFutureTask(Runnable runnable, long time, long period) { super(runnable, null); this.time = time; this.period = period; } ​ public void setCancellable(Cancellable newCancellable) { this.cancellable = newCancellable; } ​ @Override public void run() { if (!isPeriodic()) { super.run(); } else if (runAndReset()){ if (period > 0L) { time += period; } else { cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS); ​ // check whether we have been cancelled concurrently if (isCancelled()) { cancellable.cancel(); } else { time = triggerTime(-period); } } } } ​ @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = super.cancel(mayInterruptIfRunning); ​ return result && cancellable.cancel(); } ​ @Override public long getDelay(@Nonnull TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); } ​ @Override public int compareTo(@Nonnull Delayed o) { if (o == this) { return 0; } ​ long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0; } ​ @Override public boolean isPeriodic() { return period != 0L; } } } 
  • ActorSystemScheduledExecutorAdapter实现了ScheduledExecutor接口,它使用的是actorSystem来实现;其中execute方法使用的是actorSystem.dispatcher().execute方法
  • schedule及scheduleWithFixedDelay方法调用的是internalSchedule方法,它使用的是actorSystem.scheduler().scheduleOnce方法,只是它们的ScheduledFutureTask不同,其中schedule方法的ScheduledFutureTask的period为0,而scheduleWithFixedDelay方法的ScheduledFutureTask的period为unit.toNanos(-delay);ScheduledFutureTask的run方法会对period进行判断,小于等于0的,会再次调用internalSchedule方法,来实现以FixedDelay进行调度的效果
  • scheduleAtFixedRate方法,它使用的是actorSystem.scheduler().schedule方法,其ScheduledFutureTask的period即为方法参数的period,没有像scheduleWithFixedDelay方法那样用unit.toNanos(-delay)作为period

小结

  • ScheduledExecutor接口继承了Executor,它定义了schedule、scheduleAtFixedRate、scheduleWithFixedDelay方法,其中schedule方法可以接收Runnable或者Callable,这些方法返回的都是ScheduledFuture;该接口有两个实现类,分别是ScheduledExecutorServiceAdapter及ActorSystemScheduledExecutorAdapter
  • ScheduledExecutorServiceAdapter实现了ScheduledExecutor接口,它使用的是jdk的ScheduledExecutorService来实现,使用了scheduledExecutorService的schedule、scheduleAtFixedRate、scheduleWithFixedDelay、execute方法
  • ActorSystemScheduledExecutorAdapter实现了ScheduledExecutor接口,它使用的是actorSystem来实现;其中execute方法使用的是actorSystem.dispatcher().execute方法;schedule及scheduleWithFixedDelay方法调用的是internalSchedule方法,它使用的是actorSystem.scheduler().scheduleOnce方法,只是它们的ScheduledFutureTask不同,其中schedule方法的ScheduledFutureTask的period为0,而scheduleWithFixedDelay方法的ScheduledFutureTask的period为unit.toNanos(-delay);ScheduledFutureTask的run方法会对period进行判断,小于等于0的,会再次调用internalSchedule方法,来实现以FixedDelay进行调度的效果;scheduleAtFixedRate方法,它使用的是actorSystem.scheduler().schedule方法,其ScheduledFutureTask的period即为方法参数的period,没有像scheduleWithFixedDelay方法那样用unit.toNanos(-delay)作为period

doc

  • ScheduledExecutor
  • ScheduledExecutorServiceAdapter
  • ActorSystemScheduledExecutorAdapter

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/52350.html

(0)
上一篇 2024-05-17 15:26
下一篇 2024-08-31 09:15

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信