티스토리 뷰

Airflow란?

  • Apache Airflow는 초기 에어비엔비(Airfbnb) 엔지니어링 팀에서 개발한 워크플로우 오픈 소스 플랫폼
    ** 워크플로우란? : 의존성으로 연결된 작업(Task)들의 집합
    (ex) ETL의 경우 Extractaction > Transformation > Loading 의 작업의 흐름
  • 프로그래밍 방식으로 워크플로우를 작성, 예약 및 모니터링

설치

dependency 문제가 많으므로 virtualenv에 설치하는 것을 추천한다. 

pip install apache-airflow

 

실행

원하는 위치에서

airflow initdb

따로 설정 없이도 홈 디렉토리의 airflow 디렉토리에 airflow db파일이 생성된다.

 

127.0.0.1:8080 접속 시 initialize가 제대로 된 것을 확인할 수 있다.

example 들이 많이 보여서 불편하다면 airflow.cfg 파일을 열어서 load_examples를 False로 바꿔준다.

 airflow db init 을 다시 실행해준다.

 

airflow webserver

 

을 다시 실행해서 서버를 재부팅해준다.

 

airflow users create \
>           --username admin \
>           --firstname FIRST_NAME \
>           --lastname LAST_NAME \
>           --role Admin \
>           --email admin@example.org

명령어를 통해서 유저를 생성해준다.

 

로직 설계하기

 

내가 시행해야 할 테스크는 다음과 같았다.

 

<N 뉴스 수집하기>

task 1. 나스에 업로드 된 뉴스들을 postgresql로 테이블 만들어서 옮기기

task 2. 네이버 뉴스 수집하기 -> 자연어 처리하기 -> postgresql에 저장하기

 

<T 블로그 수집하기>

task 1. 지금까지 저장된 데이터를 정제하기(코드 추가로 작성 해야 함)

task 2. 블로그 수집해서 postgresql에 저장하기

task 3. 추천 블로그 리스트 새로 생기면 (정기적으로) postgresql에 저장하기

 

<N 블로그 수집하기>

task 1. 지금까지 저장된 데이터를 정제하기(코드 추가로 작성 해야 함)

task 2. 블로그 수집해서 postgresql에 저장하기

task 3. 추천 블로그 리스트 새로 생기면 (정기적으로) postgresql에 저장하기

 

<웹소설 수집하기>

task 1. 웹소설 수집해서 postgresql에 저장하기

task 2. 웹소설 새로 업데이트 되면 postgresql에 저장하기

 

<백업하기>

task 1. 모든 데이터 정기적으로 다운로드 받아서 database object로 저장하기

 

 

PythonOperator를 사용해서 각 task들을 파이썬 함수로 정의해서 사용하려고 한다.

branch: if else같은 역할. 직전 상태에서 return값에 따라 분기를 태울 수 있다.

 

이 친구들은 다 다른 테스크를 수행하기 때문에 5개의 DAG로 분리해야겠다고 생각했다.

 

op_kwargs를 통해서 argument들을 넘겨줄 수 있다.

 

Airflow DAG 스케줄링

with DAG(
	dag_id='dag-for-velog',
    schedule_interval='@daily',
    tags=['airflow'],
    start_date = datetime(2022,1,1),
    catchup=False     # catchup을 True로 하면, start_date 부터 현재까지 못돌린 날들을 채운다
    ...
    )

처음 실행 시 interval을 설정해줄 수 있다.

 

interval을 정하는 방법에는 cron기반과 timedelta 기반이 있다. 내용은 생략한다.

timedelta

    schedule_interval=datetime.timedelta(days=3),

 

주의!DAG 정의 파일에 로직 쓰지 않기!

 
머리를 감싸야 할 한 가지는 (처음에는 모든 사람에게 매우 직관적이지 않을 수 있습니다) 
이 Airflow Python 스크립트가 DAG의 구조를 코드로 지정하는 구성 파일일 뿐이라는 것입니다. 
여기에 정의된 실제 작업은 이 스크립트의 맥락과 다른 맥락에서 실행될 것이다. 
다른 작업은 다른 시점에 다른 작업자에서 실행되며, 이는 이 스크립트가 작업 간의 교차 통신에 사용될 수 없다는 것을 의미합니다. 
이 목적을 위해 우리는 XComs라고 불리는 더 고급 기능을 가지고 있습니다.

사람들은 때때로 DAG 정의 파일을 실제 데이터 처리를 할 수 있는 장소로 생각합니다 - 전혀 그렇지 않습니다! 
스크립트의 목적은 DAG 객체를 정의하는 것이다. 
스케줄러가 변경 사항을 반영하기 위해 주기적으로 실행하기 때문에 빠르게 평가해야 합니다(분이 아닌 초).

처음에 airflow를 무턱대고 작업할 때 이 파일 조차도 파이썬 파일이라고 생각해서(실제로 .py파일이므로)

파이썬과 관련된 로직들을 넣곤 했다. 예를 들면 airflow에서 사용할 class를 정의하고 그의 instance를 생성하고,

전역변수를 썼다. 

 

하지만, 문서에서 얘기하기로는 그러기 위해서는 XComs라는 함수를 쓰고 이 파일에는 로직을 넣지 말라고 하고 있다

모두들 주의하길 바라며...

(OPTIONAL) queue 만들기

 

스케줄러가 할 일을 만들고 큐에다가 넣으면 worker들이 시행할 수 있다. 

예를 들어 스케줄러들이 DAG를 확인하고 뉴스 url들을 많이 만들어두면 큐에 넣어두면 worker들이 http result를 받고 sql에 저장까지 할 수 있는 것이다.

 

이걸 하려면 sequential 옵션을 버려야 한다....

 

한국시간대로 맞추기

pip install pendulum
pendulum.datetime(2019, 5, 1, tz="Asia/Seoul")

와 같이 정의하면 된다.

 

airflow.cfg
[core]
...
...
# default_timezone = utc
default_timezone = Asia/Seoul
## Pendulum 라이브러리에서 지원하는 timezone 형태
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함