aboutsummaryrefslogtreecommitdiffstats
path: root/receive.go
diff options
context:
space:
mode:
Diffstat (limited to 'receive.go')
-rw-r--r--receive.go79
1 files changed, 26 insertions, 53 deletions
diff --git a/receive.go b/receive.go
index b23c5e0..6b6543c 100644
--- a/receive.go
+++ b/receive.go
@@ -43,59 +43,28 @@ func (elem *QueueInboundElement) IsDropped() bool {
return atomic.LoadInt32(&elem.dropped) == AtomicTrue
}
-func (device *Device) addToInboundQueue(
- queue chan *QueueInboundElement,
- element *QueueInboundElement,
-) {
- for {
+func (device *Device) addToInboundAndDecryptionQueues(inboundQueue chan *QueueInboundElement, decryptionQueue chan *QueueInboundElement, element *QueueInboundElement) bool {
+ select {
+ case inboundQueue <- element:
select {
- case queue <- element:
- return
+ case decryptionQueue <- element:
+ return true
default:
- select {
- case old := <-queue:
- old.Drop()
- default:
- }
+ element.Drop()
+ element.mutex.Unlock()
+ return false
}
+ default:
+ return false
}
}
-func (device *Device) addToDecryptionQueue(
- queue chan *QueueInboundElement,
- element *QueueInboundElement,
-) {
- for {
- select {
- case queue <- element:
- return
- default:
- select {
- case old := <-queue:
- // drop & release to potential consumer
- old.Drop()
- old.mutex.Unlock()
- default:
- }
- }
- }
-}
-
-func (device *Device) addToHandshakeQueue(
- queue chan QueueHandshakeElement,
- element QueueHandshakeElement,
-) {
- for {
- select {
- case queue <- element:
- return
- default:
- select {
- case elem := <-queue:
- device.PutMessageBuffer(elem.buffer)
- default:
- }
- }
+func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, element QueueHandshakeElement) bool {
+ select {
+ case queue <- element:
+ return true
+ default:
+ return false
}
}
@@ -154,6 +123,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
}
if err != nil {
+ device.PutMessageBuffer(buffer)
return
}
@@ -212,9 +182,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
// add to decryption queues
if peer.isRunning.Get() {
- device.addToDecryptionQueue(device.queue.decryption, elem)
- device.addToInboundQueue(peer.queue.inbound, elem)
- buffer = device.GetMessageBuffer()
+ if device.addToInboundAndDecryptionQueues(peer.queue.inbound, device.queue.decryption, elem) {
+ buffer = device.GetMessageBuffer()
+ }
}
continue
@@ -235,7 +205,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
}
if okay {
- device.addToHandshakeQueue(
+ if (device.addToHandshakeQueue(
device.queue.handshake,
QueueHandshakeElement{
msgType: msgType,
@@ -243,8 +213,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
packet: packet,
endpoint: endpoint,
},
- )
- buffer = device.GetMessageBuffer()
+ )) {
+ buffer = device.GetMessageBuffer()
+ }
}
}
}
@@ -307,6 +278,8 @@ func (device *Device) RoutineDecryption() {
)
if err != nil {
elem.Drop()
+ device.PutMessageBuffer(elem.buffer)
+ elem.mutex.Unlock()
}
elem.mutex.Unlock()
}