This repository was archived by the owner on Oct 4, 2021. It is now read-only.
forked from xtreemfs/xtreemfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_volume.py
159 lines (133 loc) · 7.49 KB
/
test_volume.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright (c) 2009-2011 by Minor Gordon, Bjoern Kolbeck, Zuse Institute Berlin
# Licensed under the BSD License, see LICENSE file for details.
from time import sleep
import sys, os, subprocess, signal
# Debug level names, ordered so that the list index is the numeric debug level.
# Volume looks up DEBUG_LEVELS[int(debug_level)] to build the "-d" argument
# it passes to mkfs.xtreemfs and mount.xtreemfs.
DEBUG_LEVELS = [ 'EMERG', 'ALERT', 'CRIT', 'ERR', 'WARNING', 'NOTICE', 'INFO', 'DEBUG' ]
class Volume:
    """An XtreemFS volume that the test environment creates, mounts and unmounts.

    The instance only stores configuration. The actual work is delegated to
    the mkfs.xtreemfs, mount.xtreemfs, xtfsutil and fusermount command line
    tools, which are started as child processes.
    """

    def __init__(self,
                 name,
                 mount_point_dir_path,
                 xtreemfs_dir,
                 debug_level,
                 mount_options,
                 mkfs_options,
                 mrc_uri,
                 dir_uri,
                 pkcs12_file_path,
                 pkcs12_passphrase,
                 stripe_width,
                 stripe_size,
                 rwr_policy,
                 rwr_factor,
                 ronly_factor):
        """Store the volume configuration; no process is started here.

        name -- volume name, appended to the MRC URI (create) and to the
            DIR URI (mount).
        mount_point_dir_path -- mount point; stored as an absolute path.
        xtreemfs_dir -- directory whose "bin" sub-directory is searched for
            the XtreemFS tools before falling back to the global path.
        debug_level -- index into DEBUG_LEVELS (converted with int()).
        mount_options -- extra arguments appended to the mount.xtreemfs call.
        mkfs_options -- extra arguments appended to the mkfs.xtreemfs call.
        mrc_uri -- MRC URI; a trailing "/" is added when missing.
        dir_uri -- DIR URI used as prefix of the volume URI when mounting.
        pkcs12_file_path, pkcs12_passphrase -- passed on as
            --pkcs12-file-path / --pkcs12-passphrase unless None.
        stripe_width, stripe_size -- passed to mkfs.xtreemfs as -w / -s.
        rwr_policy, rwr_factor -- when rwr_factor > 0, mount() sets this
            policy and factor as default replication policy via xtfsutil.
        ronly_factor -- when > 0, mount() sets the "readonly" default
            replication policy with this factor via xtfsutil.
        """
        self.__mount_point_dir_path = os.path.abspath(mount_point_dir_path)
        self.__name = name
        self.__debug_level = debug_level
        self.__xtreemfs_dir = xtreemfs_dir
        self.__mount_options = mount_options
        self.__mkfs_options = mkfs_options
        self.__mrc_uri = mrc_uri
        if not mrc_uri.endswith("/"):
            self.__mrc_uri += "/"
        self.__dir_uri = dir_uri
        self.__pkcs12_file_path = pkcs12_file_path
        self.__pkcs12_passphrase = pkcs12_passphrase
        self.__stripe_width = stripe_width
        self.__stripe_size = stripe_size
        self.__rwr_policy = rwr_policy
        self.__rwr_factor = rwr_factor
        self.__ronly_factor = ronly_factor

    def __find_executable(self, executable_name):
        """Return <xtreemfs_dir>/bin/<executable_name> if it exists, else the bare name."""
        executable_file_path = os.path.abspath(
            os.path.join(self.__xtreemfs_dir, "bin", executable_name))
        if os.path.exists(executable_file_path):
            return executable_file_path
        return executable_name  # Assume it's in the global path

    def __set_default_replication_policy(self, xtfsutil_file_path, policy, factor, description):
        """Run "xtfsutil --set-drp" on the mount point.

        description is only used in the error message. Raises RuntimeError
        when xtfsutil exits with a non-zero return value.
        """
        command = (xtfsutil_file_path + " " +
                   "--set-drp " +
                   "--replication-policy=" + policy + " " +
                   "--replication-factor=" + str(factor) + " " +
                   self.__mount_point_dir_path)
        retcode = subprocess.call(command, shell=True)
        if retcode != 0:
            raise RuntimeError("Failed to enable " + description + " replication on volume: " + self.__name
                               + " xtfsutil return value: " + str(retcode)
                               + " Executed command: " + command)

    def create(self):
        """Create the volume on the MRC by running mkfs.xtreemfs.

        Raises RuntimeError when mkfs.xtreemfs exits with a non-zero
        return value.
        """
        mkfs_xtreemfs_args = [self.__find_executable("mkfs.xtreemfs")]
        mkfs_xtreemfs_args.extend(("-d", DEBUG_LEVELS[int(self.__debug_level)]))
        mkfs_xtreemfs_args.extend(("-p", 'RAID0'))
        if self.__pkcs12_file_path is not None:
            mkfs_xtreemfs_args.extend(("--pkcs12-file-path", self.__pkcs12_file_path))
        if self.__pkcs12_passphrase is not None:
            mkfs_xtreemfs_args.extend(("--pkcs12-passphrase", self.__pkcs12_passphrase))
        mkfs_xtreemfs_args.extend(("-s", str(self.__stripe_size)))
        mkfs_xtreemfs_args.extend(("-w", str(self.__stripe_width)))
        mkfs_xtreemfs_args.extend(self.__mkfs_options)
        mkfs_xtreemfs_args.append(self.__mrc_uri + self.__name)
        # NOTE(review): the arguments are joined and run through the shell, so
        # values containing spaces or shell metacharacters are not passed
        # through verbatim. Kept as is because callers may rely on the shell
        # splitting their mkfs_options.
        mkfs_xtreemfs_args = " ".join(mkfs_xtreemfs_args)
        # A single pre-formatted string prints the same on Python 2 and 3.
        print("xtestenv: creating volume %s with %s" % (self.__name, mkfs_xtreemfs_args))
        retcode = subprocess.call(mkfs_xtreemfs_args, shell=True)
        if retcode != 0:
            raise RuntimeError("Failed to create volume: " + self.__name + " You can use the option --clean-test-dir to clean previous data from the test dir. mkfs.xtreemfs return value: " + str(retcode) + " Executed command: " + mkfs_xtreemfs_args)

    def get_mount_point_dir_path(self):
        """Return the absolute path of the mount point directory."""
        return self.__mount_point_dir_path

    def get_name(self):
        """Return the volume name."""
        return self.__name

    def mount(self, log_file_path):
        """Mount the volume with mount.xtreemfs and configure replication.

        mount.xtreemfs is started in the foreground ("-f") as a background
        child process. Its stdout and stderr are appended to log_file_path,
        or inherited from this process when log_file_path is None.

        Raises RuntimeError when mount.xtreemfs has already exited one second
        after being started, or when an xtfsutil call fails.
        """
        xtfsutil_file_path = self.__find_executable("xtfsutil")
        mount_xtreemfs_file_path = self.__find_executable("mount.xtreemfs")

        try:
            os.mkdir(self.__mount_point_dir_path)
        except OSError:
            # Best effort: the directory usually exists already. Any other
            # problem surfaces when mount.xtreemfs fails below.
            pass

        mount_xtreemfs_args = [mount_xtreemfs_file_path]
        mount_xtreemfs_args.append("-f")  # So we can redirect stdout and stderr
        mount_xtreemfs_args.extend(("-d", DEBUG_LEVELS[int(self.__debug_level)]))
        mount_xtreemfs_args.extend(self.__mount_options)
        if self.__pkcs12_file_path is not None:
            mount_xtreemfs_args.extend(("--pkcs12-file-path", self.__pkcs12_file_path))
        if self.__pkcs12_passphrase is not None:
            mount_xtreemfs_args.extend(("--pkcs12-passphrase", self.__pkcs12_passphrase))
        volume_uri = self.__dir_uri
        if not volume_uri.endswith("/"):
            volume_uri += "/"
        volume_uri += self.__name
        mount_xtreemfs_args.append(volume_uri)
        mount_xtreemfs_args.append(self.get_mount_point_dir_path())

        print("xtestenv: mounting volume %s at %s with %s"
              % (self.__name, self.get_mount_point_dir_path(), " ".join(mount_xtreemfs_args)))

        log_file = None
        if log_file_path is None:
            stdout = sys.stdout
            stderr = sys.stderr
        else:
            log_file = open(log_file_path, "a")
            stderr = stdout = log_file
        try:
            # Use subprocess.Popen instead of subprocess.call to run in the background
            p = subprocess.Popen(mount_xtreemfs_args, stderr=stderr, stdout=stdout)
        finally:
            # The child holds its own copy of the descriptor, so the parent's
            # handle can be released right after the spawn.
            if log_file is not None:
                log_file.close()

        sleep(1.0)
        # Popen.returncode stays None until poll()/wait() is called, so the
        # process has to be polled explicitly. poll() also reaps a child that
        # has already exited, which makes a separate waitpid and
        # /proc/<pid> check unnecessary.
        returncode = p.poll()
        if returncode is not None:
            raise RuntimeError("Failed to mount volume '" + self.__name + "' error: " + str(returncode))

        # enable replication
        if self.__rwr_factor > 0:
            self.__set_default_replication_policy(xtfsutil_file_path,
                                                  self.__rwr_policy,
                                                  self.__rwr_factor,
                                                  "read-write")

        # enable replicate on close for ronly replication
        if self.__ronly_factor > 0:
            # Uses the read-only factor (the read-write factor was passed
            # here before, which was a copy-and-paste error).
            self.__set_default_replication_policy(xtfsutil_file_path,
                                                  "readonly",
                                                  self.__ronly_factor,
                                                  "read/only")

    def unmount(self):
        """Lazily unmount the volume with "fusermount -u -z".

        fusermount is run once for every /proc/mounts entry whose mount
        point ends with this volume's mount point path. A failing fusermount
        call is only reported on stdout, it does not raise.
        """
        mount_point_dir_path = self.get_mount_point_dir_path()
        with open("/proc/mounts") as mounts_file:
            mounts_lines = mounts_file.readlines()

        for mounts_line in mounts_lines:
            mounts_line_parts = mounts_line.split()
            if len(mounts_line_parts) < 2:
                continue  # Not a "<device> <mount point> ..." entry
            test_mount_point_dir_path = mounts_line_parts[1]
            if test_mount_point_dir_path.endswith(mount_point_dir_path):
                fusermount_args = " ".join(["fusermount", "-u", "-z", mount_point_dir_path])
                print("xtestenv: unmounting volume %s with %s" % (self.get_name(), fusermount_args))
                retcode = subprocess.call(fusermount_args, shell=True)
                if retcode != 0:
                    print("Failed to unmount volume: " + self.__name + " fusermount -u return value: " + str(retcode))