diff options
author | Harald Welte <laforge@osmocom.org> | 2022-11-18 13:54:44 +0100 |
---|---|---|
committer | Harald Welte <laforge@osmocom.org> | 2022-11-18 14:29:06 +0100 |
commit | 58421a19994ba14fc06c2bfdf3531d0d5ac59929 (patch) | |
tree | a509dd1ee41542606890682fc210993269681330 | |
parent | Fix typos in copyright statements. (diff) | |
download | libosmocore-laforge/io_uring.tar.xz libosmocore-laforge/io_uring.zip |
very early WIP prototype for dwillmannlaforge/io_uring
Change-Id: I52c3bf2a2721bf60d46d2a86d625d916f34e0b49
-rw-r--r-- | src/io_uring.c | 61 | ||||
-rw-r--r-- | src/osmo_io.c | 277 |
2 files changed, 338 insertions, 0 deletions
diff --git a/src/io_uring.c b/src/io_uring.c new file mode 100644 index 00000000..02ee2b27 --- /dev/null +++ b/src/io_uring.c @@ -0,0 +1,61 @@ +/*! \file io_uring.c + * io_uring async I/O support. + * + * (C) 2022 by Harald Welte <laforge@osmocom.org> + * All Rights Reserved. + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <sys/eventfd.h> +#include <liburing.h> + +/* we keep the io_uring per thread, like we have per-thread select/poll */ +static __thread struct io_uring t_ring; +static __thread struct osmo_fd t_eventfd; + + + +{ + int rc; + + rc = io_uring_queue_init(URING_QUEUE_ENTRIES, &t_ring, NULL); + if (rc) + return -1; + + rc = eventfd(0, 0); + if (rc < 0) + goto err_iou_init; + + osmo_fd_setup(&t_eventfd, rc, OSMO_FD_READ, iou_eventfd_cb, NULL, 0); + + rc = io_uring_register_eventfd(&t_ring, t_eventfd.fd); + if (rc < 0) + goto err_fd_setup; + + rc = osmo_fd_register(&t_eventfd); + if (rc < 0) + goto err_unreg_eventfd; + + return 0; + +err_unreg_eventfd: + io_uring_unregister_eventfd(&t_ring); +err_fdsetup: + close(t_eventfd.fd); + osmo_fd_setup(&t_eventfd, -1, 0, NULL, NULL, 0); +err_iou_init: + io_uring_queue_exit(&t_ring); + + return -1; +} diff --git a/src/osmo_io.c b/src/osmo_io.c new file mode 100644 index 00000000..ba3d783a --- /dev/null +++ b/src/osmo_io.c @@ -0,0 +1,277 @@ +/*! \file osmo_io.c + * New osmocom async I/O API. + * + * (C) 2022 by Harald Welte <laforge@osmocom.org> + * All Rights Reserved. + * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +enum osmo_io_fd_mode { + /*! use read() / write() calls */ + OSMO_IO_FD_MODE_READ_WRITE, + /*! use recvfrom() / sendto() calls */ + OSMO_IO_FD_MODE_RECVFROM_SENDTO, + /*! emulate sctp_recvmsg() and sctp_sendmsg() */ + OSMO_IO_FD_MODE_SCTP_RECVMSG_SENDMSG, +}; + +struct osmo_io_fd { + /*! linked list for internal management */ + struct llist_heads list; + /*! actual operating-system level file decriptor */ + int fd; + /*! bit-mask or of \ref OSMO_FD_READ, \ref OSMO_FD_WRITE and/or OSMO_FD_EXCEPT */ + unsigned int when; + enum osmo_io_fd_mode mode; + + /*! human-readable name to associte with fd */ + const char *name; + + /*! call-back function when something was read from fd */ + void (*read_cb)(struct osmo_io_fd *, int res, struct msgb *); + /*! call-back function when write has completed on fd */ + void (*write_cb)(struct osmo_io_fd *, int res, struct msgb *); + /*! data pointer passed through to call-back function */ + void *data; + /*! private number, extending \a data */ + unsigned int priv_nr; + + struct { + /*! talloc context from which to allocate msgb when reading */ + void *ctx; + /*! size of msgb to allocte (excluding headroom) */ + unsigned int size; + /*! headroom to allocate when allocating msgb's */ + unsigned int headroom; + } msgb_alloc; + + struct { + /*! maximum length of write queue */ + unsigned int max_length; + /*! current length of write queue */ + unsigned int current_length; + /*! actual linked list implementing the transmit queue */ + struct llist_head msg_queue; + } tx_queue; + + union { + struct { + struct osmo_fd ofd; + } poll; + struct { + /* TODO: index into array of registered fd's? */ + } uring; + } u; +}; + +/* serialized version of 'struct msghdr' employed by sendmsg/recvmsg */ +struct serialized_msghdr { + struct msghdr hdr; + struct sockaddr_storage sa; + struct iovec iov[1]; + int flags; + + struct msgb *msg; +}; + +static __thread void *g_msghdr_pool; // = talloc_pool(FIXME, struct serialized_msghdr); + +/*! convenience wrapper to call msgb_alloc with parameters from osmo_io_fd */ +static struct msgb *iofd_msgb_alloc(struct osmo_io_fd *iofd) +{ + uint16_t headroom = iofd->msgb_alloc.headroom; +#if 0 + switch (iofd->mode) { + case OSMO_IO_FD_MODE_READ_WRITE: + break; + case OSMO_IO_FD_MODE_RECVFROM_SENDTO: + /* reserve additional headroom for storing the socket address */ + OSMO_ASSERT(headroom < 0xffff - sizeof(struct sockaddr_storage)); + /* TODO: we might actually get away by just ensuring headroom >= sizeof(sockaddr_storage) */ + headroom += sizeof(struct sockaddr_storage); + break; + case OSMO_IO_FD_MODE_SCTP_RECVMSG_SENDMSG: + /* FIXME */ + break; + } +#endif + OSMO_ASSERT(iofd->msgb_alloc.size < 0xffff - headroom); + return msgb_alloc_headroom_c(iofd->msgb_alloc.ctx, + iofd->msgb_alloc.size + headroom, headroom, iofd->name); +} + + +/*! Request osmo_io to write a message to given ofd. + * If the function returns success (0), it will take ownership of the msgb and + * internally call msgb_free() after the write request completes. + * \param[in] ofd file descriptor to which we shall write + * \param[in] msg message buffer that shall be written to ofd + * \returns 0 in case of success; negative in case of error. */ +int osmo_io_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg) +{ + if (iofd->tx_queue.current_length >= iofd->tex_queue.max_length) { + LOGP(DLGLOBAL, "iofd(%s) rx_queue is full. Rejecting msgb\n", iofd->name); + return -ENOSPC; + } + msgb_enqueue_count(&iofd->tx_queue.msgb_queue, msg, &iofd->tx_queue.current_length); + /* FIXME: trigger write, if not yet pending */ + + return 0; +} +/* TODO: variant with timeout using IORING_OP_LINK_TIMEOUT? */ + +/*! Request osmo_io to read from given ofd; call call-back function with the data that has been read. + * \param[in] ofd file descriptor from which we shall read + * \returns 0 if the read has successfully been scheduled. Negative in case of errors.*/ +int osmo_io_read_msgb(struct osmo_io_fd *iofd) +{ +} +/* TODO: variant with timeout using IORING_OP_LINK_TIMEOUT? */ + + +/* Ideas: + * - intermediate layer de-segmentation callback for stuff like IPA header, CBSP, ... + * + * Problems: + * - in case of IPA we need to read 3 bytes header first, i.e. not all available data.... does + * it really make sense to do this via io_uring? Probably yes, as we have a lot of it. + * We have to reimplement something like ipa_msg_recv_buffered() + * + * Parameters: + * - number of simultaneous read/write in uring for given fd + * + */ + + +/************************************************************************* + * backend using classic osmo_fd / poll + *************************************************************************/ + +static int iofd_poll_ofd_cb_read_write(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_io_fd *iofd = ofd->data; + struct msgb *msg; + int rc; + + if (what & OSMO_FD_READ) { + msg = iofd_msgb_alloc(iofd); + if (msg) { + rc = read(ofd->fd, msgb_data(msg), msgb_length(msg)); + /* FIXME: handle rc */ + if (rc > 0) + msgb_put(msg, rc); + + iofd->read_cb(iofd, rc, msg); + } + } + + if (what & OSMO_FD_WRITE) { + msg = msgb_dequeue_count(&iofd->tx_queue.msg_queue, &iofd->tx_queue.current_length); + if (msg) { + rc = write(ofd->fd, msgb_data(msg), msgb_length(msg)); + iofd->write_cb(iofd, rc, msg); + msgb_free(msg); + } else + osmo_fd_write_disable(ofd); + } + + /* TODO: FD_EXCEPT handling? However: Rarely used in existing osmo-* */ +} + +static int iofd_poll_ofd_cb_recvfrom_sendto(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_io_fd *iofd = ofd->data; + struct msgb *msg; + int rc; + + if (what & OSMO_FD_READ) { + msg = iofd_msgb_alloc(iofd); + if (msg) { + struct sockaddr *sa = FIXME; + socklen_t addrlen = sizeof(struct sockaddr_storage) + + rc = recvfrom(ofd->fd, msgb_data(msg), msgb_length(msg), 0, + sa, &addrlen); + if (rc > 0) + msgb_put(msg, rc); + + iofd->recvmsg_cb(iofd, rc, sa, addrlen); + } + } + + if (what & OSMO_FD_WRITE) { + msg = msgb_dequeue_count(&iofd->tx_queue.msg_queue, &iofd->tx_queue.current_length); + if (msg) { + rc = sendto(ofd->fd, msgb_data(msg), msgb_length(msg), 0, + sa, addrlen); + iofd->write_cb(iofd, rc, msg); + msgb_free(msg); + } else + osmo_fd_write_disable(ofd); + } + + /* TODO: FD_EXCEPT handling? However: Rarely used in existing osmo-* */ +} + + +/************************************************************************* + * FIXME: backend using io_uring + *************************************************************************/ + +static int iofd_uring_sendmsg(struct osmo_io_fd *iofd, const struct msghdr *msg, int flags) +{ + struct serialized_msghdr *smh; + + /* check that caller doesn't use features we don't support */ + if (msg->msg_namelen > sizeof(smh->sa)) + return -EINVAL; + if (msg->msg_iovlen > ARRAY_SIZE(smh->iov)) + return -EINVAL; + if (msg->msg_control && msg->msg_controllen) + return -EINVAL; + + smh = talloc_size(g_msghdr_pool, struct serialized_msghdr); + if (smh) + return -ENOMEM; + + memcpy(&smh->hdr, msg, sizeof(smh->hdr)); + smh->flags = flags; + + /* name (socket address), if any */ + if (msg->msg_name && msg->msg_namelen) { + smh->hdr.msg_namelen = msg->msg_namelen; + memcpy(&smh->sa, msg->msg_name, smh->sa_len); + smh->hdr.msg_name = smh->sa; + } else { + smh->hdr.msg_name = NULL; + smh->hdr.msg_namelen = 0; + } + + if (msg->msg_iov && msg->msg_iovlen) { + smh->hdr.msg_iovlen = msg->msg_iovlen; + memcpy(&smh->iov, msg->iov, sizeof(struct iovec)*smh->hdr.msg_iovlen); + smh->hdr.msg_iov = smh->iov; + } else { + smh->hdr.msg_iovlen = 0; + smh->hdr.msg_iov = NULL; + } + + smh->hdr.msg_control = NULL; + smh->hdr.msg_controllen = 0; + + smh->msgb = msgb; + +} + + |