Flink DDL

Create catalogs

The following statement can be executed to create a Flink catalog:

CREATE CATALOG <catalog_name> WITH (
  'type'='mixed_iceberg',
  `<config_key>`=`<config_value>`
); 

Where <catalog_name> is the user-defined name of the Flink catalog, and <config_key>=<config_value> has the following configurations:

Key Default Value Type Required Description
type N/A String Yes Catalog type, validate values are mixed_iceberg and mixed_hive
metastore.url (none) String Yes The URL for Amoro Metastore is thrift://<ip>:<port>/<catalog_name_in_metastore>.
If high availability is enabled for AMS, it can also be specified in the form of zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}.
default-database default String No The default database to use
property-version 1 Integer No Catalog properties version, this option is for future backward compatibility

The authentication information of AMS catalog can upload configuration files on AMS website, or specify the authentication information and configuration file paths when creating catalogs with Flink DDL

Key Default Value Type Required Description
properties.auth.load-from-ams True BOOLEAN No Whether to load security verification configuration from AMS.
True: load from AMS;
false: do not use AMS configuration.
Note: regardless of whether this parameter is configured, as long as the user has configured the auth.*** related parameters below, this configuration will be used for access.
properties.auth.type (none) String No Table security verification type, valid values: simple, kerberos, or not configured. Default not configured, no permission check is required. simple: use the hadoop username, used in conjunction with the parameter ‘properties.auth.simple.hadoop_username’; kerberos: configure kerberos permission verification, used in conjunction with the parameters ‘properties.auth.kerberos.principal’, ‘properties.auth.kerberos.keytab’, ‘properties.auth.kerberos.krb’
properties.auth.simple.hadoop_username (none) String No Access using this hadoop username, required when ‘properties.auth.type’=‘simple’.
properties.auth.kerberos.principal (none) String No Configuration of kerberos principal, required when ‘properties.auth.type’=‘kerberos’.
properties.auth.kerberos.krb.path (none) String No The absolute path to the krb5.conf configuration file for kerberos (the local file path of the Flink SQL submission machine, if the SQL task is submitted with the Flink SQL Client, the path is the local path of the same node, e.g. /XXX/XXX/krb5.conf).’ required if ‘properties.auth.type’ = ‘kerberos’.
properties.auth.kerberos.keytab.path (none) String No The absolute path to the XXX.keytab configuration file for kerberos (the local file path of the Flink SQL submission machine, if the SQL task is submitted with the Flink SQL Client, the path is the local path of the same node, e.g. /XXX/XXX/XXX.keytab).’ required if ‘properties.auth.type’ = ‘kerberos’.

YAML configuration

Refer to the Flink SQL Client official configuration. Modify the conf/sql-client-defaults.yaml file in the Flink directory.

catalogs:
- name: <catalog_name>
  type: mixed_iceberg
  metastore.url: ...
  ...

CREATE statement

CREATE DATABASE

By default, the default-database configuration (default value: default) when creating catalog is used. You can create a database using the following example:

CREATE DATABASE [catalog_name.]mixed_db;

USE mixed_db;

CREATE TABLE

CREATE TABLE `mixed_catalog`.`mixed_db`.`test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP,
    ts3 AS CAST(op_time as TIMESTAMP(3)),
    watermark FOR ts3 AS ts3 - INTERVAL '5' SECOND,
    proc AS PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'key' = 'value'
);

Currently, most of the syntax supported by Flink SQL create table is supported, including:

  • PARTITION BY (column1, column2, …): configure Flink partition fields, but Flink does not yet support hidden partitions.
  • PRIMARY KEY (column1, column2, …): configure primary keys.
  • WITH (‘key’=‘value’, …): configure Amoro Table properties.
  • computed_column_definition: column_name AS computed_column_expression. Currently, compute column must be listed after all physical columns.
  • watermark_definition: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression, rowtime_column_name must be of type TIMESTAMP(3).

PARTITIONED BY

Create a partitioned table using PARTITIONED BY.

CREATE TABLE `mixed_catalog`.`new`.`test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP
) PARTITIONED BY(op_time) WITH (
    'key' = 'value'
);

Amoro tables support hidden partitions, but Flink does not support function-based partitions. Therefore, currently only partitions with the same value can be created through Flink SQL.

Alternatively, tables can be created without creating a Flink catalog:

CREATE TABLE `test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP,
    proc as PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mixed-format',
    'metastore.url' = '',
    'mixed_format.catalog' = '',
    'mixed_format.database' = '',
    'mixed_format.table' = ''
);

where <metastore.url> is the URL of the Amoro Metastore, and mixed_format.catalog, mixed_format.database and mixed_format.table are the catalog name, database name and table name of this table under the AMS, respectively.

CREATE TABLE LIKE

Create a table with the same table structure, partitions, and table properties as an existing table. This can be achieved using CREATE TABLE LIKE.

CREATE TABLE `mixed_catalog`.`mixed_db`.`test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP
);

CREATE TABLE  `mixed_catalog`.`mixed_db`.`test_table_like` 
    LIKE `mixed_catalog`.`mixed_db`.`test_table`;

Further details can be found in Flink create table like

DROP statement

DROP DATABASE

DROP DATABASE catalog_name.mixed_db

DROP TABLE

DROP TABLE `mixed_catalog`.`mixed_db`.`test_table`;

SHOW statement

SHOW DATABASES

View all database names under the current catalog:

SHOW DATABASES;

SHOW TABLES

View all table names in the current database:

SHOW TABLES;

SHOW CREATE TABLE

View table details:

SHOW CREATE TABLE;

DESC statement

View table description:

DESC TABLE;

ALTER statement

Not supported at the moment

Supported types

Mixed-Hive data types

Flink Data Type Hive Data Type
STRING CHAR(p)
STRING VARCHAR(p)
STRING STRING
BOOLEAN BOOLEAN
INT TINYINT
INT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(6) TIMESTAMP
VARBINARY BINARY
ARRAY ARRAY
MAP<K, V> MAP<K, V>
ROW STRUCT

mixed_iceberg data types

Flink Data Type Mixed-Iceberg Data Type
CHAR(p) STRING
VARCHAR(p) STRING
STRING STRING
BOOLEAN BOOLEAN
TINYINT INT
SMALLINT INT
INT INT
BIGINT LONG
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(6) TIMESTAMP
TIMESTAMP(6) WITH LOCAL TIME ZONE TIMESTAMPTZ
BINARY(p) FIXED(p)
BINARY(16) UUID
VARBINARY BINARY
ARRAY ARRAY
MAP<K, V> MAP<K, V>
ROW STRUCT
MULTISET MAP<T, INT>