πŸ‘½ Language & Frameworks/Spark

[λŸ¬λ‹ 슀파크] λ°μ΄ν„°ν”„λ ˆμž„ μŠ€ν‚€λ§ˆ

볡만 2023. 9. 3. 01:10

슀파크 λ°μ΄ν„°ν”„λ ˆμž„μ€ νŒλ‹€μŠ€ λ°μ΄ν„°ν”„λ ˆμž„μ— 영ν–₯을 λ°›μ•„ λ§Œλ“€μ–΄μ Έ 이름 μžˆλŠ” 칼럼과 μŠ€ν‚€λ§ˆλ₯Ό 가진 λΆ„μ‚° 인메λͺ¨λ¦¬ ν…Œμ΄λΈ”μ²˜λŸΌ λ™μž‘ν•œλ‹€.

 

μŠ€ν‚€λ§ˆ Schema

 

μŠ€ν‚€λ§ˆλŠ” λ°μ΄ν„°ν”„λ ˆμž„μ˜ 칼럼 이름과 데이터 νƒ€μž…μ„ μ •μ˜ν•œ 것이닀. μŠ€ν‚€λ§ˆλŠ” 데이터λ₯Ό 읽어듀일 λ•Œ μžλ™μœΌλ‘œ μ •μ˜λ˜κ²Œ ν•  μˆ˜λ„ μžˆμ§€λ§Œ, 미리 μŠ€ν‚€λ§ˆλ₯Ό μ •μ˜ν•˜λ©΄ λ‹€μŒκ³Ό 같은 μž₯점이 μžˆλ‹€.

 

 

 πŸ’‘ μŠ€νŒŒν¬κ°€ 데이터 νƒ€μž…μ„ μΆ”μΈ‘ν•΄μ•Ό ν•˜λŠ” μ±…μž„μ„ λœμ–΄ 쀌으둜써, μŠ€ν‚€λ§ˆ 확정을 μœ„ν•œ λ³„λ„μ˜ μž‘μ„ λ§Œλ“œλŠ” 것을 λ°©μ§€ν•œλ‹€.

 πŸ’‘ λ°μ΄ν„°κ°€ μŠ€ν‚€λ§ˆμ™€ λ§žμ§€ μ•ŠλŠ” 경우 쑰기에 문제λ₯Ό λ°œκ²¬ν•  수 μžˆλ‹€.

 

 

μŠ€ν‚€λ§ˆλ₯Ό μ •μ˜ν•˜λŠ” 방법은 두가지가 μžˆλ‹€. 첫 λ²ˆμ§ΈλŠ” pyspark.sql.typesμ—μ„œ 데이터 νƒ€μž…μ„ λΆˆλŸ¬μ™€ ν”„λ‘œκ·Έλž˜λ° μŠ€νƒ€μΌλ‘œ μ •μ˜ν•˜λŠ” 것이닀. λ‹€μŒκ³Ό 같이 각 ν•„λ“œμ˜ 이름과 Type을 μ •μ˜ν•΄ StructField에 인자둜 μ£Όκ³ , 이듀을 λ¬Άμ–΄μ„œ StructType에 μ „λ‹¬ν•˜λ©΄ λœλ‹€. StructField의 μ„Έλ²ˆμ§Έ μΈμžλŠ” null 값을 ν—ˆμš©ν•  것인지λ₯Ό μ˜λ―Έν•œλ‹€.

 

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
    ]
)

 

spark의 λͺ¨λ“  데이터 νƒ€μž…μ€ μ—¬κΈ°μ—μ„œ 확인할 수 μžˆλ‹€.

 

Data Types — PySpark 3.5.0 documentation

 

spark.apache.org

 

λ‘λ²ˆμ§Έ 방법은 DDL (data definition language)을 μ‚¬μš©ν•˜λŠ” λ°©λ²•μœΌλ‘œ, 쒀더 λ‹¨μˆœν•˜κ³  읽기 쉽닀. string으둜 각 ν•„λ“œμ˜ 이름과 Type을 ,둜 κ΅¬λΆ„ν•˜μ—¬ μ£Όλ©΄ λœλ‹€.

 

schema = "id INT, name STRING, age INT"

 

 

λ°μ΄ν„°ν”„λ ˆμž„ 생성

 

μ΄λ ‡κ²Œ μ •μ˜ν•œ μŠ€ν‚€λ§ˆλ₯Ό λ°μ΄ν„°ν”„λ ˆμž„μ„ 생성할 λ•Œ createDataFame의 인수둜 같이 λ„£μ–΄μ£Όλ©΄ λœλ‹€.

 

pyspark.sql.SparkSession.createDataFrame — PySpark 3.5.0 documentation

an RDD of any kind of SQL data representation (Row, tuple, int, boolean, etc.), or list, pandas.DataFrame or numpy.ndarray.

spark.apache.org

 

df = spark.createDataFrame(data, schema)

 

λ‹€μŒκ³Ό 같이 사전 μ •μ˜λœ μŠ€ν‚€λ§ˆ λŒ€μ‹  column name의 listλ₯Ό μ£Όκ±°λ‚˜, None으둜 지정할 μˆ˜λ„ μžˆλŠ”λ°, 이 경우 λ°μ΄ν„°μ—μ„œ μŠ€ν‚€λ§ˆλ₯Ό μžλ™μœΌλ‘œ μœ μΆ”(inference)ν•œλ‹€.

 

df = spark.createDataFrame(data, schema=["id", "name", "age"], samplingRatio=None)

 

μŠ€ν‚€λ§ˆλ₯Ό μœ μΆ”ν•  λ•ŒλŠ” 전체 λ°μ΄ν„°ν”„λ ˆμž„μ˜ 일뢀 rowλ₯Ό samplingν•˜μ—¬ μ•Œμ•„λ‚Έλ‹€. samplingRatioλ₯Ό None으둜 μ„€μ •ν•˜λ©΄ 첫 번째 row만 μ½μ–΄μ„œ μŠ€ν‚€λ§ˆλ₯Ό μœ μΆ”ν•œλ‹€. 

 

κ·ΈλŸ¬λ‚˜ 이 λ°©μ‹μ—λŠ” μ£Όμ˜ν•΄μ•Ό ν•  점이 μžˆλŠ”λ°, samplingRatio만큼의 rowλ₯Ό μ½μ—ˆλŠ”λ° 값이 λͺ¨λ‘ Nullμ΄μ–΄μ„œ μŠ€ν‚€λ§ˆλ₯Ό μœ μΆ”ν•  수 μ—†λ‹€λ©΄, μ—λŸ¬κ°€ λ°œμƒν•˜κ²Œ λœλ‹€.

 

μƒμ„±λœ λ°μ΄ν„°ν”„λ ˆμž„μ—μ„œ λ‹€μŒκ³Ό 같이 μŠ€ν‚€λ§ˆλ₯Ό 뢈러올 수 μžˆλ‹€.

df_schema = df.schema

 

λ°˜μ‘ν˜•