aboutsummaryrefslogtreecommitdiffstats
path: root/src/crypto_pool.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/crypto_pool.rs')
-rw-r--r--src/crypto_pool.rs115
1 files changed, 57 insertions, 58 deletions
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs
index b8e4536..fefc2f4 100644
--- a/src/crypto_pool.rs
+++ b/src/crypto_pool.rs
@@ -4,6 +4,8 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::sync::mpsc;
use futures::executor;
use futures::Sink;
+use futures::future::*;
+use futures_cpupool::CpuPool;
use num_cpus;
use snow::AsyncTransportState;
use std::thread;
@@ -47,73 +49,70 @@ pub struct DecryptResult {
pub session_type: SessionType,
}
-/// Spawn a thread pool to efficiently process
-/// the CPU-intensive encryption/decryption.
-pub fn create() -> Sender<Work> {
- let threads = num_cpus::get(); // One thread for I/O.
- let (sender, receiver) = unbounded();
+pub struct Pool {
+ pool: CpuPool,
+}
- debug!("spinning up a crypto pool with {} threads", threads);
- crossbeam::scope(|s| {
- for _ in 0..threads {
- let rx = receiver.clone();
- thread::spawn(move || worker(rx.clone()));
+impl Pool {
+ pub fn new() -> Self {
+ let pool = CpuPool::new_num_cpus();
+ Self {
+ pool
}
- });
+ }
- sender
+ pub fn send(&self, work: Work) {
+ self.pool.spawn_fn(move || {
+ worker(work);
+ ok::<(), ()>(())
+ }).forget();
+ }
}
-fn worker(receiver: Receiver<Work>) {
- loop {
- select_loop! {
- recv(receiver, work) => {
- match work {
- Work::Decrypt((tx, element)) => {
- let mut raw_packet = vec![0u8; element.packet.len()];
- let nonce = element.packet.nonce();
- let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
- if len > 0 {
- let len = IpPacket::new(&raw_packet[..len])
- .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
- .length();
- raw_packet.truncate(len as usize);
- } else {
- raw_packet.truncate(0);
- }
+fn worker(work: Work) {
+ match work {
+ Work::Decrypt((tx, element)) => {
+ let mut raw_packet = vec![0u8; element.packet.len()];
+ let nonce = element.packet.nonce();
+ let len = element.transport.read_transport_message(nonce, element.packet.payload(), &mut raw_packet).unwrap();
+ if len > 0 {
+ let len = IpPacket::new(&raw_packet[..len])
+ .ok_or_else(||format_err!("invalid IP packet (len {})", len)).unwrap()
+ .length();
+ raw_packet.truncate(len as usize);
+ } else {
+ raw_packet.truncate(0);
+ }
- executor::spawn(tx.send(DecryptResult {
- endpoint: element.endpoint,
- orig_packet: element.packet,
- out_packet: raw_packet,
- session_type: element.session_type,
- })).wait_future().unwrap();
- },
- Work::Encrypt((tx, mut element)) => {
- let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
- PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
- } else { 0 };
- let padded_len = element.in_packet.len() + padding;
- let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
+ executor::spawn(tx.send(DecryptResult {
+ endpoint: element.endpoint,
+ orig_packet: element.packet,
+ out_packet: raw_packet,
+ session_type: element.session_type,
+ })).wait_future().unwrap();
+ },
+ Work::Encrypt((tx, mut element)) => {
+ let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
+ PADDING_MULTIPLE - (element.in_packet.len() % PADDING_MULTIPLE)
+ } else { 0 };
+ let padded_len = element.in_packet.len() + padding;
+ let mut out_packet = vec![0u8; padded_len + TRANSPORT_OVERHEAD];
- out_packet[0] = 4;
- LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
- LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
+ out_packet[0] = 4;
+ LittleEndian::write_u32(&mut out_packet[4..], element.their_index);
+ LittleEndian::write_u64(&mut out_packet[8..], element.nonce);
- element.in_packet.resize(padded_len, 0);
- let len = element.transport.write_transport_message(element.nonce,
- &element.in_packet,
- &mut out_packet[16..]).unwrap();
- out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
+ element.in_packet.resize(padded_len, 0);
+ let len = element.transport.write_transport_message(element.nonce,
+ &element.in_packet,
+ &mut out_packet[16..]).unwrap();
+ out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- executor::spawn(tx.send(EncryptResult {
- endpoint: element.endpoint,
- our_index: element.our_index,
- out_packet,
- })).wait_future().unwrap();
- }
- }
+ executor::spawn(tx.send(EncryptResult {
+ endpoint: element.endpoint,
+ our_index: element.our_index,
+ out_packet,
+ })).wait_future().unwrap();
}
}
- }
} \ No newline at end of file