diff --git a/package-lock.json b/package-lock.json index c0194371..3cc8ba75 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,12 +13,13 @@ "block-stream2": "^2.1.0", "browser-or-node": "^2.1.1", "buffer-crc32": "^0.2.13", + "eventemitter3": "^5.0.1", "fast-xml-parser": "^4.2.2", "ipaddr.js": "^2.0.1", - "json-stream": "^1.0.0", "lodash": "^4.17.21", "mime-types": "^2.1.35", "query-string": "^7.1.3", + "stream-json": "^1.8.0", "through2": "^4.0.2", "web-encoding": "^1.1.5", "xml2js": "^0.5.0" @@ -37,6 +38,7 @@ "@types/lodash": "^4.14.194", "@types/mime-types": "^2.1.1", "@types/node": "^20.1.0", + "@types/stream-json": "^1.7.5", "@types/through2": "^2.0.38", "@types/xml2js": "^0.4.11", "@typescript-eslint/eslint-plugin": "^5.59.2", @@ -2295,6 +2297,25 @@ "integrity": "sha512-21cFJr9z3g5dW8B0CVI9g2O9beqaThGQ6ZFBqHfwhzLDKUxaqTIy3vnfah/UPkfOiF2pLq+tGz+W8RyCskuslw==", "dev": true }, + "node_modules/@types/stream-chain": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/@types/stream-chain/-/stream-chain-2.0.3.tgz", + "integrity": "sha512-cwWE6mrdDpmW3B5wr1+vpjbg8h3hZfOr/PbKQ38VE21xNyF64GNjrh855YdNAPCt4kSYXuLwgRqBWkY/dD6KMg==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, + "node_modules/@types/stream-json": { + "version": "1.7.5", + "resolved": "https://registry.npmjs.org/@types/stream-json/-/stream-json-1.7.5.tgz", + "integrity": "sha512-IVTtojYNqc6RT9FWBlwPLG6QTVdv2gHdqHOyBYPgcsCKfwIxcQpKw0e0ybQVyCvJJT9Le6w9RB5r5c6mo2m+IQ==", + "dev": true, + "dependencies": { + "@types/node": "*", + "@types/stream-chain": "*" + } + }, "node_modules/@types/through2": { "version": "2.0.38", "resolved": "https://registry.npmjs.org/@types/through2/-/through2-2.0.38.tgz", @@ -4211,6 +4232,11 @@ "node": ">=0.10.0" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "node_modules/execa": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/execa/-/execa-7.1.1.tgz", @@ -5382,12 +5408,6 @@ "dev": true, "license": "MIT" }, - "node_modules/json-stream": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/json-stream/-/json-stream-1.0.0.tgz", - "integrity": "sha512-H/ZGY0nIAg3QcOwE1QN/rK/Fa7gJn7Ii5obwp6zyPO4xiPNwpIMjqy2gwjBEGqzkF/vSWEIBQCBuN19hYiL6Qg==", - "license": "MIT" - }, "node_modules/json-stringify-safe": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz", @@ -7182,6 +7202,19 @@ "node": ">=6" } }, + "node_modules/stream-chain": { + "version": "2.2.5", + "resolved": "https://registry.npmjs.org/stream-chain/-/stream-chain-2.2.5.tgz", + "integrity": "sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==" + }, + "node_modules/stream-json": { + "version": "1.8.0", + "resolved": "https://registry.npmjs.org/stream-json/-/stream-json-1.8.0.tgz", + "integrity": "sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==", + "dependencies": { + "stream-chain": "^2.2.5" + } + }, "node_modules/strict-uri-encode": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/strict-uri-encode/-/strict-uri-encode-2.0.0.tgz", diff --git a/package.json b/package.json index 4a8257f9..a2be2e75 100644 --- a/package.json +++ b/package.json @@ -86,12 +86,13 @@ "block-stream2": "^2.1.0", "browser-or-node": "^2.1.1", "buffer-crc32": "^0.2.13", + "eventemitter3": "^5.0.1", "fast-xml-parser": "^4.2.2", "ipaddr.js": "^2.0.1", - "json-stream": "^1.0.0", "lodash": "^4.17.21", "mime-types": "^2.1.35", "query-string": "^7.1.3", + "stream-json": "^1.8.0", "through2": "^4.0.2", "web-encoding": "^1.1.5", "xml2js": "^0.5.0" @@ -110,6 +111,7 @@ "@types/lodash": "^4.14.194", "@types/mime-types": "^2.1.1", "@types/node": "^20.1.0", + "@types/stream-json": "^1.7.5", "@types/through2": "^2.0.38", "@types/xml2js": "^0.4.11", "@typescript-eslint/eslint-plugin": "^5.59.2", diff --git a/src/minio.d.ts b/src/minio.d.ts index c6278000..d0decce6 100644 --- a/src/minio.d.ts +++ b/src/minio.d.ts @@ -1,8 +1,6 @@ // imported from https://github.com/DefinitelyTyped/DefinitelyTyped/blob/93cfb0ec069731dcdfc31464788613f7cddb8192/types/minio/index.d.ts /* eslint-disable @typescript-eslint/no-explicit-any */ -import { EventEmitter } from 'node:events' - import type { CopyDestinationOptions, CopySourceOptions, @@ -28,6 +26,7 @@ import type { IsoDate, ItemBucketMetadata, ItemBucketMetadataList, + LegalHoldStatus, MetadataItem, ObjectLockInfo, PutObjectLegalHoldOptions, @@ -46,10 +45,13 @@ import type { Tag, VersionIdentificator, } from './internal/type.ts' +import type { NotificationConfig, NotificationEvent, NotificationPoller } from './notification.ts' export * from './errors.ts' export * from './helpers.ts' export type { Region } from './internal/s3-endpoints.ts' +export type * from './notification.ts' +export * from './notification.ts' export { CopyConditions, PostPolicy } export type { MakeBucketOpt } from './internal/client.ts' export type { @@ -67,6 +69,7 @@ export type { IsoDate, ItemBucketMetadata, ItemBucketMetadataList, + LegalHoldStatus, MetadataItem, NoResultCallback, ObjectLockInfo, @@ -86,26 +89,6 @@ export type { Tag, } -// Exports only from typings -export type NotificationEvent = - | 's3:ObjectCreated:*' - | 's3:ObjectCreated:Put' - | 's3:ObjectCreated:Post' - | 's3:ObjectCreated:Copy' - | 's3:ObjectCreated:CompleteMultipartUpload' - | 's3:ObjectRemoved:*' - | 's3:ObjectRemoved:Delete' - | 's3:ObjectRemoved:DeleteMarkerCreated' - | 's3:ReducedRedundancyLostObject' - | 's3:TestEvent' - | 's3:ObjectRestore:Post' - | 's3:ObjectRestore:Completed' - | 's3:Replication:OperationFailedReplication' - | 's3:Replication:OperationMissedThreshold' - | 's3:Replication:OperationReplicatedAfterThreshold' - | 's3:Replication:OperationNotTracked' - | string - /** * @deprecated keep for backward compatible, use `RETENTION_MODES` instead */ @@ -116,10 +99,6 @@ export type Mode = RETENTION_MODES */ export type LockUnit = RETENTION_VALIDITY_UNITS -/** - * @deprecated keep for backward compatible - */ -export type LegalHoldStatus = LEGAL_HOLD_STATUS export type VersioningConfig = Record export type TagList = Record export type Lifecycle = LifecycleConfig | null | '' @@ -404,46 +383,3 @@ export class Client extends TypedClient { // Other newPostPolicy(): PostPolicy } - -export declare class NotificationPoller extends EventEmitter { - stop(): void - - start(): void - - // must to be public? - checkForChanges(): void -} - -export declare class NotificationConfig { - add(target: TopicConfig | QueueConfig | CloudFunctionConfig): void -} - -export declare class TopicConfig extends TargetConfig { - constructor(arn: string) -} - -export declare class QueueConfig extends TargetConfig { - constructor(arn: string) -} - -export declare class CloudFunctionConfig extends TargetConfig { - constructor(arn: string) -} - -export declare function buildARN( - partition: string, - service: string, - region: string, - accountId: string, - resource: string, -): string - -export declare const ObjectCreatedAll: NotificationEvent // s3:ObjectCreated:*' -export declare const ObjectCreatedPut: NotificationEvent // s3:ObjectCreated:Put -export declare const ObjectCreatedPost: NotificationEvent // s3:ObjectCreated:Post -export declare const ObjectCreatedCopy: NotificationEvent // s3:ObjectCreated:Copy -export declare const ObjectCreatedCompleteMultipartUpload: NotificationEvent // s3:ObjectCreated:CompleteMultipartUpload -export declare const ObjectRemovedAll: NotificationEvent // s3:ObjectRemoved:* -export declare const ObjectRemovedDelete: NotificationEvent // s3:ObjectRemoved:Delete -export declare const ObjectRemovedDeleteMarkerCreated: NotificationEvent // s3:ObjectRemoved:DeleteMarkerCreated -export declare const ObjectReducedRedundancyLostObject: NotificationEvent // s3:ReducedRedundancyLostObject diff --git a/src/minio.js b/src/minio.js index 3987a90d..9a9cb9a6 100644 --- a/src/minio.js +++ b/src/minio.js @@ -52,7 +52,7 @@ import { uriResourceEscape, } from './internal/helper.ts' import { PostPolicy } from './internal/post-policy.ts' -import { NotificationConfig, NotificationPoller } from './notification.js' +import { NotificationConfig, NotificationPoller } from './notification.ts' import { promisify } from './promisify.js' import { postPresignSignatureV4, presignSignatureV4 } from './signing.ts' import * as transformers from './transformers.js' @@ -60,7 +60,7 @@ import { parseSelectObjectContentResponse } from './xml-parsers.js' export * from './errors.ts' export * from './helpers.ts' -export * from './notification.js' +export * from './notification.ts' export { CopyConditions, PostPolicy } export class Client extends TypedClient { diff --git a/src/notification.js b/src/notification.ts similarity index 51% rename from src/notification.js rename to src/notification.ts index 2c3b45c3..64c24112 100644 --- a/src/notification.js +++ b/src/notification.ts @@ -14,54 +14,42 @@ * limitations under the License. */ -import { EventEmitter } from 'node:events' +import { EventEmitter } from 'eventemitter3' +import jsonLineParser from 'stream-json/jsonl/Parser.js' import { DEFAULT_REGION } from './helpers.ts' +import type { TypedClient } from './internal/client.ts' import { pipesetup, uriEscape } from './internal/helper.ts' -import * as transformers from './transformers.js' -// Notification config - array of target configs. -// Target configs can be -// 1. Topic (simple notification service) -// 2. Queue (simple queue service) -// 3. CloudFront (lambda function) -export class NotificationConfig { - add(target) { - let instance = '' - if (target instanceof TopicConfig) { - instance = 'TopicConfiguration' - } - if (target instanceof QueueConfig) { - instance = 'QueueConfiguration' - } - if (target instanceof CloudFunctionConfig) { - instance = 'CloudFunctionConfiguration' - } - if (!this[instance]) { - this[instance] = [] - } - this[instance].push(target) - } -} +// TODO: type this + +type Event = unknown // Base class for three supported configs. -class TargetConfig { - setId(id) { +export class TargetConfig { + private Filter?: { S3Key: { FilterRule: { Name: string; Value: string }[] } } + private Event?: Event[] + private Id: unknown + + setId(id: unknown) { this.Id = id } - addEvent(newevent) { + + addEvent(newevent: Event) { if (!this.Event) { this.Event = [] } this.Event.push(newevent) } - addFilterSuffix(suffix) { + + addFilterSuffix(suffix: string) { if (!this.Filter) { this.Filter = { S3Key: { FilterRule: [] } } } this.Filter.S3Key.FilterRule.push({ Name: 'suffix', Value: suffix }) } - addFilterPrefix(prefix) { + + addFilterPrefix(prefix: string) { if (!this.Filter) { this.Filter = { S3Key: { FilterRule: [] } } } @@ -71,7 +59,9 @@ class TargetConfig { // 1. Topic (simple notification service) export class TopicConfig extends TargetConfig { - constructor(arn) { + private Topic: string + + constructor(arn: string) { super() this.Topic = arn } @@ -79,7 +69,9 @@ export class TopicConfig extends TargetConfig { // 2. Queue (simple queue service) export class QueueConfig extends TargetConfig { - constructor(arn) { + private Queue: string + + constructor(arn: string) { super() this.Queue = arn } @@ -87,16 +79,44 @@ export class QueueConfig extends TargetConfig { // 3. CloudFront (lambda function) export class CloudFunctionConfig extends TargetConfig { - constructor(arn) { + private CloudFunction: string + + constructor(arn: string) { super() this.CloudFunction = arn } } -export const buildARN = (partition, service, region, accountId, resource) => { - return 'arn:' + partition + ':' + service + ':' + region + ':' + accountId + ':' + resource +// Notification config - array of target configs. +// Target configs can be +// 1. Topic (simple notification service) +// 2. Queue (simple queue service) +// 3. CloudFront (lambda function) +export class NotificationConfig { + private TopicConfiguration?: TargetConfig[] + private CloudFunctionConfiguration?: TargetConfig[] + private QueueConfiguration?: TargetConfig[] + + add(target: TargetConfig) { + let instance: TargetConfig[] | undefined + if (target instanceof TopicConfig) { + instance = this.TopicConfiguration ??= [] + } + if (target instanceof QueueConfig) { + instance = this.QueueConfiguration ??= [] + } + if (target instanceof CloudFunctionConfig) { + instance = this.CloudFunctionConfiguration ??= [] + } + if (instance) { + instance.push(target) + } + } } +export const buildARN = (partition: string, service: string, region: string, accountId: string, resource: string) => { + return 'arn:' + partition + ':' + service + ':' + region + ':' + accountId + ':' + resource +} export const ObjectCreatedAll = 's3:ObjectCreated:*' export const ObjectCreatedPut = 's3:ObjectCreated:Put' export const ObjectCreatedPost = 's3:ObjectCreated:Post' @@ -106,12 +126,42 @@ export const ObjectRemovedAll = 's3:ObjectRemoved:*' export const ObjectRemovedDelete = 's3:ObjectRemoved:Delete' export const ObjectRemovedDeleteMarkerCreated = 's3:ObjectRemoved:DeleteMarkerCreated' export const ObjectReducedRedundancyLostObject = 's3:ReducedRedundancyLostObject' - +export type NotificationEvent = + | 's3:ObjectCreated:*' + | 's3:ObjectCreated:Put' + | 's3:ObjectCreated:Post' + | 's3:ObjectCreated:Copy' + | 's3:ObjectCreated:CompleteMultipartUpload' + | 's3:ObjectRemoved:*' + | 's3:ObjectRemoved:Delete' + | 's3:ObjectRemoved:DeleteMarkerCreated' + | 's3:ReducedRedundancyLostObject' + | 's3:TestEvent' + | 's3:ObjectRestore:Post' + | 's3:ObjectRestore:Completed' + | 's3:Replication:OperationFailedReplication' + | 's3:Replication:OperationMissedThreshold' + | 's3:Replication:OperationReplicatedAfterThreshold' + | 's3:Replication:OperationNotTracked' + | string // put string at least so auto-complete could work + +// TODO: type this +export type NotificationRecord = unknown // Poll for notifications, used in #listenBucketNotification. // Listening constitutes repeatedly requesting s3 whether or not any // changes have occurred. -export class NotificationPoller extends EventEmitter { - constructor(client, bucketName, prefix, suffix, events) { +export class NotificationPoller extends EventEmitter<{ + notification: (event: NotificationRecord) => void + error: (error: unknown) => void +}> { + private client: TypedClient + private bucketName: string + private prefix: string + private suffix: string + private events: NotificationEvent[] + private ending: boolean + + constructor(client: TypedClient, bucketName: string, prefix: string, suffix: string, events: NotificationEvent[]) { super() this.client = client @@ -143,14 +193,14 @@ export class NotificationPoller extends EventEmitter { return } - let method = 'GET' - var queries = [] + const method = 'GET' + const queries = [] if (this.prefix) { - var prefix = uriEscape(this.prefix) + const prefix = uriEscape(this.prefix) queries.push(`prefix=${prefix}`) } if (this.suffix) { - var suffix = uriEscape(this.suffix) + const suffix = uriEscape(this.suffix) queries.push(`suffix=${suffix}`) } if (this.events) { @@ -158,44 +208,47 @@ export class NotificationPoller extends EventEmitter { } queries.sort() - var query = '' + let query = '' if (queries.length > 0) { query = `${queries.join('&')}` } const region = this.client.region || DEFAULT_REGION - this.client.makeRequest({ method, bucketName: this.bucketName, query }, '', [200], region, true, (e, response) => { - if (e) { - return this.emit('error', e) - } - - let transformer = transformers.getNotificationTransformer() - pipesetup(response, transformer) - .on('data', (result) => { - // Data is flushed periodically (every 5 seconds), so we should - // handle it after flushing from the JSON parser. - let records = result.Records - // If null (= no records), change to an empty array. - if (!records) { - records = [] - } - - // Iterate over the notifications and emit them individually. - records.forEach((record) => { - this.emit('notification', record) - }) - // If we're done, stop. - if (this.ending) { - response.destroy() - } - }) - .on('error', (e) => this.emit('error', e)) - .on('end', () => { - // Do it again, if we haven't cancelled yet. - process.nextTick(() => { - this.checkForChanges() + this.client.makeRequestAsync({ method, bucketName: this.bucketName, query }, '', [200], region).then( + (response) => { + const asm = jsonLineParser.make() + + pipesetup(response, asm) + .on('data', (data) => { + // Data is flushed periodically (every 5 seconds), so we should + // handle it after flushing from the JSON parser. + let records = data.value.Records + // If null (= no records), change to an empty array. + if (!records) { + records = [] + } + + // Iterate over the notifications and emit them individually. + records.forEach((record: NotificationRecord) => { + this.emit('notification', record) + }) + + // If we're done, stop. + if (this.ending) { + response?.destroy() + } }) - }) - }) + .on('error', (e) => this.emit('error', e)) + .on('end', () => { + // Do it again, if we haven't cancelled yet. + process.nextTick(() => { + this.checkForChanges() + }) + }) + }, + (e) => { + return this.emit('error', e) + }, + ) } } diff --git a/src/transformers.js b/src/transformers.js index 4ad168f6..f10945d5 100644 --- a/src/transformers.js +++ b/src/transformers.js @@ -16,7 +16,6 @@ import * as Crypto from 'node:crypto' -import JSONParser from 'json-stream' import Through2 from 'through2' import { isFunction } from './internal/helper.ts' @@ -121,12 +120,6 @@ export function getBucketNotificationTransformer() { return getConcater(xmlParsers.parseBucketNotification) } -// Parses a notification. -export function getNotificationTransformer() { - // This will parse and return each object. - return new JSONParser() -} - export function lifecycleTransformer() { return getConcater(xmlParsers.parseLifecycleConfig) }