package cn.sliew.flink.demo;
import cn.sliew.flink.demo.dw.base.util.ParameterToolUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Arrays;
import java.util.Date;
public class TableDataStreamDemoJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterToolUtil.createParameterTool(args);
env.getConfig().setGlobalJobParameters(parameterTool);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, settings);
SingleOutputStreamOperator<Order> leftStream = getLeftStream(env);
SingleOutputStreamOperator<Refund> rightStream = getRightStream(env);
Schema leftSchema = Schema.newBuilder()
.column("id", DataTypes.INT())
.column("userId", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("timestampLong", DataTypes.BIGINT())
.columnByExpression("order_time", "TO_TIMESTAMP_LTZ(timestampLong, 3)")
.watermark("order_time", "order_time - INTERVAL '5' SECOND")
.build();
Schema rightSchema = Schema.newBuilder()
.column("id", DataTypes.INT())
.column("orderId", DataTypes.INT())
.column("timestampLong", DataTypes.BIGINT())
.columnByExpression("refund_time", "TO_TIMESTAMP_LTZ(timestampLong, 3)")
.watermark("refund_time", "refund_time - INTERVAL '5' SECOND")
.build();
streamTableEnv.createTemporaryView("orders", leftStream, leftSchema);
streamTableEnv.createTemporaryView("refunds", rightStream, rightSchema);
String leftIntervalJoin = """
SELECT
orders.id as order_id,
orders.userId as user_id,
orders.name as name,
DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm:ss') as order_time_str,
refunds.id as refund_id,
DATE_FORMAT(refund_time, 'yyyy-MM-dd HH:mm:ss') as refund_time_str
FROM orders LEFT JOIN refunds ON orders.id = refunds.orderId
AND orders.order_time BETWEEN refunds.refund_time - INTERVAL '1' MINUTE AND refunds.refund_time;
""";
Table table = streamTableEnv.sqlQuery(leftIntervalJoin);
DataStream<OrderWithRefund> dataStream = streamTableEnv.toDataStream(table, OrderWithRefund.class);
dataStream.print();
env.execute();
}
private static SingleOutputStreamOperator<Order> getLeftStream(StreamExecutionEnvironment env) {
// 必须设置 watermark
return env.fromCollection(
Arrays.asList(
new Order(1, 1, "ken", 1662022777000L), // 2022-09-01 16:59:37
new Order(2, 1, "ken", 1662022878000L), // 2022-09-01 17:01:18
new Order(3, 1, "ken", 1662022890000L), // 2022-09-01 17:01:30
new Order(4, 1, "ken", 1662023120000L), // 2022-09-01 17:05:20
new Order(5, 1, "ken", 1662023290000L) // 2022-09-01 17:08:10
)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.
<Order>forMonotonousTimestamps().withTimestampAssigner((event, ts) -> event.getTimestampLong()));
}
private static SingleOutputStreamOperator<Refund> getRightStream(StreamExecutionEnvironment env) {
// 必须设置 watermark
return env.fromCollection(
Arrays.asList(
new Refund(1, 1, 1662022781000L), // 2022-09-01 16:59:41
new Refund(2, 3, 1662023310000L), // 2022-09-01 17:08:30
new Refund(3, 4, 1662023321000L) // 2022-09-01 17:08:41
)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.
<Refund>forMonotonousTimestamps().withTimestampAssigner((event, ts) -> event.getTimestampLong()));
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class OrderWithRefund {
// 这里不能用 int 类型,因为 int 类型为 not null,而 Integer 则可以为 null
private Integer order_id;
private Integer user_id;
private String name;
private String order_time_str;
private Integer refund_id;
private String refund_time_str;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private int id;
private int userId;
private String name;
private long timestampLong;
@Override
public String toString() {
return "Order{" +
"id=" + id +
", timestamp=" + DateFormatUtils.format(new Date(timestampLong), "yyyy-MM-dd HH:mm:ss") +
'}';
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Refund {
private int id;
private int orderId;
private long timestampLong;
@Override
public String toString() {
return "Refund{" +
"id=" + id +
", orderId=" + orderId +
", timestamp=" + DateFormatUtils.format(new Date(timestampLong), "yyyy-MM-dd HH:mm:ss") +
'}';
}
}
}