Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small conceptual simplification in new_run(). #2139

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/fishtest/actiondb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime, timezone

from bson.objectid import ObjectId
from fishtest.schemas import action_schema
from fishtest.util import hex_print, worker_name
from pymongo import DESCENDING
Expand Down Expand Up @@ -216,6 +217,7 @@ def insert_action(self, **action):
if "run_id" in action:
action["run_id"] = str(action["run_id"])
action["time"] = datetime.now(timezone.utc).timestamp()
action["_id"] = ObjectId()
try:
validate(action_schema, action, "action")
except ValidationError as e:
Expand Down
12 changes: 7 additions & 5 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ def new_run(
if tc_base:
tc_base = float(tc_base.group(1))
new_run = {
"_id": ObjectId(),
"version": RUN_VERSION,
"args": run_args,
"start_time": start_time,
Expand Down Expand Up @@ -566,12 +567,11 @@ def new_run(
print(message, flush=True)
raise Exception(message)

# We cannot use self.buffer since new_run does not have an id yet.
run_id = str(self.runs.insert_one(new_run).inserted_id)
self.buffer(new_run, True, create=True)

run_id = str(new_run["_id"])
with self.unfinished_runs_lock:
self.unfinished_runs.add(run_id)

return run_id

def is_primary_instance(self):
Expand Down Expand Up @@ -695,7 +695,7 @@ def get_run(self, r_id):
else:
return self.runs.find_one({"_id": r_id_obj})

def buffer(self, run, flush):
def buffer(self, run, flush, create=False):
if not self.is_primary_instance():
print(
"Warning: attempt to use the run_cache on the",
Expand Down Expand Up @@ -725,7 +725,9 @@ def buffer(self, run, flush):
}
if flush:
with self.active_run_lock(r_id):
self.runs.replace_one({"_id": ObjectId(r_id)}, run)
r = self.runs.replace_one({"_id": ObjectId(r_id)}, run, upsert=create)
if not create and r.matched_count == 0:
print(f"Buffer: update of {r_id} failed", flush=True)

def stop(self):
self.flush_all()
Expand Down
36 changes: 18 additions & 18 deletions server/fishtest/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def action_is(x):
(
action_is("failed_task"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "failed_task",
"username": username,
Expand All @@ -208,7 +208,7 @@ def action_is(x):
(
action_is("crash_or_time"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "crash_or_time",
"username": username,
Expand All @@ -222,7 +222,7 @@ def action_is(x):
(
action_is("dead_task"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "dead_task",
"username": username,
Expand All @@ -235,7 +235,7 @@ def action_is(x):
(
action_is("system_event"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "system_event",
"username": "fishtest.system",
Expand All @@ -245,7 +245,7 @@ def action_is(x):
(
action_is("new_run"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "new_run",
"username": username,
Expand All @@ -257,7 +257,7 @@ def action_is(x):
(
action_is("upload_nn"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "upload_nn",
"username": username,
Expand All @@ -267,7 +267,7 @@ def action_is(x):
(
action_is("modify_run"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "modify_run",
"username": username,
Expand All @@ -279,7 +279,7 @@ def action_is(x):
(
action_is("delete_run"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "delete_run",
"username": username,
Expand All @@ -291,7 +291,7 @@ def action_is(x):
action_is("stop_run"),
intersect(
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "stop_run",
"username": username,
Expand All @@ -307,7 +307,7 @@ def action_is(x):
(
action_is("finished_run"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "finished_run",
"username": username,
Expand All @@ -319,7 +319,7 @@ def action_is(x):
(
action_is("approve_run"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "approve_run",
"username": username,
Expand All @@ -331,7 +331,7 @@ def action_is(x):
(
action_is("purge_run"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "purge_run",
"username": username,
Expand All @@ -343,7 +343,7 @@ def action_is(x):
(
action_is("block_user"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "block_user",
"username": username,
Expand All @@ -354,7 +354,7 @@ def action_is(x):
(
action_is("accept_user"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "accept_user",
"username": username,
Expand All @@ -365,7 +365,7 @@ def action_is(x):
(
action_is("block_worker"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "block_worker",
"username": username,
Expand All @@ -376,7 +376,7 @@ def action_is(x):
(
action_is("log_message"),
{
"_id?": ObjectId,
"_id": ObjectId,
"time": timestamp,
"action": "log_message",
"username": username,
Expand Down Expand Up @@ -640,11 +640,11 @@ def flags_must_match(run):
# about non-validation of runs created with the prior
# schema.

RUN_VERSION = 4
RUN_VERSION = 5

runs_schema = intersect(
{
"_id?": ObjectId,
"_id": ObjectId,
"version": uint,
"start_time": datetime_utc,
"last_updated": datetime_utc,
Expand Down
Loading