diff options
Diffstat (limited to 'src/crypto_pool.rs')
-rw-r--r-- | src/crypto_pool.rs | 98 |
1 files changed, 48 insertions, 50 deletions
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs index db5826e..c99002a 100644 --- a/src/crypto_pool.rs +++ b/src/crypto_pool.rs @@ -1,8 +1,6 @@ use consts::{PADDING_MULTIPLE, TRANSPORT_OVERHEAD, TRANSPORT_HEADER_SIZE}; use crossbeam_channel::{bounded, Receiver, Sender}; -use futures::sync::mpsc; -use futures::executor; -use futures::Sink; +use futures::task::Task; use num_cpus; use snow::AsyncTransportState; use std::thread; @@ -13,8 +11,8 @@ use ip_packet::IpPacket; use byteorder::{ByteOrder, LittleEndian}; pub enum Work { - Decrypt((mpsc::UnboundedSender<DecryptResult>, DecryptWork)), - Encrypt((mpsc::UnboundedSender<EncryptResult>, EncryptWork)), + Decrypt((Sender<DecryptResult>, Task, DecryptWork)), + Encrypt((Sender<EncryptResult>, Task, EncryptWork)), } pub struct EncryptWork { @@ -49,7 +47,7 @@ pub struct DecryptResult { /// Spawn a thread pool to efficiently process /// the CPU-intensive encryption/decryption. pub fn create() -> Sender<Work> { - let threads = num_cpus::get(); // One thread for I/O. + let threads = num_cpus::get() - 2; // One thread for I/O. let (sender, receiver) = bounded(4096); debug!("spinning up a crypto pool with {} threads", threads); @@ -62,55 +60,55 @@ pub fn create() -> Sender<Work> { } fn worker(receiver: Receiver<Work>) { - loop { - select_loop! { - recv(receiver, work) => { - match work { - Work::Decrypt((tx, element)) => { - let mut raw_packet = vec![0u8; element.packet.len()]; - let nonce = element.packet.nonce(); - let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); - if len > 0 { - let len = IpPacket::new(&raw_packet[..len]) - .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() - .length(); - raw_packet.truncate(len as usize); - } else { - raw_packet.truncate(0); - } + while let Some(work) = receiver.recv() { + match work { + Work::Decrypt((tx, task, element)) => { + let mut raw_packet = vec![0u8; element.packet.len()]; + let nonce = element.packet.nonce(); + let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap(); + if len > 0 { + let len = IpPacket::new(&raw_packet[..len]) + .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap() + .length(); + raw_packet.truncate(len as usize); + } else { + raw_packet.truncate(0); + } - executor::spawn(tx.send(DecryptResult { - endpoint: element.endpoint, - orig_packet: element.packet, - out_packet: raw_packet, - session_type: element.session_type, - })).wait_future().unwrap(); - }, - Work::Encrypt((tx, mut element)) => { - let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { - PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) - } else { 0 }; - let padded_len = element.in_packet.len() + padding; - let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; + tx.send(DecryptResult { + endpoint: element.endpoint, + orig_packet: element.packet, + out_packet: raw_packet, + session_type: element.session_type, + }); - out_packet[0] = 4; - LittleEndian::write_u32(&mut out_packet[4..], element.their_index); - LittleEndian::write_u64(&mut out_packet[8..], element.nonce); + task.notify(); + }, + Work::Encrypt((tx, task, mut element)) => { + let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 { + PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE) + } else { 0 }; + let padded_len = element.in_packet.len() + padding; + let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD]; - element.in_packet.resize(padded_len, 0); - let len = element.transport.write_transport_message(element.nonce, - &element.in_packet, - &mut out_packet[16..]).unwrap(); - out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + out_packet[0] = 4; + LittleEndian::write_u32(&mut out_packet[4..], element.their_index); + LittleEndian::write_u64(&mut out_packet[8..], element.nonce); - executor::spawn(tx.send(EncryptResult { - endpoint: element.endpoint, - our_index: element.our_index, - out_packet, - })).wait_future().unwrap(); - } + element.in_packet.resize(padded_len, 0); + let len = element.transport.write_transport_message(element.nonce, + &element.in_packet, + &mut out_packet[16..]).unwrap(); + out_packet.truncate(TRANSPORT_HEADER_SIZE + len); + + tx.send(EncryptResult { + endpoint: element.endpoint, + our_index: element.our_index, + out_packet, + }); + + task.notify(); } } } - } }
\ No newline at end of file |