1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
mod config;
mod grim_reaper;
pub mod peer_server;
use self::config::ConfigurationService;
use self::peer_server::PeerServer;
use router::Router;
use failure::{Error, err_msg};
use peer::Peer;
use std::io;
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::HashMap;
use types::{InterfaceInfo};
use pnet_packet::ipv4::Ipv4Packet;
use futures::{Future, Stream, Sink, unsync};
use tokio_core::reactor::Core;
use tokio_utun::{UtunStream, UtunCodec};
pub fn trace_packet(header: &str, packet: &[u8]) {
let packet = Ipv4Packet::new(packet);
trace!("{} {:?}", header, packet);
}
pub type SharedPeer = Rc<RefCell<Peer>>;
pub type SharedState = Rc<RefCell<State>>;
#[derive(Default)]
pub struct State {
pubkey_map: HashMap<[u8; 32], SharedPeer>,
index_map: HashMap<u32, SharedPeer>,
router: Router,
interface_info: InterfaceInfo,
}
pub struct Interface {
name: String,
state: SharedState,
}
struct VecUtunCodec;
pub enum UtunPacket {
Inet4(Vec<u8>),
Inet6(Vec<u8>),
}
impl UtunPacket {
pub fn payload(&self) -> &[u8] {
use self::UtunPacket::*;
match *self {
Inet4(ref payload) | Inet6(ref payload) => payload,
}
}
pub fn from(raw_packet: Vec<u8>) -> Result<UtunPacket, Error> {
match raw_packet[0] >> 4 {
4 => Ok(UtunPacket::Inet4(raw_packet)),
6 => Ok(UtunPacket::Inet6(raw_packet)),
_ => bail!("unrecognized IP version")
}
}
}
impl UtunCodec for VecUtunCodec {
type In = UtunPacket;
type Out = Vec<u8>;
fn decode(&mut self, buf: &[u8]) -> io::Result<Self::In> {
trace!("utun packet type {}", buf[3]);
match buf[4] >> 4 {
4 => Ok(UtunPacket::Inet4(buf[4..].to_vec())),
6 => Ok(UtunPacket::Inet6(buf[4..].to_vec())),
_ => Err(io::ErrorKind::InvalidData.into())
}
}
fn encode(&mut self, mut msg: Self::Out, buf: &mut Vec<u8>) {
buf.append(&mut msg);
}
}
impl Interface {
pub fn new(name: &str) -> Self {
let state = State::default();
Interface {
name: name.to_owned(),
state: Rc::new(RefCell::new(state)),
}
}
pub fn start(&mut self) -> Result<(), Error> {
let mut core = Core::new()?;
let (utun_tx, utun_rx) = unsync::mpsc::channel::<Vec<u8>>(1024);
let peer_server = PeerServer::new(core.handle(), self.state.clone(), utun_tx.clone())?;
let config_server = ConfigurationService::new(&self.name, &self.state, &core.handle())?;
let config_server = config_server.forward(peer_server.config_tx()).map_err(|_|()); // TODO: don't just forward, this is so hacky.
let utun_stream = UtunStream::connect(&self.name, &core.handle())?.framed(VecUtunCodec{});
let (utun_writer, utun_reader) = utun_stream.split();
let utun_read_fut = peer_server.tx()
.sink_map_err(|e| -> Error { e.into() })
.send_all(utun_reader.map_err(|e| -> Error { e.into() }))
.map_err(|e| { warn!("utun read error: {:?}", e); () });
let utun_write_fut = utun_writer
.sink_map_err(|e| -> Error { e.into() })
.send_all(utun_rx.map_err(|()| -> Error { err_msg("utun rx failure") }))
.map_err(|e| { warn!("utun write error: {:?}", e); () });
let utun_futs = utun_write_fut.join(utun_read_fut);
let fut = peer_server
.map_err(|e| error!("peer_server error: {:?}", e))
.join(config_server.join(utun_futs));
let _ = core.run(fut);
info!("reactor finished.");
Ok(())
}
}
|