Skip to content

Commit

Permalink
Add fix-layout status
Browse files Browse the repository at this point in the history
Signed-off-by: Shree Vatsa N <[email protected]>
  • Loading branch information
vatsa287 committed Jan 24, 2023
1 parent c0f8669 commit 92cbe26
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 25 deletions.
25 changes: 21 additions & 4 deletions mgr/src/cmds/rebalance.cr
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,18 @@ def rebalance_status_summary(volume, args)
sum_of_scanned_bytes = 0
sum_of_total_bytes = 0
sum_of_progress = 0
fix_layout_status = volume.distribute_groups[0].storage_units[0].fix_layout_status

puts "Name : #{volume.pool.name}/#{volume.name}"
puts "Type : #{volume.type}"
puts "ID : #{volume.id}"

puts "Fix-Layout Status : #{fix_layout_status.state}"
if fix_layout_status.state != "not started"
puts "Total Dirs Scanned : #{fix_layout_status.total_dirs}"
puts "Duration : #{fix_layout_status.duration_seconds}"
end

volume.distribute_groups.each do |dist_grp|
storage_unit = dist_grp.storage_units[0]
migrate_data_status = storage_unit.migrate_data_status
Expand Down Expand Up @@ -126,12 +133,20 @@ def detailed_rebalance_status(volume, args)
total_completed_migrate_data_processes = 0
total_non_started_migrate_data_processes = 0
total_failed_migrate_data_processes = 0
fix_layout_status = volume.distribute_groups[0].storage_units[0].fix_layout_status
rebalance_status = ""

puts "Name : #{volume.pool.name}/#{volume.name}"
puts "Type : #{volume.type}"
puts "ID : #{volume.id}"

puts "Fix-Layout Status : #{fix_layout_status.state}"
if fix_layout_status.state != "not started"
puts "Total Dirs Scanned : #{fix_layout_status.total_dirs}"
puts "Duration : #{fix_layout_status.duration_seconds}"
end
puts

volume.distribute_groups.each_with_index do |dist_grp, dist_grp_index|
storage_unit = dist_grp.storage_units[0]
migrate_data_status = storage_unit.migrate_data_status
Expand All @@ -156,10 +171,12 @@ def detailed_rebalance_status(volume, args)
)

printf(" Status : %s\n", migrate_data_status.state)
printf(" Progress : %s %%\n", migrate_data_status.progress)
printf(" Scanned : %s / %s\n", migrate_data_status.scanned_bytes.humanize_bytes, migrate_data_status.total_bytes.humanize_bytes)
printf(" Duration Seconds : %s\n", migrate_data_status.duration_seconds)
printf(" Estimate Seconds : %s\n", migrate_data_status.estimate_seconds)
if migrate_data_status.state != "not started"
printf(" Progress : %s %%\n", migrate_data_status.progress)
printf(" Scanned : %s / %s\n", migrate_data_status.scanned_bytes.humanize_bytes, migrate_data_status.total_bytes.humanize_bytes)
printf(" Duration Seconds : %s\n", migrate_data_status.duration_seconds)
printf(" Estimate Seconds : %s\n", migrate_data_status.estimate_seconds)
end
puts
end

Expand Down
4 changes: 0 additions & 4 deletions mgr/src/server/plugins/volume_expand.cr
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ put "/api/v1/pools/:pool_name/volumes" do |env|

set_volume_metrics(req)

# Add only the first node for fix-layout service
services = add_fix_layout_service(services, pool.not_nil!.name, req.name, nodes[0],
volume.not_nil!.distribute_groups[0].storage_units[0])

existing_nodes = participating_nodes(pool_name, volume)

# Add only the first existing node for fix-layout service
Expand Down
93 changes: 77 additions & 16 deletions mgr/src/server/plugins/volume_rebalance_status.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ require "./volume_utils.cr"

REBALNCE_DIR = "/var/lib/kadalu/rebalance"

ACTION_REBALANCE_STATUS = "action_rebalance_status"
ACTION_FIX_LAYOUT_STATUS = "action_fix_layout_status"
ACTION_MIGRATE_DATA_STATUS = "action_migrate_data_status"

struct RebalanceStatusRequestToNode
include JSON::Serializable
Expand All @@ -21,23 +22,49 @@ end

alias RebalanceRequestToNode = Tuple(String, Hash(String, Array(MoanaTypes::ServiceUnit)), Hash(String, RebalanceStatusRequestToNode))

# TODO: Assign migrate-data state at volume level
def assign_migrate_data_state(migrate_data_status, svc)
return migrate_data_status if migrate_data_status.state == "not started"
# TODO: Assign state at volume level
def assign_state(status, svc)
return status if status.state == "not started"

if svc.running? == false && migrate_data_status.complete == false
migrate_data_status.state = "failed"
if svc.running? == false && status.complete == false
status.state = "failed"
elsif svc.running? == true
migrate_data_status.state = "running"
elsif migrate_data_status.complete == true
migrate_data_status.state = "complete"
status.state = "running"
elsif status.complete == true
status.state = "complete"
end

migrate_data_status
status
end

# Calculate the status file with highest estimate_seconds and return that file data from node.
node_action ACTION_REBALANCE_STATUS do |data, _env|
node_action ACTION_FIX_LAYOUT_STATUS do |data, _env|
volume_name, services, request = RebalanceRequestToNode.from_json(data)
status_file_path = ""
rebalance_dir = Path.new(WORKDIR, "rebalance", "#{volume_name}").to_s
request = Hash(String, RebalanceStatusRequestToNode).from_json(request.to_json)
local_node_id = GlobalConfig.local_node.id
node_resp = RebalanceStatusRequestToNode.new

if services.has_key?(local_node_id) && request.has_key?(local_node_id)
svc = Service.from_json(services[local_node_id][0].to_json)
storage_unit = request[local_node_id].storage_units[0]
if svc.id == "rebalance-fix-layout-#{storage_unit.path.gsub("/", "%2F")}"
status_file_path = "#{rebalance_dir}/#{svc.id}.json"
if File.exists?(status_file_path)
storage_unit.fix_layout_status = MoanaTypes::FixLayoutRebalanceStatus.from_json(File.read(status_file_path))
else
storage_unit.fix_layout_status.state = "not started"
end
storage_unit.fix_layout_status = assign_state(storage_unit.fix_layout_status, svc)

node_resp.storage_units << storage_unit
end
end

NodeResponse.new(true, node_resp.to_json)
end

node_action ACTION_MIGRATE_DATA_STATUS do |data, _env|
volume_name, services, request = RebalanceRequestToNode.from_json(data)
status_file_path = ""
rebalance_dir = Path.new(WORKDIR, "rebalance", "#{volume_name}").to_s
Expand All @@ -57,7 +84,7 @@ node_action ACTION_REBALANCE_STATUS do |data, _env|
else
storage_unit.migrate_data_status.state = "not started"
end
storage_unit.migrate_data_status = assign_migrate_data_state(storage_unit.migrate_data_status, svc)
storage_unit.migrate_data_status = assign_state(storage_unit.migrate_data_status, svc)

node_resp.storage_units << storage_unit
end
Expand All @@ -68,6 +95,20 @@ node_action ACTION_REBALANCE_STATUS do |data, _env|
NodeResponse.new(true, node_resp.to_json)
end

def construct_fix_layout_service_request(pool_name, nodes, volume)
req = Hash(String, RebalanceStatusRequestToNode).new
services = Hash(String, Array(MoanaTypes::ServiceUnit)).new

# Add only the first existing node & first storage_unit for fix-layout service
storage_unit = volume.distribute_groups[0].storage_units[0]
req[storage_unit.node.id] = RebalanceStatusRequestToNode.new if req[storage_unit.node.id]?.nil?
req[storage_unit.node.id].storage_units << storage_unit
services = add_fix_layout_service(services, pool_name, volume.name, nodes[0],
volume.distribute_groups[0].storage_units[0])

{req, services}
end

def construct_migrate_data_service_request(pool_name, volume)
req = Hash(String, RebalanceStatusRequestToNode).new
services = Hash(String, Array(MoanaTypes::ServiceUnit)).new
Expand Down Expand Up @@ -102,16 +143,36 @@ get "/api/v1/pools/:pool_name/volumes/:volume_name/rebalance_status" do |env|
resp = dispatch_action(ACTION_PING, pool_name, nodes, "")
api_exception(!resp.ok, node_errors("Not all participant nodes are reachable", resp.node_responses).to_json)

request, services = construct_migrate_data_service_request(pool_name, volume)
request, services = construct_fix_layout_service_request(pool_name, nodes, volume)
first_node = [] of MoanaTypes::Node
first_node << nodes[0]
resp = dispatch_action(
ACTION_FIX_LAYOUT_STATUS,
pool_name,
first_node,
{volume_name, services, request}.to_json
)

api_exception(!resp.ok, node_errors("Failed to get fix-layout status of volume #{volume.name}", resp.node_responses).to_json)

storage_unit = volume.distribute_groups[0].storage_units[0]
if resp.node_responses[storage_unit.node.id].ok
node_resp = RebalanceStatusRequestToNode.from_json(resp.node_responses[storage_unit.node.id].response)
su = node_resp.storage_units[0]
if su.node.id == storage_unit.node.id && su.path == storage_unit.path
storage_unit.fix_layout_status = su.fix_layout_status
end
end

request, services = construct_migrate_data_service_request(pool_name, volume)
resp = dispatch_action(
ACTION_REBALANCE_STATUS,
ACTION_MIGRATE_DATA_STATUS,
pool_name,
nodes,
{volume_name, services, request}.to_json
)

api_exception(!resp.ok, node_errors("Failed to get rebalance status of volume #{volume.name}", resp.node_responses).to_json)
api_exception(!resp.ok, node_errors("Failed to get migrate-data status of volume #{volume.name}", resp.node_responses).to_json)

volume.distribute_groups.each do |dist_grp|
storage_unit = dist_grp.storage_units[0]
Expand Down
1 change: 1 addition & 0 deletions mgr/src/server/plugins/volume_utils.cr
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ end

def add_fix_layout_service(services, pool_name, volume_name, node, storage_unit)
service = FixLayoutService.new(pool_name, volume_name, storage_unit)
services[node.id] = [] of MoanaTypes::ServiceUnit unless services[node.id]?
services[node.id] << service.unit

services
Expand Down
3 changes: 2 additions & 1 deletion types/src/moana_types.cr
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ module MoanaTypes
service = ServiceUnit.new,
metrics = Metrics.new,
heal_metrics = HealMetrics.new,
fix_layout_status = FixLayoutRebalanceStatus.new,
migrate_data_status = MigrateDataRebalanceStatus.new

def initialize(node_name, @port, @path)
Expand Down Expand Up @@ -255,7 +256,7 @@ module MoanaTypes
struct FixLayoutRebalanceStatus
include JSON::Serializable

property complete = false, total_dirs = 0, duration_seconds = 0
property complete = false, total_dirs = 0, duration_seconds = 0, state = "failed"

def initialize
end
Expand Down

0 comments on commit 92cbe26

Please sign in to comment.