Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ericdmoore committed Jan 17, 2025
1 parent d4b28ed commit 7f088ab
Show file tree
Hide file tree
Showing 15 changed files with 696 additions and 170 deletions.
10 changes: 5 additions & 5 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"fmt": {
"useTabs": true,
"lineWidth": 120,
"exclude": ["vendor/", "/.cov", "/.fsCache", "_fresh"]
"exclude": ["vendor/", ".cov/", ".fsCache/", "_fresh/", "_scratch/"]
},
"lint": {
"rules": {
Expand Down Expand Up @@ -92,10 +92,10 @@
"mu-forms/": "https://esm.sh/[email protected]/",
"multiformats": "https://esm.sh/[email protected]?deno-std=0.181.0&dts",
"mustache": "https://deno.land/x/[email protected]/mod.ts",
"apache-arrow":"https://esm.sh/[email protected]",
"parquet-wasm/":"https://esm.sh/[email protected]/",

"apache-arrow": "https://esm.sh/[email protected]",
"parquet-wasm/": "https://esm.sh/[email protected]/",

"superstruct": "https://deno.land/x/[email protected]/mod.ts",
"stripe": "https://esm.sh/[email protected]?deno-std=0.181.0&dts",
"toXml": "https://deno.land/x/[email protected]/mod.ts",
Expand Down
30 changes: 24 additions & 6 deletions lib/clients/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,21 @@ export interface ICacheProvider<NativeDataType = Uint8Array> {
[name: string]: unknown;
};
transforms: TransformFunctionGroup<NativeDataType>;
set: (name: string, data: NativeDataType) => Promise<ICacheDataFromProvider<NativeDataType>>;
set: (
name: string,
data: NativeDataType,
opts?: { structure: "array" | "map" },
) => Promise<ICacheDataFromProvider<NativeDataType>>;
get: (name: string) => Promise<NullableProviderData<NativeDataType>>;
peek: (name: string) => Promise<NullableProviderData<NativeDataType>>;
del: (name: string) => Promise<NullableProviderData<NativeDataType>>;
has: (name: string) => Promise<boolean>;
// ideas for future enhancements - not yet to be implemented 2024-05-07
//
// mapAdd: (name: string, data: NativeDataType) => Promise<ICacheDataFromProvider<NativeDataType>>;
// arrayPush: (name: string, data: NativeDataType) => Promise<ICacheDataFromProvider<NativeDataType>>;
// arrayPop: (name: string, data: NativeDataType) => Promise<ICacheDataFromProvider<NativeDataType>>;
// arrayConcat: (name: string, data: NativeDataType) => Promise<ICacheDataFromProvider<NativeDataType>>;
}

interface IntraCacheNotifications {
Expand Down Expand Up @@ -224,16 +234,24 @@ export const renamerWithSha1: RenamerFn = async (s: string) => {
/**
 * Default key renamer used by the cache providers.
 *
 * Runs the given name through `changeEncOf`, converting it from `utf8` to `base64url`,
 * and hands the resulting string back wrapped in a resolved promise.
 *
 * @param s - The caller-supplied cache key name.
 * @returns A promise resolving to the re-encoded key string.
 */
export const defaultRenamer: RenamerFn = (s: string) => {
	const reencoded = changeEncOf(s).from("utf8").to("base64url").string();
	return Promise.resolve(reencoded);
};

/**
 * Default `fromBytes` transform: converts a value retrieved from a cache provider
 * back into caller-facing data.
 *
 * - A missing entry, or an entry whose `value.data` is falsy, resolves to `null`.
 * - When `value["content-type"]` is anything other than `"Uint8Array"`, the payload is
 *   treated as JSON text (decoded with `TextDecoder` first when it is not already a string)
 *   and the parsed result is returned.
 * - Otherwise `value.data` is returned untouched.
 *
 * No debug logging and no eager byte conversion happens here: the null check must run
 * before anything touches `value.data`, otherwise a cache miss throws instead of
 * resolving to `null`.
 *
 * @param retrieved - The cache entry as handed back by a provider; may be `undefined` on a miss.
 * @returns A promise resolving to `null`, the parsed JSON payload, or the raw stored data.
 * @throws SyntaxError when the payload is not valid JSON; this is thrown synchronously
 * because parsing happens before the promise is created.
 */
export const defaultFromBytes = ((retrieved?: ICacheableDataForCache): Promise<null | Uint8Array | string> => {
	if (!retrieved?.value.data) {
		return Promise.resolve(null);
	} else if (retrieved.value["content-type"] !== "Uint8Array") {
		const dec = new TextDecoder();
		return typeof retrieved.value.data === "string"
			? Promise.resolve(JSON.parse(retrieved.value.data))
			: Promise.resolve(JSON.parse(dec.decode(retrieved.value.data)));
	} else {
		return Promise.resolve(retrieved.value.data);
	}
}) as TransformFromBytes;

Expand Down Expand Up @@ -285,7 +303,7 @@ export const makeKey = async (name: string, renamer: RenamerFn) => ({ name, rena
export const inMem = (
max = 1000,
transform?: { renamer?: RenamerFn; fromBytes?: TransformFromBytes; toBytes?: TransformToBytes },
): ICacheProvider<string| Uint8Array> => {
): ICacheProvider<string | Uint8Array> => {
const provider = "RAM";

const cache = new LRUCache<string, ICacheableDataForCache, unknown>({ updateAgeOnGet: true, max });
Expand Down
9 changes: 5 additions & 4 deletions lib/clients/cacheProviders/denoKV.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference lib="deno.unstable" />
//#region imports

import { assert } from "$std/testing/asserts.ts";
import {
defaultFromBytes,
defaultRenamer,
Expand All @@ -25,7 +26,7 @@ export const denoKVcache = async (
}>,
): Promise<ICacheProvider> => {
const provider = "Deno:KV";

const coder = await encodingWith();
const history = new Map<string, number>();
const renamer = transforms?.renamer ?? defaultRenamer;
Expand All @@ -37,9 +38,9 @@ export const denoKVcache = async (
...config,
};
const set = async (name: string, data: Uint8Array | string) => {
const kvP = Deno.openKv();
const kv = await Deno.openKv();
const renamed = await renamer(name);

// objectHistory.push({name: renamed, ts: Date.now()})
history.set(renamed, Date.now());

Expand All @@ -52,7 +53,7 @@ export const denoKVcache = async (
},
} as ICacheableDataForCache;

const kv = await kvP;
// const kv = await kvP;
await kv.set([config.prefix, renamed], payload);

if (history.size > config.maxItems) {
Expand Down
1 change: 0 additions & 1 deletion lib/clients/cacheProviders/r2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export const cache = (
};
});


// export const cache = async (s3c: ICloudflareCacheConfig): Promise<CloudflareR2Cache> => {
// const cache = await S3Cache({
// ...s3c,
Expand Down
14 changes: 13 additions & 1 deletion lib/clients/cacheProviders/recoders/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ export const encodingWith = async (encodingmap?: PromiseOr<Record<string, EncMod
);
};

/**
 * Re-encodes an already cached value through additional encodings.
 *
 * The value's existing `content-encoding` chain (split on `;`) is extended with
 * `encodingRequests`. The first entry of the combined chain seeds the pipeline via that
 * encoder's `from`; every following entry is applied in order through its `recode`.
 *
 * @param encodingRequests - Encodings to append to the value's current encoding chain.
 * @param valInCache - The cached value, including its `content-encoding` string.
 * @returns Whatever the final step of the chain yields (the seed's `from` result when the chain has one entry).
 */
const recode = (encodingRequests: AvailableEncodings[], valInCache: ValueForCacheInternals) => {
	const chain = valInCache["content-encoding"]
		.split(";")
		.concat(encodingRequests) as AvailableEncodings[];

	// `split` always yields at least one element, so the head is always present.
	const head = chain[0]!;
	const tail = chain.slice(1);

	const seed = encMap[head].from(valInCache);

	return tail.reduce(
		async (pending, nextEncoding) => encMap[nextEncoding].recode(await pending),
		seed,
	);
};

const decode = (valInCache: ValueForCacheInternals) => {
// console.log('valInCache["content-encoding"]', valInCache["content-encoding"])

Expand All @@ -100,7 +112,7 @@ export const encodingWith = async (encodingmap?: PromiseOr<Record<string, EncMod
);
};

return { encode, decode };
return { encode, decode, recode };
};

export default { id, br, zstd, gzip, snappy, base64url, encoderMap, encodingWith };
8 changes: 4 additions & 4 deletions lib/clients/cacheProviders/recoders/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ New, middle balanced, very fast, very good compression ratio lib, binary format

# External Considerations

- For AWS S3 Select - you can [compress a parquet columns][aws-s3-select] within the object using `Snappy` or `GZIP` or you can read data subsets stored in CSV or JSON record files.
- For AWS S3 Select - you can [compress Parquet columns][aws-s3-select] within the object using `Snappy` or `GZIP`, or
you can read data subsets stored in CSV or JSON record files.
- Also for AWS S3 Select - you can [compress a CSV or JSON file][aws-s3-select-doc] using `GZIP` or `BZIP2`


<!-- Ref Links -->

- [aws-s3-select]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html#selecting-content-from-objects-requirements-and-limits
- [aws-s3-select-doc]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html#API_SelectObjectContent
-
-
2 changes: 1 addition & 1 deletion lib/clients/cacheProviders/recoders/snappy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const snappy: EncModule = async (compressThreshold = 512) => {

const to = async (input: Uint8Array | string, contentEncoding = "id" as string) => {
const bytes = makeBytes(input);
``;

if (bytes.length > (compressThreshold as number)) {
return {
data: await compress(bytes as Buffer),
Expand Down
94 changes: 57 additions & 37 deletions lib/clients/cacheProviders/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import {
defaultFromBytes,
defaultRenamer,
defaultToBytesWithTypeNote,
// type TransformFromBytes,
// type TransformToBytes,
// type ValueForCacheInternals,
type ICacheableDataForCache,
// type ICacheDataFromProvider,
type ICacheProvider,
// type TransformFromBytes,
// type TransformToBytes,
type ValueForCacheInternals,
} from "../cache.ts";

import { encodingWith } from "./recoders/mod.ts";
import { encoderMap, encodingWith } from "./recoders/mod.ts";
import { snappy } from "./recoders/snappy.ts";
import { NullableProviderData } from "$lib/clients/cache.ts";
//#region interfaces

Expand Down Expand Up @@ -58,6 +59,7 @@ export const cache = async (
s3c.defaultBucket = s3c.defaultBucket ?? "";
s3c.defualtPrefix = s3c.defualtPrefix ?? "";

const snapper = await snappy();
const provider = "AWS:S3";
const coder = await encodingWith();

Expand Down Expand Up @@ -97,35 +99,70 @@ export const cache = async (
}
};

const set = async (locationName: string, inputData: Uint8Array | string) => {
const { Bucket, Key } = defaultedParse(locationName, {
/**
 * Deletes the cached object for `name` from S3.
 *
 * The object key is built the same way `set` and `get` build it
 * (`<defualtPrefix>/<renamed>.json`); without the `.json` suffix the delete request
 * targets a key that was never written and the cached object is left behind.
 *
 * @param name - The caller-facing cache handle; parsed with `defaultedParse` into bucket and key.
 * @returns A provider record for the deleted entry whose `data` and `transformed` are empty byte arrays.
 * @throws Whatever `s3.send` rejects with; in that case `handledItems` is left unchanged.
 */
const del = async (name: string) => {
	const { Bucket, Key } = defaultedParse(name, {
		Bucket: s3c.defaultBucket,
		Key: s3c.defualtPrefix,
	});
	const renamed = await renamer(Key);
	// Keep in sync with the key format used by `set` / `get`.
	const sendKey = `${s3c.defualtPrefix}/${renamed}.json`;

	await s3.send(new DeleteObjectCommand({ Bucket, Key: sendKey }));
	// Only count the item as removed once the delete request has succeeded.
	handledItems--;

	return {
		meta,
		provider,
		key: { name, renamed },
		value: {
			data: new Uint8Array(),
			transformed: new Uint8Array(),
		},
	};
};

const set = async (locationHandle: string, inputData: Uint8Array | string) => {
const { Bucket, Key } = defaultedParse(locationHandle, {
Bucket: s3c.defaultBucket,
Key: s3c.defualtPrefix,
});
const renamed = await renamer(Key);
const sendKey = `${Bucket}/${renamed}`;
const sendKey = `${s3c.defualtPrefix}/${renamed}.json`;
const value = await toBytes(inputData);

// console.log('s3-Set-value:', value.data.toString())

const encdval = await snapper.from(value);

const dataToS3 = {
meta,
provider,
key: { name: locationName, renamed },
value,
key: { name: locationHandle, renamed },
value: {
...encdval,
data: value.data.toString(),
},
};

console.log("value, encdval, dataToS3 ::", value, encdval, dataToS3);

console.log("s3-cache-Set-preNetwork:", { Bucket, Key, renamed, sendKey });
const s3r = await s3.send(
new PutObjectCommand({ Bucket, Key: sendKey, Body: JSON.stringify(dataToS3) }),
new PutObjectCommand({
Bucket,
Key: sendKey,
Body: JSON.stringify(dataToS3),
}),
);
console.log("s3-cache-Set-postNet:", { s3r });
handledItems++;

const ret = {
provider,
value,
meta: { cloud: "AWS", s3resp: s3r },
key: {
name: locationName,
name: locationHandle,
renamed: await renamer(Key),
},
value,
};

return {
Expand All @@ -137,43 +174,26 @@ export const cache = async (
};
};

const del = async (name: string) => {
const { Bucket, Key } = defaultedParse(name, {
const get = async (handleLocation: string) => {
const { Bucket, Key } = defaultedParse(handleLocation, {
Bucket: s3c.defaultBucket,
Key: s3c.defualtPrefix,
});
const renamed = await renamer(Key);
const sendKey = `${s3c.defualtPrefix}/${renamed}`;
handledItems--;
await s3.send(new DeleteObjectCommand({ Bucket, Key: sendKey }));
return {
meta,
provider,
key: { name, renamed },
value: {
data: new Uint8Array(),
transformed: new Uint8Array(),
},
};
};
const sendKey = `${s3c.defualtPrefix}/${renamed}.json`;

const get = async (name: string) => {
const { Bucket, Key } = defaultedParse(name, {
Bucket: s3c.defaultBucket,
Key: s3c.defualtPrefix,
});
const renamed = await renamer(Key);
const sendKey = `${s3c.defualtPrefix}/${renamed}`;
console.log("s3-cache-get-preNetwork:", { sendKey, Bucket, Key, renamed });
const s3r = await s3.send(new GetObjectCommand({ Bucket, Key: sendKey }));
console.log("s3-cache-get-postNet:", { s3r });

const s3CacheInternals = JSON.parse(await s3r.Body?.transformToString() ?? "") as ICacheableDataForCache;
const value = await coder.decode(s3CacheInternals.value);

const synthValueFromCache = {
provider,
meta: { ...s3r, cloud: "AWS" } as Record<string, unknown>,
key: { name, renamed },
value
key: { name: handleLocation, renamed },
value,
} as ICacheableDataForCache;

const transformed = await fromBytes(synthValueFromCache);
Expand All @@ -184,7 +204,7 @@ export const cache = async (
...synthValueFromCache.value,
transformed,
},
} as NullableProviderData<string | Uint8Array>
} as NullableProviderData<string | Uint8Array>;
};

const has = async (name: string) => {
Expand Down
Loading

0 comments on commit 7f088ab

Please sign in to comment.