Skip to content

Commit

Permalink
Made ScirGcloudBatch configurable
Browse files Browse the repository at this point in the history
Three variables need to be configured by the CBRAIN admin:

GCLOUD_PROJECT=project_id
GCLOUD_LOCATION=location
GCLOUD_IMAGE_BASENAME=computeimagename

These are normally set either at the bourreau level in
the 'extra qsub arguments' field, or at the tool config level
(also in the extra qsub arguments). They should be
put together separated by spaces, e.g.

   GCLOUD_PROJECT=abcd GCLOUD_LOCATION=northamerica-northeast1-b
  • Loading branch information
prioux committed Feb 13, 2025
1 parent dcfe656 commit ccb3f96
Showing 1 changed file with 45 additions and 16 deletions.
61 changes: 45 additions & 16 deletions BrainPortal/lib/scir_gcloud_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def update_job_info_cache #:nodoc:
out_text, err_text = bash_this_and_capture_out_err(
# the '%A' format returns the job ID
# the '%t' format returns the status with the one or two letter codes.
"gcloud batch jobs list #{gcloud_location()}"
"gcloud batch jobs list --location #{gcloud_location()}"
)
raise "Cannot get output of 'squeue'" if err_text.present?
out_lines = out_text.split("\n")
Expand All @@ -43,7 +43,7 @@ def update_job_info_cache #:nodoc:
#projects/tidal-reactor-438920-g4/locations/northamerica-northeast1/jobs/tr1 northamerica-northeast1 FAILED
# In a real deploy, all jobs IDs will be 'cbrain-{task.id}-{task.run_number}'
out_lines.each do |line|
job_path, job_location, job_status = line.split(/\s+/)
job_path, _, job_status = line.split(/\s+/)
next unless job_path.present? && job_status.present?
job_id = Pathname.new(job_path).basename.to_s
state = statestring_to_stateconst(job_status)
Expand Down Expand Up @@ -78,7 +78,7 @@ def resume(jid) #:nodoc:
end

# Deletes the gcloud batch job identified by +jid+ by running
# 'gcloud batch jobs delete' in a subshell, with stderr merged
# into stdout (2>&1). The output of the command is read but never
# inspected, so a failed deletion is not reported. Returns nil.
def terminate(jid) #:nodoc:
# NOTE(review): the two IO.popen statements below look like the before/after
# versions of the same line as rendered by this diff view (the second one adds
# the explicit '--location' flag and drops the unused 'out' local) -- confirm
# that only one of them exists in the real file.
out = IO.popen("gcloud batch jobs delete #{gcloud_location()} #{shell_escape(jid)} 2>&1","r") { |i| i.read }
IO.popen("gcloud batch jobs delete --location #{gcloud_location} #{shell_escape(jid)} 2>&1","r") { |i| i.read }
#raise "Error deleting: #{out.join("\n")}" if whatever TODO
return
end
Expand All @@ -99,7 +99,7 @@ def qsubout_to_jid(txt) #:nodoc:
struct = YAML.load(txt)
fullname = struct['name'] # "projects/tidal-reactor-438920-g4/locations/northamerica-northeast1/jobs/cbrain-123-1-092332"
Pathname.new(fullname).basename.to_s # cbrain-123-1-092332
rescue => ex
rescue
raise "Cannot find job ID from 'gcloud batch jobs submit' output. Text was blank" if txt.blank?
File.open("/tmp/debug.submit_error.txt","a") { |fh| fh.write("\n----\n#{txt}") }
raise "Cannot find job ID from 'gcloud batch jobs submit' output."
Expand All @@ -110,13 +110,44 @@ def qsubout_to_jid(txt) #:nodoc:
class JobTemplate < Scir::JobTemplate #:nodoc:

# Returns the value configured for GCLOUD_LOCATION, as looked up
# by get_config_value_from_extra_qsubs().
def gcloud_location
#TODO better
# NOTE(review): the string literal on the next line appears to be the old
# hard-coded value still shown by this diff view; as written it is evaluated
# and discarded, and only the lookup below is returned -- confirm.
"--location northamerica-northeast1"
get_config_value_from_extra_qsubs('GCLOUD_LOCATION')
end

def compute_node_image_name
#TODO better
"projects/cbrain-449118/global/images/cbrain-compute"
def gcloud_compute_image_basename
get_config_value_from_extra_qsubs('GCLOUD_IMAGE_BASENAME')
end

# Returns the value configured for GCLOUD_PROJECT, as looked up
# by get_config_value_from_extra_qsubs().
def gcloud_project
  config_key = 'GCLOUD_PROJECT'
  get_config_value_from_extra_qsubs(config_key)
end

# Builds the full name of the compute node image out of the
# configured project and image basename, in the form
#   projects/{project}/global/images/{basename}
# This method should not be overridden.
def full_compute_node_image_name
  project  = gcloud_project
  basename = gcloud_compute_image_basename
  "projects/#{project}/global/images/#{basename}"
end

# Looks up the configuration variable +varname+ in the
# 'extra_qsub_args' attributes.
#
# The admin is expected to have configured three values,
# GCLOUD_PROJECT, GCLOUD_IMAGE_BASENAME and GCLOUD_LOCATION,
# as "NAME=VALUE" substrings separated by blanks, at the
# Bourreau level or the ToolConfig level. The ToolConfig
# string is searched first, so its values have priority; the
# Bourreau string is only consulted when nothing is found there.
#
# Returns the value found; raises an exception explaining how to
# configure the variable when no value is found.
def get_config_value_from_extra_qsubs(varname)
  found   = extract_config_value(varname, self.tc_extra_qsub_args)
  found ||= extract_config_value(varname, Scir.cbrain_config[:extra_qsub_args])
  return found unless found.blank?
  raise "Missing Gcloud configuration value for '#{varname}'. Add it in extra_qsub_args at the Bourreau or ToolConfig level as '#{varname}=value'"
end

# Extracts the value assigned to +varname+ inside +from_string+.
#
# Given a from_string like
#   "GCLOUD_PROJECT=abcde GCLOUD_IMAGE_BASENAME=baseim GCLOUD_LOCATION=westofhere"
# and a varname like "GCLOUD_PROJECT", this method returns the value, "abcde".
#
# The variable name is matched case-insensitively and must start at a
# word boundary, so "PROJECT" does not match inside "GCLOUD_PROJECT".
# A value starts with a word character and continues with any number of
# word characters, dots or dashes; single-character values are accepted.
#
# Returns nil if from_string is blank or contains no such assignment.
def extract_config_value(varname, from_string)
  return nil if from_string.blank?
  # '*' rather than '+' after the first character, so that a one-character
  # value such as "NAME=a" is not treated as missing.
  search_val = Regexp.new('\b' + Regexp.escape(varname) + '\s*=\s*(\w[\.\w-]*)', Regexp::IGNORECASE)
  # Use the local MatchData instead of the global Regexp.last_match state.
  found = from_string.match(search_val)
  found ? found[1] : nil
end

def qsub_command #:nodoc:
Expand All @@ -131,9 +162,7 @@ def qsub_command #:nodoc:
gname = gname[0..50] if gname.size > 50
gname = gname + DateTime.now.strftime("-%H%M%S") # this should be good enough

command = "gcloud batch jobs submit #{gname} #{gcloud_location} "
command += "#{self.tc_extra_qsub_args} " if self.tc_extra_qsub_args.present?
command += "#{Scir.cbrain_config[:extra_qsub_args]} " if Scir.cbrain_config[:extra_qsub_args].present?
command = "gcloud batch jobs submit #{gname} --location #{gcloud_location} "

script_name = self.arg[0]
script_command = ""
Expand All @@ -153,7 +182,7 @@ def qsub_command #:nodoc:
script_command,
memory,
walltime,
compute_node_image_name,
full_compute_node_image_name,
)

# Write the json config to a file; use a name unique enough for the current submission,
Expand All @@ -162,12 +191,12 @@ def qsub_command #:nodoc:
json_tmp_config_file = "/tmp/job_submit-#{pid_threadid}.json"
File.open(json_tmp_config_file,"w") { |fh| fh.write json_config_text }

command += "--config #{json_tmp_config_file} 2>/dev/null" # we must ignore the friendly message line in stderr
command += "--config #{json_tmp_config_file} 2>/dev/null && rm -f #{json_tmp_config_file}" # we must ignore the friendly message line in stderr

return command
end

def json_cloud_batch_jobs_config(command, maxmem_mb, walltime_s, compute_node_image_name)
def json_cloud_batch_jobs_config(command, maxmem_mb, walltime_s, full_compute_node_image_name)
struct = struct_gcloud_batch_jobs_config_template.dup

task_spec = struct["taskGroups"][0]["taskSpec"]
Expand All @@ -177,7 +206,7 @@ def json_cloud_batch_jobs_config(command, maxmem_mb, walltime_s, compute_node_im
task_spec["maxRunDuration"] = "#{walltime_s}s"

policy = struct["allocationPolicy"]["instances"][0]["policy"]
policy["bootDisk"]["image"] = compute_node_image_name
policy["bootDisk"]["image"] = full_compute_node_image_name

struct.to_json
end
Expand Down

0 comments on commit ccb3f96

Please sign in to comment.