Skip to content

Commit

Permalink
v2
Browse files Browse the repository at this point in the history
Signed-off-by: Shree Vatsa N <[email protected]>
  • Loading branch information
vatsa287 committed Jan 23, 2023
1 parent 9ea0991 commit e20c7db
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 42 deletions.
141 changes: 137 additions & 4 deletions mgr/src/cmds/rebalance.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
require "./helpers"

# CLI options for the `rebalance status` sub-command.
struct RebalanceArgs
  # When true, show per-storage-unit rebalance details instead of the summary.
  property detail = false
end

# Reopen the global CLI Args class to carry rebalance-specific options.
class Args
  property rebalance_args = RebalanceArgs.new
end

# Register the `rebalance start` sub-command. Only the usage banner is
# set here; the actual work happens in the "rebalance.start" handler.
command "rebalance.start", "Start Rebalancing Kadalu Storage volume" do |parser, _|
  parser.banner = "Usage: kadalu rebalance start POOL/VOLNAME [arguments]"
end
Expand Down Expand Up @@ -46,20 +54,145 @@ handler "rebalance.stop" do |args|
end
end

command "rebalance.status", "Show Kadalu Storage volume rebalance status" do |parser, _|
# Register the `rebalance status` sub-command and its --detail flag,
# which switches the handler from the summary view to the detailed
# per-storage-unit view.
command "rebalance.status", "Show Kadalu Storage volume rebalance status" do |parser, args|
  parser.banner = "Usage: kadalu rebalance status POOL/VOLNAME [arguments]"
  parser.on("--detail", "Show detailed rebalance status info of individual storage units") do
    args.rebalance_args.detail = true
  end
end

# Print an aggregated rebalance status summary for a Kadalu volume.
#
# One migrate-data process is expected per distribute group (its status
# is read from the group's first storage unit). Progress and scanned /
# total bytes are averaged across processes, and the largest
# estimate-seconds value across processes is reported.
#
# NOTE(review): `args` is currently unused here; it is kept so the
# signature stays parallel with detailed_rebalance_status.
def rebalance_status_summary(volume, args)
  total_migrate_data_processes = 0
  total_non_started_migrate_data_processes = 0
  total_completed_migrate_data_processes = 0
  total_failed_migrate_data_processes = 0
  # Sentinel lower than any real estimate (replaces the hard-coded
  # Int32 minimum, which was compared against Int64 values).
  highest_estimate_seconds = Int64::MIN
  sum_of_scanned_bytes = 0_i64
  sum_of_total_bytes = 0_i64
  sum_of_progress = 0_i64

  puts "Name : #{volume.pool.name}/#{volume.name}"
  puts "Type : #{volume.type}"
  puts "ID : #{volume.id}"

  volume.distribute_groups.each do |dist_grp|
    migrate_data_status = dist_grp.storage_units[0].migrate_data_status
    total_migrate_data_processes += 1

    case migrate_data_status.state
    when "not started"
      total_non_started_migrate_data_processes += 1
    when "complete"
      total_completed_migrate_data_processes += 1
    when "failed"
      total_failed_migrate_data_processes += 1
    end

    estimate = migrate_data_status.estimate_seconds.to_i64
    highest_estimate_seconds = estimate if estimate > highest_estimate_seconds

    sum_of_scanned_bytes += migrate_data_status.scanned_bytes.to_i64
    sum_of_total_bytes += migrate_data_status.total_bytes.to_i64
    sum_of_progress += migrate_data_status.progress.to_i64
  end

  # Guard against division by zero (NaN output) when the volume reports
  # no distribute groups — averages are meaningless in that case.
  if total_migrate_data_processes > 0
    printf("Progress : %.2f %%\n", (sum_of_progress/total_migrate_data_processes))
    printf("Estimate Seconds : %i\n", highest_estimate_seconds)
    printf("Scanned : %s / %s\n",
      (sum_of_scanned_bytes/total_migrate_data_processes).to_i64.humanize_bytes, (sum_of_total_bytes/total_migrate_data_processes).to_i64.humanize_bytes)
  end
  puts

  # Overall state: the volume-level status collapses to a single word
  # only when every process agrees; otherwise it is "partial".
  rebalance_status =
    if total_completed_migrate_data_processes == total_migrate_data_processes
      "complete"
    elsif total_failed_migrate_data_processes == total_migrate_data_processes
      "fail"
    elsif total_non_started_migrate_data_processes == total_migrate_data_processes
      "not started"
    else
      "partial"
    end

  puts "Volume #{volume.name} Rebalance Status : #{rebalance_status}"
  puts "Total Number of Rebalance Process : #{total_migrate_data_processes}"
  puts "Number of Completed Rebalance Process : #{total_completed_migrate_data_processes}"
  puts "Number of Failed Rebalance Process : #{total_failed_migrate_data_processes}"
end

# Print per-distribute-group rebalance status for a Kadalu volume (one
# migrate-data process per group, read from the group's first storage
# unit), followed by the overall volume-level status lines.
#
# NOTE(review): `args` is currently unused here; it is kept so the
# signature stays parallel with rebalance_status_summary.
def detailed_rebalance_status(volume, args)
  process_count = 0
  completed_count = 0
  not_started_count = 0
  failed_count = 0

  puts "Name : #{volume.pool.name}/#{volume.name}"
  puts "Type : #{volume.type}"
  puts "ID : #{volume.id}"

  volume.distribute_groups.each_with_index do |dist_grp, dist_grp_index|
    storage_unit = dist_grp.storage_units[0]
    status = storage_unit.migrate_data_status

    process_count += 1

    case status.state
    when "not started" then not_started_count += 1
    when "complete"    then completed_count += 1
    when "failed"      then failed_count += 1
    end

    printf("Distribute group %-2s\n", dist_grp_index + 1)
    printf(
      " Storage unit %-3s : %s:%s\n",
      1,
      storage_unit.node.name,
      storage_unit.path,
    )

    printf(" Status : %s\n", status.state)
    printf(" Progress : %s %%\n", status.progress)
    printf(" Scanned : %s / %s\n", status.scanned_bytes.humanize_bytes, status.total_bytes.humanize_bytes)
    printf(" Duration Seconds : %s\n", status.duration_seconds)
    printf(" Estimate Seconds : %s\n", status.estimate_seconds)
    puts
  end

  # Overall state collapses to a single word only when every process
  # agrees; otherwise it is "partial".
  overall_status =
    if completed_count == process_count
      "complete"
    elsif failed_count == process_count
      "fail"
    elsif not_started_count == process_count
      "not started"
    else
      "partial"
    end

  puts "Volume #{volume.name} Rebalance Status : #{overall_status}"
  puts "Total Number of Rebalance Process : #{process_count}"
  puts "Number of Completed Rebalance Process : #{completed_count}"
  puts "Number of Failed Rebalance Process : #{failed_count}"
end

handler "rebalance.status" do |args|
begin
command_error "Pool/Volname is required" if args.pos_args.size == 0
args.pool_name, volume_name = pool_and_volume_name(args.pos_args.size > 0 ? args.pos_args[0] : "")
api_call(args, "Failed to show rebalance status of volume") do |client|
status = client.pool(args.pool_name).volume(volume_name).rebalance_status
volume = client.pool(args.pool_name).volume(volume_name).rebalance_status

handle_json_output(status, args)
handle_json_output(volume, args)

puts "Rebalance status of volume #{volume_name}"
if args.rebalance_args.detail
detailed_rebalance_status(volume, args)
else
rebalance_status_summary(volume, args)
end
end
rescue ex : InvalidVolumeRequest
STDERR.puts "Failed to show rebalance status of Kadalu Storage Volume"
Expand Down
114 changes: 79 additions & 35 deletions mgr/src/server/plugins/volume_rebalance_status.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,85 @@ REBALNCE_DIR = "/var/lib/kadalu/rebalance"

ACTION_REBALANCE_STATUS = "action_rebalance_status"

# Per-node payload for ACTION_REBALANCE_STATUS: the storage units on a
# node whose migrate-data status files should be read and returned.
struct RebalanceStatusRequestToNode
  include JSON::Serializable

  property storage_units = [] of MoanaTypes::StorageUnit

  def initialize
  end
end

# (volume_name, services-by-node-id, status-request-by-node-id) tuple
# dispatched to every participating node for the rebalance-status action.
alias RebalanceRequestToNode = Tuple(String, Hash(String, Array(MoanaTypes::ServiceUnit)), Hash(String, RebalanceStatusRequestToNode))

# TODO: Assign migrate-data state at volume level
#
# Derive the displayed state of one migrate-data process from its
# recorded status and the service's liveness:
#   - "not started" is preserved untouched,
#   - a live service is "running",
#   - a dead service with complete=true is "complete",
#   - a dead service with complete=false is "failed" (i.e. crashed).
# Mutates and returns `migrate_data_status`.
def assign_migrate_data_state(migrate_data_status, svc)
  return migrate_data_status if migrate_data_status.state == "not started"

  # Idiomatic boolean tests (the previous `== true` / `== false`
  # comparisons are an anti-pattern); same case coverage as before.
  if svc.running?
    migrate_data_status.state = "running"
  elsif migrate_data_status.complete
    migrate_data_status.state = "complete"
  else
    migrate_data_status.state = "failed"
  end

  migrate_data_status
end

# Calculate the status file with highest estimate_seconds and return that file data from node.
node_action ACTION_REBALANCE_STATUS do |data, _env|
services, volume = ServiceRequestToNodeWithVolume.from_json(data)
int_min = "-2147483648"
rebalance_status_file_with_highest_estimate_in_secs = ""
volume_name, services, request = RebalanceRequestToNode.from_json(data)
status_file_path = ""
rebalance_dir = Path.new(WORKDIR, "rebalance", "#{volume.name}").to_s
node_resp = Hash(String, MoanaTypes::MigrateDataRebalanceStatus).new
rebalance_dir = Path.new(WORKDIR, "rebalance", "#{volume_name}").to_s
request = Hash(String, RebalanceStatusRequestToNode).from_json(request.to_json)
node_resp = RebalanceStatusRequestToNode.new

unless services[GlobalConfig.local_node.id]?.nil?
services[GlobalConfig.local_node.id].each do |service|
svc = Service.from_json(service.to_json)
puts "svc: #{svc}"
# TODO: Add check for svc.running?
if svc.name == "migratedataservice"
status_file_path = "#{rebalance_dir}/#{svc.id}.json"
if File.exists?(status_file_path)
data = MoanaTypes::MigrateDataRebalanceStatus.from_json(File.read(status_file_path))
puts "data"
if data.estimate_seconds.to_i64 > int_min.to_i64
int_min = data.estimate_seconds
rebalance_status_file_with_highest_estimate_in_secs = status_file_path
request.each do |node_id, storage_units_data|
next unless node_id == GlobalConfig.local_node.id
svc = Service.from_json(service.to_json)
storage_units_data.storage_units.each do |storage_unit|
next unless svc.id == "rebalance-migrate-data-#{storage_unit.path.gsub("/", "%2F")}"
status_file_path = "#{rebalance_dir}/#{svc.id}.json"
if File.exists?(status_file_path)
storage_unit.migrate_data_status = MoanaTypes::MigrateDataRebalanceStatus.from_json(File.read(status_file_path))
else
storage_unit.migrate_data_status.state = "not started"
end
storage_unit.migrate_data_status = assign_migrate_data_state(storage_unit.migrate_data_status, svc)

node_resp.storage_units << storage_unit
end
end
end
end

if rebalance_status_file_with_highest_estimate_in_secs != ""
node_resp[GlobalConfig.local_node.id] = MoanaTypes::MigrateDataRebalanceStatus.from_json(File.read(rebalance_status_file_with_highest_estimate_in_secs))
NodeResponse.new(true, node_resp.to_json)
else
NodeResponse.new(true, (node_resp[GlobalConfig.local_node.id] = MoanaTypes::MigrateDataRebalanceStatus.new).to_json)
NodeResponse.new(true, node_resp.to_json)
end

# Build the services-by-node-id request map covering the migrate-data
# service of every distribute group (one service per group, placed on
# the node of the group's first storage unit).
def construct_migrate_data_service_request(volume)
  initial = Hash(String, Array(MoanaTypes::ServiceUnit)).new
  volume.distribute_groups.reduce(initial) do |services, dist_grp|
    unit = dist_grp.storage_units[0]
    add_migrate_data_service(services, volume.pool.name, volume.name, unit.node, unit)
  end
end

# Group the first storage unit of every distribute group by the node it
# lives on, as RebalanceStatusRequestToNode payloads keyed by node id.
#
# NOTE(review): `pool_name` is currently unused; kept for call-site
# compatibility.
def rebalance_status_node_request_prepare(pool_name, volume)
  req = Hash(String, RebalanceStatusRequestToNode).new

  volume.distribute_groups.each do |dist_grp|
    unit = dist_grp.storage_units[0]
    (req[unit.node.id] ||= RebalanceStatusRequestToNode.new).storage_units << unit
  end

  req
end

get "/api/v1/pools/:pool_name/volumes/:volume_name/rebalance_status" do |env|
Expand All @@ -55,7 +100,6 @@ get "/api/v1/pools/:pool_name/volumes/:volume_name/rebalance_status" do |env|
volume = Datastore.get_volume(pool_name, volume_name)
api_exception(volume.nil?, {"error": "Volume doesn't exists"}.to_json)
volume = volume.not_nil!
pool = volume.not_nil!.pool

nodes = participating_nodes(pool_name, volume)

Expand All @@ -66,28 +110,28 @@ get "/api/v1/pools/:pool_name/volumes/:volume_name/rebalance_status" do |env|
api_exception(!resp.ok, node_errors("Not all participant nodes are reachable", resp.node_responses).to_json)

services = construct_migrate_data_service_request(volume)
request = rebalance_status_node_request_prepare(pool_name, volume)

resp = dispatch_action(
ACTION_REBALANCE_STATUS,
pool_name,
nodes,
{services, volume}.to_json
{volume_name, services, request}.to_json
)

api_exception(!resp.ok, node_errors("Failed to get rebalance status of volume #{volume.name}", resp.node_responses).to_json)

# Goto every participating node of volume's workdir/rebalance/volume_name
# Fetch the highest estimate seconds value in that node
# construct status data which is highest estimate seconds of all nodes.
# Return by adding Rebalances status to Class Volume or Class StorageUnit in JSON

# Constraints
# How to show migrate-data has crashed [when svc.running? is false and complete: false]

# nodes.each do |node|
# puts "resp: #{resp}"
# end
puts "resp1: #{resp}"
volume.distribute_groups.each do |dist_grp|
storage_unit = dist_grp.storage_units[0]
if resp.node_responses[storage_unit.node.id].ok
node_resp = RebalanceStatusRequestToNode.from_json(resp.node_responses[storage_unit.node.id].response)
node_resp.storage_units.each do |su|
if su.node.id == storage_unit.node.id && su.path == storage_unit.path
storage_unit.migrate_data_status = su.migrate_data_status
end
end
end
end

env.response.status_code = 200
volume.to_json
Expand Down
1 change: 0 additions & 1 deletion mgr/src/server/plugins/volume_utils.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ VOLUME_ID_XATTR_NAME = "trusted.glusterfs.volume-id"

alias VolumeRequestToNode = Tuple(Hash(String, Array(MoanaTypes::ServiceUnit)), Hash(String, Array(MoanaTypes::Volfile)), MoanaTypes::Volume)
alias VolumeRequestToNodeWithAction = Tuple(Hash(String, Array(MoanaTypes::ServiceUnit)), Hash(String, Array(MoanaTypes::Volfile)), MoanaTypes::Volume, String)
alias ServiceRequestToNodeWithVolume = Tuple(Hash(String, Array(MoanaTypes::ServiceUnit)), MoanaTypes::Volume)

ACTION_VALIDATE_VOLUME_CREATE = "validate_volume_create"
ACTION_VOLUME_CREATE = "volume_create"
Expand Down
5 changes: 3 additions & 2 deletions types/src/moana_types.cr
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ module MoanaTypes
node = Node.new,
type = "",
fs = "",
service = ServiceUnit.new,
metrics = Metrics.new,
heal_metrics = HealMetrics.new,
service = ServiceUnit.new
migrate_data_status = MigrateDataRebalanceStatus.new

def initialize(node_name, @port, @path)
@node.name = node_name
Expand Down Expand Up @@ -245,7 +246,7 @@ module MoanaTypes
include JSON::Serializable

property complete = false, progress = 0, scanned_bytes = 0_i64,
total_bytes = 0_i64, duration_seconds = 0, estimate_seconds = 0
total_bytes = 0_i64, duration_seconds = 0, estimate_seconds = 0, state = "failed"

def initialize
end
Expand Down

0 comments on commit e20c7db

Please sign in to comment.