Writing files to Azure blob throws exceptions #176

Open
rickersilva opened this issue Jan 20, 2025 · 0 comments
I am not sure if this is the best way to ask for help, but I have spent a few days now trying to write some files (specifically Parquet files) into my blob storage from a Synapse notebook. Thank you.

Following the instructions here to access and write files to Azure blobs, I ran into some exceptions that kept me from moving forward.

First, trying to load the public holidays dataset:

from azureml.opendatasets import PublicHolidays

from datetime import datetime
from dateutil.relativedelta import relativedelta

# Pull the last six months of public holidays
end_date = datetime.today()
start_date = datetime.today() - relativedelta(months=6)
hol = PublicHolidays(start_date=start_date, end_date=end_date)
hol_df = hol.to_spark_dataframe()

I got this at hol_df = hol.to_spark_dataframe():

TypeError: argument of type 'azureml.dataprep.rslex.StreamInfo' is not iterable

This is not critical, since I can create a sample DataFrame to test writing files, which is where I really have issues.
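
For reference, a workaround I may try for the holidays load (an untested sketch; it assumes to_pandas_dataframe() is unaffected by the StreamInfo issue and that the notebook's spark session is available):

# Untested workaround: load through pandas, then convert to a Spark DataFrame
hol_pd = hol.to_pandas_dataframe()
hol_df = spark.createDataFrame(hol_pd)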

Running this code (with my sensitive information removed):

from pyspark.sql import SparkSession

# container_name, storage_account_name, and sas_token are defined earlier
# in the notebook (values redacted)

# Input and output paths
input_folder = "dev-sandbox/input"
output_folder = "dev-sandbox/output"

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Spark configuration for Azure Blob Storage (SAS token auth)
spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net", sas_token)
# print("SCRIPT: About to print spark configuration")
# print(spark.sparkContext.getConf().getAll())
# print("SCRIPT: End of spark configuration")

# Example DataFrame
test_df = spark.createDataFrame([(1, "test"), (2, "data")], ["id", "value"])
print("SCRIPT: data frame created")

# Write DataFrame to Azure Blob Storage
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{output_folder}/test.csv"
print(f"SCRIPT: output path created: {output_path}")

print("SCRIPT: Writing data frame to the output path...")
# test_df.write.mode("overwrite").parquet(output_path)  # note: mode() must precede parquet()
test_df.write.mode("overwrite").csv(output_path)

print(f"SCRIPT: Data written successfully to {output_path}")

I got this error:

Py4JJavaError: An error occurred while calling o6862.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 33) (vm-8c653250 executor 1): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to wasbs://[CONTAINER_NAME]@[ACCOUNT_NAME].blob.core.windows.net/dev-sandbox/output/test.csv/.spark-staging-198d0508-1991-420e-be6e-3efb60313af2.
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:788)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:472)
at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:120)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Error closing the output.
at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:1000)
at org.apache.spark.sql.catalyst.csv.UnivocityGenerator.close(UnivocityGenerator.scala:124)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CsvOutputWriter.scala:48)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseCurrentWriter(FileFormatDataWriter.scala:73)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:87)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:143)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:456)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:462)
... 15 more
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: This request is not authorized to perform this operation.
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2898)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2764)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:1245)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:1111)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:341)
at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:161)
at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:996)
... 23 more
Caused by: com.microsoft.azure.storage.StorageException: This request is not authorized to perform this operation.
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:735)
at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:691)
at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:434)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2837)
... 32 more

I tried the same using a sas_token in the Spark conf and got the same error. I understand the code above is not using the SAS token but the linked services I already have up and running in Synapse. In any case, my sas_token has these permissions: "sp=rwdlacupiytfx". From the stack trace, the rows themselves are written; the failure happens when Spark commits the output by renaming the staging file, which the wasbs driver implements as a server-side blob copy (CloudBlob.startCopy).
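
To narrow it down, I am planning to test the SAS token outside Spark. A sketch using the azure-storage-blob SDK (the blob names are just examples; cc, src, and dst are my own names, not from the notebook):

from azure.storage.blob import ContainerClient

# Sanity check: can this SAS upload a blob and start a server-side copy,
# which is roughly what the wasbs commit/rename step needs?
cc = ContainerClient(
    account_url=f"https://{storage_account_name}.blob.core.windows.net",
    container_name=container_name,
    credential=sas_token,
)
cc.upload_blob("dev-sandbox/output/_sas_check_src.txt", b"hello", overwrite=True)

src = cc.get_blob_client("dev-sandbox/output/_sas_check_src.txt")
dst = cc.get_blob_client("dev-sandbox/output/_sas_check_dst.txt")
# The source URL must carry the SAS so the copy operation can read it
dst.start_copy_from_url(f"{src.url}?{sas_token.lstrip('?')}")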

What could be causing this kind of error?
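
If it matters, one alternative I may try (a sketch, not verified in my setup; account_key stands in for the storage account key, which is not in this notebook): writing through the ABFS driver against the Data Lake endpoint instead of wasbs, since it handles the commit/rename path differently.

# Hypothetical alternative: ABFS driver with an account key instead of wasbs + SAS
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    account_key,  # placeholder, not a real key
)
abfss_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{output_folder}/test"
test_df.write.mode("overwrite").parquet(abfss_path)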
