Wherobots Apache Airflow Provider
Wherobots offers an Apache Airflow provider that allows users to process data with WherobotsDB directly from their managed Apache Airflow DAGs.
Setup
Install from PyPI
You can install the Wherobots Apache Airflow provider from PyPI with pip:
pip install airflow-providers-wherobots
Or add it to the dependencies of your Apache Airflow application.
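To confirm that the provider is installed in the same Python environment that Airflow runs in, a small check like the one below can help. It is a sketch that assumes the distribution name matches the pip package (airflow-providers-wherobots) and the import path used in the DAG examples on this page (airflow_providers_wherobots); provider_version is a hypothetical helper name.

```python
from importlib import metadata, util
from typing import Optional


def provider_version(dist_name: str = "airflow-providers-wherobots",
                     module_name: str = "airflow_providers_wherobots") -> Optional[str]:
    """Return the installed provider version, or None if it is missing.

    Hypothetical helper: checks that the top-level module is importable
    and then reads the version from the installed distribution metadata.
    """
    if util.find_spec(module_name) is None:
        return None
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = provider_version()
    print(f"Provider version: {version}" if version else "Provider not installed")
```

Run it with the same interpreter your Airflow scheduler and workers use; a None result usually means the package went into a different environment.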
Create a new Connection in the Airflow server
Before using the operator, create a Connection in the Airflow server. There are two ways to create it: through the CLI or through the UI.
Create through CLI
You can create the connection with the Apache Airflow CLI.
The command below reads your Wherobots API key from a file named api.key through $(< api.key); either save your key in that file or replace $(< api.key) with the key itself.
Then execute the command in your terminal.
$ airflow connections add "wherobots_default" \
--conn-type "generic" \
--conn-host "api.cloud.wherobots.com" \
--conn-password "$(< api.key)"
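Airflow can also pick up connections from environment variables named AIRFLOW_CONN_<CONN_ID>, and from Airflow 2.3 onward the value may be a JSON document. The sketch below assumes that mechanism suits your deployment (for example, a container whose scheduler and worker environment you control) and builds the variable with the same fields as the CLI command; wherobots_connection_env and export_line are hypothetical helper names. Connections defined this way are not stored in the metadata database, so they do not appear in the UI's Connections list.

```python
import json
import shlex
from typing import Tuple


def wherobots_connection_env(api_key: str, conn_id: str = "wherobots_default") -> Tuple[str, str]:
    """Build the (name, value) pair for an AIRFLOW_CONN_<CONN_ID> variable.

    The value uses Airflow's JSON connection format with the same fields
    as the CLI command: conn_type, host, and password.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({
        "conn_type": "generic",
        "host": "api.cloud.wherobots.com",
        "password": api_key,
    })
    return name, value


def export_line(api_key: str) -> str:
    """Return a shell `export` line with the JSON value safely single-quoted."""
    name, value = wherobots_connection_env(api_key)
    return f"export {name}={shlex.quote(value)}"
```

For example, export_line(open("api.key").read().strip()) yields a line you could add to the environment of the Airflow processes; keep the key out of version control either way.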
Create through UI
Or you can create the connection through the Apache Airflow UI following the steps below:
- Navigate to your Apache Airflow UI home page, click the Admin tab in the top right corner, and select Connections.
- Click the + button to add a new connection.
- Fill in the connection details:
  - Connection ID: wherobots_default
  - Connection Type: generic
  - Host: api.cloud.wherobots.com
  - Password: your Wherobots API key
- Click Save to confirm the connection.
WherobotsSqlOperator
Use the WherobotsSqlOperator to execute SQL queries on Wherobots Cloud against the datasets in your Wherobots catalogs.
Using the Operator
The WherobotsSqlOperator requires a sql argument, which can be a single SQL query string or a list of query strings.
Below is a simple example of using the operator.
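from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

# Define the task inside a `with DAG(...):` block, as in the full DAG example later on this page.
operator = WherobotsSqlOperator(
    task_id="execute_query",
    sql="""
    SELECT id, geometry, confidence, geohash
    FROM wherobots_open_data.overture.places_place
    LIMIT 100
    """,
    return_last=False,
)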
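Because sql accepts either one string or a list of strings, multi-step work can be assembled as a list with one statement per element. The sketch below only prepares and normalizes that argument in plain Python; as_statement_list is a hypothetical helper, not part of the provider, and how the operator executes a list (ordering, session reuse) is an assumption to confirm against the provider's documentation.

```python
from typing import List, Union

# The two shapes the operator's sql argument accepts.
SqlArg = Union[str, List[str]]


def as_statement_list(sql: SqlArg) -> List[str]:
    """Normalize a sql argument into a list of stripped, non-empty statements."""
    statements = [sql] if isinstance(sql, str) else list(sql)
    return [s.strip() for s in statements if s.strip()]


# A single query string, as in the example above.
single = """
SELECT id, geometry, confidence, geohash
FROM wherobots_open_data.overture.places_place
LIMIT 100
"""

# The same query preceded by a setup statement, expressed as a list.
many = [
    "CREATE DATABASE IF NOT EXISTS wherobots.test_db",
    single,
]
```

A list such as many could then be passed as sql=many when constructing the operator.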
Build ETL pipelines with the WherobotsSqlOperator
Loading data into, or creating tables in, the Wherobots catalog lets you query, process, and work with your data using pure SQL. In this example, we'll use a SQL query to create a new table from the result of a query on an existing table of the Overture Maps public dataset.
First, create a new database in your wherobots catalog. You can execute these SQL queries using our Spatial SQL API or from a notebook.
CREATE DATABASE IF NOT EXISTS wherobots.test_db
Next, build a new table called wherobots.test_db.top_100_hot_buildings_daily from a query on tables in the wherobots_open_data catalog.
The query finds the 100 buildings in the wherobots_open_data.overture.buildings_building table that contain the most places recorded in the wherobots_open_data.overture.places_place table on 2023-07-24, limited to the envelope passed to ST_PolygonFromEnvelope (roughly the extent of New York State).
CREATE TABLE wherobots.test_db.top_100_hot_buildings_daily AS
SELECT buildings.id, first(buildings.names) as names, count(places.geometry), '2023-07-24' as ts
FROM wherobots_open_data.overture.places_place places
JOIN wherobots_open_data.overture.buildings_building buildings
ON ST_CONTAINS(buildings.geometry, places.geometry)
WHERE places.updatetime >= '2023-07-24'
AND places.updatetime < '2023-07-25'
AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
GROUP BY 1
ORDER BY 3 desc
LIMIT 100
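The two ST_CONTAINS(ST_PolygonFromEnvelope(...), ...) filters keep only geometries inside a longitude/latitude box whose arguments are (min x, min y, max x, max y). For point geometries such as places, that reduces to the plain-Python check below, a sketch for intuition rather than how WherobotsDB evaluates the predicate; the sample coordinates are approximate and chosen only for illustration. For building polygons, ST_CONTAINS also requires the entire footprint to lie inside the box, which this point-only sketch does not model.

```python
from typing import NamedTuple


class Envelope(NamedTuple):
    """Axis-aligned box in (longitude, latitude), ordered like ST_PolygonFromEnvelope."""
    min_x: float
    min_y: float
    max_x: float
    max_y: float

    def contains_point(self, x: float, y: float) -> bool:
        # ST_Contains is false for a point lying exactly on the boundary,
        # so the comparisons are strict.
        return self.min_x < x < self.max_x and self.min_y < y < self.max_y


# Same envelope as the query:
# ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585)
QUERY_ENVELOPE = Envelope(-79.762152, 40.496103, -71.856214, 45.01585)

# Approximate sample points (longitude, latitude), for illustration only.
GRAND_CENTRAL = (-73.9772, 40.7527)
TORONTO_EATON_CENTRE = (-79.3807, 43.6544)
CHICAGO = (-87.6298, 41.8781)
```

This explains why Toronto landmarks such as the CF Toronto Eaton Centre can appear in the results: the box's western edge at -79.76 lies west of Toronto.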
Now you can query the resulting table to verify the results:
SELECT * FROM wherobots.test_db.top_100_hot_buildings_daily
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|id |first(names) |count(geometry)|ts |
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|tmp_72313131373137393340342D333031342E373539343833333034343439 |{common -> [{value -> Grand Central Terminal, language -> local}]} |624 |2023-07-24|
|tmp_7732373634313033373340312D333032302E333535353036393230383134 |{} |476 |2023-07-24|
|tmp_77343332313331324034342D333436352E32313130363731393438343132 |{common -> [{value -> CF 토론토 이튼 센터, language -> ko}, {value -> CF Toronto Eaton Centre, language -> en}, {value -> CF Toronto Eaton Centre, language -> local}]}|381 |2023-07-24|
|tmp_773337333136323234364031352D333437312E38313636303038323138393035|{common -> [{value -> Square One, language -> local}]} |360 |2023-07-24|
|tmp_7731393838373435394034312D333437342E31313337343135303734303133 |{common -> [{value -> Yorkdale Shopping Centre, language -> local}]} |258 |2023-07-24|
|tmp_7733323836363031393440362D323939382E373635383131313635353434 |{common -> [{value -> Roosevelt Field Mall, language -> local}]} |249 |2023-07-24|
|tmp_723238363531343040332D333035382E30383231323232313334323134 |{}
To turn this ETL into a daily process orchestrated by Apache Airflow, move the query into the WherobotsSqlOperator in your DAG definition, change CREATE TABLE ... AS into INSERT INTO ... so that each run appends new data to the table, and use Apache Airflow's macros for the daily date range.
Below is an example DAG file.
The macro variables {{ ds }} and {{ next_ds }} are replaced at run time with the run's logical date and the next run's logical date, both formatted as YYYY-MM-DD.
import datetime

from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

with DAG(
    dag_id="example_wherobots_sql_dag",
    start_date=datetime.datetime.strptime("2023-07-24", "%Y-%m-%d"),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
):
    operator = WherobotsSqlOperator(
        task_id="execute_query",
        wait_for_downstream=True,
        sql="""
        INSERT INTO wherobots.test_db.top_100_hot_buildings_daily
        SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
        FROM wherobots_open_data.overture.places_place places
        JOIN wherobots_open_data.overture.buildings_building buildings
        ON ST_CONTAINS(buildings.geometry, places.geometry)
        WHERE places.updatetime >= '{{ ds }}'
        AND places.updatetime < '{{ next_ds }}'
        AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
        AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
        GROUP BY 1
        ORDER BY 3 desc
        LIMIT 100
        """,
        return_last=False,
    )
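With catchup=True and a @daily schedule starting on 2023-07-24, Airflow creates one run per completed day, and each run renders {{ ds }} and {{ next_ds }} to that day and the following one. The sketch below imitates this in plain Python to show which date bounds each INSERT receives; daily_runs and render are hypothetical helpers, and the simple string replacement stands in for Airflow's Jinja templating.

```python
from datetime import date, timedelta
from typing import Iterator, Tuple

# The date-range part of the DAG's SQL, with the same macros.
SQL_TEMPLATE = """
WHERE places.updatetime >= '{{ ds }}'
  AND places.updatetime < '{{ next_ds }}'
"""


def daily_runs(start: date, end: date) -> Iterator[Tuple[str, str]]:
    """Yield (ds, next_ds) for each daily run from start up to, but not including, end.

    With end set to today's date, this lists the days whose interval has
    already finished, which is what catchup would backfill.
    """
    day = start
    while day < end:
        yield day.isoformat(), (day + timedelta(days=1)).isoformat()
        day += timedelta(days=1)


def render(template: str, ds: str, next_ds: str) -> str:
    """Substitute the two macros (a stand-in for Airflow's Jinja rendering)."""
    return template.replace("{{ ds }}", ds).replace("{{ next_ds }}", next_ds)


if __name__ == "__main__":
    for ds, next_ds in daily_runs(date(2023, 7, 24), date(2023, 7, 26)):
        print(render(SQL_TEMPLATE, ds, next_ds))
```

To inspect the SQL Airflow actually renders for a run, the CLI command airflow tasks render example_wherobots_sql_dag execute_query 2023-07-24 prints the task's templated fields.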
There are two ways to test the DAG file:
Trigger through UI
You can put the DAG file into the $AIRFLOW_HOME/dags directory and trigger the DAG from the Airflow UI.
Below is an example run of the DAG file. The task logs show the exact queries that were executed.
Note:
- If you are launching an Apache Airflow instance with airflow standalone on macOS, you may need to run the following:
# To handle the issue https://bugs.python.org/issue28342
export no_proxy=*
# Then launch your Apache Airflow standalone instance
airflow standalone
- The second run (for 2023-07-25) will fail because the source tables contain no data after 2023-07-24.
Trigger through pytest
Follow the steps in the official Apache Airflow guidance to test the DAG file with pytest.
Here's an example Python file that brings your DAG and pytest execution together:
import datetime
import pendulum
import pytest
import uuid

from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

TEST_DAG_ID = "my_custom_operator_dag" + str(uuid.uuid4())
TEST_TASK_ID = "my_custom_operator_task" + str(uuid.uuid4())
DATA_INTERVAL_START = pendulum.datetime(2023, 7, 24, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)


@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        start_date=datetime.datetime.strptime("2023-07-24", "%Y-%m-%d"),
        schedule="@daily",
        catchup=True,
        max_active_runs=1,
    ) as dag:
        operator = WherobotsSqlOperator(
            task_id=TEST_TASK_ID,
            wait_for_downstream=True,
            sql="""
            INSERT INTO wherobots.test_db.top_100_hot_buildings_daily
            SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
            FROM wherobots_open_data.overture.places_place places
            JOIN wherobots_open_data.overture.buildings_building buildings
            ON ST_CONTAINS(buildings.geometry, places.geometry)
            WHERE places.updatetime >= '{{ ds }}'
            AND places.updatetime < '{{ next_ds }}'
            AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
            AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
            GROUP BY 1
            ORDER BY 3 desc
            LIMIT 100
            """,
            return_last=False,
        )
    return dag


def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to the task's results.
Copy this code into a Python file, for example test_wherobots_example_dag.py, then run:
pytest test_wherobots_example_dag.py
It can take a few minutes for your WherobotsDB SQL Session to be initialized, so you may not see any output for a little while. Logs will appear once the test completes.