forked from apache/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfqltool_test.py
189 lines (165 loc) · 7.57 KB
/
fqltool_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import pytest
import logging
import os
import subprocess
import tempfile
from dtest import Tester
from shutil import rmtree
since = pytest.mark.since
logger = logging.getLogger(__name__)
@since('4.0')
class TestFQLTool(Tester):
"""
Makes sure fqltool replay and fqltool compare work
@jira_ticket CASSANDRA-14690
"""
def test_replay(self):
"""
Generates a full query log, wipes the nodes and replays the
query log, then makes sure that the data is correct.
@jira_ticket CASSANDRA-14690
"""
self.cluster.populate(2).start()
node1, node2 = self.cluster.nodelist()
with tempfile.TemporaryDirectory() as temp_dir:
tmpdir = tempfile.mkdtemp(dir=temp_dir)
tmpdir2 = tempfile.mkdtemp(dir=temp_dir)
node1.nodetool("enablefullquerylog --path={}".format(tmpdir))
node2.nodetool("enablefullquerylog --path={}".format(tmpdir2))
node1.stress(['write', 'n=1000'])
node1.flush()
node2.flush()
node1.nodetool("disablefullquerylog")
node2.nodetool("disablefullquerylog")
node1.stop(wait_other_notice=True)
node2.stop(wait_other_notice=True)
node1.clear()
node2.clear()
node1.start(wait_for_binary_proto=True)
node2.start(wait_for_binary_proto=True)
# make sure the node is empty:
got_exception = False
try:
node1.stress(['read', 'n=1000'])
except Exception:
got_exception = True
assert got_exception
# replay the log files
self._run_fqltool_replay(node1, [tmpdir, tmpdir2], "127.0.0.1", None, None, True)
# and verify the data is there
node1.stress(['read', 'n=1000'])
def test_compare(self):
"""
uses fqltool replay to compare two runs of the same query log and makes
sure that the results match
@jira_ticket CASSANDRA-14690
"""
self.cluster.populate(1).start()
node1 = self.cluster.nodelist()[0]
with tempfile.TemporaryDirectory() as temp_dir:
results1 = tempfile.mkdtemp(dir=temp_dir)
queries1 = tempfile.mkdtemp(dir=temp_dir)
results2 = tempfile.mkdtemp(dir=temp_dir)
queries2 = tempfile.mkdtemp(dir=temp_dir)
fqldir = tempfile.mkdtemp(dir=temp_dir)
node1.stress(['write', 'n=1000'])
node1.flush()
node1.nodetool("enablefullquerylog --path={}".format(fqldir))
node1.stress(['read', 'n=1000'])
node1.nodetool("disablefullquerylog")
self._run_fqltool_replay(node1, [fqldir], "127.0.0.1", queries1, results1)
self._run_fqltool_replay(node1, [fqldir], "127.0.0.1", queries2, results2)
output = self._run_fqltool_compare(node1, queries1, [results1, results2])
assert b"MISMATCH" not in output # running the same reads against the same data
def test_compare_mismatch(self):
"""
generates two fql log files with different data (seq is different when running stress)
then asserts that the replays of each generates a mismatch
@jira_ticket CASSANDRA-14690
"""
self.cluster.populate(1).start()
node1 = self.cluster.nodelist()[0]
with tempfile.TemporaryDirectory() as temp_dir:
fqldir1 = tempfile.mkdtemp(dir=temp_dir)
fqldir2 = tempfile.mkdtemp(dir=temp_dir)
results1 = tempfile.mkdtemp(dir=temp_dir)
queries1 = tempfile.mkdtemp(dir=temp_dir)
results2 = tempfile.mkdtemp(dir=temp_dir)
queries2 = tempfile.mkdtemp(dir=temp_dir)
node1.nodetool("enablefullquerylog --path={}".format(fqldir1))
node1.stress(['write', 'n=1000'])
node1.flush()
node1.stress(['read', 'n=1000'])
node1.nodetool("disablefullquerylog")
node1.stop()
for d in node1.data_directories():
rmtree(d)
os.mkdir(d)
node1.start(wait_for_binary_proto=True)
node1.nodetool("enablefullquerylog --path={}".format(fqldir2))
node1.stress(['write', 'n=1000', '-pop', 'seq=1000..2000'])
node1.flush()
node1.stress(['read', 'n=1000', '-pop', 'seq=1000..2000'])
node1.nodetool("disablefullquerylog")
node1.stop()
for d in node1.data_directories():
rmtree(d)
os.mkdir(d)
node1.start(wait_for_binary_proto=True)
self._run_fqltool_replay(node1, [fqldir1], "127.0.0.1", queries1, results1)
node1.stop()
for d in node1.data_directories():
rmtree(d)
os.mkdir(d)
node1.start(wait_for_binary_proto=True)
self._run_fqltool_replay(node1, [fqldir2], "127.0.0.1", queries2, results2)
output = self._run_fqltool_compare(node1, queries1, [results1, results2])
assert b"MISMATCH" in output # compares two different stress runs, should mismatch
def test_jvmdtest(self):
""" mimics the behavior of the in-jvm dtest, see CASSANDRA-16720 """
self.cluster.populate(1).start()
node1 = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
with tempfile.TemporaryDirectory() as temp_dir:
tmpdir = tempfile.mkdtemp(dir=temp_dir)
session.execute("CREATE KEYSPACE fql_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};")
node1.nodetool("enablefullquerylog --path={}".format(tmpdir))
session.execute("CREATE TABLE fql_ks.fql_table (id int primary key);")
session.execute("INSERT INTO fql_ks.fql_table (id) VALUES (1)")
node1.nodetool("disablefullquerylog")
session.execute("DROP TABLE fql_ks.fql_table;")
self._run_fqltool_replay(node1, [tmpdir], "127.0.0.1", None, None, replay_ddl_statements=False)
got_exception = False
try:
session.execute("SELECT * FROM fql_ks.fql_table;")
except Exception:
got_exception = True
assert got_exception
self._run_fqltool_replay(node1, [tmpdir], "127.0.0.1", None, None, replay_ddl_statements=True)
rs = session.execute("SELECT * FROM fql_ks.fql_table;")
assert(len(list(rs)) == 1)
def _run_fqltool_replay(self, node, logdirs, target, queries, results, replay_ddl_statements=False):
fqltool = self.fqltool(node)
args = [fqltool, "replay", "--target {}".format(target)]
if queries is not None:
args.append("--store-queries {}".format(queries))
if results is not None:
args.append("--results {}".format(results))
if replay_ddl_statements is True:
args.append("--replay-ddl-statements")
args.extend(logdirs)
rc = subprocess.call(args)
assert rc == 0
def _run_fqltool_compare(self, node, queries, results):
fqltool = self.fqltool(node)
args = [fqltool, "compare", "--queries {}".format(queries)]
args.extend([os.path.join(r, "127.0.0.1") for r in results])
logger.info(args)
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
logger.info(stdout)
return stdout
def fqltool(self, node):
cdir = node.get_install_dir()
fqltool = os.path.join(cdir, 'tools', 'bin', 'fqltool')
return fqltool