求助:Flink 1.9 sql 两个表 Join 后如何做 CEP ?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); // 构造订单数据 DataStream<Order> ordersData = env.fromCollection(Arrays.asList( new Order("001", "iphone", new Timestamp(1545800002000L)), new Order("002", "mac", new Timestamp(1545800003000L)), new Order("003", "book", new Timestamp(1545800004000L)), new Order("004", "cup", new Timestamp(1545800018000L)) )) .assignTimestampsAndWatermarks(new OrderTimestampExtractor()); // 构造付款表 DataStream<Payment> paymentData = env.fromCollection(Arrays.asList( new Payment("001", "alipay", new Timestamp(1545803501000L)), new Payment("002", "card", new Timestamp(1545803602000L)), new Payment("003", "card", new Timestamp(1545803610000L)), new Payment("004", "alipay", new Timestamp(1545803611000L)) )) .assignTimestampsAndWatermarks(new PaymentTimestampExtractor()); tEnv.registerDataStream("t_order", ordersData, "orderId, productName, orderTime"); tEnv.registerDataStream("t_payment", paymentData, "orderId, payType, payTime"); // 两表 JOIN String sqlQuery = "SELECT o.orderId as orderId, o.productName as productName, \n" + "p.payType as payType, o.orderTime as orderTime, \n" + "cast(payTime as timestamp) as payTime\n" + "FROM t_order AS o \n" + "JOIN t_payment AS p \n" + "ON o.orderId = p.orderId AND\n" + "\tp.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR"; Table queryResult = tEnv.sqlQuery(sqlQuery); tEnv.registerTable("TemporalJoinResult", queryResult); String cepSQL = "select *, MATCH_ROWTIME() as rowtime from TemporalJoinResult\n" + "\tMATCH_RECOGNIZE (\n" + "\tORDER BY rowtime\n" + " MEASURES\n" + " A.orderId AS orderId,\n" + " A.productName AS productName,\n" + " A.orderTime AS orderTime,\n" + "\t\tB.payTime AS payTime\n" + "\tONE ROW PER MATCH \n" + "\tAFTER MATCH SKIP PAST LAST ROW\n" + " PATTERN (A B)\n" + " DEFINE\n" + " A AS payType = 'alipay',\n" + " B AS productName = 'iphone'\n" + "\t) as T"; Table cepResult = tEnv.sqlQuery(cepSQL); tEnv.toAppendStream(cepResult, Row.class).print(); env.execute();
MATCH_RECOGNIZE 里边 ORDER BY rowtime 不清楚怎样指定?求大佬帮忙