Use the WherobotsSqlOperator to execute SQL queries on Wherobots Cloud
against your datasets in your Wherobots catalogs.
Using the Operator
The WherobotsSqlOperator requires a sql argument, which can be a SQL query
string, or a list of query strings. You can also optionally specify the runtime
you want to use to power your query.
Below is a simple example of using the operator.
simple-operator-example.py
from wherobots.db.runtime import Runtime
from wherobots.db.region import Region
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator
operator = WherobotsSqlOperator(
    # The region parameter selects the AWS cloud provider region where your query runs.
    # Replace 'Region.AWS_US_WEST_2' with the desired AWS region, for example:
    # - For AWS US East (N. Virginia): region=Region.AWS_US_EAST_1
    region=Region.AWS_US_WEST_2,
    task_id="execute_query",
    # The runtime parameter specifies the compute resources allocated to the runtime environment.
    # Replace 'Runtime.TINY' with the desired runtime size, for example:
    # - For a small runtime: runtime=Runtime.SMALL
    # - For a medium runtime: runtime=Runtime.MEDIUM
    runtime=Runtime.TINY,
    sql="""
    SELECT id, geometry, confidence, geohash
    FROM wherobots_open_data.overture_maps_foundation.places_place
    LIMIT 100
    """,
    return_last=False,
)
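Because the sql argument also accepts a list of query strings, a single task can submit several statements in order. Below is a minimal sketch of this usage; the org_catalog.example_db database and table names are hypothetical placeholders.
setup = WherobotsSqlOperator(
    task_id="run_setup_queries",
    region=Region.AWS_US_WEST_2,
    runtime=Runtime.TINY,
    # The statements are submitted in order;
    # org_catalog.example_db is a placeholder name.
    sql=[
        "CREATE DATABASE IF NOT EXISTS org_catalog.example_db",
        """
        CREATE TABLE IF NOT EXISTS org_catalog.example_db.places_sample AS
        SELECT id, geometry
        FROM wherobots_open_data.overture_maps_foundation.places_place
        LIMIT 100
        """,
    ],
    return_last=False,
)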
Runtime and region selection
You can choose the Wherobots runtime you want to use with the runtime
parameter, passing in one of the Runtime enum values.
For guidance on runtime sizing and selection, see Runtimes.
To prepare for the expansion of Wherobots Cloud to new regions and cloud providers, the region parameter will become mandatory in a future SDK version.
Before support for new regions is added, we will release an updated version of the SDK.
If you continue using an older SDK version, your existing Airflow tasks will still work. However, any tasks that do not specify the region parameter will be hosted in the aws-us-west-2 region.
For a full list of Wherobots' supported AWS regions, see the Cloud and Cloud Region Availability section.
You can see the runtimes available to your organization within the
Start a Notebook dropdown in Wherobots Cloud.
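For example, to run a query on a medium runtime in the AWS US East (N. Virginia) region, pass the corresponding enum values. The following is just a sketch of the operator from the example above with different sizing; the task id and query are placeholders.
operator = WherobotsSqlOperator(
    task_id="execute_query_medium",
    # A larger runtime and a different region than the defaults used above.
    region=Region.AWS_US_EAST_1,
    runtime=Runtime.MEDIUM,
    sql="SELECT COUNT(*) FROM wherobots_open_data.overture_maps_foundation.places_place",
    return_last=False,
)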
Build ETL pipelines with the WherobotsSqlOperator
Loading data into or creating tables in the Wherobots Catalog
allows you to query, process, and work with your data using
pure SQL.
In this example, we’ll use a SQL query to create a new table from the
result of a query on an existing table of the
Overture Maps public dataset.
First, create a new database in your Wherobots catalog. You can execute these SQL queries
using our Spatial SQL API or from a
notebook.
CREATE DATABASE IF NOT EXISTS org_catalog.test_db
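If you would rather run this statement from a standalone script than from a notebook, the sketch below uses the Wherobots Spatial SQL API Python driver (the wherobots.db package already imported in the operator examples). The host value and the exact connect() parameters shown are assumptions; check the driver documentation for your version.
from wherobots.db import connect
from wherobots.db.region import Region
from wherobots.db.runtime import Runtime

# Assumed host and parameter names for the Spatial SQL API driver.
conn = connect(
    host="api.cloud.wherobots.com",
    api_key="<your-wherobots-api-key>",
    runtime=Runtime.TINY,
    region=Region.AWS_US_WEST_2,
)
try:
    cursor = conn.cursor()
    cursor.execute("CREATE DATABASE IF NOT EXISTS org_catalog.test_db")
finally:
    conn.close()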
Now, build a new table called org_catalog.test_db.top_100_hot_buildings_daily
from the result of a query on tables in the wherobots_open_data catalog.
The query finds the 100 buildings in the wherobots_open_data.overture_maps_foundation.buildings_building table
that contain the most places recorded in the wherobots_open_data.overture_maps_foundation.places_place table
on 2023-07-24.
CREATE TABLE org_catalog.test_db.top_100_hot_buildings_daily AS
SELECT buildings.id, first(buildings.names) as names, count(places.geometry), '2023-07-24' as ts
FROM wherobots_open_data.overture_maps_foundation.places_place places
JOIN wherobots_open_data.overture_maps_foundation.buildings_building buildings
ON ST_CONTAINS(buildings.geometry, places.geometry)
WHERE places.updatetime >= '2023-07-24'
AND places.updatetime < '2023-07-25'
AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
GROUP BY 1
ORDER BY 3 desc
LIMIT 100
Now you can query the resulting table to verify the results:
SELECT * FROM org_catalog.test_db.top_100_hot_buildings_daily
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|id |first(names) |count(geometry)|ts |
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|tmp_72313131373137393340342D333031342E373539343833333034343439 |{common -> [{value -> Grand Central Terminal, language -> local}]} |624 |2023-07-24|
|tmp_7732373634313033373340312D333032302E333535353036393230383134 |{} |476 |2023-07-24|
|tmp_77343332313331324034342D333436352E32313130363731393438343132 |{common -> [{value -> CF 토론토 이튼 센터, language -> ko}, {value -> CF Toronto Eaton Centre, language -> en}, {value -> CF Toronto Eaton Centre, language -> local}]}|381 |2023-07-24|
|tmp_773337333136323234364031352D333437312E38313636303038323138393035|{common -> [{value -> Square One, language -> local}]} |360 |2023-07-24|
|tmp_7731393838373435394034312D333437342E31313337343135303734303133 |{common -> [{value -> Yorkdale Shopping Centre, language -> local}]} |258 |2023-07-24|
|tmp_7733323836363031393440362D323939382E373635383131313635353434 |{common -> [{value -> Roosevelt Field Mall, language -> local}]} |249 |2023-07-24|
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
To turn this ETL into a daily process orchestrated by Apache Airflow,
bring the query into your DAG’s definition of the
WherobotsSqlOperator, changing the CREATE TABLE ... AS into INSERT INTO ... so that new data is appended to the table each day, and leveraging
Apache Airflow’s macros for the daily date range.
Below is an example DAG file.
The macro variables {{ ds }} and {{ next_ds }}
will be replaced dynamically with the actual schedule dates.
import datetime
from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator
from wherobots.db.region import Region
from wherobots.db.runtime import Runtime
with DAG(
    dag_id="example_wherobots_sql_dag",
    start_date=datetime.datetime.strptime("2023-07-24", "%Y-%m-%d"),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
):
    operator = WherobotsSqlOperator(
        region=Region.AWS_US_WEST_2,
        task_id="execute_query",
        wait_for_downstream=True,
        sql="""
        INSERT INTO org_catalog.test_db.top_100_hot_buildings_daily
        SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
        FROM wherobots_open_data.overture_maps_foundation.places_place places
        JOIN wherobots_open_data.overture_maps_foundation.buildings_building buildings
        ON ST_CONTAINS(buildings.geometry, places.geometry)
        WHERE places.updatetime >= '{{ ds }}'
        AND places.updatetime < '{{ next_ds }}'
        AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
        AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
        GROUP BY 1
        ORDER BY 3 desc
        LIMIT 100
        """,
        return_last=False,
    )
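At execution time, Airflow substitutes the macro variables before the query is submitted to Wherobots. For the first scheduled run, whose logical date is 2023-07-24, the templated parts of the query render as follows:
-- '{{ ds }}'      -> '2023-07-24'
-- '{{ next_ds }}' -> '2023-07-25'
WHERE places.updatetime >= '2023-07-24'
AND places.updatetime < '2023-07-25'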
Test your DAG file
There are two ways to test the DAG file: in the Airflow UI or with the pytest framework.
You can also refer to the official Apache Airflow guidance on DAG testing best practices.
Test in Airflow UI
You can put the DAG file into the $AIRFLOW_HOME/dags directory and trigger the DAG from the Airflow UI.
Below is an example run of the DAG file. You will find the exact queries executed in the logs.
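If you prefer the command line, you can also exercise a single run of the example DAG without the scheduler by using Airflow's built-in dags test command with the DAG id and a logical date:
airflow dags test example_wherobots_sql_dag 2023-07-24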
- If you are launching an Apache Airflow instance through
  airflow standalone
  and you are working on macOS, you may need to execute the following first:

  # To handle the issue https://bugs.python.org/issue28342
  export no_proxy=*
  # Then launch your Apache Airflow standalone instance
  airflow standalone

- The second DAG run will fail because there is no data in the source tables after
  2023-07-24.
Test using pytest
Pytest is an open-source testing framework for Python.
It can be used to write various types of software tests, including unit tests, integration tests, end-to-end tests, and functional tests. For more information on installing and using pytest, refer to the pytest PyPI page.
Example DAG with pytest
The following is an example Python file that demonstrates how to test your DAG with pytest:
example-DAG-with-pytest.py
import datetime
import pendulum
import pytest
import uuid

from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from wherobots.db.region import Region

TEST_DAG_ID = "my_custom_operator_dag" + str(uuid.uuid4())
TEST_TASK_ID = "my_custom_operator_task" + str(uuid.uuid4())
DATA_INTERVAL_START = pendulum.datetime(2023, 7, 24, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)


@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        start_date=datetime.datetime.strptime("2023-07-24", "%Y-%m-%d"),
        schedule="@daily",
        catchup=True,
        max_active_runs=1,
    ) as dag:
        operator = WherobotsSqlOperator(
            region=Region.AWS_US_WEST_2,
            task_id=TEST_TASK_ID,
            wait_for_downstream=True,
            sql="""
            INSERT INTO org_catalog.test_db.top_100_hot_buildings_daily
            SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
            FROM wherobots_open_data.overture_maps_foundation.places_place places
            JOIN wherobots_open_data.overture_maps_foundation.buildings_building buildings
            ON ST_CONTAINS(buildings.geometry, places.geometry)
            WHERE places.updatetime >= '{{ ds }}'
            AND places.updatetime < '{{ next_ds }}'
            AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
            AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
            GROUP BY 1
            ORDER BY 3 desc
            LIMIT 100
            """,
            return_last=False,
        )
    return dag


def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to the task's results.
Execute the test
To execute this test:
- Copy this DAG example into a Python file.
- Save the file with a name of your choosing (e.g.,
YOUR_DAG_FILE_EXAMPLE_NAME.py).
- Execute it using the command:
pytest YOUR_DAG_FILE_EXAMPLE_NAME.py
It can take a few minutes for your WherobotsDB SQL Session to initialize.
Logs will appear once the test completes.