# Set up the env for Java libraries and enable the Spark Connect Mode
import os
import traceback

os.environ['JAVA_HOME'] = os.environ["CONDA_PREFIX"]
os.environ['JAVA_LD_LIBRARY_PATH'] = os.path.join(os.environ["CONDA_PREFIX"], 'lib', 'server')
os.environ["SPARK_LOCAL_HOSTNAME"] = "127.0.0.1"
os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"

from snowflake import snowpark_connect
from snowflake.snowpark.context import get_active_session

session = get_active_session()
snowpark_connect.start_session(snowpark_session=session)

# Here is your normal PySpark code. You can of course put it in other Python cells.
spark = snowpark_connect.get_session()
df = spark.sql("show schemas").limit(10)
df.show()
Running Apache Spark in Snowflake

Dataset: https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv
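The read step further down expects the CSV to already be on a Snowflake stage. A minimal sketch of one way to get it there with Snowpark's file API, assuming the dataset has been downloaded locally as row.csv and that the aicollege.public.setup stage referenced later is the target:

# Assumption: row.csv was downloaded from the URL above into the local working directory
session.file.put("row.csv", "@aicollege.public.setup", auto_compress=False, overwrite=True)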
# Create a DataFrame with sample data using Snowpark
from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType

data = [[2021, "test", "Albany", "M", 42]]
schema = StructType([
    StructField("Year", IntegerType()),
    StructField("First Name", StringType()),
    StructField("County", StringType()),
    StructField("Sex", StringType()),
    StructField("Count", IntegerType())
])
df1 = session.create_dataframe(data, schema=schema)
df1.show()  # Snowpark DataFrame show() method
# Read CSV file using Snowpark
from snowflake.snowpark.types import *
# Option 1: Read from stage using Snowpark
df_csv = session.read.option("FIELD_DELIMITER", ",").option("SKIP_HEADER", 1).csv("@aicollege.public.setup/row.csv")

# Option 2: If the CSV is already loaded as a table, use SQL
# df_csv = session.table("your_table_name")

df_csv.show()
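Reading without a schema leaves Snowpark to infer the column names and types, which may not line up with df1 for the rename and union steps below. A hedged alternative sketch, assuming the staged file has the same five columns as df1 and reusing the schema StructType defined above:

# Assumption: the staged CSV has the same five columns as df1, in the same order
df_csv = session.read.schema(schema).option("FIELD_DELIMITER", ",").option("SKIP_HEADER", 1).csv("@aicollege.public.setup/row.csv")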
# Print schema information for both DataFrames
print("CSV DataFrame schema:")
df_csv.show(1)  # Show first row to see column structure

print("\nSample DataFrame schema:")
df1.show(1)
# Rename column using Snowpark
from snowflake.snowpark.functions import col
df_csv = df_csv.with_column_renamed("First Name", "First_Name")
print("After renaming column:")
df_csv.show(1)
# Union DataFrames using Snowpark
df = df1.union_all(df_csv)  # union_all is the Snowpark equivalent
df.show()
# Filter using Snowpark
from snowflake.snowpark.functions import col
df.filter(col("Count") > 50).show()
df.where(df["Count"] > 50).show()
from pyspark.sql.functions import desc

df.select("First_Name", "Count").orderBy(desc("Count")).show()
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
subsetDF.show()
"overwrite").saveAsTable("AICOLLEGE.PUBLIC.MYFIRSTSPARK") df.write.mode(
format("json").mode("overwrite").save(f"@aicollege.public.setup/myfirstspark") df.write.
#spark.read.format("json").json(f"@aicollege.public.setup/myfirstspark")
"Count", "upper(County) as big_name").show()
df.selectExpr(
from pyspark.sql.functions import expr
"Count", expr("lower(County) as little_name")).show() df.select(
f"SELECT * FROM AICOLLEGE.PUBLIC.MYFIRSTSPARK").show()
spark.sql(