Save data to external storage
Save as txt files
To save a Spatial DataFrame to some permanent storage such as Hive tables and HDFS, you can simply convert each geometry in the Geometry type column back to a plain String and save the plain DataFrame to wherever you want.
Use the following code to convert the Geometry column in a DataFrame back to a WKT string column:
SELECT ST_AsText(countyshape)
FROM polygondf
Then you can use any Spark writer to save this DataFrame.
df.write.format("YOUR_FORMAT").save("YOUR_PATH")
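The same two steps can be sketched in PySpark. The helper below only builds the `selectExpr` arguments that replace one geometry column with its WKT text and keep every other column unchanged. The helper name `wkt_select_exprs` and the `county_id` column are hypothetical; only `ST_AsText`, `countyshape`, and the generic Spark writer come from the text above.

```python
def wkt_select_exprs(columns, geometry_col):
    """Return selectExpr strings that replace geometry_col with its WKT text."""
    if geometry_col not in columns:
        raise ValueError(f"{geometry_col!r} is not a column of the DataFrame")
    return [
        f"ST_AsText({c}) AS {c}" if c == geometry_col else c
        for c in columns
    ]


# Hypothetical column list; with a real DataFrame you would pass df.columns.
exprs = wkt_select_exprs(["county_id", "countyshape"], "countyshape")
print(exprs)

# With a live Spatial DataFrame (assumed here to be named polygondf):
# plain_df = polygondf.selectExpr(*wkt_select_exprs(polygondf.columns, "countyshape"))
# plain_df.write.format("YOUR_FORMAT").save("YOUR_PATH")
```

Building the expressions separately keeps the conversion easy to check before any data is written.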
Note
Spatial SQL provides many functions for serializing the Geometry column. See the Spatial SQL API for details.
Save as GeoParquet
WherobotsDB can directly save a DataFrame with a Geometry column as a GeoParquet file. Specify geoparquet as the write format. The Geometry type is preserved in the GeoParquet file.
df.write.format("geoparquet").save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
To maximize the performance of WherobotsDB GeoParquet filter pushdown, we suggest that you sort the data by their geohash values (see ST_GeoHash) and then save as a GeoParquet file. An example is as follows:
SELECT col1, col2, geom, ST_GeoHash(geom, 5) as geohash
FROM spatialDf
ORDER BY geohash
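To see why this ordering can help, recall that a geohash interleaves longitude and latitude bits, so points that are close together usually share a prefix and end up next to each other after sorting. The sketch below is a plain-Python version of the standard geohash encoding for a single point. It is only an illustration, not WherobotsDB's `ST_GeoHash` implementation; the function name `geohash` and the sample points are assumptions.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash(lon, lat, precision=5):
    """Encode a lon/lat point as a geohash string of the given length."""
    lon_lo, lon_hi = -180.0, 180.0
    lat_lo, lat_hi = -90.0, 90.0
    chars = []
    value, bits = 0, 0
    use_lon = True  # bits alternate: longitude first, then latitude
    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            bit = 1 if lon >= mid else 0
            lon_lo, lon_hi = (mid, lon_hi) if bit else (lon_lo, mid)
        else:
            mid = (lat_lo + lat_hi) / 2
            bit = 1 if lat >= mid else 0
            lat_lo, lat_hi = (mid, lat_hi) if bit else (lat_lo, mid)
        value = (value << 1) | bit
        bits += 1
        use_lon = not use_lon
        if bits == 5:  # every 5 bits become one base32 character
            chars.append(_BASE32[value])
            value, bits = 0, 0
    return "".join(chars)


# Hypothetical sample points as (lon, lat); "a" and "c" are close to each other.
points = {"a": (-5.6, 42.6), "b": (10.40744, 57.64911), "c": (-5.61, 42.61)}
ordered = sorted(points, key=lambda name: geohash(*points[name], precision=5))
print(ordered)  # nearby points "a" and "c" end up adjacent
```

Rows sorted this way tend to place nearby geometries in the same part of the output file, which is the property the recommendation above relies on.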
WherobotsDB supports writing GeoParquet files with a custom GeoParquet spec version and CRS. The default GeoParquet spec version is 1.0.0 and the default crs is null. You can specify the GeoParquet spec version and CRS as follows:
val projjson = "{...}" // PROJJSON string for all geometry columns
df.write.format("geoparquet")
.option("geoparquet.version", "1.0.0")
.option("geoparquet.crs", projjson)
.save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
If multiple geometry columns are written to the GeoParquet file, you can specify the CRS for each column. For example, if g0 and g1 are two geometry columns in the DataFrame df, you can specify the CRS for each column as follows:
val projjson_g0 = "{...}" // PROJJSON string for g0
val projjson_g1 = "{...}" // PROJJSON string for g1
df.write.format("geoparquet")
.option("geoparquet.version", "1.0.0")
.option("geoparquet.crs.g0", projjson_g0)
.option("geoparquet.crs.g1", projjson_g1)
.save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
The value of geoparquet.crs and geoparquet.crs.<column_name> can be one of the following:
- "null": Explicitly sets the crs field to null. This is the default behavior.
- "" (empty string): Omits the crs field. This implies that the CRS is OGC:CRS84 for CRS-aware implementations.
- "{...}" (PROJJSON string): Sets the crs field to the PROJJSON object representing the Coordinate Reference System (CRS) of the geometry. You can find the PROJJSON string of a specific CRS at https://epsg.io/ (click the JSON option at the bottom of the page). You can also customize your PROJJSON string as needed.
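The three accepted values can be easy to mix up when the options are assembled programmatically. The following sketch maps Python values onto the option strings described above and builds a per-column options dictionary. The helper names `crs_option_value` and `geoparquet_crs_options` are hypothetical, and `"{...}"` is the same PROJJSON placeholder used in the examples above, not a real CRS.

```python
import json


def crs_option_value(crs):
    """Map a Python value to the string expected by the geoparquet.crs options."""
    if crs is None:
        return "null"  # explicitly write crs: null (the default behavior)
    if crs == "":
        return ""  # omit the crs field entirely
    if isinstance(crs, dict):
        return json.dumps(crs)  # PROJJSON object serialized to a string
    return str(crs)  # assumed to already be a PROJJSON string


def geoparquet_crs_options(column_crs, version="1.0.0"):
    """Build writer options with one geoparquet.crs.<column> entry per column."""
    options = {"geoparquet.version": version}
    for column, crs in column_crs.items():
        options[f"geoparquet.crs.{column}"] = crs_option_value(crs)
    return options


# g0 gets a PROJJSON string (placeholder), g1 omits the crs field.
options = geoparquet_crs_options({"g0": "{...}", "g1": ""})
print(options)

# With a live DataFrame df, the dictionary could be passed to the writer:
# df.write.format("geoparquet").options(**options).save("YOUR_PATH")
```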
Please note that WherobotsDB currently cannot convert between a PROJJSON string and a CRS. Its GeoParquet reader ignores the PROJJSON metadata, so you have to set your CRS via ST_SetSRID after reading the file. Its GeoParquet writer does not use the SRID field of a geometry, so you always have to set the geoparquet.crs option manually when writing the file if you want to write a meaningful CRS field.
For the same reason, the WherobotsDB GeoParquet reader and writer do NOT check the axis order (lon/lat or lat/lon) and assume that users handle it themselves when writing and reading files. You can always use ST_FlipCoordinates to swap the axis order of your geometries.
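Because the SRID and axis order have to be restored by hand after reading, it can help to keep that step in one place. The sketch below builds a SQL expression that combines the two functions mentioned above. The helper name `restore_geometry_expr`, the column name `geom`, and SRID 4326 are assumptions chosen for illustration.

```python
def restore_geometry_expr(column, srid, flip_axes=False):
    """Build a SQL expression that sets the SRID and optionally swaps the axes."""
    inner = f"ST_FlipCoordinates({column})" if flip_axes else column
    return f"ST_SetSRID({inner}, {int(srid)}) AS {column}"


plain = restore_geometry_expr("geom", 4326)
flipped = restore_geometry_expr("geom", 4326, flip_axes=True)
print(plain)
print(flipped)

# With a DataFrame df that was just read from a GeoParquet file and has
# columns id and geom (both assumed), usage could look like:
# df = df.selectExpr("id", restore_geometry_expr("geom", 4326))
```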
Save as GeoJSON
The GeoJSON data source in WherobotsDB can be used to save a Spatial DataFrame to a line-delimited JSON file, with each row written as a single-line GeoJSON Feature.
df.write.format("geojson").save("YOUR/PATH.json")
The structure of the generated file will be like this:
{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}
{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}
{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[100.0,0.0],[101.0,0.0],[101.0,1.0],[100.0,1.0],[100.0,0.0]]]},"properties":{"prop0":"value2"}}
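Since each line of the output is a standalone Feature rather than part of one FeatureCollection, a consumer has to parse the file line by line. A minimal sketch using only the Python standard library and the sample lines shown above (the variable names are illustrative):

```python
import json

# The three sample lines from the generated file shown above.
sample_lines = [
    '{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}',
    '{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}',
    '{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[100.0,0.0],[101.0,0.0],[101.0,1.0],[100.0,1.0],[100.0,0.0]]]},"properties":{"prop0":"value2"}}',
]

# Parse every non-empty line as its own GeoJSON Feature.
features = [json.loads(line) for line in sample_lines if line.strip()]
geometry_types = [feature["geometry"]["type"] for feature in features]
print(geometry_types)
```

When reading from disk, the same list comprehension can iterate over an open file object instead of `sample_lines`.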
Save to PostGIS
Unfortunately, the Spark SQL JDBC data source doesn't support creating geometry types in PostGIS using the 'createTableColumnTypes' option. Only the Spark built-in types are recognized. This means that you'll need to manage your PostGIS schema separately from Spark. One way to do this is to create the table with the correct geometry column before writing data to it with Spark. Alternatively, you can write your data to the table using Spark and then manually alter the column to be a geometry type afterward.
PostGIS uses EWKB to serialize geometries. If you convert your geometries to EWKB format in WherobotsDB, you don't have to do any additional conversion in PostGIS.
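For readers unfamiliar with EWKB, the sketch below packs a single 2D point by hand to show what the format adds to plain WKB: a flag bit in the geometry type word, followed by the SRID. It is a standalone illustration of the byte layout, not how WherobotsDB or PostGIS implement it; the function name `point_to_ewkb_hex` is hypothetical.

```python
import struct

WKB_POINT = 1
EWKB_SRID_FLAG = 0x20000000  # set in the type word when an SRID follows


def point_to_ewkb_hex(x, y, srid):
    """Encode a 2D point with an SRID as little-endian EWKB, returned as hex."""
    payload = struct.pack(
        "<BIIdd",
        1,  # byte order marker: 1 means little-endian
        WKB_POINT | EWKB_SRID_FLAG,  # geometry type with the SRID flag
        srid,
        x,
        y,
    )
    return payload.hex().upper()


ewkb = point_to_ewkb_hex(1.0, 2.0, 4326)
print(ewkb)
```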
Step 1: In PostGIS
my_postgis_db# create table my_table (id int8, geom geometry);
Step 2: In Spark
import org.apache.spark.sql.functions.expr
df.withColumn("geom", expr("ST_AsEWKB(geom)")) // Serialize geometries as EWKB for PostGIS.
.write.format("jdbc")
.option("truncate","true") // Don't let Spark recreate the table.
// Other options.
.save()
Step 3 (optional): In PostGIS
If you didn't create the table before writing, you can change the column type afterward.
my_postgis_db# alter table my_table alter column geom type geometry;
Save to GeoPandas
A WherobotsDB DataFrame can be converted directly to a GeoPandas GeoDataFrame.
import geopandas as gpd
df = spatialDf.toPandas()
gdf = gpd.GeoDataFrame(df, geometry="geometry")
You can then plot the GeoPandas DataFrame using many tools in the GeoPandas ecosystem.
gdf.plot(
figsize=(10, 8),
column="value",
legend=True,
cmap='YlOrBr',
scheme='quantiles',
edgecolor='lightgray'
)
Save to Snowflake
WherobotsDB DataFrames can be exported to Snowflake tables for persistent storage.
To enable bi-directional communication between Spark and Snowflake, pass a map of configuration parameters as options to the DataFrame writer.
The configuration parameters include connection and context options. Details on the possible values of these options can be found in the Snowflake Connector for Spark documentation.
The writer also needs a SaveMode, passed through mode, which specifies how to handle collisions with existing tables, if any.
Python:
# snowflake_url is https://<accountIdentifier>.snowflakecomputing.com
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" # Data source name of the Snowflake Spark connector.
sfOptions = {"sfUrl": snowflake_url, "sfUser": username, "sfPassword" : password, "sfDatabase": database, "sfSchema": schema}
dest_table_name = "<DESTINATION_TABLE_NAME>"
save_mode = "append" # Append data to the table if the table already exists with some data. Other possible values are: errorifexists, ignore, overwrite.
df_final.write.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", dest_table_name) \
.mode(save_mode) \
.save()
Scala:
// snowflakeUrl is https://<accountIdentifier>.snowflakecomputing.com
val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" // Data source name of the Snowflake Spark connector.
val sfOptions = Map("sfUrl" -> snowflakeUrl, "sfUser" -> username, "sfPassword" -> password, "sfDatabase" -> database, "sfSchema" -> schema)
val dest_table_name = "<DESTINATION_TABLE_NAME>"
val save_mode = "append" // Append data to the table if the table already exists with some data. Other possible values are: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", dest_table_name)
.mode(save_mode)
.save()
Java:
import java.util.HashMap;
String SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"; // Data source name of the Snowflake Spark connector.
HashMap<String, String> sfOptions = new HashMap<>();
sfOptions.put("sfUrl", snowflakeUrl); // snowflakeUrl is https://<accountIdentifier>.snowflakecomputing.com
sfOptions.put("sfUser", username);
sfOptions.put("sfPassword", password);
sfOptions.put("sfDatabase", database);
sfOptions.put("sfSchema", schema);
String dest_table_name = "<DESTINATION_TABLE_NAME>";
String save_mode = "append"; // Append data to the table if the table already exists with some data. Other possible values are: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", dest_table_name)
.mode(save_mode)
.save();
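The connection options used in the Snowflake examples can also be assembled by a small helper so that the URL format from the comments is applied consistently. This is a sketch under stated assumptions: the helper name `snowflake_options`, the account identifier `myorg-myaccount`, the other sample values, and the `SNOWFLAKE_PASSWORD` environment variable are all hypothetical.

```python
import os


def snowflake_options(account_identifier, user, password, database, schema):
    """Build the option map used by the Snowflake writer examples above."""
    return {
        "sfUrl": f"https://{account_identifier}.snowflakecomputing.com",
        "sfUser": user,
        "sfPassword": password,
        "sfDatabase": database,
        "sfSchema": schema,
    }


sf_options = snowflake_options(
    account_identifier="myorg-myaccount",  # hypothetical account identifier
    user="analyst",  # hypothetical user name
    password=os.environ.get("SNOWFLAKE_PASSWORD", ""),  # avoid hard-coding secrets
    database="GEO_DB",  # hypothetical database
    schema="PUBLIC",  # hypothetical schema
)
print(sf_options["sfUrl"])
```

Reading the password from the environment keeps credentials out of notebooks and version control.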
Save to an AWS RDS PostGIS instance
WherobotsDB DataFrames can be saved to PostGIS tables hosted in an AWS RDS instance for persistent storage.
A map of configuration and context options must be passed to the DataFrame writer to establish a connection with the RDS instance.
If you're unable to establish a connection with the RDS instance, double-check that the instance is accessible from the server running this code. For more information on intra- or inter-VPC connections to the RDS instance, consult the AWS RDS documentation.
The writer also needs a SaveMode, passed through mode, which specifies how to handle collisions with existing tables, if any.
Python:
url = '<URL>' # jdbc:postgresql://ENDPOINT/DATABASE_NAME
driver = 'org.postgresql.Driver'
user = '<USERNAME>'
password = '<PASSWORD>'
options = {"url": url, "driver": driver, "user": user, "password": password}
dest_table_name = "<DESTINATION_TABLE_NAME>"
save_mode = "append" # Append data to the table if the table already exists with some data. Other possible values are: errorifexists, ignore, overwrite.
df_final.write.format("jdbc") \
.options(**options) \
.option("dbtable", dest_table_name) \
.mode(save_mode) \
.save()
Scala:
val url = "<URL>" // jdbc:postgresql://ENDPOINT/DATABASE_NAME
val driver = "org.postgresql.Driver"
val user = "<USERNAME>"
val password = "<PASSWORD>"
val options = Map("url" -> url, "user" -> user, "password" -> password, "driver" -> driver)
val dest_table_name = "<DESTINATION_TABLE_NAME>"
val save_mode = "append" // Append data to the table if the table already exists with some data. Other possible values are: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format("jdbc")
.options(options)
.option("dbtable", dest_table_name)
.mode(save_mode)
.save()
Java:
import java.util.HashMap;
HashMap<String, String> options = new HashMap<>();
options.put("url", url); // jdbc:postgresql://ENDPOINT/DATABASE_NAME
options.put("user", username);
options.put("password", password);
options.put("driver", "org.postgresql.Driver");
String dest_table_name = "<DESTINATION_TABLE_NAME>";
String save_mode = "append"; // Append data to the table if the table already exists with some data. Other possible values are: errorifexists, ignore, overwrite.
df_dist_from_ny.write.format("jdbc")
.options(options)
.option("dbtable", dest_table_name)
.mode(save_mode)
.save();
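A typo in the JDBC URL or the save mode usually only surfaces when the write job runs. The sketch below builds the option map in the `jdbc:postgresql://ENDPOINT/DATABASE_NAME` form used above and checks the save mode against the four values listed in the comments. The helper names, the endpoint, and the credentials are hypothetical placeholders.

```python
VALID_SAVE_MODES = {"append", "errorifexists", "ignore", "overwrite"}


def postgis_jdbc_options(endpoint, database, user, password):
    """Build JDBC writer options for a PostgreSQL/PostGIS endpoint."""
    return {
        "url": f"jdbc:postgresql://{endpoint}/{database}",
        "driver": "org.postgresql.Driver",
        "user": user,
        "password": password,
    }


def check_save_mode(mode):
    """Return the normalized save mode, or raise if it is not a listed value."""
    normalized = mode.lower()
    if normalized not in VALID_SAVE_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    return normalized


jdbc_options = postgis_jdbc_options(
    endpoint="mydb.example.us-east-1.rds.amazonaws.com:5432",  # hypothetical endpoint
    database="gis",  # hypothetical database name
    user="<USERNAME>",
    password="<PASSWORD>",
)
save_mode = check_save_mode("Append")
print(jdbc_options["url"], save_mode)

# With a live DataFrame df_final, the values could be used as in the examples above:
# df_final.write.format("jdbc").options(**jdbc_options) \
#     .option("dbtable", "<DESTINATION_TABLE_NAME>").mode(save_mode).save()
```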
Save to Havasu (Iceberg) tables
Please read the Havasu documentation for more information on how to save a Spatial DataFrame to Havasu tables.