淄博
切换分站
免费发布信息
当前位置:淄博公司转让壳 > 淄博热点资讯 > 淄博增值电信业务经营许可证 >  数据中心日常运维工作的内容有什么?

数据中心日常运维工作的内容有什么?

发表时间:2024-01-02 12:33:02  来源:网络投稿  浏览:次   【】【】【
有什么|数据中心|日常|内容|工作|增值电信业务许可证办理

?# Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


本文介绍了表的union、unionall、intersect、intersectall、minus、minusall和in的操作,以示例形式展示每个操作的结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)

17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:

【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作

【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作

【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作

【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、表的union、unionall、intersect、intersectall、minus、minusall和in的操作

本示例的运行结果均在执行用例中,其中用例只能在批模式下工作,用例特意说明了,如果没说明的则意味着流批模式均可。

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Executable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.tablesql.TestTableAPIJoinOperationDemo.Order;
import org.tablesql.TestTableAPIJoinOperationDemo.User;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestTableAPIJoinOperationDemo2 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private long id;
        private String name;
        private double balance;
        private Long rowtime;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private long id;
        private long user_id;
        private double amount;
        private Long rowtime;
    }

    final static List<User> userList = Arrays.asList(
            new User(1L, "alan", 18, 1698742358391L), 
            new User(2L, "alan", 19, 1698742359396L), 
            new User(3L, "alan", 25, 1698742360407L),
            new User(4L, "alanchan", 28, 1698742361409L), 
            new User(5L, "alanchan", 29, 1698742362424L)
            );

    final static List<Order> orderList = Arrays.asList(
            new Order(1L, 1, 18, 1698742358391L), 
            new Order(2L, 2, 19, 1698742359396L), 
            new Order(3L, 1, 25, 1698742360407L),
            new Order(4L, 3, 28, 1698742361409L), 
            new Order(5L, 1, 29, 1698742362424L),
            new Order(6L, 4, 49, 1698742362424L)
            );

     // 创建输出表
    final static String sinkSql = "CREATE TABLE sink_table (\n" +
            "  id BIGINT,\n" +
            "  user_id BIGINT,\n" +
            "  amount DOUBLE,\n" +
            "  rowtime BIGINT\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")";

    /**
     * 
     * @throws Exception
     */
    static void testUnionBySQL() throws Exception {
        // TODO 0.env
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
                StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

                DataStream<Order> orderA = env.fromCollection(orderList);
                DataStream<Order> orderB = env.fromCollection(orderList);

                // 将DataStream数据转Table和View,然后查询
                Table tableA = tenv.fromDataStream(orderA, $("id"), $("user_id"), $("amount"),$("rowtime"));
                tenv.createTemporaryView("tableB", orderB, $("id"), $("user_id"), $("amount"),$("rowtime"));

                // 查询:tableA中amount>2的和tableB中amount>1的数据最后合并
//              select * from tableA where amount > 2
//              union
//               select * from tableB where amount > 1
                String sql = "select * from " + tableA + " where amount > 2  union   select * from tableB where amount > 1";

                Table resultTable = tenv.sqlQuery(sql);

                DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);// union使用toRetractStream
//              String sql = "select * from " + tableA + " where amount > 2  union   select * from tableB where amount > 1";
//              9> (true,TestTableAPIJoinOperationDemo2.Order(id=1, user_id=1, amount=18.0, rowtime=1698742358391))
//              8> (true,TestTableAPIJoinOperationDemo2.Order(id=2, user_id=2, amount=19.0, rowtime=1698742359396))
//              4> (true,TestTableAPIJoinOperationDemo2.Order(id=5, user_id=1, amount=29.0, rowtime=1698742362424))
//              8> (true,TestTableAPIJoinOperationDemo2.Order(id=4, user_id=3, amount=28.0, rowtime=1698742361409))
//              14> (true,TestTableAPIJoinOperationDemo2.Order(id=6, user_id=4, amount=49.0, rowtime=1698742362424))
//              6> (true,TestTableAPIJoinOperationDemo2.Order(id=3, user_id=1, amount=25.0, rowtime=1698742360407))

                // toAppendStream → 将计算后的数据append到结果DataStream中去
                // toRetractStream → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false
                // 类似StructuredStreaming中的append/update/complete

                // TODO 3.sink
                resultDS.print();

                // TODO 4.execute
                env.execute();
    }

    /**
     * 和 SQL UNION 子句类似。Union 两张表会删除重复记录。两张表必须具有相同的字段类型。
     * 本示例仅仅使用同一个表来演示
     * 该操作只能是在批处理模式下
     * 
     * @throws Exception
     */
    static void testUnion() throws Exception {
//      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//      StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
        TableEnvironment tenv = TableEnvironment.create(env);

        Table ordersTable = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(3L, 1, 25, 1698742360407L),
                        row(4L, 3, 28, 1698742361409L), 
                        row(5L, 1, 29, 1698742362424L),
                        row(6L, 4, 49, 1698742362424L)
                        ));

        Table left = ordersTable.select($("id"), $("user_id"),$("amount"),$("rowtime"));

        Table unionResult = left.union(left);

        tenv.createTemporaryView("order_union_t", unionResult);

        Table result = tenv.sqlQuery("select * from order_union_t");

        // 下面不能转换,只有流式表可以转成流
        // 出现异常:The UNION operation on two unbounded tables is currently not supported.
        //      DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(result, Order.class);
        //      resultDS.print();

        //输出表
        tenv.executeSql(sinkSql);

        result.executeInsert("sink_table");
//              +I[6, 4, 49.0, 1698742362424]
//              +I[5, 1, 29.0, 1698742362424]
//              +I[1, 1, 18.0, 1698742358391]
//              +I[3, 1, 25.0, 1698742360407]
//              +I[4, 3, 28.0, 1698742361409]
//              +I[2, 2, 19.0, 1698742359396]

    }

    /**
     * 和 SQL UNION ALL 子句类似。Union 两张表。 两张表必须具有相同的字段类型。
     * 本示例仅仅使用同一个表来演示
     * 
     * @throws Exception
     */
    static void testUnionAll() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStream<User> users = env.fromCollection(userList);
        Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));

        Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));

        Table result = left.unionAll(left);

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
        resultDS.print();
//      14> (true,+I[5, alanchan, 29.0, 1698742362424])
//      8> (true,+I[4, alanchan, 28.0, 1698742361409])
//      5> (true,+I[1, alan, 18.0, 1698742358391])
//      10> (true,+I[1, alan, 18.0, 1698742358391])
//      11> (true,+I[2, alan, 19.0, 1698742359396])
//      6> (true,+I[2, alan, 19.0, 1698742359396])
//      7> (true,+I[3, alan, 25.0, 1698742360407])
//      13> (true,+I[4, alanchan, 28.0, 1698742361409])
//      12> (true,+I[3, alan, 25.0, 1698742360407])
//      9> (true,+I[5, alanchan, 29.0, 1698742362424])

        env.execute();
    }

    /**
     * 和 SQL INTERSECT 子句类似。Intersect 返回两个表中都存在的记录。
     * 如果一条记录在一张或两张表中存在多次,则只返回一条记录,也就是说,结果表中不存在重复的记录。
     * 两张表必须具有相同的字段类型。
     * 该操作只能是在批处理模式下
     * 
     * @throws Exception
     */
    static void testIntersect() throws Exception  {
        EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
        TableEnvironment tenv = TableEnvironment.create(env);

        Table ordersTableA = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(6L, 4, 49, 1698742362424L)
                        ));

        Table ordersTableB = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(3L, 1, 25, 1698742360407L),
                        row(4L, 3, 28, 1698742361409L), 
                        row(7L, 8, 4009, 1698782362424L)
                        ));

        Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
        Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));

        Table intersectResult = left.intersect(right);

        tenv.createTemporaryView("order_intersect_t", intersectResult);

        Table result = tenv.sqlQuery("select * from order_intersect_t");

        //输出表
        tenv.executeSql(sinkSql);

        result.executeInsert("sink_table");
//      +I[1, 1, 18.0, 1698742358391]

    }

    /**
     * 和 SQL INTERSECT ALL 子句类似。
     * IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次,那么该记录返回的次数同该记录在两个表中都出现的次数一致,也就是说,结果表可能存在重复记录。
     * 两张表必须具有相同的字段类型。
     * 该操作只能是在批处理模式下
     * 
     * @throws Exception
     */
    static void testIntersectAll() throws Exception  {
        EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
        TableEnvironment tenv = TableEnvironment.create(env);

        Table ordersTableA = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(6L, 4, 49, 1698742362424L)
                        ));

        Table ordersTableB = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(3L, 1, 25, 1698742360407L),
                        row(4L, 3, 28, 1698742361409L), 
                        row(7L, 8, 4009, 1698782362424L)
                        ));

        Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
        Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));

        Table intersectResult = left.intersectAll(right);

        tenv.createTemporaryView("order_intersect_t", intersectResult);

        Table result = tenv.sqlQuery("select * from order_intersect_t");

        //输出表
        tenv.executeSql(sinkSql);

        result.executeInsert("sink_table");
//              +I[2, 2, 19.0, 1698742359396]
//              +I[1, 1, 18.0, 1698742358391]

    }

    /**
     * 和 SQL EXCEPT 子句类似。Minus 返回左表中存在且右表中不存在的记录。
     * 左表中的重复记录只返回一次,换句话说,结果表中没有重复记录。
     * 两张表必须具有相同的字段类型。
     * 该操作只能是在批处理模式下
     * 
     * @throws Exception
     */
    static void testMinus() throws Exception  {
        EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
        TableEnvironment tenv = TableEnvironment.create(env);

        Table ordersTableA = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(6L, 4, 49, 1698742362424L)
                        ));

        Table ordersTableB = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(3L, 1, 25, 1698742360407L),
                        row(4L, 3, 28, 1698742361409L), 
                        row(7L, 8, 4009, 1698782362424L)
                        ));

        Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
        Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));

        Table intersectResult = left.minus(right);

        tenv.createTemporaryView("order_intersect_t", intersectResult);

        Table result = tenv.sqlQuery("select * from order_intersect_t");

        //输出表
        tenv.executeSql(sinkSql);

        result.executeInsert("sink_table");
//      +I[6, 4, 49.0, 1698742362424]

    }

    /**
     * 和 SQL EXCEPT ALL 子句类似。
     * MinusAll 返回右表中不存在的记录。在左表中出现 n 次且在右表中出现 m 次的记录,在结果表中出现 (n - m) 次,
     * 例如,也就是说结果中删掉了在右表中存在重复记录的条数的记录。
     * 两张表必须具有相同的字段类型。
     * 该操作只能是在批处理模式下
     * 
     * @throws Exception
     */
    static void testMinusAll() throws Exception  {
        EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
        TableEnvironment tenv = TableEnvironment.create(env);

        Table ordersTableA = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(6L, 4, 49, 1698742362424L)
                        ));

        Table ordersTableB = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(3L, 1, 25, 1698742360407L),
                        row(4L, 3, 28, 1698742361409L), 
                        row(7L, 8, 4009, 1698782362424L)
                        ));

        Table left = ordersTableA.select($("id"), $("user_id"),$("amount"),$("rowtime"));
        Table right = ordersTableB.select($("id"), $("user_id"),$("amount"),$("rowtime"));

        Table intersectResult = left.minus(right);

        tenv.createTemporaryView("order_intersect_t", intersectResult);

        Table result = tenv.sqlQuery("select * from order_intersect_t");

        //输出表
        tenv.executeSql(sinkSql);

        result.executeInsert("sink_table");
//      +I[6, 4, 49.0, 1698742362424]

    }

    /**
     * 和 SQL IN 子句类似。如果表达式的值存在于给定表的子查询中,那么 In 子句返回 true。
     * 子查询表必须由一列组成。
     * 这个列必须与表达式具有相同的数据类型。
     * 
     * @throws Exception
     */
    static void testIn() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStream<User> users = env.fromCollection(userList);
        Table usersTable = tenv.fromDataStream(users, $("id"), $("name"),$("balance"),$("rowtime"));

        DataStream<Order> orders = env.fromCollection(orderList);
        Table ordersTable = tenv.fromDataStream(orders, $("id"), $("user_id"), $("amount"),$("rowtime"));

        Table left = usersTable.select($("id").as("userId"), $("name"), $("balance"),$("rowtime").as("u_rowtime"));
        Table right = ordersTable.select($("user_id"));

        Table result = left.select($("userId"), $("name"), $("balance"),$("u_rowtime")).where($("userId").in(right));

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
        resultDS.print();
//      3> (true,+I[4, alanchan, 28.0, 1698742361409])
//      12> (true,+I[1, alan, 18.0, 1698742358391])
//      15> (true,+I[3, alan, 25.0, 1698742360407])
//      12> (true,+I[2, alan, 19.0, 1698742359396])
        env.execute();
    }

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
//      testUnion();
//      testUnionAll();
//      testUnionBySQL();
//      testIntersect();
//      testIntersectAll() ;
//      testMinus();
//      testMinusAll();
        testIn();

    }

}

以上,本文介绍了表的union、unionall、intersect、intersectall、minus、minusall和in的操作,以示例形式展示每个操作的结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)

17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:

【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作

【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作

【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作

【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

责任编辑: