aboutsummaryrefslogtreecommitdiffstats
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/linux/udp.rs237
-rw-r--r--src/platform/udp.rs2
2 files changed, 136 insertions, 103 deletions
diff --git a/src/platform/linux/udp.rs b/src/platform/linux/udp.rs
index 9815ab1..2d77df5 100644
--- a/src/platform/linux/udp.rs
+++ b/src/platform/linux/udp.rs
@@ -9,14 +9,17 @@ use std::mem;
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::unix::io::RawFd;
use std::ptr;
+use std::sync::Arc;
-fn errno() -> libc::c_int {
- unsafe {
- let ptr = libc::__errno_location();
- if ptr.is_null() {
- 0
- } else {
- *ptr
+pub struct FD(RawFd);
+
+impl Drop for FD {
+ fn drop(&mut self) {
+ if self.0 != -1 {
+ log::debug!("linux udp, release fd (fd = {})", self.0);
+ unsafe {
+ libc::close(self.0);
+ };
}
}
}
@@ -47,19 +50,19 @@ pub struct LinuxUDP();
pub struct LinuxOwner {
port: u16,
- sock4: Option<RawFd>,
- sock6: Option<RawFd>,
+ sock4: Option<Arc<FD>>,
+ sock6: Option<Arc<FD>>,
}
pub enum LinuxUDPReader {
- V4(RawFd),
- V6(RawFd),
+ V4(Arc<FD>),
+ V6(Arc<FD>),
}
#[derive(Clone)]
pub struct LinuxUDPWriter {
- sock4: RawFd,
- sock6: RawFd,
+ sock4: Arc<FD>,
+ sock6: Arc<FD>,
}
pub enum LinuxEndpoint {
@@ -67,6 +70,67 @@ pub enum LinuxEndpoint {
V6(EndpointV6),
}
+fn errno() -> libc::c_int {
+ unsafe {
+ let ptr = libc::__errno_location();
+ if ptr.is_null() {
+ 0
+ } else {
+ *ptr
+ }
+ }
+}
+
+fn setsockopt<V: Sized>(
+ fd: RawFd,
+ level: libc::c_int,
+ name: libc::c_int,
+ value: &V,
+) -> Result<(), io::Error> {
+ let res = unsafe {
+ libc::setsockopt(
+ fd,
+ level,
+ name,
+ mem::transmute(value),
+ mem::size_of_val(value).try_into().unwrap(),
+ )
+ };
+ if res == 0 {
+ Ok(())
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::Other,
+ format!("Failed to set sockopt (res = {}, errno = {})", res, errno()),
+ ))
+ }
+}
+
+#[inline(always)]
+fn setsockopt_int(
+ fd: RawFd,
+ level: libc::c_int,
+ name: libc::c_int,
+ value: libc::c_int,
+) -> Result<(), io::Error> {
+ setsockopt(fd, level, name, &value)
+}
+
+#[allow(non_snake_case)]
+const fn CMSG_ALIGN(len: usize) -> usize {
+ (((len) + mem::size_of::<u32>() - 1) & !(mem::size_of::<u32>() - 1))
+}
+
+#[allow(non_snake_case)]
+const fn CMSG_LEN(len: usize) -> usize {
+ CMSG_ALIGN(len + mem::size_of::<libc::cmsghdr>())
+}
+
+#[inline(always)]
+fn safe_cast<T, D>(v: &mut T) -> *mut D {
+ (v as *mut T) as *mut D
+}
+
impl Endpoint for LinuxEndpoint {
fn clear_src(&mut self) {
match self {
@@ -134,56 +198,6 @@ impl Endpoint for LinuxEndpoint {
}
}
-fn setsockopt<V: Sized>(
- fd: RawFd,
- level: libc::c_int,
- name: libc::c_int,
- value: &V,
-) -> Result<(), io::Error> {
- let res = unsafe {
- libc::setsockopt(
- fd,
- level,
- name,
- mem::transmute(value),
- mem::size_of_val(value).try_into().unwrap(),
- )
- };
- if res == 0 {
- Ok(())
- } else {
- Err(io::Error::new(
- io::ErrorKind::Other,
- format!("Failed to set sockopt (res = {}, errno = {})", res, errno()),
- ))
- }
-}
-
-#[inline(always)]
-fn setsockopt_int(
- fd: RawFd,
- level: libc::c_int,
- name: libc::c_int,
- value: libc::c_int,
-) -> Result<(), io::Error> {
- setsockopt(fd, level, name, &value)
-}
-
-#[allow(non_snake_case)]
-const fn CMSG_ALIGN(len: usize) -> usize {
- (((len) + mem::size_of::<u32>() - 1) & !(mem::size_of::<u32>() - 1))
-}
-
-#[allow(non_snake_case)]
-const fn CMSG_LEN(len: usize) -> usize {
- CMSG_ALIGN(len + mem::size_of::<libc::cmsghdr>())
-}
-
-#[inline(always)]
-fn safe_cast<T, D>(v: &mut T) -> *mut D {
- (v as *mut T) as *mut D
-}
-
impl LinuxUDPReader {
fn read6(fd: RawFd, buf: &mut [u8]) -> Result<(usize, LinuxEndpoint), io::Error> {
log::trace!(
@@ -192,6 +206,8 @@ impl LinuxUDPReader {
buf.len()
);
+ debug_assert!(buf.len() > 0, "reading into empty buffer (will fail)");
+
let mut iovs: [libc::iovec; 1] = [libc::iovec {
iov_base: buf.as_mut_ptr() as *mut core::ffi::c_void,
iov_len: buf.len(),
@@ -215,10 +231,16 @@ impl LinuxUDPReader {
let len = unsafe { libc::recvmsg(fd, &mut hdr as *mut libc::msghdr, 0) };
- if len < 0 {
+ if len <= 0 {
+ // TODO: FIX!
return Err(io::Error::new(
io::ErrorKind::NotConnected,
- "failed to receive",
+ format!(
+ "Failed to receive (len = {}, fd = {}, errno = {})",
+ len,
+ fd,
+ errno()
+ ),
));
}
@@ -238,6 +260,8 @@ impl LinuxUDPReader {
buf.len()
);
+ debug_assert!(buf.len() > 0, "reading into empty buffer (will fail)");
+
let mut iovs: [libc::iovec; 1] = [libc::iovec {
iov_base: buf.as_mut_ptr() as *mut core::ffi::c_void,
iov_len: buf.len(),
@@ -261,10 +285,15 @@ impl LinuxUDPReader {
let len = unsafe { libc::recvmsg(fd, &mut hdr as *mut libc::msghdr, 0) };
- if len < 0 {
+ if len <= 0 {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
- "failed to receive",
+ format!(
+ "failed to receive (len = {}, fd = {}, errno = {})",
+ len,
+ fd,
+ errno()
+ ),
));
}
@@ -283,8 +312,8 @@ impl Reader<LinuxEndpoint> for LinuxUDPReader {
fn read(&self, buf: &mut [u8]) -> Result<(usize, LinuxEndpoint), Self::Error> {
match self {
- Self::V4(fd) => Self::read4(*fd, buf),
- Self::V6(fd) => Self::read6(*fd, buf),
+ Self::V4(fd) => Self::read4(fd.0, buf),
+ Self::V6(fd) => Self::read6(fd.0, buf),
}
}
}
@@ -426,8 +455,8 @@ impl Writer<LinuxEndpoint> for LinuxUDPWriter {
fn write(&self, buf: &[u8], dst: &mut LinuxEndpoint) -> Result<(), Self::Error> {
match dst {
- LinuxEndpoint::V4(ref mut end) => Self::write4(self.sock4, buf, end),
- LinuxEndpoint::V6(ref mut end) => Self::write6(self.sock6, buf, end),
+ LinuxEndpoint::V4(ref mut end) => Self::write4(self.sock4.0, buf, end),
+ LinuxEndpoint::V6(ref mut end) => Self::write6(self.sock6.0, buf, end),
}
}
}
@@ -448,21 +477,21 @@ impl Owner for LinuxOwner {
}
}
let value = value.unwrap_or(0);
- set_mark(self.sock6, value)?;
- set_mark(self.sock4, value)
+ set_mark(self.sock6.as_ref().map(|fd| fd.0), value)?;
+ set_mark(self.sock4.as_ref().map(|fd| fd.0), value)
}
}
impl Drop for LinuxOwner {
fn drop(&mut self) {
- log::trace!("closing the bind (port {})", self.port);
- self.sock4.map(|fd| unsafe {
- libc::shutdown(fd, libc::SHUT_RDWR);
- libc::close(fd)
+ log::debug!("closing the bind (port = {})", self.port);
+ self.sock4.as_ref().map(|fd| unsafe {
+ log::debug!("shutdown IPv4 (fd = {})", fd.0);
+ libc::shutdown(fd.0, libc::SHUT_RDWR);
});
- self.sock6.map(|fd| unsafe {
- libc::shutdown(fd, libc::SHUT_RDWR);
- libc::close(fd)
+ self.sock6.as_ref().map(|fd| unsafe {
+ log::debug!("shutdown IPv6 (fd = {})", fd.0);
+ libc::shutdown(fd.0, libc::SHUT_RDWR);
});
}
}
@@ -491,7 +520,7 @@ impl LinuxUDP {
// create socket fd
let fd: RawFd = unsafe { libc::socket(libc::AF_INET6, libc::SOCK_DGRAM, 0) };
if fd < 0 {
- log::debug!("failed to create IPv6 socket");
+ log::debug!("failed to create IPv6 socket (errno = {})", errno());
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to create socket",
@@ -502,11 +531,13 @@ impl LinuxUDP {
setsockopt_int(fd, libc::IPPROTO_IPV6, libc::IPV6_RECVPKTINFO, 1)?;
setsockopt_int(fd, libc::IPPROTO_IPV6, libc::IPV6_V6ONLY, 1)?;
+ const INADDR_ANY: libc::in6_addr = libc::in6_addr { s6_addr: [0; 16] };
+
// bind
let mut sockaddr = libc::sockaddr_in6 {
- sin6_addr: libc::in6_addr { s6_addr: [0; 16] },
+ sin6_addr: INADDR_ANY,
sin6_family: libc::AF_INET6 as libc::sa_family_t,
- sin6_port: port.to_be(), // convert to network (big-endian) byteorder
+ sin6_port: port.to_be(), // convert to network (big-endian) byte-order
sin6_scope_id: 0,
sin6_flowinfo: 0,
};
@@ -514,13 +545,12 @@ impl LinuxUDP {
let err = unsafe {
libc::bind(
fd,
- mem::transmute(&sockaddr as *const libc::sockaddr_in6),
+ safe_cast(&mut sockaddr),
mem::size_of_val(&sockaddr).try_into().unwrap(),
)
};
-
if err != 0 {
- log::debug!("failed to bind IPv6 socket");
+ log::debug!("failed to bind IPv6 socket (errno = {})", errno());
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to create socket",
@@ -532,12 +562,12 @@ impl LinuxUDP {
let err = unsafe {
libc::getsockname(
fd,
- mem::transmute(&mut sockaddr as *mut libc::sockaddr_in6),
+ safe_cast(&mut sockaddr),
&mut socklen as *mut libc::socklen_t,
)
};
if err != 0 {
- log::debug!("failed to get port of IPv6 socket");
+ log::debug!("failed to get port of IPv6 socket (errno = {})", errno());
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to create socket",
@@ -569,7 +599,7 @@ impl LinuxUDP {
// create socket fd
let fd: RawFd = unsafe { libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0) };
if fd < 0 {
- log::trace!("failed to create IPv4 socket (errno = {})", errno());
+ log::debug!("failed to create IPv4 socket (errno = {})", errno());
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to create socket",
@@ -592,13 +622,12 @@ impl LinuxUDP {
let err = unsafe {
libc::bind(
fd,
- mem::transmute(&sockaddr as *const libc::sockaddr_in),
+ safe_cast(&mut sockaddr),
mem::size_of_val(&sockaddr).try_into().unwrap(),
)
};
-
if err != 0 {
- log::trace!("failed to bind IPv4 socket (errno = {})", errno());
+ log::debug!("failed to bind IPv4 socket (errno = {})", errno());
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to create socket",
@@ -610,12 +639,12 @@ impl LinuxUDP {
let err = unsafe {
libc::getsockname(
fd,
- mem::transmute(&mut sockaddr as *mut libc::sockaddr_in),
+ safe_cast(&mut sockaddr),
&mut socklen as *mut libc::socklen_t,
)
};
if err != 0 {
- log::trace!("failed to get port of IPv4 socket (errno = {})", errno());
+ log::debug!("failed to get port of IPv4 socket (errno = {})", errno());
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to create socket",
@@ -656,26 +685,30 @@ impl PlatformUDP for LinuxUDP {
return Err(bind6.unwrap_err());
}
- let sock6 = bind6.ok().map(|(_, fd)| fd);
- let sock4 = bind4.ok().map(|(_, fd)| fd);
+ let sock6 = bind6.ok().map(|(_, fd)| Arc::new(FD(fd)));
+ let sock4 = bind4.ok().map(|(_, fd)| Arc::new(FD(fd)));
// create owner
let owner = LinuxOwner {
port,
- sock6: sock6,
- sock4: sock4,
+ sock6: sock6.clone(),
+ sock4: sock4.clone(),
};
// create readers
let mut readers: Vec<Self::Reader> = Vec::with_capacity(2);
- sock6.map(|sock| readers.push(LinuxUDPReader::V6(sock)));
- sock4.map(|sock| readers.push(LinuxUDPReader::V4(sock)));
+ sock6
+ .clone()
+ .map(|sock| readers.push(LinuxUDPReader::V6(sock)));
+ sock4
+ .clone()
+ .map(|sock| readers.push(LinuxUDPReader::V4(sock)));
debug_assert!(readers.len() > 0);
// create writer
let writer = LinuxUDPWriter {
- sock4: sock4.unwrap_or(-1),
- sock6: sock6.unwrap_or(-1),
+ sock4: sock4.unwrap_or(Arc::new(FD(-1))),
+ sock6: sock6.unwrap_or(Arc::new(FD(-1))),
};
Ok((readers, writer, owner))
diff --git a/src/platform/udp.rs b/src/platform/udp.rs
index e1180fb..4098b10 100644
--- a/src/platform/udp.rs
+++ b/src/platform/udp.rs
@@ -7,7 +7,7 @@ pub trait Reader<E: Endpoint>: Send + Sync {
fn read(&self, buf: &mut [u8]) -> Result<(usize, E), Self::Error>;
}
-pub trait Writer<E: Endpoint>: Send + Sync + Clone + 'static {
+pub trait Writer<E: Endpoint>: Send + Sync + 'static {
type Error: Error;
fn write(&self, buf: &[u8], dst: &mut E) -> Result<(), Self::Error>;