Priority inversion is a killer in real-time systems. It happens when a less-urgent task (TL) acquires a critical region before a more urgent one (TH) requests it. Any task more urgent than TL that does not need that critical region can preempt TL while TH is still waiting. TH's waiting time is now unbounded. That is a critical failure.
The Priority Inheritance Protocol (PIP) is used on Mutex Semaphores and is defined by a single invariant: At any instant, a task assumes the highest priority among the tasks it is blocking.
At first glance, it might sound simple: if you think about it, the effective priority of a task should match the urgency of whatever it keeps from running. But it is not. The value regarded as ‘highest priority among the tasks it is blocking’ is not that easy to inspect: it can span a chain of tasks across different mutexes.
Although a task can block on at most one mutex, it can own many mutexes, and each mutex has a waiting queue of tasks. The subtle complexities of dynamically choosing the ‘highest priority’, of when to assign (or reassign) it to the blocking task, and of when to restore its nominal priority are what this test case exercises.
Interestingly, the most well-known priority inversion problem (the Mars Pathfinder) happened on a single mutex.
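The invariant above can be read as a recursive definition: a task's effective priority is the most urgent value among its own nominal priority and the effective priorities of every task blocked on a mutex it owns. The sketch below is a toy model, not RK0 code: `Task`, the `owner` array and `effective_prio` are hypothetical names, lower numbers mean higher priority, and the waits-for graph is assumed to have no cycle (no deadlock).

```c
#include <assert.h>
#include <stddef.h>

#define NO_MUTEX (-1)

/* Toy model (hypothetical, not the RK0 data structures). */
typedef struct {
    int nominal;    /* nominal priority, lower value = more urgent */
    int waiting_on; /* index of the mutex this task is blocked on, or NO_MUTEX */
} Task;

/*
 * Effective priority of task t: the most urgent value among its own
 * nominal priority and the effective priority of every task blocked
 * on a mutex that t owns. owner[m] is the index of the task owning mutex m.
 * Assumes the waits-for graph is acyclic (no deadlock).
 */
int effective_prio(const Task *tasks, size_t ntasks, const int *owner, size_t t)
{
    assert(t < ntasks);
    int best = tasks[t].nominal;
    for (size_t w = 0; w < ntasks; w++) {
        int m = tasks[w].waiting_on;
        if (w != t && m != NO_MUTEX && owner[m] == (int)t) {
            /* transitive step: w may itself be boosted by its own waiters */
            int p = effective_prio(tasks, ntasks, owner, w);
            if (p < best)
                best = p;
        }
    }
    return best;
}
```

For a chain where task 0 (priority 0) waits on mutex 1, owned by task 1 (priority 5), which in turn waits on mutex 0, owned by task 2 (priority 9), the function yields 0 for task 2. If task 0 stops waiting, task 2 falls back to 5.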
Test case
A dependency chain with 2 mutexes and 4 contenders:
| Task | Priority | Behaviour |
|---|---|---|
| TL | 3 (lowest) | Starts first (because higher tasks are delayed) and acquires (owns) mutexes A and B |
| TM | 2 | Blocks on A |
| TH | 1 | Blocks on B |
| TH0 | 0 (highest) | Blocks on B with timeout |
| T4 | 2 | ‘Noise’ task |
The intricate scenario
While lowest priority TL holds mutexes A + B:
- Highest priority tasks TH0 + TH are also waiting on B, we expect that:
  → TL effective priority must be 0
- If TH0 times out (removed from wait set!)
  → TL effective priority must drop to 1 (while still holding B)
- If TL unlocks A first (wrong order)
  → TL must remain at priority 1 (TH still waits on B)
- TL returns to its nominal priority when it is not blocking any higher-priority task.
Anything different from that is incorrect.
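The expected values can be checked by hand with a flat model of the scenario: TL's effective priority is the most urgent value among its nominal priority and the priorities of the tasks currently waiting on the mutexes it still holds. This is a minimal sketch for the arithmetic only; `Waiter`, `tl_effective` and the `holds` flags are illustrative names, not part of the RK0 API.

```c
#include <assert.h>
#include <stddef.h>

enum { MUTEX_A, MUTEX_B };

typedef struct {
    int prio;    /* waiter priority, 0 is the highest */
    int mutex;   /* MUTEX_A or MUTEX_B */
    int waiting; /* 1 while the task is blocked on that mutex */
} Waiter;

/*
 * TL's effective priority, given which mutexes it holds
 * (holds[MUTEX_A], holds[MUTEX_B]) and who is waiting on them.
 */
int tl_effective(int nominal, const int holds[2], const Waiter *w, size_t n)
{
    int eff = nominal;
    for (size_t i = 0; i < n; i++) {
        assert(w[i].mutex == MUTEX_A || w[i].mutex == MUTEX_B);
        /* only waiters on a mutex TL still holds can donate priority */
        if (w[i].waiting && holds[w[i].mutex] && w[i].prio < eff)
            eff = w[i].prio;
    }
    return eff;
}
```

With TM (2) waiting on A and TH (1) and TH0 (0) waiting on B, the function gives 0. Clearing TH0's `waiting` flag gives 1, releasing A still gives 1, and releasing B gives the nominal 3.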
The code:
#include <kapi.h>
#include <application.h>
#include <logger.h>

#define STACKSIZE 128 /* words */

/* TH0: waits on B (prio 0) with timeout */
RK_DECLARE_TASK(task0Handle, Task0, stack0, STACKSIZE)
/* TH : waits on B (prio 1) forever */
RK_DECLARE_TASK(task1Handle, Task1, stack1, STACKSIZE)
/* TM : waits on A (prio 2) */
RK_DECLARE_TASK(task2Handle, Task2, stack2, STACKSIZE)
/* TL : higher priority tasks are delayed, so TL kicks off and owns A+B (prio 3) */
RK_DECLARE_TASK(task3Handle, Task3, stack3, STACKSIZE)
/* X or T4 : middle priority poke (prio 2) */
RK_DECLARE_TASK(task4Handle, Task4, stack4, STACKSIZE)

RK_MUTEX mutexA;
RK_MUTEX mutexB;

VOID kApplicationInit(VOID)
{
    K_ASSERT(!kCreateTask(&task0Handle, Task0, RK_NO_ARGS, "TH0", stack0, STACKSIZE, 0, RK_PREEMPT));
    K_ASSERT(!kCreateTask(&task1Handle, Task1, RK_NO_ARGS, "TH", stack1, STACKSIZE, 1, RK_PREEMPT));
    K_ASSERT(!kCreateTask(&task2Handle, Task2, RK_NO_ARGS, "TM", stack2, STACKSIZE, 2, RK_PREEMPT));
    K_ASSERT(!kCreateTask(&task3Handle, Task3, RK_NO_ARGS, "TL", stack3, STACKSIZE, 3, RK_PREEMPT));
    K_ASSERT(!kCreateTask(&task4Handle, Task4, RK_NO_ARGS, "X", stack4, STACKSIZE, 2, RK_PREEMPT));
    logInit(5); /* initialise log facility - active object on lowest priority */
    /* the mutexes with prio inh enabled */
    kMutexInit(&mutexA, RK_INHERIT);
    kMutexInit(&mutexB, RK_INHERIT);
}

/**
 * TL holds BOTH A and B, then blocks.
 * While TL blocked:
 * - TM waits on A
 * - TH waits on B forever (prio 1)
 * - TH0 waits on B but times out (prio 0)
 *
 * - TL wakes boosted to 0
 * - after TH0 times out, while TL still holds B,
 *   TL must drop to 1 (TH still waiting)
 * - TL unlocks A first and must stay at 1
 * - TL unlocks B and drops to 3
 */
volatile UINT gCycle = 0;

VOID Task3(VOID *args)
{
    RK_UNUSEARGS
    RK_ERR err = -1;
    while (1)
    {
        kMutexLock(&mutexA, RK_WAIT_FOREVER);
        kMutexLock(&mutexB, RK_WAIT_FOREVER);
        UINT cycle = ++gCycle;
        logPost(": [TL] cycle=%u HOLDING A+B, BLOCK#1 | Eff:%d Nom:%d\r\n", cycle, RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        /* Let TM block on A and TH/TH0 block on B */
        kSleep(10);
        logPost(": [TL] WOKE #1. (expect Eff=0) | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        /*
         * Keep A+B, let TH0 timeout
         * while TH remains blocked on B.
         */
        logPost("[TL] HOLDING A+B, BLOCK#2 (wait H0 timeout) | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        kSleep(30);
        /*
         * Wrong release order so TL
         * keeps boosted at 1, since TH still wants B.
         */
        logPost("[TL] WOKE #2. UNLOCKING A (expect Eff=1) | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        err = kMutexUnlock(&mutexA);
        K_ASSERT(err == 0);
        /* Give TM time to run */
        kSleep(10);
        logPost("[TL] UNLOCKING B (shall be preempted)\r\n");
        err = kMutexUnlock(&mutexB);
        K_ASSERT(err == 0);
        logPost(" A+B UNLOCKED. Restore prio.| Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        kSleep(50);
    }
}

/* TM: blocks on A while TL holds it */
VOID Task2(VOID *args)
{
    RK_UNUSEARGS
    RK_ERR err = -1;
    while (1)
    {
        kSleep(3);
        logPost(": [TM] Attempt LOCK A (should block) | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        err = kMutexLock(&mutexA, RK_WAIT_FOREVER);
        K_ASSERT(err == 0);
        logPost(": [TM] LOCKED A | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        err = kMutexUnlock(&mutexA);
        K_ASSERT(err == 0);
        logPost(": [TM] UNLOCKED A | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        kSleep(60);
    }
}

/* TH (prio 1) */
VOID Task1(VOID *args)
{
    RK_UNUSEARGS
    RK_ERR err = -1;
    while (1)
    {
        kSleep(4);
        logPost("[TH ] Tries LOCK B (shall block) | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        err = kMutexLock(&mutexB, RK_WAIT_FOREVER);
        K_ASSERT(err == 0);
        logPost(" [TH ] LOCKED B | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        err = kMutexUnlock(&mutexB);
        K_ASSERT(err == 0);
        logPost(" [TH ] UNLOCKED B | Eff:%d Nom:%d\r\n", RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        kSleep(50);
    }
}

/*
 * TH0: blocks on B with a timeout.
 * This forces donor switching while TL still holds B.
 */
VOID Task0(VOID *args)
{
    RK_UNUSEARGS
    UINT seen = 0;
    RK_ERR err = -1;
    while (1)
    {
        while (gCycle == seen)
        {
            kSleep(1);
        }
        seen = gCycle;
        logPost(": [TH0] cycle=%u Try LOCK B (timeout) | Eff:%d Nom:%d\r\n", seen, RK_RUNNING_PRIO, RK_RUNNING_NOM_PRIO);
        err = kMutexLock(&mutexB, 15);
        if (err == RK_ERR_TIMEOUT)
            logPost(": [TH0] cycle=%u TIMEOUT LOCKING B \r\n", seen);
        else if (err == RK_ERR_SUCCESS)
        {
            logPost(": [TH0] cycle=%u LOCKED B (TL released early)\r\n", seen);
            kMutexUnlock(&mutexB);
        }
        else
        {
            K_ASSERT(0);
        }
    }
}

/* bothers periodically */
VOID Task4(VOID *args)
{
    RK_UNUSEARGS
    while (1)
    {
        kSleep(5);
        kDelay(3); /* spins */
        kSleep(2);
        logPost("[T4] ran\r\n");
    }
}
/* REPLICATING THE PASS CRITERIA HERE FOR CONVENIENCE */
/*
While lowest priority TL holds mutexes A + B:
If Highest priority tasks TH0 + TH are also waiting on B,
EXPECTED: TL effective priority must be 0
If TH0 times out (removed from wait set!)
EXPECTED: TL effective priority must drop to 1 (while still holding B)
If TL unlocks A first (wrong order)
EXPECTED: TL must remain at priority 1 (TH still waits on B)
EXPECTED: TL returns to its nominal priority when it is not blocking any higher-priority task.
*/

The log:
   0 ms :: : [TL] cycle=1 HOLDING A+B, BLOCK#1 | Eff:3 Nom:3
  10 ms :: : [TH0] cycle=1 Try LOCK B (timeout) | Eff:0 Nom:0
  30 ms :: : [TM] Attempt LOCK A (should block) | Eff:2 Nom:2
  40 ms :: [TH ] Tries LOCK B (shall block) | Eff:1 Nom:1
 100 ms :: : [TL] WOKE #1. (expect Eff=0) | Eff:0 Nom:3
 100 ms :: [TL] HOLDING A+B, BLOCK#2 (wait H0 timeout) | Eff:0 Nom:3
 100 ms :: [T4] ran
 160 ms :: : [TH0] cycle=1 TIMEOUT LOCKING B
 200 ms :: [T4] ran
 300 ms :: [T4] ran
 400 ms :: [TL] WOKE #2. UNLOCKING A (expect Eff=1) | Eff:1 Nom:3
 400 ms :: [T4] ran
 400 ms :: : [TM] LOCKED A | Eff:2 Nom:2
 400 ms :: : [TM] UNLOCKED A | Eff:2 Nom:2
 500 ms :: [TL] UNLOCKING B (shall be preempted)
 500 ms :: [TH ] LOCKED B | Eff:1 Nom:1
 500 ms :: [TH ] UNLOCKED B | Eff:1 Nom:1
 500 ms :: [T4] ran
 500 ms :: A+B UNLOCKED. Restore prio.| Eff:3 Nom:3
 600 ms :: [T4] ran
 700 ms :: [T4] ran
 800 ms :: [T4] ran
 900 ms :: [T4] ran
1000 ms :: [T4] ran
1000 ms :: : [TL] cycle=2 HOLDING A+B, BLOCK#1 | Eff:3 Nom:3
1010 ms :: : [TH0] cycle=2 Try LOCK B (timeout) | Eff:0 Nom:0
1030 ms :: : [TM] Attempt LOCK A (should block) | Eff:2 Nom:2
1040 ms :: [TH ] Tries LOCK B (shall block) | Eff:1 Nom:1
1100 ms :: : [TL] WOKE #1. (expect Eff=0) | Eff:0 Nom:3
1100 ms :: [TL] HOLDING A+B, BLOCK#2 (wait H0 timeout) | Eff:0 Nom:3
1100 ms :: [T4] ran
1160 ms :: : [TH0] cycle=2 TIMEOUT LOCKING B
1200 ms :: [T4] ran
1300 ms :: [T4] ran
1400 ms :: [TL] WOKE #2. UNLOCKING A (expect Eff=1) | Eff:1 Nom:3
1400 ms :: [T4] ran
1400 ms :: : [TM] LOCKED A | Eff:2 Nom:2
1400 ms :: : [TM] UNLOCKED A | Eff:2 Nom:2
1500 ms :: [TL] UNLOCKING B (shall be preempted)
1500 ms :: [TH ] LOCKED B | Eff:1 Nom:1
1500 ms :: [TH ] UNLOCKED B | Eff:1 Nom:1
1500 ms :: [T4] ran
1500 ms :: A+B UNLOCKED. Restore prio.| Eff:3 Nom:3

This log shows RK0 excelling in real-time correctness under a combination that raises the bar:
- nested locks (TL holds A+B),
- a changing donor set (TH0 timing out while TH still waits),
- an unlock order that is not the reverse of the lock order (A is released first, then B), to add a bit more ‘noise’.
The result is: TL is boosted to the priority of its highest waiter, drops as donors disappear, and only returns to its nominal priority when it is no longer blocking any higher-priority task. This is the PIP invariant, timestamped.
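The pass criteria are embedded in the log itself: some lines carry an `(expect Eff=N)` annotation next to the logged `Eff:` field. Below is a minimal sketch of a host-side check for one captured line, assuming the line format shown above. `log_line_ok` is a hypothetical helper, not part of RK0 or its logger.

```c
#include <stdio.h>
#include <string.h>

/*
 * Returns 1 if the line carries no "(expect Eff=N)" annotation, or if the
 * annotated value matches the logged "| Eff:M" field. Returns 0 on a
 * mismatch, or if an annotated line has no parsable "| Eff:" field.
 */
int log_line_ok(const char *line)
{
    const char *ann = strstr(line, "expect Eff=");
    if (ann == NULL)
        return 1; /* nothing to check on this line */

    const char *eff = strstr(line, "| Eff:");
    int expected = 0;
    int actual = 0;
    if (eff == NULL ||
        sscanf(ann, "expect Eff=%d", &expected) != 1 ||
        sscanf(eff, "| Eff:%d", &actual) != 1)
        return 0;

    return expected == actual;
}
```

Feeding each captured line to this function and stopping at the first 0 turns the visual inspection into a regression check.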
The reasons it can be done in RK0 stem from its design.
The Implementation in RK0
Let’s stick to the data structures for a more concrete view:
/* mutex control block (excerpt) */
struct RK_OBJ_MUTEX
{
    UINT lock;
    struct RK_OBJ_LIST waitingQueue;   /* tasks waiting */
    struct RK_OBJ_TCB *ownerPtr;       /* the owner task */
    struct RK_OBJ_LIST_NODE mutexNode; /* the node that maps directly to an owner task */
};
/* task control block (excerpt) */
struct RK_OBJ_TCB
{
    RK_TASK_STATUS status;                   /* running, blocked.. */
    RK_PRIO priority;                        /* Task effective priority */
    RK_PRIO prioNominal;                     /* Task nominal priority */
    struct RK_OBJ_MUTEX *waitingForMutexPtr; /* blocker mutex */
    struct RK_OBJ_LIST ownedMutexList;       /* owned mutexes */
};
A Mutex is associated with its owner and its waiting tasks. Every time a task performs a Lock or Release on a Mutex, the PIP is applied while traversing a graph, as described in pseudo code here (the real code is in kmutex.c):
# OwnedList(T): set of mutexes a task owns
# (is the ownedMutexList in RK_OBJ_TCB)
# WaitList(M): set of tasks waiting on a mutex
# (is the waitingQueue in RK_OBJ_MUTEX)
# waitingMutex: the mutex a task is blocked on
# (is the waitingForMutexPtr in RK_OBJ_TCB)
# Owner(M): owner of a mutex M
# (is ownerPtr in RK_OBJ_MUTEX)
# In RK0 a lower priority value means higher priority.
# 0 is the highest priority that can be assigned to a task. 31 is the lowest.

PIP protocol:
T = Owner(M); # owner of the mutex the lock or unlock operation was applied to
while (T != NULL) {
    # 1) recompute T's eff prio from the set OwnedList(T)
    # starts with the nominal (real) prio as the baseline
    # so we know if a task has inherited
    # (when comparing with the eff)
    # and keep the recomputed prio no higher than
    # necessary for current waiters.
    newPrio = prioNominal(T);
    for each mutex M in OwnedList(T) {
        if (WaitList(M) is not empty) {
            W = head(WaitList(M)); # highest-priority waiter
            newPrio = highestPriority(newPrio, prio(W));
        }
    }
    # the value of the highest priority T is blocking was found
    if (newPrio == prio(T))
        break; # nothing to be done
    prio(T) = newPrio; # inherit
    # 2) check if need to propagate
    # this is the transitive step
    if (T is BLOCKED && T->waitingMutex != NULL && Owner(T->waitingMutex) != NULL) {
        # if T's priority changed, keep its queue position correct
        if (size(WaitList(T->waitingMutex)) > 1) {
            reorder T inside that waiting queue by priority;
        }
        T = Owner(T->waitingMutex);
    } else {
        break; # end of chain
    }
}
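The pseudo code above can be turned into a small host-side model. The sketch below is not the code in kmutex.c: `Task`, `Mutex`, `pip_update` and the `model_*` helpers are hypothetical names, and the fixed-size arrays are stand-ins for the kernel lists. Each wait list is scanned for its most urgent waiter instead of being kept sorted, so the queue reordering step is not needed here.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_OWNED   4
#define MAX_WAITERS 4

struct Mutex;

typedef struct Task {
    int prio;                       /* effective priority, 0 is the highest */
    int nominal;                    /* nominal priority */
    struct Mutex *waiting_for;      /* mutex this task is blocked on, or NULL */
    struct Mutex *owned[MAX_OWNED]; /* mutexes this task owns */
    size_t n_owned;
} Task;

typedef struct Mutex {
    Task *owner;
    Task *waiters[MAX_WAITERS];     /* unsorted in this model */
    size_t n_waiters;
} Mutex;

/* Most urgent value among 'best' and the waiters of m. */
static int most_urgent_waiter(const Mutex *m, int best)
{
    for (size_t i = 0; i < m->n_waiters; i++)
        if (m->waiters[i]->prio < best)
            best = m->waiters[i]->prio;
    return best;
}

/* Recompute priorities from the owner of m, then walk the blocking chain. */
void pip_update(Mutex *m)
{
    Task *t = m->owner;
    while (t != NULL) {
        int new_prio = t->nominal;      /* baseline: nominal priority */
        for (size_t i = 0; i < t->n_owned; i++)
            new_prio = most_urgent_waiter(t->owned[i], new_prio);
        if (new_prio == t->prio)
            break;                      /* nothing changed, stop here */
        t->prio = new_prio;             /* inherit or drop */
        if (t->waiting_for == NULL)
            break;                      /* end of chain */
        t = t->waiting_for->owner;      /* transitive step */
    }
}

/* t becomes the owner of the free mutex m. */
void model_own(Task *t, Mutex *m)
{
    assert(m->owner == NULL && t->n_owned < MAX_OWNED);
    m->owner = t;
    t->owned[t->n_owned++] = m;
}

/* t blocks on m, which is owned by another task. */
void model_block(Task *t, Mutex *m)
{
    assert(m->n_waiters < MAX_WAITERS);
    m->waiters[m->n_waiters++] = t;
    t->waiting_for = m;
    pip_update(m);
}

/* t stops waiting on m, for example because its timeout expired. */
void model_unblock(Task *t, Mutex *m)
{
    for (size_t i = 0; i < m->n_waiters; i++) {
        if (m->waiters[i] == t) {
            m->waiters[i] = m->waiters[--m->n_waiters];
            break;
        }
    }
    t->waiting_for = NULL;
    pip_update(m);
}
```

As an example, take T0, T1 and T2 with nominal priorities 0, 5 and 9. T2 owns M0, and T1 owns M1 and blocks on M0, which lifts T2 to 5. When T0 blocks on M1, `pip_update` raises T1 and then T2 to 0. When T0 stops waiting, both fall back to 5.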
In the next post, we will discuss RK0 priority-driven message-passing. This mechanism suits fully-synchronous (RPC-ish) communication and handles a not-so-obvious priority inversion case.
Many mainstream kernels do not provide fully transitive priority inheritance.
