
Airflow Lecture 4

TRANSACTION

  • A way to group SQL statements that must run atomically and execute them as a single unit of work
  • The default Transaction Isolation Level is Read Committed
  • In Python this is typically wrapped in try/except, with a raise at the end of the except block so the failure still propagates (see the sketch below)
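
A minimal sketch of this pattern, assuming a psycopg2 connection and a hypothetical table raw_data.example (not the lecture's actual code):

import psycopg2

def run_transaction(conn):
    cur = conn.cursor()
    try:
        cur.execute("BEGIN;")
        cur.execute("DELETE FROM raw_data.example;")
        cur.execute("INSERT INTO raw_data.example VALUES ('a', 1);")
        cur.execute("COMMIT;")
    except (Exception, psycopg2.DatabaseError):
        cur.execute("ROLLBACK;")
        raise  # re-raise so the caller (e.g. an Airflow task) is marked as failed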

Airflow TestCode

  • Airflow Pipeline == DAG » tasks » Operators
  • A DAG starts by creating a DAG object
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Declare the DAG object
dag = DAG( 
    dag_id = 'HelloWorld',
    start_date = datetime(2022,6,14),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *')

# Declare the functions to run
def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

# Declare the Operators
print_hello = PythonOperator(
    task_id = 'print_hello',
    #python_callable param points to the function you want to run 
    python_callable = print_hello,
    #dag param points to the DAG that this task is a part of
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag)

# Assign the order of the tasks in our DAG
print_hello >> print_goodbye
docker exec -it data-engineering-batch13-airflow-scheduler-1 sh

airflow dags list

airflow tasks list HelloWorld

airflow tasks test HelloWorld print_hello 2023-07-01

Task Decorator

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
) as dag:

    # Assign the tasks to the DAG in order
    print_hello() >> print_goodbye()

Airflow Task Sample Source

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
    user = "lsyeup1206"  # 본인 ID 사용
    password = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return (f.text)


def transform(text):
    logging.info("Transform started")	
    lines = text.strip().split("\n")[1:] # skip the header (first) line
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(records):
    logging.info("load started")
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = "keeyong"
    # It is good practice to wrap these statements in a transaction with BEGIN and END
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # Run DELETE FROM first -> full refresh
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


def etl():
    link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)


dag_second_assignment = DAG(
	dag_id = 'name_gender',
	catchup = False,
	start_date = datetime(2023,4,6), # the DAG will not run if this date is in the future
	schedule = '0 2 * * *')  # adjust as needed

task = PythonOperator(
	task_id = 'perform_etl',
	python_callable = etl,
	dag = dag_second_assignment)
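
One note on the load step above: building the INSERT statements with f-strings breaks as soon as a value contains a single quote and is open to SQL injection. A minimal sketch of the same loop using psycopg2 parameter binding instead (same cursor, schema, and records as above):

# A minimal sketch: the same insert loop with psycopg2 parameter binding,
# so values containing quotes are escaped by the driver instead of being
# spliced into the SQL string.
def load_records(cur, schema, records):
    for name, gender in records:
        cur.execute(
            f"INSERT INTO {schema}.name_gender VALUES (%s, %s)",
            (name, gender),
        )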

Airflow Task Sample Source - Improvement 1

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta
from plugins import slack

import requests
import logging
import psycopg2



def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)


def transform(**context):
    logging.info("Transform started")
    # xcom_pull reads the return value of the previously executed task by its task id ('extract').
    # Data passed through XCom is stored in the Postgres metadata DB, so large payloads cannot be
    # passed this way; pass a pointer such as an S3 path instead.
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    if text is None:
        print("++++++++++++++++++++++++++++++")
    lines = text.strip().split("\n")[1:] # skip the header (first) line
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    
    records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")    
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # It is good practice to wrap these statements in a transaction with BEGIN and END
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # Run DELETE FROM first -> full refresh
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"IINSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error Msg", error)
        cur.execute("ROLLBACK;")
        raise  
    logging.info("load done")


dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # the DAG will not run if this date is in the future
    schedule = '0 2 * * *',  # adjust as needed
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        'on_failure_callback': slack.on_failure_callback,
    }
)


extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': 'keeyong',   ## change to your own schema
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load
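
This DAG imports slack.on_failure_callback from a plugins package that is not shown in the post. A minimal sketch of what such a callback could look like, assuming a Slack incoming-webhook URL stored in an Airflow Variable named slack_url (both the module body and the Variable name are assumptions, not the lecture's actual code):

# plugins/slack.py -- hypothetical sketch, not the actual module from the lecture.
# Posts a short failure message to a Slack incoming webhook when a task fails.
import requests
from airflow.models import Variable

def on_failure_callback(context):
    ti = context["task_instance"]
    message = f"Task failed: {ti.dag_id}.{ti.task_id}"
    webhook_url = Variable.get("slack_url")  # assumed Variable name
    requests.post(webhook_url, json={"text": message})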

Airflow Task Sample Source - Improvement 2

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2  # needed for the DatabaseError handling in load()


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db') # Airflow Connection ID
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # skip the header (first) line
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")    
    cur = get_Redshift_connection()   
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # It is good practice to wrap these statements in a transaction with BEGIN and END
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # Run DELETE FROM first -> full refresh
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # the DAG will not run if this date is in the future
    schedule='0 2 * * *',  # adjust as needed
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:

    url = Variable.get("csv_url")
    schema = 'keeyong'   ## change to your own schema
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)

Connections and Variables
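
The DAGs above read the source URL from an Airflow Variable (csv_url) and get the Redshift connection through the Connection ID redshift_dev_db. A minimal sketch of registering the Variable programmatically (in practice it is usually created once in the web UI under Admin > Variables; the URL is the one from the first sample):

# A minimal sketch: registering the csv_url Variable from Python code.
from airflow.models import Variable

Variable.set("csv_url", "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv")
print(Variable.get("csv_url"))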

Yahoo Finance API DAG

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticker = yf.Ticker(symbol)
    data = ticker.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("keeyong", "stock_info", results)
  • How do we install the yfinance module in the Docker setup?
    • Install it through the pip additional requirements setting in the docker-compose yaml (see the note below)
    • The yaml uses the ':-' (default value) syntax

When installing the yfinance module with pip3 in Docker, it must also be installed on the worker nodes!!
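
In the official Airflow docker-compose.yaml this is typically the _PIP_ADDITIONAL_REQUIREMENTS environment variable; the ':-' part supplies a default when the shell variable is not set. As a sketch (adding yfinance as the default value is an assumption, not the lecture's exact file):

    # in the shared environment block of docker-compose.yaml
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- yfinance}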

airflow.cfg INFO

1. Where is the DAGs folder configured?
    [core]
    # The folder where your airflow pipelines live, most likely a
    # subfolder in a code repository. This path must be absolute.
    dags_folder = /opt/airflow/dags

2. When a new DAG is added to the DAGs folder, how soon does Airflow actually notice it? Which key controls this scan interval?
    
    # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
    dag_dir_list_interval = 300

3. Which section of this file must be changed if you want to control Airflow from outside through its API?

    https://airflow.apache.org/docs/apache-airflow/stable/security/api.html

    enable_experimental_api = False -> True
    # Comma separated list of auth backends to authenticate users of the API. See
    # https://airflow.apache.org/docs/apache-airflow/stable/security/api.html for possible values.
    # ("airflow.api.auth.backend.default" allows all requests for historic reasons)
    auth_backends = airflow.api.auth.backend.session -> airflow.api.auth.backend.default
        - 1) Open the airflow.cfg file and set the api_auth_backend configuration option to airflow.api.auth.backend.default. 
        - 2) Also, set the api_experimental configuration option to true. Save the file and restart the Airflow webserver.

4. For a Variable's value to be encrypted (masked), which words must appear in the variable's name?
    https://medium.com/@sukul.teradata/apache-airflow-variables-summary-2281fdf18846
    => password, secret, passwd, authorization, api_key, apikey, access_token

5. After editing this configuration file, what must be done for the change to actually take effect?
    => Restart the scheduler node and the webserver node.

6. Which key is used to encrypt the contents of the metadata DB?
    # Secret key to save connection passwords in the db
    fernet_key =
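
As a minimal sketch (the standard approach, not something shown in the lecture), a value for fernet_key can be generated with the cryptography package and pasted into that setting:

# A minimal sketch: generate a Fernet key for airflow.cfg's fernet_key.
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())  # paste the printed value into fernet_key =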

Writing a DAG that uses a world country information API

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import pandas as pd
import logging
import requests
import json



def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

@task
def get_countries_info(url):
    response = requests.get(url)
    rest_countries_list = json.loads(response.text)
    records = []
    
    for country in rest_countries_list:
        print(country['name']['official'], country['population'], country['area'] ,end = "\n")
        records.append([country['name']['official'], country['population'], country['area']])
    return records


def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
        CREATE TABLE IF NOT EXISTS {schema}.{table} (
            country varchar(255),
            population varchar(255),
            area varchar(255)
        );""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # Create the source table if it does not exist - only needed the very first time
        _create_table(cur, schema, table, False)
        # Copy the source table into a temp table
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        for r in records:
            # use parameter binding so country names containing quotes do not break the SQL
            print(r)
            cur.execute("INSERT INTO t VALUES (%s, %s, %s);", r)

        # Recreate the source table (drop and create)
        _create_table(cur, schema, table, True)
        # Copy the temp table contents into the source table, deduplicated
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("load done")


with DAG(
    dag_id = 'CountryInfo_v1',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '30 6 * * 6' # the DAG runs every Saturday at 06:30 UTC
) as dag:

    source_url = 'https://restcountries.com/v3.1/all'
    results = get_countries_info(source_url)
    load("lsyeup1206", "country_info", results)

Data Organization

  • Data Engineering Team
    • Managing Data Warehouse
    • Writing and Managing Data-Pipelines
      • Data-Pipeline == ETL(Extract, Transform, Load) == Data Job == DAG(Airflow)
    • Type of Data-Pipelines
      • Batch Processing
      • Real-time Processing
      • Summary Data Generation (ELT: works on data already loaded into the data warehouse; often done by data analysts)
    • Event Collection
  • SQL / Python / Scala / Java
  • Airflow
  • Spark, Hadoop
  • AWS
  • K8S, Docker
  • ML, A/B Test, Statistics

RedShift

Data Warehouse ?

  • A Separate SQL Database (Not a Production Database)
    • The production database is the DB used by the service itself; latency matters, and it is specialized for Online Transaction Processing (OLTP).
    • The data warehouse is a DB for analytics; handling large volumes matters more than latency, and it is specialized for Online Analytical Processing (OLAP).
  • Central Data Storage of our company
  • Fixed Cost Option (Redshift) vs Variable Cost Option (BigQuery, Snowflake)
This post is licensed under CC BY 4.0 by the author.