## Python for DevOps
#### or a (not) too brief introduction to [Apache Airflow](https://airflow.apache.org/)
[Miroslav "mirek" Biňas](https://bletvaska.github.io)
/ [**PyCon SK 2024**](https://2024.pycon.sk/sk/speakers/Mirek%20Bi%C5%88as.html)
![thats me](images/mirek.na.hackathone.jpg)
> ... I would like to ask whether it would be possible to change the syllabus of the final part of this course to include topics more closely related to **SD Networking**? Personally, I don't know what exactly the course content should be, i.e. which **Python** topics one should learn to build good foundations for **SD Networking**? ...
![Schema](images/schema.png)
### ETL in Bash
`$ scrape_data | process_data | publish_data`
### ETL Python Example
Let's scrape'n'store selected data from [openweathermap.org](http://api.openweathermap.org/data/2.5/weather?units=metric&q=bratislava&appid=98113cc2ea891cc2246900c7dc6a8038)
```
+--------------------------+       +----------------+       +----------------+
|       Scrape Data        |       |  Process Data  |       |  Publish Data  |
|       -----------        | ----> |  ------------  | ----> |  ------------  |
| (GET openweathermap.org) |       | (extract data) |       | (save to file) |
+--------------------------+       +----------------+       +----------------+
```
### Scrape Data
```python
import httpx


def scrape_data(query: str, units: str, appid: str) -> dict:
    print('>> Scrape data')
    url = 'http://api.openweathermap.org/data/2.5/weather'
    params = {
        'q': query,
        'units': units,
        'appid': appid
    }
    response = httpx.get(url, params=params)
    return response.json()
```
### Process Data
```python
def process_data(data: dict) -> str:
    print('>> Process data')
    return '{},{},{},{},{},{}'.format(
        data['dt'],
        data['name'],
        data['sys']['country'],
        data['main']['temp'],
        data['main']['humidity'],
        data['main']['pressure'],
    )
```
### Publish Data
```python
def publish_data(line: str):
    print('>> Publishing data')
    with open('dataset.csv', 'a') as dataset:
        print(line, file=dataset)
```
### Running from CLI
```bash
# token will be available during pycon ;)
$ ./workflow.py --appid 98113cc2ea891cc2246900c7dc6a8038 kosice
```
### Some of My Problems...
* periodic execution
  * `cron`, `systemd` Timers
* monitoring?
* what if something goes wrong?
  * email from cron?
  * SMS/IM/notification?
    * desktop/phone?
[![](images/logo-airflow.png)](https://airflow.apache.org/)
An open-source platform for developing, scheduling and monitoring workflows.
### Installation / Running
* as a standard [Python package](https://pypi.org/project/apache-airflow/):
```bash
$ pip install apache-airflow
```
* as a [Docker container](https://hub.docker.com/r/bitnami/airflow):
```bash
$ docker container run --name airflow bitnami/airflow:latest
```
* with [Docker Compose](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html):
```bash
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.3/docker-compose.yaml'
$ docker compose up
```
![What is DAG?](images/what.is.dag.png)
## CI/CD Pipeline
![CI/CD](images/cicd.png)
### DAG Skeleton
```python
from airflow.decorators import dag
from pendulum import datetime


@dag(
    "pycon_scraper",
    description="Scrapes weather from openweathermap.org",
    schedule="*/20 * * * *",
    start_date=datetime(2024, 3, 14, tz="UTC"),
    tags=['pycon', 'weather', 'devops'],
    catchup=False,
)
def main():
    pass


main()
```
Calling `main()` at module level instantiates the DAG, so the scheduler can discover it when parsing the file.
### Adding Tasks to DAG
* a task is the basic unit of execution
* a function decorated with the `@task` decorator becomes a task
* the key part of using tasks is defining how they relate to each other (see the sketch below)
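A minimal sketch of the idea, assuming the same function bodies as earlier (only `scrape_data` shown; the other two are wrapped the same way):

```python
import httpx

from airflow.decorators import task


@task
def scrape_data(query: str, units: str, appid: str) -> dict:
    # same body as the plain function above, now an Airflow task
    response = httpx.get(
        'http://api.openweathermap.org/data/2.5/weather',
        params={'q': query, 'units': units, 'appid': appid},
    )
    return response.json()
```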
### Tasks Relation
```python
APPID = '98113cc2ea891cc2246900c7dc6a8038'


@dag(
    "pycon_scraper",
    description="Scrapes weather from openweathermap.org",
    schedule="*/20 * * * *",
    start_date=datetime(2024, 3, 14, tz="UTC"),
    tags=['pycon', 'weather', 'devops'],
    catchup=False,
)
def main():
    data = scrape_data('kosice', 'metric', APPID)
    line = process_data(data)
    publish_data(line)


main()
```
## XComs
> **XComs** (short for “cross-communications”) are a **mechanism** that **let Tasks talk to each other**.
>
> -- [Airflow Docs](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html)
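With the TaskFlow API used above, XComs are mostly implicit: a task's return value is stored as an XCom and handed to any downstream task that takes it as an argument. A minimal sketch (hypothetical task names):

```python
from airflow.decorators import task


@task
def produce() -> str:
    # the return value is serialized and stored as an XCom
    return '1710403200,Kosice,SK,7.5,80,1020'


@task
def consume(line: str):
    # passing the value as an argument pulls the XCom behind the scenes
    print(line)


# inside a @dag-decorated function: consume(produce())
```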
## Params
> **Params** enable you to **provide runtime configuration to tasks**.
>
> -- [Airflow Docs](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html)
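The next slide shows plain function defaults; for comparison, a hedged sketch of the explicit form using `Param` from `airflow.models.param`, which adds type validation (the DAG id here is hypothetical):

```python
from airflow.decorators import dag, task
from airflow.models.param import Param
from pendulum import datetime


@dag(
    "pycon_scraper_param_demo",  # hypothetical DAG id, sketch only
    start_date=datetime(2024, 3, 14, tz="UTC"),
    schedule=None,
    catchup=False,
    params={
        # editable in the trigger form, validated against the declared type
        "query": Param("kosice", type="string"),
    },
)
def main():
    @task
    def show(**context):
        # params are available in the task context at runtime
        print(context["params"]["query"])

    show()


main()
```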
### Query as Param
```python
@dag(
    "pycon_scraper",
    description="Scrapes weather from openweathermap.org",
    schedule="*/2 * * * *",
    start_date=datetime(2024, 3, 14, tz="UTC"),
    tags=['pycon', 'weather', 'devops'],
    catchup=False,
)
def main(query: str = 'kosice'):
    payload = scrape_data(query, 'metric', APPID)
    csv_line = process_data(payload)
    publish_data(csv_line)


main()
```
### Connections & Hooks
* Airflow is often used to pull data from and push data to other systems.
* a **Connection** is a **set of parameters** (host, login, password, ...) Airflow uses to reach an external system (managed in the UI under Admin → Connections)
* a **Hook** is a high-level **access interface** (not only) to Connections
### Hook Usage Example
```python
import httpx

from airflow.hooks.base import BaseHook


def scrape_data(query: str, units: str) -> dict:
    print('>> Scrape data')
    conn = BaseHook.get_connection("pycon")
    url = f'https://{conn.host}/data/2.5/weather'
    params = {
        'q': query,
        'units': units,
        'appid': conn.password
    }
    response = httpx.get(url, params=params)
    return response.json()
```
### Task Failure ?!
Don't Forget: **Tasks can (and will) fail!**
### Task Failure Example
A simple healthcheck example:
```python
from sh import ping
# from airflow.exceptions import AirflowFailException


@task
def service_healthcheck():
    conn = BaseHook.get_connection("pycon")
    ping('-c', '1', conn.host, _timeout=1)
    # a failed ping raises; alternatively, fail explicitly:
    # raise AirflowFailException('we have a problem')
```
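Not from the talk, but closely related: `retries` and `retry_delay` are standard operator arguments that `@task` passes through, so a flaky check can retry before the run is marked failed. A sketch:

```python
from datetime import timedelta

from airflow.decorators import task


@task(retries=3, retry_delay=timedelta(minutes=5))
def flaky_healthcheck():
    # hypothetical task: on failure, Airflow reschedules it up to 3 more times
    ...
```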
### Connecting Funcs Returning `None`
Since `service_healthcheck()` returns no value, there is no data dependency to infer; the ordering is declared explicitly with `>>`:
```python
payload = service_healthcheck() >> scrape_data(query, 'metric')
```
### Parallelism
```python
payload = [
    service_healthcheck(),
    dummy_healthcheck()
] >> scrape_data(query, 'metric')
```
### Other Features
* datasets and sensors
* trigger types other than time-based schedules
* parallelism
* dynamic DAG generation
* notifiers on success/failure
* runs on Kubernetes
* extendable
* ...
### [Common Use Cases](https://www.devopsschool.com/blog/what-is-apache-airflow-and-use-cases-of-apache-airflow/#Top_10_use_cases_of_Apache_Airflow)
* data ETL/processing/migration
* reporting and analytics
* workflow automation and monitoring
* infrastructure management
* DevOps automation
* IoT data processing
### Try Me!
[`https://bit.ly/3TkxkCq`](https://bit.ly/3TkxkCq)
(user: `admin` pass: `admin`)
**Please DON'T CHANGE credentials!**
### Links
* [Apache Airflow](https://airflow.apache.org/) - project homepage
* [Awesome Apache Airflow](https://github.com/jghoman/awesome-apache-airflow) - a curated list of resources about Apache Airflow
* [Ecosystem](https://airflow.apache.org/ecosystem/) - other resources (listed on the homepage)
[![Zdenko Vrábel](images/zdenko.vrabel.jpg)](https://www.linkedin.com/in/zdenkovrabel/)
Zdenko Vrábel ([LinkedIn](https://www.linkedin.com/in/zdenkovrabel/))
[![Talk Python to Me](images/talk.python.to.me.jpg)](https://talkpython.fm/)
#330: [Apache Airflow Open-Source Workflow with Python](https://talkpython.fm/episodes/show/330/apache-airflow-open-source-workflow-with-python)
![qr code](https://api.qrserver.com/v1/create-qr-code/?data=https://bit.ly/3TeXd6R&size=300x300)
(**https://bit.ly/3TeXd6R**)