887: executor: miri fixes r=Dirbaio a=Dirbaio

Fixes a few MIRI errors due to loosely mixing `&TaskStorage<F>` and `&TaskHeader`. References "downgrade" the provenance. `TaskHeader` is smaller, so once you have a `&TaskHeader` you can't use pointer casts to access the whole `TaskStorage<F>`. This fixes it by always keeping the raw pointer around, which doesn't downgrade provenance.

The error was: 

```
[dirbaio@mars std]$ MIRIFLAGS=-Zmiri-disable-isolation cargo miri run --bin tick
    Finished dev [unoptimized + debuginfo] target(s) in 0.05s
     Running `/home/dirbaio/.rustup/toolchains/nightly-2022-07-13-x86_64-unknown-linux-gnu/bin/cargo-miri target/miri/x86_64-unknown-linux-gnu/debug/tick`
error: Undefined Behavior: trying to reborrow <12349> for SharedReadWrite permission at alloc2[0x30], but that tag does not exist in the borrow stack for this location
   --> /home/dirbaio/embassy/embassy/embassy-executor/src/executor/raw/mod.rs:162:20
    |
162 |         let this = &*(p.as_ptr() as *const TaskStorage<F>);
    |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |                    |
    |                    trying to reborrow <12349> for SharedReadWrite permission at alloc2[0x30], but that tag does not exist in the borrow stack for this location
    |                    this error occurs as part of a reborrow at alloc2[0x30..0x40]
    |
    = help: this indicates a potential bug in the program: it performed an invalid operation, but the Stacked Borrows rules it violated are still experimental
    = help: see https://github.com/rust-lang/unsafe-code-guidelines/blob/master/wip/stacked-borrows.md for further information
help: <12349> was created by a retag at offsets [0x0..0x30]
   --> src/bin/tick.rs:15:1
    |
15  | #[embassy_executor::main]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^
    = note: backtrace:
    = note: inside `embassy_executor::executor::raw::TaskStorage::<std::future::from_generator::GenFuture<[static generator@src/bin/tick.rs:15:1: 15:26]>>::poll` at /home/dirbaio/embassy/embassy/embassy-executor/src/executor/raw/mod.rs:162:20
    = note: inside closure at /home/dirbaio/embassy/embassy/embassy-executor/src/executor/raw/mod.rs:390:13
    = note: inside `embassy_executor::executor::raw::run_queue::RunQueue::dequeue_all::<[closure@embassy_executor::executor::raw::Executor::poll::{closure#1}]>` at /home/dirbaio/embassy/embassy/embassy-executor/src/executor/raw/run_queue.rs:69:13
    = note: inside `embassy_executor::executor::raw::Executor::poll` at /home/dirbaio/embassy/embassy/embassy-executor/src/executor/raw/mod.rs:373:9
    = note: inside `embassy_executor::executor::Executor::run::<[closure@src/bin/tick.rs:15:1: 15:26]>` at /home/dirbaio/embassy/embassy/embassy-executor/src/executor/arch/std.rs:52:22
```

Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
bors[bot] 2022-08-01 12:33:58 +00:00 committed by GitHub
commit 2b0786129a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 32 deletions

View File

@ -70,24 +70,6 @@ impl TaskHeader {
timer_queue_item: timer_queue::TimerQueueItem::new(), timer_queue_item: timer_queue::TimerQueueItem::new(),
} }
} }
pub(crate) unsafe fn enqueue(&self) {
critical_section::with(|cs| {
let state = self.state.load(Ordering::Relaxed);
// If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
return;
}
// Mark it as scheduled
self.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
// We have just marked the task as scheduled, so enqueue it.
let executor = &*self.executor.get();
executor.enqueue(cs, self as *const TaskHeader as *mut TaskHeader);
})
}
} }
/// Raw storage in which a task can be spawned. /// Raw storage in which a task can be spawned.
@ -155,7 +137,7 @@ impl<F: Future + 'static> TaskStorage<F> {
// Initialize the task // Initialize the task
self.raw.poll_fn.write(Self::poll); self.raw.poll_fn.write(Self::poll);
self.future.write(future()); self.future.write(future());
NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader) NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader)
} }
unsafe fn poll(p: NonNull<TaskHeader>) { unsafe fn poll(p: NonNull<TaskHeader>) {
@ -323,7 +305,7 @@ impl Executor {
/// - `task` must be set up to run in this executor. /// - `task` must be set up to run in this executor.
/// - `task` must NOT be already enqueued (in this executor or another one). /// - `task` must NOT be already enqueued (in this executor or another one).
#[inline(always)] #[inline(always)]
unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) { unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) {
if self.run_queue.enqueue(cs, task) { if self.run_queue.enqueue(cs, task) {
(self.signal_fn)(self.signal_ctx) (self.signal_fn)(self.signal_ctx)
} }
@ -339,11 +321,10 @@ impl Executor {
/// In this case, the task's Future must be Send. This is because this is effectively /// In this case, the task's Future must be Send. This is because this is effectively
/// sending the task to the executor thread. /// sending the task to the executor thread.
pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) { pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
let task = task.as_ref(); task.as_ref().executor.set(self);
task.executor.set(self);
critical_section::with(|cs| { critical_section::with(|cs| {
self.enqueue(cs, task as *const _ as _); self.enqueue(cs, task);
}) })
} }
@ -366,9 +347,7 @@ impl Executor {
/// no `poll()` already running. /// no `poll()` already running.
pub unsafe fn poll(&'static self) { pub unsafe fn poll(&'static self) {
#[cfg(feature = "time")] #[cfg(feature = "time")]
self.timer_queue.dequeue_expired(Instant::now(), |p| { self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
p.as_ref().enqueue();
});
self.run_queue.dequeue_all(|p| { self.run_queue.dequeue_all(|p| {
let task = p.as_ref(); let task = p.as_ref();
@ -421,7 +400,22 @@ impl Executor {
/// ///
/// `task` must be a valid task pointer obtained from [`task_from_waker`]. /// `task` must be a valid task pointer obtained from [`task_from_waker`].
pub unsafe fn wake_task(task: NonNull<TaskHeader>) { pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
task.as_ref().enqueue(); critical_section::with(|cs| {
let header = task.as_ref();
let state = header.state.load(Ordering::Relaxed);
// If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
return;
}
// Mark it as scheduled
header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
// We have just marked the task as scheduled, so enqueue it.
let executor = &*header.executor.get();
executor.enqueue(cs, task);
})
} }
#[cfg(feature = "time")] #[cfg(feature = "time")]

View File

@ -46,10 +46,10 @@ impl RunQueue {
/// ///
/// `item` must NOT be already enqueued in any queue. /// `item` must NOT be already enqueued in any queue.
#[inline(always)] #[inline(always)]
pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool { pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool {
let prev = self.head.load(Ordering::Relaxed); let prev = self.head.load(Ordering::Relaxed);
(*task).run_queue_item.next.store(prev, Ordering::Relaxed); task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed);
self.head.store(task, Ordering::Relaxed); self.head.store(task.as_ptr(), Ordering::Relaxed);
prev.is_null() prev.is_null()
} }

View File

@ -2,7 +2,7 @@ use core::mem;
use core::ptr::NonNull; use core::ptr::NonNull;
use core::task::{RawWaker, RawWakerVTable, Waker}; use core::task::{RawWaker, RawWakerVTable, Waker};
use super::TaskHeader; use super::{wake_task, TaskHeader};
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
@ -11,7 +11,7 @@ unsafe fn clone(p: *const ()) -> RawWaker {
} }
unsafe fn wake(p: *const ()) { unsafe fn wake(p: *const ()) {
(*(p as *mut TaskHeader)).enqueue() wake_task(NonNull::new_unchecked(p as *mut TaskHeader))
} }
unsafe fn drop(_: *const ()) { unsafe fn drop(_: *const ()) {