Create ETL tasks with Python and Luigi Blueprint
What is blueprint?
Blueprint is an open-source tool that lets you create ETL jobs using INI-style configuration files.
https://github.com/kpatronas/luigi-blueprint
Example: how to connect to SERVER1, run a query, export the data, and run grep against the exported data
Create the following configuration file:

[BUILD]
TASKS: [GREP_DATA()]
WORKERS: 8
LOCAL_SCHEDULER: True

[GW_PASSWORDS]
TYPE: CREDS
USER: <username>
PASS: <password>

[GW1()]
TYPE: SSH_PROXY
HOST: GW1
PORT: 22
CREDS: GW_PASSWORDS

[SERVER1]
TYPE: DB_CONF
ENGINE: ibm_db_sa
CREDS: DB_CREDS_PROD
DBHOST: SERVER1
DBNAME: VLS_DB
DBPORT: 50000

[DB_CREDS_PROD]
TYPE: CREDS
USER: <username>
PASS: <password>

[SERVER1_QUERY()]
RESULTS: ./results.csv
CLEANUP: True
TYPE: DB_TASK
RESULTS_TYPE: csv
QUERY: ./query.sql
DB: SERVER1
USE_PROXY: True
PROXY: GW1()

[GREP_DATA()]
RESULTS: grep_results.csv
CLEANUP: True
REQUIRES: [SERVER1_QUERY()]
TYPE: LOCAL_TASK
COMMAND: grep 'to_grep_something' ./results.csv
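Under the hood these files are standard INI, so they can be inspected with Python's stdlib configparser. A minimal sketch (using a trimmed, hypothetical copy of the file above embedded as a string, so it is self-contained):

```python
from configparser import ConfigParser

# A trimmed copy of the Blueprint file above, embedded as a string.
CONFIG = """
[BUILD]
TASKS: [GREP_DATA()]
WORKERS: 8
LOCAL_SCHEDULER: True

[GREP_DATA()]
RESULTS: grep_results.csv
CLEANUP: True
REQUIRES: [SERVER1_QUERY()]
TYPE: LOCAL_TASK
COMMAND: grep 'to_grep_something' ./results.csv
"""

parser = ConfigParser()
parser.read_string(CONFIG)

# Section names, including the "()" task markers, are plain INI sections.
print(parser.sections())              # ['BUILD', 'GREP_DATA()']
print(parser["BUILD"]["WORKERS"])     # 8
print(parser["GREP_DATA()"]["TYPE"])  # LOCAL_TASK
```

This is only an illustration of the file format; Blueprint itself does its own parsing and validation of these sections.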
How to execute a blueprint
blue -b ./config.cfg
How it works
The first section to be executed is [BUILD].
BUILD is the initiator: it defines runtime options such as how many parallel tasks are allowed (WORKERS) and which flow to run (GREP_DATA()).
GREP_DATA() will try to execute "grep 'to_grep_something' ./results.csv", but since ./results.csv does not exist yet, it will first run every task listed in its REQUIRES parameter.
All tasks can have a REQUIRES parameter. REQUIRES accepts multiple tasks in the form [TASK1(),TASK2()], and those tasks run in parallel if the WORKERS parameter in the BUILD section is greater than 1.
The CLEANUP parameter defines whether the RESULTS file will be deleted on the next run. This is very useful: used smartly, you can set CLEANUP to False on the intermediate tasks and have a final task delete the RESULTS files of all previous tasks. That way, if a task does not complete cleanly and there is a temporary failure, the next run can continue from where the last run failed.
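The completion logic described above can be sketched in plain Python. This is a toy model, not Blueprint's actual code: a task counts as done when its RESULTS file exists; otherwise its REQUIRES run first, and writing the file stands in for running COMMAND.

```python
import os
import tempfile

def run_task(name, tasks, workdir):
    """Run `name` only if its RESULTS file is missing, running its
    REQUIRES list first (a simplified model of the completion check)."""
    task = tasks[name]
    results = os.path.join(workdir, task["RESULTS"])
    if os.path.exists(results):           # RESULTS exists -> task is "done"
        return f"{name}: already complete"
    for dep in task.get("REQUIRES", []):  # dependencies run first
        run_task(dep, tasks, workdir)
    with open(results, "w") as f:         # stand-in for executing COMMAND
        f.write("done\n")
    return f"{name}: ran"

# Hypothetical in-memory versions of the SERVER1_QUERY()/GREP_DATA() sections.
tasks = {
    "SERVER1_QUERY()": {"RESULTS": "results.csv"},
    "GREP_DATA()": {"RESULTS": "grep_results.csv",
                    "REQUIRES": ["SERVER1_QUERY()"]},
}

with tempfile.TemporaryDirectory() as d:
    first = run_task("GREP_DATA()", tasks, d)   # dependency runs, then task
    second = run_task("GREP_DATA()", tasks, d)  # RESULTS present, skipped
print(first)   # GREP_DATA(): ran
print(second)  # GREP_DATA(): already complete
```

This also shows why CLEANUP matters: deleting a RESULTS file is exactly what makes a task eligible to run again.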
Sections
The CREDS section is used for usernames and passwords; it can also accept a KEY parameter for public keys.

[GW_PASSWORDS]
TYPE: CREDS
USER: <username>
PASS: <password>
SSH_PROXY is used to define an SSH proxy for a DB task.

[GW1()]
TYPE: SSH_PROXY
HOST: GW1
PORT: 22
CREDS: GW_PASSWORDS
DB_CONF is used to define a database connection. The important thing here is the ENGINE parameter: it supports any SQLAlchemy DB connector available on the system.

[SERVER1]
TYPE: DB_CONF
ENGINE: ibm_db_sa
CREDS: DB_CREDS_PROD
DBHOST: SERVER1
DBNAME: VLS_DB
DBPORT: 50000
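SQLAlchemy engines are addressed by connection URLs, so a DB_CONF section maps naturally onto one. A sketch of how such parameters might combine (illustrative only: the exact URL scheme depends on the dialect package, and Blueprint's internals may differ):

```python
def db_url(conf, creds):
    """Build a SQLAlchemy-style connection URL from DB_CONF-style
    parameters. The scheme here is assumed from the ENGINE value."""
    return (f"{conf['ENGINE']}://{creds['USER']}:{creds['PASS']}"
            f"@{conf['DBHOST']}:{conf['DBPORT']}/{conf['DBNAME']}")

# Values taken from the [SERVER1] section above; the credentials are dummies.
conf = {"ENGINE": "ibm_db_sa", "DBHOST": "SERVER1",
        "DBNAME": "VLS_DB", "DBPORT": "50000"}
creds = {"USER": "user", "PASS": "secret"}

print(db_url(conf, creds))
# ibm_db_sa://user:secret@SERVER1:50000/VLS_DB
```

Swapping ENGINE for another installed dialect (e.g. a PostgreSQL or MySQL connector) is all it takes to point the same task at a different database.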
Another kind of task is the REMOTE_TASK: a shell command executed on a remote computer. It can use the SSH_PROXY option if needed.

[BUILD]
TASKS: [REMOTE_TASK_1()]
WORKERS: 8
LOCAL_SCHEDULER: True

[GW1()]
TYPE: SSH_PROXY
HOST: GW1
PORT: 22
CREDS: MYPASSWORD

[MYPASSWORD]
TYPE: CREDS
USER: kpatronas
PASS: mypassword
KEY: /home/kpatronas/.ssh/id_rsa.pub

[REMOTE_TASK_1()]
RESULTS: ./remote_task1/output.txt
SUCCESS_EXIT_CODE: 0
TYPE: REMOTE_TASK
COMMAND: /bin/ls -la
CLEANUP: True
HOST: SERVER1
CREDS: MYPASSWORD
USE_PROXY: True
PROXY: GW1()
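Conceptually, a REMOTE_TASK with USE_PROXY behaves like SSH with a jump host. A hypothetical sketch of the equivalent OpenSSH command line (Blueprint drives SSH from Python rather than shelling out like this, but the proxy hop corresponds to OpenSSH's -J/ProxyJump flag):

```python
def ssh_command(user, host, command, proxy=None):
    """Compose the OpenSSH equivalent of a REMOTE_TASK as an argv list."""
    cmd = ["ssh"]
    if proxy:  # hop through the SSH_PROXY host first
        cmd += ["-J", f"{user}@{proxy['HOST']}:{proxy['PORT']}"]
    cmd += [f"{user}@{host}", command]
    return cmd

# Values taken from the [REMOTE_TASK_1()] and [GW1()] sections above.
print(ssh_command("kpatronas", "SERVER1", "/bin/ls -la",
                  proxy={"HOST": "GW1", "PORT": 22}))
# ['ssh', '-J', 'kpatronas@GW1:22', 'kpatronas@SERVER1', '/bin/ls -la']
```

The command's stdout would land in the RESULTS file, and SUCCESS_EXIT_CODE tells Blueprint which exit status counts as success.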
I hope you find this article useful and that it helps you get started with Python and ETL jobs.