aboutsummaryrefslogtreecommitdiffstats
path: root/device/channels.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/channels.go')
-rw-r--r--device/channels.go137
1 files changed, 137 insertions, 0 deletions
diff --git a/device/channels.go b/device/channels.go
new file mode 100644
index 0000000..e526f6b
--- /dev/null
+++ b/device/channels.go
@@ -0,0 +1,137 @@
+/* SPDX-License-Identifier: MIT
+ *
+ * Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
+ */
+
+package device
+
+import (
+ "runtime"
+ "sync"
+)
+
+// An outboundQueue is a channel of QueueOutboundElements awaiting encryption.
+// An outboundQueue is ref-counted using its wg field.
+// An outboundQueue created with newOutboundQueue has one reference.
+// Every additional writer must call wg.Add(1).
+// Every completed writer must call wg.Done().
+// When no further writers will be added,
+// call wg.Done to remove the initial reference.
+// When the refcount hits 0, the queue's channel is closed.
+type outboundQueue struct {
+ c chan *QueueOutboundElementsContainer
+ wg sync.WaitGroup
+}
+
+func newOutboundQueue() *outboundQueue {
+ q := &outboundQueue{
+ c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
+ }
+ q.wg.Add(1)
+ go func() {
+ q.wg.Wait()
+ close(q.c)
+ }()
+ return q
+}
+
+// A inboundQueue is similar to an outboundQueue; see those docs.
+type inboundQueue struct {
+ c chan *QueueInboundElementsContainer
+ wg sync.WaitGroup
+}
+
+func newInboundQueue() *inboundQueue {
+ q := &inboundQueue{
+ c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
+ }
+ q.wg.Add(1)
+ go func() {
+ q.wg.Wait()
+ close(q.c)
+ }()
+ return q
+}
+
+// A handshakeQueue is similar to an outboundQueue; see those docs.
+type handshakeQueue struct {
+ c chan QueueHandshakeElement
+ wg sync.WaitGroup
+}
+
+func newHandshakeQueue() *handshakeQueue {
+ q := &handshakeQueue{
+ c: make(chan QueueHandshakeElement, QueueHandshakeSize),
+ }
+ q.wg.Add(1)
+ go func() {
+ q.wg.Wait()
+ close(q.c)
+ }()
+ return q
+}
+
+type autodrainingInboundQueue struct {
+ c chan *QueueInboundElementsContainer
+}
+
+// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
+// It is useful in cases in which is it hard to manage the lifetime of the channel.
+// The returned channel must not be closed. Senders should signal shutdown using
+// some other means, such as sending a sentinel nil values.
+func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
+ q := &autodrainingInboundQueue{
+ c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
+ }
+ runtime.SetFinalizer(q, device.flushInboundQueue)
+ return q
+}
+
+func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
+ for {
+ select {
+ case elemsContainer := <-q.c:
+ elemsContainer.Lock()
+ for _, elem := range elemsContainer.elems {
+ device.PutMessageBuffer(elem.buffer)
+ device.PutInboundElement(elem)
+ }
+ device.PutInboundElementsContainer(elemsContainer)
+ default:
+ return
+ }
+ }
+}
+
+type autodrainingOutboundQueue struct {
+ c chan *QueueOutboundElementsContainer
+}
+
+// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
+// It is useful in cases in which is it hard to manage the lifetime of the channel.
+// The returned channel must not be closed. Senders should signal shutdown using
+// some other means, such as sending a sentinel nil values.
+// All sends to the channel must be best-effort, because there may be no receivers.
+func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
+ q := &autodrainingOutboundQueue{
+ c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
+ }
+ runtime.SetFinalizer(q, device.flushOutboundQueue)
+ return q
+}
+
+func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
+ for {
+ select {
+ case elemsContainer := <-q.c:
+ elemsContainer.Lock()
+ for _, elem := range elemsContainer.elems {
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ }
+ device.PutOutboundElementsContainer(elemsContainer)
+ default:
+ return
+ }
+ }
+}