aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJake McGinty <me@jake.su>2018-06-01 16:59:37 -0500
committerJake McGinty <me@jake.su>2018-06-01 17:03:30 -0500
commita2c84873b8d7c9cc5fb3136cb6ae392685d2e54a (patch)
treea443a9fbbd22ec533b84556b8231e34ce15e4509
parentfinish up nonce rework (diff)
downloadwireguard-rs-a2c84873b8d7c9cc5fb3136cb6ae392685d2e54a.tar.xz
wireguard-rs-a2c84873b8d7c9cc5fb3136cb6ae392685d2e54a.zip
encryption wip
-rw-r--r--src/interface/peer_server.rs56
-rw-r--r--src/peer.rs36
2 files changed, 67 insertions, 25 deletions
diff --git a/src/interface/peer_server.rs b/src/interface/peer_server.rs
index d03fa58..aa82fb7 100644
--- a/src/interface/peer_server.rs
+++ b/src/interface/peer_server.rs
@@ -62,6 +62,7 @@ pub struct PeerServer {
rng : ThreadRng,
cpu_pool : CpuPool,
decrypt_channel : Channel<(Endpoint, Transport, Vec<u8>, SessionType)>,
+ encrypt_channel : Channel<(SharedPeer, (Endpoint, Vec<u8>))>,
}
impl PeerServer {
@@ -81,6 +82,7 @@ impl PeerServer {
rng : rand::thread_rng(),
cpu_pool : CpuPool::new_num_cpus(),
decrypt_channel : mpsc::unbounded().into(),
+ encrypt_channel : mpsc::unbounded().into(),
})
}
@@ -250,7 +252,8 @@ impl PeerServer {
}
debug!("got handshake response (0x02)");
- let mut state = self.shared_state.borrow_mut();
+ let shared_state = self.shared_state.clone();
+ let mut state = shared_state.borrow_mut();
let our_index = LittleEndian::read_u32(&packet[8..]);
let peer_ref = state.index_map.get(&our_index)
.ok_or_else(|| format_err!("unknown our_index ({})", our_index))?
@@ -266,11 +269,11 @@ impl PeerServer {
if !peer.outgoing_queue.is_empty() {
debug!("sending {} queued egress packets", peer.outgoing_queue.len());
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
}
} else {
debug!("sending empty keepalive");
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
}
} else {
error!("peer not ready for transport after processing handshake response. this shouldn't happen.");
@@ -307,6 +310,18 @@ impl PeerServer {
Ok(())
}
+ fn encrypt_and_send(&mut self, peer_ref: SharedPeer, peer: &mut Peer, packet: UtunPacket) -> Result<(), Error> {
+ let tx = self.encrypt_channel.tx.clone();
+ let f = self.cpu_pool.spawn(peer.handle_outgoing_transport(packet)?)
+ .and_then(move |result| {
+ tx.unbounded_send((peer_ref, result)).expect("broken decrypt channel");
+ Ok(())
+ })
+ .map_err(|e| warn!("{:?}", e));
+ self.handle.spawn(f);
+ Ok(())
+ }
+
fn handle_ingress_decrypted_transport(&mut self, addr: Endpoint, orig_packet: Transport, raw_packet: Vec<u8>, session_type: SessionType)
-> Result<(), Error>
{
@@ -316,7 +331,8 @@ impl PeerServer {
let needs_handshake = {
let mut peer = peer_ref.borrow_mut();
let transition = peer.handle_incoming_decrypted_transport(addr, &raw_packet, session_type)?;
- let mut state = self.shared_state.borrow_mut();
+ let shared_state = self.shared_state.clone();
+ let mut state = shared_state.borrow_mut();
if let SessionTransition::Transition(possible_dead_index) = transition {
if let Some(index) = possible_dead_index {
let _ = state.index_map.remove(&index);
@@ -325,10 +341,7 @@ impl PeerServer {
let outgoing: Vec<UtunPacket> = peer.outgoing_queue.drain(..).collect();
for packet in outgoing {
- match peer.handle_outgoing_transport(packet.payload()) {
- Ok(message) => self.send_to_peer(message)?,
- Err(e) => warn!("failed to encrypt packet: {}", e)
- }
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
}
self.timer.send_after(*WIPE_AFTER_TIME, TimerMessage::Wipe(Rc::downgrade(&peer_ref)));
@@ -369,7 +382,7 @@ impl PeerServer {
}
while let Some(packet) = peer.outgoing_queue.pop_front() {
- self.send_to_peer(peer.handle_outgoing_transport(packet.payload())?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, packet)?;
}
}
@@ -383,6 +396,13 @@ impl PeerServer {
Ok(())
}
+ fn handle_egress_encrypted_packet(&mut self, peer_ref: SharedPeer, endpoint: Endpoint, packet: Vec<u8>) -> Result<(), Error> {
+ let mut peer = peer_ref.borrow_mut();
+ peer.handle_outgoing_encrypted_transport(&packet);
+
+ self.send_to_peer((endpoint, packet))
+ }
+
fn send_cookie_reply(&mut self, addr: Endpoint, mac1: &[u8], index: u32) -> Result<(), Error> {
let reply = match addr.ip() {
IpAddr::V4(ip) => self.cookie.generate_reply(index, mac1, &ip.octets())?,
@@ -488,7 +508,7 @@ impl PeerServer {
}
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
debug!("sent passive keepalive packet");
self.timer.send_after(*KEEPALIVE_TIMEOUT, PassiveKeepAlive(peer_ref.clone()));
@@ -506,7 +526,7 @@ impl PeerServer {
bail!("persistent keepalive tick (waiting ~{}s due to last authenticated packet time)", wait.as_secs());
}
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(upgraded_peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
let handle = self.timer.send_after(persistent_keepalive, PersistentKeepAlive(peer_ref.clone()));
peer.timers.persistent_timer = Some(handle);
debug!("sent persistent keepalive packet");
@@ -564,7 +584,7 @@ impl PeerServer {
if let Some(keepalive) = peer.info.persistent_keepalive() {
let handle = self.timer.send_after(keepalive, TimerMessage::PersistentKeepAlive(Rc::downgrade(&peer_ref)));
peer.timers.persistent_timer = Some(handle);
- self.send_to_peer(peer.handle_outgoing_transport(&[])?)?;
+ self.encrypt_and_send(peer_ref.clone(), &mut peer, UtunPacket::Inet4(vec![]))?;
debug!("set new keepalive timer and immediately sent new keepalive packet.");
}
}
@@ -638,6 +658,18 @@ impl Future for PeerServer {
}
loop {
+ // Handle UDP packets from the outside world
+ match self.encrypt_channel.rx.poll() {
+ Ok(Async::Ready(Some((peer_ref, (endpoint, packet))))) => {
+ let _ = self.handle_egress_encrypted_packet(peer_ref, endpoint, packet).map_err(|e| warn!("UDP ERR: {:?}", e));
+ },
+ Ok(Async::NotReady) => { break; },
+ Ok(Async::Ready(None)) => bail!("incoming udp stream ended unexpectedly"),
+ Err(e) => bail!("incoming udp stream error: {:?}", e)
+ }
+ }
+
+ loop {
// Handle packets coming from the local tunnel
match self.outgoing.rx.poll() {
Ok(Async::Ready(Some(packet))) => {
diff --git a/src/peer.rs b/src/peer.rs
index c0b21eb..3a853ae 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -395,33 +395,43 @@ impl Peer {
Ok(transition)
}
- pub fn handle_outgoing_transport(&mut self, packet: &[u8]) -> Result<(Endpoint, Vec<u8>), Error> {
+ pub fn handle_outgoing_transport(&mut self, packet: UtunPacket)
+ -> Result<Box<Future<Item = (Endpoint, Vec<u8>), Error = Error> + 'static + Send>, Error>
+ {
let session = self.sessions.current.as_mut().ok_or_else(|| err_msg("no current noise session"))?;
let endpoint = self.info.endpoint.ok_or_else(|| err_msg("no known peer endpoint"))?;
- let padding = if packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (packet.len() % PADDING_MULTIPLE)
+ let padding = if packet.payload().len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (packet.payload().len() % PADDING_MULTIPLE)
} else { 0 };
- let padded_len = packet.len() + padding;
+ let padded_len = packet.payload().len() + padding;
let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
ensure!(session.nonce < REJECT_AFTER_MESSAGES, "exceeded REJECT-AFTER-MESSAGES");
ensure!(session.birthday.elapsed() < *REJECT_AFTER_TIME, "exceeded REJECT-AFTER-TIME");
+ let mut transport = session.noise.get_async_transport_state()?.clone();
+ session.nonce += 1;
+ let nonce = session.nonce - 1;
+
out_packet[0] = 4;
LittleEndian::write_u32(&mut out_packet[4..], session.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], session.nonce);
- let padded_packet = &[packet, &vec![0u8; padding]].concat();
- let len = session.noise.write_async_message(session.nonce, padded_packet, &mut out_packet[16..])?;
- session.nonce += 1;
- self.tx_bytes += len as u64;
+ LittleEndian::write_u64(&mut out_packet[8..], nonce);
- if !packet.is_empty() {
+ Ok(Box::new(future::lazy(move || {
+ let padded_packet = &[packet.payload(), &vec![0u8; padding]].concat();
+ let len = transport.write_transport_message(nonce, padded_packet, &mut out_packet[16..])?;
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ Ok((endpoint, out_packet))
+ })))
+ }
+
+ pub fn handle_outgoing_encrypted_transport(&mut self, packet: &[u8]) {
+ self.tx_bytes += packet.len() as u64;
+
+ if packet.len() > 32 { // TODO make constant
self.timers.data_sent = Timestamp::now();
}
self.timers.authenticated_traversed = Timestamp::now();
-
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- Ok((endpoint, out_packet))
}
pub fn to_config_string(&self) -> String {