GCP Cloud - Capture Data Lineage with Airflow

Today we will learn how to capture data lineage using Airflow on Google Cloud Platform (GCP).


Create a Cloud Composer environment in the Google Cloud Platform Console and run a simple Apache Airflow DAG (also called a workflow).

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.
  • Enable the Cloud Composer API in GCP
  • On the page for creating a Cloud Composer environment, enter the following:
    • Enter a name
    • Select the location closest to you
    • Leave all other fields at their defaults
    • Change the image version to one running Airflow 1.10.2 or above (this is important; the lineage API was added in Airflow 1.10)
  • Upload a sample Python file (quickstart.py - code given at the end) to the dags/ folder of the environment's Cloud Storage bucket; a scripted-upload sketch follows this list
  • Click Upload files
  • After you've uploaded the file, Cloud Composer adds the DAG to Airflow and schedules it immediately. It might take a few minutes for the DAG to show up in the Airflow web interface.
  • Open the Airflow web server to see the logs, view metadata, etc.
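
If you prefer to script the upload instead of clicking through the console, the DAG file can be copied into the environment's bucket with the Cloud Storage client library. This is only a minimal sketch: it assumes the google-cloud-storage package is installed and authenticated, and the bucket name below is a placeholder for your environment's bucket (shown on the environment's details page).

from google.cloud import storage

# Placeholder - replace with your Composer environment's bucket name.
BUCKET_NAME = "us-central1-my-environment-bucket"

client = storage.Client()
bucket = client.bucket(BUCKET_NAME)

# DAG files live in the dags/ folder of the environment's bucket;
# Cloud Composer syncs this folder to the Airflow scheduler and workers.
blob = bucket.blob("dags/quickstart.py")
blob.upload_from_filename("quickstart.py")
print("Uploaded quickstart.py to gs://{}/dags/".format(BUCKET_NAME))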

  • To see lineage, click: Data Profiling > Ad Hoc Query
    • Data lineage is captured in the xcom table

  • Execute a SQL statement against the xcom table to view the lineage, for example: SELECT dag_id, task_id, key, value FROM xcom;
  • You are done. A programmatic alternative to the Ad Hoc Query is sketched below.
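
If you'd rather inspect the table programmatically (for example from a Python shell on the Composer environment, where the Airflow metadata database connection is already configured), here is a minimal sketch using Airflow's own ORM session. It assumes the example_lineage DAG from quickstart.py has run at least once; the tasks push their lineage inlets/outlets to XCom.

from airflow import settings
from airflow.models import XCom

# Open a session against the Airflow metadata database.
session = settings.Session()

# Each task pushes its lineage (inlets/outlets) to XCom, so filtering
# on the DAG id surfaces the captured datasets.
for row in session.query(XCom).filter(XCom.dag_id == 'example_lineage'):
    print(row.task_id, row.key, row.value)

session.close()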

You can also integrate Airflow with Apache Atlas.

Airflow can send its lineage metadata to Apache Atlas. For this, you need to enable the Atlas backend and configure it properly, e.g., in your airflow.cfg:


[lineage]
backend = airflow.lineage.backend.atlas.AtlasBackend
 
[atlas]
username = <my_username>
password = <my_password>
host = <host>
port = 21000
Note: Please make sure to have the Atlas client package (atlasclient) installed, e.g. pip install atlasclient.

To install Apache Atlas on GCP:
  • Spin up a GCP Compute Engine instance (Linux OS)
  • Open an SSH session to the instance
  • Install Java
    • sudo apt install openjdk-8-jdk
  • Install Maven
    • sudo apt-get install maven


Python code for lineage testing (quickstart.py):

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_lineage', default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60))

# Declare lineage by attaching inlets/outlets to operators.
# inlets={"auto": True} makes the task pick up its inlets from the
# outlets of its direct upstream tasks.
f_final = File("/tmp/final")
run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
    inlets={"auto": True},
    outlets={"datasets": [f_final,]})

# One output dataset per file category, templated with the execution date.
f_in = File("/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
    outlets.append(f_out)

run_this = BashOperator(
    task_id='run_me_first', bash_command='echo 1', dag=dag,
    inlets={"datasets": [f_in,]},
    outlets={"datasets": outlets}
    )
run_this.set_downstream(run_this_last)


1 comment:

  1. Hi Tushar,
    I have tried this. However, I kept getting an error because lineage.backend doesn't contain the atlas class.
    Will you be able to help me out with the following:
    1. There is no version 10.2 or above in Composer.
    2. Which Atlas client package should be installed?
