Running Apache Spark in Snowflake

Notes on running Apache Spark code in Snowflake with Snowpark Connect for Spark.
spark
snowflake
Published

August 26, 2025

Dataset: https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv
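The file has to be on a stage before the notebook can read it. A minimal sketch of the one-time upload, assuming the stage @aicollege.public.setup already exists and `session` is the Snowpark session created in the setup cell below:

# Hypothetical one-time setup: download the CSV locally, then PUT it onto the stage
import urllib.request

local_path = "rows.csv"
urllib.request.urlretrieve("https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv", local_path)

# auto_compress=False keeps the file as plain CSV on the stage
session.file.put(local_path, "@aicollege.public.setup", auto_compress=False, overwrite=True)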

# Set up the env for Java libraries and enable Spark Connect mode
import os

os.environ['JAVA_HOME'] = os.environ["CONDA_PREFIX"]
os.environ['JAVA_LD_LIBRARY_PATH'] = os.path.join(os.environ["CONDA_PREFIX"], 'lib', 'server')
os.environ["SPARK_LOCAL_HOSTNAME"] = "127.0.0.1"
os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"

from snowflake import snowpark_connect
from snowflake.snowpark.context import get_active_session

session = get_active_session()
snowpark_connect.start_session(snowpark_session=session)

# From here on, normal PySpark code works; it can of course live in other Python cells
spark = snowpark_connect.get_session()
df = spark.sql("show schemas").limit(10)
df.show()
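As a quick sanity check that Spark Connect is wired up, plain PySpark code should now execute against Snowflake. A small sketch:

# Sketch: a pure-PySpark DataFrame, executed in Snowflake via Spark Connect
pdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
pdf.filter(pdf.id > 1).show()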
# Create a DataFrame with sample data using Snowpark
from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType

data = [[2021, "test", "Albany", "M", 42]]
schema = StructType([
    StructField("Year", IntegerType()),
    StructField("First Name", StringType()),
    StructField("County", StringType()),
    StructField("Sex", StringType()),
    StructField("Count", IntegerType())
])

df1 = session.create_dataframe(data, schema=schema)
df1.show()  # Snowpark DataFrame show() method
# Read CSV file using Snowpark
from snowflake.snowpark.types import *

# Option 1: Read from stage using Snowpark
df_csv = session.read.option("FIELD_DELIMITER", ",").option("SKIP_HEADER", 1).csv("@aicollege.public.setup/row.csv")

# Option 2: If the CSV is already loaded as a table, use SQL
# df_csv = session.table("your_table_name")

df_csv.show()
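If schema inference is not wanted, the Snowpark reader also accepts an explicit schema. A sketch that reuses the StructType defined in the sample-data cell, assuming the staged file has those same five columns:

# Sketch: read the same file with the explicit schema defined earlier
df_csv_typed = (
    session.read
    .schema(schema)
    .option("FIELD_DELIMITER", ",")
    .option("SKIP_HEADER", 1)
    .csv("@aicollege.public.setup/row.csv")
)
df_csv_typed.show(3)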
# Inspect the column structure of both DataFrames
print("CSV DataFrame:")
df_csv.show(1)  # show the first row to see the column structure
print("\nSample DataFrame:")
df1.show(1)
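For the actual types rather than a sample row, Snowpark DataFrames expose a StructType through the .schema attribute. A sketch:

# Sketch: print the inferred/declared StructType of each DataFrame
print(df_csv.schema)
print(df1.schema)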
# Rename the "First Name" column using Snowpark
df_csv = df_csv.with_column_renamed("First Name", "First_Name")
df1 = df1.with_column_renamed("First Name", "First_Name")  # rename in both frames so the union below exposes "First_Name"
print("After renaming column:")
df_csv.show(1)
# Union the DataFrames using Snowpark
df = df1.union_all(df_csv)  # union_all keeps duplicates, like PySpark's union
df.show()
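For reference, union (without _all) also matches columns by position but drops duplicate rows, mirroring SQL UNION vs UNION ALL. A sketch, assuming the column order lines up as set up above:

# Sketch: deduplicating union of the same two DataFrames
df_dedup = df1.union(df_csv)
df_dedup.show()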
# Filter using Snowpark
from snowflake.snowpark.functions import col

df.filter(col("Count") > 50).show()
df.where(df["Count"] > 50).show()
# Sort descending; desc comes from Snowpark here, since df is a Snowpark DataFrame
from snowflake.snowpark.functions import desc

df.select("First_Name", "Count").orderBy(desc("Count")).show()
subsetDF = (
    df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F"))
    .select("First_Name", "County", "Count")
    .orderBy(desc("Count"))
)
subsetDF.show()
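When the result needs to leave Snowflake, collect() materializes rows locally as snowflake.snowpark.Row objects. A sketch:

# Sketch: bring a handful of result rows into local Python
for r in subsetDF.limit(5).collect():
    print(r)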
df.write.mode("overwrite").saveAsTable("AICOLLEGE.PUBLIC.MYFIRSTSPARK")
df.write.format("json").mode("overwrite").save("@aicollege.public.setup/myfirstspark")
# spark.read.json("@aicollege.public.setup/myfirstspark")
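One way to read the JSON back is the Snowpark reader. A sketch, assuming the write above left JSON files under that stage path:

# Sketch: each JSON record lands in a single VARIANT column
df_json = session.read.json("@aicollege.public.setup/myfirstspark")
df_json.show(3)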
df.selectExpr("Count", "upper(County) as big_name").show()
# sql_expr is the Snowpark equivalent of PySpark's expr
from snowflake.snowpark.functions import sql_expr

df.select("Count", sql_expr("lower(County) as little_name")).show()
spark.sql("SELECT * FROM AICOLLEGE.PUBLIC.MYFIRSTSPARK").show()
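Optional cleanup when finished. A sketch:

# Sketch: drop the demo table created above
session.sql("DROP TABLE IF EXISTS AICOLLEGE.PUBLIC.MYFIRSTSPARK").collect()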