Skip to content

Commit

Permalink
Merge pull request #5 from cyclic-software/promises
Browse files Browse the repository at this point in the history
Promises
  • Loading branch information
seekayel authored Jan 4, 2023
2 parents e5e7633 + 961ce2d commit 00fdaa8
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 57 deletions.
85 changes: 84 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,87 @@
# @cyclic.sh/s3fs

Drop in replacement for Node.js fs library backed by AWS s3.
Drop in replacement for the Node.js `fs` library backed by AWS S3.

Require in the same format as Node.js `fs`, specifying an S3 Bucket:
- Callbacks and Sync methods:
```js
const fs = require('@cyclic.sh/s3fs')(S3_BUCKET_NAME)
```
- Promises
```js
const fs = require('@cyclic.sh/s3fs/promises')(S3_BUCKET_NAME)
```

## Supported methods
`@cyclic.sh/s3fs` supports the following `fs` methods operating on AWS S3:
- [x] fs.writeFile(filename, data, [options], callback)
- [x] promise
- [x] cb
- [x] sync
- [x] fs.readFile(filename, [options], callback)
- [x] promise
- [x] cb
- [x] sync
- [x] fs.exists(path, callback)
- [x] promise
- [x] cb
- [x] sync
- [ ] fs.readdir(path, callback)
- [ ] fs.mkdir(path, [mode], callback)
- [ ] fs.rmdir(path, callback)
- [x] fs.stat(path, callback)
- [x] promise
- [x] cb
- [x] sync
- [ ] fs.lstat(path, callback)
- [ ] fs.createReadStream(path, [options])
- [ ] fs.createWriteStream(path, [options])
- [ ] fs.unlink(path, callback)
- [ ] fs.rm(path, callback)

## Example Usage
### Authentication
Authenticating the client can be done with one of two ways:
- **Environment Variables** - the internal S3 client will use AWS credentials if set in the environment
```
AWS_REGION
AWS_ACCESS_KEY_ID
AWS_SECRET_KEY
AWS_SECRET_ACCESS_KEY
```
- **Client Credentials** - the library also accepts standard S3 client parameters at initialization:
```js
const fs = require('@cyclic.sh/s3fs')(S3_BUCKET_NAME, {
region: ...
credentials: {...}
})
```
- **Local Mode** - When no credentials are available - the client will fall back to using `fs` and the local filesystem with a warning.


### Using Methods
The supported methods have the same API as Node.js `fs`:
- Sync
```js
const fs = require('@cyclic.sh/s3fs')(S3_BUCKET_NAME)
const json = JSON.parse(fs.readFileSync('test/_read.json'))
```
- Callbacks
```js
const fs = require('@cyclic.sh/s3fs')(S3_BUCKET_NAME)
fs.readFile('test/_read.json', (error,data)=>{
const json = JSON.parse(data)
})
```
- Promises
```js
const fs = require('@cyclic.sh/s3fs/promises')(S3_BUCKET_NAME)
async function run(){
const json = JSON.parse(await fs.readFile('test/_read.json'))
}
```

refer to fs, s3fs:

- https://github.com/TooTallNate/s3fs
- https://nodejs.org/docs/latest-v0.10.x/api/fs.html#fs_fs_mkdir_path_mode_callback
110 changes: 110 additions & 0 deletions src/CyclicS3FSPromises.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
const {
S3Client,
GetObjectCommand,
PutObjectCommand,
HeadObjectCommand,
} = require("@aws-sdk/client-s3");

const {Stats} = require('fs')
const sync_interface = require('./sync_interface');
function streamToBuffer(stream) {
return new Promise((resolve, reject) => {
const chunks = [];
stream.on("data", (chunk) => chunks.push(chunk));
stream.on("error", reject);
stream.on("end", () => resolve(Buffer.concat(chunks))); // can call .toString("utf8") on the buffer
});
}

class CyclicS3FSPromises{
constructor(bucketName, config={}) {
this.bucket = bucketName
this.config = config
this.s3 = new S3Client({...config});
}

async readFile(fileName ,options){
const cmd = new GetObjectCommand({
Bucket: this.bucket,
Key: fileName,
})

let obj = await this.s3.send(cmd)
obj = await streamToBuffer(obj.Body)
return obj
}

async writeFile(fileName, data, options={}){
const cmd = new PutObjectCommand({
Bucket: this.bucket,
Key: fileName,
Body: data
})
await this.s3.send(cmd)
}

async exists(fileName, data, options={}){
const cmd = new HeadObjectCommand({
Bucket: this.bucket,
Key: fileName
})
let exists
try{
let res = await this.s3.send(cmd)
if(res.LastModified){
exists = true
}
}catch(e){
if(e.name === 'NotFound'){
exists = false
}else{
throw e
}
}
return exists
}

async stat(fileName, data, options={}){
const cmd = new HeadObjectCommand({
Bucket: this.bucket,
Key: fileName
})
let result;
try{
let data = await this.s3.send(cmd)
let modified_ms = new Date(data.LastModified).getTime()
result = new Stats(...Object.values({
dev: 0,
mode: 0,
nlink: 0,
uid: 0,
gid: 0,
rdev: 0,
blksize: 0,
ino: 0,
size: Number(data.ContentLength),
blocks: 0,
atimeMs: modified_ms,
mtimeMs: modified_ms,
ctimeMs: modified_ms,
birthtimeMs: modified_ms,
atime: data.LastModified,
mtime: data.LastModified,
ctime: data.LastModified,
birthtime: data.LastModified
}));
}catch(e){
if(e.name === 'NotFound'){
throw new Error(`Error: ENOENT: no such file or directory, stat '${fileName}'`)
}else{
throw e
}
}
return result
}

}



module.exports = CyclicS3FSPromises
119 changes: 86 additions & 33 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,79 @@
const fs = require('fs')
const path = require('path')
const { S3Client, GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");
const childProcess = require('child_process')
const CyclicS3FSPromises = require('./CyclicS3FSPromises')
const sync_interface = require('./sync_interface');
const v8 = require('v8')
const HUNDRED_MEGABYTES = 1000 * 1000 * 100;
const sync_interface = require('./sync_interface')
function streamToBuffer(stream) {
return new Promise((resolve, reject) => {
const chunks = [];
stream.on("data", (chunk) => chunks.push(chunk));
stream.on("error", reject);
stream.on("end", () => resolve(Buffer.concat(chunks))); // can call .toString("utf8") on the buffer
});
}

class CyclicS3FS {
constructor(bucketName, config={}) {
this.bucket = bucketName
this.config = config
this.s3 = new S3Client({...config});
// Ensure that callbacks run in the global context. Only use this function
// for callbacks that are passed to the binding layer, callbacks that are
// invoked from JS already run in the proper scope.
function makeCallback(cb) {
if (cb === undefined) {
return rethrow();
}

if (typeof cb !== 'function') {
throw new TypeError('callback must be a function');
}

async readFile(fileName) {
const cmd = new GetObjectCommand({
Bucket: this.bucket,
Key: fileName,
})
return function() {
return cb.apply(null, arguments);
};
}

let obj = await this.s3.send(cmd)
obj = await streamToBuffer(obj.Body)
return obj
class CyclicS3FS extends CyclicS3FSPromises {
constructor(bucketName, config={}) {
super(bucketName, config={})
}

async writeFile(fileName, data, options={}) {
const cmd = new PutObjectCommand({
Bucket: this.bucket,
Key: fileName,
Body: data

readFile(fileName, options, callback) {
callback = makeCallback(arguments[arguments.length - 1]);
new Promise(async (resolve,reject)=>{
try{
let res = await super.readFile(...arguments)
return resolve(callback(null,res))
}catch(e){
return resolve(callback(e))
}
})
}

writeFile(fileName, data, options, callback) {
callback = makeCallback(arguments[arguments.length - 1]);
new Promise(async (resolve,reject)=>{
try{
let res = await super.writeFile(...arguments)
return resolve(callback(null,res))
}catch(e){
return resolve(callback(e))
}
})
}

exists(fileName, callback) {
callback = makeCallback(arguments[arguments.length - 1]);
new Promise(async (resolve,reject)=>{
try{
let res = await super.exists(...arguments)
return resolve(callback(null,res))
}catch(e){
return resolve(callback(e))
}
})
}

await this.s3.send(cmd)
stat(fileName, callback) {
callback = makeCallback(arguments[arguments.length - 1]);
new Promise(async (resolve,reject)=>{
try{
let res = await super.stat(...arguments)
return resolve(callback(null,res))
}catch(e){
return resolve(callback(e))
}
})
}


readFileSync(fileName) {
return sync_interface.runSync(this,'readFile',[fileName])
Expand All @@ -50,8 +83,28 @@ class CyclicS3FS {
return sync_interface.runSync(this,'writeFile',[fileName, data, options])
}

existsSync(fileName) {
let res = sync_interface.runSync(this,'exists',[fileName])
// let exists = res.toString() === 'true' ? true : false
res = v8.deserialize(res)
return res
}

statSync(fileName) {
let res = sync_interface.runSync(this,'stat',[fileName])
res = v8.deserialize(res)
return res
}

}


const client = function(bucketName, config={}){
if(!process.env.AWS_SECRET_ACCESS_KEY){
console.warn('[s3fs] WARNING: AWS credentials are not set. Using local file system')
return fs
}
return new CyclicS3FS(bucketName, config)
}

module.exports = CyclicS3FS
module.exports = client
11 changes: 11 additions & 0 deletions src/promises.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const fs = require('fs/promises')
const CyclicS3FSPromises = require('./CyclicS3FSPromises')
const client = function(bucketName, config={}){
if(!process.env.AWS_SECRET_ACCESS_KEY){
console.warn('[s3fs] WARNING: AWS credentials are not set. Using local file system')
return fs
}
return new CyclicS3FSPromises(bucketName, config)
}

module.exports = client
18 changes: 11 additions & 7 deletions src/sync_interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ const path = require('path')
const process = require('process')
const childProcess = require('child_process')
const v8 = require('v8')
const s3fs = require("./index.js")
const CyclicS3FSPromises = require('./CyclicS3FSPromises')

const HUNDRED_MEGABYTES = 1000 * 1000 * 100;
var fs = require("fs");

Expand All @@ -25,10 +26,10 @@ const runSync = function(client, method, args){
}
);

// console.log({
// stdout: stdout?.toString(),
// stderr: stderr?.toString(),
// })
// console.log({
// stdout: stdout?.toString(),
// stderr: stderr?.toString(),
// })

let error = stderr?.toString()
if(error){
Expand All @@ -47,9 +48,12 @@ module.exports = {
runSync,
}
const run = async function(bucket, config, method, args){
const fs = new s3fs(bucket, config)
const fs = new CyclicS3FSPromises(bucket, config)
let result = await fs[method](...args)
if(result){
if(typeof result !== 'undefined'){
if(['stat','exists'].includes(method)){
result = v8.serialize(result)
}
process.stdout.write(result);
}
}
Expand Down
Loading

0 comments on commit 00fdaa8

Please sign in to comment.