EMR (Elastic Map Reduce)
- Hadoop은 병렬처리(PPM)와 Object-Storage(HDFS)를 의미한다.
- AWS는 병렬처리는 지원하지만 HDFS는 지원하지 않고자 했기때문에(지금은 지원)
- 데이터 저장소 보다는 프로세싱 엔진으로 생각하는 편이 좋다.
실습 1. EMR 에서 Spark 코드 실행하기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1. EMR 구성하기
- EMR 클러스터 생성
- AWS Glue 에서 Hive 데이터 메타데이터에서 사용한다.
- 추후 EMR을 죽거나 필요할 때 켜서 쓰는 데 유용하다.
- 비용절감 측면에서 스팟 형태로도 많이 사용한다.
2. EMR에 보안 설정
- EMR > Summary > security groups for master > edit inbound rules
- c9 private ip 허용
- local ip address 허용
3. ssh로 c9 혹은 로컬(inbound 22포트 추가로 추가) 에서 접속
- ssh hadoop@ec2-43-201-36-199.ap-northeast-2.compute.amazonaws.com -i EmrKeyPair.pem
4. S3 에 버킷 및 하위 디렉토리 생성
- data, files, input, logs, output 폴더 생성
- 요건에 맞게 폴더별 데이터 업로드
5. EMR에 접속하여 wget으로 실행하고자 하는 파일 저장
- wget https://raw.githubusercontent.com/iamtaewan/workstation-demo/main/spark-etl.py
6. EMR에서 spark 코드 실행
- spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark
- (spark-submit spark-etl.py s3://emrdemo-kbi0127/input/ s3://emrdemo-kbi0127/output/spark)
7. Spark History Server
+ 동일한 방식으로 step 에서도 실행할 수 있고
+ Jupyter Lab 에서도 실행할 수 있다.
실습 2. Spark 코드 예제(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
print(len(sys.argv))
if (len(sys.argv) != 3):
print("Usage: spark-etl [input-folder] [output-folder]")
sys.exit(0)
spark = SparkSession\
.builder\
.appName("SparkETL")\
.getOrCreate()
nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])
updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
updatedNYTaxi.printSchema()
print(updatedNYTaxi.show())
print("Total number of records: " + str(updatedNYTaxi.count()))
updatedNYTaxi.write.parquet(sys.argv[2])
실습 3. Spark 코드 예제(2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, col
input_path = "s3://emrdemo-kbi0127/data/sales.csv"
output_path = "s3://emrdemo-kbi0127/output/jupyter/"
df = (spark.read.format('csv').option('header', 'True').option(
"inferSchema", "true").load(input_path))
df.show()
df_final = (
df.withColumn("order_id", df["Order ID"]).drop("Order ID").withColumn(
"order_date",
to_date(col("Order Date"), "M/d/yyyy")).drop("Order Date").withColumn(
"item_type", df["Item Type"]).drop("Item Type").withColumn(
"sales_channel",
df["Sales Channel"]).drop("Sales Channel").withColumn(
"units_sold",
df["Units Sold"].cast('float')).drop("Units Sold").
withColumn("unit_price",
df["Unit Price"].cast('float')).drop("Unit Price").withColumn(
"total_cost", df["Total Cost"].cast('float')).
drop("Total Cost").withColumn(
"total_profit",
df["Total Profit"].cast('float')).drop("Total Profit").withColumn(
"total_revenue",
df["Total Revenue"].cast("float")).drop("Total Revenue").drop(
"Order Priority", "Ship Date", "Unit Cost").distinct())
df_final.show(5)
df_final.createOrReplaceTempView('df_final_View')
spark.sql("select * from df_final_View").show(5)
df_final.repartition(2).write.mode("overwrite").save(
"s3://emrdemo-kbi0127/data/output/sales/sales_final_parquet") # 뭉쳐서 파티션을 두개로 나눈다.
df_final.write.partitionBy("region").parquet(
"s3://emrdemo-kbi0127/output/sales/sales_region_final_parquet") # 뭉쳐서 파티션을 두개로 나눈다.
실습 4. Hive 코드 예제
Hive : 동작시키는 엔진이 MR -> Spark 로 바뀌었다. Managed Table : External Table : 일반적인 External과 살짝 다른 의미이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CREATE EXTERNAL TABLE ny_taxi_test (
vendor_id int,
lpep_pickup_datetime string,
lpep_dropoff_datetime string,
store_and_fwd_flag string,
rate_code_id smallint,
pu_location_id int,
do_location_id int,
passenger_count int,
trip_distance double,
fare_amount double,
mta_tax double,
tip_amount double,
tolls_amount double,
ehail_fee double,
improvement_surcharge double,
total_amount double,
payment_type smallint,
trip_type smallint
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION "s3://emrdemo-kbi0127/input/";
실습 5. Pig 코드 예제
DEFINE CSVLoader org.apache.pig.piggybank.storage.CSVLoader();
NY_TAXI = LOAD '$INPUT' USING CSVLoader(',') AS
(vendor_id:int,
lpep_pickup_datetime:chararray,
lpep_dropoff_datetime:chararray,
store_and_fwd_flag:chararray,
rate_code_id:int,
pu_location_id:int,
do_location_id:int,
passenger_count:int,
trip_distance:double,
fare_amount:double,
mta_tax:double,
tip_amount:double,
tolls_amount:double,
ehail_fee:double,
improvement_surcharge:double,
total_amount:double,
payment_type:int,
trip_type:int);
STORE NY_TAXI into '$OUTPUT' USING PigStorage('\t');
MLOPS OpenSource
- airflow (airbnb,uber관리)
- kubeflow (container 기반)
- mlflow (bigdata 기반)
ONNX
ML Model 을 C++ or Java 등 언어로 바꿔준다.
Hadoop 개요
Hadoop : 대용량 파일을 분산저장(HDFS)하여 mapreduce 를 통해 원하는 정보를 처리한다.
- Distributed, Scalable, Fault-tolerant 를 특징으로 갖는다.
- resource 관리는 yarn이 담당한다.
- Hadoop HDFS, Hadoop MapReduce, Hadoop YARN 이 주요 구성요소이다.
- KUDU : object storage 의 immutable storage 특징을 극복할 수 있게 도와주는 SW
- HBASE : HDFS 베이스로 하는 컬럼너 NoSQL DB이다.
- IMPALA : C++ 로 구성되어 Spark 보다 빠르게 동작하지만, only-memory 라 메모리의 양이 결국 처리할 수 있는 데이터의 양이라는 단점이 있다.
- HUE : UI 이나 오히려 zepline을 사용하기도 한다.
Hadoop Stack
- 최근에는 네트워크 성능이 많이 좋아져서 Hadoop Storage 와 Computing 엔진을 별도로 두는 경우가 많다.
Apache Sentry(권한 부여) , kevros
EMR
- Master Node : 클러스터를 관리한다.
- Core Node : 데이터 노드
- Task Node : 선택적인 노드이다.
###