1use std::ops::{Deref, DerefMut};
2#[cfg(any(unix, target_os = "wasi"))]
3use std::os::fd::AsRawFd;
4#[cfg(target_os = "hermit")]
7use std::os::hermit::io::AsRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(debug_assertions)]
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::{fmt, io};
13
14use crate::sys::IoSourceState;
15use crate::{event, Interest, Registry, Token};
16
17pub struct IoSource<T> {
38 state: IoSourceState,
39 inner: T,
40 #[cfg(debug_assertions)]
41 selector_id: SelectorId,
42}
43
44impl<T> IoSource<T> {
45 pub fn new(io: T) -> IoSource<T> {
47 IoSource {
48 state: IoSourceState::new(),
49 inner: io,
50 #[cfg(debug_assertions)]
51 selector_id: SelectorId::new(),
52 }
53 }
54
55 pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
66 where
67 F: FnOnce(&T) -> io::Result<R>,
68 {
69 self.state.do_io(f, &self.inner)
70 }
71
72 pub fn into_inner(self) -> T {
81 self.inner
82 }
83}
84
85impl<T> Deref for IoSource<T> {
90 type Target = T;
91
92 fn deref(&self) -> &Self::Target {
93 &self.inner
94 }
95}
96
97impl<T> DerefMut for IoSource<T> {
102 fn deref_mut(&mut self) -> &mut Self::Target {
103 &mut self.inner
104 }
105}
106
107#[cfg(any(unix, target_os = "hermit"))]
108impl<T> event::Source for IoSource<T>
109where
110 T: AsRawFd,
111{
112 fn register(
113 &mut self,
114 registry: &Registry,
115 token: Token,
116 interests: Interest,
117 ) -> io::Result<()> {
118 #[cfg(debug_assertions)]
119 self.selector_id.associate(registry)?;
120 self.state
121 .register(registry, token, interests, self.inner.as_raw_fd())
122 }
123
124 fn reregister(
125 &mut self,
126 registry: &Registry,
127 token: Token,
128 interests: Interest,
129 ) -> io::Result<()> {
130 #[cfg(debug_assertions)]
131 self.selector_id.check_association(registry)?;
132 self.state
133 .reregister(registry, token, interests, self.inner.as_raw_fd())
134 }
135
136 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
137 #[cfg(debug_assertions)]
138 self.selector_id.remove_association(registry)?;
139 self.state.deregister(registry, self.inner.as_raw_fd())
140 }
141}
142
143#[cfg(windows)]
144impl<T> event::Source for IoSource<T>
145where
146 T: AsRawSocket,
147{
148 fn register(
149 &mut self,
150 registry: &Registry,
151 token: Token,
152 interests: Interest,
153 ) -> io::Result<()> {
154 #[cfg(debug_assertions)]
155 self.selector_id.associate(registry)?;
156 self.state
157 .register(registry, token, interests, self.inner.as_raw_socket())
158 }
159
160 fn reregister(
161 &mut self,
162 registry: &Registry,
163 token: Token,
164 interests: Interest,
165 ) -> io::Result<()> {
166 #[cfg(debug_assertions)]
167 self.selector_id.check_association(registry)?;
168 self.state.reregister(registry, token, interests)
169 }
170
171 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
172 #[cfg(debug_assertions)]
173 self.selector_id.remove_association(_registry)?;
174 self.state.deregister()
175 }
176}
177
178#[cfg(target_os = "wasi")]
179impl<T> event::Source for IoSource<T>
180where
181 T: AsRawFd,
182{
183 fn register(
184 &mut self,
185 registry: &Registry,
186 token: Token,
187 interests: Interest,
188 ) -> io::Result<()> {
189 #[cfg(debug_assertions)]
190 self.selector_id.associate(registry)?;
191 registry
192 .selector()
193 .register(self.inner.as_raw_fd() as _, token, interests)
194 }
195
196 fn reregister(
197 &mut self,
198 registry: &Registry,
199 token: Token,
200 interests: Interest,
201 ) -> io::Result<()> {
202 #[cfg(debug_assertions)]
203 self.selector_id.check_association(registry)?;
204 registry
205 .selector()
206 .reregister(self.inner.as_raw_fd() as _, token, interests)
207 }
208
209 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
210 #[cfg(debug_assertions)]
211 self.selector_id.remove_association(registry)?;
212 registry.selector().deregister(self.inner.as_raw_fd() as _)
213 }
214}
215
216impl<T> fmt::Debug for IoSource<T>
217where
218 T: fmt::Debug,
219{
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 self.inner.fmt(f)
222 }
223}
224
225#[cfg(debug_assertions)]
227#[derive(Debug)]
228struct SelectorId {
229 id: AtomicUsize,
230}
231
232#[cfg(debug_assertions)]
233impl SelectorId {
234 const UNASSOCIATED: usize = 0;
237
238 const fn new() -> SelectorId {
240 SelectorId {
241 id: AtomicUsize::new(Self::UNASSOCIATED),
242 }
243 }
244
245 fn associate(&self, registry: &Registry) -> io::Result<()> {
248 let registry_id = registry.selector().id();
249 let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
250
251 if previous_id == Self::UNASSOCIATED {
252 Ok(())
253 } else {
254 Err(io::Error::new(
255 io::ErrorKind::AlreadyExists,
256 "I/O source already registered with a `Registry`",
257 ))
258 }
259 }
260
261 fn check_association(&self, registry: &Registry) -> io::Result<()> {
265 let registry_id = registry.selector().id();
266 let id = self.id.load(Ordering::Acquire);
267
268 if id == registry_id {
269 Ok(())
270 } else if id == Self::UNASSOCIATED {
271 Err(io::Error::new(
272 io::ErrorKind::NotFound,
273 "I/O source not registered with `Registry`",
274 ))
275 } else {
276 Err(io::Error::new(
277 io::ErrorKind::AlreadyExists,
278 "I/O source already registered with a different `Registry`",
279 ))
280 }
281 }
282
283 fn remove_association(&self, registry: &Registry) -> io::Result<()> {
286 let registry_id = registry.selector().id();
287 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
288
289 if previous_id == registry_id {
290 Ok(())
291 } else {
292 Err(io::Error::new(
293 io::ErrorKind::NotFound,
294 "I/O source not registered with `Registry`",
295 ))
296 }
297 }
298}
299
300#[cfg(debug_assertions)]
301impl Clone for SelectorId {
302 fn clone(&self) -> SelectorId {
303 SelectorId {
304 id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
305 }
306 }
307}