Thursday, January 23, 2025

Introducing simplified interaction with the Airflow REST API in Amazon MWAA


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that builds upon Apache Airflow, offering its benefits while eliminating the need for you to set up, operate, and maintain the underlying infrastructure. This reduces operational overhead while increasing security and resilience.

Today, we’re excited to announce an enhancement to the Amazon MWAA integration with the Airflow REST API. This improvement streamlines the ability to access and manage your Airflow environments and their integration with external systems, and allows you to interact with your workflows programmatically. The Airflow REST API facilitates a wide range of use cases, from centralizing and automating administrative tasks to building event-driven, data-aware data pipelines.

In this post, we discuss the enhancement and present several use cases that the enhancement unlocks for your Amazon MWAA environment.

Airflow REST API

The Airflow REST API is a programmatic interface that allows you to interact with Airflow’s core functionalities. It’s a set of HTTP endpoints to perform operations such as invoking Directed Acyclic Graphs (DAGs), checking task statuses, retrieving metadata about workflows, managing connections and variables, and even initiating dataset-related events, without directly accessing the Airflow web interface or command line tools.

Before today, Amazon MWAA provided the foundation for interacting with the Airflow REST API. Though functional, the process of obtaining and managing access tokens and session cookies added complexity to the workflow. Amazon MWAA now supports a simplified mechanism for interacting with the Airflow REST API using AWS credentials, significantly reducing complexity and improving overall usability.

Enhancement overview

The new InvokeRestApi capability allows you to run Airflow REST API requests with a valid SigV4 signature using your existing AWS credentials. This feature is now available to all Amazon MWAA environments (2.4.3+) in supported Amazon MWAA AWS Regions. By acting as an intermediary, this REST API processes requests on behalf of users, requiring only the environment name and API request payload as inputs.

Integrating with the Airflow REST API through the enhanced Amazon MWAA API provides several key benefits:

  • Simplified integration – The new InvokeRestApi capability in Amazon MWAA removes the complexity of managing access tokens and session cookies, making it straightforward to interact with the Airflow REST API.
  • Improved usability – By acting as an intermediary, the enhanced API delivers Airflow REST API execution results directly to the client, reducing complexity and improving overall usability.
  • Automated administration – The simplified REST API access enables automating various administrative and management tasks, such as managing Airflow variables, connections, slot pools, and more.
  • Event-driven architectures – The enhanced API facilitates seamless integration with external events, enabling the triggering of Airflow DAGs based on these events. This supports the growing emphasis on event-driven data pipelines.
  • Data-aware scheduling – Using the dataset-based scheduling feature in Airflow, the enhanced API enables the Amazon MWAA environment to manage the incoming workload and scale resources accordingly, improving the overall reliability and efficiency of event-driven pipelines.

In the following sections, we demonstrate how to use the enhanced API in various use cases.

How to use the enhanced Amazon MWAA API

The following code snippet shows the general request format for the enhanced REST API:

POST /restapi/Name HTTP/1.1
Content-type: application/json

{
    Name: String,
    Method: String,
    Path: String,
    QueryParameters: Json,
    Body: Json
}

The Name of the Amazon MWAA environment, the Path of the Airflow REST API endpoint to be called, and the HTTP Method to use are the required parameters, whereas QueryParameters and Body are optional and can be used as needed in the API calls.

The following code snippet shows the general response format:

{
    RestApiStatusCode: Number,
    RestApiResponse: Json
}

The RestApiStatusCode represents the HTTP status code returned by the Airflow REST API call, and the RestApiResponse contains the response payload from the Airflow REST API.
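As a minimal sketch of how the request and response formats fit together, the following helpers assemble the keyword arguments for a call and unpack the two response fields. The function names build_invoke_kwargs and unpack_response are hypothetical and are not part of any AWS SDK; they only mirror the field names described above.

```python
from typing import Any, Optional, Tuple


def build_invoke_kwargs(
    name: str,
    method: str,
    path: str,
    query_parameters: Optional[dict] = None,
    body: Optional[dict] = None,
) -> dict:
    """Assemble request fields, leaving out optional ones that were not supplied."""
    kwargs: dict = {"Name": name, "Method": method, "Path": path}
    if query_parameters is not None:
        kwargs["QueryParameters"] = query_parameters
    if body is not None:
        kwargs["Body"] = body
    return kwargs


def unpack_response(response: dict) -> Tuple[int, Any]:
    """Return (status code, payload) from a response in the general response format."""
    return response["RestApiStatusCode"], response.get("RestApiResponse")
```

With a boto3 MWAA client, the result of build_invoke_kwargs could be passed as mwaa_client.invoke_rest_api(**kwargs), and the returned dictionary handed to unpack_response.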

The following sample code snippet showcases how to update the description field of an Airflow variable using the enhanced integration. The call uses the AWS Python SDK to invoke the Airflow REST API for the task.

import boto3

# Create a boto3 client
mwaa_client = boto3.client("mwaa")

# Call the enhanced REST API using boto3 client
# Using QueryParameters, you can selectively specify the field to be updated
# Without QueryParameters, all fields will be updated
response = mwaa_client.invoke_rest_api(
    Name="<your-environment-name>",
    Method="PATCH",
    Path="/variables/<your-variable-key>",
    Body={
        "key": "<key>",
        "value": "<value>",
        "description": "<description>"
    },
    QueryParameters={
        "update_mask": ["description"]
    }
)

# Access the outputs of the REST call
status_code = response["RestApiStatusCode"]
result = response["RestApiResponse"]

To make the invoke_rest_api SDK call, the calling client should use an AWS Identity and Access Management (IAM) principal that has the airflow:InvokeRestAPI permission attached for the requisite environment. The permission can be scoped to specific Airflow roles (Admin, Op, User, Viewer, or Public) to control access levels.
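The role scoping described above can be expressed as an identity-based policy. The following is a minimal sketch that builds such a policy document in Python. The helper name build_invoke_policy is hypothetical, and the resource ARN layout (arn:aws:airflow:<region>:<account-id>:role/<environment-name>/<airflow-role>) is an assumption that should be checked against the Amazon MWAA documentation before use.

```python
import json

# Airflow roles named in this post
AIRFLOW_ROLES = {"Admin", "Op", "User", "Viewer", "Public"}


def build_invoke_policy(region: str, account_id: str, env_name: str, airflow_role: str) -> dict:
    """Build a policy document allowing REST API calls as one Airflow role.

    The resource ARN format is an assumption; verify it before attaching the policy.
    """
    if airflow_role not in AIRFLOW_ROLES:
        raise ValueError(f"Unknown Airflow role: {airflow_role}")
    resource = f"arn:aws:airflow:{region}:{account_id}:role/{env_name}/{airflow_role}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "airflow:InvokeRestAPI",
                "Resource": resource,
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder account and environment values for illustration only
    policy = build_invoke_policy("us-east-1", "111122223333", "my-environment", "Viewer")
    print(json.dumps(policy, indent=2))
```

Scoping the resource to a read-only role such as Viewer is one way to keep automation that only inspects an environment from being able to modify it.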

This simple yet powerful REST API supports various use cases for your Amazon MWAA environments. Let’s review some important ones in the following sections.

Automate administration and management tasks

Prior to this launch, to automate configurations and setup of resources such as variables, connections, slot pools, and more, you had to develop lengthy boilerplate code to make API requests to the Amazon MWAA web servers. You had to handle the cookie and session management in the process. You can simplify this automation with the new enhanced REST API support.

For this example, let’s assume you want to automate maintaining your Amazon MWAA environment variables. You need to perform API operations such as create, read, update, and delete on Airflow variables to achieve this task. The following is a simple Python client to do so (mwaa_variables_client.py):

import boto3

# Client for managing MWAA environment variables
class MWAAVariablesClient:
    # Initialize the client with environment name and optional MWAA boto3 client
    def __init__(self, env_name, mwaa_client=None):
        self.env_name = env_name
        self.client = mwaa_client or boto3.client("mwaa")

    # List all variables in the MWAA environment
    def list(self):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="GET",
            Path="/variables"
        )

        output = response['RestApiResponse']['variables']
        return output

    # Get a specific variable by key
    def get(self, key):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="GET",
            Path=f"/variables/{key}"
        )

        return response['RestApiResponse']

    # Create a new variable with key, value, and optional description
    def create(self, key, value, description=None):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="POST",
            Path="/variables",
            Body={
                "key": key,
                "value": value,
                "description": description
            }
        )

        return response['RestApiResponse']

    # Update an existing variable's value and description
    def update(self, key, value, description, query_parameters=None):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="PATCH",
            Path=f"/variables/{key}",
            Body={
                "key": key,
                "value": value,
                "description": description
            },
            QueryParameters=query_parameters
        )

        return response['RestApiResponse']

    # Delete a variable by key
    def delete(self, key):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="DELETE",
            Path=f"/variables/{key}"
        )
        return response['RestApiStatusCode']

if __name__ == "__main__":
    client = MWAAVariablesClient("<your-mwaa-environment-name>")

    print("\nCreating a test variable ...")
    response = client.create(
        key="test",
        value="Test value",
        description="Test description"
    )
    print(response)

    print("\nListing all variables ...")
    variables = client.list()
    print(variables)

    print("\nGetting the test variable ...")
    response = client.get("test")
    print(response)

    print("\nUpdating the value and description of test variable ...")
    response = client.update(
        key="test",
        value="Updated Value",
        description="Updated description"
    )
    print(response)

    print("\nUpdating only description of test variable ...")
    response = client.update(
        key="test",
        value="Updated Value",
        description="Yet another updated description",
        query_parameters={ "update_mask": ["description"] }
    )
    print(response)

    print("\nDeleting the test variable ...")
    response_code = client.delete("test")
    print(f"Response code: {response_code}")

    print("\nFinally, getting the deleted test variable ...")
    try:
        response = client.get("test")
        print(response)
    except Exception as e:
        # The error raised by boto3 carries the Airflow error payload
        print(e.response["RestApiResponse"])

Assuming that you have configured your terminal with appropriate AWS credentials, you can run the preceding Python script to achieve the following results:

$ python mwaa_variables_client.py

Creating a test variable ...
{'description': 'Test description', 'key': 'test', 'value': 'Test value'}

Listing all variables ...
[{'key': 'test', 'value': 'Test value'}]

Getting the test variable ...
{'key': 'test', 'value': 'Test value'}

Updating the value and description of test variable ...
{'description': 'Updated description', 'key': 'test', 'value': 'Updated Value'}

Updating only description of test variable ...
{'description': 'Yet another updated description', 'key': 'test', 'value': 'Updated Value'}

Deleting the test variable ...
Response code: 204

Finally, getting the deleted test variable ...
{'detail': 'Variable does not exist', 'status': 404, 'title': 'Variable not found', 'type': 'https://airflow.apache.org/docs/apache-airflow/2.8.1/stable-rest-api-ref.html#section/Errors/NotFound'}

Let’s further explore other useful use cases.

Build event-driven data pipelines

The Airflow community has been actively innovating to enhance the platform’s data awareness, enabling you to build more dynamic and responsive workflows. When we announced support for version 2.9.2 in Amazon MWAA, we introduced capabilities that allow pipelines to react to changes in datasets, both within Airflow environments and in external systems. The new simplified integration with the Airflow REST API makes the implementation of data-driven pipelines more straightforward.

Consider a use case where you need to run a pipeline that uses input from an external event. The following sample DAG runs a bash command supplied as a parameter (any_bash_command.py):

"""
This DAG allows you to execute a bash command supplied as a parameter to the DAG.
The command is passed as a parameter called `command` in the DAG configuration.
"""

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.models.param import Param

from datetime import datetime

with DAG(
    dag_id="any_bash_command",
    schedule=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    params={
        "command": Param("env", type="string")
    },
) as dag:
    # Run the command passed in through the DAG run configuration
    cli_command = BashOperator(
        task_id="triggered_bash_command",
        bash_command="{{ dag_run.conf['command'] }}"
    )

With the help of the enhanced REST API, you can create a client that can invoke this DAG, supplying the bash command of your choice as follows (mwaa_dag_run_client.py):

import boto3

# Client for triggering DAG runs in Amazon MWAA
class MWAADagRunClient:
    # Initialize the client with MWAA environment name and optional MWAA boto3 client
    def __init__(self, env_name, mwaa_client=None):
        self.env_name = env_name
        self.client = mwaa_client or boto3.client("mwaa")

    # Trigger a DAG run with specified parameters
    def trigger_run(self,
            dag_id,
            dag_run_id=None,
            logical_date=None,
            data_interval_start=None,
            data_interval_end=None,
            note=None,
            conf=None,
    ):
        # Include only the optional fields that were supplied
        body = {}
        if dag_run_id:
            body["dag_run_id"] = dag_run_id
        if logical_date:
            body["logical_date"] = logical_date
        if data_interval_start:
            body["data_interval_start"] = data_interval_start
        if data_interval_end:
            body["data_interval_end"] = data_interval_end
        if note:
            body["note"] = note
        body["conf"] = conf or {}

        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="POST",
            Path=f"/dags/{dag_id}/dagRuns",
            Body=body
        )
        return response['RestApiResponse']

if __name__ == "__main__":
    client = MWAADagRunClient("<your-mwaa-environment-name>")

    print("\nTriggering a dag run ...")
    result = client.trigger_run(
        dag_id="any_bash_command",
        conf={
            "command": "echo 'Hello from external trigger!'"
        }
    )
    print(result)

The following snippet shows a sample run of the script:

$ python mwaa_dag_run_client.py

Triggering a dag run ...
{'conf': {'command': "echo 'Hello from external trigger!'"}, 'dag_id': 'any_bash_command', 'dag_run_id': 'manual__2024-10-21T16:56:09.852908+00:00', 'data_interval_end': '2024-10-21T16:56:09.852908+00:00', 'data_interval_start': '2024-10-21T16:56:09.852908+00:00', 'execution_date': '2024-10-21T16:56:09.852908+00:00', 'external_trigger': True, 'logical_date': '2024-10-21T16:56:09.852908+00:00', 'run_type': 'manual', 'state': 'queued'}

On the Airflow UI, the triggered_bash_command task shows the following execution log:

[2024-10-21, 16:56:12 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-10-21, 16:56:12 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-10-21, 16:56:12 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', "echo 'Hello from external trigger!'"]
[2024-10-21, 16:56:12 UTC] {subprocess.py:86} INFO - Output:
[2024-10-21, 16:56:12 UTC] {subprocess.py:93} INFO - Hello from external trigger!
[2024-10-21, 16:56:12 UTC] {subprocess.py:97} INFO - Command exited with return code 0
[2024-10-21, 16:56:12 UTC] {taskinstance.py:340} ▶ Post task execution logs

You can further expand this example to more useful event-driven architectures. Let’s expand the use case to run your data pipeline and perform extract, transform, and load (ETL) jobs when a new file lands in an Amazon Simple Storage Service (Amazon S3) bucket in your data lake. The following diagram illustrates one architectural approach.

In the context of invoking a DAG through an external input, the AWS Lambda function would have no knowledge of how busy the Amazon MWAA web server is, potentially leading to the function overwhelming the Amazon MWAA web server by processing a large number of files in a short timeframe.

One way to regulate the file processing throughput would be to introduce an Amazon Simple Queue Service (Amazon SQS) queue between the S3 bucket and the Lambda function, which can help with rate limiting the API requests to the web server. You can achieve this by configuring maximum concurrency for Lambda for the SQS event source. However, the Lambda function would still be unaware of the processing capacity available in the Amazon MWAA environment to run the DAGs.

In addition to the SQS queue, to help the Amazon MWAA environment manage the load natively, you can use Airflow’s data-aware scheduling feature using datasets. This approach involves using the enhanced Amazon MWAA REST API to create dataset events, which are then used by the Airflow scheduler to schedule the DAG natively. This way, the Amazon MWAA environment can effectively batch the dataset events and scale resources based on the load. Let’s explore this approach in more detail.

Configure data-aware scheduling

Consider the following DAG that showcases a framework for an ETL pipeline (data_aware_pipeline.py). It uses a dataset for scheduling.

"""
DAG to run the given ETL pipeline based on the datalake dataset event.
"""
from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

# Create a dataset
datalake = Dataset("datalake")

# Return a list of S3 file URIs from the supplied dataset events
def get_resources(dataset_uri, triggering_dataset_events=None):
    events = triggering_dataset_events[dataset_uri] if triggering_dataset_events else []
    s3_uris = [event.extra["uri"] for event in events]
    return s3_uris

with DAG(
    dag_id="data_aware_pipeline",
    schedule=[datalake],
    start_date=datetime(2022, 1, 1),
    catchup=False
):
    @task
    def extract(triggering_dataset_events=None):
        resources = get_resources("datalake", triggering_dataset_events)
        for resource in resources:
            print(f"Running data extraction for {resource} ...")

    @task
    def transform(triggering_dataset_events=None):
        resources = get_resources("datalake", triggering_dataset_events)
        for resource in resources:
            print(f"Running data transformation for {resource} ...")

    @task
    def load(triggering_dataset_events=None):
        resources = get_resources("datalake", triggering_dataset_events)
        for resource in resources:
            print(f"Loading finalized data for {resource} ...")

    extract() >> transform() >> load()

In the preceding code snippet, a Dataset object called datalake is used to schedule the DAG. The get_resources function extracts the extra information that contains the locations of the newly added files in the S3 data lake. Upon receiving dataset events, the Amazon MWAA environment batches the dataset events based on the load and schedules the DAG to handle them appropriately. The modified architecture to support the data-aware scheduling is presented below.

The following is a simplified client that can create a dataset event through the enhanced REST API (mwaa_dataset_client.py):

import boto3

# Client for interacting with MWAA datasets
class MWAADatasetClient:
    # Initialize the client with environment name and optional MWAA boto3 client
    def __init__(self, env_name, mwaa_client=None):
        self.env_name = env_name
        self.client = mwaa_client or boto3.client("mwaa")

    # Create a dataset event in the MWAA environment
    def create_event(self, dataset_uri, extra=None):
        body = {
            "dataset_uri": dataset_uri
        }
        if extra:
            body["extra"] = extra

        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="POST",
            Path="/datasets/events",
            Body=body
        )
        return response['RestApiResponse']

The following is a code snippet for the Lambda function in the preceding architecture to generate the dataset event, assuming the function is configured to handle one S3 PUT event at a time (dataset_event_lambda.py):

import os
import json

from mwaa_dataset_client import MWAADatasetClient

environment = os.environ["MWAA_ENV_NAME"]
client = MWAADatasetClient(environment)

def handler(event, context):
    # Extract the S3 file URI from the first record of the event
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    s3_file_uri = f"s3://{bucket}/{key}"

    # Create a dataset event
    result = client.create_event(
        dataset_uri="datalake",
        extra={"uri": s3_file_uri}
    )

    return {
        "statusCode": 200,
        "body": json.dumps(result)
    }
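The handler above reads the first record in the S3 notification shape. When a function is invoked through an SQS event source, each record typically carries the S3 notification as a JSON string in its body field instead, and S3 URL-encodes object keys in its notifications. The following is a minimal sketch of a parsing helper that handles both shapes; the function name extract_s3_uris is hypothetical, and the assumption that the queue receives S3 notifications directly (with no additional wrapping by another service) should be confirmed for your setup.

```python
import json
from urllib.parse import unquote_plus


def extract_s3_uris(event: dict) -> list:
    """Collect s3:// URIs from a Lambda event holding S3 or SQS-wrapped S3 records."""
    uris = []
    for record in event.get("Records", []):
        if "s3" in record:
            # Direct S3 notification record
            s3_records = [record]
        else:
            # Assumed SQS record: the S3 notification is a JSON string in "body"
            body = json.loads(record.get("body") or "{}")
            s3_records = body.get("Records", [])
        for s3_record in s3_records:
            bucket = s3_record["s3"]["bucket"]["name"]
            # Object keys arrive URL-encoded, for example spaces as "+"
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            uris.append(f"s3://{bucket}/{key}")
    return uris
```

Each URI returned by the helper could then be passed as the extra payload of a dataset event, one event per file.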

As new files get dropped into the S3 bucket, the Lambda function will generate a dataset event per file, passing in the Amazon S3 location of the newly added files. The Amazon MWAA environment will schedule the ETL pipeline upon receiving the dataset events. The following diagram illustrates a sample run of the ETL pipeline on the Airflow UI.

The following snippet shows the execution log of the extract task from the pipeline. The log shows how the Airflow scheduler batched three dataset events together to handle the load.

[2024-10-21, 16:47:15 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-10-21, 16:47:15 UTC] {logging_mixin.py:190} INFO - Running data extraction for s3://example-bucket/path/to/file1.csv ...
[2024-10-21, 16:47:15 UTC] {logging_mixin.py:190} INFO - Running data extraction for s3://example-bucket/path/to/file2.csv ...
[2024-10-21, 16:47:15 UTC] {logging_mixin.py:190} INFO - Running data extraction for s3://example-bucket/path/to/file3.csv ...
[2024-10-21, 16:47:15 UTC] {python.py:240} INFO - Done. Returned value was: None
[2024-10-21, 16:47:16 UTC] {taskinstance.py:340} ▶ Post task execution logs

In this way, you can use the enhanced REST API to create data-aware, event-driven pipelines.
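To make the batching behavior concrete, the following sketch exercises the same extraction logic as the pipeline's get_resources function outside of Airflow. It assumes that the triggering events are supplied as a mapping from dataset URI to a list of event objects exposing an extra attribute; the SimpleNamespace objects are hypothetical stand-ins for those events, and the file URIs are the example values from the log.

```python
from types import SimpleNamespace


def get_resources(dataset_uri, triggering_dataset_events=None):
    """Return the S3 URIs carried in the extra payload of each batched event."""
    events = triggering_dataset_events[dataset_uri] if triggering_dataset_events else []
    return [event.extra["uri"] for event in events]


# Stand-in for three dataset events that were batched into a single DAG run
batched_events = {
    "datalake": [
        SimpleNamespace(extra={"uri": f"s3://example-bucket/path/to/file{n}.csv"})
        for n in (1, 2, 3)
    ]
}

uris = get_resources("datalake", batched_events)
for uri in uris:
    print(f"Running data extraction for {uri} ...")
```

Because every task receives the full batch, a single DAG run can process all files that arrived since the previous run instead of starting one run per file.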

Considerations

When implementing solutions using the enhanced Amazon MWAA REST API, it’s important to consider the following:

  • IAM permissions – Make sure that the IAM principal making the invoke_rest_api SDK call has the airflow:InvokeRestAPI permission on the Amazon MWAA resource. To control access levels, the permission can be scoped to specific Airflow roles (Admin, Op, User, Viewer, or Public).
  • Error handling – Implement robust error handling mechanisms to handle various HTTP status codes and error responses from the Airflow REST API.
  • Monitoring and logging – Set up appropriate monitoring and logging to track the performance and reliability of your API-based integrations and data pipelines.
  • Versioning and compatibility – Monitor the versioning of the Airflow REST API and the Amazon MWAA service to make sure your integrations remain compatible with any future changes.
  • Security and compliance – Adhere to your organization’s security and compliance requirements when integrating external systems with Amazon MWAA and handling sensitive data.
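As one way to approach the error handling consideration above, the following sketch retries a call when the Airflow REST API reports a transient status code and re-raises everything else immediately. The function name call_with_retries and the set of retryable codes are hypothetical choices, and the sketch assumes that the raised exception exposes a response dictionary containing RestApiStatusCode, in the same way the variables client example reads RestApiResponse from it.

```python
import time
from typing import Callable

# Status codes treated as transient here; adjust to your own policy
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


def call_with_retries(
    invoke: Callable[[], dict],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Run invoke(), retrying with exponential backoff on transient failures."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return invoke()
        except Exception as error:
            # Assumption: the error carries the parsed payload in `.response`
            details = getattr(error, "response", None) or {}
            status = details.get("RestApiStatusCode")
            if status not in RETRYABLE_STATUS_CODES or attempt == max_attempts:
                raise
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")
```

A call could then be wrapped as call_with_retries(lambda: mwaa_client.invoke_rest_api(Name=env_name, Method="GET", Path="/variables")), where mwaa_client and env_name are whatever client and environment name you already use.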

You can start using the simplified integration with the Airflow REST API in your Amazon MWAA environments with Airflow version 2.4.3 or greater, in all currently supported Regions.

Conclusion

The enhanced integration between Amazon MWAA and the Airflow REST API represents a significant improvement in the ease of interacting with Airflow’s core functionalities. This new capability opens up a wide range of use cases, from centralizing and automating administrative tasks and improving overall usability to building event-driven, data-aware data pipelines.

As you explore this new feature, consider the various use cases and best practices outlined in this post. By using the new InvokeRestApi, you can streamline your data management processes, enhance operational efficiency, and drive greater value from your data-driven systems.


About the Authors

Chandan Rupakheti is a Senior Solutions Architect at AWS. His main focus at AWS lies in the intersection of analytics, serverless, and AdTech services. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud. Outside of his professional life, he loves spending time with his family and friends, and listening to and playing music.

Hernan Garcia is a Senior Solutions Architect at AWS based out of Amsterdam. He has worked in the financial services industry since 2018, specializing in application modernization and supporting customers in their adoption of the cloud with a focus on serverless technologies.
