forked from apache/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_repair_test.py
902 lines (726 loc) · 40 KB
/
read_repair_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
from contextlib import contextmanager
import glob
import os
import time
import pytest
import logging
import subprocess
from uuid import uuid4
from cassandra import ConsistencyLevel, WriteTimeout, ReadTimeout
from cassandra.query import SimpleStatement
from ccmlib.node import Node
from pytest import raises
from dtest import Tester, create_ks
from tools.assertions import assert_one
from tools.data import rows_to_list
from tools.jmxutils import JolokiaAgent, make_mbean
from tools.misc import retry_till_success
since = pytest.mark.since
ported_to_in_jvm = pytest.mark.ported_to_in_jvm
logger = logging.getLogger(__name__)
def byteman_validate(node, script, verbose=False, opts=None):
opts = opts or []
cdir = node.get_install_dir()
byteman_cmd = []
byteman_cmd.append(os.path.join(os.environ['JAVA_HOME'],
'bin',
'java'))
byteman_cmd.append('-cp')
jars = [
glob.glob(os.path.join(cdir, 'build', 'lib', 'jars', 'byteman-[0-9]*.jar'))[0],
os.path.join(cdir, 'build', '*'),
]
byteman_cmd.append(':'.join(jars))
byteman_cmd.append('org.jboss.byteman.check.TestScript')
byteman_cmd.append('-p')
byteman_cmd.append(node.byteman_port)
if verbose and '-v' not in opts:
byteman_cmd.append('-v')
byteman_cmd.append(script)
# process = subprocess.Popen(byteman_cmd)
# out, err = process.communicate()
out = subprocess.check_output(byteman_cmd)
if (out is not None) and isinstance(out, bytes):
out = out.decode()
has_errors = 'ERROR' in out
if verbose and not has_errors:
print (out)
assert not has_errors, "byteman script didn't compile\n" + out
def build_byteman_submit_command(node, opts):
cdir = node.get_install_dir()
byteman_cmd = [os.path.join(os.environ['JAVA_HOME'], 'bin', 'java'),
'-cp',
glob.glob(os.path.join(cdir, 'build', 'lib', 'jars', 'byteman-submit-[0-9]*.jar'))[0],
'org.jboss.byteman.agent.submit.Submit',
'-p', node.byteman_port,
'-h', node.network_interfaces['binary'][0]] + opts
return byteman_cmd
def request_verb_timing(node):
# -y is byteman's built-in flag for reading system props
byteman_cmd = build_byteman_submit_command(node, ['-y'])
out = subprocess.check_output(byteman_cmd)
if (out is not None) and isinstance(out, bytes):
out = out.decode()
lines = out.splitlines()
props = {}
for line in lines:
# look for the custom separators, otherwise skip
if "=" in line and "|" in line:
key, value = line.split("=")
split_key = key.split("|")
ip = split_key[-2].replace("/", "")
verb = split_key[-1]
props.setdefault(ip, {}).update({verb: int(value)})
return props
class TestReadRepair(Tester):
@pytest.fixture(scope='function')
def fixture_set_cluster_settings(self, fixture_dtest_setup):
cluster = fixture_dtest_setup.cluster
cluster.populate(3)
# disable dynamic snitch to make replica selection deterministic
# when we use patient_exclusive_cql_connection, CL=1 and RF=n
cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
'endpoint_snitch': 'GossipingPropertyFileSnitch',
'dynamic_snitch': False})
for node in cluster.nodelist():
with open(os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties'), 'w') as snitch_file:
snitch_file.write("dc=datacenter1" + os.linesep)
snitch_file.write("rack=rack1" + os.linesep)
snitch_file.write("prefer_local=true" + os.linesep)
cluster.start()
@since('3.0')
@ported_to_in_jvm('4.0')
def test_alter_rf_and_run_read_repair(self, fixture_set_cluster_settings):
"""
@jira_ticket CASSANDRA-10655
@jira_ticket CASSANDRA-10657
Test that querying only a subset of all the columns in a row doesn't confuse read-repair to avoid
the problem described in CASSANDRA-10655.
"""
# session is only used to setup & do schema modification. Actual data queries are done directly on
# each node, using an exclusive connection and CL.ONE
session = self.patient_cql_connection(self.cluster.nodelist()[0])
initial_replica, non_replicas = self.do_initial_setup(session)
# Execute a query at CL.ALL on one of the nodes which was *not* the initial replica. It should trigger a
# read repair and propagate the data to all 3 nodes.
# Note: result of the read repair contains only the selected column (a), not all columns
logger.debug("Executing 'SELECT a...' on non-initial replica to trigger read repair " + non_replicas[0].name)
read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
assert_one(read_repair_session, "SELECT a FROM alter_rf_test.t1 WHERE k=1", [1], cl=ConsistencyLevel.ALL)
# The read repair should have repaired the replicas, at least partially (see CASSANDRA-10655)
# verify by querying each replica in turn.
value_skipping_disabled = True if self.cluster.version() < '3.4' else False
self.check_data_on_each_replica(expect_fully_repaired=value_skipping_disabled, initial_replica=initial_replica)
# Now query again at CL.ALL but this time selecting all columns, which should ensure that 'b' also gets repaired
query = "SELECT * FROM alter_rf_test.t1 WHERE k=1"
logger.debug("Executing 'SELECT *...' on non-initial replica to trigger read repair " + non_replicas[0].name)
assert_one(read_repair_session, query, [1, 1, 1], cl=ConsistencyLevel.ALL)
# Check each replica individually again now that we expect the data to be fully repaired
self.check_data_on_each_replica(expect_fully_repaired=True, initial_replica=initial_replica)
@since('2.1', max_version='3.11.x')
def test_read_repair_chance(self, fixture_set_cluster_settings):
"""
@jira_ticket CASSANDRA-12368
"""
# session is only used to setup & do schema modification. Actual data queries are done directly on
# each node, using an exclusive connection and CL.ONE
session = self.patient_cql_connection(self.cluster.nodelist()[0])
initial_replica, non_replicas = self.do_initial_setup(session)
# To ensure read repairs are triggered, set the table property to 100%
logger.debug("Setting table read repair chance to 1")
session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""")
# Execute a query at CL.ONE on one of the nodes which was *not* the initial replica. It should trigger a
# read repair because read_repair_chance == 1, and propagate the data to all 3 nodes.
# Note: result of the read repair contains only the selected column (a), not all columns, so we won't expect
# 'b' to have been fully repaired afterwards.
logger.debug("Executing 'SELECT a...' on non-initial replica to trigger read repair " + non_replicas[0].name)
read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
read_repair_session.execute(SimpleStatement("SELECT a FROM alter_rf_test.t1 WHERE k=1",
consistency_level=ConsistencyLevel.ONE))
# Query each replica individually to ensure that read repair was triggered. We should expect that only
# the initial replica has data for both the 'a' and 'b' columns. If the cluster is on > 3.4, the read repair
# should only have affected the selected column (CASSANDRA-10655), so the other two replicas should only have
# that data.
# Note: we need to temporarily set read_repair_chance to 0 while we perform this check.
logger.debug("Setting table read repair chance to 0 while we verify each replica's data")
session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 0;""")
# The read repair is run in the background, so we spin while checking that the repair has completed
value_skipping_disabled = True if self.cluster.version() < '3.4' else False
retry_till_success(self.check_data_on_each_replica,
expect_fully_repaired=value_skipping_disabled,
initial_replica=initial_replica,
timeout=30,
bypassed_exception=NotRepairedException)
# Re-enable global read repair and perform another query on a non-replica. This time the query selects all
# columns so we also expect the value for 'b' to be repaired.
logger.debug("Setting table read repair chance to 1")
session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""")
logger.debug("Executing 'SELECT *...' on non-initial replica to trigger read repair " + non_replicas[0].name)
read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
read_repair_session.execute(SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1",
consistency_level=ConsistencyLevel.ONE))
# Query each replica again to ensure that second read repair was triggered. This time, we expect the
# data to be fully repaired (both 'a' and 'b' columns) by virtue of the query being 'SELECT *...'
# As before, we turn off read repair before doing this check.
logger.debug("Setting table read repair chance to 0 while we verify each replica's data")
session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 0;""")
retry_till_success(self.check_data_on_each_replica,
expect_fully_repaired=True,
initial_replica=initial_replica,
timeout=30,
bypassed_exception=NotRepairedException)
def do_initial_setup(self, session):
"""
Create a keyspace with rf=1 and a table containing a single row with 2 non-primary key columns.
Insert 1 row, placing the data on a single initial replica. Then, alter the keyspace to rf=3, but don't
repair. Tests will execute various reads on the replicas and assert the effects of read repair.
:param session: Used to perform the schema setup & insert the data
:return: a tuple containing the node which initially acts as the replica, and a list of the other two nodes
"""
# Disable speculative retry and [dclocal]read_repair in initial setup.
session.execute("""CREATE KEYSPACE alter_rf_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};""")
options = "speculative_retry='NONE'";
if self.cluster.version() < '4.0':
options = options + " AND read_repair_chance=0 AND dclocal_read_repair_chance=0"
session.execute("CREATE TABLE alter_rf_test.t1 (k int PRIMARY KEY, a int, b int) WITH " + options)
session.execute("INSERT INTO alter_rf_test.t1 (k, a, b) VALUES (1, 1, 1);")
# identify the initial replica and trigger a flush to ensure reads come from sstables
initial_replica, non_replicas = self.identify_initial_placement()
logger.debug("At RF=1 replica for data is " + initial_replica.name)
initial_replica.flush()
# Just some basic validation.
# At RF=1, it shouldn't matter which node we query, as the actual data should always come from the
# initial replica when reading at CL ONE
for n in self.cluster.nodelist():
logger.debug("Checking " + n.name)
session = self.patient_exclusive_cql_connection(n)
assert_one(session, "SELECT * FROM alter_rf_test.t1 WHERE k=1", [1, 1, 1], cl=ConsistencyLevel.ONE)
# Alter so RF=n but don't repair, calling tests will execute queries to exercise read repair,
# either at CL.ALL or after setting read_repair_chance to 100%.
logger.debug("Changing RF from 1 to 3")
session.execute("""ALTER KEYSPACE alter_rf_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};""")
return initial_replica, non_replicas
def identify_initial_placement(self):
"""
Identify which node in the 3 node cluster contains the specific key at the point that the test keyspace has
rf=1.
:return: tuple containing the initial replica, plus a list of the other 2 replicas.
"""
nodes = self.cluster.nodelist()
out, _, _ = nodes[0].nodetool("getendpoints alter_rf_test t1 1")
address = out.split('\n')[-2]
initial_replica = None
non_replicas = []
for node in nodes:
if node.address() == address:
initial_replica = node
else:
non_replicas.append(node)
assert initial_replica is not None, "Couldn't identify initial replica"
return initial_replica, non_replicas
def check_data_on_each_replica(self, expect_fully_repaired, initial_replica):
"""
Perform a SELECT * query at CL.ONE on each replica in turn. If expect_fully_repaired is True, we verify that
each replica returns the full row being queried. If not, then we only verify that the 'a' column has been
repaired.
"""
stmt = SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1", consistency_level=ConsistencyLevel.ONE)
logger.debug("Checking all if read repair has completed on all replicas")
for n in self.cluster.nodelist():
logger.debug("Checking {n}, {x}expecting all columns"
.format(n=n.name, x="" if expect_fully_repaired or n == initial_replica else "not "))
session = self.patient_exclusive_cql_connection(n)
res = rows_to_list(session.execute(stmt))
logger.debug("Actual result: " + str(res))
expected = [[1, 1, 1]] if expect_fully_repaired or n == initial_replica else [[1, 1, None]]
if res != expected:
raise NotRepairedException()
@since('2.0')
@ported_to_in_jvm('4.0')
def test_range_slice_query_with_tombstones(self, fixture_set_cluster_settings):
"""
@jira_ticket CASSANDRA-8989
@jira_ticket CASSANDRA-9502
Range-slice queries with CL>ONE do unnecessary read-repairs.
Reading from table which contains collection type using token function and with CL > ONE causes overwhelming writes to replicas.
It's possible to check the behavior with tracing - pattern matching in system_traces.events.activity
"""
node1 = self.cluster.nodelist()[0]
session1 = self.patient_exclusive_cql_connection(node1)
session1.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2}")
session1.execute("""
CREATE TABLE ks.cf (
key int primary key,
value double,
txt text
);
""")
for n in range(1, 2500):
str = "foo bar %d iuhiu iuhiu ihi" % n
session1.execute("INSERT INTO ks.cf (key, value, txt) VALUES (%d, %d, '%s')" % (n, n, str))
self.cluster.flush()
self.cluster.stop()
self.cluster.start()
session1 = self.patient_exclusive_cql_connection(node1)
for n in range(1, 1000):
session1.execute("DELETE FROM ks.cf WHERE key = %d" % (n))
time.sleep(1)
node1.flush()
time.sleep(1)
query = SimpleStatement("SELECT * FROM ks.cf LIMIT 100", consistency_level=ConsistencyLevel.LOCAL_QUORUM)
future = session1.execute_async(query, trace=True)
future.result()
trace = future.get_query_trace(max_wait=120)
self.pprint_trace(trace)
for trace_event in trace.events:
# Step 1, find coordinator node:
activity = trace_event.description
assert "Appending to commitlog" not in activity
assert "Adding to cf memtable" not in activity
assert "Acquiring switchLock read lock" not in activity
@since('3.0')
@ported_to_in_jvm('4.0')
def test_gcable_tombstone_resurrection_on_range_slice_query(self, fixture_set_cluster_settings):
"""
@jira_ticket CASSANDRA-11427
Range queries before the 11427 will trigger read repairs for puregable tombstones on hosts that already compacted given tombstones.
This will result in constant transfer and compaction actions sourced by few nodes seeding purgeable tombstones and triggered e.g.
by periodical jobs scanning data range wise.
"""
node1, node2, _ = self.cluster.nodelist()
session1 = self.patient_cql_connection(node1)
create_ks(session1, 'gcts', 3)
query = """
CREATE TABLE gcts.cf1 (
key text,
c1 text,
PRIMARY KEY (key, c1)
)
WITH gc_grace_seconds=0
AND compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'};
"""
session1.execute(query)
# create row tombstone
delete_stmt = SimpleStatement("DELETE FROM gcts.cf1 WHERE key = 'a'", consistency_level=ConsistencyLevel.ALL)
session1.execute(delete_stmt)
# flush single sstable with tombstone
node1.flush()
node2.flush()
# purge tombstones from node2 (gc grace 0)
node2.compact()
# execute range slice query, which should not trigger read-repair for purged TS
future = session1.execute_async(SimpleStatement("SELECT * FROM gcts.cf1", consistency_level=ConsistencyLevel.ALL), trace=True)
future.result()
trace = future.get_query_trace(max_wait=120)
self.pprint_trace(trace)
for trace_event in trace.events:
activity = trace_event.description
assert "Sending READ_REPAIR message" not in activity
@since('3.0')
def test_tracing_does_not_interfere_with_digest_calculation(self):
"""
Test that enabling tracing doesn't interfere with digest responses when using RandomPartitioner.
The use of a threadlocal MessageDigest for generating both DigestResponse messages and for
calculating tokens meant that the DigestResponse was always incorrect when both RP and tracing
were enabled, leading to unnecessary data reads.
@jira_ticket CASSANDRA-13964
"""
cluster = self.cluster
cluster.populate(3)
cluster.set_configuration_options(values={'write_request_timeout_in_ms': 30000,
'read_request_timeout_in_ms': 30000})
cluster.set_partitioner("org.apache.cassandra.dht.RandomPartitioner")
cluster.start(jvm_args=['-Dcassandra.wait_for_tracing_events_timeout_secs=15'])
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 3)
session.execute("""
CREATE TABLE ks.users (
userid uuid PRIMARY KEY,
firstname text,
lastname text,
age int
);
""")
insert = session.prepare(
"INSERT INTO ks.users (userid, firstname, lastname, age) "
"VALUES (?, 'Frodo', 'Baggins', 32)")
insert.consistency_level = ConsistencyLevel.ALL
select = session.prepare(
"SELECT firstname, lastname "
"FROM ks.users WHERE userid = ?")
select.consistency_level = ConsistencyLevel.ALL
for _ in range(10):
id = uuid4()
session.execute(insert.bind((id,)), timeout=30)
res = session.execute(select.bind((id,)), timeout=30, trace=True)
assert 1 == len(res.response_future.get_query_trace_ids())
rr_count = make_mbean('metrics', type='ReadRepair', name='RepairedBlocking')
with JolokiaAgent(node1) as jmx:
# the MBean may not have been initialized, in which case Jolokia agent will return
# a HTTP 404 response. If we receive such, we know that no digest mismatch was reported
# If we are able to read the MBean attribute, assert that the count is 0
if jmx.has_mbean(rr_count):
# expect 0 digest mismatches
assert 0 == jmx.read_attribute(rr_count, 'Count')
else:
pass
def pprint_trace(self, trace):
"""Pretty print a trace"""
if logging.root.level == logging.DEBUG:
print(("-" * 40))
for t in trace.events:
print(("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name)))
print(("-" * 40))
def quorum(query_string):
return SimpleStatement(query_string=query_string, consistency_level=ConsistencyLevel.QUORUM)
kcv = lambda k, c, v: [k, c, v]
listify = lambda results: [list(r) for r in results]
class StorageProxy(object):
def __init__(self, node):
assert isinstance(node, Node)
self.node = node
self.jmx = JolokiaAgent(node)
def start(self):
self.jmx.start()
def stop(self):
self.jmx.stop()
def _get_metric(self, metric):
mbean = make_mbean("metrics", type="ReadRepair", name=metric)
return self.jmx.read_attribute(mbean, "Count")
@property
def blocking_read_repair(self):
return self._get_metric("RepairedBlocking")
@property
def speculated_rr_read(self):
return self._get_metric("SpeculatedRead")
@property
def speculated_rr_write(self):
return self._get_metric("SpeculatedWrite")
def get_table_metric(self, keyspace, table, metric, attr="Count"):
mbean = make_mbean("metrics", keyspace=keyspace, scope=table, type="Table", name=metric)
return self.jmx.read_attribute(mbean, attr)
def __enter__(self):
""" For contextmanager-style usage. """
self.start()
return self
def __exit__(self, exc_type, value, traceback):
""" For contextmanager-style usage. """
self.stop()
class TestSpeculativeReadRepair(Tester):
@pytest.fixture(scope='function', autouse=True)
def fixture_set_cluster_settings(self, fixture_dtest_setup):
cluster = fixture_dtest_setup.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
'dynamic_snitch': False,
'write_request_timeout_in_ms': 1000,
'read_request_timeout_in_ms': 1000})
cluster.populate(3, install_byteman=True, debug=True)
byteman_validate(cluster.nodelist()[0], './byteman/read_repair/sorted_live_endpoints.btm', verbose=True)
cluster.start(jvm_args=['-XX:-PerfDisableSharedMem'])
session = fixture_dtest_setup.patient_exclusive_cql_connection(cluster.nodelist()[0], timeout=2)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}")
session.execute("CREATE TABLE ks.tbl (k int, c int, v int, primary key (k, c)) WITH speculative_retry = '400ms';")
def get_cql_connection(self, node, **kwargs):
return self.patient_exclusive_cql_connection(node, retry_policy=None, **kwargs)
@since('4.0')
def test_failed_read_repair(self):
"""
If none of the disagreeing nodes ack the repair mutation, the read should fail
"""
node1, node2, node3 = self.cluster.nodelist()
assert isinstance(node1, Node)
assert isinstance(node2, Node)
assert isinstance(node3, Node)
session = self.get_cql_connection(node1, timeout=2)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v) VALUES (1, 0, 1)"))
node2.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
node2.byteman_submit(['./byteman/read_repair/stop_rr_writes.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_rr_writes.btm'])
with raises(WriteTimeout):
session.execute(quorum("INSERT INTO ks.tbl (k, c, v) VALUES (1, 1, 2)"))
node2.byteman_submit(['./byteman/read_repair/sorted_live_endpoints.btm'])
session = self.get_cql_connection(node2)
with StorageProxy(node2) as storage_proxy:
assert storage_proxy.blocking_read_repair == 0
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 0
with raises(ReadTimeout):
session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
assert storage_proxy.blocking_read_repair > 0
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write > 0
@since('4.0')
def test_normal_read_repair(self):
""" test the normal case """
node1, node2, node3 = self.cluster.nodelist()
assert isinstance(node1, Node)
assert isinstance(node2, Node)
assert isinstance(node3, Node)
session = self.get_cql_connection(node1, timeout=2)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v) VALUES (1, 0, 1)"))
node2.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
session.execute("INSERT INTO ks.tbl (k, c, v) VALUES (1, 1, 2)")
# re-enable writes
node2.byteman_submit(['-u', './byteman/read_repair/stop_writes.btm'])
node2.byteman_submit(['./byteman/read_repair/sorted_live_endpoints.btm'])
coordinator = node2
# Stop reads on coordinator in order to make sure we do not go through
# the messaging service for the local reads
with StorageProxy(node2) as storage_proxy, stop_reads(coordinator):
assert storage_proxy.blocking_read_repair == 0
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 0
session = self.get_cql_connection(coordinator)
expected = [kcv(1, 0, 1), kcv(1, 1, 2)]
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
assert listify(results) == expected
assert storage_proxy.blocking_read_repair == 1
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 0
@since('4.0')
def test_speculative_data_request(self):
""" If one node doesn't respond to a full data request, it should query the other """
node1, node2, node3 = self.cluster.nodelist()
assert isinstance(node1, Node)
assert isinstance(node2, Node)
assert isinstance(node3, Node)
session = self.get_cql_connection(node1, timeout=2)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v) VALUES (1, 0, 1)"))
node2.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
session.execute("INSERT INTO ks.tbl (k, c, v) VALUES (1, 1, 2)")
# re-enable writes
node2.byteman_submit(['-u', './byteman/read_repair/stop_writes.btm'])
node1.byteman_submit(['./byteman/read_repair/sorted_live_endpoints.btm'])
node1.byteman_submit(['./byteman/request_verb_timing.btm'])
with StorageProxy(node1) as storage_proxy:
assert storage_proxy.blocking_read_repair == 0
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 0
session = self.get_cql_connection(node1)
node2.byteman_submit(['./byteman/read_repair/stop_data_reads.btm'])
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
timing = request_verb_timing(node1)
repair_req_node3 = timing[node3.ip_addr].get('READ_REPAIR_REQ')
repair_req_node2 = timing[node2.ip_addr].get('READ_REPAIR_REQ')
assert listify(results) == [kcv(1, 0, 1), kcv(1, 1, 2)]
assert storage_proxy.blocking_read_repair == 1
assert storage_proxy.speculated_rr_read == 1
# under normal circumstances we don't expect a speculated write here,
# but the repair request to node 3 may timeout due to CPU contention and
# then a speculated write is sent to node 2, so we just make sure that the
# request to node 2 didn't happen before the request to node 3
assert storage_proxy.speculated_rr_write == 0 or repair_req_node2 > repair_req_node3
@since('4.0')
def test_speculative_write(self):
""" if one node doesn't respond to a read repair mutation, it should be sent to the remaining node """
node1, node2, node3 = self.cluster.nodelist()
assert isinstance(node1, Node)
assert isinstance(node2, Node)
assert isinstance(node3, Node)
session = self.get_cql_connection(node1, timeout=2)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v) VALUES (1, 0, 1)"))
node2.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
session.execute("INSERT INTO ks.tbl (k, c, v) VALUES (1, 1, 2)")
# re-enable writes on node 3, leave them off on node2
node2.byteman_submit(['./byteman/read_repair/stop_rr_writes.btm'])
node1.byteman_submit(['./byteman/read_repair/sorted_live_endpoints.btm'])
with StorageProxy(node1) as storage_proxy:
assert storage_proxy.blocking_read_repair == 0
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 0
session = self.get_cql_connection(node1)
expected = [kcv(1, 0, 1), kcv(1, 1, 2)]
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
assert listify(results) == expected
assert storage_proxy.blocking_read_repair == 1
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 1
@since('4.0')
def test_quorum_requirement(self):
"""
Even if we speculate on every stage, we should still only require a quorum of responses for success
"""
node1, node2, node3 = self.cluster.nodelist()
assert isinstance(node1, Node)
assert isinstance(node2, Node)
assert isinstance(node3, Node)
session = self.get_cql_connection(node1, timeout=2)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v) VALUES (1, 0, 1)"))
node2.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
session.execute("INSERT INTO ks.tbl (k, c, v) VALUES (1, 1, 2)")
# re-enable writes
node2.byteman_submit(['-u', './byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['-u', './byteman/read_repair/stop_writes.btm'])
# force endpoint order
node1.byteman_submit(['./byteman/read_repair/sorted_live_endpoints.btm'])
# node2.byteman_submit(['./byteman/read_repair/stop_digest_reads.btm'])
node2.byteman_submit(['./byteman/read_repair/stop_data_reads.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_rr_writes.btm'])
with StorageProxy(node1) as storage_proxy:
assert storage_proxy.get_table_metric("ks", "tbl", "SpeculativeRetries") == 0
assert storage_proxy.blocking_read_repair == 0
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 0
session = self.get_cql_connection(node1)
expected = [kcv(1, 0, 1), kcv(1, 1, 2)]
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
assert listify(results) == expected
assert storage_proxy.get_table_metric("ks", "tbl", "SpeculativeRetries") == 0
assert storage_proxy.blocking_read_repair == 1
assert storage_proxy.speculated_rr_read == 1
assert storage_proxy.speculated_rr_write == 1
@since('4.0')
def test_quorum_requirement_on_speculated_read(self):
"""
Even if we speculate on every stage, we should still only require a quorum of responses for success
"""
node1, node2, node3 = self.cluster.nodelist()
assert isinstance(node1, Node)
assert isinstance(node2, Node)
assert isinstance(node3, Node)
session = self.get_cql_connection(node1, timeout=2)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v) VALUES (1, 0, 1)"))
node2.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_writes.btm'])
session.execute("INSERT INTO ks.tbl (k, c, v) VALUES (1, 1, 2)")
# re-enable writes
node2.byteman_submit(['-u', './byteman/read_repair/stop_writes.btm'])
node3.byteman_submit(['-u', './byteman/read_repair/stop_writes.btm'])
# force endpoint order
node1.byteman_submit(['./byteman/read_repair/sorted_live_endpoints.btm'])
node2.byteman_submit(['./byteman/read_repair/stop_digest_reads.btm'])
node3.byteman_submit(['./byteman/read_repair/stop_data_reads.btm'])
node2.byteman_submit(['./byteman/read_repair/stop_rr_writes.btm'])
with StorageProxy(node1) as storage_proxy:
assert storage_proxy.get_table_metric("ks", "tbl", "SpeculativeRetries") == 0
assert storage_proxy.blocking_read_repair == 0
assert storage_proxy.speculated_rr_read == 0
assert storage_proxy.speculated_rr_write == 0
session = self.get_cql_connection(node1)
expected = [kcv(1, 0, 1), kcv(1, 1, 2)]
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
assert listify(results) == expected
assert storage_proxy.get_table_metric("ks", "tbl", "SpeculativeRetries") == 1
assert storage_proxy.blocking_read_repair == 1
assert storage_proxy.speculated_rr_read == 0 # there shouldn't be any replicas to speculate on
assert storage_proxy.speculated_rr_write == 1
@contextmanager
def _byteman_cycle(nodes, scripts):
script_path = lambda name: './byteman/read_repair/' + name + '.btm'
for script in scripts:
byteman_validate(nodes[0], script_path(script))
for node in nodes:
assert isinstance(node, Node)
for name in scripts:
print(node.name)
node.byteman_submit([script_path(name)])
yield
for node in nodes:
for name in scripts:
print(node.name)
node.byteman_submit(['-u', script_path(name)])
@contextmanager
def stop_writes(*nodes, kind='all'):
assert kind in ('all', 'normal', 'repair')
normal = 'stop_writes'
repair = 'stop_rr_writes'
with _byteman_cycle(nodes, {'normal': [normal], 'repair': [repair], 'all': [normal, repair]}[kind]):
yield
@contextmanager
def stop_reads(*nodes, kind='all'):
data = 'stop_data_reads'
digest = 'stop_digest_reads'
with _byteman_cycle(nodes, {'data': [data], 'digest': [digest], 'all': [data, digest]}[kind]):
yield
kcvv = lambda k, c, v1, v2: [k, c, v1, v2]
@since('4.0')
@ported_to_in_jvm('4.0')
class TestReadRepairGuarantees(Tester):
@pytest.fixture(scope='function', autouse=True)
def fixture_set_cluster_settings(self, fixture_dtest_setup):
cluster = fixture_dtest_setup.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
'dynamic_snitch': False,
'write_request_timeout_in_ms': 500,
'read_request_timeout_in_ms': 500})
cluster.populate(3, install_byteman=True, debug=True).start(jvm_args=['-XX:-PerfDisableSharedMem'])
session = fixture_dtest_setup.patient_exclusive_cql_connection(cluster.nodelist()[0], timeout=2)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}")
def get_cql_connection(self, node, **kwargs):
return self.patient_exclusive_cql_connection(node, retry_policy=None, **kwargs)
@pytest.mark.parametrize("repair_type,expect_monotonic",
(('blocking', True), ('none', False)),
ids=('blocking', 'none'))
def test_monotonic_reads(self, repair_type, expect_monotonic):
"""
tests how read repair provides, or breaks, read monotonicity
blocking read repair should maintain monotonic quorum reads, async and none should not
"""
assert repair_type in ('blocking', 'async', 'none')
node1, node2, node3 = self.cluster.nodelist()
session = self.get_cql_connection(node1, timeout=2)
ddl = "CREATE TABLE ks.tbl (k int, c int, v1 int, v2 int, primary key (k, c)) WITH read_repair = '" + repair_type + "';"
print (ddl)
session.execute(ddl)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v1, v2) VALUES (1, 0, 1, 1)"))
with stop_writes(node2, node3):
session.execute("INSERT INTO ks.tbl (k, c, v1, v2) VALUES (1, 0, 2, 2)")
with stop_reads(node3), stop_writes(node3):
if expect_monotonic:
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
else:
# if we don't expect monotonicity, read repair writes shouldn't block
with stop_writes(node2, kind='repair'):
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
assert listify(results) == [kcvv(1, 0, 2, 2)]
session = self.get_cql_connection(node3, timeout=2)
with stop_reads(node1):
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
if expect_monotonic:
assert listify(results) == [kcvv(1, 0, 2, 2)]
else:
assert listify(results) == [kcvv(1, 0, 1, 1)]
@pytest.mark.parametrize("repair_type,expect_atomic",
(('blocking', False), ('none', True)),
ids=('blocking', 'none'))
def test_atomic_writes(self, repair_type, expect_atomic):
"""
tests how read repair provides, or breaks, write atomicity
'none' read repair should maintain atomic writes, blocking and async should not
"""
assert repair_type in ('blocking', 'async', 'none')
node1, node2, node3 = self.cluster.nodelist()
session = self.get_cql_connection(node1, timeout=2)
ddl = "CREATE TABLE ks.tbl (k int, c int, v1 int, v2 int, primary key (k, c)) WITH read_repair = '" + repair_type + "';"
print (ddl)
session.execute(ddl)
session.execute(quorum("INSERT INTO ks.tbl (k, c, v1, v2) VALUES (1, 0, 1, 1)"))
with stop_writes(node2, node3):
session.execute("INSERT INTO ks.tbl (k, c, v1, v2) VALUES (1, 0, 2, 2)")
with stop_reads(node3), stop_writes(node3):
results = session.execute(quorum("SELECT v1 FROM ks.tbl WHERE k=1"))
assert listify(results) == [[2]]
# make sure async read repair has a chance to write the repair value
if repair_type == 'async':
time.sleep(1)
session = self.get_cql_connection(node3, timeout=2)
with stop_reads(node1):
results = session.execute(quorum("SELECT * FROM ks.tbl WHERE k=1"))
if expect_atomic:
assert listify(results) == [kcvv(1, 0, 1, 1)]
else:
assert listify(results) == [kcvv(1, 0, 2, 1)]
class NotRepairedException(Exception):
"""
Thrown to indicate that the data on a replica hasn't been doesn't match what we'd expect if a
specific read repair has run. See check_data_on_each_replica.
"""
pass