aboutsummaryrefslogtreecommitdiffstats
path: root/device/channels.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/channels.go')
-rw-r--r--device/channels.go62
1 files changed, 61 insertions, 1 deletions
diff --git a/device/channels.go b/device/channels.go
index 4471477..8cd6aee 100644
--- a/device/channels.go
+++ b/device/channels.go
@@ -5,7 +5,10 @@
package device
-import "sync"
+import (
+ "runtime"
+ "sync"
+)
// An outboundQueue is a channel of QueueOutboundElements awaiting encryption.
// An outboundQueue is ref-counted using its wg field.
@@ -67,3 +70,60 @@ func newHandshakeQueue() *handshakeQueue {
}()
return q
}
+
+// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
+// It is useful in cases in which is it hard to manage the lifetime of the channel.
+// The returned channel must not be closed. Senders should signal shutdown using
+// some other means, such as sending a sentinel nil values.
+func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement {
+ type autodrainingInboundQueue struct {
+ c chan *QueueInboundElement
+ }
+ q := &autodrainingInboundQueue{
+ c: make(chan *QueueInboundElement, QueueInboundSize),
+ }
+ runtime.SetFinalizer(q, func(q *autodrainingInboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ if elem == nil {
+ continue
+ }
+ device.PutMessageBuffer(elem.buffer)
+ device.PutInboundElement(elem)
+ default:
+ return
+ }
+ }
+ })
+ return q.c
+}
+
+// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
+// It is useful in cases in which is it hard to manage the lifetime of the channel.
+// The returned channel must not be closed. Senders should signal shutdown using
+// some other means, such as sending a sentinel nil values.
+// All sends to the channel must be best-effort, because there may be no receivers.
+func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement {
+ type autodrainingOutboundQueue struct {
+ c chan *QueueOutboundElement
+ }
+ q := &autodrainingOutboundQueue{
+ c: make(chan *QueueOutboundElement, QueueOutboundSize),
+ }
+ runtime.SetFinalizer(q, func(q *autodrainingOutboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ if elem == nil {
+ continue
+ }
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ default:
+ return
+ }
+ }
+ })
+ return q.c
+}