Skip to content

Commit

Permalink
Refactor video uploads (#5570)
Browse files Browse the repository at this point in the history
* Remove unused video field

* Stop exposing video dispatch

* Move cancellation out of the reducer

* Make useUploadStatusQuery controlled by jobId

* Rename SetStatus to SetProcessing

This action only has one callsite and it's always passing "processing".

* Move jobId into video reducer state

* Make cancellation scoped

* Inline useCompressVideoMutation

* Move processVideo down the file

* Extract getErrorMessage

* useServiceAuthToken -> getServiceAuthToken

* useVideoAgent -> createVideoAgent

* useVideoUploadLimits -> getVideoUploadLimits

* useUploadVideoMutation -> uploadVideo

* Use async/await in processVideo

* Inline onVideoCompressed into processVideo

* Use async/await for uploadVideo

* Factor out error messages

* Guard dispatch with signal

This lets us remove the scattered signal checks around dispatch.

* Move job polling out of RQ

* Handle poll failures

* Remove unnecessary guards

* Slightly more accurate condition

* Move initVideoUri handling out of the hook

* Remove dead argument

It wasn't being used before either.

* Remove unused detailed status

This isn't being used because we're only respecting that state variable when isProcessing=true, but isProcessing is always false during video upload.

If we want to re-add this later, it should really just be derived from the reducer state.

* Harden the video reducer

* Tie all spawned work to a signal

* Preserve asset/media for nicer error state

* Rename actions to match states

* Inline useUploadVideo

This abstraction is getting in the way of some future work.

* Move MIME check to the only place that handles it
  • Loading branch information
gaearon authored Oct 3, 2024
1 parent c2dac85 commit d2392d2
Show file tree
Hide file tree
Showing 8 changed files with 644 additions and 559 deletions.
39 changes: 0 additions & 39 deletions src/state/queries/video/compress-video.ts

This file was deleted.

11 changes: 4 additions & 7 deletions src/state/queries/video/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {useMemo} from 'react'
import {AtpAgent} from '@atproto/api'

import {SupportedMimeTypes, VIDEO_SERVICE} from '#/lib/constants'
Expand All @@ -17,12 +16,10 @@ export const createVideoEndpointUrl = (
return url.href
}

export function useVideoAgent() {
return useMemo(() => {
return new AtpAgent({
service: VIDEO_SERVICE,
})
}, [])
export function createVideoAgent() {
return new AtpAgent({
service: VIDEO_SERVICE,
})
}

export function mimeToExt(mimeType: SupportedMimeTypes | (string & {})) {
Expand Down
88 changes: 38 additions & 50 deletions src/state/queries/video/video-upload.shared.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,61 @@
import {useCallback} from 'react'
import {BskyAgent} from '@atproto/api'
import {I18n} from '@lingui/core'
import {msg} from '@lingui/macro'
import {useLingui} from '@lingui/react'

import {VIDEO_SERVICE_DID} from '#/lib/constants'
import {UploadLimitError} from '#/lib/media/video/errors'
import {getServiceAuthAudFromUrl} from '#/lib/strings/url-helpers'
import {useAgent} from '#/state/session'
import {useVideoAgent} from './util'
import {createVideoAgent} from './util'

export function useServiceAuthToken({
export async function getServiceAuthToken({
agent,
aud,
lxm,
exp,
}: {
agent: BskyAgent
aud?: string
lxm: string
exp?: number
}) {
const agent = useAgent()

return useCallback(async () => {
const pdsAud = getServiceAuthAudFromUrl(agent.dispatchUrl)

if (!pdsAud) {
throw new Error('Agent does not have a PDS URL')
}

const {data: serviceAuth} = await agent.com.atproto.server.getServiceAuth({
aud: aud ?? pdsAud,
lxm,
exp,
})

return serviceAuth.token
}, [agent, aud, lxm, exp])
const pdsAud = getServiceAuthAudFromUrl(agent.dispatchUrl)
if (!pdsAud) {
throw new Error('Agent does not have a PDS URL')
}
const {data: serviceAuth} = await agent.com.atproto.server.getServiceAuth({
aud: aud ?? pdsAud,
lxm,
exp,
})
return serviceAuth.token
}

export function useVideoUploadLimits() {
const agent = useVideoAgent()
const getToken = useServiceAuthToken({
export async function getVideoUploadLimits(agent: BskyAgent, _: I18n['_']) {
const token = await getServiceAuthToken({
agent,
lxm: 'app.bsky.video.getUploadLimits',
aud: VIDEO_SERVICE_DID,
})
const {_} = useLingui()

return useCallback(async () => {
const {data: limits} = await agent.app.bsky.video
.getUploadLimits(
{},
{headers: {Authorization: `Bearer ${await getToken()}`}},
)
.catch(err => {
if (err instanceof Error) {
throw new UploadLimitError(err.message)
} else {
throw err
}
})

if (!limits.canUpload) {
if (limits.message) {
throw new UploadLimitError(limits.message)
const videoAgent = createVideoAgent()
const {data: limits} = await videoAgent.app.bsky.video
.getUploadLimits({}, {headers: {Authorization: `Bearer ${token}`}})
.catch(err => {
if (err instanceof Error) {
throw new UploadLimitError(err.message)
} else {
throw new UploadLimitError(
_(
msg`You have temporarily reached the limit for video uploads. Please try again later.`,
),
)
throw err
}
})

if (!limits.canUpload) {
if (limits.message) {
throw new UploadLimitError(limits.message)
} else {
throw new UploadLimitError(
_(
msg`You have temporarily reached the limit for video uploads. Please try again later.`,
),
)
}
}, [agent, _, getToken])
}
}
111 changes: 57 additions & 54 deletions src/state/queries/video/video-upload.ts
Original file line number Diff line number Diff line change
@@ -1,76 +1,79 @@
import {createUploadTask, FileSystemUploadType} from 'expo-file-system'
import {AppBskyVideoDefs} from '@atproto/api'
import {AppBskyVideoDefs, BskyAgent} from '@atproto/api'
import {I18n} from '@lingui/core'
import {msg} from '@lingui/macro'
import {useLingui} from '@lingui/react'
import {useMutation} from '@tanstack/react-query'
import {nanoid} from 'nanoid/non-secure'

import {cancelable} from '#/lib/async/cancelable'
import {AbortError} from '#/lib/async/cancelable'
import {ServerError} from '#/lib/media/video/errors'
import {CompressedVideo} from '#/lib/media/video/types'
import {createVideoEndpointUrl, mimeToExt} from '#/state/queries/video/util'
import {useSession} from '#/state/session'
import {useServiceAuthToken, useVideoUploadLimits} from './video-upload.shared'
import {getServiceAuthToken, getVideoUploadLimits} from './video-upload.shared'

export const useUploadVideoMutation = ({
onSuccess,
onError,
export async function uploadVideo({
video,
agent,
did,
setProgress,
signal,
_,
}: {
onSuccess: (response: AppBskyVideoDefs.JobStatus) => void
onError: (e: any) => void
video: CompressedVideo
agent: BskyAgent
did: string
setProgress: (progress: number) => void
signal: AbortSignal
}) => {
const {currentAccount} = useSession()
const getToken = useServiceAuthToken({
_: I18n['_']
}) {
if (signal.aborted) {
throw new AbortError()
}
await getVideoUploadLimits(agent, _)

const uri = createVideoEndpointUrl('/xrpc/app.bsky.video.uploadVideo', {
did,
name: `${nanoid(12)}.${mimeToExt(video.mimeType)}`,
})

if (signal.aborted) {
throw new AbortError()
}
const token = await getServiceAuthToken({
agent,
lxm: 'com.atproto.repo.uploadBlob',
exp: Date.now() / 1000 + 60 * 30, // 30 minutes
})
const checkLimits = useVideoUploadLimits()
const {_} = useLingui()
const uploadTask = createUploadTask(
uri,
video.uri,
{
headers: {
'content-type': video.mimeType,
Authorization: `Bearer ${token}`,
},
httpMethod: 'POST',
uploadType: FileSystemUploadType.BINARY_CONTENT,
},
p => setProgress(p.totalBytesSent / p.totalBytesExpectedToSend),
)

return useMutation({
mutationKey: ['video', 'upload'],
mutationFn: cancelable(async (video: CompressedVideo) => {
await checkLimits()
if (signal.aborted) {
throw new AbortError()
}
const res = await uploadTask.uploadAsync()

const uri = createVideoEndpointUrl('/xrpc/app.bsky.video.uploadVideo', {
did: currentAccount!.did,
name: `${nanoid(12)}.${mimeToExt(video.mimeType)}`,
})
if (!res?.body) {
throw new Error('No response')
}

const uploadTask = createUploadTask(
uri,
video.uri,
{
headers: {
'content-type': video.mimeType,
Authorization: `Bearer ${await getToken()}`,
},
httpMethod: 'POST',
uploadType: FileSystemUploadType.BINARY_CONTENT,
},
p => setProgress(p.totalBytesSent / p.totalBytesExpectedToSend),
)
const res = await uploadTask.uploadAsync()
const responseBody = JSON.parse(res.body) as AppBskyVideoDefs.JobStatus

if (!res?.body) {
throw new Error('No response')
}
if (!responseBody.jobId) {
throw new ServerError(responseBody.error || _(msg`Failed to upload video`))
}

const responseBody = JSON.parse(res.body) as AppBskyVideoDefs.JobStatus

if (!responseBody.jobId) {
throw new ServerError(
responseBody.error || _(msg`Failed to upload video`),
)
}

return responseBody
}, signal),
onError,
onSuccess,
})
if (signal.aborted) {
throw new AbortError()
}
return responseBody
}
Loading

0 comments on commit d2392d2

Please sign in to comment.