Skip to content

Commit

Permalink
Add ff to skip runs details storage
Browse files Browse the repository at this point in the history
  • Loading branch information
tdraier committed Dec 23, 2024
1 parent 20216a0 commit eed9a1e
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 10 deletions.
35 changes: 29 additions & 6 deletions core/bin/core_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static GLOBAL: Jemalloc = Jemalloc;
/// API State
struct RunManager {
pending_apps: Vec<(app::App, run::Credentials, run::Secrets)>,
pending_apps: Vec<(app::App, run::Credentials, run::Secrets, bool)>,
pending_runs: Vec<String>,
}

Expand Down Expand Up @@ -97,9 +97,17 @@ impl APIState {
}
}

fn run_app(&self, app: app::App, credentials: run::Credentials, secrets: run::Secrets) {
fn run_app(
&self,
app: app::App,
credentials: run::Credentials,
secrets: run::Secrets,
store_block_result: bool,
) {
let mut run_manager = self.run_manager.lock();
run_manager.pending_apps.push((app, credentials, secrets));
run_manager
.pending_apps
.push((app, credentials, secrets, store_block_result));
}

async fn stop_loop(&self) {
Expand All @@ -123,7 +131,7 @@ impl APIState {
let mut loop_count = 0;

loop {
let apps: Vec<(app::App, run::Credentials, run::Secrets)> = {
let apps: Vec<(app::App, run::Credentials, run::Secrets, bool)> = {
let mut manager = self.run_manager.lock();
let apps = manager.pending_apps.drain(..).collect::<Vec<_>>();
apps.iter().for_each(|app| {
Expand All @@ -145,7 +153,15 @@ impl APIState {

match app
.0
.run(app.1, app.2, store, databases_store, qdrant_clients, None)
.run(
app.1,
app.2,
store,
databases_store,
qdrant_clients,
None,
app.3,
)
.await
{
Ok(()) => {
Expand Down Expand Up @@ -587,6 +603,7 @@ struct RunsCreatePayload {
config: run::RunConfig,
credentials: run::Credentials,
secrets: Vec<Secret>,
store_block_result: Option<bool>,
}

async fn run_helper(
Expand Down Expand Up @@ -830,7 +847,12 @@ async fn runs_create(
Ok(app) => {
// The run is empty for now, we can clone it for the response.
let run = app.run_ref().unwrap().clone();
state.run_app(app, credentials, secrets);
state.run_app(
app,
credentials,
secrets,
payload.store_block_result.unwrap_or(true),
);
(
StatusCode::OK,
Json(APIResponse {
Expand Down Expand Up @@ -915,6 +937,7 @@ async fn runs_create_stream(
databases_store,
qdrant_clients,
Some(tx.clone()),
payload.store_block_result.unwrap_or(true),
)
.await
{
Expand Down
2 changes: 2 additions & 0 deletions core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ impl App {
databases_store: Box<dyn DatabasesStore + Sync + Send>,
qdrant_clients: QdrantClients,
event_sender: Option<UnboundedSender<Value>>,
store_block_result: bool,
) -> Result<()> {
assert!(self.run.is_some());
assert!(self.run_config.is_some());
Expand Down Expand Up @@ -731,6 +732,7 @@ impl App {
block_idx,
&block.block_type(),
name,
store_block_result,
)
.await?;

Expand Down
13 changes: 11 additions & 2 deletions core/src/stores/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ impl Store for PostgresStore {
block_idx: usize,
block_type: &BlockType,
block_name: &String,
store_block_result: bool,
) -> Result<()> {
let traces = run
.traces
Expand All @@ -796,8 +797,16 @@ impl Store for PostgresStore {
.iter()
.enumerate()
.map(|(map_idx, execution)| {
let execution_json = serde_json::to_string(&execution)?;

let execution_json = match store_block_result {
true => serde_json::to_string(&execution)?,
false => serde_json::to_string(
&(BlockExecution {
value: None,
error: execution.error.clone(),
meta: execution.meta.clone(),
}),
)?,
};
Ok((
block_idx,
block_type.clone(),
Expand Down
1 change: 1 addition & 0 deletions core/src/stores/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub trait Store {
block_idx: usize,
block_type: &BlockType,
block_name: &String,
store_block_result: bool,
) -> Result<()>;

async fn load_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { withPublicAPIAuthentication } from "@app/lib/api/auth_wrappers";
import apiConfig from "@app/lib/api/config";
import { getDustAppSecrets } from "@app/lib/api/dust_app_secrets";
import { withResourceFetchingFromRoute } from "@app/lib/api/resource_wrappers";
import type { Authenticator } from "@app/lib/auth";
import { getFeatureFlags, type Authenticator } from "@app/lib/auth";
import { AppResource } from "@app/lib/resources/app_resource";
import type { RunUsageType } from "@app/lib/resources/run_resource";
import { RunResource } from "@app/lib/resources/run_resource";
Expand Down Expand Up @@ -276,6 +276,9 @@ async function handler(
}
}

const flags = await getFeatureFlags(owner);
const storeBlockResult = !flags.includes("disable_run_logs");

logger.info(
{
workspace: {
Expand All @@ -299,6 +302,7 @@ async function handler(
credentials,
secrets,
isSystemKey: auth.isSystemKey(),
storeBlockResult,
}
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { withSessionAuthenticationForWorkspace } from "@app/lib/api/auth_wrapper
import config from "@app/lib/api/config";
import { getDustAppSecrets } from "@app/lib/api/dust_app_secrets";
import { withResourceFetchingFromRoute } from "@app/lib/api/resource_wrappers";
import { Authenticator } from "@app/lib/auth";
import { Authenticator, getFeatureFlags } from "@app/lib/auth";
import type { SessionWithUser } from "@app/lib/iam/provider";
import { AppResource } from "@app/lib/resources/app_resource";
import { RunResource } from "@app/lib/resources/run_resource";
Expand Down Expand Up @@ -123,6 +123,9 @@ async function handler(
);
const inputDataset = inputConfigEntry ? inputConfigEntry.dataset : null;

const flags = await getFeatureFlags(owner);
const storeBlockResult = !flags.includes("disable_run_logs");

const dustRun = await coreAPI.createRun(owner, auth.groups(), {
projectId: app.dustAPIProjectId,
runType: "local",
Expand All @@ -134,6 +137,7 @@ async function handler(
config: { blocks: config },
credentials: credentialsFromProviders(providers),
secrets,
storeBlockResult,
});

if (dustRun.isErr()) {
Expand Down
5 changes: 5 additions & 0 deletions types/src/front/lib/core_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type CoreAPICreateRunParams = {
credentials: CredentialsType;
secrets: DustAppSecretType[];
isSystemKey?: boolean;
storeBlockResult?: boolean;
};

type GetDatasetResponse = {
Expand Down Expand Up @@ -306,6 +307,7 @@ export class CoreAPI {
credentials,
secrets,
isSystemKey,
storeBlockResult = true,
}: CoreAPICreateRunParams
): Promise<CoreAPIResponse<{ run: CoreAPIRun }>> {
const response = await this._fetchWithError(
Expand All @@ -327,6 +329,7 @@ export class CoreAPI {
config: config,
credentials: credentials,
secrets: secrets,
store_block_result: storeBlockResult,
}),
}
);
Expand All @@ -348,6 +351,7 @@ export class CoreAPI {
credentials,
secrets,
isSystemKey,
storeBlockResult = true,
}: CoreAPICreateRunParams
): Promise<
CoreAPIResponse<{
Expand All @@ -374,6 +378,7 @@ export class CoreAPI {
config: config,
credentials: credentials,
secrets: secrets,
store_block_result: storeBlockResult,
}),
}
);
Expand Down
1 change: 1 addition & 0 deletions types/src/shared/feature_flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const WHITELISTABLE_FEATURES = [
"openai_o1_high_reasoning_feature",
"index_private_slack_channel",
"conversations_jit_actions",
"disable_run_logs",
"labs_trackers",
] as const;
export type WhitelistableFeature = (typeof WHITELISTABLE_FEATURES)[number];
Expand Down

0 comments on commit eed9a1e

Please sign in to comment.