Skip to content

Commit

Permalink
Merge pull request #510 from dcSpark/nico/mvp_python_persistence
Browse files Browse the repository at this point in the history
Nico/mvp python persistence
  • Loading branch information
nicarq authored Oct 26, 2024
2 parents 87ad7be + fe592aa commit cfbca91
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import plotly.io as pio
import json
import array
import os
import lxml
import requests
from requests.models import Response
Expand Down Expand Up @@ -137,6 +138,24 @@ json.dumps(imports)
return result;
};

/**
 * Best-effort, regex-based detection of the modules imported by a piece of
 * Python source code.
 *
 * Recognises both `import a, b.c as d` and `from a.b import c` statements that
 * start a line (optionally indented) or follow a `;` on the same line.
 * Only the top-level package of each module is reported (`numpy.random`
 * becomes `numpy`), because that is the name a package installer needs.
 * Relative imports (`from . import x`, `from .pkg import y`) are skipped since
 * they refer to local modules rather than installable packages.
 *
 * This is a textual scan, not a parser: import-looking text inside string
 * literals or comments that starts a line will still be reported.
 *
 * @param code - Python source code to scan.
 * @returns Unique top-level module names in order of first appearance.
 */
const findImportsUsingRegex = (code: string): string[] => {
  // Group 1: module of a `from <module> import ...` statement.
  // Group 2: the comma-separated module list of an `import ...` statement,
  //          cut at end of line, a trailing comment, or a `;` separator.
  // The statement must start a line or follow `;` so that the `import`
  // keyword inside `from x import y` is not matched a second time, and the
  // match never runs past the current line (the previous pattern swallowed
  // the rest of the file after the first `from ... import`).
  const importRegex =
    /(?:^|;)[ \t]*(?:from[ \t]+([\w.]+)[ \t]+import\b|import[ \t]+([^\n\r#;]+))/gm;
  const imports = new Set<string>();

  // Registers the top-level package of a dotted module path. A leading dot
  // (relative import) produces an empty first segment and is ignored.
  const addTopLevel = (moduleName: string): void => {
    const [topLevel] = moduleName.trim().split('.');
    if (topLevel) {
      imports.add(topLevel);
    }
  };

  for (const match of code.matchAll(importRegex)) {
    const fromModule = match[1];
    const importList = match[2];

    if (fromModule) {
      addTopLevel(fromModule);
      continue;
    }

    if (importList) {
      for (const entry of importList.split(',')) {
        // Strip an optional `as alias` suffix: keep only the first token.
        const moduleName = entry.trim().split(/\s+/)[0] ?? '';
        addTopLevel(moduleName);
      }
    }
  }

  return Array.from(imports);
};

/**
* Attempts to install dependencies for the given Python code using micropip.
*
Expand All @@ -161,20 +180,23 @@ const installDependencies = async (code: string): Promise<void> => {
console.time('install micropip dependencies');
await pyodide.loadPackage(['micropip']);
const micropip = pyodide.pyimport('micropip');
// TODO: I think it is safe to remove this since we are not using pyodide-http
// await micropip.install('pyodide-http>=0.2.1');

const codeDependencies = [
// Our code wrapper contains dependencies so we need to install them
...(await findImportsFromCodeString(wrapCode(''))),
...(await findImportsFromCodeString(code)),
...findImportsUsingRegex(code), // Merge results from regex-based detection
];

// Remove duplicates by converting to a Set and back to an Array
const uniqueDependencies = Array.from(new Set(codeDependencies));

console.log(
'Trying to install the following dependencies:',
codeDependencies,
uniqueDependencies,
);

const installPromises = codeDependencies.map((dependency) =>
const installPromises = uniqueDependencies.map((dependency) =>
micropip.install(dependency),
);
await Promise.allSettled(installPromises);
Expand Down Expand Up @@ -265,21 +287,41 @@ const fetchPage = (

console.log('Posted message to main thread, waiting for response...');

Atomics.wait(syncArray, 0, 0);
console.log('atomic wait done with status: ', syncArray[0]);
const textDecoder = new TextDecoder();
let result = '';
let moreChunks = true;

if (syncArray[0] === -1) {
const textDecoder = new TextDecoder();
const errorMessage = textDecoder.decode(dataArray);
console.error('Error fetching page:', errorMessage);
throw new Error(errorMessage);
}
while (moreChunks) {
// Busy-wait loop
while (syncArray[0] === 0) {
// This loop will block the thread until syncArray[0] changes
}

const textDecoder = new TextDecoder();
let result = textDecoder.decode(dataArray);
result = result.replace(/\0/g, '').trim();
console.log('Polling done with status: ', syncArray[0]);

if (syncArray[0] === -1) {
const errorMessage = textDecoder.decode(dataArray);
console.error('Error fetching page:', errorMessage);
throw new Error(errorMessage);
}

// Read the current chunk
const chunk = textDecoder.decode(dataArray).replace(/\0/g, '').trim();
result += chunk;

console.log(`Received chunk of length: ${chunk.length}`);

console.log(`Received data of length: ${result.length}`);
// Check if more chunks are needed
if (syncArray[0] === 1) {
moreChunks = false; // Success, all chunks received
} else {
// Signal readiness for the next chunk
syncArray[0] = 0;
Atomics.notify(syncArray, 0);
}
}

console.log(`Total received data of length: ${result.length}`);
console.log('result: ', result);

return result;
Expand Down Expand Up @@ -316,8 +358,11 @@ const initialize = async () => {

// **Mount IDBFS to persist filesystem in IndexedDB**
try {
// Mount IDBFS to the persistent directory
pyodide.FS.mount(pyodide.FS.filesystems.IDBFS, { autoPersist: true }, '/');
pyodide.FS.mount(
pyodide.FS.filesystems.IDBFS,
{ autoPersist: true },
'/home/pyodide',
);

// Use syncFilesystem to synchronize the filesystem
await syncFilesystem(true);
Expand All @@ -332,15 +377,53 @@ const initialize = async () => {
console.timeEnd('initialize');
};

/**
 * Debug helper: logs the immediate sub-folders and files of a directory in
 * the Pyodide virtual filesystem.
 *
 * Entries that are neither a directory nor a regular file are left out of
 * both lists. Any failure while reading the directory or stat-ing an entry is
 * caught and reported with `console.error`; nothing is thrown to the caller.
 *
 * @param dirPath - Path of the directory to list.
 */
function printDirectoryContents(dirPath: string) {
  try {
    const names: string[] = pyodide.FS.readdir(dirPath);
    const folders: string[] = [];
    const files: string[] = [];

    for (const name of names) {
      // readdir includes the self/parent pseudo-entries; skip them.
      if (name === '.' || name === '..') {
        continue;
      }

      const { mode } = pyodide.FS.stat(`${dirPath}/${name}`);
      if (pyodide.FS.isDir(mode)) {
        folders.push(name);
        continue;
      }
      if (pyodide.FS.isFile(mode)) {
        files.push(name);
      }
    }

    console.log(`Contents of ${dirPath}:`);
    console.log('Folders:', folders);
    console.log('Files:', files);
  } catch (error) {
    console.error(`Error reading ${dirPath} directory:`, error);
  }
}

// Function to synchronize the filesystem to IndexedDB
const syncFilesystem = async (save = false) => {
return new Promise<void>((resolve, reject) => {
pyodide.FS.syncfs(save, (err: any) => {
printDirectoryContents('/home/pyodide');

printDirectoryContents('/home/web_user');

// Print contents inside the /home directory
printDirectoryContents('/home');

printDirectoryContents('/new_mnt');

// Print contents inside the root directory
printDirectoryContents('/');

if (err) {
console.error('syncfs error:', err);
reject(err);
} else {
console.log(`syncfs ${save ? 'saved to' : 'loaded from'} IndexedDB`);
console.log(`syncfs ${save ? 'synced from' : 'synced to'} IndexedDB`);
resolve();
}
});
Expand All @@ -363,7 +446,8 @@ self.onmessage = async (event) => {
const runResult = await run(event.data.payload.code);

// // Synchronize the filesystem to save changes to IndexedDB
// await syncFilesystem(false); // Change to true to save changes
await syncFilesystem(false); // Change to true to save changes
console.log('> synced filesystem');

// Post the successful run result
self.postMessage({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,26 @@ export const usePythonRunnerRunMutation = (
const textEncoder = new TextEncoder();
const encodedData = textEncoder.encode(response.body);

if (encodedData.length > dataArray.length) {
throw new Error('Buffer size insufficient');
console.log('Required buffer size:', encodedData.length); // Log the required buffer size

let offset = 0;
while (offset < encodedData.length) {
const chunkSize = Math.min(dataArray.length, encodedData.length - offset);
dataArray.set(encodedData.subarray(offset, offset + chunkSize));
offset += chunkSize;

// Indicate that a chunk is ready
syncArray[0] = 2; // New number to indicate chunk ready
console.log('main thread> Notifying Atomics with chunk ready');
Atomics.notify(syncArray, 0);

// Polling loop to wait for the other end to be ready for the next chunk
while (syncArray[0] === 2) {
await delay(25); // Wait for 25ms before checking again
}
}

console.log(
'main thread> success ',
encodedData.length,
dataArray.length,
);
dataArray.set(encodedData);
// Indicate success after all chunks are sent
syncArray[0] = 1; // Indicate success
console.log('main thread> Notifying Atomics with success');
Atomics.notify(syncArray, 0);
Expand All @@ -106,7 +116,7 @@ export const usePythonRunnerRunMutation = (
if (error instanceof Error) {
errorMessage = error.message;
}
console.error(`main thread> error ${method.toLowerCase()}ing page`, errorMessage);
console.error(`main thread> error using ${method.toLowerCase()} with page`, errorMessage);

const textEncoder = new TextEncoder();
const encodedError = textEncoder.encode(errorMessage);
Expand Down

0 comments on commit cfbca91

Please sign in to comment.