Welcome toVigges Developer Community-Open, Learning,Share
Welcome To Ask or Share your Answers For Others

databricks - Airflow on_success_callback and on_failure_callback not working with Databricks notebook

I want to customize my DAG to call a Databricks notebook when a task succeeds or fails. I have created two functions that call a Databricks notebook, one for the success case and one for the failure case. The success or failure callback function is being called, but the Databricks notebook is not executing. Here is the sample code.

def task_success_callback(context):
    """ task_success callback """
    context['task_instance'].task_id
    print("success case")
    dq_notebook_success_task_params = {
        'existing_cluster_id': Variable.get("DATABRICKS_CLUSTER_ID"),
        'notebook_task': {
            'notebook_path': '/AAA/Airflow/Operators/audit_file_operator',
            'base_parameters': {
                "root": "dbfs:/mnt/aaa",
                "audit_file_path": "/success_file_path/",
                "table_name": "sample_data_table",
                "audit_flag": "success"
            }
        }
    }

    DatabricksSubmitRunOperator(
        task_id="weather_table_task_id",
        databricks_conn_id='databricks_conn',
        json=dq_notebook_success_task_params,
        do_xcom_push=True,
        secrets=[
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='adf-service-principal',
            ),
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='postgres-credentials',
            ),
        ],
    )

def task_failure_callback(context):
    """ task_failure callback """
    context['task_instance'].task_id
    print("failure case")
    dq_notebook_failure_task_params = {
        'existing_cluster_id': Variable.get("DATABRICKS_CLUSTER_ID"),
        'notebook_task': {
            'notebook_path': '/AAA/Airflow/Operators/audit_file_operator',
            'base_parameters': {
                "root": "dbfs:/mnt/aaa",
                "audit_file_path": "/failure_file_path/",
                "table_name": "sample_data_table",
                "audit_flag": "failure"
            }
        }
    }

    DatabricksSubmitRunOperator(
        task_id="weather_table_task_id",
        databricks_conn_id='databricks_conn',
        json=dq_notebook_failure_task_params,
        do_xcom_push=True,
        secrets=[
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='adf-service-principal',
            ),
            secret.Secret(
                deploy_type='env',
                deploy_target=None,
                secret='postgres-credentials',
            ),
        ],
    )

DEFAULT_ARGS = {
    "owner": "admin",
    "depends_on_past": False,
    "start_date": datetime(2020, 9, 23),
    "on_success_callback": task_success_callback,
    "on_failure_callback": task_failure_callback,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
}

==================
Remaining DAG code
==================

1 Answer

In Airflow, every operator has an execute() method that defines the operator's logic. When you declare an operator as a task in your workflow, Airflow calls the constructor, renders the templates, and calls the execute method for you. However, when you create an operator inside a Python function, you need to handle these steps yourself.

So when you write:

def task_success_callback(context):
   DatabricksSubmitRunOperator(..)

All you did here was call the DatabricksSubmitRunOperator constructor. You didn't invoke the operator's logic.
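To make the distinction concrete, here is a toy illustration in plain Python. `ToyOperator` is a hypothetical stand-in, not an Airflow class; it only mimics the shape of an operator (a constructor plus an `execute(context)` method) to show that constructing an object does not run its logic.

```python
class ToyOperator:
    """Hypothetical stand-in for an Airflow operator (not part of Airflow)."""

    def __init__(self, task_id):
        # The constructor only stores configuration.
        self.task_id = task_id
        self.ran = False

    def execute(self, context):
        # The actual work happens here, and only when execute() is called.
        self.ran = True
        return f"{self.task_id} executed"


# Constructed but never executed: the logic has not run.
constructed_only = ToyOperator(task_id="audit")

# Constructed and executed explicitly: the logic runs.
executed = ToyOperator(task_id="audit")
result = executed.execute(context={})
```

The callbacks in the question are in the same position as `constructed_only`: the operator object exists, but nothing has asked it to do its work.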

What you need to do is:

def task_success_callback(context):
   op = DatabricksSubmitRunOperator(..)
   # execute() takes the task context, so pass it along
   op.execute(context=context)
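As a fuller sketch of the same idea, the two near-identical callbacks from the question could be built by one factory. Everything named here beyond the question's own values is an assumption for illustration: `make_audit_callback`, the `operator_cls` and `get_cluster_id` parameters, and the `audit_<flag>` task id are hypothetical. The operator class is passed in as an argument so the sketch does not depend on a particular Airflow import path, and the `secrets` argument from the question is left out for brevity.

```python
def make_audit_callback(operator_cls, audit_flag, audit_file_path, get_cluster_id):
    """Build a task callback that creates the operator AND executes it.

    operator_cls:   the operator class to use; in a DAG file this would be
                    DatabricksSubmitRunOperator (assumption: passed in by caller).
    get_cluster_id: zero-argument callable returning the cluster id, so the
                    lookup happens when the callback fires, as in the question.
    """

    def callback(context):
        params = {
            "existing_cluster_id": get_cluster_id(),
            "notebook_task": {
                "notebook_path": "/AAA/Airflow/Operators/audit_file_operator",
                "base_parameters": {
                    "root": "dbfs:/mnt/aaa",
                    "audit_file_path": audit_file_path,
                    "table_name": "sample_data_table",
                    "audit_flag": audit_flag,
                },
            },
        }
        op = operator_cls(
            task_id=f"audit_{audit_flag}",  # hypothetical task id
            databricks_conn_id="databricks_conn",
            json=params,
        )
        # Constructing the operator is not enough; execute() runs its logic.
        return op.execute(context=context)

    return callback


# Possible wiring in the DAG file (import path depends on your Airflow version):
# task_success_callback = make_audit_callback(
#     DatabricksSubmitRunOperator, "success", "/success_file_path/",
#     lambda: Variable.get("DATABRICKS_CLUSTER_ID"))
# task_failure_callback = make_audit_callback(
#     DatabricksSubmitRunOperator, "failure", "/failure_file_path/",
#     lambda: Variable.get("DATABRICKS_CLUSTER_ID"))
```

The resulting functions can be assigned to "on_success_callback" and "on_failure_callback" in DEFAULT_ARGS exactly as in the question.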
