
Wherobots Apache Airflow Provider

Wherobots offers an Apache Airflow provider that allows users to process data with WherobotsDB directly from their managed Apache Airflow DAGs.

Setup

Install from PyPI

You can install the Wherobots Apache Airflow provider through PyPI with pip:

pip install airflow-providers-wherobots

Or add it to the dependencies of your Apache Airflow application.

Create a new Connection in Airflow Server

You first need to create a Connection in the Airflow Server. There are two ways to create a connection: through the CLI or through the UI.

Create through CLI

You can create the connection through the Apache Airflow CLI.

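Copy the following command and replace $(< api.key) with your Wherobots API key. As written, $(< api.key) reads the key from a file named api.key in the current directory, which works in shells such as bash or zsh. Then run the command in your terminal.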

$ airflow connections add "wherobots_default" \
    --conn-type "generic" \
    --conn-host "api.cloud.wherobots.com" \
    --conn-password "$(< api.key)"
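As an alternative to the CLI, Airflow (2.3 and later) can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a JSON document. The sketch below builds that variable for the wherobots_default connection with the same type, host and password as the command above. The helper name wherobots_connection_env is hypothetical, and reading the key from api.key simply mirrors the CLI example.

```python
import json
import shlex
from pathlib import Path


def wherobots_connection_env(api_key: str, conn_id: str = "wherobots_default") -> tuple[str, str]:
    """Return (variable name, JSON value) for an Airflow connection supplied via the environment.

    Hypothetical helper. Airflow looks up connections in variables named
    AIRFLOW_CONN_<CONN_ID> (upper-cased) and, since Airflow 2.3, accepts a
    JSON-serialized connection as the value.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "generic",
            "host": "api.cloud.wherobots.com",
            # strip() drops a trailing newline the key file may end with
            "password": api_key.strip(),
        }
    )
    return name, value


if __name__ == "__main__":
    key_file = Path("api.key")  # same file the CLI example reads
    if key_file.exists():
        name, value = wherobots_connection_env(key_file.read_text())
        # Print a shell export line; shlex.quote keeps the JSON intact.
        print(f"export {name}={shlex.quote(value)}")
```

Set the variable in the environment of every Airflow component that runs tasks. Connections defined this way are not stored in the metadata database, so they do not appear in the UI's Connections list.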

Create through UI

Or you can create the connection through the Apache Airflow UI following the steps below:

  1. Navigate to your Apache Airflow UI home page, click the Admin tab in the top navigation bar, and select Connections.
  2. Click the + button to add a new connection.
  3. Fill in the connection details:
       • Connection ID: wherobots_default
       • Connection Type: generic
       • Host: api.cloud.wherobots.com
       • Password: your Wherobots API key
  4. Click Save to confirm the connection.

WherobotsSqlOperator

Use the WherobotsSqlOperator to execute SQL queries on Wherobots Cloud against your datasets in your Wherobots catalogs.

Using the Operator

The WherobotsSqlOperator requires a sql argument, which can be either a single SQL query string or a list of query strings.

Below is a simple example of using the operator.

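from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

# Instantiate the operator inside a "with DAG(...):" block so the task belongs to a DAG.
operator = WherobotsSqlOperator(
    task_id="execute_query",
    sql="""
    SELECT id, geometry, confidence, geohash
    FROM wherobots_open_data.overture.places_place
    LIMIT 100
    """,
    return_last=False,
)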

Build ETL pipelines with the WherobotsSqlOperator

Loading data into, or creating tables in, the Wherobots Catalog lets you query, process, and work with your data using pure SQL. In this example, we'll use a SQL query to create a new table from the result of a query on existing tables of the Overture Maps public dataset.

First, create a new database in your wherobots catalog. You can execute the SQL queries in this section using our Spatial SQL API or from a notebook.

CREATE DATABASE IF NOT EXISTS wherobots.test_db

Next, build a new table called wherobots.test_db.top_100_hot_buildings_daily from the results of a query on tables in the wherobots_open_data catalog. The query finds the 100 buildings in the wherobots_open_data.overture.buildings_building table that contain the most places recorded in the wherobots_open_data.overture.places_place table on 2023-07-24. Only places and buildings inside the envelope built by ST_PolygonFromEnvelope(min x, min y, max x, max y) are considered.

CREATE TABLE wherobots.test_db.top_100_hot_buildings_daily AS
SELECT buildings.id, first(buildings.names) as names, count(places.geometry), '2023-07-24' as ts
    FROM wherobots_open_data.overture.places_place places
         JOIN wherobots_open_data.overture.buildings_building buildings
         ON ST_CONTAINS(buildings.geometry, places.geometry)
    WHERE places.updatetime >= '2023-07-24'
          AND places.updatetime < '2023-07-25'
          AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
          AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
    GROUP BY 1
    ORDER BY 3 desc
    LIMIT 100
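To make the query's logic concrete, here is a toy, in-memory version of the same steps in plain Python: keep only buildings and places inside the envelope, join each place to the buildings that contain it, count places per building, and keep the top N. It is an illustration under simplifying assumptions, not how WherobotsDB executes the query: buildings are modeled as axis-aligned rectangles instead of arbitrary polygons, the date filter is left out, the sample data is made up, and the names Box and top_buildings are hypothetical.

```python
from collections import Counter
from typing import NamedTuple


class Box(NamedTuple):
    """Axis-aligned rectangle, in the same argument order as ST_PolygonFromEnvelope."""

    min_x: float
    min_y: float
    max_x: float
    max_y: float

    def contains_point(self, x: float, y: float) -> bool:
        # Strict: like ST_Contains, a point on the boundary is not contained.
        return self.min_x < x < self.max_x and self.min_y < y < self.max_y

    def contains_box(self, other: "Box") -> bool:
        # A rectangle touching the edge from inside still counts as contained.
        return (
            self.min_x <= other.min_x
            and other.max_x <= self.max_x
            and self.min_y <= other.min_y
            and other.max_y <= self.max_y
        )


def top_buildings(
    places: list[tuple[float, float]],
    buildings: dict[str, Box],
    envelope: Box,
    n: int,
) -> list[tuple[str, int]]:
    """Toy version of the query: filter, spatial join, count per building, top n."""
    # WHERE ST_CONTAINS(envelope, buildings.geometry)
    candidates = {bid: box for bid, box in buildings.items() if envelope.contains_box(box)}
    counts: Counter = Counter()
    for x, y in places:
        # WHERE ST_CONTAINS(envelope, places.geometry)
        if not envelope.contains_point(x, y):
            continue
        # JOIN ... ON ST_CONTAINS(buildings.geometry, places.geometry)
        for bid, box in candidates.items():
            if box.contains_point(x, y):
                counts[bid] += 1
    # GROUP BY building, ORDER BY count DESC, LIMIT n
    return counts.most_common(n)


envelope = Box(-79.762152, 40.496103, -71.856214, 45.01585)  # same envelope as the query
buildings = {
    "a": Box(-75.0, 42.0, -74.9, 42.1),
    "b": Box(-74.0, 41.0, -73.9, 41.1),
    "c": Box(-80.0, 42.0, -79.9, 42.1),  # sticks out of the envelope, so it is dropped
}
places = [(-74.95, 42.05), (-74.96, 42.06), (-73.95, 41.05), (-79.95, 42.05), (-70.0, 42.0)]
print(top_buildings(places, buildings, envelope, 100))
# prints: [('a', 2), ('b', 1)]
```

The envelope used in the query is roughly the bounding box of New York State. It also covers southern Ontario, which is why Toronto-area shopping centres such as the CF Toronto Eaton Centre appear in the results.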

Now you can query the resulting table to verify the results:

SELECT * FROM wherobots.test_db.top_100_hot_buildings_daily
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|id                                                                  |first(names)                                                                                                                                                           |count(geometry)|ts        |
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|tmp_72313131373137393340342D333031342E373539343833333034343439      |{common -> [{value -> Grand Central Terminal, language -> local}]}                                                                                                     |624            |2023-07-24|
|tmp_7732373634313033373340312D333032302E333535353036393230383134    |{}                                                                                                                                                                     |476            |2023-07-24|
|tmp_77343332313331324034342D333436352E32313130363731393438343132    |{common -> [{value -> CF 토론토 이튼 센터, language -> ko}, {value -> CF Toronto Eaton Centre, language -> en}, {value -> CF Toronto Eaton Centre, language -> local}]}|381            |2023-07-24|
|tmp_773337333136323234364031352D333437312E38313636303038323138393035|{common -> [{value -> Square One, language -> local}]}                                                                                                                 |360            |2023-07-24|
|tmp_7731393838373435394034312D333437342E31313337343135303734303133  |{common -> [{value -> Yorkdale Shopping Centre, language -> local}]}                                                                                                   |258            |2023-07-24|
|tmp_7733323836363031393440362D323939382E373635383131313635353434    |{common -> [{value -> Roosevelt Field Mall, language -> local}]}                                                                                                       |249            |2023-07-24|
|tmp_723238363531343040332D333035382E30383231323232313334323134      |{}

To turn this ETL into a daily process orchestrated by Apache Airflow, move the query into a WherobotsSqlOperator in your DAG definition. Change CREATE TABLE ... AS to INSERT INTO ... so that each run appends that day's results to the table, and use Apache Airflow's template macros to set the daily date range.
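For a daily schedule, {{ ds }} renders as the run's logical date and {{ next_ds }} as the following day, both formatted as YYYY-MM-DD, so each run selects exactly one day of updatetime values. The stdlib-only sketch below imitates that substitution for a single run. It is not how Airflow renders templates (Airflow uses Jinja), and the helper render_daily_window is hypothetical. In Airflow 2.2 and later, next_ds is deprecated; {{ data_interval_end | ds }} renders the same date for a daily schedule.

```python
from datetime import date, timedelta

WHERE_TEMPLATE = (
    "WHERE places.updatetime >= '{{ ds }}'\n"
    "  AND places.updatetime < '{{ next_ds }}'"
)


def render_daily_window(template: str, logical_date: date) -> str:
    """Substitute {{ ds }} and {{ next_ds }} for a daily schedule (simplified, no Jinja)."""
    ds = logical_date.isoformat()  # e.g. 2023-07-24
    next_ds = (logical_date + timedelta(days=1)).isoformat()  # e.g. 2023-07-25
    return template.replace("{{ ds }}", ds).replace("{{ next_ds }}", next_ds)


print(render_daily_window(WHERE_TEMPLATE, date(2023, 7, 24)))
# prints:
# WHERE places.updatetime >= '2023-07-24'
#   AND places.updatetime < '2023-07-25'
```

The half-open range (>= ds, < next_ds) makes consecutive runs cover adjacent days without overlapping.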

Below is an example DAG file. At run time, Airflow replaces the template variables {{ ds }} and {{ next_ds }} with dates derived from each run's schedule.

import pendulum

from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

with DAG(
    dag_id="example_wherobots_sql_dag",
    # The first scheduled run processes 2023-07-24.
    start_date=pendulum.datetime(2023, 7, 24, tz="UTC"),
    schedule="@daily",
    catchup=True,  # create a run for every past day since start_date
    max_active_runs=1,  # process those days one at a time
):
    operator = WherobotsSqlOperator(
        task_id="execute_query",
        # Wait for the previous run of this task (and its direct downstream tasks) to succeed.
        wait_for_downstream=True,
        sql="""
        INSERT INTO wherobots.test_db.top_100_hot_buildings_daily
        SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
        FROM wherobots_open_data.overture.places_place places
             JOIN wherobots_open_data.overture.buildings_building buildings
             ON ST_CONTAINS(buildings.geometry, places.geometry)
        WHERE places.updatetime >= '{{ ds }}'
              AND places.updatetime < '{{ next_ds }}'
              AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
              AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
        GROUP BY 1
        ORDER BY 3 desc
        LIMIT 100
        """,
        return_last=False,
    )
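With catchup=True, the scheduler creates one run for every daily interval between start_date and now, and a run for a given day only becomes due once that day's interval has ended. The sketch below lists the logical dates (the {{ ds }} values) of those backfilled runs under that simplified model. catchup_logical_dates is a hypothetical helper, and it ignores timezones, end_date, and runs that already exist.

```python
from datetime import datetime, timedelta


def catchup_logical_dates(start: datetime, now: datetime) -> list[str]:
    """List the {{ ds }} values of the daily runs due by `now` (simplified model).

    Hypothetical helper: ignores timezones, end_date, and runs that already exist.
    """
    logical_dates = []
    interval_start = start
    # A daily run covers [interval_start, interval_start + 1 day) and only
    # becomes due once that whole interval has passed.
    while interval_start + timedelta(days=1) <= now:
        logical_dates.append(interval_start.strftime("%Y-%m-%d"))
        interval_start += timedelta(days=1)
    return logical_dates


# With max_active_runs=1 these runs execute one at a time.
print(catchup_logical_dates(datetime(2023, 7, 24), datetime(2023, 7, 27, 6, 0)))
# prints: ['2023-07-24', '2023-07-25', '2023-07-26']
```

Because wait_for_downstream=True also turns on depends_on_past, a failed run blocks the task in later runs until that run succeeds or is marked successful.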

There are two ways to test the DAG file:

Trigger through UI

You can put the DAG file into the $AIRFLOW_HOME/dags directory and trigger the DAG from the Airflow UI.

Below is an example run of the DAG file. The task logs show the exact queries that were executed.


Note:

  • If you launch Apache Airflow with airflow standalone on macOS, you may need to set no_proxy before starting it:
# Work around https://bugs.python.org/issue28342
export no_proxy=*
# Then launch your Apache Airflow standalone instance
airflow standalone
  • The second run (for 2023-07-25) will fail because the source tables contain no data after 2023-07-24.

Trigger through pytest

Follow the steps in the official Apache Airflow guidance to test the DAG file with pytest. Here's an example Python file that combines your DAG with a pytest test:

import datetime

import pendulum
import pytest
import uuid
from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

TEST_DAG_ID = "my_custom_operator_dag" + str(uuid.uuid4())
TEST_TASK_ID = "my_custom_operator_task" + str(uuid.uuid4())
DATA_INTERVAL_START = pendulum.datetime(2023, 7, 24, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)


@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        start_date=datetime.datetime.strptime("2023-07-24", "%Y-%m-%d"),
        schedule="@daily",
        catchup=True,
        max_active_runs=1,
    ) as dag:
        operator = WherobotsSqlOperator(
            task_id=TEST_TASK_ID,
            wait_for_downstream=True,
            sql="""
                INSERT INTO wherobots.test_db.top_100_hot_buildings_daily
                SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
                FROM wherobots_open_data.overture.places_place places
                     JOIN wherobots_open_data.overture.buildings_building buildings
                     ON ST_CONTAINS(buildings.geometry, places.geometry)
                WHERE places.updatetime >= '{{ ds }}'
                      AND places.updatetime < '{{ next_ds }}'
                      AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
                      AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
                GROUP BY 1
                ORDER BY 3 desc
                LIMIT 100
                """,
            return_last=False,
        )
    return dag


def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to tasks results.

Copy this code into a Python file, for example test_wherobots_example_dag.py, then run pytest test_wherobots_example_dag.py.

It can take a few minutes for your WherobotsDB SQL session to initialize, so you may not see any output for a while. Logs appear once the test completes.


Last update: July 8, 2024 02:18:16