diff options
author | Jake McGinty <me@jake.su> | 2018-04-13 18:38:42 -0700 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-04-22 14:08:41 -0700 |
commit | 5bed7b5015d92b01d2f3a0992c33aa7aa73f52fa (patch) | |
tree | 2a9bd444c81d8e329a8ce8d1a896a8bb76924254 /src | |
parent | udp: set IPV6_RECVPKTINFO sockopt (darwin) (diff) | |
download | wireguard-rs-5bed7b5015d92b01d2f3a0992c33aa7aa73f52fa.tar.xz wireguard-rs-5bed7b5015d92b01d2f3a0992c33aa7aa73f52fa.zip |
udp: remove the unused Connected/Unconnected UDP enum
Diffstat (limited to 'src')
-rw-r--r-- | src/udp/frame.rs | 49 | ||||
-rw-r--r-- | src/udp/mod.rs | 140 |
2 files changed, 101 insertions, 88 deletions
diff --git a/src/udp/frame.rs b/src/udp/frame.rs index 1f4feaf..860d421 100644 --- a/src/udp/frame.rs +++ b/src/udp/frame.rs @@ -5,15 +5,10 @@ use std::os::unix::io::{AsRawFd, RawFd}; use failure::Error; use futures::{Async, Future, Poll, Stream, Sink, StartSend, AsyncSink, future, stream, unsync::mpsc}; use nix::sys::socket::{sockopt, setsockopt}; -use udp::{ConnectedUdpSocket, UdpSocket}; +use udp::UdpSocket; use tokio_core::reactor::Handle; use std::net::Ipv6Addr; -pub enum Socket { - Unconnected(UdpSocket), - Connected(ConnectedUdpSocket), -} - /// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using /// the `UdpCodec` trait to encode and decode frames. /// @@ -21,7 +16,7 @@ pub enum Socket { /// adapter. #[must_use = "sinks do nothing unless polled"] pub struct UdpFramed { - socket: Socket, + socket: UdpSocket, codec: VecUdpCodec, rd: Vec<u8>, wr: Vec<u8>, @@ -29,24 +24,12 @@ pub struct UdpFramed { flushed: bool, } -impl UdpFramed { - pub fn socket(&self) -> &UdpSocket { - match self.socket { - Socket::Unconnected(ref socket) => socket, - Socket::Connected(ref socket) => &socket.inner, - } - } -} - impl Stream for UdpFramed { type Item = PeerServerMessage; type Error = io::Error; fn poll(&mut self) -> Poll<Option<PeerServerMessage>, io::Error> { - let (n, addr) = match self.socket { - Socket::Unconnected(ref socket) => try_nb!(socket.recv_from(&mut self.rd)), - Socket::Connected(ref socket) => (try_nb!(socket.inner.recv(&mut self.rd)), socket.addr), - }; + let (n, addr) = try_nb!(self.socket.recv_from(&mut self.rd)); trace!("received {} bytes, decoding", n); let frame = self.codec.decode(&addr, &self.rd[..n])?; trace!("frame decoded from buffer"); @@ -81,10 +64,7 @@ impl Sink for UdpFramed { } trace!("flushing frame; length={}", self.wr.len()); - let n = match self.socket { - Socket::Unconnected(ref socket) => try_nb!(socket.send_to(&self.wr, &self.out_addr)), - Socket::Connected(ref socket) => try_nb!(socket.inner.send(&self.wr)) // TODO check to make sure the address is the connected address - }; + let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr)); trace!("written {}", n); let wrote_all = n == self.wr.len(); @@ -105,7 +85,7 @@ impl Sink for UdpFramed { } } -pub fn new(socket: Socket) -> UdpFramed { +pub fn new(socket: UdpSocket) -> UdpFramed { UdpFramed { socket, codec: VecUdpCodec {}, @@ -123,10 +103,7 @@ impl UdpFramed { /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn get_ref(&self) -> &UdpSocket { - match self.socket { - Socket::Connected(ref socket) => &socket.inner, - Socket::Unconnected(ref socket) => socket - } + &self.socket } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -136,10 +113,7 @@ impl UdpFramed { /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn get_mut(&mut self) -> &mut UdpSocket { - match self.socket { - Socket::Connected(ref mut socket) => &mut socket.inner, - Socket::Unconnected(ref mut socket) => socket - } + &mut self.socket } /// Consumes the `Framed`, returning its underlying I/O stream. @@ -148,10 +122,7 @@ impl UdpFramed { /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn into_inner(self) -> UdpSocket { - match self.socket { - Socket::Connected(socket) => socket.inner, - Socket::Unconnected(socket) => socket - } + self.socket } } @@ -203,8 +174,8 @@ pub struct UdpChannel { impl From<UdpFramed> for UdpChannel { fn from(framed: UdpFramed) -> Self { - let fd = framed.socket().as_raw_fd(); - let handle = framed.socket().handle.clone(); + let fd = framed.socket.as_raw_fd(); + let handle = framed.socket.handle.clone(); let (udp_sink, ingress) = framed.split(); let (egress, egress_rx) = mpsc::channel(1024); let udp_writethrough = udp_sink diff --git a/src/udp/mod.rs b/src/udp/mod.rs index f75a05d..61851e2 100644 --- a/src/udp/mod.rs +++ b/src/udp/mod.rs @@ -8,7 +8,7 @@ use futures::{Async, Future, Poll}; use libc; use mio; use nix::{self, errno::Errno}; -use nix::sys::{uio::IoVec, socket::{CmsgSpace, MsgFlags, SockAddr, recvmsg}}; +use nix::sys::{uio::IoVec, socket::{CmsgSpace, ControlMessage, UnknownCmsg, MsgFlags, SockAddr, recvmsg}}; use socket2::{Socket, Domain, Type, Protocol}; use tokio_core::reactor::{Handle, PollEvented}; @@ -20,34 +20,25 @@ pub struct UdpSocket { } /// IPV6_RECVPKTINFO is missing from the libc crate. Value taken from https://git.io/vxNel. -pub const IPV6_RECVPKTINFO: i32 = 61; - -/* -struct in6_pktinfo { - struct in6_addr ipi6_addr; /* src/dst IPv6 address */ - unsigned int ipi6_ifindex; /* send/recv interface index */ -}; -*/ +pub const IPV6_RECVPKTINFO : i32 = 61; +pub const IP_PKTINFO : i32 = 26; +pub const IP_RECVDSTADDR : i32 = 7; #[repr(C)] struct in6_pktinfo { - ipi6_addr : libc::in6_addr, + ipi6_addr : libc::in6_addr, ipi6_ifindex : libc::c_uint } -mod frame; -pub use self::frame::{UdpChannel, UdpFramed, VecUdpCodec, PeerServerMessage}; - -pub struct ConnectedUdpSocket { - inner: UdpSocket, - addr: SocketAddr, +#[repr(C)] +struct in_pktinfo { + ipi_ifindex : libc::c_uint, + ipi_spec_dst : libc::in_addr, + ipi_addr : libc::in_addr, } -impl ConnectedUdpSocket { - pub fn framed(self) -> UdpFramed { - frame::new(frame::Socket::Connected(self)) - } -} +mod frame; +pub use self::frame::{UdpChannel, UdpFramed, VecUdpCodec, PeerServerMessage}; impl UdpSocket { /// Create a new UDP socket bound to the specified address. @@ -56,23 +47,41 @@ impl UdpSocket { /// `addr` provided. If the result is `Ok`, the socket has successfully bound. pub fn bind(addr: SocketAddr, handle: Handle) -> io::Result<UdpSocket> { let socket = Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))?; - socket.set_only_v6(false)?; - socket.set_nonblocking(true)?; - socket.set_reuse_port(true)?; + + let off: libc::c_int = 0; + let on: libc::c_int = 1; +// unsafe { +// let ret = libc::setsockopt(socket.as_raw_fd(), +// libc::IPPROTO_IP, +// 3, +// &off as *const _ as *const libc::c_void, +// mem::size_of_val(&off) as libc::socklen_t); +// if ret != 0 { +// let err: Result<(), _> = Err(io::Error::last_os_error()); +// err.expect("setsockopt failed"); +// } +// debug!("set IP_PKTINFO"); +// } unsafe { - let optval: libc::c_int = 1; let ret = libc::setsockopt(socket.as_raw_fd(), libc::IPPROTO_IPV6, IPV6_RECVPKTINFO, - &optval as *const _ as *const libc::c_void, - mem::size_of_val(&optval) as libc::socklen_t); + &on as *const _ as *const libc::c_void, + mem::size_of_val(&on) as libc::socklen_t); if ret != 0 { let err: Result<(), _> = Err(io::Error::last_os_error()); err.expect("setsockopt failed"); } + + debug!("set IPV6_PKTINFO"); } + socket.set_only_v6(false)?; + socket.set_nonblocking(true)?; + socket.set_reuse_port(true)?; + socket.set_reuse_address(true)?; + socket.bind(&addr.into())?; Self::from_socket(socket.into_udp_socket(), handle) } @@ -91,8 +100,7 @@ impl UdpSocket { /// This can be used in conjunction with net2's `UdpBuilder` interface to /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. - pub fn from_socket(socket: net::UdpSocket, - handle: Handle) -> io::Result<UdpSocket> { + pub fn from_socket(socket: net::UdpSocket, handle: Handle) -> io::Result<UdpSocket> { let udp = mio::net::UdpSocket::from_socket(socket)?; UdpSocket::new(udp, handle) } @@ -117,7 +125,7 @@ impl UdpSocket { /// break them into separate objects, allowing them to interact more /// easily. pub fn framed(self) -> UdpFramed { - frame::new(frame::Socket::Unconnected(self)) + frame::new(self) } /// Returns the local address that this stream is bound to. @@ -125,13 +133,6 @@ impl UdpSocket { self.io.get_ref().local_addr() } - /// Connects the UDP socket setting the default destination for send() and - /// limiting packets that are read via recv from the address specified in addr. - pub fn connect(self, addr: &SocketAddr) -> io::Result<ConnectedUdpSocket> { - self.io.get_ref().connect(*addr)?; - Ok(ConnectedUdpSocket{ inner: self, addr: *addr }) - } - /// Sends data on the socket to the address previously bound via connect(). /// On success, returns the number of bytes written. pub fn send(&self, buf: &[u8]) -> io::Result<usize> { @@ -155,13 +156,27 @@ impl UdpSocket { if let Async::NotReady = self.io.poll_read() { return Err(io::ErrorKind::WouldBlock.into()) } - match self.io.get_ref().recv(buf) { - Ok(n) => Ok(n), + let mut cmsgs = CmsgSpace::<in6_pktinfo>::new(); + let res = recvmsg(self.io.get_ref().as_raw_fd(), + &[IoVec::from_mut_slice(buf)], + Some(&mut cmsgs), + MsgFlags::empty()); + + match res { + Ok(msg) => { + debug!("address: {:?}", msg.address); + Ok(msg.bytes) + }, + Err(nix::Error::Sys(Errno::EAGAIN)) => { + debug!("EAGAIN"); + self.io.need_read(); + Err(io::ErrorKind::WouldBlock.into()) + }, + Err(nix::Error::Sys(errno)) => { + Err(io::Error::last_os_error()) + }, Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_read(); - } - Err(e) + Err(io::Error::new(io::ErrorKind::Other, e)) } } } @@ -212,13 +227,40 @@ impl UdpSocket { if let Async::NotReady = self.io.poll_read() { return Err(io::ErrorKind::WouldBlock.into()) } - match self.io.get_ref().recv_from(buf) { - Ok(n) => Ok(n), - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_read(); + if let Async::NotReady = self.io.poll_read() { + return Err(io::ErrorKind::WouldBlock.into()) + } + let mut cmsgs = CmsgSpace::<[u8; 1024]>::new(); + let res = recvmsg(self.io.get_ref().as_raw_fd(), + &[IoVec::from_mut_slice(buf)], + Some(&mut cmsgs), + MsgFlags::empty()); + + match res { + Ok(msg) => { + for cmsg in msg.cmsgs() { + match cmsg { + ControlMessage::Unknown(_) => { + debug!("unknown cmsg"); + } + _ => debug!("known cmsg") + } } - Err(e) + if let Some(SockAddr::Inet(addr)) = msg.address { + Ok((msg.bytes, addr.to_std())) + } else { + Err(io::Error::new(io::ErrorKind::Other, "invalid source address")) + } + }, + Err(nix::Error::Sys(Errno::EAGAIN)) => { + self.io.need_read(); + Err(io::ErrorKind::WouldBlock.into()) + }, + Err(nix::Error::Sys(errno)) => { + Err(io::Error::last_os_error()) + }, + Err(e) => { + Err(io::Error::new(io::ErrorKind::Other, e)) } } } |