1use std::{
2 borrow::Borrow,
3 io,
4 net::{Ipv4Addr, Ipv6Addr, SocketAddr},
5 ops::{Deref, DerefMut},
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures_util::stream::{self, Fuse, Stream, StreamExt};
11#[cfg(feature = "tokio")]
12use tokio::net::TcpStream;
13
14#[cfg(feature = "tokio")]
15use crate::ToProxyAddrs;
16use crate::{
17 io::{AsyncSocket, AsyncSocketExt},
18 Authentication, Error, IntoTargetAddr, Result, TargetAddr,
19};
20
/// SOCKS5 request command codes.
///
/// Each discriminant is the byte value written into the request, so a variant
/// can be serialized with a plain `as u8` cast.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Command {
    /// CONNECT: used by the `connect*` entry points.
    Connect = 0x01,
    /// BIND: used by the listener entry points.
    Bind = 0x02,
    /// UDP ASSOCIATE. Nothing in this file constructs it, hence the lint
    /// allowance; it is kept so the enum mirrors the protocol's command set.
    #[allow(dead_code)]
    Associate = 0x03,
    /// Tor-specific extension used by the `tor_resolve*` entry points.
    #[cfg(feature = "tor")]
    TorResolve = 0xF0,
    /// Tor-specific extension used by the `tor_resolve_ptr*` entry points.
    #[cfg(feature = "tor")]
    TorResolvePtr = 0xF1,
}
33
/// A socket on which a SOCKS5 request has been completed.
///
/// Bundles the underlying transport `S` with the address that was parsed from
/// the proxy's reply. The transport is reachable through `Deref`/`DerefMut`
/// or by calling `into_inner`.
#[derive(Debug)]
pub struct Socks5Stream<S> {
    /// Transport connected to the proxy.
    socket: S,
    /// Address parsed from the proxy's reply to the request.
    target: TargetAddr<'static>,
}
42
43impl<S> Deref for Socks5Stream<S> {
44 type Target = S;
45
46 fn deref(&self) -> &Self::Target {
47 &self.socket
48 }
49}
50
51impl<S> DerefMut for Socks5Stream<S> {
52 fn deref_mut(&mut self) -> &mut Self::Target {
53 &mut self.socket
54 }
55}
56
#[cfg(feature = "tokio")]
impl Socks5Stream<TcpStream> {
    /// Opens a TCP connection to `proxy` and issues a CONNECT request for
    /// `target` without offering any credentials.
    ///
    /// # Errors
    ///
    /// Returns an error if `target` cannot be converted into a target
    /// address, if the proxy cannot be reached, or if the handshake fails.
    pub async fn connect<'t, P, T>(proxy: P, target: T) -> Result<Socks5Stream<TcpStream>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        let auth = Authentication::None;
        Self::execute_command(proxy, target, auth, Command::Connect).await
    }

    /// Opens a TCP connection to `proxy` and issues a CONNECT request for
    /// `target`, offering `username`/`password` authentication.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidAuthValues` when either credential is empty or
    /// longer than 255 bytes, plus every error `connect` can return.
    pub async fn connect_with_password<'a, 't, P, T>(
        proxy: P,
        target: T,
        username: &'a str,
        password: &'a str,
    ) -> Result<Socks5Stream<TcpStream>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        let credentials = Authentication::Password { username, password };
        Self::execute_command(proxy, target, credentials, Command::Connect).await
    }

    /// Sends the Tor `TorResolve` command for `target` and returns the
    /// address carried in the proxy's reply.
    #[cfg(feature = "tor")]
    pub async fn tor_resolve<'t, P, T>(proxy: P, target: T) -> Result<TargetAddr<'static>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        Self::execute_command(proxy, target, Authentication::None, Command::TorResolve)
            .await
            .map(|stream| stream.target_addr().to_owned())
    }

    /// Sends the Tor `TorResolvePtr` command for `target` and returns the
    /// address carried in the proxy's reply.
    #[cfg(feature = "tor")]
    pub async fn tor_resolve_ptr<'t, P, T>(proxy: P, target: T) -> Result<TargetAddr<'static>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        Self::execute_command(proxy, target, Authentication::None, Command::TorResolvePtr)
            .await
            .map(|stream| stream.target_addr().to_owned())
    }

    /// Shared implementation of the public entry points: validates the
    /// credentials, then lets a `SocksConnector` connect to the proxy and run
    /// `command` against `target`.
    async fn execute_command<'a, 't, P, T>(
        proxy: P,
        target: T,
        auth: Authentication<'a>,
        command: Command,
    ) -> Result<Socks5Stream<TcpStream>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        Self::validate_auth(&auth)?;

        // The proxy address stream is built before the target is converted,
        // matching the order in which the two arguments were evaluated inline.
        let proxy_addrs = proxy.to_proxy_addrs().fuse();
        let target_addr = target.into_target_addr()?;

        let mut connector = SocksConnector::new(auth, command, proxy_addrs, target_addr);
        connector.execute().await
    }
}
146
147impl<S> Socks5Stream<S>
148where
149 S: AsyncSocket + Unpin,
150{
151 pub async fn connect_with_socket<'t, T>(socket: S, target: T) -> Result<Socks5Stream<S>>
158 where
159 T: IntoTargetAddr<'t>,
160 {
161 Self::execute_command_with_socket(socket, target, Authentication::None, Command::Connect).await
162 }
163
164 pub async fn connect_with_password_and_socket<'a, 't, T>(
172 socket: S,
173 target: T,
174 username: &'a str,
175 password: &'a str,
176 ) -> Result<Socks5Stream<S>>
177 where
178 T: IntoTargetAddr<'t>,
179 {
180 Self::execute_command_with_socket(
181 socket,
182 target,
183 Authentication::Password { username, password },
184 Command::Connect,
185 )
186 .await
187 }
188
189 fn validate_auth(auth: &Authentication<'_>) -> Result<()> {
190 match auth {
191 Authentication::Password { username, password } => {
192 let username_len = username.len();
193 if !(1..=255).contains(&username_len) {
194 Err(Error::InvalidAuthValues("username length should between 1 to 255"))?
195 }
196 let password_len = password.len();
197 if !(1..=255).contains(&password_len) {
198 Err(Error::InvalidAuthValues("password length should between 1 to 255"))?
199 }
200 },
201 Authentication::None => {},
202 }
203 Ok(())
204 }
205
206 #[cfg(feature = "tor")]
207 pub async fn tor_resolve_with_socket<'t, T>(socket: S, target: T) -> Result<TargetAddr<'static>>
210 where
211 T: IntoTargetAddr<'t>,
212 {
213 let sock = Self::execute_command_with_socket(socket, target, Authentication::None, Command::TorResolve).await?;
214
215 Ok(sock.target_addr().to_owned())
216 }
217
218 #[cfg(feature = "tor")]
219 pub async fn tor_resolve_ptr_with_socket<'t, T>(socket: S, target: T) -> Result<TargetAddr<'static>>
223 where
224 T: IntoTargetAddr<'t>,
225 {
226 let sock =
227 Self::execute_command_with_socket(socket, target, Authentication::None, Command::TorResolvePtr).await?;
228
229 Ok(sock.target_addr().to_owned())
230 }
231
232 async fn execute_command_with_socket<'a, 't, T>(
233 socket: S,
234 target: T,
235 auth: Authentication<'a>,
236 command: Command,
237 ) -> Result<Socks5Stream<S>>
238 where
239 T: IntoTargetAddr<'t>,
240 {
241 Self::validate_auth(&auth)?;
242
243 let sock = SocksConnector::new(auth, command, stream::empty().fuse(), target.into_target_addr()?)
244 .execute_with_socket(socket)
245 .await?;
246
247 Ok(sock)
248 }
249
250 pub fn into_inner(self) -> S {
252 self.socket
253 }
254
255 pub fn target_addr(&self) -> TargetAddr<'_> {
257 match &self.target {
258 TargetAddr::Ip(addr) => TargetAddr::Ip(*addr),
259 TargetAddr::Domain(domain, port) => {
260 let domain: &str = domain.borrow();
261 TargetAddr::Domain(domain.into(), *port)
262 },
263 }
264 }
265}
266
/// State for one client-side SOCKS5 exchange: what to authenticate with,
/// which command to send for which target, and a scratch buffer that every
/// outgoing and incoming message is staged in.
pub struct SocksConnector<'a, 't, S> {
    /// Authentication offered to the proxy during method selection.
    auth: Authentication<'a>,
    /// Command byte placed in the request.
    command: Command,
    /// Stream of candidate proxy addresses. Only `execute` reads it, and that
    /// method is compiled only with the `tokio` feature, hence the allowance.
    #[allow(dead_code)]
    proxy: Fuse<S>,
    /// Destination encoded into the request.
    target: TargetAddr<'t>,
    /// Scratch buffer for message staging. 513 bytes is exactly the largest
    /// username/password message built here (3 + 255 + 255).
    buf: [u8; 513],
    /// Start offset in `buf` of the bytes still to be sent or received.
    ptr: usize,
    /// Exclusive end offset in `buf` of the current message.
    len: usize,
}
278
279impl<'a, 't, S> SocksConnector<'a, 't, S>
280where
281 S: Stream<Item = Result<SocketAddr>> + Unpin,
282{
283 fn new(auth: Authentication<'a>, command: Command, proxy: Fuse<S>, target: TargetAddr<'t>) -> Self {
284 SocksConnector {
285 auth,
286 command,
287 proxy,
288 target,
289 buf: [0; 513],
290 ptr: 0,
291 len: 0,
292 }
293 }
294
295 #[cfg(feature = "tokio")]
296 pub async fn execute(&mut self) -> Result<Socks5Stream<TcpStream>> {
298 let next_addr = self.proxy.select_next_some().await?;
299 let tcp = TcpStream::connect(next_addr)
300 .await
301 .map_err(|_| Error::ProxyServerUnreachable)?;
302
303 self.execute_with_socket(tcp).await
304 }
305
306 pub async fn execute_with_socket<T: AsyncSocket + Unpin>(&mut self, mut socket: T) -> Result<Socks5Stream<T>> {
307 self.authenticate(&mut socket).await?;
308
309 self.prepare_send_request();
311 socket.write_all(&self.buf[self.ptr..self.len]).await?;
312
313 let target = self.receive_reply(&mut socket).await?;
314
315 Ok(Socks5Stream { socket, target })
316 }
317
318 fn prepare_send_method_selection(&mut self) {
319 self.ptr = 0;
320 self.buf[0] = 0x05;
321 match self.auth {
322 Authentication::None => {
323 self.buf[1..3].copy_from_slice(&[1, 0x00]);
324 self.len = 3;
325 },
326 Authentication::Password { .. } => {
327 self.buf[1..4].copy_from_slice(&[2, 0x00, 0x02]);
328 self.len = 4;
329 },
330 }
331 }
332
333 fn prepare_recv_method_selection(&mut self) {
334 self.ptr = 0;
335 self.len = 2;
336 }
337
338 fn prepare_send_password_auth(&mut self) {
339 if let Authentication::Password { username, password } = self.auth {
340 self.ptr = 0;
341 self.buf[0] = 0x01;
342 let username_bytes = username.as_bytes();
343 let username_len = username_bytes.len();
344 self.buf[1] = username_len as u8;
345 self.buf[2..(2 + username_len)].copy_from_slice(username_bytes);
346 let password_bytes = password.as_bytes();
347 let password_len = password_bytes.len();
348 self.len = 3 + username_len + password_len;
349 self.buf[2 + username_len] = password_len as u8;
350 self.buf[(3 + username_len)..self.len].copy_from_slice(password_bytes);
351 } else {
352 unreachable!()
353 }
354 }
355
356 fn prepare_recv_password_auth(&mut self) {
357 self.ptr = 0;
358 self.len = 2;
359 }
360
361 fn prepare_send_request(&mut self) {
362 self.ptr = 0;
363 self.buf[..3].copy_from_slice(&[0x05, self.command as u8, 0x00]);
364 match &self.target {
365 TargetAddr::Ip(SocketAddr::V4(addr)) => {
366 self.buf[3] = 0x01;
367 self.buf[4..8].copy_from_slice(&addr.ip().octets());
368 self.buf[8..10].copy_from_slice(&addr.port().to_be_bytes());
369 self.len = 10;
370 },
371 TargetAddr::Ip(SocketAddr::V6(addr)) => {
372 self.buf[3] = 0x04;
373 self.buf[4..20].copy_from_slice(&addr.ip().octets());
374 self.buf[20..22].copy_from_slice(&addr.port().to_be_bytes());
375 self.len = 22;
376 },
377 TargetAddr::Domain(domain, port) => {
378 self.buf[3] = 0x03;
379 let domain = domain.as_bytes();
380 let len = domain.len();
381 self.buf[4] = len as u8;
382 self.buf[5..5 + len].copy_from_slice(domain);
383 self.buf[(5 + len)..(7 + len)].copy_from_slice(&port.to_be_bytes());
384 self.len = 7 + len;
385 },
386 }
387 }
388
389 fn prepare_recv_reply(&mut self) {
390 self.ptr = 0;
391 self.len = 4;
392 }
393
394 async fn password_authentication_protocol<T: AsyncSocket + Unpin>(&mut self, tcp: &mut T) -> Result<()> {
395 if let Authentication::None = self.auth {
396 return Err(Error::AuthorizationRequired);
397 }
398
399 self.prepare_send_password_auth();
400 tcp.write_all(&self.buf[self.ptr..self.len]).await?;
401
402 self.prepare_recv_password_auth();
403 tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
404
405 if self.buf[0] != 0x01 {
406 return Err(Error::InvalidResponseVersion);
407 }
408 if self.buf[1] != 0x00 {
409 return Err(Error::PasswordAuthFailure(self.buf[1]));
410 }
411
412 Ok(())
413 }
414
415 async fn authenticate<T: AsyncSocket + Unpin>(&mut self, tcp: &mut T) -> Result<()> {
416 self.prepare_send_method_selection();
418 tcp.write_all(&self.buf[self.ptr..self.len]).await?;
419
420 self.prepare_recv_method_selection();
422 tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
423 if self.buf[0] != 0x05 {
424 return Err(Error::InvalidResponseVersion);
425 }
426 match self.buf[1] {
427 0x00 => {
428 },
430 0x02 => {
431 self.password_authentication_protocol(tcp).await?;
432 },
433 0xff => {
434 return Err(Error::NoAcceptableAuthMethods);
435 },
436 m if m != self.auth.id() => return Err(Error::UnknownAuthMethod),
437 _ => unimplemented!(),
438 }
439
440 Ok(())
441 }
442
443 async fn receive_reply<T: AsyncSocket + Unpin>(&mut self, tcp: &mut T) -> Result<TargetAddr<'static>> {
444 self.prepare_recv_reply();
445 self.ptr += tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
446 if self.buf[0] != 0x05 {
447 return Err(Error::InvalidResponseVersion);
448 }
449 if self.buf[2] != 0x00 {
450 return Err(Error::InvalidReservedByte);
451 }
452
453 match self.buf[1] {
454 0x00 => {}, 0x01 => Err(Error::GeneralSocksServerFailure)?,
456 0x02 => Err(Error::ConnectionNotAllowedByRuleset)?,
457 0x03 => Err(Error::NetworkUnreachable)?,
458 0x04 => Err(Error::HostUnreachable)?,
459 0x05 => Err(Error::ConnectionRefused)?,
460 0x06 => Err(Error::TtlExpired)?,
461 0x07 => Err(Error::CommandNotSupported)?,
462 0x08 => Err(Error::AddressTypeNotSupported)?,
463 _ => Err(Error::UnknownAuthMethod)?,
464 }
465
466 match self.buf[3] {
467 0x01 => {
469 self.len = 10;
470 },
471 0x04 => {
473 self.len = 22;
474 },
475 0x03 => {
477 self.len = 5;
478 self.ptr += tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
479 self.len += self.buf[4] as usize + 2;
480 },
481 _ => Err(Error::UnknownAddressType)?,
482 }
483
484 self.ptr += tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
485 let target: TargetAddr<'static> = match self.buf[3] {
486 0x01 => {
488 let mut ip = [0; 4];
489 ip[..].copy_from_slice(&self.buf[4..8]);
490 let ip = Ipv4Addr::from(ip);
491 let port = u16::from_be_bytes([self.buf[8], self.buf[9]]);
492 (ip, port).into_target_addr()?
493 },
494 0x04 => {
496 let mut ip = [0; 16];
497 ip[..].copy_from_slice(&self.buf[4..20]);
498 let ip = Ipv6Addr::from(ip);
499 let port = u16::from_be_bytes([self.buf[20], self.buf[21]]);
500 (ip, port).into_target_addr()?
501 },
502 0x03 => {
504 let domain_bytes = self.buf[5..(self.len - 2)].to_vec();
505 let domain = String::from_utf8(domain_bytes)
506 .map_err(|_| Error::InvalidTargetAddress("not a valid UTF-8 string"))?;
507 let port = u16::from_be_bytes([self.buf[self.len - 2], self.buf[self.len - 1]]);
508 TargetAddr::Domain(domain.into(), port)
509 },
510 _ => unreachable!(),
511 };
512
513 Ok(target)
514 }
515}
516
517pub struct Socks5Listener<S> {
524 inner: Socks5Stream<S>,
525}
526
#[cfg(feature = "tokio")]
impl Socks5Listener<TcpStream> {
    /// Opens a TCP connection to `proxy` and issues a BIND request carrying
    /// `target`, without offering any credentials.
    ///
    /// # Errors
    ///
    /// Returns an error if `target` cannot be converted into a target
    /// address, if the proxy cannot be reached, or if the handshake fails.
    pub async fn bind<'t, P, T>(proxy: P, target: T) -> Result<Socks5Listener<TcpStream>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        Self::bind_with_auth(Authentication::None, proxy, target).await
    }

    /// Opens a TCP connection to `proxy` and issues a BIND request carrying
    /// `target`, offering `username`/`password` authentication.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidAuthValues` when either credential is empty or
    /// longer than 255 bytes, plus every error `bind` can return.
    pub async fn bind_with_password<'a, 't, P, T>(
        proxy: P,
        target: T,
        username: &'a str,
        password: &'a str,
    ) -> Result<Socks5Listener<TcpStream>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        Self::bind_with_auth(Authentication::Password { username, password }, proxy, target).await
    }

    /// Shared implementation of the `bind*` entry points.
    async fn bind_with_auth<'t, P, T>(
        auth: Authentication<'_>,
        proxy: P,
        target: T,
    ) -> Result<Socks5Listener<TcpStream>>
    where
        P: ToProxyAddrs,
        T: IntoTargetAddr<'t>,
    {
        // Reject unencodable credentials up front, exactly as the
        // `Socks5Stream` entry points do, instead of letting them be
        // truncated or overflow the connector's buffer later.
        Socks5Stream::<TcpStream>::validate_auth(&auth)?;

        let socket = SocksConnector::new(
            auth,
            Command::Bind,
            proxy.to_proxy_addrs().fuse(),
            target.into_target_addr()?,
        )
        .execute()
        .await?;

        Ok(Socks5Listener { inner: socket })
    }
}
590
591impl<S> Socks5Listener<S>
592where
593 S: AsyncSocket + Unpin,
594{
595 pub async fn bind_with_socket<'t, T>(socket: S, target: T) -> Result<Socks5Listener<S>>
606 where
607 T: IntoTargetAddr<'t>,
608 {
609 Self::bind_with_auth_and_socket(Authentication::None, socket, target).await
610 }
611
612 pub async fn bind_with_password_and_socket<'a, 't, T>(
623 socket: S,
624 target: T,
625 username: &'a str,
626 password: &'a str,
627 ) -> Result<Socks5Listener<S>>
628 where
629 T: IntoTargetAddr<'t>,
630 {
631 Self::bind_with_auth_and_socket(Authentication::Password { username, password }, socket, target).await
632 }
633
634 async fn bind_with_auth_and_socket<'t, T>(
635 auth: Authentication<'_>,
636 socket: S,
637 target: T,
638 ) -> Result<Socks5Listener<S>>
639 where
640 T: IntoTargetAddr<'t>,
641 {
642 let socket = SocksConnector::new(auth, Command::Bind, stream::empty().fuse(), target.into_target_addr()?)
643 .execute_with_socket(socket)
644 .await?;
645
646 Ok(Socks5Listener { inner: socket })
647 }
648
649 pub fn bind_addr(&self) -> TargetAddr {
654 self.inner.target_addr()
655 }
656
657 pub async fn accept(mut self) -> Result<Socks5Stream<S>> {
663 let mut connector = SocksConnector {
664 auth: Authentication::None,
665 command: Command::Bind,
666 proxy: stream::empty().fuse(),
667 target: self.inner.target,
668 buf: [0; 513],
669 ptr: 0,
670 len: 0,
671 };
672
673 let target = connector.receive_reply(&mut self.inner.socket).await?;
674
675 Ok(Socks5Stream {
676 socket: self.inner.socket,
677 target,
678 })
679 }
680}
681
/// Forwards tokio reads straight to the wrapped socket.
#[cfg(feature = "tokio")]
impl<T> tokio::io::AsyncRead for Socks5Stream<T>
where
    T: tokio::io::AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as tokio::io::AsyncRead>::poll_read(inner, cx, buf)
    }
}
695
/// Forwards tokio writes, flushes and shutdown straight to the wrapped socket.
#[cfg(feature = "tokio")]
impl<T> tokio::io::AsyncWrite for Socks5Stream<T>
where
    T: tokio::io::AsyncWrite + Unpin,
{
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as tokio::io::AsyncWrite>::poll_write(inner, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as tokio::io::AsyncWrite>::poll_flush(inner, cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as tokio::io::AsyncWrite>::poll_shutdown(inner, cx)
    }
}
713
/// Forwards `futures-io` reads straight to the wrapped socket.
#[cfg(feature = "futures-io")]
impl<T> futures_io::AsyncRead for Socks5Stream<T>
where
    T: futures_io::AsyncRead + Unpin,
{
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as futures_io::AsyncRead>::poll_read(inner, cx, buf)
    }
}
723
/// Forwards `futures-io` writes, flushes and close straight to the wrapped
/// socket.
#[cfg(feature = "futures-io")]
impl<T> futures_io::AsyncWrite for Socks5Stream<T>
where
    T: futures_io::AsyncWrite + Unpin,
{
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as futures_io::AsyncWrite>::poll_write(inner, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as futures_io::AsyncWrite>::poll_flush(inner, cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = Pin::new(&mut self.get_mut().socket);
        <T as futures_io::AsyncWrite>::poll_close(inner, cx)
    }
}