Home Cloud-Data-Infra - 4차
Post
Cancel

Cloud-Data-Infra - 4차

EMR (Elastic Map Reduce)

  • Hadoop은 병렬처리(PPM)와 Object-Storage(HDFS)를 의미한다.
  • AWS는 병렬처리는 지원하지만 HDFS는 지원하지 않고자 했기때문에(지금은 지원)
  • 데이터 저장소 보다는 프로세싱 엔진으로 생각하는 편이 좋다.

실습 1. EMR 에서 Spark 코드 실행하기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1. EMR 구성하기
    - EMR 클러스터 생성
        - AWS Glue 에서 Hive 데이터 메타데이터에서 사용한다.
        - 추후 EMR을 죽거나 필요할 때 켜서 쓰는 데 유용하다.
        - 비용절감 측면에서 스팟 형태로도 많이 사용한다.

2. EMR에 보안 설정
    - EMR > Summary > security groups for master > edit inbound rules 
    - c9 private ip 허용
    - local ip address 허용

3. ssh로 c9 혹은 로컬(inbound 22포트 추가로 추가) 에서 접속
    - ssh hadoop@ec2-43-201-36-199.ap-northeast-2.compute.amazonaws.com -i EmrKeyPair.pem

4. S3 에 버킷 및 하위 디렉토리 생성
    - data, files, input, logs, output 폴더 생성
    - 요건에 맞게 폴더별 데이터 업로드

5. EMR에 접속하여 wget으로 실행하고자 하는 파일 저장
    - wget https://raw.githubusercontent.com/iamtaewan/workstation-demo/main/spark-etl.py

6. EMR에서 spark 코드 실행
    - spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark
    - (spark-submit spark-etl.py s3://emrdemo-kbi0127/input/ s3://emrdemo-kbi0127/output/spark)

7. Spark History Server

+ 동일한 방식으로 step 에서도 실행할 수 있고
+ Jupyter Lab 에서도 실행할 수 있다.

실습 2. Spark 코드 예제(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

if __name__ == "__main__":

    print(len(sys.argv))
    if (len(sys.argv) != 3):
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(0)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()

    print(updatedNYTaxi.show())

    print("Total number of records: " + str(updatedNYTaxi.count()))

    updatedNYTaxi.write.parquet(sys.argv[2])

실습 3. Spark 코드 예제(2)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, col

input_path = "s3://emrdemo-kbi0127/data/sales.csv"
output_path = "s3://emrdemo-kbi0127/output/jupyter/"

df = (spark.read.format('csv').option('header', 'True').option(
 "inferSchema", "true").load(input_path))
df.show()

df_final = (
 df.withColumn("order_id", df["Order ID"]).drop("Order ID").withColumn(
 "order_date",
 to_date(col("Order Date"), "M/d/yyyy")).drop("Order Date").withColumn(
 "item_type", df["Item Type"]).drop("Item Type").withColumn(
 "sales_channel",
 df["Sales Channel"]).drop("Sales Channel").withColumn(
 "units_sold",
 df["Units Sold"].cast('float')).drop("Units Sold").
 withColumn("unit_price",
 df["Unit Price"].cast('float')).drop("Unit Price").withColumn(
 "total_cost", df["Total Cost"].cast('float')).
 drop("Total Cost").withColumn(
 "total_profit",
 df["Total Profit"].cast('float')).drop("Total Profit").withColumn(
 "total_revenue",
 df["Total Revenue"].cast("float")).drop("Total Revenue").drop(
 "Order Priority", "Ship Date", "Unit Cost").distinct())
df_final.show(5)

df_final.createOrReplaceTempView('df_final_View')
spark.sql("select * from df_final_View").show(5)

df_final.repartition(2).write.mode("overwrite").save(
 "s3://emrdemo-kbi0127/data/output/sales/sales_final_parquet") # 뭉쳐서 파티션을 두개로 나눈다.

 df_final.write.partitionBy("region").parquet(
 "s3://emrdemo-kbi0127/output/sales/sales_region_final_parquet")  # 뭉쳐서 파티션을 두개로 나눈다.

실습 4. Hive 코드 예제

Hive : 동작시키는 엔진이 MR -> Spark 로 바뀌었다. Managed Table : External Table : 일반적인 External과 살짝 다른 의미이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CREATE EXTERNAL TABLE ny_taxi_test (
    vendor_id int,
    lpep_pickup_datetime string,
    lpep_dropoff_datetime string,
    store_and_fwd_flag string,
    rate_code_id smallint,
    pu_location_id int,
    do_location_id int,
    passenger_count int,
    trip_distance double,
    fare_amount double,
    mta_tax double,
    tip_amount double,
    tolls_amount double,
    ehail_fee double,
    improvement_surcharge double,
    total_amount double,
    payment_type smallint,
    trip_type smallint
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION "s3://emrdemo-kbi0127/input/";

실습 5. Pig 코드 예제

DEFINE CSVLoader org.apache.pig.piggybank.storage.CSVLoader();

NY_TAXI = LOAD '$INPUT' USING CSVLoader(',') AS
       (vendor_id:int,
       lpep_pickup_datetime:chararray,
       lpep_dropoff_datetime:chararray,
       store_and_fwd_flag:chararray,
       rate_code_id:int,
       pu_location_id:int,
       do_location_id:int,
       passenger_count:int,
       trip_distance:double,
       fare_amount:double,
       mta_tax:double,
       tip_amount:double,
       tolls_amount:double,
       ehail_fee:double,
       improvement_surcharge:double,
       total_amount:double,
       payment_type:int,
       trip_type:int);

STORE NY_TAXI into '$OUTPUT' USING PigStorage('\t');

MLOPS OpenSource

  • airflow (airbnb,uber관리)
  • kubeflow (container 기반)
  • mlflow (bigdata 기반)

ONNX

ML Model 을 C++ or Java 등 언어로 바꿔준다.

Hadoop 개요

Hadoop : 대용량 파일을 분산저장(HDFS)하여 mapreduce 를 통해 원하는 정보를 처리한다.

  • Distributed, Scalable, Fault-tolerant 를 특징으로 갖는다.
  • resource 관리는 yarn이 담당한다.
  • Hadoop HDFS, Hadoop MapReduce, Hadoop YARN 이 주요 구성요소이다.
  • KUDU : object storage 의 immutable storage 특징을 극복할 수 있게 도와주는 SW
  • HBASE : HDFS 베이스로 하는 컬럼너 NoSQL DB이다.
  • IMPALA : C++ 로 구성되어 Spark 보다 빠르게 동작하지만, only-memory 라 메모리의 양이 결국 처리할 수 있는 데이터의 양이라는 단점이 있다.
  • HUE : UI 이나 오히려 zepline을 사용하기도 한다.

Hadoop Stack

  • 최근에는 네트워크 성능이 많이 좋아져서 Hadoop Storage 와 Computing 엔진을 별도로 두는 경우가 많다.

Apache Sentry(권한 부여) , kevros

EMR

  • Master Node : 클러스터를 관리한다.
  • Core Node : 데이터 노드
  • Task Node : 선택적인 노드이다.

###

This post is licensed under CC BY 4.0 by the author.