aboutsummaryrefslogtreecommitdiffstats
path: root/src/send.go
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2017-09-09 15:03:01 +0200
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2017-09-09 15:03:01 +0200
commitf212795e51d839910085e08f9c6b09eac11863d3 (patch)
tree8ea6dba582442e46c1b21fe58f52be20f02c5fed /src/send.go
parentFixed TUN interface implementation os OS X (diff)
downloadwireguard-go-f212795e51d839910085e08f9c6b09eac11863d3.tar.xz
wireguard-go-f212795e51d839910085e08f9c6b09eac11863d3.zip
Improved readability of send/receive code
Diffstat (limited to '')
-rw-r--r--src/send.go167
1 files changed, 63 insertions, 104 deletions
diff --git a/src/send.go b/src/send.go
index c598ad4..e9dfb54 100644
--- a/src/send.go
+++ b/src/send.go
@@ -35,7 +35,7 @@ type QueueOutboundElement struct {
dropped int32
mutex sync.Mutex
buffer *[MaxMessageSize]byte // slice holding the packet data
- packet []byte // slice of "data" (always!)
+ packet []byte // slice of "buffer" (always!)
nonce uint64 // nonce for encryption
keyPair *KeyPair // key-pair for encryption
peer *Peer // related peer
@@ -52,11 +52,6 @@ func (peer *Peer) FlushNonceQueue() {
}
}
-var (
- ErrorNoEndpoint = errors.New("No known endpoint for peer")
- ErrorNoConnection = errors.New("No UDP socket for device")
-)
-
func (device *Device) NewOutboundElement() *QueueOutboundElement {
return &QueueOutboundElement{
dropped: AtomicFalse,
@@ -118,14 +113,13 @@ func (peer *Peer) SendBuffer(buffer []byte) (int, error) {
defer peer.mutex.RUnlock()
endpoint := peer.endpoint
- conn := peer.device.net.conn
-
if endpoint == nil {
- return 0, ErrorNoEndpoint
+ return 0, errors.New("No known endpoint for peer")
}
+ conn := peer.device.net.conn
if conn == nil {
- return 0, ErrorNoConnection
+ return 0, errors.New("No UDP socket for device")
}
return conn.WriteToUDP(buffer, endpoint)
@@ -189,16 +183,6 @@ func (device *Device) RoutineReadFromTUN() {
continue
}
- // check if known endpoint (drop early)
-
- peer.mutex.RLock()
- if peer.endpoint == nil {
- peer.mutex.RUnlock()
- logDebug.Println("No known endpoint for peer", peer.String())
- continue
- }
- peer.mutex.RUnlock()
-
// insert into nonce/pre-handshake queue
signalSend(peer.signal.handshakeReset)
@@ -211,86 +195,61 @@ func (device *Device) RoutineReadFromTUN() {
* Then assigns nonces to packets sequentially
* and creates "work" structs for workers
*
- * TODO: Avoid dynamic allocation of work queue elements
- *
* Obs. A single instance per peer
*/
func (peer *Peer) RoutineNonce() {
var keyPair *KeyPair
- var elem *QueueOutboundElement
device := peer.device
logDebug := device.log.Debug
logDebug.Println("Routine, nonce worker, started for peer", peer.String())
- func() {
-
- for {
- NextPacket:
-
- // wait for packet
+ for {
+ NextPacket:
+ select {
+ case <-peer.signal.stop:
+ return
- if elem == nil {
- select {
- case elem = <-peer.queue.nonce:
- case <-peer.signal.stop:
- return
- }
- }
+ case elem := <-peer.queue.nonce:
// wait for key pair
for {
- select {
- case <-peer.signal.newKeyPair:
- default:
- }
-
keyPair = peer.keyPairs.Current()
if keyPair != nil && keyPair.sendNonce < RejectAfterMessages {
if time.Now().Sub(keyPair.created) < RejectAfterTime {
break
}
}
+
signalSend(peer.signal.handshakeBegin)
logDebug.Println("Awaiting key-pair for", peer.String())
select {
case <-peer.signal.newKeyPair:
- logDebug.Println("Key-pair negotiated for", peer.String())
- goto NextPacket
-
case <-peer.signal.flushNonceQueue:
logDebug.Println("Clearing queue for", peer.String())
peer.FlushNonceQueue()
- elem = nil
goto NextPacket
-
case <-peer.signal.stop:
return
}
}
- // process current packet
+ // populate work element
- if elem != nil {
-
- // create work element
-
- elem.keyPair = keyPair
- elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
- elem.dropped = AtomicFalse
- elem.peer = peer
- elem.mutex.Lock()
+ elem.peer = peer
+ elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
+ elem.keyPair = keyPair
+ elem.dropped = AtomicFalse
+ elem.mutex.Lock()
- // add to parallel and sequential queue
+ // add to parallel and sequential queue
- addToEncryptionQueue(device.queue.encryption, elem)
- addToOutboundQueue(peer.queue.outbound, elem)
- elem = nil
- }
+ addToEncryptionQueue(device.queue.encryption, elem)
+ addToOutboundQueue(peer.queue.outbound, elem)
}
- }()
+ }
}
/* Encrypts the elements in the queue
@@ -300,7 +259,6 @@ func (peer *Peer) RoutineNonce() {
*/
func (device *Device) RoutineEncryption() {
- var elem *QueueOutboundElement
var nonce [chacha20poly1305.NonceSize]byte
logDebug := device.log.Debug
@@ -311,62 +269,62 @@ func (device *Device) RoutineEncryption() {
// fetch next element
select {
- case elem = <-device.queue.encryption:
case <-device.signal.stop:
logDebug.Println("Routine, encryption worker, stopped")
return
- }
- // check if dropped
+ case elem := <-device.queue.encryption:
- if elem.IsDropped() {
- continue
- }
+ // check if dropped
+
+ if elem.IsDropped() {
+ continue
+ }
- // populate header fields
+ // populate header fields
- header := elem.buffer[:MessageTransportHeaderSize]
+ header := elem.buffer[:MessageTransportHeaderSize]
- fieldType := header[0:4]
- fieldReceiver := header[4:8]
- fieldNonce := header[8:16]
+ fieldType := header[0:4]
+ fieldReceiver := header[4:8]
+ fieldNonce := header[8:16]
- binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
- binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex)
- binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
+ binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
+ binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex)
+ binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
- // pad content to MTU size
+ // pad content to multiple of 16
- mtu := int(atomic.LoadInt32(&device.tun.mtu))
- pad := len(elem.packet) % PaddingMultiple
- if pad > 0 {
- for i := 0; i < PaddingMultiple-pad && len(elem.packet) < mtu; i++ {
- elem.packet = append(elem.packet, 0)
+ mtu := int(atomic.LoadInt32(&device.tun.mtu))
+ rem := len(elem.packet) % PaddingMultiple
+ if rem > 0 {
+ for i := 0; i < PaddingMultiple-rem && len(elem.packet) < mtu; i++ {
+ elem.packet = append(elem.packet, 0)
+ }
}
- // TODO: How good is this code
- }
- // encrypt content (append to header)
-
- binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
- elem.keyPair.send.mutex.RLock()
- if elem.keyPair.send.aead == nil {
- // very unlikely (the key was deleted during queuing)
- elem.Drop()
- } else {
- elem.packet = elem.keyPair.send.aead.Seal(
- header,
- nonce[:],
- elem.packet,
- nil,
- )
- }
- elem.keyPair.send.mutex.RUnlock()
- elem.mutex.Unlock()
+ // encrypt content (append to header)
+
+ binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
+ elem.keyPair.send.mutex.RLock()
+ if elem.keyPair.send.aead == nil {
+ // very unlikely (the key was deleted during queuing)
+ elem.Drop()
+ } else {
+ elem.packet = elem.keyPair.send.aead.Seal(
+ header,
+ nonce[:],
+ elem.packet,
+ nil,
+ )
+ }
+ elem.mutex.Unlock()
+ elem.keyPair.send.mutex.RUnlock()
- // refresh key if necessary
+ // refresh key if necessary
- elem.peer.KeepKeyFreshSending()
+ elem.peer.KeepKeyFreshSending()
+ }
}
}
@@ -399,6 +357,7 @@ func (peer *Peer) RoutineSequentialSender() {
_, err := peer.SendBuffer(elem.packet)
device.PutMessageBuffer(elem.buffer)
if err != nil {
+ logDebug.Println("Failed to send authenticated packet to peer", peer.String())
continue
}
atomic.AddUint64(&peer.stats.txBytes, length)