Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Update Read CSV Filter caching to handle modified files. #1078

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct ReadCSVFileFilterCache
usize TotalLines = 0;
usize HeadersLine = 0;
std::string Headers;
fs::file_time_type LastModifiedTime;
};

// Atomic 32-bit id, starting at 0, used by the helpers in this file as the key into s_HeaderCache.
std::atomic_int32_t s_InstanceId{0};
Expand Down Expand Up @@ -323,6 +324,47 @@ IFilter::PreflightResult readHeaders(const std::string& inputFilePath, usize hea
return {};
}

/**
 * @brief Reads the header line requested by @p readCsvData from the input file and stores it,
 * together with its line number, in the cache entry s_HeaderCache[s_InstanceId].
 *
 * Lines are counted starting at 1. If the file contains fewer lines than
 * readCsvData.headersLine (or a read fails before that line is reached), the cached header text
 * is cleared rather than left holding the header of a previously cached line or file.
 *
 * @param readCsvData Supplies the input file path (inputFilePath) and the 1-based header line
 * number (headersLine).
 * @return A valid (empty) Result on success, or an IssueCodes::FILE_NOT_OPEN error result when the
 * file cannot be opened for reading.
 */
Result<> cacheHeaders(const ReadCSVData& readCsvData)
{
  std::fstream in(readCsvData.inputFilePath.c_str(), std::ios_base::in);
  if(!in.is_open())
  {
    return MakeErrorResult(to_underlying(IssueCodes::FILE_NOT_OPEN), fmt::format("Could not open file for reading: {}", readCsvData.inputFilePath));
  }

  std::string line;
  usize currentLine = 0;
  bool foundHeaderLine = false;

  // Loop on the result of getline itself: testing eof() before reading would treat the failed
  // read at end-of-file (or on a stream error) as one more, empty, line.
  while(std::getline(in, line))
  {
    currentLine++;
    if(currentLine == readCsvData.headersLine)
    {
      foundHeaderLine = true;
      break;
    }
  }

  if(!foundHeaderLine)
  {
    // The requested line does not exist; 'line' may still hold the last line that was read.
    line.clear();
  }

  // Always leave the cache describing the requested line so it never reports stale headers.
  auto& cacheEntry = s_HeaderCache[s_InstanceId];
  cacheEntry.Headers = line;
  cacheEntry.HeadersLine = readCsvData.headersLine;

  return {};
}

/**
 * @brief Rebuilds the cache entry s_HeaderCache[s_InstanceId] for the file named in
 * @p readCsvData: header line, total line count, last-modified time and file path.
 *
 * The file path, line count and modification time are written to the cache only after every
 * step has succeeded. A failed refresh therefore never leaves an entry that claims to describe
 * the new file path while still carrying the line count or timestamp of an earlier one.
 *
 * @param readCsvData Supplies the input file path and the header line number.
 * @return A valid (empty) Result on success; the error from cacheHeaders if the header line could
 * not be cached; or an IssueCodes::FILE_NOT_OPEN error result if the line count could not be
 * obtained.
 */
Result<> cacheFullFile(const ReadCSVData& readCsvData)
{
  auto result = cacheHeaders(readCsvData);
  if(result.invalid())
  {
    return result;
  }

  // LinesInFile yields a signed count; a negative value signals failure and must not be stored
  // into the unsigned TotalLines field, where it would wrap to a huge line count.
  const auto lineCount = nx::core::FileUtilities::LinesInFile(readCsvData.inputFilePath);
  if(lineCount < 0)
  {
    return MakeErrorResult(to_underlying(IssueCodes::FILE_NOT_OPEN), fmt::format("Could not open file for reading: {}", readCsvData.inputFilePath));
  }

  const fs::file_time_type lastModifiedTime = fs::last_write_time(readCsvData.inputFilePath);

  // Everything succeeded: commit the new state in one place.
  auto& cacheEntry = s_HeaderCache[s_InstanceId];
  cacheEntry.FilePath = readCsvData.inputFilePath;
  cacheEntry.TotalLines = static_cast<usize>(lineCount);
  cacheEntry.LastModifiedTime = lastModifiedTime;

  return {};
}

} // namespace

namespace nx::core
Expand Down Expand Up @@ -431,57 +473,28 @@ IFilter::PreflightResult ReadCSVFileFilter::preflightImpl(const DataStructure& d
}

StringVector headers;
if(readCSVData.inputFilePath != s_HeaderCache[s_InstanceId].FilePath)
auto lastModifiedTime = fs::last_write_time(readCSVData.inputFilePath);
if(readCSVData.inputFilePath != s_HeaderCache[s_InstanceId].FilePath || lastModifiedTime > s_HeaderCache[s_InstanceId].LastModifiedTime)
{
int64 lineCount = nx::core::FileUtilities::LinesInFile(inputFilePath);
if(lineCount < 0)
{
return {MakeErrorResult<OutputActions>(to_underlying(IssueCodes::FILE_NOT_OPEN), fmt::format("Could not open file for reading: {}", inputFilePath)), {}};
}
std::fstream in(inputFilePath.c_str(), std::ios_base::in);
if(!in.is_open())
// File path changed or file was modified
auto result = cacheFullFile(readCSVData);
if(result.invalid())
{
return {MakeErrorResult<OutputActions>(to_underlying(IssueCodes::FILE_NOT_OPEN), fmt::format("Could not open file for reading: {}", inputFilePath)), {}};
return {ConvertResultTo<OutputActions>(ConvertResult(std::move(result)), {})};
}

s_HeaderCache[s_InstanceId].FilePath = readCSVData.inputFilePath;

usize currentLine = 0;
while(!in.eof())
{
std::string line;
std::getline(in, line);
currentLine++;

if(headerMode == ReadCSVData::HeaderMode::LINE && currentLine == readCSVData.headersLine)
{
s_HeaderCache[s_InstanceId].Headers = line;
s_HeaderCache[s_InstanceId].HeadersLine = readCSVData.headersLine;
break;
}
}

headers = StringUtilities::split(s_HeaderCache[s_InstanceId].Headers, readCSVData.delimiters, readCSVData.consecutiveDelimiters);
s_HeaderCache[s_InstanceId].TotalLines = lineCount;
}
else if(headerMode == ReadCSVData::HeaderMode::LINE)
else if(headerMode == ReadCSVData::HeaderMode::LINE && readCSVData.headersLine != s_HeaderCache[s_InstanceId].HeadersLine)
{
if(readCSVData.headersLine != s_HeaderCache[s_InstanceId].HeadersLine)
// We are in header line mode and the header line number changed
auto result = cacheHeaders(readCSVData);
if(result.invalid())
{
IFilter::PreflightResult result = readHeaders(readCSVData.inputFilePath, readCSVData.headersLine, s_HeaderCache[s_InstanceId]);
if(result.outputActions.invalid())
{
return result;
}
return {ConvertResultTo<OutputActions>(ConvertResult(std::move(result)), {})};
}

headers = StringUtilities::split(s_HeaderCache[s_InstanceId].Headers, readCSVData.delimiters, readCSVData.consecutiveDelimiters);
}

if(headerMode == ReadCSVData::HeaderMode::CUSTOM)
{
headers = readCSVData.customHeaders;
}
headers = (headerMode == ReadCSVData::HeaderMode::LINE) ? StringUtilities::split(s_HeaderCache[s_InstanceId].Headers, readCSVData.delimiters, readCSVData.consecutiveDelimiters) :
readCSVData.customHeaders;

usize totalLines = s_HeaderCache[s_InstanceId].TotalLines;

Expand Down
Loading