From 833597b585f460aaa17bad93ad59290ec282e77e Mon Sep 17 00:00:00 2001 From: "Jason A. Donenfeld" Date: Sat, 22 Sep 2018 06:29:02 +0200 Subject: More pooling --- receive.go | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) (limited to 'receive.go') diff --git a/receive.go b/receive.go index 9bf3af3..ab86913 100644 --- a/receive.go +++ b/receive.go @@ -55,6 +55,7 @@ func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueIn return false } default: + device.PutInboundElement(element) return false } } @@ -168,15 +169,15 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) { } // create work element - peer := value.peer - elem := &QueueInboundElement{ - packet: packet, - buffer: buffer, - keypair: keypair, - dropped: AtomicFalse, - endpoint: endpoint, - } + elem := device.GetInboundElement() + elem.packet = packet + elem.buffer = buffer + elem.keypair = keypair + elem.dropped = AtomicFalse + elem.endpoint = endpoint + elem.counter = 0 + elem.mutex = sync.Mutex{} elem.mutex.Lock() // add to decryption queues @@ -246,6 +247,7 @@ func (device *Device) RoutineDecryption() { // check if dropped if elem.IsDropped() { + device.PutInboundElement(elem) continue } @@ -280,7 +282,6 @@ func (device *Device) RoutineDecryption() { elem.Drop() device.PutMessageBuffer(elem.buffer) elem.buffer = nil - elem.mutex.Unlock() } elem.mutex.Unlock() } @@ -487,12 +488,16 @@ func (peer *Peer) RoutineSequentialReceiver() { logDebug := device.log.Debug var elem *QueueInboundElement + var ok bool defer func() { logDebug.Println(peer, "- Routine: sequential receiver - stopped") peer.routines.stopping.Done() - if elem != nil && elem.buffer != nil { - device.PutMessageBuffer(elem.buffer) + if elem != nil { + if elem.buffer != nil { + device.PutMessageBuffer(elem.buffer) + } + device.PutInboundElement(elem) } }() @@ -501,8 +506,11 @@ func (peer *Peer) RoutineSequentialReceiver() { peer.routines.starting.Done() for { - if elem != nil && elem.buffer != nil { - device.PutMessageBuffer(elem.buffer) + if elem != nil { + if elem.buffer != nil { + device.PutMessageBuffer(elem.buffer) + } + device.PutInboundElement(elem) } select { @@ -510,7 +518,7 @@ func (peer *Peer) RoutineSequentialReceiver() { case <-peer.routines.stop: return - case elem, ok := <-peer.queue.inbound: + case elem, ok = <-peer.queue.inbound: if !ok { return @@ -621,9 +629,7 @@ func (peer *Peer) RoutineSequentialReceiver() { offset := MessageTransportOffsetContent atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) - _, err := device.tun.device.Write( - elem.buffer[:offset+len(elem.packet)], - offset) + _, err := device.tun.device.Write(elem.buffer[:offset+len(elem.packet)], offset) if err != nil { logError.Println("Failed to write packet to TUN device:", err) } -- cgit v1.2.3-59-g8ed1b