diff options
author | Jake McGinty <me@jake.su> | 2017-12-31 02:27:29 -0800 |
---|---|---|
committer | Jake McGinty <me@jake.su> | 2017-12-31 02:27:29 -0800 |
commit | 480d9ba46266ea656b3412441e7b67f95f08df0a (patch) | |
tree | 0cf9f17462d4545b68ee0b9cb7d801684dc39683 /src | |
parent | use blake2-rfc instead of rust-crypto (diff) | |
download | wireguard-rs-480d9ba46266ea656b3412441e7b67f95f08df0a.tar.xz wireguard-rs-480d9ba46266ea656b3412441e7b67f95f08df0a.zip |
move outgoing peer logic to PeerServer as well
Diffstat (limited to 'src')
-rw-r--r-- | src/interface/mod.rs | 40 | ||||
-rw-r--r-- | src/interface/peer_server.rs | 25 |
2 files changed, 29 insertions, 36 deletions
diff --git a/src/interface/mod.rs b/src/interface/mod.rs index c6464ab..76a6430 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -96,40 +96,14 @@ impl Interface { let utun_stream = UtunStream::connect(&self.name, &core.handle()).unwrap().framed(VecUtunCodec{}); let (utun_writer, utun_reader) = utun_stream.split(); - let utun_fut = utun_reader.for_each({ - let state = self.state.clone(); - let utun_handle = core.handle(); - let udp_tx = peer_server.udp_tx(); - move |packet| { - debug_packet("received UTUN packet: ", &packet); - let state = state.borrow(); - let mut ping_packet = [0u8; 1500]; - let destination = Ipv4Packet::new(&packet).unwrap().get_destination(); - if let Some((_, _, peer)) = state.ip4_map.longest_match(destination) { - let mut peer = peer.borrow_mut(); - ping_packet[0] = 4; - let their_index = peer.their_current_index().expect("no current index for them"); - let endpoint = peer.info.endpoint.unwrap(); - peer.tx_bytes += packet.len(); - let noise = peer.current_noise().expect("current noise session"); - LittleEndian::write_u32(&mut ping_packet[4..], their_index); - LittleEndian::write_u64(&mut ping_packet[8..], noise.sending_nonce().unwrap()); - let len = noise.write_message(&packet, &mut ping_packet[16..]).expect("failed to encrypt outgoing UDP packet"); - utun_handle.spawn(udp_tx.clone().send((endpoint, ping_packet[..(16+len)].to_owned())) - .map(|_| ()) - .map_err(|_| ())); - } else { - warn!("got packet with no available outgoing route"); - } - Ok(()) - } - }).map_err(|_| ()); + + let utun_read_fut = peer_server.tx().sink_map_err(|_| ()).send_all( + utun_reader.map_err(|_|())).map_err(|_|()); let utun_write_fut = utun_writer.sink_map_err(|_| ()).send_all( - utun_rx.map(|packet| { - debug_packet("sending UTUN: ", &packet); - packet - }).map_err(|_| ())).map_err(|_| ()); + utun_rx.map_err(|_| ())).map_err(|_| ()); + + let utun_fut = utun_write_fut.join(utun_read_fut); let handle = core.handle(); let listener = UnixListener::bind(ConfigurationServiceManager::get_path(&self.name).unwrap(), &handle).unwrap(); @@ -228,6 +202,6 @@ impl Interface { } }).map_err(|_| ()); - core.run(peer_server.join(utun_fut.join(utun_write_fut.join(config_fut.join(config_server))))).unwrap(); + core.run(peer_server.join(utun_fut.join(config_fut.join(config_server)))).unwrap(); } } diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs index 83d5af4..d271d22 100644 --- a/src/interface/peer_server.rs +++ b/src/interface/peer_server.rs @@ -7,11 +7,13 @@ use std::time::Duration; use byteorder::{ByteOrder, BigEndian, LittleEndian}; use futures::{Async, Future, Stream, Sink, Poll, future, unsync, sync, stream}; +use pnet::packet::ipv4::Ipv4Packet; +use snow::NoiseBuilder; use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed}; use tokio_core::reactor::Handle; use tokio_io::codec::Framed; use tokio_timer::{Interval, Timer}; -use snow::NoiseBuilder; +use treebitmap::{IpLookupTable, IpLookupTableOps}; pub type PeerServerMessage = (SocketAddr, Vec<u8>); @@ -203,8 +205,25 @@ impl PeerServer { } fn handle_outgoing_packet(&mut self, packet: Vec<u8>) { - debug!("handle_outgoing_packet()"); - + debug_packet("received UTUN packet: ", &packet); + let state = self.shared_state.borrow(); + let mut out_packet = vec![0u8; 1500]; + let destination = Ipv4Packet::new(&packet).unwrap().get_destination(); + if let Some((_, _, peer)) = state.ip4_map.longest_match(destination) { + let mut peer = peer.borrow_mut(); + out_packet[0] = 4; + let their_index = peer.their_current_index().expect("no current index for them"); + let endpoint = peer.info.endpoint.unwrap(); + peer.tx_bytes += packet.len(); + let noise = peer.current_noise().expect("current noise session"); + LittleEndian::write_u32(&mut out_packet[4..], their_index); + LittleEndian::write_u64(&mut out_packet[8..], noise.sending_nonce().unwrap()); + let len = noise.write_message(&packet, &mut out_packet[16..]).expect("failed to encrypt outgoing UDP packet"); + out_packet.truncate(16+len); + self.handle.spawn(self.udp_tx.clone().send((endpoint, out_packet)).then(|_| Ok(()))); + } else { + warn!("got packet with no available outgoing route"); + } } } |