From 48c3b87eb824deb1cb3178a7cdd42276dbc70d2d Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Mon, 11 Jan 2021 17:34:02 -0800 Subject: device: use channel close to shut down and drain decryption channel This is similar to commit e1fa1cc5560020e67d33aa7e74674853671cf0a0, but for the decryption channel. It is an alternative fix to f9f655567930a4cd78d40fa4ba0d58503335ae6a. Signed-off-by: Josh Bleecher Snyder --- device/receive.go | 73 +++++++++++++++++++------------------------------------ 1 file changed, 25 insertions(+), 48 deletions(-) (limited to 'device/receive.go') diff --git a/device/receive.go b/device/receive.go index fa31a1a..20e0c8f 100644 --- a/device/receive.go +++ b/device/receive.go @@ -109,6 +109,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) { logDebug := device.log.Debug defer func() { logDebug.Println("Routine: receive incoming IPv" + strconv.Itoa(IP) + " - stopped") + device.queue.decryption.wg.Done() device.net.stopping.Done() }() @@ -206,7 +207,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) { peer.queue.RLock() if peer.isRunning.Get() { - if device.addToInboundAndDecryptionQueues(peer.queue.inbound, device.queue.decryption, elem) { + if device.addToInboundAndDecryptionQueues(peer.queue.inbound, device.queue.decryption.c, elem) { buffer = device.GetMessageBuffer() } } else { @@ -258,59 +259,35 @@ func (device *Device) RoutineDecryption() { }() logDebug.Println("Routine: decryption worker - started") - for { - select { - case <-device.signals.stop: - for { - select { - case elem, ok := <-device.queue.decryption: - if ok { - if !elem.IsDropped() { - elem.Drop() - device.PutMessageBuffer(elem.buffer) - } - elem.Unlock() - } - default: - return - } - } - - case elem, ok := <-device.queue.decryption: + for elem := range device.queue.decryption.c { + // check if dropped - if !ok { - return - } - - // check if dropped - - if elem.IsDropped() { - continue - } + if elem.IsDropped() { + continue + } - // split message into fields + // split message into fields - counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent] - content := elem.packet[MessageTransportOffsetContent:] + counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent] + content := elem.packet[MessageTransportOffsetContent:] - // decrypt and release to consumer + // decrypt and release to consumer - var err error - elem.counter = binary.LittleEndian.Uint64(counter) - // copy counter to nonce - binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter) - elem.packet, err = elem.keypair.receive.Open( - content[:0], - nonce[:], - content, - nil, - ) - if err != nil { - elem.Drop() - device.PutMessageBuffer(elem.buffer) - } - elem.Unlock() + var err error + elem.counter = binary.LittleEndian.Uint64(counter) + // copy counter to nonce + binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter) + elem.packet, err = elem.keypair.receive.Open( + content[:0], + nonce[:], + content, + nil, + ) + if err != nil { + elem.Drop() + device.PutMessageBuffer(elem.buffer) } + elem.Unlock() } } -- cgit v1.2.3-59-g8ed1b