Skip to content

Commit

Permalink
Initial version of web workers calling main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettjstevens committed Sep 12, 2023
1 parent 835a010 commit f21ec28
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ import {
BaseOptions,
BaseSequenceAdapter,
} from '@jbrowse/core/data_adapters/BaseAdapter'
import { getFetcher } from '@jbrowse/core/util/io'
import { ObservableCreate } from '@jbrowse/core/util/rxjs'
import SimpleFeature, { Feature } from '@jbrowse/core/util/simpleFeature'
import { NoAssemblyRegion, UriLocation } from '@jbrowse/core/util/types'
import { NoAssemblyRegion, Region } from '@jbrowse/core/util/types'
import { nanoid } from 'nanoid'

import { createFetchErrorMessage } from '../util'
import { BackendDriver } from '../BackendDrivers'
import { ApolloSessionModel } from '../session'

export interface RefSeq {
_id: string
Expand All @@ -17,8 +18,26 @@ export interface RefSeq {
length: number
}

interface ApolloMessageData {
apollo: true
messageId: string
sequence: string
regions: Region[]
}

function isApolloMessageData(data?: unknown): data is ApolloMessageData {
return (
typeof data === 'object' &&
data !== null &&
'apollo' in data &&
data.apollo === true
)
}

const isInWebWorker = typeof sessionStorage === 'undefined'

export class ApolloSequenceAdapter extends BaseSequenceAdapter {
private refSeqs: Promise<RefSeq[]> | undefined
private regions: NoAssemblyRegion[] | undefined

get baseURL(): string {
return readConfObject(this.config, 'baseURL').uri
Expand All @@ -31,96 +50,132 @@ export class ApolloSequenceAdapter extends BaseSequenceAdapter {
.internetAccountPreAuthorization
}

protected async getRefSeqs({ signal }: BaseOptions) {
if (this.refSeqs) {
return this.refSeqs
}
const assemblyId = readConfObject(this.config, 'assemblyId')
const url = new URL('refSeqs', this.baseURL)
const searchParams = new URLSearchParams({ assembly: assemblyId })
url.search = searchParams.toString()
const uri = url.toString()
const location: UriLocation = { locationType: 'UriLocation', uri }
if (this.internetAccountPreAuthorization) {
location.internetAccountPreAuthorization =
this.internetAccountPreAuthorization
}
const fetch = getFetcher(location, this.pluginManager)
const response = await fetch(uri, { signal })
if (!response.ok) {
const errorMessage = await createFetchErrorMessage(
response,
'Failed to fetch refSeqs',
)
throw new Error(errorMessage)
}
const refSeqs = (await response.json()) as RefSeq[]
this.refSeqs = Promise.resolve(refSeqs)
return refSeqs
}

public async getRefNames(opts: BaseOptions) {
const refSeqs = await this.getRefSeqs(opts)
return refSeqs.map((refSeq) => refSeq.name)
const regions = await this.getRegions(opts)
return regions.map((regions) => regions.refName)
}

public async getRegions(opts: BaseOptions): Promise<NoAssemblyRegion[]> {
const refSeqs = await this.getRefSeqs(opts)
return refSeqs.map((refSeq) => ({
refName: refSeq.name,
start: 0,
end: refSeq.length,
}))
if (this.regions) {
return this.regions
}
const assemblyId = readConfObject(this.config, 'assemblyId')
if (!isInWebWorker) {
const dataStore = (
this.pluginManager?.rootModel?.session as ApolloSessionModel | undefined
)?.apolloDataStore
if (!dataStore) {
throw new Error('No Apollo data store found')
}
const backendDriver = dataStore.getBackendDriver(
assemblyId,
) as BackendDriver
const regions = await backendDriver.getRegions(assemblyId)
this.regions = regions
return regions
}
const { signal } = opts
const regions = await new Promise(
(
resolve: (sequence: Region[]) => void,
reject: (reason: string) => void,
) => {
const timeoutId = setTimeout(() => {
reject('timeout')
}, 20_000)
const messageId = nanoid()
const messageListener = (event: MessageEvent) => {
const { data } = event
if (!isApolloMessageData(data)) {
return
}
if (data.messageId !== messageId) {
return
}
clearTimeout(timeoutId)
removeEventListener('message', messageListener)
resolve(data.regions)
}
addEventListener('message', messageListener, { signal })
postMessage({
apollo: true,
method: 'getRegions',
assembly: assemblyId,
messageId,
})
},
)
this.regions = regions
return regions
}

/**
* Fetch features for a certain region
* @param param -
* @returns Observable of Feature objects in the region
*/
public getFeatures(
{ end, refName, start }: NoAssemblyRegion,
opts: BaseOptions,
) {
public getFeatures(region: Region, opts: BaseOptions) {
const { signal } = opts
const { assemblyName, end, refName, start } = region
return ObservableCreate<Feature>(async (observer) => {
const refSeqs = await this.getRefSeqs(opts)
const refSeq = refSeqs.find((rs) => rs.name === refName)
if (!refSeq) {
return observer.error(
`Could not find refSeq that matched refName "${refName}"`,
)
}
const url = new URL('refSeqs/getSequence', this.baseURL)
const searchParams = new URLSearchParams({
refSeq: refSeq._id,
start: String(start),
end: String(end),
})
url.search = searchParams.toString()
const uri = url.toString()
const location: UriLocation = { locationType: 'UriLocation', uri }
if (this.internetAccountPreAuthorization) {
location.internetAccountPreAuthorization =
this.internetAccountPreAuthorization
}
const fetch = getFetcher(location, this.pluginManager)
const response = await fetch(uri, { signal: opts.signal })
if (!response.ok) {
const errorMessage = await createFetchErrorMessage(
response,
'Failed to fetch refSeqs',
)
throw new Error(errorMessage)
}
const seq = (await response.text()) as string
if (seq) {
if (!isInWebWorker) {
const dataStore = (
this.pluginManager?.rootModel?.session as
| ApolloSessionModel
| undefined
)?.apolloDataStore
if (!dataStore) {
observer.error('No Apollo data store found')
}
const backendDriver = dataStore.getBackendDriver(
assemblyName,
) as BackendDriver
const { seq } = await backendDriver.getSequence(region)
observer.next(
new SimpleFeature({
id: `${refName} ${start}-${end}`,
data: { refName, start, end, seq },
}),
)
observer.complete()
return
}
const seq = await new Promise(
(
resolve: (sequence: string) => void,
reject: (reason: string) => void,
) => {
const timeoutId = setTimeout(() => {
reject('timeout')
}, 20_000)
const messageId = nanoid()
const messageListener = (event: MessageEvent) => {
const { data } = event
if (!isApolloMessageData(data)) {
return
}
if (data.messageId !== messageId) {
return
}
clearTimeout(timeoutId)
removeEventListener('message', messageListener)
resolve(data.sequence)
}
addEventListener('message', messageListener, { signal })
postMessage({
apollo: true,
method: 'getSequence',
region,
messageId,
})
},
)
observer.next(
new SimpleFeature({
id: `${refName} ${start}-${end}`,
data: { refName, start, end, seq },
}),
)
observer.complete()
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export abstract class BackendDriver {

abstract getSequence(region: Region): Promise<{ seq: string; refSeq: string }>

abstract getRefSeqs(): Promise<string[]>
abstract getRegions(assemblyName: string): Promise<Region[]>

abstract getAssemblies(internetAccountConfigId?: string): Assembly[]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,41 @@ export class CollaborationServerDriver extends BackendDriver {
return { seq: await response.text(), refSeq }
}

async getRefSeqs() {
throw new Error('getRefSeqs not yet implemented')
return []
async getRegions(assemblyName: string): Promise<Region[]> {
const { assemblyManager } = getSession(this.clientStore)
const assembly = assemblyManager.get(assemblyName)
if (!assembly) {
throw new Error(`Could not find assembly with name "${assemblyName}"`)
}
const internetAccount = this.clientStore.getInternetAccount(
assemblyName,
) as ApolloInternetAccount
const { baseURL } = internetAccount
const url = new URL('refSeqs', baseURL)
const searchParams = new URLSearchParams({ assembly: assemblyName })
url.search = searchParams.toString()
const uri = url.toString()

const response = await this.fetch(internetAccount, uri)
if (!response.ok) {
let errorMessage
try {
errorMessage = await response.text()
} catch {
errorMessage = ''
}
throw new Error(
`getRegions failed: ${response.status} (${response.statusText})${
errorMessage ? ` (${errorMessage})` : ''
}`,
)
}
const refSeqs = await response.json()
return refSeqs.map((refSeq: { name: string; length: number }) => ({
refName: refSeq.name,
start: 0,
end: refSeq.length,
}))
}

getAssemblies(internetAccountId?: string) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getConf } from '@jbrowse/core/configuration'
import { getSession } from '@jbrowse/core/util'
import { Region, getSession } from '@jbrowse/core/util'
import { AssemblySpecificChange, Change } from 'apollo-common'
import { AnnotationFeatureSnapshot } from 'apollo-mst'
import { ValidationResultSet } from 'apollo-shared'
Expand All @@ -12,12 +12,35 @@ export class InMemoryFileDriver extends BackendDriver {
return []
}

async getSequence() {
return { seq: '', refSeq: '' }
async getSequence(region: Region) {
const { assemblyName, end, refName, start } = region
const assembly = this.clientStore.assemblies.get(assemblyName)
if (!assembly) {
return { seq: '', refSeq: refName }
}
const refSeq = assembly.refSeqs.get(refName)
if (!refSeq) {
return { seq: '', refSeq: refName }
}
const seq = refSeq.getSequence(start, end)
return { seq, refSeq: refName }
}

async getRefSeqs() {
return []
async getRegions(assemblyName: string): Promise<Region[]> {
const assembly = this.clientStore.assemblies.get(assemblyName)
if (!assembly) {
return []
}
const regions: Region[] = []
for (const [, refSeq] of assembly.refSeqs) {
regions.push({
assemblyName,
refName: refSeq.name,
start: refSeq.sequence[0].start,
end: refSeq.sequence[0].stop,
})
}
return regions
}

getAssemblies() {
Expand Down
Loading

0 comments on commit f21ec28

Please sign in to comment.