## Python for DevOps

#### or a (not) too brief introduction to [Apache Airflow](https://airflow.apache.org/)

[Miroslav "mirek" Biňas](https://bletvaska.github.io) / [**PyCon SK 2024**](https://2024.pycon.sk/sk/speakers/Mirek%20Bi%C5%88as.html)
> ... I would like to ask whether it would be possible to change the syllabus of the final part of this course to include topics more closely related to **SD Networking**? Personally, I don't know exactly what the course should cover, or which **Python** topics one should learn to have a good foundation for **SD Networking**? ...
### ETL in Bash

`$ scrape_data | process_data | publish_data`
### ETL Python Example

Let's scrape'n'store selected data from [openweathermap.org](http://api.openweathermap.org/data/2.5/weather?units=metric&q=bratislava&appid=98113cc2ea891cc2246900c7dc6a8038)

```
+--------------------------+       +----------------+       +----------------+
|       Scrape Data        |       |  Process Data  |       |  Publish Data  |
|      -------------       | ----> | -------------- | ----> | -------------- |
| (GET openweathermap.org) |       | (extract data) |       | (save to file) |
+--------------------------+       +----------------+       +----------------+
```
### Scrape Data

```python
import httpx


def scrape_data(query: str, units: str, appid: str) -> dict:
    print('>> Scrape data')
    url = 'http://api.openweathermap.org/data/2.5/weather'
    params = {
        'q': query,
        'units': units,
        'appid': appid
    }
    response = httpx.get(url, params=params)
    # fail early on HTTP errors (e.g. invalid API key) instead of parsing an error payload
    response.raise_for_status()
    return response.json()
```
### Process Data

```python
def process_data(data: dict) -> str:
    print('>> Process data')
    # one CSV line: timestamp, city, country, temperature, humidity, pressure
    return '{},{},{},{},{},{}'.format(
        data['dt'],
        data['name'],
        data['sys']['country'],
        data['main']['temp'],
        data['main']['humidity'],
        data['main']['pressure'],
    )
```
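
To make the extraction step concrete, here is a minimal sketch that runs `process_data()` on a hand-written payload. The `sample` dictionary is an assumption: it is trimmed to the fields the function reads and its values are made up, so it is not a real API response.

```python
def process_data(data: dict) -> str:
    # same field order as on the slide above
    return '{},{},{},{},{},{}'.format(
        data['dt'],
        data['name'],
        data['sys']['country'],
        data['main']['temp'],
        data['main']['humidity'],
        data['main']['pressure'],
    )


# Hypothetical, trimmed payload; the values are invented for illustration.
sample = {
    'dt': 1710410400,
    'name': 'Kosice',
    'sys': {'country': 'SK'},
    'main': {'temp': 12.3, 'humidity': 71, 'pressure': 1015},
}

line = process_data(sample)
print(line)  # 1710410400,Kosice,SK,12.3,71,1015
```

A missing key (for example when the API returns an error document) raises `KeyError`, which is one reason to check the HTTP status before processing.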
### Publish Data

```python
def publish_data(line: str):
    print('>> Publishing data')
    # append, so every run adds one row to the dataset
    with open('dataset.csv', 'a') as dataset:
        print(line, file=dataset)
```
### CLI Tool

```python
#!/usr/bin/env python

import click
import httpx

# scrape_data(), process_data() and publish_data() as defined above


@click.command(help='Download current weather condition in CSV format.')
@click.argument('query')
@click.option('--units',
              type=click.Choice(['metric', 'standard', 'imperial']),
              default='metric',
              help='Unit of measurement')
@click.option('--appid',
              default=None,
              help='API key for openweathermap.org service.',
              envvar='APPID')
def main(query: str, units: str, appid: str):
    data = scrape_data(query, units, appid)
    processed_data = process_data(data)
    publish_data(processed_data)


# run only when executed as a script, not on import
if __name__ == '__main__':
    main()
```
### Running from CLI

```bash
# token will be available during pycon ;)
$ ./workflow.py --appid 98113cc2ea891cc2246900c7dc6a8038 kosice
```
### Some of My Problems...

* periodic execution
  * `cron`, `systemd` Timers
* monitoring?
* what if something goes wrong?
  * email from cron?
  * SMS/IM/notification?
  * desktop/phone?
[![](images/logo-airflow.png)](https://airflow.apache.org/) An open-source platform for developing, scheduling and monitoring workflows.
### Installation / Running

* as standard [Python package](https://pypi.org/project/apache-airflow/):

  ```bash
  $ pip install apache-airflow
  ```

* as [Docker Container](https://hub.docker.com/r/bitnami/airflow)

  ```bash
  $ docker container run --name airflow bitnami/airflow:latest
  ```

* in [Docker Composition](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html)

  ```bash
  $ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.3/docker-compose.yaml'
  $ docker compose up
  ```
### DAG Skeleton

```python
from airflow.decorators import dag
from pendulum import datetime


@dag(
    "pycon_scraper",
    description="Scrapes weather from openweathermap.org",
    schedule="*/20 * * * *",
    start_date=datetime(2024, 3, 14, tz="UTC"),
    tags=['pycon', 'weather', 'devops'],
    catchup=False,
)
def main():
    pass


main()
```
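
The `schedule` value above is a cron expression whose first field selects the minutes. As a rough illustration of what `*/20` means, the sketch below expands a minute field by hand. `expand_minutes` is a hypothetical helper written for this slide, it handles only the `*`, `*/N` and single-number forms, and it is not how Airflow evaluates schedules internally.

```python
def expand_minutes(field: str) -> list[int]:
    """Expand a cron minute field ('*', '*/N' or 'M') into the matching minutes."""
    if field == '*':
        return list(range(60))
    if field.startswith('*/'):
        step = int(field[2:])
        # every step-th minute, counted from minute 0 of each hour
        return list(range(0, 60, step))
    return [int(field)]


minute_field = "*/20 * * * *".split()[0]
print(expand_minutes(minute_field))  # [0, 20, 40]
```

So the skeleton DAG is scheduled three times an hour, at minutes 0, 20 and 40.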
### Adding Tasks to DAG

* a task is the basic unit of execution
* a function decorated with the `@task` decorator
* a key part of using tasks is defining how they relate to each other
### Tasks Relation

```python
APPID = '98113cc2ea891cc2246900c7dc6a8038'


@dag(
    "pycon_scraper",
    description="Scrapes weather from openweathermap.org",
    schedule="*/20 * * * *",
    start_date=datetime(2024, 3, 14, tz="UTC"),
    tags=['pycon', 'weather', 'devops'],
    catchup=False,
)
def main():
    # passing one task's return value to the next defines the dependency
    data = scrape_data('kosice', 'metric', APPID)
    line = process_data(data)
    publish_data(line)


# the DAG is registered only when the decorated function is called
main()
```
## XComs

> **XComs** (short for “cross-communications”) are a **mechanism** that **let Tasks talk to each other**.
>
> -- [Airflow Docs](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html)
## Params

> **Params** enable you to **provide runtime configuration to tasks**.
>
> -- [Airflow Docs](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html)
### Query as Param

```python
@dag(
    "pycon_scraper",
    description="Scrapes weather from openweathermap.org",
    schedule="*/2 * * * *",
    start_date=datetime(2024, 3, 14, tz="UTC"),
    tags=['pycon', 'weather', 'devops'],
    catchup=False,
)
def main(query: str = 'kosice'):
    payload = scrape_data(query, 'metric', APPID)
    csv_line = process_data(payload)
    publish_data(csv_line)


main()
```
### Connections & Hooks

* Airflow is often used to pull data from and push data into other systems.
* A **Connection** is a **set of parameters** (host, credentials, ...) describing an external system that Airflow connects to.
* A **Hook** is a high-level **access interface** to such systems, usually (but not only) built on connections.
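### Hook Usage Example

```python
import httpx
from airflow.decorators import task
from airflow.hooks.base import BaseHook


@task
def scrape_data(query: str, units: str) -> dict:
    print('>> Scrape data')
    # host and API key now come from the "pycon" connection, not from the code
    conn = BaseHook.get_connection("pycon")
    url = f'https://{conn.host}/data/2.5/weather'
    params = {
        'q': query,
        'units': units,
        'appid': conn.password
    }
    response = httpx.get(url, params=params)
    # an HTTP error raises an exception here, which fails the task
    response.raise_for_status()
    return response.json()
```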
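
The `pycon` connection has to be defined somewhere before `BaseHook.get_connection("pycon")` can find it. Besides the web UI and the CLI, Airflow can read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` that holds a URI. The sketch below uses only the standard library to show roughly how such a URI maps to the `host` and `password` fields used above. The URI and the `my-secret-key` placeholder are assumptions, and Airflow's own parsing follows more rules than `urlsplit` does.

```python
from urllib.parse import urlsplit

# Hypothetical value for the AIRFLOW_CONN_PYCON environment variable.
# 'my-secret-key' is a placeholder, not a real API key.
uri = 'http://:my-secret-key@api.openweathermap.org'

parts = urlsplit(uri)
host = parts.hostname      # plays the role of conn.host
password = parts.password  # plays the role of conn.password

url = f'https://{host}/data/2.5/weather'
print(url)  # https://api.openweathermap.org/data/2.5/weather
```

Keeping the key in a connection means the DAG file no longer contains the secret.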
### Task Failure ?!

Don't Forget: **Tasks can (and will) fail!**
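### Task Failure Example

A simple healthcheck example:

```python
from airflow.decorators import task
from airflow.hooks.base import BaseHook
from sh import ping
# from airflow.exceptions import AirflowFailException


@task
def service_healthcheck():
    conn = BaseHook.get_connection("pycon")
    # a non-zero exit code or a timeout raises an exception, which fails the task
    ping('-c', '1', conn.host, _timeout=1)
    # raise AirflowFailException('we have a problem')
```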
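
The `ping` binary is not always available (for example in slim containers), and ICMP only shows that the host is reachable, not that the service answers. As an alternative sketch, swapped in for the ping call and not taken from the talk, a check can open a TCP connection using only the standard library. `tcp_healthcheck` and its parameters are hypothetical names. The principle is the same as above: any exception raised inside a task marks it as failed.

```python
import socket


def tcp_healthcheck(host: str, port: int, timeout: float = 1.0) -> None:
    """Raise OSError if a TCP connection cannot be opened within the timeout."""
    with socket.create_connection((host, port), timeout=timeout):
        pass  # connecting is the whole check; close right away


# Demo against a throwaway local listener so the sketch runs without network access.
server = socket.socket()
server.bind(('127.0.0.1', 0))  # port 0 lets the OS pick a free port
server.listen(1)
port = server.getsockname()[1]

tcp_healthcheck('127.0.0.1', port)  # listener is up: returns silently
server.close()

try:
    tcp_healthcheck('127.0.0.1', port)  # listener is gone: expected to raise
    failed = False
except OSError:
    failed = True

print(failed)
```

Inside a task, the call would be something like `tcp_healthcheck(conn.host, 443)`, with the port being an assumption about the service.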
### Connecting Funcs Returning `None`

```python
payload = service_healthcheck() >> scrape_data(query, 'metric')
```
### Parallelism

```python
payload = [
    service_healthcheck(),
    dummy_healthcheck()
] >> scrape_data(query, 'metric')
```
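
To see why the two healthchecks can run side by side, the dependency graph can be modelled with the standard library's `graphlib`. This is a toy model under stated assumptions, not Airflow's scheduler: the task names are taken from the slides and each "wave" lists the tasks whose upstream tasks have all finished.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it may start.
dependencies = {
    'scrape_data': {'service_healthcheck', 'dummy_healthcheck'},
    'process_data': {'scrape_data'},
    'publish_data': {'process_data'},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    # tasks with no unfinished upstream task; sorted only for stable output
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)  # pretend the whole wave finished successfully

for wave in waves:
    print(wave)
```

The first wave contains both healthchecks, because neither depends on the other. `scrape_data` appears only in the second wave, after both have finished.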
## What's Next?
### Other Features

* datasets and sensors
* different types of triggers than time
* parallelism
* dynamic DAG generation
* notifier on success/failure
* runs in Kubernetes
* extendable
* ...
### [Common Use Cases](https://www.devopsschool.com/blog/what-is-apache-airflow-and-use-cases-of-apache-airflow/#Top_10_use_cases_of_Apache_Airflow)

* data ETL/processing/migration
* reporting and analytics
* workflow automation and monitoring
* infrastructure management
* DevOps automation
* IoT data processing
[![Zdenko Vrábel](images/zdenko.vrabel.jpg)](https://www.linkedin.com/in/zdenkovrabel/)

Zdenko Vrábel ([LinkedIn](https://www.linkedin.com/in/zdenkovrabel/))
[![Talk Python to Me](images/talk.python.to.me.jpg)](https://talkpython.fm/) #330: [Apache Airflow Open-Source Workflow with Python](https://talkpython.fm/episodes/show/330/apache-airflow-open-source-workflow-with-python)
