Today we will learn how to capture data lineage using Airflow on Google Cloud Platform (GCP).
Create a Cloud Composer environment in the Google Cloud Platform Console and run a simple Apache Airflow DAG (also called a workflow). An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.
- Enable the Cloud Composer API in GCP
- On the Create environment page for Cloud Composer, enter the following:
- Enter a name
- Select the location closest to you
- Leave all other fields at their defaults
- Choose an image version with Airflow 1.10.2 or above (this is important; lineage support was introduced in Airflow 1.10)
- Upload a sample Python file (quickstart.py - code given at the end) to the Cloud Storage bucket of the Cloud Composer environment
- Click Upload files
- After you've uploaded the file, Cloud Composer adds the DAG to Airflow and schedules it immediately. It might take a few minutes for the DAG to show up in the Airflow web interface.
- Open the Airflow web server to see the logs, view metadata, etc.
- To see lineage, click: Data Profiling > Ad Hoc Query
- Data lineage is captured in the xcom table
- Execute a SQL statement against the xcom table to view lineage (an example query is given after this list)
- You are done.
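A minimal query you could run in the Ad Hoc Query screen (a sketch on my part; the exact columns of the xcom table can differ slightly between Airflow versions):
SELECT dag_id, task_id, execution_date, key, value
FROM xcom;
Airflow's lineage machinery stores each task's inlets and outlets as XCom entries, so the rows returned here contain the captured lineage.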
You can also integrate Airflow with Apache Atlas.
Airflow can send its lineage metadata to Apache Atlas. For this, you need to enable the Atlas backend and configure it properly, e.g. in your airflow.cfg (configuration):
[lineage]
backend = airflow.lineage.backend.atlas.AtlasBackend
[atlas]
username = <my_username>
password = <my_password>
host = <host>
port = 21000
Note: Please make sure the Atlas client package (atlasclient) is installed on the machines running Airflow.
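One way to install it (a suggestion, assuming Airflow 1.10.x, whose atlas extra pulls in the atlasclient library):
pip install 'apache-airflow[atlas]'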
- Spin up a GCP Compute Engine instance (Linux OS)
- SSH into the instance
- Install Java
- sudo apt install openjdk-8-jdk
- Install Maven
- sudo apt-get install maven
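Java and Maven are the prerequisites for building Apache Atlas itself on this instance. The following is only a rough sketch of building Atlas from a source release downloaded from atlas.apache.org; the version, archive name, and output paths are assumptions and will differ per release:
tar xzf apache-atlas-<version>-sources.tar.gz
cd apache-atlas-sources-<version>
export MAVEN_OPTS="-Xms2g -Xmx2g"
mvn clean -DskipTests package -Pdist,embedded-hbase-solr
After the build, extract the server package produced under distro/target/ and start Atlas with bin/atlas_start.py. By default Atlas listens on port 21000, which matches the [atlas] section shown above.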
Python file code for Lineage Testing (quickstart.py):
import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_lineage',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60))

# Output dataset of the final task.
f_final = File("/tmp/final")

# "auto" inlets pick up the outlets of upstream tasks.
run_this_last = DummyOperator(
    task_id='run_this_last', dag=dag,
    inlets={"auto": True},
    outlets={"datasets": [f_final]})

# Input dataset consumed by the first task.
f_in = File("/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    # One templated output file per category; {{ execution_date }} is rendered at runtime.
    f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
    outlets.append(f_out)

run_this = BashOperator(
    task_id='run_me_first', bash_command='echo 1', dag=dag,
    inlets={"datasets": [f_in]},
    outlets={"datasets": outlets}
)

run_this.set_downstream(run_this_last)
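After this DAG runs, each task's resolved inlets and outlets are pushed to XCom, which is why the Ad Hoc Query against the xcom table described earlier surfaces the captured lineage.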