Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 committed Nov 27, 2024
1 parent 9cb4099 commit dd5de10
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 125 deletions.
92 changes: 60 additions & 32 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ namespace facebook::velox::filesystems {

AbfsConfig::AbfsConfig(
std::string_view path,
const config::ConfigBase& config,
bool initDfsClient) {
const config::ConfigBase& config) {
std::string_view file;
bool isHttps = true;
isHttps_ = true;
if (path.find(kAbfssScheme) == 0) {
file = path.substr(kAbfssScheme.size());
} else if (path.find(kAbfsScheme) == 0) {
file = path.substr(kAbfsScheme.size());
isHttps = false;
isHttps_ = false;
} else {
VELOX_FAIL("Invalid ABFS Path {}", path);
}
Expand All @@ -42,39 +41,24 @@ AbfsConfig::AbfsConfig(
fileSystem_ = file.substr(0, firstAt);
auto firstSep = file.find_first_of("/");
filePath_ = file.substr(firstSep + 1);
auto accountNameWithSuffix = file.substr(firstAt + 1, firstSep - firstAt - 1);
std::string accountNameWithSuffixForUrl(accountNameWithSuffix);
if (!initDfsClient) {
// We should use correct suffix for blob client.
size_t start_pos = accountNameWithSuffixForUrl.find("dfs");
if (start_pos != std::string::npos) {
accountNameWithSuffixForUrl.replace(start_pos, 3, "blob");
}
}

url_ = fmt::format(
"{}{}/{}/{}",
isHttps ? "https://" : "http://",
accountNameWithSuffixForUrl,
fileSystem_,
filePath_);
accountNameWithSuffix_ = file.substr(firstAt + 1, firstSep - firstAt - 1);

auto authTypeKey =
fmt::format("{}.{}", kAzureAccountAuthType, accountNameWithSuffix);
fmt::format("{}.{}", kAzureAccountAuthType, accountNameWithSuffix_);
authType_ = "SharedKey";
if (config.valueExists(authTypeKey)) {
authType_ = config.get<std::string>(authTypeKey).value();
}
if (authType_ == "SharedKey") {
auto credKey =
fmt::format("{}.{}", kAzureAccountKey, accountNameWithSuffix);
fmt::format("{}.{}", kAzureAccountKey, accountNameWithSuffix_);
VELOX_USER_CHECK(
config.valueExists(credKey), "Config {} not found", credKey);
auto firstDot = accountNameWithSuffix.find_first_of(".");
auto accountName = accountNameWithSuffix.substr(0, firstDot);
auto endpointSuffix = accountNameWithSuffix.substr(firstDot + 5);
auto firstDot = accountNameWithSuffix_.find_first_of(".");
auto accountName = accountNameWithSuffix_.substr(0, firstDot);
auto endpointSuffix = accountNameWithSuffix_.substr(firstDot + 5);
std::stringstream ss;
ss << "DefaultEndpointsProtocol=" << (isHttps ? "https" : "http");
ss << "DefaultEndpointsProtocol=" << (isHttps_ ? "https" : "http");
ss << ";AccountName=" << accountName;
ss << ";AccountKey=" << config.get<std::string>(credKey).value();
ss << ";EndpointSuffix=" << endpointSuffix;
Expand All @@ -87,11 +71,11 @@ AbfsConfig::AbfsConfig(
connectionString_ = ss.str();
} else if (authType_ == "OAuth") {
auto clientIdKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientId, accountNameWithSuffix);
"{}.{}", kAzureAccountOAuth2ClientId, accountNameWithSuffix_);
auto clientSecretKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientSecret, accountNameWithSuffix);
"{}.{}", kAzureAccountOAuth2ClientSecret, accountNameWithSuffix_);
auto clientEndpointKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientEndpoint, accountNameWithSuffix);
"{}.{}", kAzureAccountOAuth2ClientEndpoint, accountNameWithSuffix_);
VELOX_USER_CHECK(
config.valueExists(clientIdKey), "Config {} not found", clientIdKey);
VELOX_USER_CHECK(
Expand All @@ -116,15 +100,59 @@ AbfsConfig::AbfsConfig(
config.get<std::string>(clientSecretKey).value(),
options);
} else if (authType_ == "SAS") {
auto sasKey = fmt::format("{}.{}", kAzureSASKey, accountNameWithSuffix);
auto sasKey = fmt::format("{}.{}", kAzureSASKey, accountNameWithSuffix_);
VELOX_USER_CHECK(config.valueExists(sasKey), "Config {} not found", sasKey);
urlWithSasToken_ =
fmt::format("{}?{}", url_, config.get<std::string>(sasKey).value());
sas_ = config.get<std::string>(sasKey).value();
} else {
VELOX_USER_FAIL(
"Unsupported auth type {}, supported auth types are SharedKey, OAuth and SAS.",
authType_);
}
}

std::unique_ptr<BlobClient> AbfsConfig::getReadClient() {
if (authType_ == "SAS") {
auto url = getUrl(true);
return std::make_unique<BlobClient>(fmt::format("{}?{}", url, sas_));
} else if (authType_ == "OAuth") {
auto url = getUrl(true);
return std::make_unique<BlobClient>(url, tokenCredential_);
} else {
return std::make_unique<BlobClient>(BlobClient::CreateFromConnectionString(
connectionString_, fileSystem_, filePath_));
}
}

std::unique_ptr<DataLakeFileClient> AbfsConfig::getWriteClient() {
if (authType_ == "SAS") {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(
fmt::format("{}?{}", url, sas_));
} else if (authType_ == "OAuth") {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(url, tokenCredential_);
} else {
return std::make_unique<DataLakeFileClient>(
DataLakeFileClient::CreateFromConnectionString(
connectionString_, fileSystem_, filePath_));
}
}

std::string AbfsConfig::getUrl(bool withblobSuffix) {
std::string accountNameWithSuffixForUrl(accountNameWithSuffix_);
if (withblobSuffix) {
// We should use correct suffix for blob client.
size_t start_pos = accountNameWithSuffixForUrl.find("dfs");
if (start_pos != std::string::npos) {
accountNameWithSuffixForUrl.replace(start_pos, 3, "blob");
}
}
return fmt::format(
"{}{}/{}/{}",
isHttps_ ? "https://" : "http://",
accountNameWithSuffixForUrl,
fileSystem_,
filePath_);
}

} // namespace facebook::velox::filesystems
73 changes: 37 additions & 36 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,88 +17,89 @@
#pragma once

#include <azure/core/credentials/credentials.hpp>
#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/files/datalake.hpp>
#include <folly/hash/Hash.h>
#include <string>

using namespace Azure::Storage::Blobs;
using namespace Azure::Storage::Files::DataLake;

namespace facebook::velox::config {
class ConfigBase;
}

namespace facebook::velox::filesystems {

// This is used to specify the Azurite endpoint in testing.
static std::string kAzureBlobEndpoint{"fs.azure.blob-endpoint"};
static constexpr const char* kAzureBlobEndpoint{"fs.azure.blob-endpoint"};

// The authentication mechanism is set in `fs.azure.account.auth.type` (or the
// account specific variant). The supported values are SharedKey, OAuth and SAS.
static std::string kAzureAccountAuthType{"fs.azure.account.auth.type"};
static constexpr const char* kAzureAccountAuthType =
"fs.azure.account.auth.type";

static std::string kAzureAccountKey{"fs.azure.account.key"};
static constexpr const char* kAzureAccountKey = "fs.azure.account.key";

static std::string kAzureSASKey{"fs.azure.sas.fixed.token"};
static constexpr const char* kAzureSASKey = "fs.azure.sas.fixed.token";

static std::string kAzureAccountOAuth2ClientId{
"fs.azure.account.oauth2.client.id"};
static std::string kAzureAccountOAuth2ClientSecret{
"fs.azure.account.oauth2.client.secret"};
static constexpr const char* kAzureAccountOAuth2ClientId =
"fs.azure.account.oauth2.client.id";
static constexpr const char* kAzureAccountOAuth2ClientSecret =
"fs.azure.account.oauth2.client.secret";

// Token end point, this can be found through Azure portal. For example:
// https://login.microsoftonline.com/{TENANTID}/oauth2/token
static std::string kAzureAccountOAuth2ClientEndpoint{
"fs.azure.account.oauth2.client.endpoint"};
static constexpr const char* kAzureAccountOAuth2ClientEndpoint =
"fs.azure.account.oauth2.client.endpoint";

class AbfsConfig {
public:
explicit AbfsConfig(
std::string_view path,
const config::ConfigBase& config,
bool initDfsClient);
explicit AbfsConfig(std::string_view path, const config::ConfigBase& config);

std::string authType() const {
return authType_;
}
std::unique_ptr<BlobClient> getReadClient();

std::string fileSystem() const {
return fileSystem_;
}
std::unique_ptr<DataLakeFileClient> getWriteClient();

std::string filePath() const {
return filePath_;
}

std::string connectionString() const {
return connectionString_;
}

std::string url() const {
return url_;
}

std::string urlWithSasToken() const {
return urlWithSasToken_;
/// Test only.
std::string fileSystem() const {
return fileSystem_;
}

std::shared_ptr<Azure::Core::Credentials::TokenCredential> tokenCredential()
const {
return tokenCredential_;
/// Test only.
std::string connectionString() const {
return connectionString_;
}

/// Test only.
std::string tenentId() const {
return tenentId_;
}

/// Test only.
std::string authorityHost() const {
return authorityHost_;
}

private:
std::string getUrl(bool withblobSuffix);

std::string authType_;

// Container name is called FileSystem in some Azure API.
std::string fileSystem_;
std::string filePath_;
std::string authType_;
std::string connectionString_;
std::string urlWithSasToken_;
std::string url_;

bool isHttps_;
std::string accountNameWithSuffix_;

std::string sas_;

std::string tenentId_;
std::string authorityHost_;
std::shared_ptr<Azure::Core::Credentials::TokenCredential> tokenCredential_;
Expand Down
17 changes: 2 additions & 15 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"

#include <azure/storage/blobs/blob_client.hpp>
#include <fmt/format.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <glog/logging.h>
Expand All @@ -27,28 +26,16 @@
#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"

namespace facebook::velox::filesystems {
using namespace Azure::Storage::Blobs;

class AbfsReadFile::Impl {
constexpr static uint64_t kNaturalReadSize = 4 << 20; // 4M
constexpr static uint64_t kReadConcurrency = 8;

public:
explicit Impl(std::string_view path, const config::ConfigBase& config) {
auto abfsConfig = AbfsConfig(path, config, false);
auto abfsConfig = AbfsConfig(path, config);
filePath_ = abfsConfig.filePath();
if (abfsConfig.authType() == "SAS") {
fileClient_ = std::make_unique<BlobClient>(abfsConfig.urlWithSasToken());
} else if (abfsConfig.authType() == "OAuth") {
fileClient_ = std::make_unique<BlobClient>(
abfsConfig.url(), abfsConfig.tokenCredential());
} else {
fileClient_ =
std::make_unique<BlobClient>(BlobClient::CreateFromConnectionString(
abfsConfig.connectionString(),
abfsConfig.fileSystem(),
filePath_));
}
fileClient_ = abfsConfig.getReadClient();
}

void initialize(const FileOptions& options) {
Expand Down
26 changes: 5 additions & 21 deletions velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
*/

#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"
#include <azure/storage/files/datalake.hpp>
#include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

using namespace Azure::Storage::Files::DataLake;

namespace facebook::velox::filesystems {
class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
public:
Expand All @@ -31,7 +28,8 @@ class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
client_->Create();
}

Models::PathProperties getProperties() override {
Azure::Storage::Files::DataLake::Models::PathProperties getProperties()
override {
return client_->GetProperties().Value;
}

Expand Down Expand Up @@ -120,23 +118,9 @@ class AbfsWriteFile::Impl {
AbfsWriteFile::AbfsWriteFile(
std::string_view path,
const config::ConfigBase& config) {
auto abfsConfig = AbfsConfig(path, config, true);
std::unique_ptr<AzureDataLakeFileClient> clientWrapper;
if (abfsConfig.authType() == "SAS") {
clientWrapper = std::make_unique<DataLakeFileClientWrapper>(
std::make_unique<DataLakeFileClient>(abfsConfig.urlWithSasToken()));
} else if (abfsConfig.authType() == "OAuth") {
clientWrapper = std::make_unique<DataLakeFileClientWrapper>(
std::make_unique<DataLakeFileClient>(
abfsConfig.url(), abfsConfig.tokenCredential()));
} else {
clientWrapper = std::make_unique<DataLakeFileClientWrapper>(
std::make_unique<DataLakeFileClient>(
DataLakeFileClient::CreateFromConnectionString(
abfsConfig.connectionString(),
abfsConfig.fileSystem(),
abfsConfig.filePath())));
}
auto abfsConfig = AbfsConfig(path, config);
std::unique_ptr<AzureDataLakeFileClient> clientWrapper =
std::make_unique<DataLakeFileClientWrapper>(abfsConfig.getWriteClient());
impl_ = std::make_unique<Impl>(path, clientWrapper);
}

Expand Down
Loading

0 comments on commit dd5de10

Please sign in to comment.