aboutsummaryrefslogtreecommitdiffstats
path: root/src/receive.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/receive.go')
-rw-r--r--src/receive.go205
1 files changed, 87 insertions, 118 deletions
diff --git a/src/receive.go b/src/receive.go
index 5f46925..c47d93c 100644
--- a/src/receive.go
+++ b/src/receive.go
@@ -72,43 +72,6 @@ func (device *Device) addToHandshakeQueue(
}
}
-/* Routine determining the busy state of the interface
- *
- * TODO: Under load for some time
- */
-func (device *Device) RoutineBusyMonitor() {
- samples := 0
- interval := time.Second
- for timer := time.NewTimer(interval); ; {
-
- select {
- case <-device.signal.stop:
- return
- case <-timer.C:
- }
-
- // compute busy heuristic
-
- if len(device.queue.handshake) > QueueHandshakeBusySize {
- samples += 1
- } else if samples > 0 {
- samples -= 1
- }
- samples %= 30
- busy := samples > 5
-
- // update busy state
-
- if busy {
- atomic.StoreInt32(&device.underLoad, AtomicTrue)
- } else {
- atomic.StoreInt32(&device.underLoad, AtomicFalse)
- }
-
- timer.Reset(interval)
- }
-}
-
func (device *Device) RoutineReceiveIncomming() {
logDebug := device.log.Debug
@@ -118,117 +81,121 @@ func (device *Device) RoutineReceiveIncomming() {
// wait for new conn
- var conn *net.UDPConn
+ logDebug.Println("Waiting for udp socket")
select {
+ case <-device.signal.stop:
+ return
+
case <-device.signal.newUDPConn:
+
+ // fetch connection
+
device.net.mutex.RLock()
- conn = device.net.conn
+ conn := device.net.conn
device.net.mutex.RUnlock()
+ if conn == nil {
+ continue
+ }
- case <-device.signal.stop:
- return
- }
-
- if conn == nil {
- continue
- }
+ logDebug.Println("Listening for inbound packets")
- // receive datagrams until closed
+ // receive datagrams until conn is closed
- buffer := device.GetMessageBuffer()
+ buffer := device.GetMessageBuffer()
- for {
+ for {
- // read next datagram
+ // read next datagram
- size, raddr, err := conn.ReadFromUDP(buffer[:]) // TODO: This is broken
+ size, raddr, err := conn.ReadFromUDP(buffer[:]) // Blocks sometimes
- if err != nil {
- break
- }
+ if err != nil {
+ break
+ }
- if size < MinMessageSize {
- continue
- }
+ if size < MinMessageSize {
+ continue
+ }
- // check size of packet
+ // check size of packet
- packet := buffer[:size]
- msgType := binary.LittleEndian.Uint32(packet[:4])
+ packet := buffer[:size]
+ msgType := binary.LittleEndian.Uint32(packet[:4])
- var okay bool
+ var okay bool
- switch msgType {
+ switch msgType {
- // check if transport
+ // check if transport
- case MessageTransportType:
+ case MessageTransportType:
- // check size
+ // check size
- if len(packet) < MessageTransportType {
- continue
- }
+ if len(packet) < MessageTransportType {
+ continue
+ }
- // lookup key pair
+ // lookup key pair
- receiver := binary.LittleEndian.Uint32(
- packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
- )
- value := device.indices.Lookup(receiver)
- keyPair := value.keyPair
- if keyPair == nil {
- continue
- }
+ receiver := binary.LittleEndian.Uint32(
+ packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
+ )
+ value := device.indices.Lookup(receiver)
+ keyPair := value.keyPair
+ if keyPair == nil {
+ continue
+ }
- // check key-pair expiry
+ // check key-pair expiry
- if keyPair.created.Add(RejectAfterTime).Before(time.Now()) {
- continue
- }
+ if keyPair.created.Add(RejectAfterTime).Before(time.Now()) {
+ continue
+ }
- // create work element
+ // create work element
- peer := value.peer
- elem := &QueueInboundElement{
- packet: packet,
- buffer: buffer,
- keyPair: keyPair,
- dropped: AtomicFalse,
- }
- elem.mutex.Lock()
+ peer := value.peer
+ elem := &QueueInboundElement{
+ packet: packet,
+ buffer: buffer,
+ keyPair: keyPair,
+ dropped: AtomicFalse,
+ }
+ elem.mutex.Lock()
- // add to decryption queues
+ // add to decryption queues
- device.addToInboundQueue(device.queue.decryption, elem)
- device.addToInboundQueue(peer.queue.inbound, elem)
- buffer = nil
- continue
+ device.addToInboundQueue(device.queue.decryption, elem)
+ device.addToInboundQueue(peer.queue.inbound, elem)
+ buffer = device.GetMessageBuffer()
+ continue
- // otherwise it is a handshake related packet
+ // otherwise it is a handshake related packet
- case MessageInitiationType:
- okay = len(packet) == MessageInitiationSize
+ case MessageInitiationType:
+ okay = len(packet) == MessageInitiationSize
- case MessageResponseType:
- okay = len(packet) == MessageResponseSize
+ case MessageResponseType:
+ okay = len(packet) == MessageResponseSize
- case MessageCookieReplyType:
- okay = len(packet) == MessageCookieReplySize
- }
+ case MessageCookieReplyType:
+ okay = len(packet) == MessageCookieReplySize
+ }
- if okay {
- device.addToHandshakeQueue(
- device.queue.handshake,
- QueueHandshakeElement{
- msgType: msgType,
- buffer: buffer,
- packet: packet,
- source: raddr,
- },
- )
- buffer = device.GetMessageBuffer()
+ if okay {
+ device.addToHandshakeQueue(
+ device.queue.handshake,
+ QueueHandshakeElement{
+ msgType: msgType,
+ buffer: buffer,
+ packet: packet,
+ source: raddr,
+ },
+ )
+ buffer = device.GetMessageBuffer()
+ }
}
}
}
@@ -326,10 +293,11 @@ func (device *Device) RoutineHandshake() {
return
}
- busy := atomic.LoadInt32(&device.underLoad) == AtomicTrue
-
- if busy {
+ if device.IsUnderLoad() {
if !device.mac.CheckMAC2(elem.packet, elem.source) {
+
+ // construct cookie reply
+
sender := binary.LittleEndian.Uint32(elem.packet[4:8]) // "sender" always follows "type"
reply, err := device.CreateMessageCookieReply(elem.packet, sender, elem.source)
if err != nil {
@@ -347,6 +315,7 @@ func (device *Device) RoutineHandshake() {
}
continue
}
+
if !device.ratelimiter.Allow(elem.source.IP) {
continue
}
@@ -577,7 +546,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
// write to tun
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
- _, err := device.tun.Write(elem.packet)
+ _, err := device.tun.device.Write(elem.packet)
device.PutMessageBuffer(elem.buffer)
if err != nil {
logError.Println("Failed to write packet to TUN device:", err)