2195 Add s3a wrapper around run scripts #2198

Merged
merged 16 commits on Dec 8, 2023
Changes from 9 commits
6 changes: 3 additions & 3 deletions pom.xml
@@ -162,9 +162,9 @@
<hadoop.version>2.8.5</hadoop.version>
<htrace.version>3.1.0-incubating</htrace.version>
<httpclient.version>4.4.1</httpclient.version>
<jackson.spark.datatype.version>2.10.4</jackson.spark.datatype.version>
<jackson.spark.version>2.10.4</jackson.spark.version>
<jackson.version>2.9.8</jackson.version>
<jackson.spark.datatype.version>2.6.7</jackson.spark.datatype.version>
<jackson.spark.version>2.6.7</jackson.spark.version>
<jackson.version>2.6.7</jackson.version>
<jjwt.version>0.10.7</jjwt.version>
<junit.version>4.11</junit.version>
<kafka.spark.version>0-10</kafka.spark.version>
7 changes: 7 additions & 0 deletions scripts/bash/enceladus_env.template.sh
@@ -120,3 +120,10 @@ ADDITIONAL_JVM_EXECUTOR_CONF_CLUSTER="$KRB5_CONF_CLUSTER $TRUST_STORE_CLUSTER $T
# Switch that tells the script whether it should exit when it encounters unrecognized options.
# On true it prints an error and exits with 127; on false it only prints a warning
EXIT_ON_UNRECOGNIZED_OPTIONS="true"

# Variables for the s3a wrapper implementation
MENAS_API="http://localhost:8080/menas/api"
ECS_API_BASE="https://localhost"
ECS_API_KK="$ECS_API_BASE/kk"
ECS_API_MAP="$ECS_API_BASE/map"
ECS_API_KEY="MY_SECRET_KEY"
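One pitfall in the template above: shell variable names are case-sensitive, so the derived endpoints must reference the uppercase `ECS_API_BASE` (referencing a lowercase `$ecs_api_base` would expand to an empty string). A minimal sketch of the composition, using the template's placeholder values:

```shell
# Sketch only: example values mirror the template; the endpoints are placeholders,
# not real ECS URLs.
ECS_API_BASE="https://localhost"
ECS_API_KK="$ECS_API_BASE/kk"      # case must match the variable defined above
ECS_API_MAP="$ECS_API_BASE/map"
echo "$ECS_API_KK"                 # prints https://localhost/kk
```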
8 changes: 8 additions & 0 deletions scripts/bash/run_enceladus.sh
@@ -73,6 +73,7 @@ AUTOCLEAN_STD_FOLDER=""
PERSIST_STORAGE_LEVEL=""
HELP_CALL="0"
ASYNCHRONOUSMODE="0"
JCEKS_PATH=""

# Spark configuration options
CONF_SPARK_EXECUTOR_MEMORY_OVERHEAD=""
@@ -326,6 +327,10 @@ case $key in
ASYNCHRONOUSMODE="1"
shift
;;
--jceks-path)
JCEKS_PATH="$2"
shift 2 # past argument and value
;;
*) # unknown option
OTHER_PARAMETERS+=("$1") # save it in an array for later
shift # past argument
@@ -520,6 +525,9 @@ fi
CMD_LINE="${CMD_LINE} ${ADDITIONAL_SPARK_CONF} ${SPARK_CONF}"
CMD_LINE="${CMD_LINE} --conf \"${JVM_CONF} ${ADDITIONAL_JVM_CONF}\""
CMD_LINE="${CMD_LINE} --conf \"spark.executor.extraJavaOptions=${ADDITIONAL_JVM_EXECUTOR_CONF}\""
if [[ -n $JCEKS_PATH ]]; then
CMD_LINE="${CMD_LINE} --conf \"${JCEKS_PATH}\""
fi
CMD_LINE="${CMD_LINE} --class ${CLASS} ${JAR}"

# Adding command line parameters that go AFTER the jar file
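The `--jceks-path` handling above follows the script's existing two-token option-parsing pattern. A self-contained sketch of that pattern, run against a hypothetical argument list (the jceks value below is illustrative, not a real configuration key from the PR):

```shell
# Sketch of the two-token option parsing: recognized flags consume their value,
# everything else is saved for later. All argument values are made up.
JCEKS_PATH=""
OTHER_PARAMETERS=()
set -- --dataset-name mydata --jceks-path "spark.hadoop.credential.path=/tmp/example.jceks"
while [[ $# -gt 0 ]]; do
    case "$1" in
        --jceks-path)
            JCEKS_PATH="$2"
            shift 2  # past argument and value
            ;;
        *)  # unknown option: save it in an array for later
            OTHER_PARAMETERS+=("$1")
            shift  # past argument
            ;;
    esac
done
echo "$JCEKS_PATH"
```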
174 changes: 174 additions & 0 deletions scripts/bash/s3a_wrapper.sh
@@ -0,0 +1,174 @@
#!/bin/bash

# Copyright 2018 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# We have 2 states:
#   Write Standardisation
#   Write Conformance
# The script has to figure out which one is being run and whether it writes to S3 or HDFS.
# For Standardisation it parses the supplied Java properties to determine the standardisation path.
# For Conformance it calls Menas for info over the REST API using the supplied metadata.
# It then cleans up the paths to conform to the path schema provided in the query.

# TODO - Auto load jceks files

set -e

# Source env variables
source "$(dirname "$0")/enceladus_env.sh"

# Initialize variables
keytab=""
dataset_name=""
dataset_version=""
report_date=""
report_version=""

# The first argument is the name of the original script
original_script="$(dirname "$0")/$(basename "$1")"

# Shift the first argument so we can process the rest
shift

# Check if the original script exists
if [[ ! -f "$original_script" ]]; then
echo "Error: The script '$original_script' does not exist."
exit 1
fi

# Initialize an array to hold the other arguments
other_args=()

# Loop through arguments
while [[ $# -gt 0 ]]; do
case "$1" in
--menas-auth-keytab)
keytab="$2"
shift # past argument
shift # past value
;;
--dataset-name)
dataset_name="$2"
shift # past argument
shift # past value
;;
--dataset-version)
dataset_version="$2"
shift # past argument
shift # past value
;;
--report-date)
report_date="$2"
shift # past argument
shift # past value
;;
--report-version)
report_version="$2"
shift # past argument
shift # past value
;;
*) # unknown option
other_args+=("$1") # save it in an array for later
shift # past argument
;;
esac
done

# Print the extracted variables
echo "Keytab: $keytab"
echo "Dataset Name: $dataset_name"
echo "Dataset Version: $dataset_version"
echo "Report Date: $report_date"
echo "Report Version: $report_version"

# Run klist to check for a current Kerberos ticket
if klist -s; then
echo "Kerberos ticket found."
else
echo "No Kerberos ticket found or ticket is expired. Please run kinit."
exit 1
fi

# Get Dataset info. With 'set -e' a failing command substitution would abort
# before the error message below, so capture the failure explicitly;
# --fail makes curl return non-zero on HTTP errors.
response=$(curl --negotiate -s --fail -u : "$MENAS_API/dataset/$dataset_name/$dataset_version") || {
echo "Could not load dataset info - $dataset_name v $dataset_version from Menas at $MENAS_API"
exit 1
}

# Parse the response using jq to extract hdfsPublishPath and hdfsPath
hdfsPublishPath=$(echo "$response" | jq -r '.hdfsPublishPath')
hdfsPath=$(echo "$response" | jq -r '.hdfsPath')

# Check if the paths are null or not
if [[ $hdfsPublishPath != "null" && $hdfsPath != "null" ]]; then
echo "hdfsPublishPath: $hdfsPublishPath"
echo "hdfsPath: $hdfsPath"
else
echo "Could not find the required paths in the response."
exit 1
fi

# Run the original script with all the arguments and save its exit code.
# The '|| exit_code=$?' guard keeps 'set -e' from aborting before the
# cleanup steps below can run.
exit_code=0
"$original_script" "${other_args[@]}" \
--menas-auth-keytab "$keytab" \
--dataset-name "$dataset_name" \
--dataset-version "$dataset_version" \
--report-date "$report_date" \
--report-version "$report_version" || exit_code=$?

# Run versions cleanup for publish on s3a
if [[ $hdfsPublishPath == s3a://* ]]; then
echo "We have publish versions to clean:"
curl -X GET \
--header "x-api-key: $ECS_API_KEY" \
-d "{\"ecs_path\":\"${hdfsPublishPath#s3a://}\"}" \
"$ECS_API_KK"
echo
curl -X DELETE \
--header "x-api-key: $ECS_API_KEY" \
-d "{\"ecs_path\":\"${hdfsPublishPath#s3a://}\"}" \
"$ECS_API_KK"
echo
echo "Versions cleaned"
else
echo "No publish versions to clean."
fi

if [[ $STD_HDFS_PATH == s3a://* ]]; then
STD_HDFS_PATH_FILLED="${STD_HDFS_PATH//\{0\}/$dataset_name}"
STD_HDFS_PATH_FILLED="${STD_HDFS_PATH_FILLED//\{1\}/$dataset_version}"
STD_HDFS_PATH_FILLED="${STD_HDFS_PATH_FILLED//\{2\}/$report_date}"
STD_HDFS_PATH_FILLED="${STD_HDFS_PATH_FILLED//\{3\}/$report_version}"

echo "We have tmp versions to clean:"
curl -X GET \
--header "x-api-key: $ECS_API_KEY" \
-d "{\"ecs_path\":\"${STD_HDFS_PATH_FILLED#s3a://}\"}" \
"$ECS_API_KK"
echo
curl -X DELETE \
--header "x-api-key: $ECS_API_KEY" \
-d "{\"ecs_path\":\"${STD_HDFS_PATH_FILLED#s3a://}\"}" \
"$ECS_API_KK"
echo
echo "Versions cleaned"
else
echo "No std versions to clean."
fi

# At the end of the script, use the saved exit code
exit $exit_code
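The wrapper's tmp-path handling fills the `{0}`..`{3}` placeholders in `STD_HDFS_PATH` via bash pattern substitution and then strips the `s3a://` scheme before sending the path to the ECS API. A sketch with made-up example values (the path template below is hypothetical, not the project's actual `STD_HDFS_PATH`):

```shell
# Sketch of the {0}..{3} placeholder substitution; all values are illustrative.
STD_HDFS_PATH="s3a://bucket/tmp/{0}/v{1}/{2}/run-{3}"   # hypothetical template
dataset_name="trades"
dataset_version="7"
report_date="2023-12-08"
report_version="1"

filled="${STD_HDFS_PATH//\{0\}/$dataset_name}"
filled="${filled//\{1\}/$dataset_version}"
filled="${filled//\{2\}/$report_date}"
filled="${filled//\{3\}/$report_version}"

# Strip the scheme the same way the wrapper does for the ECS payload
echo "${filled#s3a://}"   # prints bucket/tmp/trades/v7/2023-12-08/run-1
```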
@@ -37,7 +37,7 @@ object FileSystemUtils
path.toSimpleS3Location match {

case Some(s3Location) => // s3 over hadoop fs api
val s3BucketUri: String = s"s3://${s3Location.bucketName}" // s3://<bucket>
val s3BucketUri: String = s"${s3Location.protocol}://${s3Location.bucketName}" // s3://<bucket>
val s3uri: URI = new URI(s3BucketUri)
FileSystem.get(s3uri, hadoopConf)

@@ -0,0 +1,41 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.utils.fs

import org.apache.hadoop.conf.Configuration
import org.scalatest.funsuite.AnyFunSuiteLike
import za.co.absa.enceladus.utils.testUtils.SparkTestBase

class FileSystemUtilsSpec extends AnyFunSuiteLike with SparkTestBase {
implicit val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration

test("hdfs protocol default") {
val fs = FileSystemUtils.getFileSystemFromPath("hdfs://my/path")
assert(fs.getUri.toString == "file:///")
}

// test("s3 protocol recognition and bucket set") {
// val fs = FileSystemUtils.getFileSystemFromPath("s3://my-bucket/my/path")
// assert(fs.getUri.toString == "s3://my-bucket")
// }
//
// test("s3a protocol recognition and bucket set") {
// val fs = FileSystemUtils.getFileSystemFromPath("s3a://my-bucket/my/path")
// assert(fs.getUri.toString == "s3a://my-bucket")
// }

}