diff --git a/.vscode/launch.json b/.vscode/launch.json index 0cb4fdfd4c..b22678949b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -118,9 +118,9 @@ "args": ["src/__start.ts"], "env": { "MONGO_URL": "mongodb://localhost:27017", - // "DB_URL": "mongodb://localhost:27017", - "REGION": "pg", - "DB_URL": "postgresql://postgres:example@localhost:5432", + "DB_URL": "mongodb://localhost:27017", + "REGION": "", + // "DB_URL": "postgresql://postgres:example@localhost:5432", "SERVER_SECRET": "secret", "TRANSACTOR_URL": "ws://localhost:3333", "ACCOUNTS_URL": "http://localhost:3000", diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 3e6f259484..66198922a1 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -115,10 +115,9 @@ services: - MODEL_ENABLED=* - ACCOUNTS_URL=http://host.docker.internal:3000 - BRANDING_PATH=/var/cfg/branding.json - - NOTIFY_INBOX_ONLY=true # - PARALLEL=2 - # - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml - # - INIT_WORKSPACE=onboarding + - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml + - INIT_WORKSPACE=test restart: unless-stopped workspacepg: image: hardcoreeng/workspace @@ -142,7 +141,7 @@ services: - ACCOUNTS_URL=http://host.docker.internal:3000 - BRANDING_PATH=/var/cfg/branding.json # - PARALLEL=2 - # - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml + - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml # - INIT_WORKSPACE=onboarding restart: unless-stopped collaborator: diff --git a/dev/tool/src/markup.ts b/dev/tool/src/markup.ts index c898757a54..8c2f7d154a 100644 --- a/dev/tool/src/markup.ts +++ b/dev/tool/src/markup.ts @@ -72,8 +72,6 @@ async function processFixJsonMarkupFor ( db: Db, storageAdapter: StorageAdapter ): Promise { - console.log('processing', domain, _class) - const collection = db.collection(domain) const docs = await collection.find({ _class }).toArray() for (const doc of docs) { @@ -119,8 +117,6 @@ async function processFixJsonMarkupFor ( } } } - - console.log('...processed', docs.length) } export async function migrateMarkup ( @@ -151,12 +147,9 @@ export async function migrateMarkup ( const collection = workspaceDb.collection(domain) const filter = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class } - - const count = await collection.countDocuments(filter) const iterator = collection.find(filter) try { - console.log('processing', _class, '->', count) await processMigrateMarkupFor(ctx, hierarchy, storageAdapter, workspaceId, attributes, iterator, concurrency) } finally { await iterator.close() diff --git a/models/activity/src/migration.ts b/models/activity/src/migration.ts index 1f71a9b5f4..3562a8f731 100644 --- a/models/activity/src/migration.ts +++ b/models/activity/src/migration.ts @@ -67,7 +67,6 @@ async function processMigrateMarkupFor ( client: MigrationClient, iterator: MigrationIterator ): Promise { - let processed = 0 while (true) { const docs = await iterator.next(1000) if (docs === null || docs.length === 0) { @@ -104,9 +103,6 @@ async function processMigrateMarkupFor ( if (ops.length > 0) { await client.bulk(DOMAIN_ACTIVITY, ops) } - - processed += docs.length - console.log('...processed', processed) } } diff --git a/models/text-editor/src/migration.ts b/models/text-editor/src/migration.ts index f4505ac6c4..3efc911713 100644 --- a/models/text-editor/src/migration.ts +++ b/models/text-editor/src/migration.ts @@ -53,7 +53,6 @@ async function processMigrateMarkupFor ( client: MigrationClient, iterator: MigrationIterator ): Promise { - let processed = 0 while (true) { const docs = await iterator.next(1000) if (docs === null || docs.length === 0) { @@ -88,9 +87,6 @@ async function processMigrateMarkupFor ( if (operations.length > 0) { await client.bulk(domain, operations) } - - processed += docs.length - console.log('...processed', processed) } } @@ -122,7 +118,6 @@ async function processFixMigrateMarkupFor ( client: MigrationClient, iterator: MigrationIterator ): Promise { - let processed = 0 while (true) { const docs = await iterator.next(1000) if (docs === null || docs.length === 0) { @@ -164,9 +159,6 @@ async function processFixMigrateMarkupFor ( if (operations.length > 0) { await client.bulk(domain, operations) } - - processed += docs.length - console.log('...processed', processed) } } diff --git a/server/indexer/src/fulltext.ts b/server/indexer/src/fulltext.ts index b31805a42c..2eef85d939 100644 --- a/server/indexer/src/fulltext.ts +++ b/server/indexer/src/fulltext.ts @@ -71,8 +71,11 @@ export class FullTextIndex implements WithFind { } async close (): Promise { + this.indexer.triggerIndexing() if (!this.upgrade) { await this.indexer.cancel() + } else { + await this.indexer.processUpload(this.indexer.metrics) } } diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index 5c2b350126..b27f90ac41 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -351,7 +351,9 @@ class TSessionManager implements SessionManager { version: this.modelVersion, workspaceVersion: versionToString(workspaceInfo.version), workspace: workspaceInfo.workspaceId, - workspaceUrl: workspaceInfo.workspaceUrl + workspaceUrl: workspaceInfo.workspaceUrl, + email: token.email, + extra: JSON.stringify(token.extra ?? {}) }) // Version mismatch, return upgrading. return { upgrade: true, upgradeInfo: workspaceInfo.upgrade } diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 8751ed00d4..771474530a 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -14,9 +14,7 @@ // import core, { - BackupClient, Branding, - Client as CoreClient, coreId, DOMAIN_BENCHMARK, DOMAIN_MIGRATION, @@ -34,13 +32,13 @@ import core, { TxOperations, WorkspaceId, WorkspaceIdWithUrl, + type Client, type Doc, type Ref, type WithLookup } from '@hcengineering/core' import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate } from '@hcengineering/model' import { DomainIndexHelperImpl, Pipeline, StorageAdapter, type DbAdapter } from '@hcengineering/server-core' -import { connect } from './connect' import { InitScript, WorkspaceInitializer } from './initializer' import toolPlugin from './plugin' import { MigrateClientImpl } from './upgrade' @@ -165,23 +163,15 @@ export async function updateModel ( try { let i = 0 for (const op of migrateOperations) { - logger.log('Migrate', { name: op[0] }) + const st = Date.now() await op[1].upgrade(migrateState, async () => connection as any, logger) + const tdelta = Date.now() - st + if (tdelta > 0) { + logger.log('Create', { name: op[0], time: tdelta }) + } i++ - await progress((((100 / migrateOperations.length) * i) / 100) * 30) + await progress((((100 / migrateOperations.length) * i) / 100) * 100) } - - // Create update indexes - await createUpdateIndexes( - ctx, - connection.getHierarchy(), - connection.getModel(), - pipeline, - async (value) => { - await progress(30 + (Math.min(value, 100) / 100) * 70) - }, - workspaceId - ) await progress(100) } catch (e: any) { logger.error('error', { error: e }) @@ -203,6 +193,7 @@ export async function initializeWorkspace ( ): Promise { const initWS = branding?.initWorkspace ?? getMetadata(toolPlugin.metadata.InitWorkspace) const scriptUrl = getMetadata(toolPlugin.metadata.InitScriptURL) + ctx.info('Init script details', { scriptUrl, initWS }) if (initWS === undefined || scriptUrl === undefined) return try { // `https://raw.githubusercontent.com/hcengineering/init/main/script.yaml` @@ -237,11 +228,12 @@ export async function upgradeModel ( workspaceId: WorkspaceIdWithUrl, txes: Tx[], pipeline: Pipeline, + connection: Client, storageAdapter: StorageAdapter, migrateOperations: [string, MigrateOperation][], logger: ModelLogger = consoleModelLogger, progress: (value: number) => Promise, - forceIndexes: boolean = false + updateIndexes: 'perform' | 'skip' | 'disable' = 'skip' ): Promise { if (txes.some((tx) => tx.objectSpace !== core.space.Model)) { throw Error('Model txes must target only core.space.Model') @@ -308,87 +300,69 @@ export async function upgradeModel ( workspaceId ) } - if (forceIndexes) { + if (updateIndexes === 'perform') { await upgradeIndexes() } await ctx.with('migrate', {}, async (ctx) => { let i = 0 for (const op of migrateOperations) { - const t = Date.now() try { + const t = Date.now() await ctx.with(op[0], {}, async () => { await op[1].migrate(migrateClient, logger) }) + const tdelta = Date.now() - t + if (tdelta > 0) { + logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t }) + } } catch (err: any) { logger.error(`error during migrate: ${op[0]} ${err.message}`, err) throw err } - logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t }) await progress(20 + ((100 / migrateOperations.length) * i * 20) / 100) i++ } - await tryMigrate(migrateClient, coreId, [ - { - state: 'indexes-v5', - func: upgradeIndexes - } - ]) + if (updateIndexes === 'skip') { + await tryMigrate(migrateClient, coreId, [ + { + state: 'indexes-v5', + func: upgradeIndexes + } + ]) + } }) logger.log('Apply upgrade operations', { workspaceId: workspaceId.name }) - let connection: (CoreClient & BackupClient) | undefined - const getUpgradeClient = async (): Promise => - await ctx.with('connect-platform', {}, async (ctx) => { - if (connection !== undefined) { - return connection - } - connection = (await connect( - transactorUrl, - workspaceId, - undefined, - { - mode: 'backup', - model: 'upgrade', - admin: 'true' - }, - model - )) as CoreClient & BackupClient - return connection - }) - try { - await ctx.with('upgrade', {}, async (ctx) => { - let i = 0 - for (const op of migrateOperations) { - const t = Date.now() - await ctx.with(op[0], {}, () => op[1].upgrade(migrateState, getUpgradeClient, logger)) - logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name }) - await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100) - i++ + await ctx.with('upgrade', {}, async (ctx) => { + let i = 0 + for (const op of migrateOperations) { + const t = Date.now() + await ctx.with(op[0], {}, () => op[1].upgrade(migrateState, async () => connection, logger)) + const tdelta = Date.now() - t + if (tdelta > 0) { + logger.log('upgrade:', { operation: op[0], time: tdelta, workspaceId: workspaceId.name }) } - }) + await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100) + i++ + } + }) - if (connection === undefined) { - // We need to send reboot for workspace - ctx.info('send force close', { workspace: workspaceId.name, transactorUrl }) - const serverEndpoint = transactorUrl.replaceAll('wss://', 'https://').replace('ws://', 'http://') - const token = generateToken(systemAccountEmail, workspaceId, { admin: 'true' }) - try { - await fetch( - serverEndpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${toWorkspaceString(workspaceId)}`, - { - method: 'PUT' - } - ) - } catch (err: any) { - // Ignore error if transactor is not yet ready + // We need to send reboot for workspace + ctx.info('send force close', { workspace: workspaceId.name, transactorUrl }) + const serverEndpoint = transactorUrl.replaceAll('wss://', 'https://').replace('ws://', 'http://') + const token = generateToken(systemAccountEmail, workspaceId, { admin: 'true' }) + try { + await fetch( + serverEndpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${toWorkspaceString(workspaceId)}`, + { + method: 'PUT' } - } - } finally { - await connection?.sendForceClose() - await connection?.close() + ) + } catch (err: any) { + // Ignore error if transactor is not yet ready } return model } @@ -407,7 +381,13 @@ async function prepareMigrationClient ( const migrateClient = new MigrateClientImpl(pipeline, hierarchy, model, logger, storageAdapter, workspaceId) const states = await migrateClient.find(DOMAIN_MIGRATION, { _class: core.class.MigrationState }) const sts = Array.from(groupByArray(states, (it) => it.plugin).entries()) - const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))])) + + const _toSet = (vals: WithLookup[]): Set => { + return new Set(vals.map((q) => q.state)) + } + + const migrateState = new Map>(sts.map((it) => [it[0], _toSet(it[1])])) + // const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))])) migrateClient.migrateState = migrateState return { migrateClient, migrateState } diff --git a/server/workspace-service/src/index.ts b/server/workspace-service/src/index.ts index eb95fcc9bf..438642aafb 100644 --- a/server/workspace-service/src/index.ts +++ b/server/workspace-service/src/index.ts @@ -90,6 +90,8 @@ export function serveWorkspaceAccount ( setMetadata(serverNotification.metadata.InboxOnlyNotifications, true) + let canceled = false + const worker = new WorkspaceWorker( version, txes, @@ -100,17 +102,22 @@ export function serveWorkspaceAccount ( brandings ) - void worker.start(measureCtx, { - errorHandler: async (ws, err) => { - Analytics.handleError(err) + void worker.start( + measureCtx, + { + errorHandler: async (ws, err) => { + Analytics.handleError(err) + }, + force: false, + console: false, + logs: 'upgrade-logs', + waitTimeout }, - force: false, - console: false, - logs: 'upgrade-logs', - waitTimeout - }) + () => canceled + ) const close = (): void => { + canceled = true onClose?.() } diff --git a/server/workspace-service/src/service.ts b/server/workspace-service/src/service.ts index df8a48e81e..d529896517 100644 --- a/server/workspace-service/src/service.ts +++ b/server/workspace-service/src/service.ts @@ -79,7 +79,7 @@ export class WorkspaceWorker { wakeup: () => void = () => {} defaultWakeup: () => void = () => {} - async start (ctx: MeasureContext, opt: WorkspaceOptions): Promise { + async start (ctx: MeasureContext, opt: WorkspaceOptions, isCanceled: () => boolean): Promise { this.defaultWakeup = () => { ctx.info("I'm busy", { version: this.version, region: this.region }) } @@ -92,7 +92,7 @@ export class WorkspaceWorker { ctx.info('Successfully connected to the account service') - while (true) { + while (!isCanceled()) { await this.waitForAvailableThread() const workspace = await ctx.with('get-pending-workspace', {}, async (ctx) => { diff --git a/server/workspace-service/src/ws-operations.ts b/server/workspace-service/src/ws-operations.ts index 457fcd47c5..44e6d70f26 100644 --- a/server/workspace-service/src/ws-operations.ts +++ b/server/workspace-service/src/ws-operations.ts @@ -131,23 +131,20 @@ export async function createWorkspace ( usePassedCtx: true }) const txAdapter = await txFactory(ctx, hierarchy, dbUrl, wsId, modelDb, storageAdapter) - await childLogger.withLog('init-workspace', {}, async (ctx) => { - await initModel(ctx, wsId, txes, txAdapter, storageAdapter, ctxModellogger, async (value) => { - await handleWsEvent?.('progress', version, 10 + Math.round((Math.min(value, 100) / 100) * 10)) - }) + await initModel(ctx, wsId, txes, txAdapter, storageAdapter, ctxModellogger, async (value) => {}) }) const client = new TxOperations(wrapPipeline(ctx, pipeline, wsUrl), core.account.ConfigUser) await updateModel(ctx, wsId, migrationOperation, client, pipeline, ctxModellogger, async (value) => { - await handleWsEvent?.('progress', version, 20 + Math.round((Math.min(value, 100) / 100) * 10)) + await handleWsEvent?.('progress', version, 10 + Math.round((Math.min(value, 100) / 100) * 10)) }) ctx.info('Starting init script if any') await initializeWorkspace(ctx, branding, wsUrl, storageAdapter, client, ctxModellogger, async (value) => { ctx.info('Init script progress', { value }) - await handleWsEvent?.('progress', version, 30 + Math.round((Math.min(value, 100) / 100) * 60)) + await handleWsEvent?.('progress', version, 20 + Math.round((Math.min(value, 100) / 100) * 60)) }) await upgradeWorkspaceWith( @@ -157,14 +154,15 @@ export async function createWorkspace ( migrationOperation, workspaceInfo, pipeline, + client, storageAdapter, ctxModellogger, async (event, version, value) => { ctx.info('Init script progress', { event, value }) - await handleWsEvent?.('progress', version, 90 + Math.round((Math.min(value, 100) / 100) * 10)) + await handleWsEvent?.('progress', version, 80 + Math.round((Math.min(value, 100) / 100) * 20)) }, false, - false + 'disable' ) await handleWsEvent?.('create-done', version, 100, '') @@ -216,6 +214,12 @@ export async function upgradeWorkspace ( return } + const wsUrl: WorkspaceIdWithUrl = { + name: ws.workspace, + workspaceName: ws.workspaceName ?? '', + workspaceUrl: ws.workspaceUrl ?? '' + } + await upgradeWorkspaceWith( ctx, version, @@ -223,11 +227,12 @@ export async function upgradeWorkspace ( migrationOperation, ws, pipeline, + wrapPipeline(ctx, pipeline, wsUrl), storageAdapter, logger, handleWsEvent, forceUpdate, - forceIndexes, + forceIndexes ? 'perform' : 'skip', external ) } finally { @@ -246,6 +251,7 @@ export async function upgradeWorkspaceWith ( migrationOperation: [string, MigrateOperation][], ws: BaseWorkspaceInfo, pipeline: Pipeline, + connection: Client, storageAdapter: StorageAdapter, logger: ModelLogger = consoleModelLogger, handleWsEvent?: ( @@ -255,7 +261,7 @@ export async function upgradeWorkspaceWith ( message?: string ) => Promise, forceUpdate: boolean = true, - forceIndexes: boolean = false, + updateIndexes: 'perform' | 'skip' | 'disable' = 'skip', external: boolean = false ): Promise { const versionStr = versionToString(version) @@ -310,13 +316,14 @@ export async function upgradeWorkspaceWith ( wsId, txes, pipeline, + connection, storageAdapter, migrationOperation, logger, async (value) => { progress = value }, - forceIndexes + updateIndexes ) await handleWsEvent?.('upgrade-done', version, 100, '') diff --git a/services/github/model-github/src/migration.ts b/services/github/model-github/src/migration.ts index 63f888ab6a..1cef89abbb 100644 --- a/services/github/model-github/src/migration.ts +++ b/services/github/model-github/src/migration.ts @@ -271,7 +271,6 @@ async function processMigrateMarkupFor ( client: MigrationClient, iterator: MigrationIterator ): Promise { - let processed = 0 while (true) { const docs = await iterator.next(1000) if (docs === null || docs.length === 0) { @@ -298,9 +297,6 @@ async function processMigrateMarkupFor ( if (operations.length > 0) { await client.bulk(DOMAIN_GITHUB, operations) } - - processed += docs.length - console.log('...processed', processed) } }