*** lock/lock_deadlock.c	2008-03-11 00:31:33.000000000 +1100
--- lock/lock_deadlock.c	2008-12-16 21:54:18.000000000 +1100
***************
*** 121,127 ****
  	DB_LOCKTAB *lt;
  	db_timespec now;
  	locker_info *idmap;
! 	u_int32_t *bitmap, *copymap, **deadp, **free_me, *tmpmap;
  	u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
  	u_int32_t lock_max, txn_max;
  	int ret, status;
--- 121,127 ----
  	DB_LOCKTAB *lt;
  	db_timespec now;
  	locker_info *idmap;
! 	u_int32_t *bitmap, *copymap, **deadp, **deadlist, *tmpmap;
  	u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
  	u_int32_t lock_max, txn_max;
  	int ret, status;
***************
*** 133,139 ****
  	if (IS_REP_CLIENT(env))
  		atype = DB_LOCK_MINWRITE;
  
! 	free_me = NULL;
  
  	lt = env->lk_handle;
  	if (rejectp != NULL)
--- 133,140 ----
  	if (IS_REP_CLIENT(env))
  		atype = DB_LOCK_MINWRITE;
  
! 	copymap = tmpmap = NULL;
! 	deadlist = NULL;
  
  	lt = env->lk_handle;
  	if (rejectp != NULL)
***************
*** 179,189 ****
  	memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);
  
  	if ((ret = __os_calloc(env, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
! 		goto err1;
  
  	/* Find a deadlock. */
  	if ((ret =
! 	    __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadp)) != 0)
  		return (ret);
  
  	/*
--- 180,190 ----
  	memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);
  
  	if ((ret = __os_calloc(env, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
! 		goto err;
  
  	/* Find a deadlock. */
  	if ((ret =
! 	    __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadlist)) != 0)
  		return (ret);
  
  	/*
***************
*** 204,211 ****
  	txn_max = TXN_MAXIMUM;
  
  	killid = BAD_KILLID;
! 	free_me = deadp;
! 	for (; *deadp != NULL; deadp++) {
  		if (rejectp != NULL)
  			++*rejectp;
  		killid = (u_int32_t)(*deadp - bitmap) / nalloc;
--- 205,211 ----
  	txn_max = TXN_MAXIMUM;
  
  	killid = BAD_KILLID;
! 	for (deadp = deadlist; *deadp != NULL; deadp++) {
  		if (rejectp != NULL)
  			++*rejectp;
  		killid = (u_int32_t)(*deadp - bitmap) / nalloc;
***************
*** 342,352 ****
  		    __db_msg(env, "Aborting locker %lx", (u_long)idmap[killid].id);
  	}
! 	__os_free(env, tmpmap);
! err1:	__os_free(env, copymap);
! 
! err:	if (free_me != NULL)
! 		__os_free(env, free_me);
  	__os_free(env, bitmap);
  	__os_free(env, idmap);
--- 342,353 ----
  		    __db_msg(env, "Aborting locker %lx", (u_long)idmap[killid].id);
  	}
! err:	if (copymap != NULL)
! 		__os_free(env, copymap);
! 	if (deadlist != NULL)
! 		__os_free(env, deadlist);
! 	if (tmpmap != NULL)
! 		__os_free(env, tmpmap);
  	__os_free(env, bitmap);
  	__os_free(env, idmap);
***************
*** 360,365 ****
--- 361,377 ----
  
  #define	DD_INVALID_ID	((u_int32_t) -1)
  
+ /*
+  * __dd_build --
+  *	Build the lock dependency bit maps.
+  * Notes on synchronization:
+  * LOCK_SYSTEM_LOCK is used to hold objects locked when we have
+  *	a single partition.
+  * LOCK_LOCKERS is held while we are walking the lockers list and
+  *	to single thread the use of lockerp->dd_id.
+  * LOCK_DD protects the DD list of objects.
+  */
+ 
  static int
  __dd_build(env, atype, bmp, nlockers, allocp, idmap, rejectp)
  	ENV *env;
***************
*** 393,398 ****
--- 405,411 ----
  	 * In particular we do not build the conflict array and our caller
  	 * needs to expect this.
  	 */
+ 	LOCK_SYSTEM_LOCK(lt, region);
  	if (atype == DB_LOCK_EXPIRE) {
  skip:		LOCK_DD(env, region);
  		op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
***************
*** 430,446 ****
  			OBJECT_UNLOCK(lt, region, indx);
  		}
  		UNLOCK_DD(env, region);
  		goto done;
  	}
  
  	/*
! 	 * We'll check how many lockers there are, add a few more in for
! 	 * good measure and then allocate all the structures.  Then we'll
! 	 * verify that we have enough room when we go back in and get the
! 	 * mutex the second time.
  	 */
! retry:	count = region->stat.st_nlockers;
  	if (count == 0) {
  		*nlockers = 0;
  		return (0);
  	}
--- 443,460 ----
  			OBJECT_UNLOCK(lt, region, indx);
  		}
  		UNLOCK_DD(env, region);
+ 		LOCK_SYSTEM_UNLOCK(lt, region);
  		goto done;
  	}
  
  	/*
! 	 * Allocate after locking the region
! 	 * to make sure the structures are large enough.
  	 */
! 	LOCK_LOCKERS(env, region);
! 	count = region->stat.st_nlockers;
  	if (count == 0) {
+ 		UNLOCK_LOCKERS(env, region);
  		*nlockers = 0;
  		return (0);
  	}
***************
*** 448,497 ****
  	if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
  		__db_msg(env, "%lu lockers", (u_long)count);
  
- 	count += 20;
  	nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
  
! 	/*
! 	 * Allocate enough space for a count by count bitmap matrix.
! 	 *
! 	 * XXX
! 	 * We can probably save the malloc's between iterations just
! 	 * reallocing if necessary because count grew by too much.
! 	 */
  	if ((ret = __os_calloc(env, (size_t)count,
! 	    sizeof(u_int32_t) * nentries, &bitmap)) != 0)
  		return (ret);
  
  	if ((ret = __os_calloc(env,
  	    sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
  		__os_free(env, bitmap);
  		return (ret);
  	}
  
  	if ((ret = __os_calloc(env,
  	    (size_t)count, sizeof(locker_info), &id_array)) != 0) {
  		__os_free(env, bitmap);
  		__os_free(env, tmpmap);
  		return (ret);
  	}
  
  	/*
- 	 * Now go back in and actually fill in the matrix.
- 	 */
- 	if (region->stat.st_nlockers > count) {
- 		__os_free(env, bitmap);
- 		__os_free(env, tmpmap);
- 		__os_free(env, id_array);
- 		goto retry;
- 	}
- 
- 	/*
  	 * First we go through and assign each locker a deadlock detector id.
  	 */
  	id = 0;
- 	LOCK_LOCKERS(env, region);
  	SH_TAILQ_FOREACH(lip, &region->lockers, ulinks, __db_locker) {
  		if (lip->master_locker == INVALID_ROFF) {
  			lip->dd_id = id++;
  			id_array[lip->dd_id].id = lip->id;
  			switch (atype) {
--- 462,498 ----
  	if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
  		__db_msg(env, "%lu lockers", (u_long)count);
  
  	nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
  
! 	/* Allocate enough space for a count by count bitmap matrix. */
  	if ((ret = __os_calloc(env, (size_t)count,
! 	    sizeof(u_int32_t) * nentries, &bitmap)) != 0) {
! 		UNLOCK_LOCKERS(env, region);
  		return (ret);
+ 	}
  
  	if ((ret = __os_calloc(env,
  	    sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
+ 		UNLOCK_LOCKERS(env, region);
  		__os_free(env, bitmap);
  		return (ret);
  	}
  
  	if ((ret = __os_calloc(env,
  	    (size_t)count, sizeof(locker_info), &id_array)) != 0) {
+ 		UNLOCK_LOCKERS(env, region);
  		__os_free(env, bitmap);
  		__os_free(env, tmpmap);
  		return (ret);
  	}
  
  	/*
  	 * First we go through and assign each locker a deadlock detector id.
  	 */
  	id = 0;
  	SH_TAILQ_FOREACH(lip, &region->lockers, ulinks, __db_locker) {
  		if (lip->master_locker == INVALID_ROFF) {
+ 			DB_ASSERT(env, id < count);
  			lip->dd_id = id++;
  			id_array[lip->dd_id].id = lip->id;
  			switch (atype) {
***************
*** 510,516 ****
  			lip->dd_id = DD_INVALID_ID;
  
  	}
- 	UNLOCK_LOCKERS(env, region);
  
  	/*
  	 * We only need consider objects that have waiters, so we use
--- 511,516 ----
***************
*** 669,675 ****
  	 * status after building the bit maps so that we will not detect
  	 * a blocked transaction without noting that it is already aborting.
  	 */
- 	LOCK_LOCKERS(env, region);
  	for (id = 0; id < count; id++) {
  		if (!id_array[id].valid)
  			continue;
--- 669,674 ----
***************
*** 738,743 ****
--- 737,743 ----
  			id_array[id].in_abort = 1;
  	}
  	UNLOCK_LOCKERS(env, region);
+ 	LOCK_SYSTEM_UNLOCK(lt, region);
  
  	/*
  	 * Now we can release everything except the bitmap matrix that we
***************
*** 839,844 ****
--- 839,845 ----
  
  	ret = 0;
  
  	/* We must lock so this locker cannot go away while we abort it. */
+ 	LOCK_SYSTEM_LOCK(lt, region);
  	LOCK_LOCKERS(env, region);
  
  	/*
***************
*** 895,900 ****
--- 896,902 ----
  
  done:	OBJECT_UNLOCK(lt, region, info->last_ndx);
  err:
  out:	UNLOCK_LOCKERS(env, region);
+ 	LOCK_SYSTEM_UNLOCK(lt, region);
  
  	return (ret);
  }
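Note on the cleanup hunks: the patch replaces the free_me/err1/err label chain in __lock_detect() with a single err: label, initializing copymap, tmpmap and deadlist to NULL up front so that one exit path can free whatever was actually allocated. The sketch below shows that idiom only; build_maps() and the plain calloc()/free() calls are illustrative stand-ins for the real __os_calloc()/__os_free() code, not part of the patch.

	/*
	 * Sketch of the single-label cleanup pattern used above.
	 * buf_a/buf_b correspond loosely to copymap/tmpmap/deadlist.
	 */
	#include <stdlib.h>

	static int
	build_maps(size_t n)
	{
		unsigned int *buf_a = NULL, *buf_b = NULL;	/* NULL so err: is always safe */
		int ret;

		if ((buf_a = calloc(n, sizeof(*buf_a))) == NULL) {
			ret = -1;
			goto err;		/* nothing allocated yet, err: still works */
		}
		if ((buf_b = calloc(n, sizeof(*buf_b))) == NULL) {
			ret = -1;
			goto err;		/* buf_a is released below */
		}

		/* ... the real code builds and searches the bit maps here ... */
		ret = 0;

	err:	if (buf_a != NULL)		/* free only what was actually allocated */
			free(buf_a);
		if (buf_b != NULL)
			free(buf_b);
		return (ret);
	}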
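Note on the locking hunks: __dd_build() now takes LOCK_LOCKERS before reading region->stat.st_nlockers and holds it while the arrays are allocated and filled, so the old "count += 20" padding and the retry loop are no longer needed, DB_ASSERT(env, id < count) can hold, and every early return must release the lock first. The sketch below shows the same shape under stated assumptions: a hypothetical region structure and a pthread mutex standing in for the real LOCK_LOCKERS/UNLOCK_LOCKERS macros.

	#include <assert.h>
	#include <pthread.h>
	#include <stdlib.h>

	/* Hypothetical stand-ins for the lock region and its locker list. */
	struct locker { struct locker *next; unsigned int dd_id; };
	struct region {
		pthread_mutex_t mtx;		/* plays the role of LOCK_LOCKERS */
		size_t nlockers;		/* kept in sync with the list below */
		struct locker *lockers;
	};

	static int
	assign_dd_ids(struct region *r, unsigned int **idsp, size_t *countp)
	{
		struct locker *lip;
		unsigned int *ids;
		size_t count, id;

		/*
		 * Size and fill under one critical section, so nlockers cannot
		 * change between the allocation and the walk.
		 */
		pthread_mutex_lock(&r->mtx);
		count = r->nlockers;
		if (count == 0) {
			pthread_mutex_unlock(&r->mtx);
			*countp = 0;		/* caller checks *countp before using *idsp */
			return (0);
		}
		if ((ids = calloc(count, sizeof(*ids))) == NULL) {
			pthread_mutex_unlock(&r->mtx);	/* unlock on every early return */
			return (-1);
		}
		id = 0;
		for (lip = r->lockers; lip != NULL; lip = lip->next) {
			assert(id < count);	/* mirrors DB_ASSERT(env, id < count) */
			lip->dd_id = (unsigned int)id;
			ids[id++] = lip->dd_id;
		}
		pthread_mutex_unlock(&r->mtx);

		*idsp = ids;
		*countp = count;
		return (0);
	}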