aboutsummaryrefslogtreecommitdiffstats
path: root/device/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/send.go')
-rw-r--r--device/send.go93
1 files changed, 34 insertions, 59 deletions
diff --git a/device/send.go b/device/send.go
index 0801b71..1b16edd 100644
--- a/device/send.go
+++ b/device/send.go
@@ -352,6 +352,9 @@ func (peer *Peer) RoutineNonce() {
device := peer.device
logDebug := device.log.Debug
+ // We write to the encryption queue; keep it alive until we are done.
+ device.queue.encryption.wg.Add(1)
+
flush := func() {
for {
select {
@@ -368,6 +371,7 @@ func (peer *Peer) RoutineNonce() {
flush()
logDebug.Println(peer, "- Routine: nonce worker - stopped")
peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
+ device.queue.encryption.wg.Done() // no more writes from us
peer.routines.stopping.Done()
}()
@@ -455,7 +459,7 @@ NextPacket:
elem.Lock()
// add to parallel and sequential queue
- addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption, elem)
+ addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption.c, elem)
}
}
}
@@ -486,76 +490,46 @@ func (device *Device) RoutineEncryption() {
logDebug := device.log.Debug
- defer func() {
- for {
- select {
- case elem, ok := <-device.queue.encryption:
- if ok && !elem.IsDropped() {
- elem.Drop()
- device.PutMessageBuffer(elem.buffer)
- elem.Unlock()
- }
- default:
- goto out
- }
- }
- out:
- logDebug.Println("Routine: encryption worker - stopped")
- device.state.stopping.Done()
- }()
-
+ defer logDebug.Println("Routine: encryption worker - stopped")
logDebug.Println("Routine: encryption worker - started")
- for {
-
- // fetch next element
+ for elem := range device.queue.encryption.c {
- select {
- case <-device.signals.stop:
- return
-
- case elem, ok := <-device.queue.encryption:
-
- if !ok {
- return
- }
-
- // check if dropped
+ // check if dropped
- if elem.IsDropped() {
- continue
- }
+ if elem.IsDropped() {
+ continue
+ }
- // populate header fields
+ // populate header fields
- header := elem.buffer[:MessageTransportHeaderSize]
+ header := elem.buffer[:MessageTransportHeaderSize]
- fieldType := header[0:4]
- fieldReceiver := header[4:8]
- fieldNonce := header[8:16]
+ fieldType := header[0:4]
+ fieldReceiver := header[4:8]
+ fieldNonce := header[8:16]
- binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
- binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
- binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
+ binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
+ binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
+ binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
- // pad content to multiple of 16
+ // pad content to multiple of 16
- paddingSize := calculatePaddingSize(len(elem.packet), int(atomic.LoadInt32(&device.tun.mtu)))
- for i := 0; i < paddingSize; i++ {
- elem.packet = append(elem.packet, 0)
- }
+ paddingSize := calculatePaddingSize(len(elem.packet), int(atomic.LoadInt32(&device.tun.mtu)))
+ for i := 0; i < paddingSize; i++ {
+ elem.packet = append(elem.packet, 0)
+ }
- // encrypt content and release to consumer
+ // encrypt content and release to consumer
- binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
- elem.packet = elem.keypair.send.Seal(
- header,
- nonce[:],
- elem.packet,
- nil,
- )
- elem.Unlock()
- }
+ binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
+ elem.packet = elem.keypair.send.Seal(
+ header,
+ nonce[:],
+ elem.packet,
+ nil,
+ )
+ elem.Unlock()
}
}
@@ -576,6 +550,7 @@ func (peer *Peer) RoutineSequentialSender() {
select {
case elem, ok := <-peer.queue.outbound:
if ok {
+ elem.Lock()
if !elem.IsDropped() {
device.PutMessageBuffer(elem.buffer)
elem.Drop()