Create ETL tasks with Python and Luigi Blueprint

What is blueprint?

Blueprint is an open-source tool that allows you to create ETL jobs using INI-style configuration files.

https://github.com/kpatronas/luigi-blueprint

Example: How to connect to SERVER1, run a query, export the data, and run grep against the exported data

Create the following configuration file:

[BUILD]
TASKS: [GREP_DATA()]
WORKERS: 8
LOCAL_SCHEDULER: True

[GW_PASSWORDS]
TYPE: CREDS
USER: <username>
PASS: <password>

[GW1()]
TYPE: SSH_PROXY
HOST: GW1
PORT: 22
CREDS: GW_PASSWORDS

[SERVER1]
TYPE:   DB_CONF
ENGINE: ibm_db_sa
CREDS:  DB_CREDS_PROD
DBHOST: SERVER1
DBNAME: VLS_DB
DBPORT: 50000

[DB_CREDS_PROD]
TYPE: CREDS
USER: <username>
PASS: <password>

[SERVER1_QUERY()]
RESULTS:      ./results.csv
CLEANUP:      True
TYPE:         DB_TASK
RESULTS_TYPE: csv
QUERY:        ./query.sql
DB:           SERVER1
USE_PROXY:    True
PROXY:        GW1()

[GREP_DATA()]
RESULTS:      grep_results.csv
CLEANUP:      True
REQUIRES:     [SERVER1_QUERY()]
TYPE:         LOCAL_TASK
COMMAND:      grep 'to_grep_something' ./results.csv

How to execute a blueprint

blue -b ./config.cfg

How it works

The first task to be executed is [BUILD]

BUILD is an initiator that defines run-time options, such as how many parallel tasks are allowed (WORKERS), and which flow to follow (GREP_DATA()).

GREP_DATA() will try to execute "grep 'to_grep_something' ./results.csv", but since ./results.csv does not exist yet, it will first run the tasks listed in its REQUIRES parameter.

All tasks can have a REQUIRES parameter. REQUIRES accepts multiple tasks in the form [TASK1(),TASK2()], and those tasks will run in parallel if the WORKERS parameter in the BUILD section has a value greater than 1.
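As a sketch of a task with multiple dependencies, the hypothetical MERGE_DATA() section below (the TASK1()/TASK2() sections and the file paths are illustrative names, assumed to be defined elsewhere in the same file) fans in two upstream tasks:

```ini
[MERGE_DATA()]
RESULTS:  ./merged.csv
CLEANUP:  True
REQUIRES: [TASK1(),TASK2()]
TYPE:     LOCAL_TASK
COMMAND:  cat ./task1.csv ./task2.csv
```

With WORKERS greater than 1 in the BUILD section, TASK1() and TASK2() are free to run in parallel before MERGE_DATA() starts.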

The CLEANUP parameter defines whether the RESULTS file is deleted on the next run. This is super useful: used smartly, you can set CLEANUP to False and add a final task that deletes the RESULTS files of all previous tasks. That way, if a task does not get a clean run and fails for a temporary reason, the next run can continue from where the last one failed.
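One sketch of that pattern, using a hypothetical CLEANUP_ALL() task (the section name and rm command are illustrative, not part of the original example): keep CLEANUP False on the intermediate tasks, and let a last task remove their RESULTS files only after everything upstream has succeeded.

```ini
[CLEANUP_ALL()]
RESULTS:  ./cleanup_done.txt
CLEANUP:  True
REQUIRES: [GREP_DATA()]
TYPE:     LOCAL_TASK
COMMAND:  rm -f ./results.csv ./grep_results.csv
```

If a run fails partway, the surviving RESULTS files let the next run skip the tasks that already completed.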

Sections

The CREDS section is used for usernames and passwords; it can also accept a KEY parameter for public keys.

[GW_PASSWORDS]
TYPE: CREDS
USER: <username>
PASS: <password>

SSH_PROXY is used to define an SSH proxy for a DB task.

[GW1()]
TYPE: SSH_PROXY
HOST: GW1
PORT: 22
CREDS: GW_PASSWORDS

DB_CONF is used to define a database connection. The important thing here is the ENGINE parameter: it supports any SQLAlchemy database connector available on the system.

[SERVER1]
TYPE:   DB_CONF
ENGINE: ibm_db_sa
CREDS:  DB_CREDS_PROD
DBHOST: SERVER1
DBNAME: VLS_DB
DBPORT: 50000
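For instance, pointing the same section layout at a PostgreSQL database would only change the ENGINE and the connection details (SERVER2 and SALES_DB are hypothetical names, and this assumes the psycopg2 SQLAlchemy dialect is installed on the system):

```ini
[SERVER2]
TYPE:   DB_CONF
ENGINE: postgresql+psycopg2
CREDS:  DB_CREDS_PROD
DBHOST: SERVER2
DBNAME: SALES_DB
DBPORT: 5432
```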

Another kind of task is the REMOTE_TASK. A remote task is a shell command executed on a remote computer, and it can use the SSH_PROXY option if needed.

[BUILD]
TASKS: [REMOTE_TASK_1()]
WORKERS: 8
LOCAL_SCHEDULER: True

[GW1()]
TYPE: SSH_PROXY
HOST: GW1
PORT: 22
CREDS: MYPASSWORD

[MYPASSWORD]
TYPE: CREDS
USER: kpatronas
PASS: mypassword
KEY: /home/kpatronas/.ssh/id_rsa.pub

[REMOTE_TASK_1()]
RESULTS: ./remote_task1/output.txt
SUCCESS_EXIT_CODE: 0
TYPE: REMOTE_TASK
COMMAND: /bin/ls -la
CLEANUP: True
HOST: SERVER1
CREDS: MYPASSWORD
USE_PROXY: True
PROXY: GW1()

I hope you find this article useful and that it helps you get started with Python and ETL jobs.
