diff options
Diffstat (limited to '')
-rw-r--r-- | device/device.go | 77 |
1 files changed, 44 insertions, 33 deletions
diff --git a/device/device.go b/device/device.go index 3625608..83c33ee 100644 --- a/device/device.go +++ b/device/device.go @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: MIT * - * Copyright (C) 2017-2021 WireGuard LLC. All Rights Reserved. + * Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved. */ package device @@ -30,7 +30,7 @@ type Device struct { // will become the actual state; Up can fail. // The device can also change state multiple times between time of check and time of use. // Unsynchronized uses of state must therefore be advisory/best-effort only. - state uint32 // actually a deviceState, but typed uint32 for convenience + state atomic.Uint32 // actually a deviceState, but typed uint32 for convenience // stopping blocks until all inputs to Device have been closed. stopping sync.WaitGroup // mu protects state changes. @@ -58,9 +58,8 @@ type Device struct { keyMap map[NoisePublicKey]*Peer } - // Keep this 8-byte aligned rate struct { - underLoadUntil int64 + underLoadUntil atomic.Int64 limiter ratelimiter.Ratelimiter } @@ -69,9 +68,11 @@ type Device struct { cookieChecker CookieChecker pool struct { - messageBuffers *WaitPool - inboundElements *WaitPool - outboundElements *WaitPool + inboundElementsContainer *WaitPool + outboundElementsContainer *WaitPool + messageBuffers *WaitPool + inboundElements *WaitPool + outboundElements *WaitPool } queue struct { @@ -82,7 +83,7 @@ type Device struct { tun struct { device tun.Device - mtu int32 + mtu atomic.Int32 } ipcMutex sync.RWMutex @@ -94,10 +95,9 @@ type Device struct { // There are three states: down, up, closed. // Transitions: // -// down -----+ -// ↑↓ ↓ -// up -> closed -// +// down -----+ +// ↑↓ ↓ +// up -> closed type deviceState uint32 //go:generate go run golang.org/x/tools/cmd/stringer -type deviceState -trimprefix=deviceState @@ -110,7 +110,7 @@ const ( // deviceState returns device.state.state as a deviceState // See those docs for how to interpret this value. func (device *Device) deviceState() deviceState { - return deviceState(atomic.LoadUint32(&device.state.state)) + return deviceState(device.state.state.Load()) } // isClosed reports whether the device is closed (or is closing). @@ -149,14 +149,14 @@ func (device *Device) changeState(want deviceState) (err error) { case old: return nil case deviceStateUp: - atomic.StoreUint32(&device.state.state, uint32(deviceStateUp)) + device.state.state.Store(uint32(deviceStateUp)) err = device.upLocked() if err == nil { break } fallthrough // up failed; bring the device all the way back down case deviceStateDown: - atomic.StoreUint32(&device.state.state, uint32(deviceStateDown)) + device.state.state.Store(uint32(deviceStateDown)) errDown := device.downLocked() if err == nil { err = errDown @@ -182,7 +182,7 @@ func (device *Device) upLocked() error { device.peers.RLock() for _, peer := range device.peers.keyMap { peer.Start() - if atomic.LoadUint32(&peer.persistentKeepaliveInterval) > 0 { + if peer.persistentKeepaliveInterval.Load() > 0 { peer.SendKeepalive() } } @@ -219,11 +219,11 @@ func (device *Device) IsUnderLoad() bool { now := time.Now() underLoad := len(device.queue.handshake.c) >= QueueHandshakeSize/8 if underLoad { - atomic.StoreInt64(&device.rate.underLoadUntil, now.Add(UnderLoadAfterTime).UnixNano()) + device.rate.underLoadUntil.Store(now.Add(UnderLoadAfterTime).UnixNano()) return true } // check if recently under load - return atomic.LoadInt64(&device.rate.underLoadUntil) > now.UnixNano() + return device.rate.underLoadUntil.Load() > now.UnixNano() } func (device *Device) SetPrivateKey(sk NoisePrivateKey) error { @@ -267,7 +267,7 @@ func (device *Device) SetPrivateKey(sk NoisePrivateKey) error { expiredPeers := make([]*Peer, 0, len(device.peers.keyMap)) for _, peer := range device.peers.keyMap { handshake := &peer.handshake - handshake.precomputedStaticStatic = device.staticIdentity.privateKey.sharedSecret(handshake.remoteStatic) + handshake.precomputedStaticStatic, _ = device.staticIdentity.privateKey.sharedSecret(handshake.remoteStatic) expiredPeers = append(expiredPeers, peer) } @@ -283,7 +283,7 @@ func (device *Device) SetPrivateKey(sk NoisePrivateKey) error { func NewDevice(tunDevice tun.Device, bind conn.Bind, logger *Logger) *Device { device := new(Device) - device.state.state = uint32(deviceStateDown) + device.state.state.Store(uint32(deviceStateDown)) device.closed = make(chan struct{}) device.log = logger device.net.bind = bind @@ -293,10 +293,11 @@ func NewDevice(tunDevice tun.Device, bind conn.Bind, logger *Logger) *Device { device.log.Errorf("Trouble determining MTU, assuming default: %v", err) mtu = DefaultMTU } - device.tun.mtu = int32(mtu) + device.tun.mtu.Store(int32(mtu)) device.peers.keyMap = make(map[NoisePublicKey]*Peer) device.rate.limiter.Init() device.indexTable.Init() + device.PopulatePools() // create queues @@ -324,6 +325,19 @@ func NewDevice(tunDevice tun.Device, bind conn.Bind, logger *Logger) *Device { return device } +// BatchSize returns the BatchSize for the device as a whole which is the max of +// the bind batch size and the tun batch size. The batch size reported by device +// is the size used to construct memory pools, and is the allowed batch size for +// the lifetime of the device. +func (device *Device) BatchSize() int { + size := device.net.bind.BatchSize() + dSize := device.tun.device.BatchSize() + if size < dSize { + size = dSize + } + return size +} + func (device *Device) LookupPeer(pk NoisePublicKey) *Peer { device.peers.RLock() defer device.peers.RUnlock() @@ -356,10 +370,12 @@ func (device *Device) RemoveAllPeers() { func (device *Device) Close() { device.state.Lock() defer device.state.Unlock() + device.ipcMutex.Lock() + defer device.ipcMutex.Unlock() if device.isClosed() { return } - atomic.StoreUint32(&device.state.state, uint32(deviceStateClosed)) + device.state.state.Store(uint32(deviceStateClosed)) device.log.Verbosef("Device closing") device.tun.device.Close() @@ -445,11 +461,7 @@ func (device *Device) BindSetMark(mark uint32) error { // clear cached source addresses device.peers.RLock() for _, peer := range device.peers.keyMap { - peer.Lock() - defer peer.Unlock() - if peer.endpoint != nil { - peer.endpoint.ClearSrc() - } + peer.markEndpointSrcForClearing() } device.peers.RUnlock() @@ -474,11 +486,13 @@ func (device *Device) BindUpdate() error { var err error var recvFns []conn.ReceiveFunc netc := &device.net + recvFns, netc.port, err = netc.bind.Open(netc.port) if err != nil { netc.port = 0 return err } + netc.netlinkCancel, err = device.startRouteListener(netc.bind) if err != nil { netc.bind.Close() @@ -497,11 +511,7 @@ func (device *Device) BindUpdate() error { // clear cached source addresses device.peers.RLock() for _, peer := range device.peers.keyMap { - peer.Lock() - defer peer.Unlock() - if peer.endpoint != nil { - peer.endpoint.ClearSrc() - } + peer.markEndpointSrcForClearing() } device.peers.RUnlock() @@ -509,8 +519,9 @@ func (device *Device) BindUpdate() error { device.net.stopping.Add(len(recvFns)) device.queue.decryption.wg.Add(len(recvFns)) // each RoutineReceiveIncoming goroutine writes to device.queue.decryption device.queue.handshake.wg.Add(len(recvFns)) // each RoutineReceiveIncoming goroutine writes to device.queue.handshake + batchSize := netc.bind.BatchSize() for _, fn := range recvFns { - go device.RoutineReceiveIncoming(fn) + go device.RoutineReceiveIncoming(batchSize, fn) } device.log.Verbosef("UDP bind has been updated") |