티스토리 뷰
Airflow란?
- Apache Airflow는 초기 에어비엔비(Airfbnb) 엔지니어링 팀에서 개발한 워크플로우 오픈 소스 플랫폼
** 워크플로우란? : 의존성으로 연결된 작업(Task)들의 집합
(ex) ETL의 경우 Extractaction > Transformation > Loading 의 작업의 흐름 - 프로그래밍 방식으로 워크플로우를 작성, 예약 및 모니터링
설치
dependency 문제가 많으므로 virtualenv에 설치하는 것을 추천한다.
pip install apache-airflow
실행
원하는 위치에서
airflow initdb
따로 설정 없이도 홈 디렉토리의 airflow 디렉토리에 airflow db파일이 생성된다.
127.0.0.1:8080 접속 시 initialize가 제대로 된 것을 확인할 수 있다.
example 들이 많이 보여서 불편하다면 airflow.cfg 파일을 열어서 load_examples를 False로 바꿔준다.
airflow db init 을 다시 실행해준다.
airflow webserver
을 다시 실행해서 서버를 재부팅해준다.
airflow users create \
> --username admin \
> --firstname FIRST_NAME \
> --lastname LAST_NAME \
> --role Admin \
> --email admin@example.org
명령어를 통해서 유저를 생성해준다.
로직 설계하기
내가 시행해야 할 테스크는 다음과 같았다.
<N 뉴스 수집하기>
task 1. 나스에 업로드 된 뉴스들을 postgresql로 테이블 만들어서 옮기기
task 2. 네이버 뉴스 수집하기 -> 자연어 처리하기 -> postgresql에 저장하기
<T 블로그 수집하기>
task 1. 지금까지 저장된 데이터를 정제하기(코드 추가로 작성 해야 함)
task 2. 블로그 수집해서 postgresql에 저장하기
task 3. 추천 블로그 리스트 새로 생기면 (정기적으로) postgresql에 저장하기
<N 블로그 수집하기>
task 1. 지금까지 저장된 데이터를 정제하기(코드 추가로 작성 해야 함)
task 2. 블로그 수집해서 postgresql에 저장하기
task 3. 추천 블로그 리스트 새로 생기면 (정기적으로) postgresql에 저장하기
<웹소설 수집하기>
task 1. 웹소설 수집해서 postgresql에 저장하기
task 2. 웹소설 새로 업데이트 되면 postgresql에 저장하기
<백업하기>
task 1. 모든 데이터 정기적으로 다운로드 받아서 database object로 저장하기
PythonOperator를 사용해서 각 task들을 파이썬 함수로 정의해서 사용하려고 한다.
branch: if else같은 역할. 직전 상태에서 return값에 따라 분기를 태울 수 있다.
이 친구들은 다 다른 테스크를 수행하기 때문에 5개의 DAG로 분리해야겠다고 생각했다.
op_kwargs를 통해서 argument들을 넘겨줄 수 있다.
Airflow DAG 스케줄링
with DAG(
dag_id='dag-for-velog',
schedule_interval='@daily',
tags=['airflow'],
start_date = datetime(2022,1,1),
catchup=False # catchup을 True로 하면, start_date 부터 현재까지 못돌린 날들을 채운다
...
)
처음 실행 시 interval을 설정해줄 수 있다.
interval을 정하는 방법에는 cron기반과 timedelta 기반이 있다. 내용은 생략한다.
timedelta
schedule_interval=datetime.timedelta(days=3),
주의!DAG 정의 파일에 로직 쓰지 않기!
머리를 감싸야 할 한 가지는 (처음에는 모든 사람에게 매우 직관적이지 않을 수 있습니다)
이 Airflow Python 스크립트가 DAG의 구조를 코드로 지정하는 구성 파일일 뿐이라는 것입니다.
여기에 정의된 실제 작업은 이 스크립트의 맥락과 다른 맥락에서 실행될 것이다.
다른 작업은 다른 시점에 다른 작업자에서 실행되며, 이는 이 스크립트가 작업 간의 교차 통신에 사용될 수 없다는 것을 의미합니다.
이 목적을 위해 우리는 XComs라고 불리는 더 고급 기능을 가지고 있습니다.
사람들은 때때로 DAG 정의 파일을 실제 데이터 처리를 할 수 있는 장소로 생각합니다 - 전혀 그렇지 않습니다!
스크립트의 목적은 DAG 객체를 정의하는 것이다.
스케줄러가 변경 사항을 반영하기 위해 주기적으로 실행하기 때문에 빠르게 평가해야 합니다(분이 아닌 초).
처음에 airflow를 무턱대고 작업할 때 이 파일 조차도 파이썬 파일이라고 생각해서(실제로 .py파일이므로)
파이썬과 관련된 로직들을 넣곤 했다. 예를 들면 airflow에서 사용할 class를 정의하고 그의 instance를 생성하고,
전역변수를 썼다.
하지만, 문서에서 얘기하기로는 그러기 위해서는 XComs라는 함수를 쓰고 이 파일에는 로직을 넣지 말라고 하고 있다.
모두들 주의하길 바라며...
(OPTIONAL) queue 만들기
스케줄러가 할 일을 만들고 큐에다가 넣으면 worker들이 시행할 수 있다.
예를 들어 스케줄러들이 DAG를 확인하고 뉴스 url들을 많이 만들어두면 큐에 넣어두면 worker들이 http result를 받고 sql에 저장까지 할 수 있는 것이다.
이걸 하려면 sequential 옵션을 버려야 한다....
한국시간대로 맞추기
pip install pendulum
pendulum.datetime(2019, 5, 1, tz="Asia/Seoul")
와 같이 정의하면 된다.
airflow.cfg
[core]
...
...
# default_timezone = utc
default_timezone = Asia/Seoul
## Pendulum 라이브러리에서 지원하는 timezone 형태