Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2505,9 +2505,9 @@ REGISTER_TUNABLE("pstack_self", "Dump stack traces on certain slow events.", TUN
EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE(
"noleader_retry_duration_ms",
"The amount of time in milliseconds that a replicant retries if there isn't a leader. (Default: 50,000)",
"The amount of time in milliseconds that a replicant retries if there isn't a leader. (Default: 60,000)",
TUNABLE_INTEGER, &gbl_noleader_retry_duration_ms, INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("noleader_retry_poll_ms", "Wait this long before retrying on no-leader. (Default: 10)",
REGISTER_TUNABLE("noleader_retry_poll_ms", "Wait this long before retrying on no-leader. (Default: 1000)",
TUNABLE_INTEGER, &gbl_noleader_retry_poll_ms, INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("cdb2api_policy_override", "Use this policy override with cdb2api. (Default: none)", TUNABLE_STRING,
&gbl_cdb2api_policy_override, 0, NULL, NULL, NULL, NULL);
Expand Down
18 changes: 7 additions & 11 deletions db/glue.c
Original file line number Diff line number Diff line change
Expand Up @@ -3080,6 +3080,7 @@ static void new_master_callback_int(void *bdb_handle, int assert_sc_clear)
if (!gbl_exit && comdb2_ipc_master_set) {
comdb2_ipc_master_set(host);
}
osql_noleader_broadcast();
gbl_lost_master_time = 0;
osql_checkboard_for_each(thedb->master, osql_checkboard_master_changed);
}
Expand Down Expand Up @@ -3779,7 +3780,7 @@ int open_bdb_env(struct dbenv *dbenv)
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_SIGNAL_LOGFILL, signal_logfill);

#if 0
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_CATCHUP,
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_CATCHUP,
catchup_callback);
#endif

Expand Down Expand Up @@ -3829,10 +3830,8 @@ int open_bdb_env(struct dbenv *dbenv)
dbenv->handle_sibling, intern(dbenv->sibling_hostname[ii]),
dbenv->sibling_port[ii][NET_REPLICATION]);
if (rcv == 0) {
logmsg(LOGMSG_ERROR,
"open_bdb_env:failed add_to_netinfo host %s port %d\n",
dbenv->sibling_hostname[ii],
dbenv->sibling_port[ii][NET_REPLICATION]);
logmsg(LOGMSG_ERROR, "open_bdb_env:failed add_to_netinfo host %s port %d\n",
dbenv->sibling_hostname[ii], dbenv->sibling_port[ii][NET_REPLICATION]);
return -1;
}
}
Expand Down Expand Up @@ -4020,8 +4019,7 @@ static void get_disable_skipscan(struct dbtable *tbl, tran_type *tran)
free(str);
}


void get_disable_skipscan_all()
void get_disable_skipscan_all()
{
#ifdef DEBUGSKIPSCAN
logmsg(LOGMSG_WARN, "get_disable_skipscan_all() called\n");
Expand All @@ -4033,8 +4031,6 @@ void get_disable_skipscan_all()
}
curtran_puttran(tran);
}



/* open the db files, etc */
int backend_open_tran(struct dbenv *dbenv, tran_type *tran, uint32_t flags)
Expand Down Expand Up @@ -6177,8 +6173,8 @@ int table_version_upsert(struct dbtable *db, void *trans, int *bdberr)
int rc = bdb_table_version_upsert(db->handle, trans, bdberr);
if(rc) return rc;

//select needs to be done with the same transaction to avoid
//undetectable deadlock for writing and reading from same thread
// select needs to be done with the same transaction to avoid
// undetectable deadlock for writing and reading from same thread
unsigned long long version;
rc = bdb_table_version_select(db->tablename, trans, &version, bdberr);
if (rc || *bdberr) {
Expand Down
62 changes: 43 additions & 19 deletions db/osqlsqlthr.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,18 @@ extern int gbl_reorder_socksql_no_deadlock;

int gbl_allow_bplog_restarts = 600;
int gbl_master_retry_poll_ms = 100;
int gbl_noleader_retry_duration_ms = 50 * 1000; /* wait up to 50 seconds for a new leader */
int gbl_noleader_retry_poll_ms = 10;
int gbl_noleader_retry_duration_ms = 60 * 1000; /* wait up to 60 seconds for a new leader */
int gbl_noleader_retry_poll_ms = 1000;

static pthread_mutex_t gbl_noleader_wait_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gbl_noleader_wait_cv = PTHREAD_COND_INITIALIZER;

/*
 * Wake every thread currently blocked on gbl_noleader_wait_cv.
 *
 * The broadcast is issued while holding gbl_noleader_wait_mtx, the same
 * mutex the timed waiters pass to pthread_cond_timedwait().
 */
void osql_noleader_broadcast(void)
{
    pthread_mutex_t *const wait_mtx = &gbl_noleader_wait_mtx;

    pthread_mutex_lock(wait_mtx);
    pthread_cond_broadcast(&gbl_noleader_wait_cv);
    pthread_mutex_unlock(wait_mtx);
}

static int osql_send_usedb_logic(struct BtCursor *pCur, struct sql_thread *thd,
int nettype);
Expand Down Expand Up @@ -199,6 +209,27 @@ static inline int osql_should_restart(struct sqlclntstate *clnt, int rc,
} \
} while (0)

/*
 * Block the calling thread until gbl_noleader_wait_cv is signalled or
 * poll_ms milliseconds have elapsed, whichever happens first.
 *
 * poll_ms: maximum time to wait, in milliseconds.  Negative values are
 *          treated as 0 (return as soon as the deadline check runs).
 *
 * Nothing is returned: a timeout, a broadcast and a spurious wakeup all look
 * the same to the caller, which is expected to re-check for a leader itself.
 *
 * Any pthread_cond_timedwait() failure other than ETIMEDOUT is logged at
 * LOGMSG_FATAL and aborts the process.
 */
static void noleader_wait_for_timeout(int poll_ms)
{
    struct timespec deadline;
    int rc;

    /* A negative interval gives a negative remainder below, which can push
     * tv_nsec under zero; pthread_cond_timedwait() rejects that with EINVAL
     * and we would abort().  Clamp instead of crashing on a bad setting. */
    if (poll_ms < 0) {
        poll_ms = 0;
    }

    /* pthread_cond_timedwait() takes an absolute deadline.  CLOCK_REALTIME is
     * the default clock for a condition variable set up with
     * PTHREAD_COND_INITIALIZER. */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += poll_ms / 1000;
    deadline.tv_nsec += (poll_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        /* carry the overflowing nanoseconds into the seconds field */
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&gbl_noleader_wait_mtx);
    /* TODO: if the wakeup came from a broadcast rather than the timeout,
     * should the caller still retry?  Presumably yes. */
    rc = pthread_cond_timedwait(&gbl_noleader_wait_cv, &gbl_noleader_wait_mtx, &deadline);
    if (rc && rc != ETIMEDOUT) {
        logmsg(LOGMSG_FATAL, "pthread_cond_timedwait:%d rc:%d (%s) thd:%p\n", __LINE__, rc, strerror(rc),
               (void *)pthread_self());
        abort();
    }
    pthread_mutex_unlock(&gbl_noleader_wait_mtx);
}

/* see below */
enum { OSQL_START_KEEP_RQID = 1, OSQL_START_NO_REORDER = 2, OSQL_START_IS_FINAL = 4 };
extern int gbl_debug_disttxn_trace;
Expand Down Expand Up @@ -265,7 +296,7 @@ static int osql_sock_start_int(struct sqlclntstate *clnt, int type,
comdb2uuidstr(osql->uuid, us);
logmsg(LOGMSG_WARN, "Retrying to find the master retries=%d uuid:%s\n", retries, us);
}
poll(NULL, 0, poll_ms);
noleader_wait_for_timeout(poll_ms);
goto retry;
} else {
uuidstr_t us;
Expand Down Expand Up @@ -304,7 +335,7 @@ static int osql_sock_start_int(struct sqlclntstate *clnt, int type,
comdb2uuidstr(osql->uuid, us);
logmsg(LOGMSG_WARN, "Retrying to find the master (2) retries=%d uuid:%s\n", retries, us);
}
poll(NULL, 0, poll_ms);
noleader_wait_for_timeout(poll_ms);
goto retry;
}

Expand Down Expand Up @@ -1066,7 +1097,7 @@ int osql_sock_commit(struct sqlclntstate *clnt, int type, enum trans_clntcomm si
if (clnt->dbtran.mode == TRANLEVEL_SOSQL && !osql->sock_started) {
/* we only have remote writes; at this point, we have clnt->effects tracking all remote writes
* also clnt->remote_effects
* if we have local writes, the clnt->effects gets reset, so final commit results are ok (we
* if we have local writes, the clnt->effects gets reset, so final commit results are ok (we
* add whatever local master sends to the clnt->remote_effects);
* HERE however, there is no local write, so clnt->effects does not get reset; we need to reset
* it so we do not overcount
Expand Down Expand Up @@ -1114,8 +1145,7 @@ int osql_sock_commit(struct sqlclntstate *clnt, int type, enum trans_clntcomm si
rc = osql_wait(clnt);
if (rc) {
rcout = SQLITE_CLIENT_CHANGENODE;
logmsg(LOGMSG_ERROR, "%s line %d setting rcout to (%d) from %d\n",
__func__, __LINE__, rcout, rc);
logmsg(LOGMSG_ERROR, "%s line %d setting rcout to (%d) from %d\n", __func__, __LINE__, rcout, rc);
} else {

if (gbl_random_blkseq_replays && ((rand() % 50) == 0)) {
Expand Down Expand Up @@ -1227,8 +1257,8 @@ int osql_sock_commit(struct sqlclntstate *clnt, int type, enum trans_clntcomm si
/* unregister this osql thread from checkboard */
rc = osql_end(clnt);
if (rc && !rcout) {
logmsg(LOGMSG_ERROR, "%s line %d setting rout to SQLITE_INTERNAL (%d) rc is %d\n",
__func__, __LINE__, SQLITE_INTERNAL, rc);
logmsg(LOGMSG_ERROR, "%s line %d setting rout to SQLITE_INTERNAL (%d) rc is %d\n", __func__, __LINE__,
SQLITE_INTERNAL, rc);
rcout = SQLITE_INTERNAL;
}

Expand All @@ -1252,9 +1282,7 @@ int osql_sock_commit(struct sqlclntstate *clnt, int type, enum trans_clntcomm si
if (iirc != 0) /* if error or has selectv rows */
{
if (iirc < 0) {
logmsg(LOGMSG_ERROR,
"%s: osql_shadtbl_has_selectv failed rc=%d bdberr=%d\n",
__func__, rc, bdberr);
logmsg(LOGMSG_ERROR, "%s: osql_shadtbl_has_selectv failed rc=%d bdberr=%d\n", __func__, rc, bdberr);
}
osql_set_replay(__FILE__, __LINE__, clnt, OSQL_RETRY_LAST);
}
Expand All @@ -1274,9 +1302,7 @@ int osql_sock_commit(struct sqlclntstate *clnt, int type, enum trans_clntcomm si
/* we also need to free the tran object */
rc = trans_abort_shadow((void **)&clnt->dbtran.shadow_tran, &bdberr);
if (rc)
logmsg(LOGMSG_ERROR,
"%s:%d failed to abort shadow tran for socksql rc=%d\n",
__FILE__, __LINE__, rc);
logmsg(LOGMSG_ERROR, "%s:%d failed to abort shadow tran for socksql rc=%d\n", __FILE__, __LINE__, rc);
}

osql->sock_started = 0;
Expand Down Expand Up @@ -1362,9 +1388,7 @@ int osql_sock_abort(struct sqlclntstate *clnt, int type)
*/
rc = trans_abort_shadow((void **)&clnt->dbtran.shadow_tran, &bdberr);
if (rc) {
logmsg(LOGMSG_ERROR,
"%s:%d failed to abort shadow tran for socksql rc=%d\n",
__FILE__, __LINE__, rc);
logmsg(LOGMSG_ERROR, "%s:%d failed to abort shadow tran for socksql rc=%d\n", __FILE__, __LINE__, rc);
}

if (clnt->osql.tablename) {
Expand Down Expand Up @@ -1864,7 +1888,7 @@ int osql_schemachange_logic(struct schema_change_type *sc, int usedb)
/* this is a distributed create for a partition, creating individual shard here, info passed from
* SET OPTIONS through clnt struct
*/
if (sc->kind == SC_ADDTABLE) {
if (sc->kind == SC_ADDTABLE) {
sc->partition.type = PARTITION_ADD_GENSHARD;
} else {
sc->partition.type = PARTITION_REM_GENSHARD;
Expand Down
3 changes: 3 additions & 0 deletions db/osqlsqlthr.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,7 @@ int osql_bpfunc_logic(struct sql_thread *thd, BpfuncArg *arg);
int osql_dbq_consume_logic(struct sqlclntstate *, const char *spname, genid_t);
int osql_dbq_consume(struct sqlclntstate *, const char *spname, genid_t);

/* Wake no-leader timed waiters when master state changes. */
void osql_noleader_broadcast(void);

#endif
Loading