Skip to content

Commit

Permalink
fix recursiveness
Browse files Browse the repository at this point in the history
  • Loading branch information
nicarq committed Jan 7, 2025
1 parent da90730 commit 067483a
Showing 1 changed file with 125 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -376,51 +376,103 @@ export const PythonCodeRunner = ({ code, jobId, nodeAddress, token }: PythonCode
return localContent.length !== remoteFile.size;
}

// Recursively mirror job files into /home/pyodide
async function syncJobFilesToIDBFS() {
if (!pyodideMain || !jobContents) return;
// Utility: ensure subdirectories exist before writing a file
function ensureDirExists(dirPath: string) {
if (!pyodideMain) {
console.error('Pyodide is not initialized.');
return;
}

const fs = pyodideMain.FS;
const parts = dirPath.split('/').filter(Boolean);
let currentPath = '';
for (const part of parts) {
currentPath += '/' + part;
try {
fs.mkdir(currentPath);
} catch (err) {
// Ignore EEXIST
if (!(err instanceof Error && err.message.includes('File exists'))) {
throw err;
}
}
}
}

console.time('Sync Job Files to IDBFS');
setIsSyncing(true);
try {
const idbfsContents = readIDBFSContents('/home/pyodide') ?? [];
const idbfsNames = new Set(idbfsContents.map(e => e.name));
const jobFileNames = new Set(jobContents.map(item => item.name));
// Recursively remove anything in IDBFS that is not in the job
function removeStaleIDBFSItems(idbDirPath: string, jobPathsSet: Set<string>) {
if (!pyodideMain) {
console.error('Pyodide is not initialized.');
return;
}

// Remove anything from IDBFS that’s no longer in the job
for (const entry of idbfsContents) {
if (!jobFileNames.has(entry.name)) {
const fs = pyodideMain.FS;
const entries = fs.readdir(idbDirPath);
for (const entry of entries) {
if (entry === '.' || entry === '..') continue;
const fullPath = `${idbDirPath}/${entry}`;
const stat = fs.stat(fullPath);
if (fs.isDir(stat.mode)) {
removeStaleIDBFSItems(fullPath, jobPathsSet); // recurse
// If folder is not in jobPathsSet itself, remove it
if (!jobPathsSet.has(fullPath)) {
try {
if (entry.type === 'directory') {
pyodideMain.FS.rmdir(`/home/pyodide/${entry.name}`);
} else {
pyodideMain.FS.unlink(`/home/pyodide/${entry.name}`);
}
console.log(`Removed ${entry.name} from IDBFS`);
fs.rmdir(fullPath);
console.log(`Removed directory ${fullPath} from IDBFS`);
} catch (err) {
console.error(`Failed removing directory ${fullPath}:`, err);
}
}
} else {
// If file is not in the job set, remove it
if (!jobPathsSet.has(fullPath)) {
try {
fs.unlink(fullPath);
console.log(`Removed file ${fullPath} from IDBFS`);
} catch (err) {
console.error(`Failed removing ${entry.name}:`, err);
console.error(`Failed removing file ${fullPath}:`, err);
}
}
}
}
}

// Add or update anything that’s missing in IDBFS
// Recursively mirror job files into /home/pyodide
async function syncJobFilesToIDBFS() {
if (!pyodideMain || !jobContents) {
console.error('Pyodide is not initialized or job contents are missing.');
return;
}

console.time('Sync Job Files to IDBFS');
setIsSyncing(true);
try {
// Build a Set of all fullPaths from the job
const jobPathsSet = new Set<string>();
for (const remoteFile of jobContents) {
const fullDirPath = `/home/pyodide/${remoteFile.path.replace(/^\//, '')}`;
jobPathsSet.add(fullDirPath);
if (!remoteFile.is_directory) {
if (!idbfsNames.has(remoteFile.name)) {
await syncFileToIDBFS(remoteFile);
}
jobPathsSet.add(fullDirPath);
}
}

// First, remove anything from IDBFS that is no longer in the job
removeStaleIDBFSItems('/home/pyodide', jobPathsSet);

// Next, add or update items from the job
for (const remoteFile of jobContents) {
const fullPath = `/home/pyodide/${remoteFile.path.replace(/^\//, '')}`;
if (remoteFile.is_directory) {
ensureDirExists(fullPath);
} else {
try {
const stat = pyodideMain.FS.stat(`/home/pyodide/${remoteFile.name}`);
if (!pyodideMain.FS.isDir(stat.mode)) {
throw new Error(`${remoteFile.name} is not a directory`);
}
} catch {
pyodideMain.FS.mkdir(`/home/pyodide/${remoteFile.name}`);
}
const dirOnly = fullPath.substring(0, fullPath.lastIndexOf('/'));
ensureDirExists(dirOnly);
await syncFileToIDBFS(remoteFile);
}
}

// Finally, push changes to IDB
await new Promise<void>((resolve, reject) => {
pyodideMain?.FS.syncfs(false, (err: Error | null) => {
if (err) {
Expand All @@ -440,6 +492,46 @@ export const PythonCodeRunner = ({ code, jobId, nodeAddress, token }: PythonCode
}
}

// Recursive function to traverse directories and upload files
async function traverseAndUpload(
entries: FileSystemEntry[],
basePath: string,
timeBefore: number,
nodeAddress: string,
token: string,
jobId: string
) {
for (const entry of entries) {
const fullPath = `${basePath}/${entry.name}`;
if (entry.type === 'file' && entry.mtimeMs !== undefined) {
const mtimeInMs = entry.mtimeMs;
const mtimeISO = new Date(mtimeInMs).toISOString();
const timeBeforeISO = new Date(timeBefore).toISOString();
console.log(`Checking file: ${fullPath}, Modified Time: ${mtimeISO}, Time Before: ${timeBeforeISO}`);
if (mtimeInMs > timeBefore) {
console.log(`Uploading changed file: ${fullPath}`);
try {
const blob = new Blob([entry.content ?? ''], { type: 'text/plain' });
const file = new File([blob], entry.name, { type: 'text/plain' });
await addFileToJob(nodeAddress, token, {
filename: fullPath.replace('/home/pyodide/', ''), // Adjust path for upload
job_id: jobId,
file,
});
console.log(`Successfully uploaded file: ${fullPath}`);
} catch (error) {
console.error(`Failed to upload file ${fullPath}:`, error);
}
} else {
console.log(`No changes detected for file: ${fullPath}`);
}
} else if (entry.type === 'directory' && entry.contents) {
// Recurse into subdirectory
await traverseAndUpload(entry.contents, fullPath, timeBefore, nodeAddress, token, jobId);
}
}
}

// Function to compare and upload files based on mtime
async function compareAndUploadFiles(timeBefore: number | null) {
console.log('compareAndUploadFiles', timeBefore);
Expand All @@ -457,31 +549,8 @@ export const PythonCodeRunner = ({ code, jobId, nodeAddress, token }: PythonCode
return;
}

for (const entry of idbfsContents) {
if (entry.type === 'file' && entry.mtimeMs !== undefined) {
const mtimeInMs = entry.mtimeMs / 1000; // Convert mtime to milliseconds
const mtimeISO = new Date(mtimeInMs).toISOString();
const timeBeforeISO = new Date(timeBefore).toISOString();
console.log(`Checking file: ${entry.name}, Modified Time: ${mtimeISO}, Time Before: ${timeBeforeISO}`);
if (mtimeInMs > timeBefore) {
console.log(`Uploading changed file: ${entry.name}`);
try {
const blob = new Blob([entry.content ?? ''], { type: 'text/plain' });
const file = new File([blob], entry.name, { type: 'text/plain' });
await addFileToJob(nodeAddress, token, {
filename: entry.name,
job_id: jobId,
file,
});
console.log(`Successfully uploaded file: ${entry.name}`);
} catch (error) {
console.error(`Failed to upload file ${entry.name}:`, error);
}
} else {
console.log(`No changes detected for file: ${entry.name}`);
}
}
}
// Start the recursive traversal from the root directory
await traverseAndUpload(idbfsContents, '/home/pyodide', timeBefore, nodeAddress, token, jobId);
} catch (error) {
console.error('Failed to compare/upload files:', error);
} finally {
Expand Down

0 comments on commit 067483a

Please sign in to comment.