WherobotsSqlOperator¶
Use the WherobotsSqlOperator to execute SQL queries on Wherobots Cloud against the datasets in your Wherobots catalogs.
Using the Operator¶
The WherobotsSqlOperator requires a sql argument, which can be a SQL query string or a list of query strings. You can also optionally specify the runtime you want to use to power your query.
Below is a simple example of using the operator.
simple-operator-example.py
Runtime and region selection¶
You can choose the Wherobots runtime you want to use with the runtime parameter, passing in one of the Runtime enum values.
For guidance on runtime sizing and selection, see Runtimes.
Region parameter will become mandatory
To prepare for the expansion of Wherobots Cloud to new regions and cloud providers, the region parameter will become mandatory in a future SDK version. We will release an updated version of the SDK before support for new regions is added. If you continue using an older SDK version, your existing Airflow tasks will still work; however, any task that does not specify the region parameter will run in the aws-us-west-2 region.
The following AWS regions are currently supported in the Wherobots Spatial SQL API:
Wherobots Parameter | AWS Region Name | AWS Region Code | Access
---|---|---|---
Region.AWS_US_WEST_2 | Oregon | us-west-2 | All Organization Editions
Region.AWS_EU_WEST_1 | Ireland | eu-west-1 | Paid Organizations Only
Region.AWS_US_EAST_1 | Northern Virginia | us-east-1 | Paid Organizations Only
You can see the runtimes available to your organization within the Start a Notebook dropdown in Wherobots Cloud.
Build ETL pipelines with the WherobotsSqlOperator¶
Loading or creating tables into the Wherobots Catalog allows you to query, process, and work with your data using pure SQL queries. In this example, we'll use a SQL query to create a new table from the result of a query on an existing table of the Overture Maps public dataset.
First, create a new database in your wherobots catalog. You can execute these SQL queries using our Spatial SQL API or from a notebook.
CREATE DATABASE IF NOT EXISTS wherobots.test_db
Now we build a new table called wherobots.test_db.top_100_hot_buildings_daily from the result of a query on tables in the wherobots_open_data catalog. The query finds the 100 buildings in the wherobots_open_data.overture.buildings_building table that contain the most points recorded in the wherobots_open_data.overture.places_place table on 2023-07-24.
CREATE TABLE wherobots.test_db.top_100_hot_buildings_daily AS
SELECT buildings.id, first(buildings.names) as names, count(places.geometry), '2023-07-24' as ts
FROM wherobots_open_data.overture.places_place places
JOIN wherobots_open_data.overture.buildings_building buildings
ON ST_CONTAINS(buildings.geometry, places.geometry)
WHERE places.updatetime >= '2023-07-24'
AND places.updatetime < '2023-07-25'
AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
GROUP BY 1
ORDER BY 3 desc
LIMIT 100
Now you can query the resulting table to verify the results:
SELECT * FROM wherobots.test_db.top_100_hot_buildings_daily
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|id |first(names) |count(geometry)|ts |
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|tmp_72313131373137393340342D333031342E373539343833333034343439 |{common -> [{value -> Grand Central Terminal, language -> local}]} |624 |2023-07-24|
|tmp_7732373634313033373340312D333032302E333535353036393230383134 |{} |476 |2023-07-24|
|tmp_77343332313331324034342D333436352E32313130363731393438343132 |{common -> [{value -> CF ν λ‘ ν μ΄νΌ μΌν°, language -> ko}, {value -> CF Toronto Eaton Centre, language -> en}, {value -> CF Toronto Eaton Centre, language -> local}]}|381 |2023-07-24|
|tmp_773337333136323234364031352D333437312E38313636303038323138393035|{common -> [{value -> Square One, language -> local}]} |360 |2023-07-24|
|tmp_7731393838373435394034312D333437342E31313337343135303734303133 |{common -> [{value -> Yorkdale Shopping Centre, language -> local}]} |258 |2023-07-24|
|tmp_7733323836363031393440362D323939382E373635383131313635353434 |{common -> [{value -> Roosevelt Field Mall, language -> local}]} |249 |2023-07-24|
|tmp_723238363531343040332D333035382E30383231323232313334323134 |{}
To turn this ETL into a daily process orchestrated by Apache Airflow, bring the query into your DAG's definition of the WherobotsSqlOperator: change the CREATE TABLE ... AS into INSERT INTO ... so that each daily run appends new data to your table, and leverage Apache Airflow's macros for the daily date range.
Below is an example DAG file. The macro variables {{ ds }} and {{ next_ds }} are replaced dynamically by the actual schedule times.
example-DAG.py
Test your DAG file¶
There are two ways to test the DAG file: in the Airflow UI or with the pytest framework.
You can also refer to the official Apache Airflow guidance on DAG testing best practices.
Test in Airflow UI¶
You can put the DAG file into the $AIRFLOW_HOME/dags directory and trigger the DAG from the Airflow UI. Below is an example run of the DAG file. You can find the exact queries executed in the logs.
- If you are launching your Apache Airflow instance through airflow standalone on macOS, you may need to execute the following first:

    # To handle the issue https://bugs.python.org/issue28342
    export no_proxy=*
    # Then launch your Apache Airflow standalone instance
    airflow standalone

- The second batch will fail because there is no data in the source tables after 2023-07-24.
Test using pytest¶
Pytest is an open-source testing framework for Python.
It can be used to write various types of software tests, including unit, integration, end-to-end, and functional tests. For more information on installing and using pytest, refer to the pytest page on PyPI.
Example DAG with pytest¶
The following is an example Python file that demonstrates how to use your DAG with pytest:
example-DAG-with-pytest.py
Execute the test¶
To execute this test:
- Copy this DAG example into a Python file.
- Save the file with a name of your choosing (e.g., YOUR_DAG_FILE_EXAMPLE_NAME.py).
- Execute the test using the command:

    pytest YOUR_DAG_FILE_EXAMPLE_NAME.py
It can take a few minutes for your WherobotsDB SQL Session to initialize. Logs will appear once the test completes.