diff options
author | Jake McGinty <me@jake.su> | 2018-03-01 20:56:05 +0000 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2018-03-04 01:23:31 +0000 |
commit | a99105ffc21caf8ea5fc6c662efe5cc27a76ae7f (patch) | |
tree | 2173517522a2d5bbec8e4d347b3586d42ba6fb8c | |
parent | global: clean up logging, stop using env_logger (diff) | |
download | wireguard-rs-a99105ffc21caf8ea5fc6c662efe5cc27a76ae7f.tar.xz wireguard-rs-a99105ffc21caf8ea5fc6c662efe5cc27a76ae7f.zip |
udp: switch to own tokio UdpSocket implementation for performance reasons
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 2 | ||||
-rw-r--r-- | src/lib.rs | 9 | ||||
-rw-r--r-- | src/udp/frame.rs | 188 | ||||
-rw-r--r-- | src/udp/mod.rs | 477 |
6 files changed, 673 insertions, 5 deletions
@@ -1300,6 +1300,7 @@ dependencies = [ "hex 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "notify 4.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "pnet_packet 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -42,6 +42,7 @@ hex = "^0.3" notify = "4.0.0" rand = "^0.4" nix = "^0.10" +mio = "^0.6" pnet_packet = "^0.20" snow = { git = "https://github.com/mcginty/snow", features = ["ring-accelerated"], branch = "wireguard" } socket2 = "^0.3" diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 42e275c..f9045df 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -15,7 +15,7 @@ use failure::{Error, err_msg}; use futures::{Async, Future, Stream, Sink, Poll, unsync::mpsc, stream, future}; use rand::{self, Rng}; use socket2::{Socket, Domain, Type, Protocol}; -use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed}; +use udp::{UdpSocket, UdpCodec, UdpFramed}; use tokio_core::reactor::Handle; @@ -7,16 +7,18 @@ #![cfg_attr(feature = "cargo-clippy", allow(decimal_literal_representation))] #[macro_use] extern crate failure; +#[macro_use] extern crate futures; #[macro_use] extern crate lazy_static; #[macro_use] extern crate log; +#[macro_use] extern crate tokio_core; extern crate base64; extern crate blake2_rfc; extern crate byteorder; extern crate bytes; extern crate chacha20_poly1305_aead; -extern crate futures; extern crate hex; +extern crate mio; extern crate nix; extern crate notify; extern crate pnet_packet; @@ -25,7 +27,6 @@ extern crate snow; extern crate socket2; extern crate subtle; extern crate test; -extern crate tokio_core; extern crate tokio_io; extern crate tokio_uds; extern crate tokio_utun; @@ -34,6 +35,8 @@ extern crate tokio_signal; extern crate treebitmap; extern crate x25519_dalek; +mod udp; + pub mod consts; pub mod cookie; pub mod error; @@ -47,5 +50,3 @@ pub mod time; pub mod timer; pub mod ip_packet; pub mod xchacha20poly1305; - - diff --git a/src/udp/frame.rs b/src/udp/frame.rs new file mode 100644 index 0000000..c85c873 --- /dev/null +++ b/src/udp/frame.rs @@ -0,0 +1,188 @@ +use std::io; +use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4}; + +use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; + +use udp::{ConnectedUdpSocket, UdpSocket}; + +/// Encoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `UdpFramed` and provides +/// the `In` and `Out` types which are decoded and encoded from the socket, +/// respectively. +/// +/// Because UDP is a connectionless protocol, the `decode` method receives the +/// address where data came from and the `encode` method is also responsible for +/// determining the remote host to which the datagram should be sent +/// +/// The trait itself is implemented on a type that can track state for decoding +/// or encoding, which is particularly useful for streaming parsers. In many +/// cases, though, this type will simply be a unit struct (e.g. `struct +/// HttpCodec`). +pub trait UdpCodec { + /// The type of decoded frames. + type In; + + /// The type of frames to be encoded. + type Out; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `UdpFramed` on a single datagram which has been + /// read from a socket. The `buf` argument contains the data that was + /// received from the remote address, and `src` is the address the data came + /// from. Note that typically this method should require the entire contents + /// of `buf` to be valid or otherwise return an error with trailing data. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In>; + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `msg` into the byte buffer provided by `buf`. + /// The `buf` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + /// + /// The encode method also determines the destination to which the buffer + /// should be directed, which will be returned as a `SocketAddr`. + fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr; +} + +pub enum Socket { + Unconnected(UdpSocket), + Connected(ConnectedUdpSocket), +} + +/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using +/// the `UdpCodec` trait to encode and decode frames. +/// +/// You can acquire a `UdpFramed` instance by using the `UdpSocket::framed` +/// adapter. +#[must_use = "sinks do nothing unless polled"] +pub struct UdpFramed<C> { + socket: Socket, + codec: C, + rd: Vec<u8>, + wr: Vec<u8>, + out_addr: SocketAddr, + flushed: bool, +} + +impl<C: UdpCodec> Stream for UdpFramed<C> { + type Item = C::In; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<C::In>, io::Error> { + let (n, addr) = match self.socket { + Socket::Unconnected(ref socket) => try_nb!(socket.recv_from(&mut self.rd)), + Socket::Connected(ref socket) => (try_nb!(socket.inner.recv(&mut self.rd)), socket.addr), + }; + trace!("received {} bytes, decoding", n); + let frame = try!(self.codec.decode(&addr, &self.rd[..n])); + trace!("frame decoded from buffer"); + Ok(Async::Ready(Some(frame))) + } +} + +impl<C: UdpCodec> Sink for UdpFramed<C> { + type SinkItem = C::Out; + type SinkError = io::Error; + + fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> { + trace!("sending frame"); + + if !self.flushed { + match try!(self.poll_complete()) { + Async::Ready(()) => {}, + Async::NotReady => return Ok(AsyncSink::NotReady(item)), + } + } + + self.out_addr = self.codec.encode(item, &mut self.wr); + self.flushed = false; + trace!("frame encoded; length={}", self.wr.len()); + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + if self.flushed { + return Ok(Async::Ready(())) + } + + trace!("flushing frame; length={}", self.wr.len()); + let n = match self.socket { + Socket::Unconnected(ref socket) => try_nb!(socket.send_to(&self.wr, &self.out_addr)), + Socket::Connected(ref socket) => try_nb!(socket.inner.send(&self.wr)) // TODO check to make sure the address is the connected address + }; + trace!("written {}", n); + + let wrote_all = n == self.wr.len(); + self.wr.clear(); + self.flushed = true; + + if wrote_all { + Ok(Async::Ready(())) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to write entire datagram to socket")) + } + } + + fn close(&mut self) -> Poll<(), io::Error> { + try_ready!(self.poll_complete()); + Ok(().into()) + } +} + +pub fn new<C: UdpCodec>(socket: Socket, codec: C) -> UdpFramed<C> { + UdpFramed { + socket, + codec, + out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), + rd: vec![0; 64 * 1024], + wr: Vec::with_capacity(8 * 1024), + flushed: true, + } +} + +impl<C> UdpFramed<C> { + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_ref(&self) -> &UdpSocket { + match self.socket { + Socket::Connected(ref socket) => &socket.inner, + Socket::Unconnected(ref socket) => socket + } + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut UdpSocket { + match self.socket { + Socket::Connected(ref mut socket) => &mut socket.inner, + Socket::Unconnected(ref mut socket) => socket + } + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> UdpSocket { + match self.socket { + Socket::Connected(socket) => socket.inner, + Socket::Unconnected(socket) => socket + } + } +}
\ No newline at end of file diff --git a/src/udp/mod.rs b/src/udp/mod.rs new file mode 100644 index 0000000..a7196f9 --- /dev/null +++ b/src/udp/mod.rs @@ -0,0 +1,477 @@ +#![allow(unused)] + +use std::io; +use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; +use std::fmt; + +use futures::{Async, Future, Poll}; +use mio; + +use tokio_core::reactor::{Handle, PollEvented}; + +/// An I/O object representing a UDP socket. +pub struct UdpSocket { + io: PollEvented<mio::net::UdpSocket>, +} + +mod frame; +pub use self::frame::{UdpFramed, UdpCodec}; + +pub struct ConnectedUdpSocket { + inner: UdpSocket, + addr: SocketAddr, +} + +impl ConnectedUdpSocket { + pub fn framed<C: UdpCodec>(self, codec: C) -> UdpFramed<C> { + frame::new(frame::Socket::Connected(self), codec) + } +} + +impl UdpSocket { + /// Create a new UDP socket bound to the specified address. + /// + /// This function will create a new UDP socket and attempt to bind it to the + /// `addr` provided. If the result is `Ok`, the socket has successfully bound. + pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<UdpSocket> { + let udp = try!(mio::net::UdpSocket::bind(addr)); + UdpSocket::new(udp, handle) + } + + fn new(socket: mio::net::UdpSocket, handle: &Handle) -> io::Result<UdpSocket> { + let io = try!(PollEvented::new(socket, handle)); + Ok(UdpSocket { io: io }) + } + + /// Creates a new `UdpSocket` from the previously bound socket provided. + /// + /// The socket given will be registered with the event loop that `handle` is + /// associated with. This function requires that `socket` has previously + /// been bound to an address to work correctly. + /// + /// This can be used in conjunction with net2's `UdpBuilder` interface to + /// configure a socket before it's handed off, such as setting options like + /// `reuse_address` or binding to multiple addresses. + pub fn from_socket(socket: net::UdpSocket, + handle: &Handle) -> io::Result<UdpSocket> { + let udp = try!(mio::net::UdpSocket::from_socket(socket)); + UdpSocket::new(udp, handle) + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `UdpSocket` object, using the provided `UdpCodec` to read and write the + /// raw data. + /// + /// Raw UDP sockets work with datagrams, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of this socket by using the `UdpCodec` + /// trait to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things which require both read and write access to the underlying + /// object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `UdpFramed` returned by this method, which will + /// break them into separate objects, allowing them to interact more + /// easily. + pub fn framed<C: UdpCodec>(self, codec: C) -> UdpFramed<C> { + frame::new(frame::Socket::Unconnected(self), codec) + } + + /// Returns the local address that this stream is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Connects the UDP socket setting the default destination for send() and + /// limiting packets that are read via recv from the address specified in addr. + pub fn connect(self, addr: &SocketAddr) -> io::Result<ConnectedUdpSocket> { + self.io.get_ref().connect(*addr)?; + Ok(ConnectedUdpSocket{ inner: self, addr: *addr }) + } + + /// Sends data on the socket to the address previously bound via connect(). + /// On success, returns the number of bytes written. + pub fn send(&self, buf: &[u8]) -> io::Result<usize> { + if let Async::NotReady = self.io.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()) + } + match self.io.get_ref().send(buf) { + Ok(n) => Ok(n), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + self.io.need_write(); + } + Err(e) + } + } + } + + /// Receives data from the socket previously bound with connect(). + /// On success, returns the number of bytes read. + pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + if let Async::NotReady = self.io.poll_read() { + return Err(io::ErrorKind::WouldBlock.into()) + } + match self.io.get_ref().recv(buf) { + Ok(n) => Ok(n), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + self.io.need_read(); + } + Err(e) + } + } + } + + /// Test whether this socket is ready to be read or not. + /// + /// If the socket is *not* readable then the current task is scheduled to + /// get a notification when the socket does become readable. That is, this + /// is only suitable for calling in a `Future::poll` method and will + /// automatically handle ensuring a retry once the socket is readable again. + pub fn poll_read(&self) -> Async<()> { + self.io.poll_read() + } + + /// Test whether this socket is ready to be written to or not. + /// + /// If the socket is *not* writable then the current task is scheduled to + /// get a notification when the socket does become writable. That is, this + /// is only suitable for calling in a `Future::poll` method and will + /// automatically handle ensuring a retry once the socket is writable again. + pub fn poll_write(&self) -> Async<()> { + self.io.poll_write() + } + + /// Sends data on the socket to the given address. On success, returns the + /// number of bytes written. + /// + /// Address type can be any implementer of `ToSocketAddrs` trait. See its + /// documentation for concrete examples. + pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { + if let Async::NotReady = self.io.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()) + } + match self.io.get_ref().send_to(buf, target) { + Ok(n) => Ok(n), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + self.io.need_write(); + } + Err(e) + } + } + } + + /// Creates a future that will write the entire contents of the buffer + /// `buf` provided as a datagram to this socket. + /// + /// The returned future will return after data has been written to the + /// outbound socket. The future will resolve to the stream as well as the + /// buffer (for reuse if needed). + /// + /// Any error which happens during writing will cause both the stream and + /// the buffer to get destroyed. Note that failure to write the entire + /// buffer is considered an error for the purposes of sending a datagram. + /// + /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which + /// should be broadly applicable to accepting data which can be converted + /// to a slice. The `Window` struct is also available in this crate to + /// provide a different window into a slice if necessary. + pub fn send_dgram<T>(self, buf: T, addr: SocketAddr) -> SendDgram<T> + where T: AsRef<[u8]>, + { + SendDgram(Some((self, buf, addr))) + } + + /// Receives data from the socket. On success, returns the number of bytes + /// read and the address from whence the data came. + pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + if let Async::NotReady = self.io.poll_read() { + return Err(io::ErrorKind::WouldBlock.into()) + } + match self.io.get_ref().recv_from(buf) { + Ok(n) => Ok(n), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + self.io.need_read(); + } + Err(e) + } + } + } + + /// Creates a future that receive a datagram to be written to the buffer + /// provided. + /// + /// The returned future will return after a datagram has been received on + /// this socket. The future will resolve to the socket, the buffer, the + /// amount of data read, and the address the data was received from. + /// + /// An error during reading will cause the socket and buffer to get + /// destroyed and the socket will be returned. + /// + /// The `buf` parameter here only requires the `AsMut<[u8]>` trait, which + /// should be broadly applicable to accepting data which can be converted + /// to a slice. The `Window` struct is also available in this crate to + /// provide a different window into a slice if necessary. + pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T> + where T: AsMut<[u8]>, + { + RecvDgram(Some((self, buf))) + } + + /// Gets the value of the `SO_BROADCAST` option for this socket. + /// + /// For more information about this option, see + /// [`set_broadcast`][link]. + /// + /// [link]: #method.set_broadcast + pub fn broadcast(&self) -> io::Result<bool> { + self.io.get_ref().broadcast() + } + + /// Sets the value of the `SO_BROADCAST` option for this socket. + /// + /// When enabled, this socket is allowed to send packets to a broadcast + /// address. + pub fn set_broadcast(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_broadcast(on) + } + + /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// For more information about this option, see + /// [`set_multicast_loop_v4`][link]. + /// + /// [link]: #method.set_multicast_loop_v4 + pub fn multicast_loop_v4(&self) -> io::Result<bool> { + self.io.get_ref().multicast_loop_v4() + } + + /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// If enabled, multicast packets will be looped back to the local socket. + /// Note that this may not have any affect on IPv6 sockets. + pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_multicast_loop_v4(on) + } + + /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// For more information about this option, see + /// [`set_multicast_ttl_v4`][link]. + /// + /// [link]: #method.set_multicast_ttl_v4 + pub fn multicast_ttl_v4(&self) -> io::Result<u32> { + self.io.get_ref().multicast_ttl_v4() + } + + /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// Indicates the time-to-live value of outgoing multicast packets for + /// this socket. The default value is 1 which means that multicast packets + /// don't leave the local network unless explicitly requested. + /// + /// Note that this may not have any affect on IPv6 sockets. + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_multicast_ttl_v4(ttl) + } + + /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// For more information about this option, see + /// [`set_multicast_loop_v6`][link]. + /// + /// [link]: #method.set_multicast_loop_v6 + pub fn multicast_loop_v6(&self) -> io::Result<bool> { + self.io.get_ref().multicast_loop_v6() + } + + /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// Controls whether this socket sees the multicast packets it sends itself. + /// Note that this may not have any affect on IPv4 sockets. + pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_multicast_loop_v6(on) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`][link]. + /// + /// [link]: #method.set_ttl + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } + + /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. + /// + /// This function specifies a new multicast group for this socket to join. + /// The address must be a valid multicast address, and `interface` is the + /// address of the local interface with which the system should join the + /// multicast group. If it's equal to `INADDR_ANY` then an appropriate + /// interface is chosen by the system. + pub fn join_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.get_ref().join_multicast_v4(multiaddr, interface) + } + + /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. + /// + /// This function specifies a new multicast group for this socket to join. + /// The address must be a valid multicast address, and `interface` is the + /// index of the interface to join/leave (or 0 to indicate any interface). + pub fn join_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.get_ref().join_multicast_v6(multiaddr, interface) + } + + /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. + /// + /// For more information about this option, see + /// [`join_multicast_v4`][link]. + /// + /// [link]: #method.join_multicast_v4 + pub fn leave_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.get_ref().leave_multicast_v4(multiaddr, interface) + } + + /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. + /// + /// For more information about this option, see + /// [`join_multicast_v6`][link]. + /// + /// [link]: #method.join_multicast_v6 + pub fn leave_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.get_ref().leave_multicast_v6(multiaddr, interface) + } + + /// Sets the value for the `IPV6_V6ONLY` option on this socket. + /// + /// If this is set to `true` then the socket is restricted to sending and + /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications + /// can bind the same port at the same time. + /// + /// If this is set to `false` then the socket can be used to send and + /// receive packets from an IPv4-mapped IPv6 address. + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.io.get_ref().set_only_v6(only_v6) + } + + /// Gets the value of the `IPV6_V6ONLY` option for this socket. + /// + /// For more information about this option, see [`set_only_v6`][link]. + /// + /// [link]: #method.set_only_v6 + pub fn only_v6(&self) -> io::Result<bool> { + self.io.get_ref().only_v6() + } +} + +impl fmt::Debug for UdpSocket { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +/// A future used to write the entire contents of some data to a UDP socket. +/// +/// This is created by the `UdpSocket::send_dgram` method. +#[must_use = "futures do nothing unless polled"] +pub struct SendDgram<T>(Option<(UdpSocket, T, SocketAddr)>); + +fn incomplete_write(reason: &str) -> io::Error { + io::Error::new(io::ErrorKind::Other, reason) +} + +impl<T> Future for SendDgram<T> + where T: AsRef<[u8]>, +{ + type Item = (UdpSocket, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { + { + let (ref sock, ref buf, ref addr) = + *self.0.as_ref().expect("SendDgram polled after completion"); + let n = try_nb!(sock.send_to(buf.as_ref(), addr)); + if n != buf.as_ref().len() { + return Err(incomplete_write("failed to send entire message \ + in datagram")) + } + } + + let (sock, buf, _addr) = self.0.take().unwrap(); + Ok(Async::Ready((sock, buf))) + } +} + +/// A future used to receive a datagram from a UDP socket. +/// +/// This is created by the `UdpSocket::recv_dgram` method. +#[must_use = "futures do nothing unless polled"] +pub struct RecvDgram<T>(Option<(UdpSocket, T)>); + +impl<T> Future for RecvDgram<T> + where T: AsMut<[u8]>, +{ + type Item = (UdpSocket, T, usize, SocketAddr); + type Error = io::Error; + + fn poll(&mut self) -> Poll<Self::Item, io::Error> { + let (n, addr) = { + let (ref socket, ref mut buf) = + *self.0.as_mut().expect("RecvDgram polled after completion"); + + try_nb!(socket.recv_from(buf.as_mut())) + }; + + let (socket, buf) = self.0.take().unwrap(); + Ok(Async::Ready((socket, buf, n, addr))) + } +} + +#[cfg(all(unix, not(target_os = "fuchsia")))] +mod sys { + use std::os::unix::prelude::*; + use super::UdpSocket; + + impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::UdpSocket; + // + // impl AsRawHandle for UdpSocket { + // fn as_raw_handle(&self) -> RawHandle { + // self.io.get_ref().as_raw_handle() + // } + // } +}
\ No newline at end of file |