Spark Writes
Writing with SQL
INSERT OVERWRITE
INSERT OVERWRITE can replace partitions in a table with the results of a query.
Spark's default overwrite mode is static. You can switch to dynamic mode with:
SET spark.sql.sources.partitionOverwriteMode=dynamic
To demonstrate the behavior of dynamic and static overwrites, a test table is defined using the following DDL:
CREATE TABLE mixed_catalog.db.sample (
id int,
data string,
ts timestamp,
primary key (id))
USING mixed_iceberg
PARTITIONED BY (days(ts))
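When Spark's overwrite mode is dynamic, only the partitions that receive rows from the query are replaced, and all other partitions are left unchanged. For the table above, the following statement replaces the 2022-01-01, 2022-01-02, and 2022-01-03 day partitions: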
INSERT OVERWRITE mixed_catalog.db.sample VALUES
(1, 'aaa', timestamp('2022-01-01 09:00:00')),
(2, 'bbb', timestamp('2022-01-02 09:00:00')),
(3, 'ccc', timestamp('2022-01-03 09:00:00'))
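To make the dynamic behavior concrete, here is a minimal pure-Scala sketch that models the table as an in-memory map from day partition to rows, assuming all timestamps share one time zone. DynamicOverwriteModel and its members are hypothetical names that only mirror the behavior described above; this is not Amoro or Spark code.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical in-memory model of dynamic INSERT OVERWRITE on a table
// partitioned by days(ts). Not Amoro or Spark code.
object DynamicOverwriteModel {
  final case class Row(id: Int, data: String, ts: LocalDateTime)

  // Each partition is keyed by the calendar day of ts
  // (all timestamps are assumed to be in the same time zone).
  type Table = Map[LocalDate, Seq[Row]]

  // Dynamic mode: group the new rows by day. Each day that receives at least
  // one new row is replaced; every other partition keeps its existing rows.
  def overwriteDynamic(table: Table, newRows: Seq[Row]): Table =
    table ++ newRows.groupBy(r => r.ts.toLocalDate)
}
```

For example, overwriting with rows dated 2022-01-01 and 2022-01-02 replaces those two days and leaves a 2021-12-31 partition untouched.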
When Spark's overwrite mode is static, the PARTITION clause is converted into a filter that determines which partitions are replaced. If the PARTITION clause is omitted, all partitions are replaced.
INSERT OVERWRITE mixed_catalog.db.sample
PARTITION (dt = '2021-1-1') VALUES
(1, 'aaa'), (2, 'bbb'), (3, 'ccc')
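In static mode, transforms on partition columns are not supported; the PARTITION clause must reference a table column directly. The example above therefore assumes a table partitioned by a dt column rather than by days(ts).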
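Static overwrite can be sketched the same way. The model below assumes a hypothetical table with columns (id, data, dt) partitioned directly by dt, and treats the optional PARTITION value as a filter on existing rows. StaticOverwriteModel and overwriteStatic are illustrative names, not part of Amoro or Spark.

```scala
// Hypothetical in-memory model of static INSERT OVERWRITE on a table
// partitioned directly by a dt column (no transform). Not Amoro or Spark code.
object StaticOverwriteModel {
  final case class Row(id: Int, data: String, dt: String)

  // partition = Some(value) models PARTITION (dt = value); None models an
  // omitted PARTITION clause. Callers are expected to set dt on the new rows,
  // as Spark does when it fills the static partition value into each row.
  def overwriteStatic(table: Seq[Row], partition: Option[String], newRows: Seq[Row]): Seq[Row] = {
    // The PARTITION clause becomes a filter; without it, every row matches.
    val replaced: Row => Boolean = partition match {
      case Some(value) => _.dt == value
      case None        => _ => true
    }
    table.filterNot(replaced) ++ newRows
  }
}
```

With Some("2021-1-1") only that partition's old rows are dropped; with None the whole table is replaced by the new rows.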
You can enable a uniqueness check on the primary keys of the source data by setting
spark.sql.mixed-format.check-source-data-uniqueness.enabled = true
in Spark SQL. If the source data contains duplicate primary keys, the write fails with an error.
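Conceptually, the check groups the incoming rows by primary key and fails if any key occurs more than once. The sketch below is a hypothetical model of that idea, not Amoro's implementation; in particular, the exception type (IllegalArgumentException) is an assumption, since the documentation only says that an error is thrown.

```scala
// Hypothetical model of the source-data uniqueness check. Not Amoro code.
object SourceUniquenessCheck {
  // Returns every primary key that appears more than once in the source rows.
  def duplicateKeys[K, R](rows: Seq[R])(key: R => K): Set[K] =
    rows.groupBy(key).collect { case (k, group) if group.size > 1 => k }.toSet

  // Fails the (modeled) write when any primary key is duplicated.
  // The exception type is an assumption made for this sketch.
  def checkUnique[K, R](rows: Seq[R])(key: R => K): Unit = {
    val dups = duplicateKeys(rows)(key)
    if (dups.nonEmpty)
      throw new IllegalArgumentException(s"Duplicate primary keys in source data: ${dups.mkString(", ")}")
  }
}
```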
INSERT INTO
To append new data to a table, use INSERT INTO:
INSERT INTO mixed_catalog.db.sample VALUES (1, 'a', timestamp('2022-01-01 09:00:00')), (2, 'b', timestamp('2022-01-02 09:00:00'))
INSERT INTO prod.db.table SELECT ...
Upsert into tables with primary keys
When writing new data to a table with a primary key, the write.upsert.enabled table property controls whether UPSERT is enabled.
When UPSERT is enabled, a row whose primary key already exists in the table is written as an UPDATE, and a row whose primary key does not exist is written as an INSERT.
When UPSERT is disabled, only INSERT operations are performed, even if the table already contains rows with the same primary key.
CREATE TABLE mixed_catalog.db.keyedTable (
id int,
data string,
primary key (id))
USING mixed_iceberg
TBLPROPERTIES ('write.upsert.enabled' = 'true')
INSERT INTO mixed_catalog.db.keyedTable VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.keyedTable SELECT ...
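The rule above can be sketched as a planning step that decides which operation each incoming row becomes. UpsertModel, WriteOp, Insert, and Update are hypothetical names; the sketch assumes the incoming rows already have unique keys and does not model how Mixed Format stores or merges the resulting records.

```scala
// Hypothetical model of the write.upsert.enabled rule. Not Amoro code.
object UpsertModel {
  sealed trait WriteOp extends Product with Serializable
  final case class Insert(id: Int, data: String) extends WriteOp
  final case class Update(id: Int, data: String) extends WriteOp

  // existingKeys: primary keys already present in the table.
  // Assumes the incoming rows have unique keys (see the source uniqueness check).
  def plan(existingKeys: Set[Int], rows: Seq[(Int, String)], upsertEnabled: Boolean): Seq[WriteOp] =
    rows.map { case (id, data) =>
      // Upsert on: an existing key becomes an UPDATE, a new key an INSERT.
      // Upsert off: every row is an INSERT, whether or not its key exists.
      if (upsertEnabled && existingKeys.contains(id)) Update(id, data)
      else Insert(id, data)
    }
}
```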
You can enable a uniqueness check on the primary keys of the source data by setting
spark.sql.mixed-format.check-source-data-uniqueness.enabled = true
in Spark SQL. If the source data contains duplicate primary keys, the write fails with an error.
DELETE FROM
The DELETE FROM statement deletes rows from a table.
DELETE FROM mixed_catalog.db.sample
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'
DELETE FROM mixed_catalog.db.sample
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)
DELETE FROM mixed_catalog.db.sample AS t1
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
UPDATE
The UPDATE statement modifies rows in a table.
UPDATE mixed_catalog.db.sample
SET c1 = 'update_c1', c2 = 'update_c2'
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'
UPDATE mixed_catalog.db.sample
SET session_time = 0, ignored = true
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)
UPDATE mixed_catalog.db.sample AS t1
SET order_status = 'returned'
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
MERGE INTO
MERGE INTO prod.db.target t -- a target table
USING (SELECT ...) s -- the source updates
ON t.id = s.id -- condition to find updates for target rows
WHEN ... -- updates
The MERGE INTO statement supports multiple WHEN MATCHED ... THEN ... clauses, which can perform UPDATE or DELETE actions, and WHEN NOT MATCHED ... THEN ... clauses, which perform INSERT actions.
MERGE INTO prod.db.target t
USING prod.db.source s
ON t.id = s.id
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
WHEN NOT MATCHED THEN INSERT *
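For the example above, a matched target row is handled by the first WHEN MATCHED clause whose condition holds, and a matched row for which no clause applies is left unchanged. The pure-Scala model below is a hypothetical sketch of that example: it represents a NULL count as None, assumes at most one source row per id, and assumes the source has a count column so that INSERT * can copy id and count into the target.

```scala
// Hypothetical model of the MERGE INTO example. Not Amoro or Spark code.
object MergeModel {
  // count = None stands for a NULL count column.
  final case class Target(id: Int, count: Option[Int])
  // Hypothetical source schema: key, op code, and a count copied by INSERT *.
  final case class Source(id: Int, op: String, count: Option[Int])

  def merge(target: Seq[Target], source: Seq[Source]): Seq[Target] = {
    // Assumes at most one source row per id.
    val sourceById: Map[Int, Source] = source.map(s => s.id -> s).toMap

    // WHEN MATCHED clauses are tried in order; the first one that holds fires.
    val updated: Seq[Target] = target.flatMap { t =>
      sourceById.get(t.id) match {
        case Some(s) if s.op == "delete" =>
          None // THEN DELETE
        case Some(s) if t.count.isEmpty && s.op == "increment" =>
          Some(t.copy(count = Some(0))) // THEN UPDATE SET t.count = 0
        case Some(s) if s.op == "increment" =>
          Some(t.copy(count = t.count.map(_ + 1))) // THEN UPDATE SET t.count = t.count + 1
        case _ =>
          Some(t) // not matched, or no clause applies: row is unchanged
      }
    }

    // WHEN NOT MATCHED THEN INSERT *: add source rows whose id is not in the target.
    val targetIds = target.map(_.id).toSet
    val inserted = source.filterNot(s => targetIds.contains(s.id)).map(s => Target(s.id, s.count))

    updated ++ inserted
  }
}
```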
Writing with DataFrames
Appending data
Use append() to add data to a MixedFormat table:
val data: DataFrame = ...
data.writeTo("mixed_catalog.db.sample").append()
Overwriting data
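Use overwritePartitions() to overwrite data. Like a dynamic INSERT OVERWRITE, it replaces every partition that contains at least one row of the DataFrame and leaves other partitions unchanged.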
val data: DataFrame = ...
data.writeTo("mixed_catalog.db.sample").overwritePartitions()
Creating tables
create() creates a table and writes the data to it, just like CREATE TABLE AS SELECT:
val data: DataFrame = ...
data.writeTo("mixed_catalog.db.sample").create()
Partition keys can be specified with partitionBy(), and primary keys with option("primary.keys", "'xxx'"):
val data: DataFrame = ...
data.write.format("mixed_iceberg")
.partitionBy("data")
.option("primary.keys", "'xxx'")
.save("mixed_catalog.db.sample")