A quick reference guide to the most commonly used patterns and functions in PySpark SQL.
- Common Patterns
- String Operations
- Number Operations
- Date & Timestamp Operations
- Array Operations
- Aggregation Operations
- Advanced Operations
If you can't find what you're looking for, check out the PySpark Official Documentation and add it here!
Follow the link for more info about configuration properties and their default values.
from pyspark.sql import SparkSession
from pyspark import SparkConf
spark = SparkSession.builder.config(
conf=(
SparkConf()
.setAppName("My-Spark-Application")
.setMaster("local[*]")
# .setMaster("spark://spark-master:7077")
.set("spark.files.overwrite", "true")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors","1")
.set("spark.dynamicAllocation.maxExecutors","6")
.set("spark.executor.memory", "4g")
.set("spark.executor.cores", "2")
.set("spark.driver.memory", "8g")
.set("spark.driver.cores", "1")
.set("spark.cores.max", "20")
)
).getOrCreate()
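Alternatively, configure a fixed number of executor instances instead of dynamic allocation: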
from pyspark.sql import SparkSession
from pyspark import SparkConf
spark = SparkSession.builder.config(
conf=(
SparkConf()
.setAppName("My-Spark-Application")
.setMaster("local[*]")
# .setMaster("spark://spark-master:7077")
.set("spark.executor.instances", "2")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
.set("spark.driver.memory", "4g")
.set("spark.driver.cores", "1")
.set("spark.cores.max", "4")
)
).getOrCreate()
Verify configuration values
print(
"\n",
f'spark.files.overwrite: {spark.sparkContext.getConf().get("spark.files.overwrite")}\n',
f'spark.executor.instances: {spark.sparkContext.getConf().get("spark.executor.instances")}\n',
f'spark.executor.memory: {spark.sparkContext.getConf().get("spark.executor.memory")}\n',
f'spark.executor.cores: {spark.sparkContext.getConf().get("spark.executor.cores")}\n',
f'spark.driver.memory: {spark.sparkContext.getConf().get("spark.driver.memory")}\n',
f'spark.driver.cores: {spark.sparkContext.getConf().get("spark.driver.cores")}\n',
f'spark.cores.max: {spark.sparkContext.getConf().get("spark.cores.max")}\n',
"\n",
)
# Easily reference these as F.my_function() and T.my_type() below
from pyspark.sql import types as T
from pyspark.sql import Window as W
from pyspark.sql import functions as F
# Filter on equals condition
df = df.filter(df.is_adult == 'Y')
# Filter on >, <, >=, <= condition
df = df.filter(df.age > 25)
# Multiple conditions require parentheses around each condition
df = df.filter((df.age > 25) & (df.is_adult == 'Y'))
# Compare against a list of allowed values
df = df.filter(F.col('first_name').isin([3, 4, 7]))
# Sort results
df = df.orderBy(df.age.asc())
df = df.orderBy(df.age.desc())
# Left join in another dataset
df = df.join(person_lookup_table, 'person_id', 'left')
# Match on different columns in left & right datasets
df = df.join(other_table, df.id == other_table.person_id, 'left')
# Match on multiple columns
df = df.join(other_table, ['first_name', 'last_name'], 'left')
# Helper for lookup-table joins; useful as a one-liner if you have a bunch of them
def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value):
return (
df1
.join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left')
.withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key)))
.drop(df2_key)
.drop(df2_value)
)
df = lookup_and_replace(people, pay_codes, 'id', 'pay_code_id', 'pay_code_desc')
# Add a new static column
df = df.withColumn('status', F.lit('PASS'))
# Construct a new dynamic column
condition = (
F.when((df.fname.isNotNull() & df.lname.isNotNull()), F.concat(df.fname, df.lname))
.otherwise(F.lit('N/A'))
)
df = (
df
.withColumn('full_name', condition)
)
# Pick which columns to keep, optionally rename some
df = df.select(
'name',
'age',
F.col('dob').alias('date_of_birth'),
)
# Remove columns
df = df.drop('mod_dt', 'mod_username')
# Rename a column
df = df.withColumnRenamed('dob', 'date_of_birth')
# Keep all the columns which also occur in another dataset
df = df.select(*(F.col(c) for c in df2.columns))
# Batch Rename/Clean Columns
for col in df.columns:
df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_'))
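The same cleanup can also be written as a single expression with DataFrame.toDF — a sketch, assuming every column should be renamed:
# Rename all columns in one pass
df = df.toDF(*[c.lower().replace(' ', '_').replace('-', '_') for c in df.columns])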
# Cast a column to a different type
df = df.withColumn('price', df.price.cast(T.DoubleType()))
# Replace all nulls with a specific value
df = df.fillna({
'first_name': 'Tom',
'age': 0,
})
# Take the first value that is not null
df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A')))
# Drop duplicate rows in a dataset (distinct)
df = df.dropDuplicates()
# Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates(['name', 'height'])
# Replace empty strings with null (leave out subset keyword arg to replace in all columns)
df = df.replace({"": None}, subset=["name"])
# Replace Python/NumPy NaN values with null
df = df.replace(float("nan"), None)
# Contains - col.contains(string)
df = df.filter(df.name.contains('o'))
# Starts With - col.startswith(string)
df = df.filter(df.name.startswith('Al'))
# Ends With - col.endswith(string)
df = df.filter(df.name.endswith('ice'))
# Is Null - col.isNull()
df = df.filter(df.is_adult.isNull())
# Is Not Null - col.isNotNull()
df = df.filter(df.first_name.isNotNull())
# Like - col.like(string_with_sql_wildcards)
df = df.filter(df.name.like('Al%'))
# Regex Like - col.rlike(regex)
df = df.filter(df.name.rlike('[A-Z]*ice$'))
# Is In List - col.isin(*cols)
df = df.filter(df.name.isin('Bob', 'Mike'))
# Substring - col.substr(startPos, length)
df = df.withColumn('short_id', df.id.substr(1, 10))
# Trim - F.trim(col)
df = df.withColumn('name', F.trim(df.name))
# Left Pad - F.lpad(col, len, pad)
# Right Pad - F.rpad(col, len, pad)
df = df.withColumn('id', F.lpad('id', 4, '0'))
# Left Trim - F.ltrim(col)
# Right Trim - F.rtrim(col)
df = df.withColumn('id', F.ltrim('id'))
# Concatenate - F.concat(*cols)
df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname'))
# Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols)
df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname'))
# Regex Replace - F.regexp_replace(str, pattern, replacement)
df = df.withColumn('id', F.regexp_replace('id', '0F1(.*)', '1F1-$1'))
# Regex Extract - F.regexp_extract(str, pattern, idx)
df = df.withColumn('id', F.regexp_extract('id', '[0-9]*', 0))
# Round - F.round(col, scale=0)
df = df.withColumn('price', F.round('price', 0))
# Floor - F.floor(col)
df = df.withColumn('price', F.floor('price'))
# Ceiling - F.ceil(col)
df = df.withColumn('price', F.ceil('price'))
# Absolute Value - F.abs(col)
df = df.withColumn('price', F.abs('price'))
# X raised to power Y - F.pow(x, y)
df = df.withColumn('exponential_growth', F.pow('x', 'y'))
# Select smallest value out of multiple columns - F.least(*cols)
df = df.withColumn('least', F.least('subtotal', 'total'))
# Select largest value out of multiple columns - F.greatest(*cols)
df = df.withColumn('greatest', F.greatest('subtotal', 'total'))
# Convert a string of known format to a date (excludes time information)
df = df.withColumn('date_of_birth', F.to_date('date_of_birth', 'yyyy-MM-dd'))
# Convert a string of known format to a timestamp (includes time information)
df = df.withColumn('time_of_birth', F.to_timestamp('time_of_birth', 'yyyy-MM-dd HH:mm:ss'))
# Get year from date: F.year(col)
# Get month from date: F.month(col)
# Get day from date: F.dayofmonth(col)
# Get hour from date: F.hour(col)
# Get minute from date: F.minute(col)
# Get second from date: F.second(col)
df = df.filter(F.year('date_of_birth') == F.lit(2017))
# Add & subtract days
df = df.withColumn('three_days_after', F.date_add('date_of_birth', 3))
df = df.withColumn('three_days_before', F.date_sub('date_of_birth', 3))
# Add & Subtract months
df = df.withColumn('next_month', F.add_months('date_of_birth', 1))
# Get number of days between two dates
df = df.withColumn('days_between', F.datediff('end', 'start'))
# Get number of months between two dates
df = df.withColumn('months_between', F.months_between('end', 'start'))
# Keep only rows where date_of_birth is between 2017-05-10 and 2018-07-21
df = df.filter(
(F.col('date_of_birth') >= F.lit('2017-05-10')) &
(F.col('date_of_birth') <= F.lit('2018-07-21'))
)
# Column Array - F.array(*cols)
df = df.withColumn('full_name', F.array('fname', 'lname'))
# Empty Array - F.array(*cols)
df = df.withColumn('empty_array_column', F.array([]))
# Array Size/Length - F.size(col)
df = df.withColumn('array_length', F.size('my_array'))
# Flatten Array - F.flatten(col)
df = df.withColumn('flattened', F.flatten('my_array'))
# Unique/Distinct Elements - F.array_distinct(col)
df = df.withColumn('unique_elements', F.array_distinct('my_array'))
# Row Count: F.count(col)
# Sum of Rows in Group: F.sum(col)
# Mean of Rows in Group: F.mean(col)
# Max of Rows in Group: F.max(col)
# Min of Rows in Group: F.min(col)
# First Row in Group: F.first(col, ignorenulls=False)
df = (
df
.groupBy('gender')
.agg(
F.max('age').alias('max_age_by_gender'),
F.min('age').alias('min_age_by_gender'),
F.mean('age').alias('mean_age_by_gender'),
)
)
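A sketch of the other aggregate helpers listed above (count, sum, and first), assuming the same name, age, and gender columns as the previous examples:
df = (
    df
    .groupBy('gender')
    .agg(
        F.count(F.lit(1)).alias('num_people'),
        F.sum('age').alias('total_age'),
        F.first('name').alias('first_name_seen'),
    )
)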
# Collect a Set of all Rows in Group: F.collect_set(col)
# Collect a List of all Rows in Group: F.collect_list(col)
df = (
df
.groupBy('age')
.agg(F.collect_set('name').alias('person_names'))
)
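# Get only the most recent row (by date) for each first_name/last_name combination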
window = (
W
.partitionBy(
"first_name",
"last_name"
)
.orderBy(
F.desc("date")
)
)
df = df.withColumn("row_number", F.row_number().over(window))
df = df.filter(F.col("row_number") == 1)
df = df.drop("row_number")
# df.repartition(num_output_partitions) -> used to increase/decrease the number of partitions
df = df.repartition(1)
# df.coalesce(num_output_partitions) -> used to decrease the number of partitions
df = df.coalesce(1)
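To verify the result of either call, check the current partition count of the DataFrame:
# Number of partitions backing the DataFrame
print(df.rdd.getNumPartitions())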
# Multiply each row's age column by two
times_two_udf = F.udf(lambda x: x * 2)
df = df.withColumn('age', times_two_udf(df.age))
# Randomly choose a value to use as a row's name
import random
random_name_udf = F.udf(lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna']))
df = df.withColumn('name', random_name_udf())
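Note that F.udf returns strings unless told otherwise; a sketch of the same doubling UDF with an explicit return type, assuming the age column holds integers:
# Keep the column as an integer by declaring the UDF's return type
times_two_int_udf = F.udf(lambda x: x * 2 if x is not None else None, T.IntegerType())
df = df.withColumn('age', times_two_int_udf(df.age))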