diff options
Diffstat (limited to '')
-rw-r--r-- | device/receive.go | 384 |
1 files changed, 217 insertions, 167 deletions
diff --git a/device/receive.go b/device/receive.go index cc34498..1ab3e29 100644 --- a/device/receive.go +++ b/device/receive.go @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: MIT * - * Copyright (C) 2017-2021 WireGuard LLC. All Rights Reserved. + * Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved. */ package device @@ -11,7 +11,6 @@ import ( "errors" "net" "sync" - "sync/atomic" "time" "golang.org/x/crypto/chacha20poly1305" @@ -28,7 +27,6 @@ type QueueHandshakeElement struct { } type QueueInboundElement struct { - sync.Mutex buffer *[MaxMessageSize]byte packet []byte counter uint64 @@ -36,6 +34,11 @@ type QueueInboundElement struct { endpoint conn.Endpoint } +type QueueInboundElementsContainer struct { + sync.Mutex + elems []*QueueInboundElement +} + // clearPointers clears elem fields that contain pointers. // This makes the garbage collector's life easier and // avoids accidentally keeping other objects around unnecessarily. @@ -52,12 +55,12 @@ func (elem *QueueInboundElement) clearPointers() { * NOTE: Not thread safe, but called by sequential receiver! */ func (peer *Peer) keepKeyFreshReceiving() { - if peer.timers.sentLastMinuteHandshake.Get() { + if peer.timers.sentLastMinuteHandshake.Load() { return } keypair := peer.keypairs.Current() if keypair != nil && keypair.isInitiator && time.Since(keypair.created) > (RejectAfterTime-KeepaliveTimeout-RekeyTimeout) { - peer.timers.sentLastMinuteHandshake.Set(true) + peer.timers.sentLastMinuteHandshake.Store(true) peer.SendHandshakeInitiation(false) } } @@ -67,7 +70,7 @@ func (peer *Peer) keepKeyFreshReceiving() { * Every time the bind is updated a new routine is started for * IPv4 and IPv6 (separately) */ -func (device *Device) RoutineReceiveIncoming(recv conn.ReceiveFunc) { +func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.ReceiveFunc) { recvName := recv.PrettyName() defer func() { device.log.Verbosef("Routine: receive incoming %s - stopped", recvName) @@ -80,20 +83,33 @@ func (device *Device) RoutineReceiveIncoming(recv conn.ReceiveFunc) { // receive datagrams until conn is closed - buffer := device.GetMessageBuffer() - var ( + bufsArrs = make([]*[MaxMessageSize]byte, maxBatchSize) + bufs = make([][]byte, maxBatchSize) err error - size int - endpoint conn.Endpoint + sizes = make([]int, maxBatchSize) + count int + endpoints = make([]conn.Endpoint, maxBatchSize) deathSpiral int + elemsByPeer = make(map[*Peer]*QueueInboundElementsContainer, maxBatchSize) ) - for { - size, endpoint, err = recv(buffer[:]) + for i := range bufsArrs { + bufsArrs[i] = device.GetMessageBuffer() + bufs[i] = bufsArrs[i][:] + } + + defer func() { + for i := 0; i < maxBatchSize; i++ { + if bufsArrs[i] != nil { + device.PutMessageBuffer(bufsArrs[i]) + } + } + }() + for { + count, err = recv(bufs, sizes, endpoints) if err != nil { - device.PutMessageBuffer(buffer) if errors.Is(err, net.ErrClosed) { return } @@ -104,101 +120,119 @@ func (device *Device) RoutineReceiveIncoming(recv conn.ReceiveFunc) { if deathSpiral < 10 { deathSpiral++ time.Sleep(time.Second / 3) - buffer = device.GetMessageBuffer() continue } return } deathSpiral = 0 - if size < MinMessageSize { - continue - } + // handle each packet in the batch + for i, size := range sizes[:count] { + if size < MinMessageSize { + continue + } - // check size of packet + // check size of packet - packet := buffer[:size] - msgType := binary.LittleEndian.Uint32(packet[:4]) + packet := bufsArrs[i][:size] + msgType := binary.LittleEndian.Uint32(packet[:4]) - var okay bool + switch msgType { - switch msgType { + // check if transport - // check if transport + case MessageTransportType: - case MessageTransportType: + // check size - // check size + if len(packet) < MessageTransportSize { + continue + } - if len(packet) < MessageTransportSize { - continue - } + // lookup key pair - // lookup key pair + receiver := binary.LittleEndian.Uint32( + packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter], + ) + value := device.indexTable.Lookup(receiver) + keypair := value.keypair + if keypair == nil { + continue + } - receiver := binary.LittleEndian.Uint32( - packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter], - ) - value := device.indexTable.Lookup(receiver) - keypair := value.keypair - if keypair == nil { - continue - } + // check keypair expiry - // check keypair expiry + if keypair.created.Add(RejectAfterTime).Before(time.Now()) { + continue + } - if keypair.created.Add(RejectAfterTime).Before(time.Now()) { + // create work element + peer := value.peer + elem := device.GetInboundElement() + elem.packet = packet + elem.buffer = bufsArrs[i] + elem.keypair = keypair + elem.endpoint = endpoints[i] + elem.counter = 0 + + elemsForPeer, ok := elemsByPeer[peer] + if !ok { + elemsForPeer = device.GetInboundElementsContainer() + elemsForPeer.Lock() + elemsByPeer[peer] = elemsForPeer + } + elemsForPeer.elems = append(elemsForPeer.elems, elem) + bufsArrs[i] = device.GetMessageBuffer() + bufs[i] = bufsArrs[i][:] continue - } - - // create work element - peer := value.peer - elem := device.GetInboundElement() - elem.packet = packet - elem.buffer = buffer - elem.keypair = keypair - elem.endpoint = endpoint - elem.counter = 0 - elem.Mutex = sync.Mutex{} - elem.Lock() - - // add to decryption queues - if peer.isRunning.Get() { - peer.queue.inbound.c <- elem - device.queue.decryption.c <- elem - buffer = device.GetMessageBuffer() - } else { - device.PutInboundElement(elem) - } - continue - // otherwise it is a fixed size & handshake related packet + // otherwise it is a fixed size & handshake related packet - case MessageInitiationType: - okay = len(packet) == MessageInitiationSize + case MessageInitiationType: + if len(packet) != MessageInitiationSize { + continue + } - case MessageResponseType: - okay = len(packet) == MessageResponseSize + case MessageResponseType: + if len(packet) != MessageResponseSize { + continue + } - case MessageCookieReplyType: - okay = len(packet) == MessageCookieReplySize + case MessageCookieReplyType: + if len(packet) != MessageCookieReplySize { + continue + } - default: - device.log.Verbosef("Received message with unknown type") - } + default: + device.log.Verbosef("Received message with unknown type") + continue + } - if okay { select { case device.queue.handshake.c <- QueueHandshakeElement{ msgType: msgType, - buffer: buffer, + buffer: bufsArrs[i], packet: packet, - endpoint: endpoint, + endpoint: endpoints[i], }: - buffer = device.GetMessageBuffer() + bufsArrs[i] = device.GetMessageBuffer() + bufs[i] = bufsArrs[i][:] default: } } + for peer, elemsContainer := range elemsByPeer { + if peer.isRunning.Load() { + peer.queue.inbound.c <- elemsContainer + device.queue.decryption.c <- elemsContainer + } else { + for _, elem := range elemsContainer.elems { + device.PutMessageBuffer(elem.buffer) + device.PutInboundElement(elem) + } + device.PutInboundElementsContainer(elemsContainer) + } + delete(elemsByPeer, peer) + } } } @@ -208,26 +242,28 @@ func (device *Device) RoutineDecryption(id int) { defer device.log.Verbosef("Routine: decryption worker %d - stopped", id) device.log.Verbosef("Routine: decryption worker %d - started", id) - for elem := range device.queue.decryption.c { - // split message into fields - counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent] - content := elem.packet[MessageTransportOffsetContent:] - - // decrypt and release to consumer - var err error - elem.counter = binary.LittleEndian.Uint64(counter) - // copy counter to nonce - binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter) - elem.packet, err = elem.keypair.receive.Open( - content[:0], - nonce[:], - content, - nil, - ) - if err != nil { - elem.packet = nil + for elemsContainer := range device.queue.decryption.c { + for _, elem := range elemsContainer.elems { + // split message into fields + counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent] + content := elem.packet[MessageTransportOffsetContent:] + + // decrypt and release to consumer + var err error + elem.counter = binary.LittleEndian.Uint64(counter) + // copy counter to nonce + binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter) + elem.packet, err = elem.keypair.receive.Open( + content[:0], + nonce[:], + content, + nil, + ) + if err != nil { + elem.packet = nil + } } - elem.Unlock() + elemsContainer.Unlock() } } @@ -268,7 +304,7 @@ func (device *Device) RoutineHandshake(id int) { // consume reply - if peer := entry.peer; peer.isRunning.Get() { + if peer := entry.peer; peer.isRunning.Load() { device.log.Verbosef("Receiving cookie response from %s", elem.endpoint.DstToString()) if !peer.cookieGenerator.ConsumeReply(&reply) { device.log.Verbosef("Could not decrypt invalid cookie response") @@ -341,7 +377,7 @@ func (device *Device) RoutineHandshake(id int) { peer.SetEndpointFromPacket(elem.endpoint) device.log.Verbosef("%v - Received handshake initiation", peer) - atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) + peer.rxBytes.Add(uint64(len(elem.packet))) peer.SendHandshakeResponse() @@ -369,7 +405,7 @@ func (device *Device) RoutineHandshake(id int) { peer.SetEndpointFromPacket(elem.endpoint) device.log.Verbosef("%v - Received handshake response", peer) - atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) + peer.rxBytes.Add(uint64(len(elem.packet))) // update timers @@ -394,7 +430,7 @@ func (device *Device) RoutineHandshake(id int) { } } -func (peer *Peer) RoutineSequentialReceiver() { +func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) { device := peer.device defer func() { device.log.Verbosef("%v - Routine: sequential receiver - stopped", peer) @@ -402,89 +438,103 @@ func (peer *Peer) RoutineSequentialReceiver() { }() device.log.Verbosef("%v - Routine: sequential receiver - started", peer) - for elem := range peer.queue.inbound.c { - if elem == nil { + bufs := make([][]byte, 0, maxBatchSize) + + for elemsContainer := range peer.queue.inbound.c { + if elemsContainer == nil { return } - var err error - elem.Lock() - if elem.packet == nil { - // decryption failed - goto skip - } + elemsContainer.Lock() + validTailPacket := -1 + dataPacketReceived := false + rxBytesLen := uint64(0) + for i, elem := range elemsContainer.elems { + if elem.packet == nil { + // decryption failed + continue + } - if !elem.keypair.replayFilter.ValidateCounter(elem.counter, RejectAfterMessages) { - goto skip - } + if !elem.keypair.replayFilter.ValidateCounter(elem.counter, RejectAfterMessages) { + continue + } - peer.SetEndpointFromPacket(elem.endpoint) - if peer.ReceivedWithKeypair(elem.keypair) { - peer.timersHandshakeComplete() - peer.SendStagedPackets() - } + validTailPacket = i + if peer.ReceivedWithKeypair(elem.keypair) { + peer.SetEndpointFromPacket(elem.endpoint) + peer.timersHandshakeComplete() + peer.SendStagedPackets() + } + rxBytesLen += uint64(len(elem.packet) + MinMessageSize) - peer.keepKeyFreshReceiving() - peer.timersAnyAuthenticatedPacketTraversal() - peer.timersAnyAuthenticatedPacketReceived() - atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)+MinMessageSize)) + if len(elem.packet) == 0 { + device.log.Verbosef("%v - Receiving keepalive packet", peer) + continue + } + dataPacketReceived = true - if len(elem.packet) == 0 { - device.log.Verbosef("%v - Receiving keepalive packet", peer) - goto skip - } - peer.timersDataReceived() + switch elem.packet[0] >> 4 { + case 4: + if len(elem.packet) < ipv4.HeaderLen { + continue + } + field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2] + length := binary.BigEndian.Uint16(field) + if int(length) > len(elem.packet) || int(length) < ipv4.HeaderLen { + continue + } + elem.packet = elem.packet[:length] + src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len] + if device.allowedips.Lookup(src) != peer { + device.log.Verbosef("IPv4 packet with disallowed source address from %v", peer) + continue + } - switch elem.packet[0] >> 4 { - case ipv4.Version: - if len(elem.packet) < ipv4.HeaderLen { - goto skip - } - field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2] - length := binary.BigEndian.Uint16(field) - if int(length) > len(elem.packet) || int(length) < ipv4.HeaderLen { - goto skip - } - elem.packet = elem.packet[:length] - src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len] - if device.allowedips.Lookup(src) != peer { - device.log.Verbosef("IPv4 packet with disallowed source address from %v", peer) - goto skip - } + case 6: + if len(elem.packet) < ipv6.HeaderLen { + continue + } + field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2] + length := binary.BigEndian.Uint16(field) + length += ipv6.HeaderLen + if int(length) > len(elem.packet) { + continue + } + elem.packet = elem.packet[:length] + src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len] + if device.allowedips.Lookup(src) != peer { + device.log.Verbosef("IPv6 packet with disallowed source address from %v", peer) + continue + } - case ipv6.Version: - if len(elem.packet) < ipv6.HeaderLen { - goto skip - } - field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2] - length := binary.BigEndian.Uint16(field) - length += ipv6.HeaderLen - if int(length) > len(elem.packet) { - goto skip - } - elem.packet = elem.packet[:length] - src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len] - if device.allowedips.Lookup(src) != peer { - device.log.Verbosef("IPv6 packet with disallowed source address from %v", peer) - goto skip + default: + device.log.Verbosef("Packet with invalid IP version from %v", peer) + continue } - default: - device.log.Verbosef("Packet with invalid IP version from %v", peer) - goto skip + bufs = append(bufs, elem.buffer[:MessageTransportOffsetContent+len(elem.packet)]) } - _, err = device.tun.device.Write(elem.buffer[:MessageTransportOffsetContent+len(elem.packet)], MessageTransportOffsetContent) - if err != nil && !device.isClosed() { - device.log.Errorf("Failed to write packet to TUN device: %v", err) + peer.rxBytes.Add(rxBytesLen) + if validTailPacket >= 0 { + peer.SetEndpointFromPacket(elemsContainer.elems[validTailPacket].endpoint) + peer.keepKeyFreshReceiving() + peer.timersAnyAuthenticatedPacketTraversal() + peer.timersAnyAuthenticatedPacketReceived() } - if len(peer.queue.inbound.c) == 0 { - err = device.tun.device.Flush() - if err != nil { - peer.device.log.Errorf("Unable to flush packets: %v", err) + if dataPacketReceived { + peer.timersDataReceived() + } + if len(bufs) > 0 { + _, err := device.tun.device.Write(bufs, MessageTransportOffsetContent) + if err != nil && !device.isClosed() { + device.log.Errorf("Failed to write packets to TUN device: %v", err) } } - skip: - device.PutMessageBuffer(elem.buffer) - device.PutInboundElement(elem) + for _, elem := range elemsContainer.elems { + device.PutMessageBuffer(elem.buffer) + device.PutInboundElement(elem) + } + bufs = bufs[:0] + device.PutInboundElementsContainer(elemsContainer) } } |