Skip to content

Commit

Permalink
rsc: Track hidden output dirs (#1651)
Browse files Browse the repository at this point in the history
  • Loading branch information
V-FEXrt authored Sep 24, 2024
1 parent ad7ca2a commit b7f811b
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 24 deletions.
1 change: 1 addition & 0 deletions rust/entity/src/output_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct Model {
pub mode: i32,
pub job_id: Uuid,
pub created_at: DateTime,
pub hidden: bool,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
2 changes: 2 additions & 0 deletions rust/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod m20240731_152842_create_job_size_proc;
mod m20240731_201632_create_job_blob_timestamp_index;
mod m20240805_163520_create_blob_id_fk_indexes;
mod m20240819_193352_add_output_indexes;
mod m20240919_214610_add_hidden_to_output_dir;

pub struct Migrator;

Expand All @@ -35,6 +36,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
Box::new(m20240805_163520_create_blob_id_fk_indexes::Migration),
Box::new(m20240819_193352_add_output_indexes::Migration),
Box::new(m20240919_214610_add_hidden_to_output_dir::Migration),
]
}
}
36 changes: 36 additions & 0 deletions rust/migration/src/m20240919_214610_add_hidden_to_output_dir.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(OutputDir::Table)
.add_column(ColumnDef::new(OutputDir::Hidden).boolean().not_null().default(false))
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(OutputDir::Table)
.drop_column(OutputDir::Hidden)
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum OutputDir {
Table,
Hidden,
}

1 change: 1 addition & 0 deletions rust/rsc/src/bin/rsc/add_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub async fn add_job(
created_at: NotSet,
path: Set(dir.path),
mode: Set(dir.mode),
hidden: Set(dir.hidden.unwrap_or(false)),
job_id: Set(job_id),
})
.collect(),
Expand Down
1 change: 1 addition & 0 deletions rust/rsc/src/bin/rsc/read_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub async fn read_job(
.map(|m| Dir {
path: m.path,
mode: m.mode,
hidden: Some(m.hidden),
})
.collect();

Expand Down
2 changes: 2 additions & 0 deletions rust/rsc/src/bin/rsc/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub struct File {
pub struct Dir {
pub path: String,
pub mode: i32,
// Optional member to allow for soft migration
pub hidden: Option<bool>
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down
14 changes: 12 additions & 2 deletions share/wake/lib/system/remote_cache_api.wake
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ tuple CacheSearchOutputDirectory =
Path: String
# The mode on disk of the directory
Mode: Integer
# If the directory is hidden or published to downstream users
Hidden: Boolean

# A symlink created by a cached job
tuple CacheSearchOutputSymlink =
Expand Down Expand Up @@ -134,6 +136,8 @@ tuple CachePostRequestOutputDirectory =
Path: String
# The mode on disk of the directory
Mode: Integer
# If the directory is hidden or published to downstream users
Hidden: Boolean

# A symlink created by a job
tuple CachePostRequestOutputSymlink =
Expand Down Expand Up @@ -793,10 +797,11 @@ def getCachePostRequestJson (req: CachePostRequest): JValue =
"blob_id" :-> JString blobId,
)

def mkOutputDirJson (CachePostRequestOutputDirectory path mode) =
def mkOutputDirJson (CachePostRequestOutputDirectory path mode hidden) =
JObject (
"path" :-> JString path,
"mode" :-> JInteger mode,
"hidden" :-> JBoolean hidden,
)

def mkOutputSymlinkJson (CachePostRequestOutputSymlink path link) =
Expand Down Expand Up @@ -906,7 +911,12 @@ def mkCacheSearchResponse (json: JValue): Result CacheSearchResponse Error =
failWithError
"rsc: JSON response has incorrect schema. output_directories[x] must have integer key 'mode'"

CacheSearchOutputDirectory path mode
require Pass (JBoolean hidden) = jField v "hidden"
else
failWithError
"rsc: JSON response has incorrect schema. output_directories[x] must have boolean key 'hidden'"

CacheSearchOutputDirectory path mode hidden
| Pass

def mkOutputFile (v: JValue): Result CacheSearchOutputFile Error =
Expand Down
53 changes: 31 additions & 22 deletions share/wake/lib/system/remote_cache_runner.wake
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>
def stdoutDownload = rscApiGetStringBlob stdoutBlob
def stderrDownload = rscApiGetStringBlob stderrBlob

def doMakeDirectory (CacheSearchOutputDirectory path mode) =
def doMakeDirectory (CacheSearchOutputDirectory path mode _hidden) =
# wake-format off
def cmd =
"mkdir",
Expand Down Expand Up @@ -188,6 +188,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>

def resolvedDirectories =
outputDirs
| filter (!_.getCacheSearchOutputDirectoryHidden)
| map getCacheSearchOutputDirectoryPath

def outputs = resolvedOutputs ++ resolvedDirectories ++ resolvedSymlinks
Expand Down Expand Up @@ -350,30 +351,25 @@ def thirdBy (acceptFn: a => ThirdByGroup): (list: List a) => Triple (one: List a

# Posts a completed job to the remote cache
def postJob (rscApi: RemoteCacheApi) (job: Job) (_wakeroot: String) (hidden: String) (input: RunnerInput) (output: RunnerOutput): Result Unit Error =
require Pass stdout = job.getJobFailedStdoutRaw
require Pass stderr = job.getJobFailedStderrRaw

def allOutputs = output.getRunnerOutputOutputs

def rmapStat path = match (unsafe_stat path)
Fail x -> Fail x
Pass x -> Pass (Pair path x)

require Pass outputsStat =
allOutputs
| map rmapStat
| findFail
| addErrorContext "rsc: Failed to stat files to upload"
def filteredOutputs = output.getRunnerOutputOutputs
def cleanableOutputs = output.getRunnerOutputCleanableOutputs
def hiddenOutputs = subtract scmp cleanableOutputs filteredOutputs

def statToGroup (Stat t _ _) = match t
PathTypeRegularFile -> ThirdByGroupFirst
PathTypeDirectory -> ThirdByGroupSecond
PathTypeSymlink -> ThirdByGroupThird
_ -> panic "rsc: unsuported filetype: {format t}"

def (Triple regFiles directories symlinks) =
outputsStat
| thirdBy (\x x.getPairSecond.statToGroup)
def makeStatTripleThunk paths =
paths
| map (\p unsafe_stat p |< Pair p)
| findFail
| addErrorContext "rsc: Failed to stat files"
|< thirdBy (_.getPairSecond.statToGroup)

def filteredOutputsStatThunk = makeStatTripleThunk filteredOutputs
def hiddenOutputsStatThunk = makeStatTripleThunk hiddenOutputs

def uploadAndMakeFile (Pair path (Stat _ mode _)) =
def doUpload =
Expand Down Expand Up @@ -405,8 +401,10 @@ def postJob (rscApi: RemoteCacheApi) (job: Job) (_wakeroot: String) (hidden: Str
CachePostRequestOutputSymlink path link
| Pass

def makeDirectory (Pair path (Stat _ mode _)) =
CachePostRequestOutputDirectory path (mode | mode2bits)
def makeDirectory hidden (Pair path (Stat _ mode _)) =
CachePostRequestOutputDirectory path (mode | mode2bits) hidden

require Pass (Triple regFiles directories symlinks) = filteredOutputsStatThunk

def fileUploads =
regFiles
Expand All @@ -416,9 +414,20 @@ def postJob (rscApi: RemoteCacheApi) (job: Job) (_wakeroot: String) (hidden: Str
symlinks
| map makeSymlink

def directoriesUpload =
def publishedDirectoriesUpload =
directories
| map makeDirectory
| map (makeDirectory False)

require Pass (Triple _ hiddenDirs _) = hiddenOutputsStatThunk

def hiddenDirectoriesUpload =
hiddenDirs
| map (makeDirectory True)

def directoriesUpload = publishedDirectoriesUpload ++ hiddenDirectoriesUpload

require Pass stdout = job.getJobFailedStdoutRaw
require Pass stderr = job.getJobFailedStderrRaw

def stdoutUpload =
rscApi
Expand Down

0 comments on commit b7f811b

Please sign in to comment.