Skip to content

Commit

Permalink
Leave the current shard for tpt last, to eliminate double writes for …
Browse files Browse the repository at this point in the history
…inserts.

Signed-off-by: Dorin Hogea <[email protected]>
  • Loading branch information
dorinhogea committed Apr 29, 2024
1 parent de6604c commit db09d66
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 20 deletions.
2 changes: 1 addition & 1 deletion bdb/rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2057,7 +2057,7 @@ inline static void update_node_acks(bdb_state_type *bdb_state, struct interned_s
if (h->expected_udp_count > 1 &&
delta > bdb_state->attr->udp_drop_delta_threshold &&
rate > bdb_state->attr->udp_drop_warn_percent) {
logmsg(LOGMSG_USER,
logmsg(LOGMSG_DEBUG,
"update_node_acks: host %s, expected_udp_count = %d, delta = "
"%.1f, loss = %f percent\n", host->str, h->expected_udp_count, delta, rate);
}
Expand Down
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ extern int gbl_timer_warn_interval;
int gbl_incoherent_clnt_wait = 10;
int gbl_new_leader_duration = 3;
extern int gbl_transaction_grace_period;
extern int gbl_partition_sc_reorder;
extern int gbl_dohsql_joins;

/*
Expand Down
4 changes: 4 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2491,6 +2491,10 @@ REGISTER_TUNABLE("transaction_grace_period",
"Time to wait for connections with pending transactions to go away on exit. (Default: 60)",
TUNABLE_INTEGER, &gbl_transaction_grace_period, 0, NULL, NULL, NULL, NULL);

REGISTER_TUNABLE("partition_sc_reorder",
"If the schema change is serialized for a partition, run current shard last",
TUNABLE_BOOLEAN, &gbl_partition_sc_reorder, 0, NULL, NULL, NULL, NULL);

REGISTER_TUNABLE("dohsql_joins",
"Enable to support joins in parallel sql execution (default: on)",
TUNABLE_BOOLEAN, &gbl_dohsql_joins, 0, NULL, NULL, NULL, NULL);
Expand Down
24 changes: 12 additions & 12 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ int gbl_master_sends_query_effects = 1;
int gbl_toblock_random_deadlock_trans;
int gbl_selectv_writelock = 0;
int gbl_debug_invalid_genid;
int gbl_partition_sc_reorder = 1;

extern int db_is_exiting();

Expand Down Expand Up @@ -5748,17 +5749,15 @@ static int start_schema_change_tran_wrapper(const char *tblname,
struct ireq *iq = sc->iq;
int rc;

if (arg->indx == 0) {
/* is first shard aliased? */
struct dbtable *db = get_dbtable_by_name(tblname);
if (!(db && db->sqlaliasname &&
strncasecmp(db->sqlaliasname, tblname,
strlen(db->sqlaliasname)) == 0))
strncpy0(sc->tablename, tblname, sizeof(sc->tablename));

/* we need to use the actual table name here; it might match partition name
*/
struct dbtable *db = get_dbtable_by_name(tblname);
if (db) {
strncpy0(sc->tablename, db->tablename, sizeof(sc->tablename));
} else {
strncpy0(sc->tablename, tblname, sizeof(sc->tablename));
}

if (gbl_disable_tpsc_tblvers) {
sc->fix_tp_badvers = 1;
}
Expand Down Expand Up @@ -5789,7 +5788,7 @@ static int start_schema_change_tran_wrapper(const char *tblname,
} else {
iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;
if (arg->nshards == arg->indx + 1) {
if (arg->last) {
/* last shard was done */
iq->osql_flags |= OSQL_FLAGS_SCDONE;
} else {
Expand Down Expand Up @@ -5981,7 +5980,7 @@ static int _process_partitioned_table_merge(struct ireq *iq)
arg.indx = start_shard;
/* note: we have already set nothrevent depending on the number of shards */
rc = timepart_foreach_shard(
sc->tablename, start_schema_change_tran_wrapper_merge, &arg, start_shard);
sc->tablename, start_schema_change_tran_wrapper_merge, &arg, start_shard, 0);

if (first_shard->sqlaliasname) {
sc->partition.type = PARTITION_REMOVE; /* first shard is the collapsed table */
Expand Down Expand Up @@ -6076,7 +6075,7 @@ static int _process_single_table_sc_partitioning(struct ireq *iq)
/* should we serialize ? */
arg.s->nothrevent = sc->partition.u.tpt.retention > gbl_dohsql_sc_max_threads;
rc = timepart_foreach_shard_lockless(
sc->newpartition, start_schema_change_tran_wrapper, &arg);
sc->newpartition, start_schema_change_tran_wrapper, &arg, 0);

if (!rc&& sc->partition.type == PARTITION_ADD_MANUAL) {
if (!get_dbtable_by_name(LOGICAL_CRON_SYSTABLE)){
Expand Down Expand Up @@ -6161,7 +6160,8 @@ static int _process_partition_alter_and_drop(struct ireq *iq)
arg.s = sc;
arg.s->iq = iq;
rc = timepart_foreach_shard(sc->tablename,
start_schema_change_tran_wrapper, &arg, -1);
start_schema_change_tran_wrapper, &arg, -1,
gbl_partition_sc_reorder ? sc->nothrevent : 0);
out:
return rc;
}
Expand Down
33 changes: 29 additions & 4 deletions db/views.c
Original file line number Diff line number Diff line change
Expand Up @@ -2393,21 +2393,45 @@ static void _view_unregister_shards_lkless(timepart_views_t *views,
*/
int timepart_foreach_shard_lockless(timepart_view_t *view,
int func(const char *, timepart_sc_arg_t *),
timepart_sc_arg_t *arg)
timepart_sc_arg_t *arg, int reorder)
{
/* Calls func(shard table name, arg) for the shards of `view`, starting at
 * index arg->indx, and returns the first non-zero rc from func (0 if every
 * call returned 0). When `reorder` is non-zero, one shard is held back and
 * processed after all the others.
 *
 * NOTE(review): this text looks like a rendered diff in which removed and
 * added lines are interleaved without +/- markers (e.g. the two consecutive
 * `timepart_sc_arg_t *arg...` parameter lines above, and the bare `if (arg)`
 * directly before the skip test below) -- confirm against the repository
 * before treating any single line as live code.
 */
int rc = 0;
int i;
/* index of the shard to defer until the end; -1 means no shard is deferred */
int skip = -1;
/* NOTE(review): arg is dereferenced unconditionally here and in the loop
 * initializer, yet it is NULL-checked further down -- the checks are
 * inconsistent; confirm whether arg may ever be NULL.
 */
arg->nshards = view->nshards;
/* when reordering, defer view->current_shard for TIMEPART_ROLLOUT_TRUNCATE
 * views and shard 0 for every other rolltype
 */
if (reorder)
skip = (view->rolltype == TIMEPART_ROLLOUT_TRUNCATE) ? view->current_shard : 0;

for (i = arg->indx; i < view->nshards; i++) {
if (arg)
if (skip >= 0 && i == skip) {
logmsg(LOGMSG_INFO, "%s Skipping %p for %s\n", __func__,
func, view->shards[i].tblname);
continue;
}
if (arg) {
/* tell func which shard index it is being called for */
arg->indx = i;
/* arg->last is written here only when nothing is deferred: it is
 * non-zero for the final index of the view. NOTE(review): when a shard
 * is deferred (skip >= 0), arg->last is left at whatever value the
 * caller supplied until the deferred call below sets it to 1.
 */
if (skip < 0)
arg->last = arg->nshards == arg->indx + 1;
}
logmsg(LOGMSG_INFO, "%s Applying %p to %s (existing shard)\n", __func__,
func, view->shards[i].tblname);
rc = func(view->shards[i].tblname, arg);
/* stop at the first failure; remaining shards are not visited */
if (rc) {
break;
}
}
/* NOTE(review): the deferred shard is processed whenever the loop finished
 * without error, even if the loop started at an index greater than `skip`
 * -- confirm that callers passing reorder always start from shard 0.
 */
if (!rc && skip >= 0) {
/* process the current shard last to reduce the number of double writes
* during schema change
*/
if (arg) {
arg->indx = skip;
/* the deferred shard is always reported to func as the last one */
arg->last = 1;
}
logmsg(LOGMSG_INFO, "%s Applying %p to current shard %s\n", __func__,
func, view->shards[skip].tblname);
rc = func(view->shards[skip].tblname, arg);
}
return rc;
}

Expand All @@ -2420,7 +2444,8 @@ int timepart_foreach_shard_lockless(timepart_view_t *view,
*/
int timepart_foreach_shard(const char *view_name,
int func(const char *, timepart_sc_arg_t *),
timepart_sc_arg_t *arg, int first_shard)
timepart_sc_arg_t *arg, int first_shard,
int reorder)
{
timepart_views_t *views;
timepart_view_t *view;
Expand Down Expand Up @@ -2454,7 +2479,7 @@ int timepart_foreach_shard(const char *view_name,
}
}

rc = timepart_foreach_shard_lockless(view, func, arg);
rc = timepart_foreach_shard_lockless(view, func, arg, reorder);

done:
Pthread_rwlock_unlock(&views_lk);
Expand Down
5 changes: 3 additions & 2 deletions db/views.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ typedef struct timepart_sc_arg {
int nshards;
int rc;
void *tran; /*remove?*/
int last;
} timepart_sc_arg_t;

extern int gbl_partitioned_table_enabled;
Expand Down Expand Up @@ -288,7 +289,7 @@ int comdb2_partition_check_name_reuse(const char *tblname, char **partname, int
*/
int timepart_foreach_shard(const char *view_name,
int func(const char *, timepart_sc_arg_t *),
timepart_sc_arg_t *arg, int first_shard);
timepart_sc_arg_t *arg, int first_shard, int reorder);

/**
* Run "func" for each shard of a partition
Expand All @@ -298,7 +299,7 @@ int timepart_foreach_shard(const char *view_name,
*/
int timepart_foreach_shard_lockless(timepart_view_t *view,
int func(const char *, timepart_sc_arg_t *),
timepart_sc_arg_t *arg);
timepart_sc_arg_t *arg, int reorder);

/**
* Queue up the necessary events to rollout time partitions
Expand Down
2 changes: 1 addition & 1 deletion schemachange/sc_logic.c
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ static int verify_sc_resumed_for_all_shards(void *obj, void *arg)
sc_arg.s = tpt_sc->s;
/* start new sc for shards that were not resumed */
timepart_foreach_shard(tpt_sc->viewname, verify_sc_resumed_for_shard,
&sc_arg, -1);
&sc_arg, -1, 0);
assert(sc_arg.s != tpt_sc->s);
tpt_sc->s = sc_arg.s;
return 0;
Expand Down
1 change: 1 addition & 0 deletions tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@
(name='parallel_recovery', description='', type='INTEGER', value='0', read_only='Y')
(name='parallel_sync', description='Run checkpoint/memptrickle code with parallel writes', type='BOOLEAN', value='ON', read_only='N')
(name='participantid_bits', description='Number of bits allocated for the participant stripe ID (remaining bits are used for the update ID).', type='INTEGER', value='0', read_only='N')
(name='partition_sc_reorder', description='If the schema change is serialized for a partition, run current shard last', type='BOOLEAN', value='ON', read_only='N')
(name='partitioned_table_enabled', description='Allow syntax create/alter table ... partitioned by ...', type='BOOLEAN', value='ON', read_only='N')
(name='pause_moveto', description='pause_moveto', type='BOOLEAN', value='OFF', read_only='N')
(name='pbkdf2_iterations', description='Number of iterations of PBKDF2 algorithm for password hashing.', type='INTEGER', value='4096', read_only='N')
Expand Down

0 comments on commit db09d66

Please sign in to comment.