Running Apache Spark in Snowflake

Details on how to run Apache Spark in Snowflake.
spark
snowflake
Published

August 26, 2025

Dataset: https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv
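The examples below read this file from a Snowflake internal stage at @aicollege.public.setup/row.csv. One possible way to get it there is sketched here; the stage name, the local file name, and the use of an existing Snowpark session (created in the next cell) are assumptions, not part of the original setup.

# Sketch: download the dataset and upload it to an internal stage.
# Assumes a Snowpark `session` (see the connection cell below) and that the
# AICOLLEGE.PUBLIC schema exists; names mirror the stage path used later in this post.
import urllib.request

urllib.request.urlretrieve(
    "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv", "row.csv"
)

session.sql("CREATE STAGE IF NOT EXISTS AICOLLEGE.PUBLIC.SETUP").collect()
session.file.put("row.csv", "@aicollege.public.setup", auto_compress=False)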

# Snowflake Connection Setup for Cursor/Jupyter
import os
from dotenv import load_dotenv
from snowflake.snowpark import Session
# from snowflake import snowpark_connect  # Not available in standard installations

# Load environment variables from .env file
load_dotenv()

# Connection parameters - Update these with your Snowflake credentials
# connection_parameters = {
#     'account': os.getenv('SNOWFLAKE_ACCOUNT', 'your_account_identifier'),
#     'user': os.getenv('SNOWFLAKE_USER', 'your_username'), 
#     'password': os.getenv('SNOWFLAKE_PASSWORD', 'your_password'),
#     'role': os.getenv('SNOWFLAKE_ROLE', 'SYSADMIN'),
#     'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE', 'COMPUTE_WH'),
#     'database': os.getenv('SNOWFLAKE_DATABASE', 'AICOLLEGE'),
#     'schema': os.getenv('SNOWFLAKE_SCHEMA', 'PUBLIC')
# }

# Alternative: Import from config file
from snowflake_config import SNOWFLAKE_CONFIG
connection_parameters = SNOWFLAKE_CONFIG

try:
    # Create Snowpark session
    session = Session.builder.configs(connection_parameters).create()
    
    # Note: We'll use Snowpark DataFrames instead of Spark DataFrames
    # Snowpark provides similar functionality to Spark for data processing
    
    print("✅ Connected to Snowflake successfully!")
    print(f"Current database: {session.get_current_database()}")
    print(f"Current schema: {session.get_current_schema()}")
    
except Exception as e:
    print(f"❌ Connection failed: {str(e)}")
    print("Please check your Snowflake credentials in snowflake_config.py or .env file")
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[3], line 22
      8 load_dotenv()
     10 # Connection parameters - Update these with your Snowflake credentials
     11 # connection_parameters = {
     12 #     'account': os.getenv('SNOWFLAKE_ACCOUNT', 'your_account_identifier'),
   (...)     20 
     21 # Alternative: Import from config file
---> 22 from snowflake_config import SNOWFLAKE_CONFIG
     23 connection_parameters = SNOWFLAKE_CONFIG
     25 try:
     26     # Create Snowpark session

ModuleNotFoundError: No module named 'snowflake_config'
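The import fails because no snowflake_config.py module exists next to the notebook. A minimal sketch of what that file could contain, reusing the environment-variable names from the commented-out block above (the defaults are placeholders, not real credentials):

# snowflake_config.py -- hypothetical helper module for the import above
import os
from dotenv import load_dotenv

load_dotenv()

SNOWFLAKE_CONFIG = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT", "your_account_identifier"),
    "user": os.getenv("SNOWFLAKE_USER", "your_username"),
    "password": os.getenv("SNOWFLAKE_PASSWORD", "your_password"),
    "role": os.getenv("SNOWFLAKE_ROLE", "SYSADMIN"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
    "database": os.getenv("SNOWFLAKE_DATABASE", "AICOLLEGE"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC"),
}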
# Set up the env for Java libraries and enable the Spark Connect Mode
import os
import traceback

os.environ['JAVA_HOME'] = os.environ["CONDA_PREFIX"]
os.environ['JAVA_LD_LIBRARY_PATH'] = os.path.join(os.environ["CONDA_PREFIX"], 'lib', 'server')
os.environ["SPARK_LOCAL_HOSTNAME"] = "127.0.0.1"
os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"

from snowflake import snowpark_connect
from snowflake.snowpark.context import get_active_session

session = get_active_session()
snowpark_connect.start_session(snowpark_session=session)

# Here is your normal PySpark code. You can of course have it in other Python cells.
spark = snowpark_connect.get_session()
df = spark.sql("show schemas").limit(10)
df.show()
# Create a DataFrame with sample data using Snowpark
from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType

data = [[2021, "test", "Albany", "M", 42]]
schema = StructType([
    StructField("Year", IntegerType()),
    StructField("First Name", StringType()),
    StructField("County", StringType()),
    StructField("Sex", StringType()),
    StructField("Count", IntegerType())
])
df1 = session.create_dataframe(data, schema=schema)
df1.show()  # Snowpark DataFrame show() method
# Read CSV file using Snowpark
from snowflake.snowpark.types import *

# Option 1: Read from stage using Snowpark
df_csv = session.read.option("FIELD_DELIMITER", ",").option("SKIP_HEADER", 1).csv("@aicollege.public.setup/row.csv")

# Option 2: If the CSV is already loaded as a table, use SQL
# df_csv = session.table("your_table_name")

df_csv.show()
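Depending on the Snowpark version, reading a staged CSV without an explicit schema may fail or fall back to schema inference. A sketch of the explicit-schema variant, reusing the column layout from the sample DataFrame above (the stage path is the same assumption as before):

# Sketch: read the staged CSV with an explicit schema instead of relying on inference.
from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType

csv_schema = StructType([
    StructField("Year", IntegerType()),
    StructField("First Name", StringType()),
    StructField("County", StringType()),
    StructField("Sex", StringType()),
    StructField("Count", IntegerType())
])

df_csv = (
    session.read
    .schema(csv_schema)
    .option("FIELD_DELIMITER", ",")
    .option("SKIP_HEADER", 1)
    .csv("@aicollege.public.setup/row.csv")
)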
# Print schema information for both DataFrames
print("CSV DataFrame schema:")
df_csv.show(1)  # Show first row to see column structure
print("\nSample DataFrame schema:")
df1.show(1)
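Snowpark DataFrames also expose their schema directly through the .schema attribute, roughly what printSchema gives you in PySpark; a short sketch:

# Sketch: inspect column names and types without printing any rows.
for field in df_csv.schema.fields:
    print(field.name, field.datatype)
print(df1.schema)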
# Rename column using Snowpark
# Rename "First Name" to "First_Name" in both DataFrames so the union below
# and the later selects can refer to the same unquoted column name.
df_csv = df_csv.with_column_renamed("First Name", "First_Name")
df1 = df1.with_column_renamed("First Name", "First_Name")
print("After renaming column:")
df_csv.show(1)
# Union DataFrames using Snowpark
df = df1.union_all(df_csv)  # union_all is the Snowpark equivalent
df.show()
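For reference, Snowpark's union() deduplicates rows (SQL UNION), while union_all() keeps duplicates (SQL UNION ALL), which matches the behavior of Spark's union(); a tiny illustration:

# union() removes duplicate rows; union_all() keeps them.
a = session.create_dataframe([[1], [1]], schema=["x"])
b = session.create_dataframe([[1]], schema=["x"])
print(a.union(b).count())      # 1 -- duplicates removed
print(a.union_all(b).count())  # 3 -- duplicates kept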
# Filter using Snowpark
from snowflake.snowpark.functions import col

df.filter(col("Count") > 50).show()
df.where(df["Count"] > 50).show()
# Sort using Snowpark
from snowflake.snowpark.functions import desc  # Snowpark equivalent of pyspark.sql.functions.desc
df.select("First_Name", "Count").order_by(desc("Count")).show()

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").order_by(desc("Count"))
subsetDF.show()
# Save as a table and export as JSON to a stage using Snowpark
df.write.mode("overwrite").save_as_table("AICOLLEGE.PUBLIC.MYFIRSTSPARK")  # Snowpark equivalent of saveAsTable
df.write.copy_into_location("@aicollege.public.setup/myfirstspark", file_format_type="json", overwrite=True)
# session.read.json("@aicollege.public.setup/myfirstspark")  # read it back (see the sketch below)
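The commented-out line hints at reading the exported JSON back from the stage; a small sketch using the Snowpark reader (same stage path as above):

# Sketch: read the exported JSON files back from the stage to verify the write.
df_json = session.read.json("@aicollege.public.setup/myfirstspark")
df_json.show(5)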
# Column expressions using Snowpark
df.select_expr("Count", "upper(County) as big_name").show()  # Snowpark equivalent of selectExpr

from snowflake.snowpark.functions import sql_expr  # Snowpark equivalent of pyspark's expr
df.select("Count", sql_expr("lower(County) as little_name")).show()

# Query the saved table with SQL
session.sql("SELECT * FROM AICOLLEGE.PUBLIC.MYFIRSTSPARK").show()