From 93e3848ea76e755477bec8d9540a3c4c31ea7320 Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Thu, 13 Jul 2017 14:32:40 +0200 Subject: Terminate on interface deletion Program now terminates when the interface is removed Increases the number of os threads (relevant for Go <1.5, not tested) More consistent commenting Improved logging (additional peer information) --- src/send.go | 69 +++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 40 insertions(+), 29 deletions(-) (limited to 'src/send.go') diff --git a/src/send.go b/src/send.go index 5ea9a8f..d8ddc82 100644 --- a/src/send.go +++ b/src/send.go @@ -3,6 +3,8 @@ package main import ( "encoding/binary" "golang.org/x/crypto/chacha20poly1305" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" "net" "sync" "sync/atomic" @@ -21,28 +23,26 @@ import ( * The functions in this file occure (roughly) in the order packets are processed. */ -/* A work unit - * - * The sequential consumers will attempt to take the lock, - * workers release lock when they have completed work on the packet. +/* The sequential consumers will attempt to take the lock, + * workers release lock when they have completed work (encryption) on the packet. * * If the element is inserted into the "encryption queue", - * the content is preceeded by enough "junk" to contain the header + * the content is preceeded by enough "junk" to contain the transport header * (to allow the construction of transport messages in-place) */ type QueueOutboundElement struct { dropped int32 mutex sync.Mutex - data [MaxMessageSize]byte - packet []byte // slice of "data" (always!) - nonce uint64 // nonce for encryption - keyPair *KeyPair // key-pair for encryption - peer *Peer // related peer + data [MaxMessageSize]byte // slice holding the packet data + packet []byte // slice of "data" (always!) + nonce uint64 // nonce for encryption + keyPair *KeyPair // key-pair for encryption + peer *Peer // related peer } func (peer *Peer) FlushNonceQueue() { elems := len(peer.queue.nonce) - for i := 0; i < elems; i += 1 { + for i := 0; i < elems; i++ { select { case <-peer.queue.nonce: default: @@ -111,14 +111,18 @@ func addToEncryptionQueue( * Obs. Single instance per TUN device */ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { + if tun == nil { - // dummy return } elem := device.NewOutboundElement() - device.log.Debug.Println("Routine, TUN Reader: started") + logDebug := device.log.Debug + logError := device.log.Error + + logDebug.Println("Routine, TUN Reader: started") + for { // read packet @@ -129,12 +133,17 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { elem.packet = elem.data[MessageTransportHeaderSize:] size, err := tun.Read(elem.packet) if err != nil { - device.log.Error.Println("Failed to read packet from TUN device:", err) - continue + + // stop process + + logError.Println("Failed to read packet from TUN device:", err) + device.Close() + return } + elem.packet = elem.packet[:size] - if len(elem.packet) < IPv4headerSize { - device.log.Error.Println("Packet too short, length:", size) + if len(elem.packet) < ipv4.HeaderLen { + logError.Println("Packet too short, length:", size) continue } @@ -142,23 +151,24 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { var peer *Peer switch elem.packet[0] >> 4 { - case IPv4version: + case ipv4.Version: dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len] peer = device.routingTable.LookupIPv4(dst) - case IPv6version: + case ipv6.Version: dst := elem.packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len] peer = device.routingTable.LookupIPv6(dst) default: - device.log.Debug.Println("Receieved packet with unknown IP version") + logDebug.Println("Receieved packet with unknown IP version") } if peer == nil { continue } + if peer.endpoint == nil { - device.log.Debug.Println("No known endpoint for peer", peer.id) + logDebug.Println("No known endpoint for peer", peer.String()) continue } @@ -184,7 +194,7 @@ func (peer *Peer) RoutineNonce() { device := peer.device logDebug := device.log.Debug - logDebug.Println("Routine, nonce worker, started for peer", peer.id) + logDebug.Println("Routine, nonce worker, started for peer", peer.String()) func() { @@ -216,15 +226,15 @@ func (peer *Peer) RoutineNonce() { } } signalSend(peer.signal.handshakeBegin) - logDebug.Println("Waiting for key-pair, peer", peer.id) + logDebug.Println("Awaiting key-pair for", peer.String()) select { case <-peer.signal.newKeyPair: - logDebug.Println("Key-pair negotiated for peer", peer.id) + logDebug.Println("Key-pair negotiated for", peer.String()) goto NextPacket case <-peer.signal.flushNonceQueue: - logDebug.Println("Clearing queue for peer", peer.id) + logDebug.Println("Clearing queue for", peer.String()) peer.FlushNonceQueue() elem = nil goto NextPacket @@ -313,13 +323,14 @@ func (peer *Peer) RoutineSequentialSender() { device := peer.device logDebug := device.log.Debug - logDebug.Println("Routine, sequential sender, started for peer", peer.id) + logDebug.Println("Routine, sequential sender, started for", peer.String()) for { select { case <-peer.signal.stop: - logDebug.Println("Routine, sequential sender, stopped for peer", peer.id) + logDebug.Println("Routine, sequential sender, stopped for", peer.String()) return + case work := <-peer.queue.outbound: work.mutex.Lock() if work.IsDropped() { @@ -334,7 +345,7 @@ func (peer *Peer) RoutineSequentialSender() { defer peer.mutex.RUnlock() if peer.endpoint == nil { - logDebug.Println("No endpoint for peer:", peer.id) + logDebug.Println("No endpoint for", peer.String()) return } @@ -352,7 +363,7 @@ func (peer *Peer) RoutineSequentialSender() { } atomic.AddUint64(&peer.txBytes, uint64(len(work.packet))) - // reset keep-alive (passive keep-alives / acknowledgements) + // reset keep-alive peer.TimerResetKeepalive() }() -- cgit v1.2.3-59-g8ed1b