A Data Lineage OSS Reference Solution

Do I have to create my own graph model and everything to set up a Data Lineage system? Thanks to many great open-source projects, the answer is: No!

Today, I would like to share my opinionated reference data infra stack, built from some of the best open-source projects covering modern ETL, Dashboard, Metadata Governance, and Data Lineage Management.

A Metadata Governance system is a system providing a single view of where and how all the data are formatted, generated, transformed, consumed, presented, and owned.

Metadata Governance is like a catalog of all the data warehouses, databases, tables, dashboards, ETL jobs, etc., so that people don’t have to broadcast questions like “Hi everyone, could I change the schema of this table?” or “Hey, does anyone know how I could find the raw data of table-view-foo-bar?”. This is why we need a Metadata Governance system in a mature data stack with a relatively large scale of data and team (or one that is growing toward it).

The other term, Data Lineage, is one kind of metadata that needs to be managed: for example, some dashboard is the downstream of a table view, which in turn has two other tables from different databases as its upstream. That information should be managed whenever possible, too, to enable a chain of trust in a data-driven team.

Metadata and data lineage naturally fit the graph model and graph databases, and relationship-oriented queries, for instance “finding all n-depth data lineage of a given component (i.e. a table)”, map to a path-finding query (e.g. FIND ALL PATH or a variable-length MATCH) in a graph database.
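
As an illustration, with the graph schema that appears later in this post (Table vertices connected by HAS_UPSTREAM/HAS_DOWNSTREAM edges), a 3-depth lineage lookup is a single variable-length path query. Here is a minimal sketch with the nebula3-python client, assuming a local Nebula Graph at 127.0.0.1:9669, the default root/nebula credentials, and a graph space named amundsen (the space name is my assumption):

from nebula3.Config import Config
from nebula3.gclient.net import ConnectionPool

pool = ConnectionPool()
pool.init([("127.0.0.1", 9669)], Config())

with pool.session_context("root", "nebula") as session:
    # assumption: the graph space holding the metadata graph
    session.execute("USE amundsen;")
    # all upstream/downstream paths up to 3 hops away from one table vertex
    result = session.execute(
        'MATCH p=(t:`Table`) -[:`HAS_UPSTREAM`|:`HAS_DOWNSTREAM` *1..3]->(x) '
        'WHERE id(t) == "snowflake://dbt_demo.public/raw_inventory_value" RETURN p'
    )
    print(result)

pool.close()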

This also explains one observation of mine as an OSS contributor to Nebula Graph, a distributed graph database: judging from the queries and graph modeling shared in discussions, I could tell that a bunch of teams already leveraging Nebula Graph in their tech stack are setting up a data lineage system on their own, from scratch.

A Metadata Governance system needs some of the following components:

  • Metadata Extractor
    • This part either pulls metadata from, or gets pushed metadata by, the different parties of the data stack such as databases, data warehouses, dashboards, or even ETL pipelines and applications, etc.
  • Metadata Storage
    • This could be either a database or even large JSON manifest files
  • Metadata Catalog
    • This could be a system providing API and/or a GUI interface to read/write the metadata and data lineage

In the Nebula Graph community, I have seen many graph database users building their own in-house data lineage systems. It’s itching to watch this entropy-increasing situation instead of the work being standardized or jointly contributed, as most of it is parsing metadata from well-known big-data projects and persisting it into a graph database, which makes it highly probable that the work is common across teams.

Then I came to create an opinionated reference data infra stack with some of those best open-source projects put together. Hopefully, those who were going to define and iterate their own graph model on Nebula Graph and create in-house metadata and data lineage extraction pipelines can benefit from this project and get a relatively polished, beautifully designed Metadata Governance system out of the box, with a fully evolved graph model.

To make the reference project self-contained and runnable, I tried to include more layers of the data infra stack than just the purely metadata-related ones; thus, it may also help new data engineers who would like to try things out and see how far open source has pushed a modern data lab.

This is a diagram of all the components in this reference data stack, where I see most of them as Metadata Sources:

[diagram-of-ref-project: components of the reference data stack]

Then, let’s introduce the components.

For processing and consuming raw and intermediate data, one or more databases and/or warehouses should be used.

It could be any DB/DW like Hive, Apache Delta, TiDB, Cassandra, MySQL, or Postgres. In this reference project, we simply choose one of the most popular ones: Postgres. And our reference lab comes with its first service:

✅ - Data warehouse: Postgres

We should have some sort of DataOps setup to enable pipelines and environments to be repeatable, testable, and version-controlled.

Here, we used Meltano created by GitLab.

Meltano is a just-works DataOps platform that connects Singer as the EL and dbt as the T in a magically elegant way. It also integrates with other data infra utilities such as Apache Superset and Apache Airflow, etc.

Thus, we have one more thing to be included:

✅ - GitOps: Meltano

And under the hood, we will E(extract) and L(load) data from many different data sources to data targets leveraging Singer together with Meltano, and do T(transformation) with dbt.

✅ - EL: Singer

✅ - T: dbt

How about creating dashboards, charts, and tables for getting the insights into all the data?

https://user-images.githubusercontent.com/1651790/172800854-8e01acae-696d-4e07-8e3e-a7d34dec8278.png

Apache Superset is one of the greatest visualization platforms we could choose from, and we just add it to our stack!

✅ - Dashboard: Apache Superset

In most cases, our DataOps jobs grow to a scale where long-running pipelines need to be orchestrated, and here comes Apache Airflow.

✅ - DAG: Apache Airflow

With more components and data being introduced into a data infra, there will be massive amounts of metadata across the whole lifecycle of databases, tables, schemas, dashboards, DAGs, and applications, and together with their administrators and teams it needs to be collectively managed, connected, and discovered.

Linux Foundation Amundsen is one of the best projects solving this problem.

https://user-images.githubusercontent.com/1651790/172801018-ecd67fa9-2743-451f-8734-b14f2a814199.png

✅ - Data Discovery: Linux Foundation Amundsen

With a graph database as the source of truth to accelerate multi-hop queries, together with Elasticsearch as the full-text search engine, Amundsen indexes all the metadata and its lineage smoothly and beautifully, taking it to the next level.

By default, Neo4j is used as the graph database, but I will be using Nebula Graph instead in this project since I am more familiar with the latter.

✅ - Full-text Search: Elasticsearch

✅ - Graph Database: Nebula Graph

Now, with the components in our stack revealed, let’s assemble them.

The reference runnable project is open-source and you could find it here:

I will try my best to make things clean and isolated. It’s assumed you are running on a UNIX-like system with internet access and Docker Compose installed.

Please refer here to install Docker and Docker Compose before moving forward.

I am running it on Ubuntu 20.04 LTS X86_64, but there shouldn’t be issues on other distros or versions of Linux.

First, let’s install Postgres as our data warehouse.

This one-liner will create a Postgres instance running in the background with Docker, and it will be cleaned up when stopped (--rm).

docker run --rm --name postgres \
    -e POSTGRES_PASSWORD=lineage_ref \
    -e POSTGRES_USER=lineage_ref \
    -e POSTGRES_DB=warehouse -d \
    -p 5432:5432 postgres

Then we could verify it with Postgres CLI or GUI clients.

Hint: You could use VS Code extension: SQL tools to quickly connect to multiple RDBMS(MariaDB, Postgres, etc.) or even Non-SQL DBMS like Cassandra in a GUI fashion.
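
As a quick programmatic check, here is a minimal sketch using the psycopg2 driver (assuming it is installed, e.g. with pip3 install psycopg2-binary), with the credentials from the docker run command above:

import psycopg2

# connect with the credentials used when starting the container
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="lineage_ref",
    password="lineage_ref",
    dbname="warehouse",
)

with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone())  # e.g. ('PostgreSQL 14.x ...',)

conn.close()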

Then, let’s get Meltano with Singer and dbt installed.

Meltano helps us manage ETL utilities (as plugins) and all of their configurations (the pipelines). That meta-information sits in Meltano configuration files and its system database, where the configurations are file-based (and can be managed with git) and, by default, the system database is SQLite.

The workflow with Meltano is to initiate a Meltano project and then add E, L, and T plugins to the configuration files. Initiating a project just requires one CLI call: meltano init yourprojectname. To do that, we could install Meltano either with Python’s package manager pip or via a Docker image:

  • Install Meltano with pip in a python virtual env:
mkdir .venv
# example in a debian flavor Linux distro
sudo apt-get install python3-dev python3-pip python3-venv python3-wheel -y
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano

# init a project
mkdir meltano_projects && cd meltano_projects
# replace <yourprojectname> with your own one
touch .env
meltano init <yourprojectname>
  • “Install” Meltano via Docker
docker pull meltano/meltano:latest
docker run --rm meltano/meltano --version

# init a project
mkdir meltano_projects && cd meltano_projects

# replace <yourprojectname> with your own one
touch .env
docker run --rm -v "$(pwd)":/projects \
             -w /projects --env-file .env \
             meltano/meltano init <yourprojectname>

Apart from meltano init, there are a couple of other commands like meltano elt to perform ELT executions and meltano invoke <plugin> to call a plugin's commands; always check the cheatsheet for quick reference.

Meltano also comes with a web-based UI; to start it, just run:

meltano ui

Then it’s listening on http://localhost:5000.

For Docker, just run the container with port 5000 exposed; here we don’t append ui at the end because the container’s default command is already meltano ui.

docker run -v "$(pwd)":/project \
             -w /project \
             -p 5000:5000 \
             meltano/meltano

While writing this article, I noticed that Pat Nadolny had created great examples of using dbt with Meltano on an example dataset (and with Airflow and Superset, too!). We will not recreate the examples; we will use Pat’s great ones.

Note that Andrew Stewart had created another one with a slightly older version of configuration files.

You could follow here to run a pipeline of:

  • tap-CSV(Singer), extracting data from CSV files
  • target-postgres(Singer), loading data to Postgres
  • dbt, transform the data into aggregated tables or views

You should omit the step of running a local Postgres with Docker, as we have already created one; just be sure to change the Postgres user and password in .env accordingly.

And it basically goes like this (with Meltano installed as above):

git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/singer_dbt_jaffle/

meltano install
touch .env
echo PG_PASSWORD="lineage_ref" >> .env
echo PG_USERNAME="lineage_ref" >> .env

# Extract and Load(with Singer)
meltano run tap-csv target-postgres

# Transform (with dbt)
meltano run dbt:run

# Generate dbt docs
meltano invoke dbt docs generate

# Serve generated dbt docs
meltano invoke dbt docs serve

# Then visit http://localhost:8080

Now, assuming you have finished trying out singer_dbt_jaffle following its README.md, we can connect to Postgres and see the loaded and transformed data reflected as follows; the screenshot is from the SQLTools extension of VS Code:

https://user-images.githubusercontent.com/1651790/167540494-01e3dbd2-6ab1-41d2-998e-3b79f755bdc7.png

Now, we have the data in data warehouses, with ETL toolchains to pipe different data sources into it. How could those data be consumed?

BI tools such as dashboards could be one way to help us get insights from the data.

With Apache Superset, dashboards and charts based on those data sources can be created and managed smoothly and beautifully.

The focus of this project is not on Apache Superset itself, thus we simply reuse the examples that Pat Nadolny had created with Superset as a Meltano utility.

Create a python venv with Meltano installed:

mkdir .venv
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano

Following Pat’s guide, with tiny modifications:

  • Clone the repo, enter the jaffle_superset project
git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/jaffle_superset/
  • Modify the meltano configuration files to let Superset connect to the Postgres we created:
vim meltano_projects/jaffle_superset/meltano.yml

In my example, I changed the hostname to 10.1.1.168, which is the IP of my current host; if you are running it on your macOS machine, it should be fine to leave it as is. The diff before and after the change is:

--- a/meltano_projects/jaffle_superset/meltano.yml
+++ b/meltano_projects/jaffle_superset/meltano.yml
@@ -71,7 +71,7 @@ plugins:
               A list of database driver dependencies can be found here https://superset.apache.org/docs/databases/installing-database-drivers
     config:
       database_name: my_postgres
-      sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@host.docker.internal:${PG_PORT}/${PG_DATABASE}
+      sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@10.1.1.168:${PG_PORT}/${PG_DATABASE}
       tables:
       - model.my_meltano_project.customers
       - model.my_meltano_project.orders
  • Add Postgres credential to .env file:
echo PG_USERNAME=lineage_ref >> .env
echo PG_PASSWORD=lineage_ref >> .env
  • Install the Meltano project, run ETL pipeline
meltano install
meltano run tap-csv target-postgres dbt:run
  • Start Superset; please note that ui here is not a Meltano command but a user-defined action in the configuration file.
meltano invoke superset:ui
  • In another terminal, run the defined command load_datasources
meltano invoke superset:load_datasources
  • Access Superset in a web browser via http://localhost:8088/

We should now see Superset Web Interface:

https://user-images.githubusercontent.com/1651790/168570300-186b56a5-58e8-4ff1-bc06-89fd77d74166.png

Let’s try to create a Dashboard on the ETL data in Postgres defined in this Meltano project:

  • Click + DASHBOARD, fill in a dashboard name, then click SAVE, then click + CREATE A NEW CHART

https://user-images.githubusercontent.com/1651790/168570363-c6b4f929-2aad-4f03-8e3e-b1b61f560ce5.png

  • In the new chart view, we should select a chart type and a DATASET. Here, I selected the orders table as the data source and the Pie Chart chart type:

https://user-images.githubusercontent.com/1651790/168570927-9559a2a1-fed7-43be-9f6a-f6fb3c263830.png

  • After clicking CREATE NEW CHART, we are in the chart definition view, where I selected status as DIMENSIONS and COUNT(amount) as METRIC under the Query section. Thus, we get a pie chart of the order status distribution.

https://user-images.githubusercontent.com/1651790/168571130-a65ba88e-1ebe-4699-8783-08e5ecf54a0c.png

  • Click SAVE; it will ask which dashboard this chart should be added to. After it’s selected, click SAVE & GO TO DASHBOARD.

https://user-images.githubusercontent.com/1651790/168571301-8ae69983-eda8-4e75-99cf-6904f583fc7c.png

  • Then, in the dashboard, we can see all the charts there. You can see that I added another chart showing the customer order count distribution, too:

https://user-images.githubusercontent.com/1651790/168571878-30a77057-1f66-448a-9bbd-0dedcee24cc9.png

  • We can set the refresh interval or download the dashboard as we wish by clicking the ··· button.

https://user-images.githubusercontent.com/1651790/168573874-b5d57919-2866-4b3c-a4e5-55b6e6ef342e.png

Quite cool, huh? For now, we have a simple but typical data stack, like any hobby data lab, with everything open-source!

Imagine we have 100 datasets in CSV, 200 tables in the data warehouse, and a couple of data engineers running different projects that consume and generate different applications, dashboards, and databases. When someone would like to discover some of those tables, datasets, dashboards, and the pipelines running across them, and then perhaps even modify some of them, it proves to be quite costly in both communication and engineering.

Here comes the main part of our reference project: Metadata Discovery.

Then, we move on to deploying Amundsen with Nebula Graph and Elasticsearch.

Note: For the time being, the PR for Nebula Graph as the Amundsen backend is not yet merged; I am working with the Amundsen team to make it happen.

With Amundsen, we could have all the metadata of the whole data stack discovered and managed in one place. And there are mainly two parts of Amundsen: the Data builder for metadata ingestion, and the catalog services (the Frontend, Search, and Metadata services).

We will be leveraging the Data builder to pull metadata from different sources and persist it into the backend storage of the Metadata service and the backend storage of the Search service; then we can search, discover, and manage the metadata from the Frontend service or through the API of the Metadata service.

We are going to deploy an Amundsen cluster with its docker-compose file. As the Nebula Graph backend support is not yet merged, we refer to my fork.

First, let’s clone the repo with all submodules:

git clone -b amundsen_nebula_graph --recursive git@github.com:wey-gu/amundsen.git
cd amundsen

Then, start all catalog services and their backend storage:

docker-compose -f docker-amundsen-nebula.yml up

You could add -d to put the containers running in the background:

docker-compose -f docker-amundsen-nebula.yml up -d

And this will stop the cluster:

docker-compose -f docker-amundsen-nebula.yml stop

This will remove the cluster:

docker-compose -f docker-amundsen-nebula.yml down

Because this docker-compose file is meant for developers to play with and hack Amundsen easily, rather than for production deployment, it builds images from the codebase, which will take some time for the very first run.

After it’s deployed, please hold on a second before we load some dummy data into its storage with the Data builder.

Amundsen Data builder is just like Meltano, but for the ETL of metadata into the backend storage of the Metadata service and the Search service: Nebula Graph and Elasticsearch. The Data builder here is only a Python module, and the ETL job could either be run as a script or orchestrated with a DAG platform like Apache Airflow.

With Amundsen Data builder being installed:

cd databuilder
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install wheel
python3 -m pip install -r requirements.txt
python3 setup.py install

Let’s call this sample Data builder ETL script to have some dummy data filled in.

python3 example/scripts/sample_data_loader_nebula.py

Before accessing Amundsen, we need to create a test user:

# run a container with curl attached to amundsenfrontend
docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot

# Create a user with id test_user_id
curl -X PUT -v http://amundsenmetadata:5002/user \
    -H "Content-Type: application/json" \
    --data \
    '{"user_id":"test_user_id","first_name":"test","last_name":"user", "email":"[email protected]"}'

exit

Then we can view the UI at http://localhost:5000 and try searching for test; it should return some results.

https://github.com/amundsen-io/amundsen/raw/master/docs/img/search-page.png

Then you can click around and explore those dummy metadata loaded into Amundsen by sample_data_loader_nebula.py on your own.

Additionally, you could access the Graph Database with Nebula Studio(http://localhost:7001).

Note: in Nebula Studio, the default fields to log in are:

  • Hosts: graphd:9669
  • User: root
  • Password: nebula

This diagram shows some more details on the components of Amundsen:

       ┌────────────────────────┐ ┌────────────────────────────────────────┐
       │ Frontend:5000          │ │ Metadata Sources                       │
       ├────────────────────────┤ │ ┌────────┐ ┌─────────┐ ┌─────────────┐ │
       │ Metaservice:5001       │ │ │        │ │         │ │             │ │
       │ ┌──────────────┐       │ │ │ Foo DB │ │ Bar App │ │ X Dashboard │ │
  ┌────┼─┤ Nebula Proxy │       │ │ │        │ │         │ │             │ │
  │    │ └──────────────┘       │ │ │        │ │         │ │             │ │
  │    ├────────────────────────┤ │ └────────┘ └─────┬───┘ └─────────────┘ │
┌─┼────┤ Search service:5002    │ │                  │                     │
│ │    └────────────────────────┘ └──────────────────┼─────────────────────┘
│ │    ┌─────────────────────────────────────────────┼───────────────────────┐
│ │    │                                             │                       │
│ │    │ Databuilder     ┌───────────────────────────┘                       │
│ │    │                 │                                                   │
│ │    │ ┌───────────────▼────────────────┐ ┌──────────────────────────────┐ │
│ │ ┌──┼─► Extractor of Sources           ├─► nebula_search_data_extractor │ │
│ │ │  │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │  │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │  │ │ Loader filesystem_csv_nebula   │ │ Loader Elastic FS loader     │ │
│ │ │  │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │  │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │  │ │ Publisher nebula_csv_publisher │ │ Publisher Elasticsearch      │ │
│ │ │  │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │  └─────────────────┼─────────────────────────────────┼─────────────────┘
│ │ └────────────────┐   │                                 │
│ │    ┌─────────────┼───►─────────────────────────┐ ┌─────▼─────┐
│ │    │ Nebula Graph│   │                         │ │           │
│ └────┼─────┬───────┴───┼───────────┐     ┌─────┐ │ │           │
│      │     │           │           │     │MetaD│ │ │           │
│      │ ┌───▼──┐    ┌───▼──┐    ┌───▼──┐  └─────┘ │ │           │
│ ┌────┼─►GraphD│    │GraphD│    │GraphD│          │ │           │
│ │    │ └──────┘    └──────┘    └──────┘  ┌─────┐ │ │           │
│ │    │ :9669                             │MetaD│ │ │  Elastic  │
│ │    │ ┌────────┐ ┌────────┐ ┌────────┐  └─────┘ │ │  Search   │
│ │    │ │        │ │        │ │        │          │ │  Cluster  │
│ │    │ │StorageD│ │StorageD│ │StorageD│  ┌─────┐ │ │  :9200    │
│ │    │ │        │ │        │ │        │  │MetaD│ │ │           │
│ │    │ └────────┘ └────────┘ └────────┘  └─────┘ │ │           │
│ │    ├───────────────────────────────────────────┤ │           │
│ └────┤ Nebula Studio:7001                        │ │           │
│      └───────────────────────────────────────────┘ └─────▲─────┘
└──────────────────────────────────────────────────────────┘

With the basic environment being set up, let’s put everything together.

Remember we had ELT’d some data into PostgreSQL, like this?

https://user-images.githubusercontent.com/1651790/167540494-01e3dbd2-6ab1-41d2-998e-3b79f755bdc7.png

How could we let Amundsen discover metadata regarding those data and ETL?

We start with the data source: Postgres.

We install the Postgres Client for python3:

sudo apt-get install libpq-dev
pip3 install Psycopg2

Run a script to parse Postgres Metadata:

export CREDENTIALS_POSTGRES_USER=lineage_ref
export CREDENTIALS_POSTGRES_PASSWORD=lineage_ref
export CREDENTIALS_POSTGRES_DATABASE=warehouse

python3 example/scripts/sample_postgres_loader_nebula.py

If you look into the code of the sample script for loading Postgres metadata to Nebula, the main lines are quite straightforward:

# part 1: PostgresMetadata --> CSV --> Nebula Graph
job = DefaultJob(
      conf=job_config,
      task=DefaultTask(
          extractor=PostgresMetadataExtractor(),
          loader=FsNebulaCSVLoader()),
      publisher=NebulaCsvPublisher())

...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)

job = DefaultJob(conf=job_config, task=task)

The first job loads data along the path: PostgresMetadata --> CSV --> Nebula Graph

  • PostgresMetadataExtractor was used to extract/pull metadata from Postgres; refer here for its documentation.
  • FsNebulaCSVLoader was used to stage the extracted data as intermediate CSV files
  • NebulaCsvPublisher was used to publish the metadata in the form of CSV to Nebula Graph

The second job loads data along the path: metadata stored in NebulaGraph --> Elasticsearch

  • NebulaSearchDataExtractor was used to fetch metadata stored in Nebula Graph
  • SearchMetadatatoElasticasearchTask was used to make metadata indexed with Elasticsearch.

Note: in production, we could trigger those jobs either from scripts or with an orchestration platform like Apache Airflow, as sketched below.
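
For the Airflow route, here is a minimal, hypothetical sketch (the DAG id, schedule, and the run_metadata_job wrapper are placeholders of mine; the wrapper would build and launch the same DefaultJob objects as in the sample script above):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_metadata_job():
    # placeholder: build job_config, then run
    #   DefaultJob(conf=job_config, task=DefaultTask(...), publisher=...).launch()
    # exactly as in sample_postgres_loader_nebula.py
    pass


with DAG(
    dag_id="postgres_metadata_to_amundsen",  # placeholder name
    start_date=datetime(2022, 6, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(
        task_id="extract_load_publish",
        python_callable=run_metadata_job,
    )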

Search for payments or directly visit http://localhost:5000/table_detail/warehouse/postgres/public/payments, and you will see the metadata from our Postgres, like:

https://user-images.githubusercontent.com/1651790/168475180-ebfaa188-268c-4fbe-a614-135d56d07e5d.png

Then, metadata management actions like adding tags, owners, and descriptions can be done easily, as shown in the above screen capture, too.

Actually, we could also pull metadata from dbt itself.

The Amundsen DbtExtractor will parse the catalog.json or manifest.json file to load metadata into the Amundsen storage (Nebula Graph and Elasticsearch).

In the Meltano chapter above, we had already generated that file with meltano invoke dbt docs generate, and output like the following tells us where the catalog.json file is:

14:23:15  Done.
14:23:15  Building catalog
14:23:15  Catalog written to /home/ubuntu/ref-data-lineage/meltano_example_implementations/meltano_projects/singer_dbt_jaffle/.meltano/transformers/dbt/target/catalog.json

There is an example script with sample dbt output files:

The sample dbt files:

$ ls -l example/sample_data/dbt/
total 184
-rw-rw-r-- 1 w w   5320 May 15 07:17 catalog.json
-rw-rw-r-- 1 w w 177163 May 15 07:17 manifest.json

We could load this sample dbt manifest with:

python3 example/scripts/sample_dbt_loader_nebula.py

From these lines of Python code, we can tell the process is:

# part 1: Dbt manifest --> CSV --> Nebula Graph
job = DefaultJob(
      conf=job_config,
      task=DefaultTask(
          extractor=DbtExtractor(),
          loader=FsNebulaCSVLoader()),
      publisher=NebulaCsvPublisher())

...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)

job = DefaultJob(conf=job_config, task=task)

The only difference from the Postgres metadata ETL is extractor=DbtExtractor(), which comes with the following configurations to get the information below regarding dbt projects:

  • databases_name
  • catalog_json
  • manifest_json
job_config = ConfigFactory.from_dict({
  'extractor.dbt.database_name': database_name,
  'extractor.dbt.catalog_json': catalog_file_loc,  # File
  'extractor.dbt.manifest_json': json.dumps(manifest_data),  # JSON-dumped object
  'extractor.dbt.source_url': source_url})

Search dbt_demo or visit http://localhost:5000/table_detail/dbt_demo/snowflake/public/raw_inventory_value to see:

https://user-images.githubusercontent.com/1651790/168479864-2f73ea73-265f-4cd2-999f-e7effbaf3ec1.png

Tip: we could optionally enable debug logging to see what has been sent to Elasticsearch and Nebula Graph!

- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.DEBUG)

Or, alternatively, explore the imported data in Nebula Studio:

First, click “Start with Vertices”, fill in the vertex id: snowflake://dbt_demo.public/fact_warehouse_inventory

https://user-images.githubusercontent.com/1651790/168480047-26c28cde-5df8-40af-8da4-6ab0203094e2.png

Then, we can see the vertex shown as a pink dot. Let’s modify the Expand options with:

  • Direction: Bidirect
  • Steps: Single with 3

https://user-images.githubusercontent.com/1651790/168480101-7b7b5824-06d9-4155-87c9-798db0dc7612.png

Then double-click the vertex (dot); it will expand 3 steps in both directions:

https://user-images.githubusercontent.com/1651790/168480280-1dc88d1b-1f1e-48fd-9997-972965522ef5.png

From this graph view, insights from the metadata are extremely easy to explore, right?

Tip: you may want to click the 👁 icon to select some properties to be shown, which I did before capturing the screenshot above.

And what we have seen in Nebula Studio echoes the data model of the Amundsen metadata service, too:

https://www.amundsen.io/amundsen/img/graph_model.png

Finally, remember that we leveraged dbt to transform some data in Meltano, and that the generated catalog file path is .meltano/transformers/dbt/target/catalog.json; you can try creating a databuilder job to import it, as sketched below.
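
Here is a minimal sketch of such a job configuration, reusing the extractor.dbt.* keys from the sample dbt loader above; the paths, database name, and source URL are placeholder assumptions of mine, and the resulting job_config would be plugged into the same DbtExtractor -> FsNebulaCSVLoader -> NebulaCsvPublisher pipeline shown earlier:

import json

from pyhocon import ConfigFactory

# assumption: the dbt artifacts generated by `meltano invoke dbt docs generate`
dbt_target = "meltano_projects/singer_dbt_jaffle/.meltano/transformers/dbt/target"
catalog_file_loc = f"{dbt_target}/catalog.json"

with open(f"{dbt_target}/manifest.json") as f:
    manifest_data = json.load(f)

job_config = ConfigFactory.from_dict({
    'extractor.dbt.database_name': 'warehouse',                 # placeholder database name
    'extractor.dbt.catalog_json': catalog_file_loc,             # file path
    'extractor.dbt.manifest_json': json.dumps(manifest_data),   # JSON-dumped object
    'extractor.dbt.source_url': 'https://github.com/pnadolny13/meltano_example_implementations',
})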

Dashboards, charts, and their relationships with tables can be extracted by the Amundsen Data builder. As we have already set up a Superset dashboard, let’s try ingesting its metadata.

The sample superset script will fetch data from Superset and load metadata into Nebula Graph and Elasticsearch.

python3 sample_superset_data_loader_nebula.py

If we set the logging level to DEBUG, we could actually see lines like:

# fetching metadata from superset
DEBUG:urllib3.connectionpool:http://localhost:8088 "POST /api/v1/security/login HTTP/1.1" 200 280
INFO:databuilder.task.task:Running a task
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 308 374
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard/?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 200 1058
...

# insert Dashboard

DEBUG:databuilder.publisher.nebula_csv_publisher:Query: INSERT VERTEX `Dashboard` (`dashboard_url`, `name`, published_tag, publisher_last_updated_epoch_ms) VALUES  "superset_dashboard://my_cluster.1/3":("http://localhost:8088/superset/dashboard/3/","my_dashboard","unique_tag",timestamp());
...

# insert a DASHBOARD_WITH_TABLE relationship/edge

INFO:databuilder.publisher.nebula_csv_publisher:Importing data in edge files: ['/tmp/amundsen/dashboard/relationships/Dashboard_Table_DASHBOARD_WITH_TABLE.csv']
DEBUG:databuilder.publisher.nebula_csv_publisher:Query:
INSERT edge `DASHBOARD_WITH_TABLE` (`END_LABEL`, `START_LABEL`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/orders":("Table","Dashboard","unique_tag", timestamp()), "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/customers":("Table","Dashboard","unique_tag", timestamp());

By searching for it in Amundsen, we can now see the dashboard info. And we can verify it from Nebula Studio, too.

https://user-images.githubusercontent.com/1651790/168719624-738323dd-4c6e-475f-a370-f149181c6184.png

Note, see also the Dashboard’s model in Amundsen from the dashboard ingestion guide:

[dashboard_graph_modeling: the Dashboard graph model from the Amundsen dashboard ingestion guide]

Superset could be used to preview table data like this, and the corresponding documentation can be referred to here; the /superset/sql_json/ API is what will be called by the Amundsen Frontend.

https://github.com/amundsen-io/amundsenfrontendlibrary/blob/master/docs/img/data_preview.png?raw=true

By default, data lineage is not enabled; we can enable it by doing the following:

  1. Go to the Amundsen repo; that’s also where we ran the docker-compose -f docker-amundsen-nebula.yml up command
cd amundsen
  2. Modify the frontend JS configuration:
--- a/frontend/amundsen_application/static/js/config/config-default.ts
+++ b/frontend/amundsen_application/static/js/config/config-default.ts
   tableLineage: {
-    inAppListEnabled: false,
-    inAppPageEnabled: false,
+    inAppListEnabled: true,
+    inAppPageEnabled: true,
     externalEnabled: false,
     iconPath: 'PATH_TO_ICON',
     isBeta: false,
  3. Now let’s run the build again for the Docker images, where the frontend image will be rebuilt:
docker-compose -f docker-amundsen-nebula.yml build

Then, rerun up -d to ensure the frontend container is recreated with the new configuration:

docker-compose -f docker-amundsen-nebula.yml up -d

We could see something like this:

$ docker-compose -f docker-amundsen-nebula.yml up -d
...
Recreating amundsenfrontend           ... done

After that, we can visit http://localhost:5000/lineage/table/gold/hive/test_schema/test_table1 to see the lineage shown as:

https://user-images.githubusercontent.com/1651790/168838731-79d0e3bc-439e-4f6b-8ef7-83b37e9bcb12.png

We can click Downstream (if there is any) to see the downstream resources of this table:

https://user-images.githubusercontent.com/1651790/168839251-efd523af-d729-44cf-a40b-fa83a0852654.png

Or click Lineage to see the graph:

https://user-images.githubusercontent.com/1651790/168838814-e6ff5152-c24b-470e-a46a-48f183ba7201.png

There are APIs for lineage queries, too. Here is an example of querying with cURL, where we leverage the netshoot container as we did before for user creation.

docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot

curl "http://amundsenmetadata:5002/table/snowflake://dbt_demo.public/raw_inventory_value/lineage?depth=3&direction=both"

The above API call queries lineage in both the upstream and downstream directions, with depth 3, for the table snowflake://dbt_demo.public/raw_inventory_value.

And the result should be like:

{
    "depth": 3,
    "downstream_entities": [
        {
            "level": 2,
            "usage": 0,
            "key": "snowflake://dbt_demo.public/fact_daily_expenses",
            "parent": "snowflake://dbt_demo.public/fact_warehouse_inventory",
            "badges": [],
            "source": "snowflake"
        },
        {
            "level": 1,
            "usage": 0,
            "key": "snowflake://dbt_demo.public/fact_warehouse_inventory",
            "parent": "snowflake://dbt_demo.public/raw_inventory_value",
            "badges": [],
            "source": "snowflake"
        }
    ],
    "key": "snowflake://dbt_demo.public/raw_inventory_value",
    "direction": "both",
    "upstream_entities": []
}

In fact, this lineage data was just extracted and loaded during our DbtExtractor execution, where extractor.dbt.{DbtExtractor.EXTRACT_LINEAGE} defaults to True; thus, lineage metadata was created and loaded into Amundsen.
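
If you ever need to toggle that behavior, the flag sits in the same job config dict as the other extractor.dbt.* keys. A small sketch (the import path of DbtExtractor is an assumption of mine):

from pyhocon import ConfigFactory

# assumption: DbtExtractor is importable from the databuilder package like this
from databuilder.extractor.dbt_extractor import DbtExtractor

job_config = ConfigFactory.from_dict({
    # ... the extractor.dbt.* keys shown earlier ...
    # keep table-level lineage extraction on (True is already the default)
    f'extractor.dbt.{DbtExtractor.EXTRACT_LINEAGE}': True,
})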

Two of the advantages of using a graph database as the metadata storage are:

  • The graph query itself is a flexible DSL for the lineage API; for example, this query does the equivalent of the Amundsen metadata API call for fetching lineage:
MATCH p=(t:`Table`) -[:`HAS_UPSTREAM`|:`HAS_DOWNSTREAM` *1..3]->(x)
WHERE id(t) == "snowflake://dbt_demo.public/raw_inventory_value" RETURN p
  • We can even run the query in Nebula Studio’s console and click View Subgraphs to render it in a graph view.

https://user-images.githubusercontent.com/1651790/168844882-ca3d0587-7946-4e17-8264-9dc973a44673.png

https://user-images.githubusercontent.com/1651790/168845155-b0e7a5ce-3ddf-4cc9-89a3-aaf1bbb0f5ec.png

As mentioned above, DbtExtractor will extract table level lineage, together with other information defined in the dbt ETL pipeline.

The other lineage extractor available out-of-the-box in Amundsen is OpenLineageTableLineageExtractor.

Open Lineage is an open framework to collect lineage data from different sources in one place, and it can output lineage information as JSON files to be extracted by OpenLineageTableLineageExtractor:

dict_config = {
    # ...
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.CLUSTER_NAME}': 'datalab',
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.OL_DATASET_NAMESPACE_OVERRIDE}': 'hive_table',
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.TABLE_LINEAGE_FILE_LOCATION}': 'input_dir/openlineage_nd.json',
}
...

task = DefaultTask(
    extractor=OpenLineageTableLineageExtractor(),
    loader=FsNebulaCSVLoader())

The whole idea of Metadata Governance/Discovery is to:

  • Put all components in the stack as Metadata Sources(from any DB or DW to dbt, Airflow, Openlineage, Superset, etc.)
  • Run metadata ETL with Databuilder(as a script, or DAG) to store and index with Nebula Graph(or other Graph Database) and Elasticsearch
  • Consume, manage, and discover metadata from Frontend UI(with Superset for preview) or API
  • Have more possibilities, flexibility, and insights on Nebula Graph from queries and UI

https://user-images.githubusercontent.com/1651790/168849779-4826f50e-ff87-4e78-b17f-076f91182c43.svg

All projects used in this reference project are listed below in lexicographic order.

  • Amundsen
  • Apache Airflow
  • Apache Superset
  • dbt
  • Elasticsearch
  • meltano
  • Nebula Graph
  • Open Lineage
  • singer

Feature Image credit to Phil Hearing
