From 35a43e4541b717c7db1cb230b921db4ff4f185a0 Mon Sep 17 00:00:00 2001 From: Pierre Lehnen Date: Wed, 26 Jul 2023 22:39:49 -0300 Subject: [PATCH] Cherry picked changes from PR #29714 --- apps/meteor/app/api/server/v1/import.ts | 91 +++++++++- apps/meteor/app/api/server/v1/users.ts | 16 +- .../app/importer-csv/server/importer.js | 22 ++- .../server/importer.js | 2 +- .../importer-slack-users/server/importer.js | 20 ++- .../app/importer-slack/server/importer.js | 4 + .../server/classes/ImportDataConverter.ts | 17 +- .../importer/server/classes/ImporterBase.js | 168 ++++-------------- .../definitions/IConversionCallbacks.ts | 6 +- apps/meteor/app/importer/server/index.ts | 4 + .../methods/downloadPublicImportFile.ts | 7 +- .../server/methods/getImportFileData.ts | 1 - .../server/methods/getImportProgress.ts | 1 - .../importer/server/methods/startImport.ts | 8 +- .../server/methods/uploadImportFile.ts | 6 +- .../server/startup/setImportsToInvalid.js | 34 +--- .../lib/server/functions/getFullUserData.ts | 23 ++- .../views/admin/import/ImportProgressPage.tsx | 2 +- .../client/views/admin/import/ImportTypes.ts | 19 -- apps/meteor/server/models/RawImports.ts | 6 - apps/meteor/server/models/raw/ImportData.ts | 6 +- apps/meteor/server/models/raw/Imports.ts | 55 +++++- apps/meteor/server/models/raw/Messages.ts | 5 +- apps/meteor/server/models/raw/RawImports.ts | 10 -- apps/meteor/server/models/startup.ts | 1 - .../notifications/notifications.module.ts | 25 +-- apps/meteor/server/services/import/service.ts | 149 ++++++++++++++++ apps/meteor/server/services/startup.ts | 2 + ee/packages/ddp-client/src/types/streams.ts | 29 +-- packages/core-services/src/index.ts | 3 + .../core-services/src/types/IImportService.ts | 9 + .../core-typings/src/IMessage/IMessage.ts | 14 ++ packages/core-typings/src/IUser.ts | 1 + packages/core-typings/src/import/IImport.ts | 10 +- .../core-typings/src/import/IImportMessage.ts | 10 +- .../core-typings/src/import/IImportUser.ts | 2 + .../src/import/IImporterSelection.ts | 9 + .../src/import/IImporterSelectionChannel.ts | 9 + .../src/import/IImporterSelectionUser.ts | 9 + .../core-typings/src/import/ImportState.ts | 10 ++ packages/core-typings/src/import/index.ts | 4 + packages/model-typings/src/index.ts | 1 - .../model-typings/src/models/IImportsModel.ts | 8 +- .../src/models/IMessagesModel.ts | 12 +- .../src/models/IRawImportsModel.ts | 3 - .../model-typings/src/models/IUsersModel.ts | 2 +- packages/models/src/index.ts | 2 - .../src/v1/import/ImportAddUsersParamsPOST.ts | 57 ++++++ packages/rest-typings/src/v1/import/import.ts | 19 +- packages/rest-typings/src/v1/import/index.ts | 1 + .../src/v1/users/UsersInfoParamsGet.ts | 16 +- 51 files changed, 630 insertions(+), 320 deletions(-) delete mode 100644 apps/meteor/client/views/admin/import/ImportTypes.ts delete mode 100644 apps/meteor/server/models/RawImports.ts delete mode 100644 apps/meteor/server/models/raw/RawImports.ts create mode 100644 apps/meteor/server/services/import/service.ts create mode 100644 packages/core-services/src/types/IImportService.ts create mode 100644 packages/core-typings/src/import/IImporterSelection.ts create mode 100644 packages/core-typings/src/import/IImporterSelectionChannel.ts create mode 100644 packages/core-typings/src/import/IImporterSelectionUser.ts create mode 100644 packages/core-typings/src/import/ImportState.ts delete mode 100644 packages/model-typings/src/models/IRawImportsModel.ts create mode 100644 packages/rest-typings/src/v1/import/ImportAddUsersParamsPOST.ts diff --git a/apps/meteor/app/api/server/v1/import.ts b/apps/meteor/app/api/server/v1/import.ts index 077e58653110..f4ceb6bae2dc 100644 --- a/apps/meteor/app/api/server/v1/import.ts +++ b/apps/meteor/app/api/server/v1/import.ts @@ -9,8 +9,10 @@ import { isDownloadPendingFilesParamsPOST, isDownloadPendingAvatarsParamsPOST, isGetCurrentImportOperationParamsGET, + isImportAddUsersParamsPOST, } from '@rocket.chat/rest-typings'; import { Imports } from '@rocket.chat/models'; +import { Import } from '@rocket.chat/core-services'; import { API } from '../api'; import { Importers } from '../../../importer/server'; @@ -66,7 +68,7 @@ API.v1.addRoute( async post() { const { input } = this.bodyParams; - await executeStartImport({ input }); + await executeStartImport({ input }, this.userId); return API.v1.success(); }, @@ -133,8 +135,9 @@ API.v1.addRoute( throw new Meteor.Error('error-importer-not-defined', 'The Pending File Importer was not found.', 'downloadPendingFiles'); } - importer.instance = new importer.importer(importer); // eslint-disable-line new-cap - await importer.instance.build(); + const operation = await Import.newOperation(this.userId, importer.name, importer.key); + + importer.instance = new importer.importer(importer, operation); // eslint-disable-line new-cap const count = await importer.instance.prepareFileCount(); return API.v1.success({ @@ -158,8 +161,8 @@ API.v1.addRoute( throw new Meteor.Error('error-importer-not-defined', 'The Pending File Importer was not found.', 'downloadPendingAvatars'); } - importer.instance = new importer.importer(importer); // eslint-disable-line new-cap - await importer.instance.build(); + const operation = await Import.newOperation(this.userId, importer.name, importer.key); + importer.instance = new importer.importer(importer, operation); // eslint-disable-line new-cap const count = await importer.instance.prepareFileCount(); return API.v1.success({ @@ -185,3 +188,81 @@ API.v1.addRoute( }, }, ); + +API.v1.addRoute( + 'import.clear', + { + authRequired: true, + permissionsRequired: ['run-import'], + }, + { + async post() { + await Import.clear(); + + return API.v1.success(); + }, + }, +); + +API.v1.addRoute( + 'import.new', + { + authRequired: true, + permissionsRequired: ['run-import'], + }, + { + async post() { + const operation = await Import.newOperation(this.userId, 'api', 'api'); + + return API.v1.success({ operation }); + }, + }, +); + +API.v1.addRoute( + 'import.status', + { + authRequired: true, + permissionsRequired: ['run-import'], + }, + { + async get() { + const status = await Import.status(); + + return API.v1.success(status); + }, + }, +); + +API.v1.addRoute( + 'import.addUsers', + { + authRequired: true, + validateParams: isImportAddUsersParamsPOST, + permissionsRequired: ['run-import'], + }, + { + async post() { + const { users } = this.bodyParams; + + await Import.addUsers(users); + + return API.v1.success(); + }, + }, +); + +API.v1.addRoute( + 'import.run', + { + authRequired: true, + permissionsRequired: ['run-import'], + }, + { + async post() { + await Import.run(this.userId); + + return API.v1.success(); + }, + }, +); diff --git a/apps/meteor/app/api/server/v1/users.ts b/apps/meteor/app/api/server/v1/users.ts index 56c15c6e35c8..36f5cb30973e 100644 --- a/apps/meteor/app/api/server/v1/users.ts +++ b/apps/meteor/app/api/server/v1/users.ts @@ -29,7 +29,7 @@ import { checkUsernameAvailability, checkUsernameAvailabilityWithValidation, } from '../../../lib/server/functions/checkUsernameAvailability'; -import { getFullUserDataByIdOrUsername } from '../../../lib/server/functions/getFullUserData'; +import { getFullUserDataByIdOrUsernameOrImportId } from '../../../lib/server/functions/getFullUserData'; import { setStatusText } from '../../../lib/server/functions/setStatusText'; import { API } from '../api'; import { findUsersToAutocomplete, getInclusiveFields, getNonEmptyFields, getNonEmptyQuery } from '../lib/users'; @@ -393,10 +393,16 @@ API.v1.addRoute( async get() { const { fields } = await this.parseJsonQuery(); - const user = await getFullUserDataByIdOrUsername(this.userId, { - filterId: (this.queryParams as any).userId, - filterUsername: (this.queryParams as any).username, - }); + const searchTerms: [string, 'id' | 'username' | 'importId'] | false = + ('userId' in this.queryParams && !!this.queryParams.userId && [this.queryParams.userId, 'id']) || + ('username' in this.queryParams && !!this.queryParams.username && [this.queryParams.username, 'username']) || + ('importId' in this.queryParams && !!this.queryParams.importId && [this.queryParams.importId, 'importId']); + + if (!searchTerms) { + return API.v1.failure('Invalid search query.'); + } + + const user = await getFullUserDataByIdOrUsernameOrImportId(this.userId, ...searchTerms); if (!user) { return API.v1.failure('User not found.'); diff --git a/apps/meteor/app/importer-csv/server/importer.js b/apps/meteor/app/importer-csv/server/importer.js index c10f134d80d6..f1e46bddcab7 100644 --- a/apps/meteor/app/importer-csv/server/importer.js +++ b/apps/meteor/app/importer-csv/server/importer.js @@ -59,13 +59,15 @@ export class CsvImporter extends Base { // Ignore anything that has `__MACOSX` in it's name, as sadly these things seem to mess everything up if (entry.entryName.indexOf('__MACOSX') > -1) { this.logger.debug(`Ignoring the file: ${entry.entryName}`); - return increaseProgressCount(); + increaseProgressCount(); + continue; } // Directories are ignored, since they are "virtual" in a zip file if (entry.isDirectory) { this.logger.debug(`Ignoring the directory entry: ${entry.entryName}`); - return increaseProgressCount(); + increaseProgressCount(); + continue; } // Parse the channels @@ -97,7 +99,8 @@ export class CsvImporter extends Base { } await super.updateRecord({ 'count.channels': channelsCount }); - return increaseProgressCount(); + increaseProgressCount(); + continue; } // Parse the users @@ -114,6 +117,7 @@ export class CsvImporter extends Base { const name = u[2].trim(); await this.converter.addUser({ + type: 'user', importIds: [username], emails: [email], username, @@ -122,7 +126,8 @@ export class CsvImporter extends Base { } await super.updateRecord({ 'count.users': usersCount }); - return increaseProgressCount(); + increaseProgressCount(); + continue; } // Parse the messages @@ -140,7 +145,8 @@ export class CsvImporter extends Base { msgs = this.csvParser(entry.getData().toString()); } catch (e) { this.logger.warn(`The file ${entry.entryName} contains invalid syntax`, e); - return increaseProgressCount(); + increaseProgressCount(); + continue; } let data; @@ -211,7 +217,8 @@ export class CsvImporter extends Base { } await super.updateRecord({ 'count.messages': messagesCount, 'messagesstatus': null }); - return increaseProgressCount(); + increaseProgressCount(); + continue; } increaseProgressCount(); @@ -243,7 +250,8 @@ export class CsvImporter extends Base { if (usersCount === 0 && channelsCount === 0 && messagesCount === 0) { this.logger.error('No users, channels, or messages found in the import file.'); await super.updateProgress(ProgressStep.ERROR); - return super.getProgress(); } + + return super.getProgress(); } } diff --git a/apps/meteor/app/importer-pending-avatars/server/importer.js b/apps/meteor/app/importer-pending-avatars/server/importer.js index 912d253e808c..a7517285762b 100644 --- a/apps/meteor/app/importer-pending-avatars/server/importer.js +++ b/apps/meteor/app/importer-pending-avatars/server/importer.js @@ -39,7 +39,7 @@ export class PendingAvatarImporter extends Base { try { if (!url || !url.startsWith('http')) { - return; + continue; } try { diff --git a/apps/meteor/app/importer-slack-users/server/importer.js b/apps/meteor/app/importer-slack-users/server/importer.js index ad0276752a2a..3791afb7fa57 100644 --- a/apps/meteor/app/importer-slack-users/server/importer.js +++ b/apps/meteor/app/importer-slack-users/server/importer.js @@ -1,3 +1,5 @@ +import fs from 'fs'; + import { Settings } from '@rocket.chat/models'; import { Base, ProgressStep } from '../../importer/server'; @@ -13,14 +15,19 @@ export class SlackUsersImporter extends Base { } async prepareUsingLocalFile(fullFilePath) { + this.logger.debug('start preparing import operation'); await this.converter.clearImportData(); - return super.prepareUsingLocalFile(fullFilePath); - } + const file = fs.readFileSync(fullFilePath); + const buffer = Buffer.isBuffer(file) ? file : Buffer.from(file); - async prepare(dataURI, sentContentType, fileName) { - this.logger.debug('start preparing import operation'); - await super.prepare(dataURI, sentContentType, fileName, true); + const { contentType } = this.importRecord; + const fileName = this.importRecord.file; + + const data = buffer.toString('base64'); + const dataURI = `data:${contentType};base64,${data}`; + + await this.updateRecord({ file: fileName }); await super.updateProgress(ProgressStep.PREPARING_USERS); const uriResult = RocketChatFile.dataURIParse(dataURI); @@ -76,6 +83,7 @@ export class SlackUsersImporter extends Base { await super.updateProgress(ProgressStep.USER_SELECTION); await super.addCountToTotal(userCount); await Settings.incrementValueById('Slack_Users_Importer_Count', userCount); - return super.updateRecord({ 'count.users': userCount }); + await super.updateRecord({ 'count.users': userCount }); + return super.getProgress(); } } diff --git a/apps/meteor/app/importer-slack/server/importer.js b/apps/meteor/app/importer-slack/server/importer.js index b91b1832d9e3..ddb430cbd5fc 100644 --- a/apps/meteor/app/importer-slack/server/importer.js +++ b/apps/meteor/app/importer-slack/server/importer.js @@ -295,6 +295,8 @@ export class SlackImporter extends Base { ImporterWebsocket.progressUpdated({ rate: 100 }); await this.updateRecord({ 'count.messages': messagesCount, 'messagesstatus': null }); + + return this.progress; } parseMentions(newMessage) { @@ -400,6 +402,8 @@ export class SlackImporter extends Base { } break; } + + return false; } makeSlackMessageId(channelId, ts, fileIndex = undefined) { diff --git a/apps/meteor/app/importer/server/classes/ImportDataConverter.ts b/apps/meteor/app/importer/server/classes/ImportDataConverter.ts index 0204e88c2484..a6da082fb92b 100644 --- a/apps/meteor/app/importer/server/classes/ImportDataConverter.ts +++ b/apps/meteor/app/importer/server/classes/ImportDataConverter.ts @@ -80,6 +80,8 @@ export class ImportDataConverter { return this._options; } + public aborted = false; + constructor(options?: IConverterOptions) { this._options = options || { flagEmailsAsVerified: false, @@ -285,7 +287,8 @@ export class ImportDataConverter { // TODO async insertUser(userData: IImportUser): Promise { - const password = `${Date.now()}${userData.name || ''}${userData.emails.length ? userData.emails[0].toUpperCase() : ''}`; + const password = + userData.password || `${Date.now()}${userData.name || ''}${userData.emails.length ? userData.emails[0].toUpperCase() : ''}`; const userId = userData.emails.length ? await Accounts.createUserAsync({ email: userData.emails[0], @@ -329,6 +332,10 @@ export class ImportDataConverter { public async convertUsers({ beforeImportFn, afterImportFn }: IConversionCallbacks = {}): Promise { const users = (await this.getUsersToImport()) as IImportUserRecord[]; for await (const { data, _id } of users) { + if (this.aborted) { + return; + } + try { if (beforeImportFn && !(await beforeImportFn(data, 'user'))) { await this.skipRecord(_id); @@ -568,6 +575,10 @@ export class ImportDataConverter { const messages = await this.getMessagesToImport(); for await (const { data, _id } of messages) { + if (this.aborted) { + return; + } + try { if (beforeImportFn && !(await beforeImportFn(data, 'message'))) { await this.skipRecord(_id); @@ -937,6 +948,10 @@ export class ImportDataConverter { async convertChannels(startedByUserId: string, { beforeImportFn, afterImportFn }: IConversionCallbacks = {}): Promise { const channels = await this.getChannelsToImport(); for await (const { data, _id } of channels) { + if (this.aborted) { + return; + } + try { if (beforeImportFn && !(await beforeImportFn(data, 'channel'))) { await this.skipRecord(_id); diff --git a/apps/meteor/app/importer/server/classes/ImporterBase.js b/apps/meteor/app/importer/server/classes/ImporterBase.js index 51fdb0728c4d..7191e55f67b5 100644 --- a/apps/meteor/app/importer/server/classes/ImporterBase.js +++ b/apps/meteor/app/importer/server/classes/ImporterBase.js @@ -1,15 +1,13 @@ import http from 'http'; -import fs from 'fs'; import https from 'https'; -import { Settings, ImportData, Imports, RawImports } from '@rocket.chat/models'; +import { Settings, ImportData, Imports } from '@rocket.chat/models'; import { Meteor } from 'meteor/meteor'; import AdmZip from 'adm-zip'; -import getFileType from 'file-type'; import { Progress } from './ImporterProgress'; import { ImporterWebsocket } from './ImporterWebsocket'; -import { ProgressStep } from '../../lib/ImporterProgressStep'; +import { ProgressStep, ImportPreparingStartedStates } from '../../lib/ImporterProgressStep'; import { ImporterInfo } from '../../lib/ImporterInfo'; import { Logger } from '../../../logger/server'; import { ImportDataConverter } from './ImportDataConverter'; @@ -20,25 +18,14 @@ import { Selection, SelectionChannel, SelectionUser } from '..'; * Base class for all of the importers. */ export class Base { - /** - * Constructs a new importer, adding an empty collection, AdmZip property, and empty users & channels - * - * @param {string} name The importer's name. - * @param {string} description The i18n string which describes the importer - * @param {string} mimeType The expected file type. - */ constructor(info, importRecord) { if (!(info instanceof ImporterInfo)) { throw new Error('Information passed in must be a valid ImporterInfo instance.'); } - this.http = http; - this.https = https; this.AdmZip = AdmZip; - this.getFileType = getFileType; this.converter = new ImportDataConverter(); - this.prepare = this.prepare.bind(this); this.startImport = this.startImport.bind(this); this.getProgress = this.getProgress.bind(this); this.updateProgress = this.updateProgress.bind(this); @@ -51,37 +38,15 @@ export class Base { this.logger = new Logger(`${this.info.name} Importer`); this.converter.setLogger(this.logger); + this.importRecord = importRecord; this.progress = new Progress(this.info.key, this.info.name); - this.collection = RawImports; - this.importRecordParam = importRecord; this.users = {}; this.channels = {}; this.messages = {}; this.oldSettings = {}; - } - async build() { - const userId = Meteor.userId(); - if (this.importRecordParam) { - this.logger.debug('Found existing import operation'); - this.importRecord = this.importRecordParam; - this.progress.step = this.importRecord.status; - this.reloadCount(); - } else { - this.logger.debug('Starting new import operation'); - const importId = ( - await Imports.insertOne({ - type: this.info.name, - importerKey: this.info.key, - ts: Date.now(), - status: this.progress.step, - valid: true, - user: userId, - }) - ).insertedId; - this.importRecord = await Imports.findOne(importId); - } - this.logger.debug(`Constructed a new ${this.info.name} Importer.`); + this.progress.step = this.importRecord.status; + this.reloadCount(); } /** @@ -102,46 +67,8 @@ export class Base { * @param {string} fullFilePath the full path of the uploaded file * @returns {Progress} The progress record of the import. */ - async prepareUsingLocalFile(fullFilePath) { - const file = fs.readFileSync(fullFilePath); - const buffer = Buffer.isBuffer(file) ? file : Buffer.from(file); - - const { contentType } = this.importRecord; - const fileName = this.importRecord.file; - - const data = buffer.toString('base64'); - const dataURI = `data:${contentType};base64,${data}`; - - return this.prepare(dataURI, contentType, fileName, true); - } - - /** - * Takes the uploaded file and extracts the users, channels, and messages from it. - * - * @param {string} dataURI Base64 string of the uploaded file - * @param {string} sentContentType The sent file type. - * @param {string} fileName The name of the uploaded file. - * @param {boolean} skipTypeCheck Optional property that says to not check the type provided. - * @returns {Progress} The progress record of the import. - */ - async prepare(dataURI, sentContentType, fileName, skipTypeCheck) { - await this.collection.deleteMany({}); - if (!skipTypeCheck) { - const fileType = this.getFileType(Buffer.from(dataURI.split(',')[1], 'base64')); - this.logger.debug('Uploaded file information is:', fileType); - this.logger.debug('Expected file type is:', this.info.mimeType); - - if (!fileType || fileType.mime !== this.info.mimeType) { - this.logger.warn(`Invalid file uploaded for the ${this.info.name} importer.`); - await this.updateProgress(ProgressStep.ERROR); - throw new Meteor.Error('error-invalid-file-uploaded', `Invalid file uploaded to import ${this.info.name} data from.`, { - step: 'prepare', - }); - } - } - - await this.updateProgress(ProgressStep.PREPARING_STARTED); - return this.updateRecord({ file: fileName }); + async prepareUsingLocalFile() { + return this.updateProgress(ProgressStep.PREPARING_STARTED); } /** @@ -153,7 +80,7 @@ export class Base { * @param {Selection} importSelection The selection data. * @returns {Progress} The progress record of the import. */ - async startImport(importSelection) { + async startImport(importSelection, startedByUserId) { if (!(importSelection instanceof Selection)) { throw new Error(`Invalid Selection data provided to the ${this.info.name} importer.`); } else if (importSelection.users === undefined) { @@ -164,12 +91,20 @@ export class Base { ); } + if (!startedByUserId) { + throw new Error('You must be logged in to do this.'); + } + await this.updateProgress(ProgressStep.IMPORTING_STARTED); this.reloadCount(); const started = Date.now(); - const startedByUserId = Meteor.userId(); const beforeImportFn = async (data, type) => { + if (this.importRecord.valid === false) { + this.converter.aborted = true; + throw new Error('The import operation is no longer valid.'); + } + switch (type) { case 'channel': { const id = data.t === 'd' ? '__directMessages__' : data.importIds[0]; @@ -182,6 +117,11 @@ export class Base { return false; } case 'user': { + // #TODO: Replace this workaround in the final version of the API importer + if (importSelection.users.length === 0 && this.info.key === 'api') { + return true; + } + const id = data.importIds[0]; for (const user of importSelection.users) { if (user.user_id === id) { @@ -197,7 +137,12 @@ export class Base { }; const afterImportFn = async () => { - return this.addCountCompleted(1); + await this.addCountCompleted(1); + + if (this.importRecord.valid === false) { + this.converter.aborted = true; + throw new Error('The import operation is no longer valid.'); + } }; process.nextTick(async () => { @@ -254,7 +199,7 @@ export class Base { async applySettingValues(settingValues) { await Settings.updateValueById('Accounts_AllowedDomainsList', settingValues.allowedDomainList ?? ''); - await Settings.updateValueById('Accounts_AllowUsernameChange', setTimeout.allowUsernameChange ?? true); + await Settings.updateValueById('Accounts_AllowUsernameChange', settingValues.allowUsernameChange ?? true); await Settings.updateValueById('FileUpload_MaxFileSize', settingValues.maxFileSize ?? -1); await Settings.updateValueById('FileUpload_MediaTypeWhiteList', settingValues.mediaTypeWhiteList ?? '*'); await Settings.updateValueById('FileUpload_MediaTypeBlackList', settingValues.mediaTypeBlackList ?? ''); @@ -283,7 +228,10 @@ export class Base { this.logger.debug(`${this.info.name} is now at ${step}.`); await this.updateRecord({ status: this.progress.step }); - this.reportProgress(); + // Do not send the default progress report during the preparing stage - the classes are sending their own report in a different format. + if (!ImportPreparingStartedStates.includes(this.progress.step)) { + this.reportProgress(); + } return this.progress; } @@ -320,9 +268,11 @@ export class Base { async addCountCompleted(count) { this.progress.count.completed += count; - // Only update the database every 500 records + const range = [ProgressStep.IMPORTING_USERS, ProgressStep.IMPORTING_CHANNELS].includes(this.progress.step) ? 50 : 500; + + // Only update the database every 500 messages (or 50 for users/channels) // Or the completed is greater than or equal to the total amount - if (this.progress.count.completed % 500 === 0 || this.progress.count.completed >= this.progress.count.total) { + if (this.progress.count.completed % range === 0 || this.progress.count.completed >= this.progress.count.total) { await this.updateRecord({ 'count.completed': this.progress.count.completed }); this.reportProgress(); } else if (!this._reportProgressHandler) { @@ -342,51 +292,11 @@ export class Base { reportProgress() { if (this._reportProgressHandler) { clearTimeout(this._reportProgressHandler); - this._reportProgressHandler = false; + this._reportProgressHandler = undefined; } ImporterWebsocket.progressUpdated(this.progress); } - /** - * Registers error information on a specific user from the import record - * - * @param {int} the user id - * @param {object} an exception object - */ - async addUserError(userId, error) { - await Imports.updateOne( - { - '_id': this.importRecord._id, - 'fileData.users.user_id': userId, - }, - { - $set: { - 'fileData.users.$.error': error, - 'hasErrors': true, - }, - }, - ); - } - - async addMessageError(error, msg) { - await Imports.updateOne( - { - _id: this.importRecord._id, - }, - { - $push: { - errors: { - error, - msg, - }, - }, - $set: { - hasErrors: true, - }, - }, - ); - } - /** * Updates the import record with the given fields being `set`. * @@ -429,7 +339,7 @@ export class Base { selectionChannels.push(new SelectionChannel('__directMessages__', t('Direct_Messages'), false, true, true, undefined, true)); } - const results = new Selection(this.name, selectionUsers, selectionChannels, selectionMessages); + const results = new Selection(this.info.name, selectionUsers, selectionChannels, selectionMessages); return results; } diff --git a/apps/meteor/app/importer/server/definitions/IConversionCallbacks.ts b/apps/meteor/app/importer/server/definitions/IConversionCallbacks.ts index a1abfffe4163..ee2d31feb789 100644 --- a/apps/meteor/app/importer/server/definitions/IConversionCallbacks.ts +++ b/apps/meteor/app/importer/server/definitions/IConversionCallbacks.ts @@ -1,11 +1,11 @@ -import type { IImportUser, IImportMessage, IImportChannel } from '@rocket.chat/core-typings'; +import type { IImportUser, IImportMessage, IImportChannel, IImportRecordType } from '@rocket.chat/core-typings'; type ImporterBeforeImportCallback = { - (data: IImportUser | IImportChannel | IImportMessage, type: string): Promise; + (data: IImportUser | IImportChannel | IImportMessage, type: IImportRecordType): Promise; }; export type ImporterAfterImportCallback = { - (data: IImportUser | IImportChannel | IImportMessage, type: string, isNewRecord: boolean): Promise; + (data: IImportUser | IImportChannel | IImportMessage, type: IImportRecordType, isNewRecord: boolean): Promise; }; export interface IConversionCallbacks { diff --git a/apps/meteor/app/importer/server/index.ts b/apps/meteor/app/importer/server/index.ts index 1b8514c53f58..319344a343c5 100644 --- a/apps/meteor/app/importer/server/index.ts +++ b/apps/meteor/app/importer/server/index.ts @@ -5,8 +5,12 @@ import { SelectionChannel } from './classes/ImporterSelectionChannel'; import { SelectionUser } from './classes/ImporterSelectionUser'; import { ProgressStep } from '../lib/ImporterProgressStep'; import { Importers } from '../lib/Importers'; +import { ImporterInfo } from '../lib/ImporterInfo'; import './methods'; import './startup/setImportsToInvalid'; import './startup/store'; +// Adding a link to the base class using the 'api' key. This won't be needed in the new importer structure implemented on the parallel PR +Importers.add(new ImporterInfo('api', 'API', ''), Base); + export { Base, Importers, ImporterWebsocket, ProgressStep, Selection, SelectionChannel, SelectionUser }; diff --git a/apps/meteor/app/importer/server/methods/downloadPublicImportFile.ts b/apps/meteor/app/importer/server/methods/downloadPublicImportFile.ts index 8359ecdcea00..d075e3ea5ca6 100644 --- a/apps/meteor/app/importer/server/methods/downloadPublicImportFile.ts +++ b/apps/meteor/app/importer/server/methods/downloadPublicImportFile.ts @@ -3,13 +3,14 @@ import https from 'https'; import fs from 'fs'; import { Meteor } from 'meteor/meteor'; +import { Import } from '@rocket.chat/core-services'; import type { IUser } from '@rocket.chat/core-typings'; import type { ServerMethods } from '@rocket.chat/ui-contexts'; import { RocketChatImportFileInstance } from '../startup/store'; import { ProgressStep } from '../../lib/ImporterProgressStep'; import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission'; -import { Importers } from '..'; +import { Importers } from '../../lib/Importers'; function downloadHttpFile(fileUrl: string, writeStream: fs.WriteStream): void { const protocol = fileUrl.startsWith('https') ? https : http; @@ -40,8 +41,8 @@ export const executeDownloadPublicImportFile = async (userId: IUser['_id'], file } } - importer.instance = new importer.importer(importer); // eslint-disable-line new-cap - await importer.instance.build(); + const operation = await Import.newOperation(userId, importer.name, importer.key); + importer.instance = new importer.importer(importer, operation); // eslint-disable-line new-cap const oldFileName = fileUrl.substring(fileUrl.lastIndexOf('/') + 1).split('?')[0]; const date = new Date(); diff --git a/apps/meteor/app/importer/server/methods/getImportFileData.ts b/apps/meteor/app/importer/server/methods/getImportFileData.ts index 690546642d8f..caad5ff78750 100644 --- a/apps/meteor/app/importer/server/methods/getImportFileData.ts +++ b/apps/meteor/app/importer/server/methods/getImportFileData.ts @@ -25,7 +25,6 @@ export const executeGetImportFileData = async (): Promise => { } importer.instance = new importer.importer(importer, operation); // eslint-disable-line new-cap - await importer.instance.build(); return importer.instance.getProgress(); }; diff --git a/apps/meteor/app/importer/server/methods/startImport.ts b/apps/meteor/app/importer/server/methods/startImport.ts index a7efc10bf80e..347e3d3fa3b4 100644 --- a/apps/meteor/app/importer/server/methods/startImport.ts +++ b/apps/meteor/app/importer/server/methods/startImport.ts @@ -2,11 +2,12 @@ import { Meteor } from 'meteor/meteor'; import type { StartImportParamsPOST } from '@rocket.chat/rest-typings'; import { Imports } from '@rocket.chat/models'; import type { ServerMethods } from '@rocket.chat/ui-contexts'; +import type { IUser } from '@rocket.chat/core-typings'; import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission'; import { Importers, Selection, SelectionChannel, SelectionUser } from '..'; -export const executeStartImport = async ({ input }: StartImportParamsPOST) => { +export const executeStartImport = async ({ input }: StartImportParamsPOST, startedByUserId: IUser['_id']) => { const operation = await Imports.findLastImport(); if (!operation) { throw new Meteor.Error('error-operation-not-found', 'Import Operation Not Found', 'startImport'); @@ -19,7 +20,6 @@ export const executeStartImport = async ({ input }: StartImportParamsPOST) => { } importer.instance = new importer.importer(importer, operation); // eslint-disable-line new-cap - await importer.instance.build(); const usersSelection = input.users.map( (user) => new SelectionUser(user.user_id, user.username, user.email, user.is_deleted, user.is_bot, user.do_import), @@ -37,7 +37,7 @@ export const executeStartImport = async ({ input }: StartImportParamsPOST) => { ), ); const selection = new Selection(importer.name, usersSelection, channelsSelection, 0); - return importer.instance.startImport(selection); + return importer.instance.startImport(selection, startedByUserId); }; declare module '@rocket.chat/ui-contexts' { @@ -59,6 +59,6 @@ Meteor.methods({ throw new Meteor.Error('error-action-not-allowed', 'Importing is not allowed', 'startImport'); } - return executeStartImport({ input }); + return executeStartImport({ input }, userId); }, }); diff --git a/apps/meteor/app/importer/server/methods/uploadImportFile.ts b/apps/meteor/app/importer/server/methods/uploadImportFile.ts index 53df3c0fb099..ebe888b55931 100644 --- a/apps/meteor/app/importer/server/methods/uploadImportFile.ts +++ b/apps/meteor/app/importer/server/methods/uploadImportFile.ts @@ -1,4 +1,5 @@ import { Meteor } from 'meteor/meteor'; +import { Import } from '@rocket.chat/core-services'; import type { IUser } from '@rocket.chat/core-typings'; import type { ServerMethods } from '@rocket.chat/ui-contexts'; @@ -20,8 +21,9 @@ export const executeUploadImportFile = async ( throw new Meteor.Error('error-importer-not-defined', `The importer (${importerKey}) has no import class defined.`, 'uploadImportFile'); } - importer.instance = new importer.importer(importer); // eslint-disable-line new-cap - await importer.instance.build(); + const operation = await Import.newOperation(userId, importer.name, importer.key); + + importer.instance = new importer.importer(importer, operation); // eslint-disable-line new-cap const date = new Date(); const dateStr = `${date.getUTCFullYear()}${date.getUTCMonth()}${date.getUTCDate()}${date.getUTCHours()}${date.getUTCMinutes()}${date.getUTCSeconds()}`; diff --git a/apps/meteor/app/importer/server/startup/setImportsToInvalid.js b/apps/meteor/app/importer/server/startup/setImportsToInvalid.js index c59d72a94e44..53435a2e8ef2 100644 --- a/apps/meteor/app/importer/server/startup/setImportsToInvalid.js +++ b/apps/meteor/app/importer/server/startup/setImportsToInvalid.js @@ -1,38 +1,16 @@ import { Meteor } from 'meteor/meteor'; -import { Imports, RawImports } from '@rocket.chat/models'; +import { Imports } from '@rocket.chat/models'; -import { SystemLogger } from '../../../../server/lib/logger/system'; import { ProgressStep } from '../../lib/ImporterProgressStep'; -async function runDrop(fn) { - try { - await fn(); - } catch (e) { - SystemLogger.error('error', e); // TODO: Remove - // ignored - } -} - Meteor.startup(async function () { const lastOperation = await Imports.findLastImport(); - let idToKeep = false; - // If the operation is ready to start, or already started but failed - // And there's still data for it on the temp collection - // Then we can keep the data there to let the user try again - if (lastOperation && [ProgressStep.USER_SELECTION, ProgressStep.ERROR].includes(lastOperation.status)) { - idToKeep = lastOperation._id; + // If the operation is still on "ready to start" state, we don't need to invalidate it. + if (lastOperation && [ProgressStep.USER_SELECTION].includes(lastOperation.status)) { + await Imports.invalidateOperationsExceptId(lastOperation._id); + return; } - if (idToKeep) { - await Imports.invalidateOperationsExceptId(idToKeep); - - // Clean up all the raw import data, except for the last operation - await runDrop(() => RawImports.deleteMany({ import: { $ne: idToKeep } })); - } else { - await Imports.invalidateAllOperations(); - - // Clean up all the raw import data - await runDrop(() => RawImports.deleteMany({})); - } + await Imports.invalidateOperationsExceptId(); }); diff --git a/apps/meteor/app/lib/server/functions/getFullUserData.ts b/apps/meteor/app/lib/server/functions/getFullUserData.ts index 40ee073089a9..dd6696a18ce4 100644 --- a/apps/meteor/app/lib/server/functions/getFullUserData.ts +++ b/apps/meteor/app/lib/server/functions/getFullUserData.ts @@ -33,6 +33,7 @@ const fullFields = { requirePasswordChange: 1, requirePasswordChangeReason: 1, roles: 1, + importIds: 1, } as const; let publicCustomFields: Record = {}; @@ -69,18 +70,26 @@ const getFields = (canViewAllInfo: boolean): Record => ({ ...getCustomFields(canViewAllInfo), }); -export async function getFullUserDataByIdOrUsername( +export async function getFullUserDataByIdOrUsernameOrImportId( userId: string, - { filterId, filterUsername }: { filterId: string; filterUsername?: undefined } | { filterId?: undefined; filterUsername: string }, + searchValue: string, + searchType: 'id' | 'username' | 'importId', ): Promise { - const caller = await Users.findOneById(userId, { projection: { username: 1 } }); + const caller = await Users.findOneById(userId, { projection: { username: 1, importIds: 1 } }); if (!caller) { return null; } - const targetUser = (filterId || filterUsername) as string; - const myself = (filterId && targetUser === userId) || (filterUsername && targetUser === caller.username); + const myself = + (searchType === 'id' && searchValue === userId) || + (searchType === 'username' && searchValue === caller.username) || + (searchType === 'importId' && caller.importIds?.includes(searchValue)); const canViewAllInfo = !!myself || (await hasPermissionAsync(userId, 'view-full-other-user-info')); + // Only search for importId if the user has permission to view the import id + if (searchType === 'importId' && !canViewAllInfo) { + return null; + } + const fields = getFields(canViewAllInfo); const options = { @@ -90,7 +99,9 @@ export async function getFullUserDataByIdOrUsername( }, }; - const user = await Users.findOneByIdOrUsername(targetUser, options); + const user = await (searchType === 'importId' + ? Users.findOneByImportId(searchValue, options) + : Users.findOneByIdOrUsername(searchValue, options)); if (!user) { return null; } diff --git a/apps/meteor/client/views/admin/import/ImportProgressPage.tsx b/apps/meteor/client/views/admin/import/ImportProgressPage.tsx index d0274ab15111..1a21bb5c3cd5 100644 --- a/apps/meteor/client/views/admin/import/ImportProgressPage.tsx +++ b/apps/meteor/client/views/admin/import/ImportProgressPage.tsx @@ -1,3 +1,4 @@ +import type { ProgressStep } from '@rocket.chat/core-typings'; import { Box, Margins, Throbber } from '@rocket.chat/fuselage'; import { useMutableCallback } from '@rocket.chat/fuselage-hooks'; import { useToastMessageDispatch, useEndpoint, useTranslation, useStream, useRouter } from '@rocket.chat/ui-contexts'; @@ -7,7 +8,6 @@ import React, { useEffect } from 'react'; import { ImportingStartedStates } from '../../../../app/importer/lib/ImporterProgressStep'; import { numberFormat } from '../../../../lib/utils/stringUtils'; import Page from '../../../components/Page'; -import type { ProgressStep } from './ImportTypes'; import { useErrorHandler } from './useErrorHandler'; const ImportProgressPage = function ImportProgressPage() { diff --git a/apps/meteor/client/views/admin/import/ImportTypes.ts b/apps/meteor/client/views/admin/import/ImportTypes.ts deleted file mode 100644 index 5e522d78fb0d..000000000000 --- a/apps/meteor/client/views/admin/import/ImportTypes.ts +++ /dev/null @@ -1,19 +0,0 @@ -export type ProgressStep = - | 'importer_new' - | 'importer_uploading' - | 'importer_downloading_file' - | 'importer_file_loaded' - | 'importer_preparing_started' - | 'importer_preparing_users' - | 'importer_preparing_channels' - | 'importer_preparing_messages' - | 'importer_user_selection' - | 'importer_importing_started' - | 'importer_importing_users' - | 'importer_importing_channels' - | 'importer_importing_messages' - | 'importer_importing_files' - | 'importer_finishing' - | 'importer_done' - | 'importer_import_failed' - | 'importer_import_cancelled'; diff --git a/apps/meteor/server/models/RawImports.ts b/apps/meteor/server/models/RawImports.ts deleted file mode 100644 index b12159c0d708..000000000000 --- a/apps/meteor/server/models/RawImports.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { registerModel } from '@rocket.chat/models'; - -import { db } from '../database/utils'; -import { RawImports } from './raw/RawImports'; - -registerModel('IRawImportsModel', new RawImports(db)); diff --git a/apps/meteor/server/models/raw/ImportData.ts b/apps/meteor/server/models/raw/ImportData.ts index 778654d7ea2c..e38670662a3f 100644 --- a/apps/meteor/server/models/raw/ImportData.ts +++ b/apps/meteor/server/models/raw/ImportData.ts @@ -6,7 +6,7 @@ import type { RocketChatRecordDeleted, } from '@rocket.chat/core-typings'; import type { IImportDataModel } from '@rocket.chat/model-typings'; -import type { Collection, FindCursor, Db, Filter } from 'mongodb'; +import type { Collection, FindCursor, Db, Filter, IndexDescription } from 'mongodb'; import { BaseRaw } from './BaseRaw'; @@ -15,6 +15,10 @@ export class ImportDataRaw extends BaseRaw implements IImportData super(db, 'import_data', trash); } + protected modelIndexes(): IndexDescription[] { + return [{ key: { dataType: 1 } }]; + } + getAllUsers(): FindCursor { return this.find({ dataType: 'user' }) as FindCursor; } diff --git a/apps/meteor/server/models/raw/Imports.ts b/apps/meteor/server/models/raw/Imports.ts index 87b382de2669..05d6b3882312 100644 --- a/apps/meteor/server/models/raw/Imports.ts +++ b/apps/meteor/server/models/raw/Imports.ts @@ -1,14 +1,20 @@ -import type { Db, Document, FindCursor, FindOptions, UpdateResult } from 'mongodb'; +import type { Db, Document, FindCursor, FindOptions, UpdateResult, IndexDescription } from 'mongodb'; import type { IImportsModel } from '@rocket.chat/model-typings'; +import type { IImport } from '@rocket.chat/core-typings'; import { BaseRaw } from './BaseRaw'; +import { ensureArray } from '../../../lib/utils/arrayUtils'; -export class ImportsModel extends BaseRaw implements IImportsModel { +export class ImportsModel extends BaseRaw implements IImportsModel { constructor(db: Db) { super(db, 'import'); } - async findLastImport(): Promise { + protected modelIndexes(): IndexDescription[] { + return [{ key: { ts: -1 } }, { key: { valid: 1 } }]; + } + + async findLastImport(): Promise { const imports = await this.find({}, { sort: { ts: -1 }, limit: 1 }).toArray(); if (imports?.length) { @@ -18,6 +24,18 @@ export class ImportsModel extends BaseRaw implements IImportsModel { return undefined; } + async hasValidOperationInStatus(allowedStatus: IImport['status'][]): Promise { + return Boolean( + await this.findOne( + { + valid: { $ne: false }, + status: { $in: allowedStatus }, + }, + { projection: { _id: 1 } }, + ), + ); + } + invalidateAllOperations(): Promise { return this.updateMany({ valid: { $ne: false } }, { $set: { valid: false } }); } @@ -26,13 +44,34 @@ export class ImportsModel extends BaseRaw implements IImportsModel { return this.updateMany({ valid: { $ne: false }, _id: { $ne: id } }, { $set: { valid: false } }); } - invalidateOperationsNotInStatus(status: string | string[]): Promise { - const statusList = ([] as string[]).concat(status); - - return this.updateMany({ valid: { $ne: false }, status: { $nin: statusList } }, { $set: { valid: false } }); + invalidateOperationsNotInStatus(status: IImport['status'] | IImport['status'][]): Promise { + return this.updateMany({ valid: { $ne: false }, status: { $nin: ensureArray(status) } }, { $set: { valid: false } }); } - findAllPendingOperations(options: FindOptions = {}): FindCursor { + findAllPendingOperations(options: FindOptions = {}): FindCursor { return this.find({ valid: true }, options); } + + async increaseTotalCount(id: string, recordType: 'users' | 'channels' | 'messages', increaseBy = 1): Promise { + return this.updateOne( + { _id: id }, + { + $inc: { + 'count.total': increaseBy, + [`count.${recordType}`]: increaseBy, + }, + }, + ); + } + + async setOperationStatus(id: string, status: IImport['status']): Promise { + return this.updateOne( + { _id: id }, + { + $set: { + status, + }, + }, + ); + } } diff --git a/apps/meteor/server/models/raw/Messages.ts b/apps/meteor/server/models/raw/Messages.ts index f1ee7590df51..b62e5e981a93 100644 --- a/apps/meteor/server/models/raw/Messages.ts +++ b/apps/meteor/server/models/raw/Messages.ts @@ -6,6 +6,7 @@ import type { MessageTypesValues, RocketChatRecordDeleted, MessageAttachment, + IMessageWithPendingFileImport, } from '@rocket.chat/core-typings'; import type { FindPaginated, IMessagesModel } from '@rocket.chat/model-typings'; import type { PaginatedRequest } from '@rocket.chat/rest-typings'; @@ -1602,7 +1603,7 @@ export class MessagesRaw extends BaseRaw implements IMessagesModel { return this.findOne(query, { sort: { ts: 1 } }); } - findAllImportedMessagesWithFilesToDownload(): FindCursor { + findAllImportedMessagesWithFilesToDownload(): FindCursor { const query = { '_importFile.downloadUrl': { $exists: true, @@ -1618,7 +1619,7 @@ export class MessagesRaw extends BaseRaw implements IMessagesModel { }, }; - return this.find(query); + return this.find(query); } countAllImportedMessagesWithFilesToDownload(): Promise { diff --git a/apps/meteor/server/models/raw/RawImports.ts b/apps/meteor/server/models/raw/RawImports.ts deleted file mode 100644 index bb45741c3909..000000000000 --- a/apps/meteor/server/models/raw/RawImports.ts +++ /dev/null @@ -1,10 +0,0 @@ -import type { Db } from 'mongodb'; -import type { IRawImportsModel } from '@rocket.chat/model-typings'; - -import { BaseRaw } from './BaseRaw'; - -export class RawImports extends BaseRaw implements IRawImportsModel { - constructor(db: Db) { - super(db, 'raw_imports'); - } -} diff --git a/apps/meteor/server/models/startup.ts b/apps/meteor/server/models/startup.ts index 8e83683869ac..14b26e0f188f 100644 --- a/apps/meteor/server/models/startup.ts +++ b/apps/meteor/server/models/startup.ts @@ -65,7 +65,6 @@ import './VoipRoom'; import './WebdavAccounts'; import './FederationRoomEvents'; import './Imports'; -import './RawImports'; import './AppsTokens'; import './CronHistory'; import './Migrations'; diff --git a/apps/meteor/server/modules/notifications/notifications.module.ts b/apps/meteor/server/modules/notifications/notifications.module.ts index 0b82792ee819..801692e17ed8 100644 --- a/apps/meteor/server/modules/notifications/notifications.module.ts +++ b/apps/meteor/server/modules/notifications/notifications.module.ts @@ -6,6 +6,7 @@ import type { StreamerCallbackArgs, StreamKeys, StreamNames } from '@rocket.chat import { emit, StreamPresence } from '../../../app/notifications/server/lib/Presence'; import { SystemLogger } from '../../lib/logger/system'; +import type { Progress } from '../../../app/importer/server/classes/ImporterProgress'; export class NotificationsModule { public readonly streamLogged: IStreamer<'notify-logged'>; @@ -535,29 +536,7 @@ export class NotificationsModule { return this.streamPresence.emitWithoutBroadcast(uid, args); } - progressUpdated(progress: { - rate: number; - count?: { completed: number; total: number }; - step?: - | 'importer_new' - | 'importer_uploading' - | 'importer_downloading_file' - | 'importer_file_loaded' - | 'importer_preparing_started' - | 'importer_preparing_users' - | 'importer_preparing_channels' - | 'importer_preparing_messages' - | 'importer_user_selection' - | 'importer_importing_started' - | 'importer_importing_users' - | 'importer_importing_channels' - | 'importer_importing_messages' - | 'importer_importing_files' - | 'importer_finishing' - | 'importer_done' - | 'importer_import_failed' - | 'importer_import_cancelled'; - }): void { + progressUpdated(progress: { rate: number } | Progress): void { this.streamImporters.emit('progress', progress); } } diff --git a/apps/meteor/server/services/import/service.ts b/apps/meteor/server/services/import/service.ts new file mode 100644 index 000000000000..771cb63c8d44 --- /dev/null +++ b/apps/meteor/server/services/import/service.ts @@ -0,0 +1,149 @@ +import { ServiceClassInternal } from '@rocket.chat/core-services'; +import { Imports, ImportData } from '@rocket.chat/models'; +import type { IImportUser, IImport, ImportStatus } from '@rocket.chat/core-typings'; +import type { IImportService } from '@rocket.chat/core-services'; +import { ObjectId } from 'mongodb'; + +import { Importers } from '../../../app/importer/lib/Importers'; +import { Selection } from '../../../app/importer/server/classes/ImporterSelection'; + +export class ImportService extends ServiceClassInternal implements IImportService { + protected name = 'import'; + + public async clear(): Promise { + await Imports.invalidateAllOperations(); + await ImportData.col.deleteMany({}); + } + + public async newOperation(userId: string, name: string, key: string): Promise { + // Make sure there's no other operation running + await this.clear(); + + const importId = ( + await Imports.insertOne({ + type: name, + importerKey: key, + ts: new Date(), + status: 'importer_new', + valid: true, + user: userId, + }) + ).insertedId; + + const operation = await Imports.findOneById(importId); + if (!operation) { + throw new Error('failed to create import operation'); + } + + return operation; + } + + private getStateOfOperation(operation: IImport): 'none' | 'new' | 'loading' | 'ready' | 'importing' | 'done' | 'error' | 'canceled' { + if (!operation.valid && operation.status !== 'importer_done') { + return 'error'; + } + + switch (operation.status) { + case 'importer_new': + return 'new'; + case 'importer_uploading': + case 'importer_downloading_file': + case 'importer_file_loaded': + case 'importer_preparing_started': + case 'importer_preparing_users': + case 'importer_preparing_channels': + case 'importer_preparing_messages': + return 'loading'; + case 'importer_user_selection': + return 'ready'; + case 'importer_importing_started': + case 'importer_importing_users': + case 'importer_importing_channels': + case 'importer_importing_messages': + case 'importer_importing_files': + case 'importer_finishing': + return 'importing'; + case 'importer_done': + return 'done'; + case 'importer_import_failed': + return 'error'; + case 'importer_import_cancelled': + return 'canceled'; + } + } + + public async status(): Promise { + const operation = await Imports.findLastImport(); + + if (!operation) { + return { + state: 'none', + }; + } + + const state = this.getStateOfOperation(operation); + + return { + state, + operation, + }; + } + + private assertsValidStateForNewData(operation: IImport | null): asserts operation is IImport { + if (!operation?.valid) { + throw new Error('Import operation not initialized.'); + } + const state = this.getStateOfOperation(operation); + switch (state) { + case 'loading': + case 'importing': + throw new Error('The current import operation can not receive new data.'); + case 'done': + case 'error': + case 'canceled': + throw new Error('The current import operation is already finished.'); + } + } + + public async addUsers(users: Omit[]): Promise { + if (!users.length) { + return; + } + + const operation = await Imports.findLastImport(); + + this.assertsValidStateForNewData(operation); + + await ImportData.col.insertMany( + users.map((data) => ({ + _id: new ObjectId().toHexString(), + data, + dataType: 'user', + })), + ); + + await Imports.increaseTotalCount(operation._id, 'users', users.length); + await Imports.setOperationStatus(operation._id, 'importer_user_selection'); + } + + public async run(userId: string): Promise { + const operation = await Imports.findLastImport(); + if (!operation?.valid) { + throw new Error('error-operation-not-found'); + } + + if (operation.status !== 'importer_user_selection') { + throw new Error('error-invalid-operation-status'); + } + + const { importerKey } = operation; + const importer = Importers.get(importerKey); + if (!importer) { + throw new Error('error-importer-not-defined'); + } + + const instance = new importer.importer(importer, operation); // eslint-disable-line new-cap + const selection = new Selection(importer.name, [], [], 0); + await instance.startImport(selection, userId); + } +} diff --git a/apps/meteor/server/services/startup.ts b/apps/meteor/server/services/startup.ts index 2afc502846d0..035073537a80 100644 --- a/apps/meteor/server/services/startup.ts +++ b/apps/meteor/server/services/startup.ts @@ -27,6 +27,7 @@ import { MessageService } from './messages/service'; import { TranslationService } from './translation/service'; import { SettingsService } from './settings/service'; import { OmnichannelIntegrationService } from './omnichannel-integrations/service'; +import { ImportService } from './import/service'; import { Logger } from '../lib/logger/Logger'; const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo; @@ -55,6 +56,7 @@ api.registerService(new MessageService()); api.registerService(new TranslationService()); api.registerService(new SettingsService()); api.registerService(new OmnichannelIntegrationService()); +api.registerService(new ImportService()); // if the process is running in micro services mode we don't need to register services that will run separately if (!isRunningMs()) { diff --git a/ee/packages/ddp-client/src/types/streams.ts b/ee/packages/ddp-client/src/types/streams.ts index 5892371afb1d..0e3cdb76d7ad 100644 --- a/ee/packages/ddp-client/src/types/streams.ts +++ b/ee/packages/ddp-client/src/types/streams.ts @@ -20,6 +20,7 @@ import type { ICalendarNotification, IUserStatus, ILivechatInquiryRecord, + IImportProgress, } from '@rocket.chat/core-typings'; type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed'; @@ -161,33 +162,7 @@ export interface StreamerEvents { 'importers': [ { key: 'progress'; - args: [ - { - step?: - | 'importer_new' - | 'importer_uploading' - | 'importer_downloading_file' - | 'importer_file_loaded' - | 'importer_preparing_started' - | 'importer_preparing_users' - | 'importer_preparing_channels' - | 'importer_preparing_messages' - | 'importer_user_selection' - | 'importer_importing_started' - | 'importer_importing_users' - | 'importer_importing_channels' - | 'importer_importing_messages' - | 'importer_importing_files' - | 'importer_finishing' - | 'importer_done' - | 'importer_import_failed' - | 'importer_import_cancelled'; - rate: number; - key?: string; - name?: string; - count?: { completed: number; total: number }; - }, - ]; + args: [{ rate: number } | IImportProgress]; }, ]; diff --git a/packages/core-services/src/index.ts b/packages/core-services/src/index.ts index 3352c0d3d373..681bdeb1b4f7 100644 --- a/packages/core-services/src/index.ts +++ b/packages/core-services/src/index.ts @@ -44,6 +44,7 @@ import type { IMessageService } from './types/IMessageService'; import type { ISettingsService } from './types/ISettingsService'; import type { IOmnichannelEEService } from './types/IOmnichannelEEService'; import type { IOmnichannelIntegrationService } from './types/IOmnichannelIntegrationService'; +import type { IImportService } from './types/IImportService'; export { asyncLocalStorage } from './lib/asyncLocalStorage'; export { MeteorError, isMeteorError } from './MeteorError'; @@ -118,6 +119,7 @@ export { ISettingsService, IOmnichannelEEService, IOmnichannelIntegrationService, + IImportService, }; // TODO think in a way to not have to pass the service name to proxify here as well @@ -152,6 +154,7 @@ export const OmnichannelIntegration = proxifyWithWait('federation'); export const FederationEE = proxifyWithWait('federation-enterprise'); export const OmnichannelEEService = proxifyWithWait('omnichannel-ee'); +export const Import = proxifyWithWait('import'); // Calls without wait. Means that the service is optional and the result may be an error // of service/method not available diff --git a/packages/core-services/src/types/IImportService.ts b/packages/core-services/src/types/IImportService.ts new file mode 100644 index 000000000000..c016afe3b544 --- /dev/null +++ b/packages/core-services/src/types/IImportService.ts @@ -0,0 +1,9 @@ +import type { IImport, IImportUser, ImportStatus } from '@rocket.chat/core-typings'; + +export interface IImportService { + clear(): Promise; + newOperation(userId: string, name: string, key: string): Promise; + status(): Promise; + addUsers(users: Omit[]): Promise; + run(userId: string): Promise; +} diff --git a/packages/core-typings/src/IMessage/IMessage.ts b/packages/core-typings/src/IMessage/IMessage.ts index 00ebe5a981c2..d9a91d4fcc11 100644 --- a/packages/core-typings/src/IMessage/IMessage.ts +++ b/packages/core-typings/src/IMessage/IMessage.ts @@ -365,3 +365,17 @@ export type IVideoConfMessage = IMessage & { export const isE2EEMessage = (message: IMessage): message is IE2EEMessage => message.t === 'e2e'; export const isOTRMessage = (message: IMessage): message is IOTRMessage => message.t === 'otr' || message.t === 'otr-ack'; export const isVideoConfMessage = (message: IMessage): message is IVideoConfMessage => message.t === 'videoconf'; + +export type IMessageWithPendingFileImport = IMessage & { + _importFile: { + downloadUrl: string; + id: string; + size: number; + name: string; + external: boolean; + source: 'slack' | 'hipchat-enterprise'; + original: Record; + rocketChatUrl?: string; + downloaded?: boolean; + }; +}; diff --git a/packages/core-typings/src/IUser.ts b/packages/core-typings/src/IUser.ts index e932679aab45..a6f4e9b1c4ee 100644 --- a/packages/core-typings/src/IUser.ts +++ b/packages/core-typings/src/IUser.ts @@ -180,6 +180,7 @@ export interface IUser extends IRocketChatRecord { }; }; importIds?: string[]; + _pendingAvatarUrl?: string; } export interface IRegisterUser extends IUser { diff --git a/packages/core-typings/src/import/IImport.ts b/packages/core-typings/src/import/IImport.ts index c9be02d08fc3..09374b23aebf 100644 --- a/packages/core-typings/src/import/IImport.ts +++ b/packages/core-typings/src/import/IImport.ts @@ -1,14 +1,22 @@ import type { IRocketChatRecord } from '../IRocketChatRecord'; import type { IUser } from '../IUser'; +import type { ProgressStep } from './IImportProgress'; export interface IImport extends IRocketChatRecord { type: string; importerKey: string; ts: Date; - status: string; + status: ProgressStep; valid: boolean; user: IUser['_id']; _updatedAt: Date; contentType?: string; file?: string; + count?: { + total?: number; + completed?: number; + users?: number; + messages?: number; + channels?: number; + }; } diff --git a/packages/core-typings/src/import/IImportMessage.ts b/packages/core-typings/src/import/IImportMessage.ts index ae5357bec482..8892b484f3e5 100644 --- a/packages/core-typings/src/import/IImportMessage.ts +++ b/packages/core-typings/src/import/IImportMessage.ts @@ -1,4 +1,4 @@ -export type IImportedId = 'string'; +export type IImportedId = string; export interface IImportMessageReaction { name: string; @@ -16,9 +16,9 @@ export interface IImportPendingFile { } export interface IImportAttachment extends Record { - text: string; - title: string; - fallback: string; + text?: string; + title?: string; + fallback?: string; } export interface IImportMessage { @@ -44,7 +44,7 @@ export interface IImportMessage { editedBy?: IImportedId; mentions?: Array; channels?: Array; - attachments?: IImportAttachment; + attachments?: IImportAttachment[]; bot?: boolean; emoji?: string; diff --git a/packages/core-typings/src/import/IImportUser.ts b/packages/core-typings/src/import/IImportUser.ts index e1f8543805c6..317b6240bfe4 100644 --- a/packages/core-typings/src/import/IImportUser.ts +++ b/packages/core-typings/src/import/IImportUser.ts @@ -16,4 +16,6 @@ export interface IImportUser { services?: Record>; customFields?: Record; + + password?: string; } diff --git a/packages/core-typings/src/import/IImporterSelection.ts b/packages/core-typings/src/import/IImporterSelection.ts new file mode 100644 index 000000000000..1c0279e4041b --- /dev/null +++ b/packages/core-typings/src/import/IImporterSelection.ts @@ -0,0 +1,9 @@ +import type { IImporterSelectionChannel } from './IImporterSelectionChannel'; +import type { IImporterSelectionUser } from './IImporterSelectionUser'; + +export interface IImporterSelection { + name: string; + users: IImporterSelectionUser[]; + channels: IImporterSelectionChannel[]; + message_count: number; +} diff --git a/packages/core-typings/src/import/IImporterSelectionChannel.ts b/packages/core-typings/src/import/IImporterSelectionChannel.ts new file mode 100644 index 000000000000..8f74abf94f7a --- /dev/null +++ b/packages/core-typings/src/import/IImporterSelectionChannel.ts @@ -0,0 +1,9 @@ +export interface IImporterSelectionChannel { + channel_id: string; + name: string | undefined; + is_archived: boolean; + do_import: boolean; + is_private: boolean; + creator: undefined; + is_direct: boolean; +} diff --git a/packages/core-typings/src/import/IImporterSelectionUser.ts b/packages/core-typings/src/import/IImporterSelectionUser.ts new file mode 100644 index 000000000000..ed7f13b3514b --- /dev/null +++ b/packages/core-typings/src/import/IImporterSelectionUser.ts @@ -0,0 +1,9 @@ +export interface IImporterSelectionUser { + user_id: string; + username: string | undefined; + email: string; + is_deleted: boolean; + is_bot: boolean; + do_import: boolean; + is_email_taken: boolean; +} diff --git a/packages/core-typings/src/import/ImportState.ts b/packages/core-typings/src/import/ImportState.ts new file mode 100644 index 000000000000..e1d48f307c2d --- /dev/null +++ b/packages/core-typings/src/import/ImportState.ts @@ -0,0 +1,10 @@ +import type { IImport } from './IImport'; + +export type ImportState = 'none' | 'new' | 'loading' | 'ready' | 'importing' | 'done' | 'error' | 'canceled'; + +export type ImportStatus = + | { state: 'none' } + | { + state: ImportState; + operation: IImport; + }; diff --git a/packages/core-typings/src/import/index.ts b/packages/core-typings/src/import/index.ts index 5799dee9fa4d..2f56f72575d4 100644 --- a/packages/core-typings/src/import/index.ts +++ b/packages/core-typings/src/import/index.ts @@ -5,3 +5,7 @@ export * from './IImportMessage'; export * from './IImportChannel'; export * from './IImportFileData'; export * from './IImportProgress'; +export * from './IImporterSelection'; +export * from './IImporterSelectionUser'; +export * from './IImporterSelectionChannel'; +export * from './ImportState'; diff --git a/packages/model-typings/src/index.ts b/packages/model-typings/src/index.ts index eda84bd5a359..23e77ff1de29 100644 --- a/packages/model-typings/src/index.ts +++ b/packages/model-typings/src/index.ts @@ -73,7 +73,6 @@ export * from './models/IAppLogsModel'; export * from './models/IAppsModel'; export * from './models/IAppsPersistenceModel'; export * from './models/IImportsModel'; -export * from './models/IRawImportsModel'; export * from './models/IFederationRoomEventsModel'; export * from './models/IAppsTokensModel'; export * from './models/IAuditLogModel'; diff --git a/packages/model-typings/src/models/IImportsModel.ts b/packages/model-typings/src/models/IImportsModel.ts index cebbc18bd8ca..7072abd8abed 100644 --- a/packages/model-typings/src/models/IImportsModel.ts +++ b/packages/model-typings/src/models/IImportsModel.ts @@ -1,11 +1,15 @@ import type { UpdateResult, FindOptions, FindCursor, Document } from 'mongodb'; +import type { IImport } from '@rocket.chat/core-typings'; import type { IBaseModel } from './IBaseModel'; -export interface IImportsModel extends IBaseModel { +export interface IImportsModel extends IBaseModel { findLastImport(): Promise; + hasValidOperationInStatus(allowedStatus: IImport['status'][]): Promise; invalidateAllOperations(): Promise; invalidateOperationsExceptId(id: string): Promise; invalidateOperationsNotInStatus(status: string | string[]): Promise; - findAllPendingOperations(options: FindOptions): FindCursor; + findAllPendingOperations(options: FindOptions): FindCursor; + increaseTotalCount(id: string, recordType: 'users' | 'channels' | 'messages', increaseBy?: number): Promise; + setOperationStatus(id: string, status: IImport['status']): Promise; } diff --git a/packages/model-typings/src/models/IMessagesModel.ts b/packages/model-typings/src/models/IMessagesModel.ts index d87a450f7a34..c650dbf1f7d7 100644 --- a/packages/model-typings/src/models/IMessagesModel.ts +++ b/packages/model-typings/src/models/IMessagesModel.ts @@ -1,4 +1,12 @@ -import type { IMessage, IRoom, IUser, ILivechatDepartment, MessageTypesValues, MessageAttachment } from '@rocket.chat/core-typings'; +import type { + IMessage, + IRoom, + IUser, + ILivechatDepartment, + MessageTypesValues, + MessageAttachment, + IMessageWithPendingFileImport, +} from '@rocket.chat/core-typings'; import type { AggregationCursor, CountDocumentsOptions, @@ -239,7 +247,7 @@ export interface IMessagesModel extends IBaseModel { setAsReadById(_id: string): Promise; countThreads(): Promise; addThreadFollowerByThreadId(tmid: string, userId: string): Promise; - findAllImportedMessagesWithFilesToDownload(): FindCursor; + findAllImportedMessagesWithFilesToDownload(): FindCursor; countAllImportedMessagesWithFilesToDownload(): Promise; findAgentLastMessageByVisitorLastMessageTs(roomId: string, visitorLastMessageTs: Date): Promise; removeThreadFollowerByThreadId(tmid: string, userId: string): Promise; diff --git a/packages/model-typings/src/models/IRawImportsModel.ts b/packages/model-typings/src/models/IRawImportsModel.ts deleted file mode 100644 index f99eec25e754..000000000000 --- a/packages/model-typings/src/models/IRawImportsModel.ts +++ /dev/null @@ -1,3 +0,0 @@ -import type { IBaseModel } from './IBaseModel'; - -export type IRawImportsModel = IBaseModel; diff --git a/packages/model-typings/src/models/IUsersModel.ts b/packages/model-typings/src/models/IUsersModel.ts index fecc7386f401..9ee633a1cd80 100644 --- a/packages/model-typings/src/models/IUsersModel.ts +++ b/packages/model-typings/src/models/IUsersModel.ts @@ -326,7 +326,7 @@ export interface IUsersModel extends IBaseModel { getSAMLByIdAndSAMLProvider(userId: string, samlProvider: string): Promise; findBySAMLNameIdOrIdpSession(samlNameId: string, idpSession: string): FindCursor; findBySAMLInResponseTo(inResponseTo: string): FindCursor; - addImportIds(userId: string, importIds: Array<{ service: string; id: string }>): Promise; + addImportIds(userId: string, importIds: string | string[]): Promise; updateInviteToken(userId: string, token: string): Promise; updateLastLoginById(userId: string): Promise; addPasswordToHistory(userId: string, password: string, passwordHistoryAmount: number): Promise; diff --git a/packages/models/src/index.ts b/packages/models/src/index.ts index fd6a95979673..e1cf91f1b0ee 100644 --- a/packages/models/src/index.ts +++ b/packages/models/src/index.ts @@ -72,7 +72,6 @@ import type { IAppsPersistenceModel, IAppLogsModel, IImportsModel, - IRawImportsModel, IFederationRoomEventsModel, IAppsTokensModel, IAuditLogModel, @@ -143,7 +142,6 @@ export const PushToken = proxify('IPushTokenModel'); export const Permissions = proxify('IPermissionsModel'); export const ReadReceipts = proxify('IReadReceiptsModel'); export const MessageReads = proxify('IMessageReadsModel'); -export const RawImports = proxify('IRawImportsModel'); export const Reports = proxify('IReportsModel'); export const Roles = proxify('IRolesModel'); export const Rooms = proxify('IRoomsModel'); diff --git a/packages/rest-typings/src/v1/import/ImportAddUsersParamsPOST.ts b/packages/rest-typings/src/v1/import/ImportAddUsersParamsPOST.ts new file mode 100644 index 000000000000..52bb1390f1f8 --- /dev/null +++ b/packages/rest-typings/src/v1/import/ImportAddUsersParamsPOST.ts @@ -0,0 +1,57 @@ +import type { IImportUser } from '@rocket.chat/core-typings'; +import Ajv from 'ajv'; + +const ajv = new Ajv({ + coerceTypes: true, +}); + +export type ImportAddUsersParamsPOST = { + users: [Omit]; +}; + +const ImportAddUsersParamsPostSchema = { + type: 'object', + properties: { + users: { + type: 'array', + items: { + type: 'object', + properties: { + username: { type: 'string', nullable: true }, + emails: { + type: 'array', + items: { + type: 'string', + }, + }, + importIds: { + type: 'array', + items: { + type: 'string', + }, + }, + name: { type: 'string', nullable: true }, + utcOffset: { type: 'number', nullable: true }, + avatarUrl: { type: 'string', nullable: true }, + deleted: { type: 'boolean', nullable: true }, + statusText: { type: 'string', nullable: true }, + roles: { + type: 'array', + items: { + type: 'string', + }, + nullable: true, + }, + type: { type: 'string', nullable: true }, + bio: { type: 'string', nullable: true }, + password: { type: 'string', nullable: true }, + }, + required: ['emails', 'importIds'], + }, + }, + }, + additionalProperties: false, + required: ['users'], +}; + +export const isImportAddUsersParamsPOST = ajv.compile(ImportAddUsersParamsPostSchema); diff --git a/packages/rest-typings/src/v1/import/import.ts b/packages/rest-typings/src/v1/import/import.ts index 0038cd04f127..1454200bab96 100644 --- a/packages/rest-typings/src/v1/import/import.ts +++ b/packages/rest-typings/src/v1/import/import.ts @@ -1,4 +1,4 @@ -import type { IImport, IImportFileData, IImportProgress } from '@rocket.chat/core-typings'; +import type { IImport, IImporterSelection, IImportProgress, ImportStatus, IImportUser } from '@rocket.chat/core-typings'; import type { DownloadPublicImportFileParamsPOST } from './DownloadPublicImportFileParamsPOST'; import type { StartImportParamsPOST } from './StartImportParamsPOST'; @@ -15,7 +15,7 @@ export type ImportEndpoints = { POST: (params: StartImportParamsPOST) => void; }; '/v1/getImportFileData': { - GET: () => IImportFileData | { waiting: true }; + GET: () => IImporterSelection | { waiting: true }; }; '/v1/getImportProgress': { GET: () => IImportProgress; @@ -32,4 +32,19 @@ export type ImportEndpoints = { '/v1/getCurrentImportOperation': { GET: () => { operation: IImport }; }; + '/v1/import.clear': { + POST: () => void; + }; + '/v1/import.new': { + POST: () => { operation: IImport }; + }; + '/v1/import.status': { + GET: () => ImportStatus; + }; + '/v1/import.addUsers': { + POST: (users: Omit[]) => void; + }; + '/v1/import.run': { + POST: () => void; + }; }; diff --git a/packages/rest-typings/src/v1/import/index.ts b/packages/rest-typings/src/v1/import/index.ts index 9ed37a60bd51..622f4c1945b8 100644 --- a/packages/rest-typings/src/v1/import/index.ts +++ b/packages/rest-typings/src/v1/import/index.ts @@ -8,3 +8,4 @@ export * from './GetImportProgressParamsGET'; export * from './GetLatestImportOperationsParamsGET'; export * from './StartImportParamsPOST'; export * from './UploadImportFileParamsPOST'; +export * from './ImportAddUsersParamsPOST'; diff --git a/packages/rest-typings/src/v1/users/UsersInfoParamsGet.ts b/packages/rest-typings/src/v1/users/UsersInfoParamsGet.ts index ddae73750ea8..1f07bed18475 100644 --- a/packages/rest-typings/src/v1/users/UsersInfoParamsGet.ts +++ b/packages/rest-typings/src/v1/users/UsersInfoParamsGet.ts @@ -4,7 +4,7 @@ const ajv = new Ajv({ coerceTypes: true, }); -export type UsersInfoParamsGet = ({ userId: string } | { username: string }) & { +export type UsersInfoParamsGet = ({ userId: string } | { username: string } | { importId: string }) & { fields?: string; }; @@ -38,6 +38,20 @@ const UsersInfoParamsGetSchema = { required: ['username'], additionalProperties: false, }, + { + type: 'object', + properties: { + importId: { + type: 'string', + }, + fields: { + type: 'string', + nullable: true, + }, + }, + required: ['importId'], + additionalProperties: false, + }, ], };