// Typical structure of a Table API / SQL program.

// Create a TableEnvironment for a specific planner (batch or streaming).
val tableEnv = ... // see "Create a TableEnvironment" section

// Create an input Table.
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )")
// Register an output Table.
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )")

// Create a Table from a Table API query.
val table2 = tableEnv.from("table1").select(...)
// Create a Table from a SQL query.
val table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ...")

// Emit a Table API result Table to a TableSink; the same call works for a SQL result.
val tableResult = table2.executeInsert("outputTable")
tableResult...
// How table identifiers are resolved against the current catalog and database.

// Get a TableEnvironment and select the current catalog and database.
val tableEnv: TableEnvironment = ...
tableEnv.useCatalog("custom_catalog")
tableEnv.useDatabase("custom_database")

val table: Table = ...

// Register the view named 'exampleView' in the catalog named 'custom_catalog',
// in the database named 'custom_database'.
tableEnv.createTemporaryView("exampleView", table)

// Register the view named 'exampleView' in the catalog named 'custom_catalog',
// in the database named 'other_database'.
tableEnv.createTemporaryView("other_database.exampleView", table)

// Register the view named 'example.View' in the catalog named 'custom_catalog',
// in the database named 'custom_database'. The backticks keep the dot as part
// of the view name instead of treating it as a database/view separator.
tableEnv.createTemporaryView("`example.View`", table)

// Register the view named 'exampleView' in the catalog named 'other_catalog',
// in the database named 'other_database'.
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
查询表
Table API
示例
1 2 3 4 5
// Compute revenue for all customers from France.
// NOTE(review): `orders` is not defined in this snippet; presumably it is a Table
// obtained earlier (e.g. via tableEnv.from(...)) — confirm against the full example.
val revenue = orders
  .filter($"cCountry" === "FRANCE")
  .groupBy($"cID", $"cName")
  // Alias the aggregated column; `as` needs whitespace before the string literal,
  // otherwise the token is parsed as a string interpolator.
  .select($"cID", $"cName", $"revenue".sum as "revSum")
SQL
示例
1 2 3 4 5 6 7
// Compute revenue for all customers from France.
// The query is a multi-line raw string: stripMargin removes the leading
// whitespace and the '|' marker from each line, so every clause must start
// on its own line.
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)
输出表
Table API
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Create the schema of the output Table.
// NOTE(review): the step that registers "CsvSinkTable" using this schema is not
// shown in this snippet — confirm it exists in the full example.
val schema = new Schema()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .field("c", DataTypes.BIGINT())

// Compute a result Table using Table API operators and/or SQL queries.
val result: Table = ...

// Emit the result Table to the registered TableSink.
result.executeInsert("CsvSinkTable")
SQL
示例
1 2 3 4 5 6 7 8
// Compute revenue for all customers from France and emit it to "RevenueFrance".
// The statement is a multi-line raw string: stripMargin removes the leading
// whitespace and the '|' marker from each line, so every clause must start
// on its own line.
tableEnv.executeSql("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)