Spark Getting Started
Iceberg Format
The Iceberg Format can be accessed using the Connector provided by Iceberg. Refer to the documentation at Iceberg Spark Connector for more information.
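For a rough idea of what this looks like in a Scala job, here is a hedged sketch of registering an Iceberg catalog; the catalog name demo_catalog and the warehouse path are placeholder values, not from this page:

import org.apache.spark.sql.SparkSession

// sketch: register an Iceberg catalog backed by a Hadoop warehouse;
// demo_catalog and the warehouse path are placeholders
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.demo_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.demo_catalog.type", "hadoop")
  .config("spark.sql.catalog.demo_catalog.warehouse", "/tmp/iceberg-warehouse")
  .getOrCreate()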
Paimon Format
The Paimon Format can be accessed using the Connector provided by Paimon. Refer to the documentation at Paimon Spark Connector for more information.
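Similarly, a hedged sketch of registering a Paimon catalog; the catalog name paimon and the warehouse path are placeholders:

import org.apache.spark.sql.SparkSession

// sketch: register a Paimon catalog; the catalog name and warehouse path are placeholders
val spark = SparkSession.builder()
  .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
  .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon-warehouse")
  .getOrCreate()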
Mixed Format
To use Amoro in a Spark shell, use the --packages option:
spark-shell --packages org.apache.amoro:amoro-mixed-spark-3.3-runtime:0.7.0
If you want to include the connector in your Spark installation, add the amoro-mixed-spark-3.3-runtime JAR to Spark's jars folder.
Adding catalogs
${SPARK_HOME}/bin/spark-sql \
--conf spark.sql.extensions=org.apache.amoro.spark.MixedFormatSparkExtensions \
--conf spark.sql.catalog.local_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog \
--conf spark.sql.catalog.local_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}
Amoro manages catalogs through AMS, and a Spark catalog needs to be mapped to an Amoro catalog via a URL in the following format:
thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}
The mixed-format Spark connector automatically downloads the Hadoop site configuration files through the thrift protocol to access the HDFS cluster.
AMS_PORT is the port number of the AMS service's thrift API, with a default value of 1260. AMS_CATALOG_NAME is the name of the catalog you want to access on AMS.
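The same configuration can also be applied programmatically when building a SparkSession in a JAR job. A minimal sketch, where the host, port, and catalog name are placeholders for your own AMS deployment:

import org.apache.spark.sql.SparkSession

// configure the mixed-format catalog; host, port, and catalog name are placeholders
val spark = SparkSession.builder()
  .appName("amoro-mixed-format-demo")
  .config("spark.sql.extensions", "org.apache.amoro.spark.MixedFormatSparkExtensions")
  .config("spark.sql.catalog.local_catalog", "org.apache.amoro.spark.MixedFormatSparkCatalog")
  .config("spark.sql.catalog.local_catalog.url", "thrift://127.0.0.1:1260/local_catalog")
  .getOrCreate()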
For detailed Spark configurations, please refer to Spark Configurations.
Creating a table
In the Spark SQL command line, you can create a table using the CREATE TABLE statement. Before creating a table, make sure to create the database first.
-- switch to mixed catalog defined in spark conf
use local_catalog;
-- create database first
create database if not exists test_db;
Then switch to the newly created database and perform the create table operation.
use test_db;
-- create a table with 3 columns
create table test1 (id int, data string, ts timestamp) using mixed_iceberg;
-- create a table with hidden partition
create table test2 (id int, data string, ts timestamp) using mixed_iceberg partitioned by (days(ts));
-- create a table with hidden partition and primary key
create table test3 (id int, data string, ts timestamp, primary key(id)) using mixed_iceberg partitioned by (days(ts));
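The same DDL can also be submitted from a Scala job through spark.sql. A minimal sketch, assuming the local_catalog configuration shown above:

// create the database and a keyed, hidden-partitioned table from Scala;
// assumes the local_catalog configuration shown earlier is in place
spark.sql("use local_catalog")
spark.sql("create database if not exists test_db")
spark.sql("use test_db")
spark.sql("""
  create table if not exists test3 (
    id int, data string, ts timestamp,
    primary key (id)
  ) using mixed_iceberg partitioned by (days(ts))
""")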
For more information on table-related Spark DDL, please refer to Spark DDL.
Writing to the table
If you are using Spark SQL, you can use the INSERT OVERWRITE or INSERT SQL statements to write data to an Amoro table.
-- insert values into unkeyed table
insert into test2 values
( 1, "aaa", timestamp('2022-1-1 00:00:00')),
( 2, "bbb", timestamp('2022-1-2 00:00:00')),
( 3, "bbb", timestamp('2022-1-3 00:00:00'));
-- dynamic overwrite table
insert overwrite test3 values
( 1, "aaa", timestamp('2022-1-1 00:00:00')),
( 2, "bbb", timestamp('2022-1-2 00:00:00')),
( 3, "bbb", timestamp('2022-1-3 00:00:00'));
If you are using Static Overwrite, you cannot define transforms on partition fields.
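To illustrate this restriction, here is a minimal sketch using a hypothetical table test4 partitioned by an identity column dt; a static partition spec is permitted there, whereas the days(ts) tables above would reject one:

// test4 is a hypothetical identity-partitioned table used only for this illustration
spark.sql("create table if not exists test4 (id int, data string, dt string) using mixed_iceberg partitioned by (dt)")
// static overwrite with an explicit partition spec on an identity partition field
spark.sql("insert overwrite test4 partition (dt = '2022-01-01') values (1, 'aaa'), (2, 'bbb')")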
Alternatively, you can use the DataFrame API to write data to an Amoro table within a JAR job.
// load the source data and dynamically overwrite the matching partitions
val df = spark.read.load("/path-to-table")
df.writeTo("test_db.table1").overwritePartitions()
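The same DataFrameWriterV2 API can also append rather than overwrite. A short sketch, assuming df matches the schema of the target table:

// append rows without replacing existing partitions
df.writeTo("test_db.table1").append()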
For more information on writing to tables, please refer to Spark Writes.
Reading from the table
To query the table, use a SELECT SQL statement:
select count(1) as count, data
from test2
group by data;
For a table with a primary key defined, you can query the ChangeStore by appending .change to the table name:
select count(1) as count, data
from test_db.test3.change
group by data;
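The ChangeStore can also be read from Scala as a regular table. A minimal sketch, assuming the keyed table test3 created above:

// read the ChangeStore of the keyed table and run the same aggregation
val changes = spark.table("test_db.test3.change")
changes.groupBy("data").count().show()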
For more information on reading from tables, please refer to Spark Queries.