
WherobotsRunOperator

You can submit your scripts for execution on Wherobots Cloud, and a Job Run tracks the status of each script's execution. When you implement WherobotsRunOperator in a DAG, an Airflow TaskInstance executes a Job Run and waits for it to complete.

For a comprehensive list of the parameters associated with Job Runs, see Runs REST API.

Benefits

You can use Airflow to streamline, automate, and manage complex ETL workloads that run on your vector- or raster-based data. By using WherobotsRunOperator, you can incorporate Wherobots' geospatial data processing features into your Airflow workflows.

With WherobotsRunOperator, you no longer need to manage and configure your own cluster to complete your geospatial data processing. Wherobots can handle that resource allocation for you, while providing analytics capabilities that are optimized for geospatial concerns.

Before you start

Before using WherobotsRunOperator, ensure that you have the following required resources:

DAG files

Directed Acyclic Graph (DAG) files are Python scripts that define your Airflow workflows. DAG files need to be accessible to the Airflow scheduler so that your tasks can be executed. DAG files are commonly stored in $AIRFLOW_HOME/dags. For more information, see Setting Configuration Options in the Apache Airflow Documentation.

WherobotsRunOperator constructor

The following example details how to integrate WherobotsRunOperator into your DAG file.

In this documentation, we discuss the WherobotsRunOperator-related lines of code shown in the example below. For more information on the general formatting of Airflow DAG files, see Best Practices in the Airflow Documentation.

Example DAG file
import datetime
import pendulum
from airflow import DAG
from airflow_providers_wherobots.operators.run import WherobotsRunOperator
with DAG(
    dag_id="test_run_operator",
    schedule="@once",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as test_run_dag:
    operator = WherobotsRunOperator(
            runtime="tiny",
        task_id="test_run_smoke",
        name="airflow_operator_test_run_{{ ts_nodash }}",
        run_python={
            "uri": "S3-PATH-TO-YOUR-FILE"
        },
        dag=test_run_dag,
        poll_logs=True,
    )

Can I use any of the runtimes listed in Accepted values?

You might not have access to all of the available runtimes. For example, Community Edition Organizations can only use the "tiny" runtime. You can see the runtimes available to your Organization in the Runtime Configuration section of Organization Settings.

Parameter | Type | Description | Accepted values
operator | WherobotsRunOperator | Instantiates WherobotsRunOperator. |
runtime | str | Specifies the Wherobots runtime. runtime is optional but cannot be an empty string. See Runs REST API for more information. | "tiny", "small", "medium", "large", "x-large", "2x-large", "4x-large", "medium-himem", "large-himem", "x-large-himem", "2x-large-himem", "4x-large-himem", "tiny-a10-gpu", "small-a10-gpu", "medium-a10-gpu". You can see the runtimes available to your Organization in the Runtime Configuration section of Organization Settings.
name | str | The name of the run. If not specified, a default name is generated. |
run_python | dict | Provides the Python file's Wherobots Managed Storage S3 path. You can use either run_python or run_jar, not both. Takes the following keys: uri (str) and args (list[str]). |
poll_logs | bool | Enables log polling when set to True. | True, False
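
For example, assuming a script already uploaded to your Wherobots Managed Storage, a run_python value using both keys might look like the following sketch (the S3 path and arguments are placeholders):

run_python = {
    "uri": "s3://YOUR-WHEROBOTS-MANAGED-STORAGE-PATH/your_script.py",  # placeholder path
    "args": ["--input-table", "my_table"],                             # optional, placeholder arguments
}

Pass this dict as the run_python argument of WherobotsRunOperator, as shown in the example DAG above.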

Additional parameters for Job Runs

There are additional parameters for the Runs API. For an exhaustive list of parameters and their accepted values see Runs REST API.

Other parameters

The following parameters aren't included in the example but are also compatible with WherobotsRunOperator.

Parameter | Type | Description | Accepted values
run_jar | dict | Provides the JAR file's Wherobots Managed Storage S3 path. You can use either run_python or run_jar, not both. Takes the following keys: uri (str), args (list[str]), and mainClass (str). |
polling_interval | int | The interval, in seconds, at which to poll the status of the run. The default value is 30. |
environment | dict | The model for runtime environment configs, including Spark cluster configs and dependencies. Defaults to {}. For more information, see Environment keys. |
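
As a sketch, a run_jar value might look like this; the S3 path, arguments, and main class are placeholders:

run_jar = {
    "uri": "s3://YOUR-WHEROBOTS-MANAGED-STORAGE-PATH/your_job.jar",  # placeholder path
    "args": ["--date", "2024-01-01"],                                # placeholder arguments
    "mainClass": "com.example.YourJobMain",                          # placeholder entry point
}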
Environment keys

Environment parameter | Type | Required or Optional | Parameter description
sparkDriverDiskGB | int | Optional | The driver disk size of the Spark cluster.
sparkExecutorDiskGB | int | Optional | The executor disk size of the Spark cluster.
sparkConfigs | dict{str: str} | Optional | The user-specified Spark configs.
dependencies | list[dict] | Optional | The third-party dependencies to add to the runtime. Required if adding sparkConfigs. Must be a list of dictionaries, and each dictionary must represent either a Python Package Index dependency or a file dependency.
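
As a sketch, an environment value combining disk sizes, Spark configs, and dependencies might look like this; the values are illustrative placeholders, not recommendations:

environment = {
    "sparkDriverDiskGB": 100,    # placeholder driver disk size
    "sparkExecutorDiskGB": 100,  # placeholder executor disk size
    "sparkConfigs": {
        "spark.sql.shuffle.partitions": "200",  # placeholder Spark config
    },
    "dependencies": [],  # required when sparkConfigs is set; entry format is described in the Dependencies section below
}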
Dependencies

The following table describes the third-party dependencies that can be added to the runtime environment.

Parameter | Type | Details | Rules for accepted values
sourceType | string | Enum for the source type of the dependency. | Valid values: PYPI or FILE.
filePath | string | The file path to the dependency file. | Must be a valid file path accessible within the runtime environment. The file extension must be .jar, .whl, or .zip. Can only be used with the FILE sourceType.
libraryName | string | The Python package name. | Can only be used with the PYPI sourceType.
libraryVersion | string | The Python package version. | Can only be used with the PYPI sourceType.
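
For example, a dependencies list with one Python Package Index entry and one file entry might look like the following sketch; the package name, version, and file path are placeholders:

dependencies = [
    {
        "sourceType": "PYPI",
        "libraryName": "geopandas",   # placeholder package name
        "libraryVersion": "0.14.4",   # placeholder version
    },
    {
        "sourceType": "FILE",
        "filePath": "s3://YOUR-WHEROBOTS-MANAGED-STORAGE-PATH/libs/your_udfs.whl",  # placeholder .whl path
    },
]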

JAR and Python Scripts

You can use Python or JAR files to further leverage Wherobots' geospatial features and optimizations in conjunction with an Airflow DAG.

Tile generation

This example uses Sedona to process and generate vector tiles. The example reads buildings and roads data from Wherobots open datasets, filters the data based on a specified region, and then generates vector tiles using the wherobots.vtiles library. It writes the tiles to a PMTiles file in the user's S3 storage and displays a sample of the tiles.

Tile generation example Python script
from sedona.spark import *
from wherobots import vtiles
from sedona.sql.st_constructors import ST_GeomFromText
from sedona.sql.st_predicates import ST_Intersects
import pyspark.sql.functions as f
import os

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

# Set to False to generate tiles for the entire dataset, True to generate only for region_wkt area
filter = True
region_wkt = "POLYGON ((-122.097931 47.538528, -122.048836 47.566566, -121.981888 47.510012, -122.057076 47.506302, -122.097931 47.538528))"
filter_expression = ST_Intersects(f.col("geometry"), ST_GeomFromText(f.lit(region_wkt)))

buildings_df = (
    sedona.table("wherobots_open_data.overture_2024_02_15.buildings_building")
    .select(
        f.col("geometry"),
        f.lit("buildings").alias("layer"),
        f.element_at(f.col("sources"), 1).dataset.alias("source")
    )
)

buildings_df.show()

roads_df = (
    sedona.table("wherobots_open_data.overture_2024_02_15.transportation_segment")
    .select(
        f.col("geometry"),
        f.lit("roads").alias("layer"),
        f.element_at(f.col("sources"), 1).dataset.alias("source")
    )
)

roads_df.show()

features_df = roads_df.union(buildings_df)

if filter:
    features_df = features_df.filter(filter_expression)

features_df.count()

tiles_df = vtiles.generate(features_df)

tiles_df.show(3, 150, True)

full_tiles_path = os.getenv("USER_S3_PATH") + "tiles.pmtiles"
vtiles.write_pmtiles(tiles_df, full_tiles_path, features_df=features_df)  

vtiles.show_pmtiles(full_tiles_path)

sample_tiles_path = os.getenv("USER_S3_PATH") + "sampleTiles.pmtiles"
vtiles.generate_quick_pmtiles(features_df, sample_tiles_path)
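
If you upload this script to your Wherobots Managed Storage, you could submit it from a DAG in the same way as the earlier example. The following sketch assumes a placeholder S3 path; use a runtime available to your Organization:

operator = WherobotsRunOperator(
    task_id="generate_vector_tiles",
    name="vector_tiles_{{ ts_nodash }}",
    runtime="tiny",  # example runtime; choose one available to your Organization
    run_python={"uri": "s3://YOUR-WHEROBOTS-MANAGED-STORAGE-PATH/tile_generation.py"},  # placeholder path
    poll_logs=True,
)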

Raster inference

This example performs batch image classification using Sedona and Wherobots. It reads raster data from an S3 bucket, applies a classification model to each image, extracts the most confident prediction for each image, and displays a few sample images along with their predicted labels.

Not available to Community Edition customers

Raster inference is not available to Community Edition customers. For more information on the different plans available, see Wherobots Pricing.

Raster inference example script
from sedona.spark import SedonaContext
from sedona.raster_utils.SedonaUtils import SedonaUtils
from pyspark.sql.functions import col, expr, rand
from wherobots.inference.engine.register import create_single_label_classification_udfs
from wherobots.inference.data.io import read_raster_table

import warnings

warnings.filterwarnings('ignore')

config = SedonaContext.builder().appName('classification-batch-inference')\
    .getOrCreate()

sedona = SedonaContext.create(config)

tif_folder_path = 's3a://wherobots-examples/data/eurosat_small'
files_df = read_raster_table(tif_folder_path, sedona)
df_raster_input = files_df.withColumn(
        "outdb_raster", expr("RS_FromPath(path)")
    )
df_raster_input.cache().count()
df_raster_input.show(truncate=False)

df_raster_input.createOrReplaceTempView("df_raster_input")
model_id = 'landcover-eurosat-sentinel2'
predictions_df = sedona.sql(f"SELECT name, outdb_raster, RS_CLASSIFY('{model_id}', outdb_raster) AS preds FROM df_raster_input")
predictions_df.cache().count()
predictions_df.show(truncate=False)
predictions_df.createOrReplaceTempView("predictions_df")

max_predictions_df = sedona.sql(f"SELECT name, outdb_raster, RS_MAX_CONFIDENCE(preds).max_confidence_label, RS_MAX_CONFIDENCE(preds).max_confidence_score FROM predictions_df")
max_predictions_df.show(20, truncate=False)

rs_classify, rs_max_confidence = create_single_label_classification_udfs(batch_size = 10, sedona=sedona)
df_predictions = df_raster_input.withColumn("preds", rs_classify(model_id, 'outdb_raster'))
df_predictions.show(1)

df_max_predictions = df_predictions.withColumn("max_confidence_temp", rs_max_confidence(col("preds"))) \
                            .withColumn("max_confidence_label", col("max_confidence_temp.max_confidence_label")) \
                            .withColumn("max_confidence_score", col("max_confidence_temp.max_confidence_score")) \
                            .drop("max_confidence_temp", "preds")
df_max_predictions.cache().count()
df_max_predictions.show(2, truncate=False)

df_rast = sedona.read.format("binaryFile").option("pathGlobFilter", "*.tif").option("recursiveFileLookup", "true").load(tif_folder_path).selectExpr("RS_FromGeoTiff(content) as raster")

htmlDF = df_max_predictions.selectExpr("RS_Band(outdb_raster, Array(4, 3, 2)) as image_raster", "name", "max_confidence_label")\
    .selectExpr("RS_NormalizeAll(image_raster, 1, 65535, True) as image_raster", "name", "max_confidence_label")\
    .selectExpr("RS_AsImage(image_raster, 500) as image_raster", "name", "max_confidence_label")

SedonaUtils.display_image(htmlDF.orderBy(rand()).limit(3))
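
If your Organization has access to a GPU runtime, you might submit this script with one of the GPU runtimes listed earlier. As before, this is a sketch that belongs inside a DAG like the earlier example, and the S3 path is a placeholder:

operator = WherobotsRunOperator(
    task_id="raster_classification",
    name="raster_inference_{{ ts_nodash }}",
    runtime="tiny-a10-gpu",  # example GPU runtime; availability depends on your plan
    run_python={"uri": "s3://YOUR-WHEROBOTS-MANAGED-STORAGE-PATH/raster_inference.py"},  # placeholder path
    poll_logs=True,
)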

Reviewing logs

WherobotsRunOperator removes the need to write your own logic to poll the Wherobots Job Run logs. Airflow users can access the logs by specifying poll_logs=True in a DAG. Wherobots polls the logs and streams them into the Airflow task logs.

Airflow UI

When you start your Airflow DAG from the Airflow server, you can see the logs in the Airflow server's Logs tab. Relevant logs begin with the line INFO - === Logs for Run <run_id> Start:. You can find the run_id in the Airflow server's XCom tab.

Limitations

WherobotsRunOperator has the following limitations:

  • Only Wherobots Managed Storage S3 paths can be used in conjunction with Job Runs.
  • Wherobots only stores logs for 30 days.
  • The number of concurrent Job Runs is limited by the quotas associated with your Wherobots Organization. For more information on reviewing your computational resources, see Quotas in the Wherobots documentation.
