aboutsummaryrefslogtreecommitdiffstats
path: root/src/crypto_pool.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/crypto_pool.rs')
-rw-r--r--src/crypto_pool.rs17
1 files changed, 10 insertions, 7 deletions
diff --git a/src/crypto_pool.rs b/src/crypto_pool.rs
index e0756e9..b8e4536 100644
--- a/src/crypto_pool.rs
+++ b/src/crypto_pool.rs
@@ -50,13 +50,14 @@ pub struct DecryptResult {
/// Spawn a thread pool to efficiently process
/// the CPU-intensive encryption/decryption.
pub fn create() -> Sender<Work> {
- let threads = num_cpus::get() - 1; // One thread for I/O.
+ let threads = num_cpus::get(); // One thread for I/O.
let (sender, receiver) = unbounded();
+ debug!("spinning up a crypto pool with {} threads", threads);
crossbeam::scope(|s| {
- for i in 0..threads {
+ for _ in 0..threads {
let rx = receiver.clone();
- s.spawn(move || worker(rx.clone()));
+ thread::spawn(move || worker(rx.clone()));
}
});
@@ -64,6 +65,7 @@ pub fn create() -> Sender<Work> {
}
fn worker(receiver: Receiver<Work>) {
+ loop {
select_loop! {
recv(receiver, work) => {
match work {
@@ -80,12 +82,12 @@ fn worker(receiver: Receiver<Work>) {
raw_packet.truncate(0);
}
- tx.unbounded_send(DecryptResult {
+ executor::spawn(tx.send(DecryptResult {
endpoint: element.endpoint,
orig_packet: element.packet,
out_packet: raw_packet,
session_type: element.session_type,
- }).unwrap();
+ })).wait_future().unwrap();
},
Work::Encrypt((tx, mut element)) => {
let padding = if element.in_packet.len() % PADDING_MULTIPLE != 0 {
@@ -104,13 +106,14 @@ fn worker(receiver: Receiver<Work>) {
&mut out_packet[16..]).unwrap();
out_packet.truncate(TRANSPORT_HEADER_SIZE + len);
- tx.unbounded_send(EncryptResult {
+ executor::spawn(tx.send(EncryptResult {
endpoint: element.endpoint,
our_index: element.our_index,
out_packet,
- }).unwrap();
+ })).wait_future().unwrap();
}
}
}
}
+ }
} \ No newline at end of file