Temporal

依靠 temporal 作为调度引擎,开发调度任务的管理后台和 web,将 temporal 变成类似 xxl-job 的调度系统。本文介绍相关核心信息

集成方式:

temporal 提供了 2 种启动 workflow 和 activity 的方式:基于接口的 type 方式,以及基于名称的 untyped 方式。

  • Workflow
    • WorkflowClient#newWorkflowStub
    • WorkflowClient#newUntypedWorkflowStub
  • Activity
    • Workflow#newActivityStub
    • Workflow#newUntypedActivityStub

从依赖的 sdk 上,有 2 种方式:

  • temporal
  • temporal-spring-boot-starter。

temporal 方式

public class TemporalUtil {

    public static WorkflowClient createWorkflowClient(final String temporalHost, final String namespace) {
        WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
                .setTarget(temporalHost)
                .setEnableKeepAlive(true)
                .setKeepAliveTime(Duration.ofMinutes(1L))
                .setKeepAliveTimeout(Duration.ofMinutes(5L))
                .build();
        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(options);

        WorkflowClientOptions workflowClientOptions = WorkflowClientOptions.newBuilder()
                .setNamespace(namespace)
                .validateAndBuildWithDefaults();
        return WorkflowClient.newInstance(service, workflowClientOptions);
    }

    public static ScheduleClient createScheduleClient(final String temporalHost, final String namespace) {
        WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
                .setTarget(temporalHost)
                .setEnableKeepAlive(true)
                .setKeepAliveTime(Duration.ofMinutes(1L))
                .setKeepAliveTimeout(Duration.ofMinutes(5L))
                .build();
        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(options);

        ScheduleClientOptions scheduleClientOptions = ScheduleClientOptions.newBuilder()
                .setNamespace(namespace)
                .build();
        return ScheduleClient.newInstance(service, scheduleClientOptions);
    }
}

启动 workflow:

# 接口方式:
HelloWorkflow workflow =
        client.newWorkflowStub(
                HelloWorkflow.class,
                WorkflowOptions.newBuilder()
                        .setTaskQueue("HelloSampleTaskQueue")
                        .setWorkflowId("HelloSample")
                        .build());

workflow.sayHello(person);

# 非接口方式
WorkflowStub workflowStub =
        client.newUntypedWorkflowStub(
                HelloWorkflow.class.getSimpleName(),
                WorkflowOptions.newBuilder()
                        .setTaskQueue("HelloSampleTaskQueue")
                        .setWorkflowId("UntypedHelloSample")
                        .build());

WorkflowExecution execution = workflowStub.start(person);
log.info("工作流启动! workflowId: {}, runId: {}", execution.getWorkflowId(), execution.getRunId());
String result = workflowStub.getResult(String.class);

启动 activity:

# 接口方式:
public class HelloWorkflowImpl implements HelloWorkflow {

    private HelloActivity activity =
            Workflow.newActivityStub(
                    HelloActivity.class,
                    ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());

    @Override
    public String sayHello(Person person) {
        return activity.hello(person);
    }
}

# 非接口方式
public class HelloWorkflowImpl implements HelloWorkflow {

  private ActivityStub untypedActivity =
          Workflow.newUntypedActivityStub(
                  ActivityOptions.newBuilder()
                          .setStartToCloseTimeout(Duration.ofSeconds(2))
                          .build());

  @Override
  public String sayHello(Person person) {
    // HelloActivity#hello() 方法名,经过 capitalized 处理
    return untypedActivity.execute("Hello", String.class, person);
  }
}

temporal-spring-boot-starter 方式

配置方式

自动发现方式

定制配置

Schedule 开发

temporal 对定时任务的支持。

Cron Job方式

参考文档:What is a Temporal Cron Job?

HelloWorkflow workflow = client.newWorkflowStub( HelloWorkflow.class, WorkflowOptions.newBuilder() .setTaskQueue("HelloSampleTaskQueue") .setWorkflowId("HelloSample") .setCronSchedule("@every 10s") .build());

只需在 workflow 的配置上添加 cron 表达式即可。temporal 采用的 cron 表达式和 quartz 的表达式有所区别。cron 表达式配置参考:Cron Schedules

Schedule 方式

参考文档:What is a Schedule?

Schedule 是 temporal 推出的比 CronJob 更灵活,更友好的定时任务使用方式,推荐用户采用这种方式。

public class ScheduleController {

    @Autowired
    private ScheduleClient client;

    public void start(String schedulerId) {
        Person person = new Person();
        person.setFirstName("start");
        person.setLastName("schedule");

        Schedule schedule =
                Schedule.newBuilder()
                        .setAction(
                                ScheduleActionStartWorkflow.newBuilder()
                                        .setWorkflowType(HelloWorkflow.class)
                                        .setArguments(person)
                                        .setOptions(
                                                WorkflowOptions.newBuilder()
                                                        .setWorkflowId("HelloWorkflow")
                                                        .setTaskQueue("HelloSampleTaskQueue")
                                                        .build())
                                        .build())
                        .setSpec(
                                ScheduleSpec.newBuilder()
                                        .setIntervals(Arrays.asList(new ScheduleIntervalSpec(Duration.ofSeconds(10L))))
                                        .setStartAt(Instant.now())
                                        .setEndAt(Instant.now().plus(10, ChronoUnit.MINUTES))
                                        .build())
                        .build();

        client.createSchedule(schedulerId, schedule, ScheduleOptions.newBuilder().build());
    }

    public void delete(String schedulerId) {
        ScheduleHandle handle = client.getHandle(schedulerId);
        handle.delete();
    }

    public void pause(String schedulerId) {
        ScheduleHandle handle = client.getHandle(schedulerId);
        handle.pause("暂停");
    }

    public void unpause(String schedulerId) {
        ScheduleHandle handle = client.getHandle(schedulerId);
        handle.unpause("解除暂停");
    }

    public void trigger(String schedulerId) {
        ScheduleHandle handle = client.getHandle(schedulerId);
        handle.trigger();
    }

    public ScheduleDescription getStatus(String schedulerId) {
        ScheduleHandle handle = client.getHandle(schedulerId);
        return handle.describe();
    }

}

调度系统

调度系统有多种使用方式,嵌入式和作为中间件独立部署。

在以 temporal 开发调度系统时,有如下角色:

  • temporal。部署的 temporal 集群
  • schedule-admin。以 temporal 为调度引擎开发的调度任务管理系统,对调度任务进行分组管理,提供调度任务的启动|删除,暂停|恢复,触发执行功能。
  • 应用。接入 temporal 调度的应用。按照 temporal workflow 的方式开发,将业务逻辑填入 workflow 实现,并注册到 temporal。

在 schedule-admin 上启动任务后,temporal 开始按照设定的调度频率触发应用中开发的 workflow 执行。

schedule-admin 中提交 workflow 方式有 2 种方式:

  • 定义通用的 workflow 接口。JobWorkflow#execute(java.lang.String)。应用统统实现这个通用的 workflow 接口,不同的应用,不同的任务使用 task queue 进行隔离:WorkflowOptions#setTaskQueue
  • 使用 untyped workflow 方式。用户提供应用中的 workflow 名称,schedule-admin 调度执行。

多 namespace 支持。temporal 支持 namespace 作为 workflow 隔离方式,可以对 workflow 进行分组管理。schedule-admin 也可以支持不同的 namespace,对 workflow 进行隔离:

  • 环境。比如 dev、daily、gray 和 prod
  • 应用。如 data-center、upload-center、export-center

temporal-spring-boot-starter 不支持多 namespace,只支持单 temporal。

多 temporal 实例支持。对于不同的业务和环境,也可能部署多个 temporal 实例。