
[๋Ÿฌ๋‹ ์ŠคํŒŒํฌ] ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ์—ฐ์‚ฐ๊ณผ ์ „์ฒ˜๋ฆฌ

복만 2023. 11. 20. 23:23

spark์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ์—ฐ์‚ฐ๋“ค์„ ์ด์šฉํ•ด ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ, ๋ณ€ํ™˜, ํ†ต๊ณ„ ๋“ฑ ๋‹ค์–‘ํ•œ ์ผ์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋‹ค์Œ์€ ๋ช‡๊ฐ€์ง€ ์—ฐ์‚ฐ๋“ค๊ณผ ํ™œ์šฉ ์˜ˆ์‹œ์ด๋‹ค.

 

ํ”„๋กœ์ ์…˜๊ณผ ํ•„ํ„ฐ

df = df.select(df.colA, df.colB) # projection (keep only colA and colB)
df = df.where(df.colB < 5) # filter (keep only rows where colB < 5)

 

 

Column ์ด๋ฆ„ ๋ณ€๊ฒฝ ๋ฐ ์ถ”๊ฐ€, ์‚ญ์ œ

from pyspark.sql.functions import expr

renamed_df = df.withColumnRenamed("colA", "newColA") # rename colA to newColA

added_df = df.withColumn("largeA", expr("colA > 10000")) # add column largeA, which is True when colA is greater than 10000

dropped_df = df.drop("colA") # drop colA

 

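Note: the difference between alias and withColumnRenamed

  • alias: names a column or column expression inside a query (e.g. in select or agg); the result contains only the columns that query produces.
  • withColumnRenamed: renames one existing column and returns a new DataFrame that keeps all the other columns as they were.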

 

 

Aggregation

# Null์ด ์•„๋‹Œ colA ๊ฐ’๋“ค์˜ ๊ฐ€์ง“์ˆ˜
df = (df
    .where(df.colA.isNotNull())
    .agg(countDistinct(df.colA).alias("DistinctColANum"))

# Null์ด ์•„๋‹Œ colA ๊ฐ’๋“ค์˜ ์ข…๋ฅ˜
df = (df
    .where(df.colA.isNotNull())
    .distinct())
    
# colA์˜ ๊ฐ ๊ฐ’์˜ ๊ฐฏ์ˆ˜๋ฅผ ์ƒˆ๋กœ ๋‚ด๋ฆผ์ฐจ์ˆœ์œผ๋กœ ์ •๋ ฌ
df = (df
    .groupBy("colA")
    .count()
    .orderBy("count", ascending=False))
    
# colA์˜ ํ†ต๊ณ„๊ฐ’ ๊ณ„์‚ฐ
import pyspark.sql.functions as F

df = df.select(F.sum("colA"), F.avg("colA"), F.min("colA"), F.max("colA"))

 

๋ฐ˜์‘ํ˜•