Temporal
依靠 temporal 作为调度引擎,开发调度任务的管理后台和 web,将 temporal 变成类似 xxl-job 的调度系统。本文介绍相关核心信息
集成方式:
temporal 提供了 2 种启动 workflow 和 activity 的方式:基于接口的 type 方式,以及基于名称的 untyped 方式。
- Workflow
- WorkflowClient#newWorkflowStub
- WorkflowClient#newUntypedWorkflowStub
- Activity
- Workflow#newActivityStub
- Workflow#newUntypedActivityStub
从依赖的 sdk 上,有 2 种方式:
- temporal
- temporal-spring-boot-starter。
temporal 方式
public class TemporalUtil {
public static WorkflowClient createWorkflowClient(final String temporalHost, final String namespace) {
WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
.setTarget(temporalHost)
.setEnableKeepAlive(true)
.setKeepAliveTime(Duration.ofMinutes(1L))
.setKeepAliveTimeout(Duration.ofMinutes(5L))
.build();
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(options);
WorkflowClientOptions workflowClientOptions = WorkflowClientOptions.newBuilder()
.setNamespace(namespace)
.validateAndBuildWithDefaults();
return WorkflowClient.newInstance(service, workflowClientOptions);
}
public static ScheduleClient createScheduleClient(final String temporalHost, final String namespace) {
WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
.setTarget(temporalHost)
.setEnableKeepAlive(true)
.setKeepAliveTime(Duration.ofMinutes(1L))
.setKeepAliveTimeout(Duration.ofMinutes(5L))
.build();
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(options);
ScheduleClientOptions scheduleClientOptions = ScheduleClientOptions.newBuilder()
.setNamespace(namespace)
.build();
return ScheduleClient.newInstance(service, scheduleClientOptions);
}
}
启动 workflow:
# 接口方式:
HelloWorkflow workflow =
client.newWorkflowStub(
HelloWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue("HelloSampleTaskQueue")
.setWorkflowId("HelloSample")
.build());
workflow.sayHello(person);
# 非接口方式
WorkflowStub workflowStub =
client.newUntypedWorkflowStub(
HelloWorkflow.class.getSimpleName(),
WorkflowOptions.newBuilder()
.setTaskQueue("HelloSampleTaskQueue")
.setWorkflowId("UntypedHelloSample")
.build());
WorkflowExecution execution = workflowStub.start(person);
log.info("工作流启动! workflowId: {}, runId: {}", execution.getWorkflowId(), execution.getRunId());
String result = workflowStub.getResult(String.class);
启动 activity:
# 接口方式:
public class HelloWorkflowImpl implements HelloWorkflow {
private HelloActivity activity =
Workflow.newActivityStub(
HelloActivity.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());
@Override
public String sayHello(Person person) {
return activity.hello(person);
}
}
# 非接口方式
public class HelloWorkflowImpl implements HelloWorkflow {
private ActivityStub untypedActivity =
Workflow.newUntypedActivityStub(
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(2))
.build());
@Override
public String sayHello(Person person) {
// HelloActivity#hello() 方法名,经过 capitalized 处理
return untypedActivity.execute("Hello", String.class, person);
}
}
temporal-spring-boot-starter 方式
配置方式
自动发现方式
定制配置
Schedule 开发
temporal 对定时任务的支持。
Cron Job方式
参考文档:What is a Temporal Cron Job?
HelloWorkflow workflow =
client.newWorkflowStub(
HelloWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue("HelloSampleTaskQueue")
.setWorkflowId("HelloSample")
.setCronSchedule("@every 10s")
.build());
只需在 workflow 的配置上添加 cron 表达式即可。temporal 采用的 cron 表达式和 quartz 的表达式有所区别。cron 表达式配置参考:Cron Schedules
Schedule 方式
参考文档:What is a Schedule?
Schedule 是 temporal 推出的比 CronJob 更灵活,更友好的定时任务使用方式,推荐用户采用这种方式。
public class ScheduleController {
@Autowired
private ScheduleClient client;
public void start(String schedulerId) {
Person person = new Person();
person.setFirstName("start");
person.setLastName("schedule");
Schedule schedule =
Schedule.newBuilder()
.setAction(
ScheduleActionStartWorkflow.newBuilder()
.setWorkflowType(HelloWorkflow.class)
.setArguments(person)
.setOptions(
WorkflowOptions.newBuilder()
.setWorkflowId("HelloWorkflow")
.setTaskQueue("HelloSampleTaskQueue")
.build())
.build())
.setSpec(
ScheduleSpec.newBuilder()
.setIntervals(Arrays.asList(new ScheduleIntervalSpec(Duration.ofSeconds(10L))))
.setStartAt(Instant.now())
.setEndAt(Instant.now().plus(10, ChronoUnit.MINUTES))
.build())
.build();
client.createSchedule(schedulerId, schedule, ScheduleOptions.newBuilder().build());
}
public void delete(String schedulerId) {
ScheduleHandle handle = client.getHandle(schedulerId);
handle.delete();
}
public void pause(String schedulerId) {
ScheduleHandle handle = client.getHandle(schedulerId);
handle.pause("暂停");
}
public void unpause(String schedulerId) {
ScheduleHandle handle = client.getHandle(schedulerId);
handle.unpause("解除暂停");
}
public void trigger(String schedulerId) {
ScheduleHandle handle = client.getHandle(schedulerId);
handle.trigger();
}
public ScheduleDescription getStatus(String schedulerId) {
ScheduleHandle handle = client.getHandle(schedulerId);
return handle.describe();
}
}
调度系统
调度系统有多种使用方式,嵌入式和作为中间件独立部署。
在以 temporal 开发调度系统时,有如下角色:
- temporal。部署的 temporal 集群
- schedule-admin。以 temporal 为调度引擎开发的调度任务管理系统,对调度任务进行分组管理,提供调度任务的启动|删除,暂停|恢复,触发执行功能。
- 应用。接入 temporal 调度的应用。按照 temporal workflow 的方式开发,将业务逻辑填入 workflow 实现,并注册到 temporal。
在 schedule-admin 上启动任务后,temporal 开始按照设定的调度频率触发应用中开发的 workflow 执行。
schedule-admin 中提交 workflow 方式有 2 种方式:
- 定义通用的 workflow 接口。
JobWorkflow#execute(java.lang.String)
。应用统统实现这个通用的 workflow 接口,不同的应用,不同的任务使用 task queue 进行隔离:WorkflowOptions#setTaskQueue
。
- 使用 untyped workflow 方式。用户提供应用中的 workflow 名称,schedule-admin 调度执行。
多 namespace 支持。temporal 支持 namespace 作为 workflow 隔离方式,可以对 workflow 进行分组管理。schedule-admin 也可以支持不同的 namespace,对 workflow 进行隔离:
- 环境。比如 dev、daily、gray 和 prod
- 应用。如 data-center、upload-center、export-center
temporal-spring-boot-starter 不支持多 namespace,只支持单 temporal。
多 temporal 实例支持。对于不同的业务和环境,也可能部署多个 temporal 实例。