roman_日积跬步-终至千里

roman_日积跬步-终至千里

SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。

SQL 提示一般可以用于以下:

 

一. create table hints

动态表选项允许动态地指定或覆盖表选项,不同于用 SQL DDL 或 连接 API 定义的静态表选项,这些选项可以在每个查询的每个表范围内灵活地指定。

因此,它非常适合用于交互式终端中的特定查询,例如,在 SQL-CLI 中,你可以通过添加动态选项/*+ OPTIONS('csv.ignore-parse-errors'='true') */来指定忽略 CSV 源的解析错误。

 

1. 语法

为了不破坏 SQL 兼容性,我们使用 Oracle 风格的 SQL hints 语法:

table_path /*+ OPTIONS(key=val [, key=val]*) */

key: string字符
val: string字符

 

2. 示例


CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- `覆盖`查询语句中源表的选项
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- 覆盖 join 中源表的选项
select * from
    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
    join
    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
    on t1.id = t2.id;

-- 覆盖插入语句中结果表的选项
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

 

3. 注意

create table hints 传递的连接器中catalog的相关参数,即create table with下参数,具体到源代码是:context.getCatalogTable().getOptions()

 

如果传参无效且在日志中看到参数已经设置成功,那

 

二. 实战:简化hive连接器参数设置

对于hive连接器,Flink实现了通过catalog的方式来管理hive表,在使用hive表时需要使用hive相关语法,此时需要声明,hive dialect,如下:


CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'aaa',
    'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);

SET table.sql-dialect=hive;

-- 因为需要使用hive连接器中的写特性,所以需要create table ,此时sql语法为hive语法
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- 对于某些框架例如chunjun,此处不能很好的适配:
--
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。
) WITH (...);

-- streaming sql, insert into hive table
INSERT INTO TABLE myhive.aaa.hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

如下可以把写hive的一些行为通过sql hint方式,放到Flink sql语句中,如下整个Flink sql 会清爽很多。

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'database_name',
    'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);


CREATE TABLE source_kafka (

    `pv` string,
    `uv` string,
    `p_day_id` string
) WITH (
      'connector' = 'kafka-x'
      ,'topic' = 'hive_kafka'
      ,'properties.bootstrap.servers' = 'xxx:9092'
      ,'properties.group.id' = 'luna_g'
      ,'scan.startup.mode' = 'earliest-offset'
      ,'json.timestamp-format.standard' = 'SQL'
      ,'json.ignore-parse-errors' = 'true'
      ,'format' = 'json'
      ,'scan.parallelism' = '1'
      );

insert into 
 myhive.database_name.table_name /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */
    select *  from source_kafka; 

 

三. select hints(ing)

查询提示(Query Hints)用于为优化器修改执行计划提供建议,该修改只能在当前查询提示所在的查询块中生效(Query block, 什么是查询块)。 目前,Flink 查询提示只支持联接提示(Join Hints)。

具体见:官网

 

03-30 10:04