Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bdb/bdb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,10 @@ int bdb_add_rep_blob(bdb_state_type *bdb_state, tran_type *tran, int session,
int seqno, void *blob, int sz, int *bdberr);

void bdb_upgrade_all_prepared(bdb_state_type *bdb_state);
int bdb_collect_ddl_prepared(bdb_state_type *bdb_state, char ***dist_txnids, char ***coordinator_names,
char ***coordinator_tiers, int *count);
int bdb_mark_prepared_resolved(bdb_state_type *bdb_state, const char *dist_txnid, int committed);
int bdb_is_dist_committed(bdb_state_type *bdb_state, const char *dist_txnid);

const char *bdb_get_tmpdir(bdb_state_type *bdb_state);

Expand Down
26 changes: 25 additions & 1 deletion bdb/tran.c
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,7 @@ int bdb_tran_prepare(bdb_state_type *bdb_state, tran_type *tran, const char *dis
return -1;
}

if (tran->tranclass != TRANCLASS_BERK) {
if (tran->tranclass != TRANCLASS_BERK && tran->tranclass != TRANCLASS_PHYSICAL) {
logmsg(LOGMSG_FATAL, "%s preparing incorrect tranclass: %d\n", __func__, tran->tranclass);
abort();
}
Expand Down Expand Up @@ -2872,6 +2872,30 @@ void bdb_upgrade_all_prepared(bdb_state_type *bdb_state)
bdb_state->dbenv->txn_upgrade_all_prepared(bdb_state->dbenv);
}

/*
 * Forward a "collect DDL prepared transactions" request to the berkdb
 * environment and return whatever the environment method returns.  The four
 * output arguments are passed through untouched.
 */
int bdb_collect_ddl_prepared(bdb_state_type *bdb_state, char ***dist_txnids, char ***coordinator_names,
                             char ***coordinator_tiers, int *count)
{
    /* A handle that has a parent defers to the parent's dbenv. */
    bdb_state_type *env_state = bdb_state->parent ? bdb_state->parent : bdb_state;

    return env_state->dbenv->txn_collect_ddl_prepared(env_state->dbenv, dist_txnids, coordinator_names,
                                                      coordinator_tiers, count);
}

/*
 * Forward a "mark prepared transaction resolved" request to the berkdb
 * environment.  'committed' is handed through as-is; the return value is the
 * environment method's return value.
 */
int bdb_mark_prepared_resolved(bdb_state_type *bdb_state, const char *dist_txnid, int committed)
{
    /* A handle that has a parent defers to the parent's dbenv. */
    bdb_state_type *env_state = bdb_state->parent ? bdb_state->parent : bdb_state;

    return env_state->dbenv->txn_mark_prepared_resolved(env_state->dbenv, dist_txnid, committed);
}

/*
 * Ask berkdb about the given dist-txnid by calling __txn_is_dist_committed
 * directly (there is no dbenv method pointer used for this call) and return
 * its result unchanged.
 */
int bdb_is_dist_committed(bdb_state_type *bdb_state, const char *dist_txnid)
{
    extern int __txn_is_dist_committed(DB_ENV *, const char *);

    /* A handle that has a parent defers to the parent's dbenv. */
    bdb_state_type *env_state = bdb_state->parent ? bdb_state->parent : bdb_state;

    return __txn_is_dist_committed(env_state->dbenv, dist_txnid);
}

unsigned long long bdb_get_current_lsn(bdb_state_type *bdb_state,
unsigned int *file, unsigned int *offset)
{
Expand Down
6 changes: 5 additions & 1 deletion berkdb/build/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,8 @@ typedef enum {
DB_DIST_INFLIGHT = 0x00000010,
DB_DIST_RECOVERED = 0x00000020,
DB_DIST_UPDSHADOWS = 0x00000040,
DB_DIST_DISCARDED = 0x00000080
DB_DIST_DISCARDED = 0x00000080,
DB_DIST_NEEDS_SCHEMA_LK = 0x00000100
} db_dist_state;

struct __db_txn_prepared_child {
Expand All @@ -1147,6 +1148,7 @@ struct __db_txn_prepared {
char *dist_txnid;
u_int64_t utxnid;
u_int32_t flags;
u_int32_t lflags;
DB_LSN prepare_lsn;
DB_LSN prev_lsn;
DB_LSN begin_lsn;
Expand Down Expand Up @@ -2644,6 +2646,8 @@ struct __db_env {
int (*txn_discard_all_recovered) __P((DB_ENV *));
int (*txn_upgrade_all_prepared) __P((DB_ENV *));
int (*txn_recover_all_prepared) __P((DB_ENV *));
int (*txn_collect_ddl_prepared) __P((DB_ENV *, char ***, char ***, char ***, int *));
int (*txn_mark_prepared_resolved) __P((DB_ENV *, const char *, int));
int (*txn_abort_prepared_waiters) __P((DB_ENV *));
int (*set_recover_prepared_callback) __P((DB_ENV *, void (*)(const char *, const char *, const char *)));
void (*recover_prepared_callback)(const char *, const char *, const char *);
Expand Down
4 changes: 3 additions & 1 deletion berkdb/dbinc_auto/txn_ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ int __txn_recover_abort_prepared __P((DB_ENV *, const char *dist_txnid, DB_LSN *
DBT *blkseq_key, u_int32_t coordinator_gen, DBT *coordinator_name, DBT *coordinator_tier));
int __txn_recover_prepared __P((DB_ENV *, DB_TXN *, const char *dist_txnid, DB_LSN *prep_lsn,
DB_LSN *begin_lsn, DBT *blkseq_key, u_int32_t coordinator_gen, DBT *coordinator_name,
DBT *coordinator_tier));
DBT *coordinator_tier, u_int32_t lflags));
int __txn_collect_ddl_prepared __P((DB_ENV *, char ***, char ***, char ***, int *));
int __txn_mark_prepared_resolved __P((DB_ENV *, const char *, int));
int __txn_master_prepared __P((DB_ENV *, const char *dist_txnid, DB_LSN *prep_lsn,
DB_LSN *begin_lsn, DBT *blkseq_key, u_int32_t coordinator_gen, DBT *coordinator_name,
DBT *coordinator_tier));
Expand Down
4 changes: 2 additions & 2 deletions berkdb/rep/rep_record.c
Original file line number Diff line number Diff line change
Expand Up @@ -3842,9 +3842,9 @@ gap_check: use_range = 0;
memcpy(dist_txnid, dist_prepare_args->dist_txnid.data, dist_prepare_args->dist_txnid.size);
dist_txnid[dist_prepare_args->dist_txnid.size] = '\0';
if ((ret = __txn_recover_prepared(dbenv, dist_prepare_args->txnid, dist_txnid,
&rp->lsn, &dist_prepare_args->begin_lsn, &dist_prepare_args->blkseq_key,
&rp->lsn, &dist_prepare_args->begin_lsn, &dist_prepare_args->blkseq_key,
dist_prepare_args->coordinator_gen, &dist_prepare_args->coordinator_name,
&dist_prepare_args->coordinator_tier)) != 0) {
&dist_prepare_args->coordinator_tier, dist_prepare_args->lflags)) != 0) {
goto err;
}
__os_free(dbenv, dist_prepare_args);
Expand Down
2 changes: 2 additions & 0 deletions berkdb/txn/txn_method.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ __txn_dbenv_create(dbenv)
dbenv->txn_discard_all_recovered = __txn_discard_all_recovered_pp;
dbenv->txn_upgrade_all_prepared = __txn_upgrade_all_prepared;
dbenv->txn_recover_all_prepared = __txn_recover_all_prepared;
dbenv->txn_collect_ddl_prepared = __txn_collect_ddl_prepared;
dbenv->txn_mark_prepared_resolved = __txn_mark_prepared_resolved;
dbenv->txn_abort_prepared_waiters = __txn_abort_prepared_waiters;
}

Expand Down
4 changes: 2 additions & 2 deletions berkdb/txn/txn_rec.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ __txn_dist_prepare_recover(dbenv, dbtp, lsnp, op, info)

MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if ((ret = __txn_recover_prepared(dbenv, argp->txnid, dist_txnid, lsnp, &argp->begin_lsn,
&argp->blkseq_key, argp->coordinator_gen, &argp->coordinator_name, &argp->coordinator_tier)) != 0)
&argp->blkseq_key, argp->coordinator_gen, &argp->coordinator_name, &argp->coordinator_tier, argp->lflags)) != 0)
abort();
}
}
Expand All @@ -399,7 +399,7 @@ __txn_dist_prepare_recover(dbenv, dbtp, lsnp, op, info)
} else {
logmsg(LOGMSG_DEBUG, "%s Recovering prepared txn %s\n", __func__, dist_txnid);
if ((ret = __txn_recover_prepared(dbenv, argp->txnid, dist_txnid, lsnp, &argp->begin_lsn,
&argp->blkseq_key, argp->coordinator_gen, &argp->coordinator_name, &argp->coordinator_tier)) != 0)
&argp->blkseq_key, argp->coordinator_gen, &argp->coordinator_name, &argp->coordinator_tier, argp->lflags)) != 0)
abort();
}
}
Expand Down
163 changes: 159 additions & 4 deletions berkdb/txn/txn_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -1257,10 +1257,10 @@ int __txn_recover_abort_prepared(dbenv, dist_txnid, prep_lsn, blkseq_key, coordi
* handled in bdb_recover_blkseq.
*
* PUBLIC: int __txn_recover_prepared __P((DB_ENV *, DB_TXN *txnid,
* PUBLIC: u_int64_t, DB_LSN *, DB_LSN *, DBT *, u_int32_t, DBT *, DBT *));
* PUBLIC: u_int64_t, DB_LSN *, DB_LSN *, DBT *, u_int32_t, DBT *, DBT *, u_int32_t));
*/
int __txn_recover_prepared(dbenv, txnid, dist_txnid, prep_lsn, begin_lsn, blkseq_key,
coordinator_gen, coordinator_name, coordinator_tier)
coordinator_gen, coordinator_name, coordinator_tier, lflags)
DB_ENV *dbenv;
DB_TXN *txnid;
const char *dist_txnid;
Expand All @@ -1270,6 +1270,7 @@ int __txn_recover_prepared(dbenv, txnid, dist_txnid, prep_lsn, begin_lsn, blkseq
u_int32_t coordinator_gen;
DBT *coordinator_name;
DBT *coordinator_tier;
u_int32_t lflags;
{
#if defined (DEBUG_PREPARE)
comdb2_cheapstack_sym(stderr, "%s", __func__);
Expand Down Expand Up @@ -1344,13 +1345,18 @@ int __txn_recover_prepared(dbenv, txnid, dist_txnid, prep_lsn, begin_lsn, blkseq
memcpy(p->coordinator_tier.data, coordinator_tier->data, coordinator_tier->size);
p->coordinator_tier.size = coordinator_tier->size;

p->lflags = lflags;

Pthread_mutex_lock(&dbenv->prepared_txn_lk);
if ((fnd = hash_find(dbenv->prepared_txn_hash, &dist_txnid)) != NULL) {
logmsg(LOGMSG_FATAL, "Recovery found multiple prepared records with dist-txnid %s\n",
dist_txnid);
abort();
}
F_SET(p, DB_DIST_RECOVERED);
if (lflags & DB_TXN_SCHEMA_LOCK) {
F_SET(p, DB_DIST_NEEDS_SCHEMA_LK);
}

hash_add(dbenv->prepared_txn_hash, p);
hash_add(dbenv->prepared_utxnid_hash, p);
Expand Down Expand Up @@ -1493,11 +1499,160 @@ int __txn_recover_all_prepared(dbenv)
return 0;
}

/*
/*
* __txn_collect_ddl_prepared --
*
* Collect DDL prepared transactions (those needing schema lock) that are
* unresolved. These must be resolved before upgrade to avoid deadlock with
* the schema lock held during initialization.
*
* PUBLIC: int __txn_collect_ddl_prepared __P((DB_ENV *, char ***, char ***, char ***, int *));
*/

/*
 * One collected prepared transaction.  __collect_ddl_prepared_cb fills in all
 * three members with freshly allocated copies, so nothing here points into
 * the prepared-txn hash entry it was copied from.
 */
struct __collect_ddl_disttxn_node {
/* NUL-terminated copy of the entry's coordinator_name DBT */
char *coordinator_name;
/* NUL-terminated copy of the entry's coordinator_tier DBT */
char *coordinator_tier;
/* __os_strdup copy of the entry's dist_txnid */
char *dist_txnid;
};

/*
 * Accumulator handed to __collect_ddl_prepared_cb through hash_for's opaque
 * argument while walking the prepared-txn hash.
 */
struct __collect_ddl_disttxns {
/* environment passed to the __os_* allocators */
DB_ENV *dbenv;
/* array of collected nodes; allocated by __txn_collect_ddl_prepared */
struct __collect_ddl_disttxn_node *disttxns;
/* number of nodes filled in so far; also the next free index in disttxns */
int count;
};

static int __collect_ddl_prepared_cb(void *obj, void *arg)
{
DB_TXN_PREPARED *p = (DB_TXN_PREPARED *)obj;
struct __collect_ddl_disttxns *collect = (struct __collect_ddl_disttxns *)arg;
int ret;

if (!F_ISSET(p, DB_DIST_NEEDS_SCHEMA_LK))
return 0;
if (F_ISSET(p, DB_DIST_HAVELOCKS | DB_DIST_COMMITTED | DB_DIST_ABORTED))
return 0;

if ((ret = __os_calloc(collect->dbenv, 1, p->coordinator_name.size + 1,
&collect->disttxns[collect->count].coordinator_name)) != 0) {
logmsg(LOGMSG_FATAL, "%s error allocating memory %d\n", __func__, ret);
abort();
}
memcpy(collect->disttxns[collect->count].coordinator_name,
p->coordinator_name.data, p->coordinator_name.size);

if ((ret = __os_calloc(collect->dbenv, 1, p->coordinator_tier.size + 1,
&collect->disttxns[collect->count].coordinator_tier)) != 0) {
logmsg(LOGMSG_FATAL, "%s error allocating memory %d\n", __func__, ret);
abort();
}
memcpy(collect->disttxns[collect->count].coordinator_tier,
p->coordinator_tier.data, p->coordinator_tier.size);

if ((ret = __os_strdup(collect->dbenv, p->dist_txnid,
&collect->disttxns[collect->count].dist_txnid)) != 0) {
logmsg(LOGMSG_FATAL, "%s error allocating memory %d\n", __func__, ret);
abort();
}
collect->count++;
return 0;
}

/*
 * Snapshot the prepared-txn hash under prepared_txn_lk and return, as three
 * parallel arrays, a copy of the dist-txnid / coordinator name / coordinator
 * tier of every entry accepted by __collect_ddl_prepared_cb.
 *
 * Returns 0 on success, or the non-zero code of the failing __os_malloc.
 *
 * Output contract (identical on every return path):
 *   - success with N > 0 entries: *count == N and each array has N strings
 *   - success with nothing to report, or any failure: *count == 0 and all
 *     three array pointers are NULL
 *
 * The arrays and strings are allocated with the __os_* allocators and handed
 * to the caller; NOTE(review): releasing them is presumably the caller's job,
 * the caller is not visible from here.
 */
int __txn_collect_ddl_prepared(dbenv, dist_txnids, coordinator_names,
	coordinator_tiers, count)
	DB_ENV *dbenv;
	char ***dist_txnids;
	char ***coordinator_names;
	char ***coordinator_tiers;
	int *count;
{
	struct __collect_ddl_disttxns collect = {0};
	char **ids = NULL;
	char **names = NULL;
	char **tiers = NULL;
	int nents = 0;
	int ret = 0;

	collect.dbenv = dbenv;

	/* Well-defined outputs even on the early-return and error paths */
	*dist_txnids = NULL;
	*coordinator_names = NULL;
	*coordinator_tiers = NULL;
	*count = 0;

	Pthread_mutex_lock(&dbenv->prepared_txn_lk);
	/* The value reported by hash_info sizes the scratch array below */
	hash_info(dbenv->prepared_txn_hash, NULL, NULL, NULL, NULL, &nents, NULL, NULL);
	if (nents <= 0) {
		Pthread_mutex_unlock(&dbenv->prepared_txn_lk);
		return 0;
	}

	if ((ret = __os_malloc(dbenv, sizeof(struct __collect_ddl_disttxn_node) * nents,
	    &collect.disttxns)) != 0) {
		Pthread_mutex_unlock(&dbenv->prepared_txn_lk);
		return ret;
	}
	hash_for(dbenv->prepared_txn_hash, __collect_ddl_prepared_cb, &collect);
	Pthread_mutex_unlock(&dbenv->prepared_txn_lk);

	if (collect.count == 0) {
		__os_free(dbenv, collect.disttxns);
		return 0;
	}

	/* Allocate into locals so a partial failure never reaches the caller */
	if ((ret = __os_malloc(dbenv, sizeof(char *) * collect.count, &ids)) != 0)
		goto err;
	if ((ret = __os_malloc(dbenv, sizeof(char *) * collect.count, &names)) != 0)
		goto err;
	if ((ret = __os_malloc(dbenv, sizeof(char *) * collect.count, &tiers)) != 0)
		goto err;

	/* Ownership of each string moves from the scratch nodes to the arrays */
	for (int i = 0; i < collect.count; i++) {
		ids[i] = collect.disttxns[i].dist_txnid;
		names[i] = collect.disttxns[i].coordinator_name;
		tiers[i] = collect.disttxns[i].coordinator_tier;
	}
	__os_free(dbenv, collect.disttxns);

	*dist_txnids = ids;
	*coordinator_names = names;
	*coordinator_tiers = tiers;
	*count = collect.count;
	return 0;

err:
	/*
	 * The callback aborts on allocation failure, so every node in
	 * [0, collect.count) holds three valid strings that are still owned
	 * here and must be released.
	 */
	for (int i = 0; i < collect.count; i++) {
		__os_free(dbenv, collect.disttxns[i].dist_txnid);
		__os_free(dbenv, collect.disttxns[i].coordinator_name);
		__os_free(dbenv, collect.disttxns[i].coordinator_tier);
	}
	if (ids != NULL)
		__os_free(dbenv, ids);
	if (names != NULL)
		__os_free(dbenv, names);
	if (tiers != NULL)
		__os_free(dbenv, tiers);
	__os_free(dbenv, collect.disttxns);
	return ret;
}

/*
* __txn_mark_prepared_resolved --
*
* Mark a prepared transaction as committed or aborted without requiring
* DB_DIST_HAVELOCKS. Used during startup to resolve DDL transactions before
* upgrade. __txn_prune_resolved_prepared will remove them before upgrade.
*
* PUBLIC: int __txn_mark_prepared_resolved __P((DB_ENV *, const char *, int));
*/
int __txn_mark_prepared_resolved(dbenv, dist_txnid, committed)
	DB_ENV *dbenv;
	const char *dist_txnid;
	int committed;
{
	DB_TXN_PREPARED *entry;

	/* Look up and flag the entry in one critical section */
	Pthread_mutex_lock(&dbenv->prepared_txn_lk);
	entry = hash_find(dbenv->prepared_txn_hash, &dist_txnid);
	if (entry != NULL)
		F_SET(entry, committed ? DB_DIST_COMMITTED : DB_DIST_ABORTED);
	Pthread_mutex_unlock(&dbenv->prepared_txn_lk);

	/* Logging happens after the lock is dropped */
	if (entry == NULL) {
		logmsg(LOGMSG_ERROR, "%s unable to locate txnid %s\n", __func__, dist_txnid);
		return -1;
	}

	logmsg(LOGMSG_INFO, "%s marked DDL prepared txn %s as %s\n", __func__,
	    dist_txnid, committed ? "committed" : "aborted");
	return 0;
}

/*
* __txn_upgrade_all_prepared --
*
* Create Berkeley txns for unresolved but prepared transactions we found
* during recovery. Acquire locks for these transactions. Update the
* during recovery. Acquire locks for these transactions. Update the
* transaction id-space and write a txn-recycle record
*
* PUBLIC: int __txn_upgrade_all_prepared __P((DB_ENV *));
Expand Down
7 changes: 7 additions & 0 deletions db/comdb2.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ int __berkdb_read_alarm_ms;
int __berkdb_fsync_alarm_ms;

extern int gbl_delay_sql_lock_release_sec;
extern int gbl_2pc;

void __berkdb_set_num_read_ios(long long *n);
void __berkdb_set_num_write_ios(long long *n);
Expand Down Expand Up @@ -4167,6 +4168,12 @@ static int init(int argc, char **argv)

disttxn_init_recover_prepared();

/* Resolve DDL prepared transactions before acquiring schema lock
* to prevent deadlock in bdb_upgrade_all_prepared() */
if (!gbl_exit && !gbl_create_mode && gbl_2pc) {
disttxn_resolve_ddl_prepared();
}

if (!gbl_exit && gbl_modsnap_asof) {
bdb_gbl_asof_modsnap_init(thedb->bdb_env);
} else {
Expand Down
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ extern int gbl_coordinator_block_until_durable;
extern int gbl_disttxn_random_retry_poll;
extern int gbl_disttxn_handle_cache;
extern int gbl_disttxn_handle_linger_time;
extern int gbl_disttxn_ddl_resolve_fatal;
extern int gbl_disttxn_async_messages;
extern int gbl_debug_sleep_before_dispatch;
extern int gbl_debug_exit_participant_after_prepare;
Expand Down
2 changes: 2 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ REGISTER_TUNABLE("disttxn_handle_cache", "Enable the disttxn handle-cache. (Def
&gbl_disttxn_handle_cache, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("disttxn_handle_linger_time", "Time that unused handles persist. (Default: 60s)", TUNABLE_INTEGER,
&gbl_disttxn_handle_linger_time, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("disttxn_ddl_resolve_fatal", "Abort startup if DDL prepared txn cannot be resolved. (Default: on)",
TUNABLE_BOOLEAN, &gbl_disttxn_ddl_resolve_fatal, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("flush_on_prepare", "Flush master log on prepare. (Default: on)", TUNABLE_BOOLEAN,
&gbl_flush_on_prepare, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("flush_replicant_on_prepare", "Flush replicant log on prepare. (Default: on)", TUNABLE_BOOLEAN,
Expand Down
Loading
Loading