aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/name_table.c10
-rw-r--r--net/tipc/server.c170
-rw-r--r--net/tipc/server.h12
-rw-r--r--net/tipc/subscr.c40
-rw-r--r--net/tipc/subscr.h5
5 files changed, 88 insertions, 149 deletions
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index ed0457cc99d6..c0ca7be46f7a 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -810,14 +810,15 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
*/
void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
{
- struct tipc_net *tn = net_generic(s->net, tipc_net_id);
+ struct tipc_server *srv = s->server;
+ struct tipc_net *tn = tipc_net(srv->net);
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
int index = hash(type);
struct name_seq *seq;
struct tipc_name_seq ns;
spin_lock_bh(&tn->nametbl_lock);
- seq = nametbl_find_seq(s->net, type);
+ seq = nametbl_find_seq(srv->net, type);
if (!seq)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
if (seq) {
@@ -837,12 +838,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
*/
void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
{
- struct tipc_net *tn = net_generic(s->net, tipc_net_id);
+ struct tipc_server *srv = s->server;
+ struct tipc_net *tn = tipc_net(srv->net);
struct name_seq *seq;
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
spin_lock_bh(&tn->nametbl_lock);
- seq = nametbl_find_seq(s->net, type);
+ seq = nametbl_find_seq(srv->net, type);
if (seq != NULL) {
spin_lock_bh(&seq->lock);
list_del_init(&s->nameseq_list);
diff --git a/net/tipc/server.c b/net/tipc/server.c
index b8268c0882a7..7933fb92d852 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -84,9 +84,9 @@ struct tipc_conn {
/* An entry waiting to be sent */
struct outqueue_entry {
- u32 evt;
+ bool inactive;
+ struct tipc_event evt;
struct list_head list;
- struct kvec iov;
};
static void tipc_recv_work(struct work_struct *work);
@@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
return con;
}
+/* sock_data_ready - interrupt callback indicating the socket has data to read
+ * The queued job is launched in tipc_recv_from_sock()
+ */
static void sock_data_ready(struct sock *sk)
{
struct tipc_conn *con;
@@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk)
read_unlock_bh(&sk->sk_callback_lock);
}
+/* sock_write_space - interrupt callback after a sendmsg EAGAIN
+ * Indicates that there now is more is space in the send buffer
+ * The queued job is launched in tipc_send_to_sock()
+ */
static void sock_write_space(struct sock *sk)
{
struct tipc_conn *con;
@@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
return con;
}
-int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
- void *buf, size_t len)
+static int tipc_con_rcv_sub(struct tipc_server *srv,
+ struct tipc_conn *con,
+ struct tipc_subscr *s)
{
- struct tipc_subscr *s = (struct tipc_subscr *)buf;
struct tipc_subscription *sub;
bool status;
int swap;
@@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
- sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
+ sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status);
if (!sub)
return -1;
@@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
static int tipc_receive_from_sock(struct tipc_conn *con)
{
- struct tipc_server *s = con->server;
+ struct tipc_server *srv = con->server;
struct sock *sk = con->sock->sk;
struct msghdr msg = {};
+ struct tipc_subscr s;
struct kvec iov;
- void *buf;
int ret;
- buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
- if (!buf) {
- ret = -ENOMEM;
- goto out_close;
- }
-
- iov.iov_base = buf;
- iov.iov_len = s->max_rcvbuf_size;
+ iov.iov_base = &s;
+ iov.iov_len = sizeof(s);
msg.msg_name = NULL;
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
- if (ret <= 0) {
- kmem_cache_free(s->rcvbuf_cache, buf);
- goto out_close;
+ if (ret == -EWOULDBLOCK)
+ return -EWOULDBLOCK;
+ if (ret > 0) {
+ read_lock_bh(&sk->sk_callback_lock);
+ ret = tipc_con_rcv_sub(srv, con, &s);
+ read_unlock_bh(&sk->sk_callback_lock);
}
-
- read_lock_bh(&sk->sk_callback_lock);
- ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
- read_unlock_bh(&sk->sk_callback_lock);
- kmem_cache_free(s->rcvbuf_cache, buf);
if (ret < 0)
- tipc_conn_terminate(s, con->conid);
- return ret;
-
-out_close:
- if (ret != -EWOULDBLOCK)
tipc_close_conn(con);
- else if (ret == 0)
- /* Don't return success if we really got EOF */
- ret = -EAGAIN;
return ret;
}
@@ -442,33 +433,6 @@ static int tipc_open_listening_sock(struct tipc_server *s)
return 0;
}
-static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
-{
- struct outqueue_entry *entry;
- void *buf;
-
- entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
- if (!entry)
- return NULL;
-
- buf = kmemdup(data, len, GFP_ATOMIC);
- if (!buf) {
- kfree(entry);
- return NULL;
- }
-
- entry->iov.iov_base = buf;
- entry->iov.iov_len = len;
-
- return entry;
-}
-
-static void tipc_free_entry(struct outqueue_entry *e)
-{
- kfree(e->iov.iov_base);
- kfree(e);
-}
-
static void tipc_clean_outqueues(struct tipc_conn *con)
{
struct outqueue_entry *e, *safe;
@@ -476,50 +440,40 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
spin_lock_bh(&con->outqueue_lock);
list_for_each_entry_safe(e, safe, &con->outqueue, list) {
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
}
-int tipc_conn_sendmsg(struct tipc_server *s, int conid,
- u32 evt, void *data, size_t len)
+/* tipc_conn_queue_evt - interrupt level call from a subscription instance
+ * The queued job is launched in tipc_send_to_sock()
+ */
+void tipc_conn_queue_evt(struct tipc_server *s, int conid,
+ u32 event, struct tipc_event *evt)
{
struct outqueue_entry *e;
struct tipc_conn *con;
con = tipc_conn_lookup(s, conid);
if (!con)
- return -EINVAL;
+ return;
- if (!connected(con)) {
- conn_put(con);
- return 0;
- }
+ if (!connected(con))
+ goto err;
- e = tipc_alloc_entry(data, len);
- if (!e) {
- conn_put(con);
- return -ENOMEM;
- }
- e->evt = evt;
+ e = kmalloc(sizeof(*e), GFP_ATOMIC);
+ if (!e)
+ goto err;
+ e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
+ memcpy(&e->evt, evt, sizeof(*evt));
spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock);
- if (!queue_work(s->send_wq, &con->swork))
- conn_put(con);
- return 0;
-}
-
-void tipc_conn_terminate(struct tipc_server *s, int conid)
-{
- struct tipc_conn *con;
-
- con = tipc_conn_lookup(s, conid);
- if (con) {
- tipc_close_conn(con);
- conn_put(con);
- }
+ if (queue_work(s->send_wq, &con->swork))
+ return;
+err:
+ conn_put(con);
}
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
@@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
*conid = con->conid;
con->sock = NULL;
- rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
+ rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
if (rc < 0)
tipc_close_conn(con);
return !rc;
@@ -587,6 +541,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
+ struct kvec iov;
int count = 0;
int ret;
@@ -594,27 +549,28 @@ static void tipc_send_to_sock(struct tipc_conn *con)
while (!list_empty(queue)) {
e = list_first_entry(queue, struct outqueue_entry, list);
-
+ evt = &e->evt;
spin_unlock_bh(&con->outqueue_lock);
- if (e->evt == TIPC_SUBSCR_TIMEOUT) {
- evt = (struct tipc_event *)e->iov.iov_base;
+ if (e->inactive)
tipc_con_delete_sub(con, &evt->s);
- }
+
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
+ iov.iov_base = evt;
+ iov.iov_len = sizeof(*evt);
+ msg.msg_name = NULL;
if (con->sock) {
- ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
- e->iov.iov_len);
+ ret = kernel_sendmsg(con->sock, &msg, &iov,
+ 1, sizeof(*evt));
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
- goto send_err;
+ goto err;
}
} else {
- evt = e->iov.iov_base;
tipc_send_kern_top_evt(srv->net, evt);
}
@@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con)
}
spin_lock_bh(&con->outqueue_lock);
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
out:
return;
-
-send_err:
+err:
tipc_close_conn(con);
}
@@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s)
idr_init(&s->conn_idr);
s->idr_in_use = 0;
- s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
- 0, SLAB_HWCACHE_ALIGN, NULL);
- if (!s->rcvbuf_cache)
- return -ENOMEM;
-
ret = tipc_work_start(s);
- if (ret < 0) {
- kmem_cache_destroy(s->rcvbuf_cache);
+ if (ret < 0)
return ret;
- }
+
ret = tipc_open_listening_sock(s);
- if (ret < 0) {
+ if (ret < 0)
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
- return ret;
- }
+
return ret;
}
@@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s)
spin_unlock_bh(&s->idr_lock);
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
idr_destroy(&s->conn_idr);
}
diff --git a/net/tipc/server.h b/net/tipc/server.h
index fcc72321362d..2de8709b467d 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -36,6 +36,7 @@
#ifndef _TIPC_SERVER_H
#define _TIPC_SERVER_H
+#include "core.h"
#include <linux/idr.h>
#include <linux/tipc.h>
#include <net/net_namespace.h>
@@ -68,7 +69,6 @@ struct tipc_server {
spinlock_t idr_lock;
int idr_in_use;
struct net *net;
- struct kmem_cache *rcvbuf_cache;
struct workqueue_struct *rcv_wq;
struct workqueue_struct *send_wq;
int max_rcvbuf_size;
@@ -76,19 +76,13 @@ struct tipc_server {
char name[TIPC_SERVER_NAME_LEN];
};
-int tipc_conn_sendmsg(struct tipc_server *s, int conid,
- u32 evt, void *data, size_t len);
+void tipc_conn_queue_evt(struct tipc_server *s, int conid,
+ u32 event, struct tipc_event *evt);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
-/**
- * tipc_conn_terminate - terminate connection with server
- *
- * Note: Must call it in process context since it might sleep
- */
-void tipc_conn_terminate(struct tipc_server *s, int conid);
int tipc_server_start(struct tipc_server *s);
void tipc_server_stop(struct tipc_server *s);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index c6de1452db69..c3e0b924e8c2 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -52,22 +52,19 @@ static u32 htohl(u32 in, int swap)
static void tipc_subscrp_send_event(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper,
- u32 event, u32 port_ref, u32 node)
+ u32 event, u32 port, u32 node)
{
- struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
- struct kvec msg_sect;
+ struct tipc_event *evt = &sub->evt;
+ bool swap = sub->swap;
if (sub->inactive)
return;
- msg_sect.iov_base = (void *)&sub->evt;
- msg_sect.iov_len = sizeof(struct tipc_event);
- sub->evt.event = htohl(event, sub->swap);
- sub->evt.found_lower = htohl(found_lower, sub->swap);
- sub->evt.found_upper = htohl(found_upper, sub->swap);
- sub->evt.port.ref = htohl(port_ref, sub->swap);
- sub->evt.port.node = htohl(node, sub->swap);
- tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
- msg_sect.iov_base, msg_sect.iov_len);
+ evt->event = htohl(event, swap);
+ evt->found_lower = htohl(found_lower, swap);
+ evt->found_upper = htohl(found_upper, swap);
+ evt->port.ref = htohl(port, swap);
+ evt->port.node = htohl(node, swap);
+ tipc_conn_queue_evt(sub->server, sub->conid, event, evt);
}
/**
@@ -137,10 +134,11 @@ static void tipc_subscrp_timeout(struct timer_list *t)
static void tipc_subscrp_kref_release(struct kref *kref)
{
- struct tipc_subscription *sub = container_of(kref,
- struct tipc_subscription,
- kref);
- struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
+ struct tipc_subscription *sub;
+ struct tipc_net *tn;
+
+ sub = container_of(kref, struct tipc_subscription, kref);
+ tn = tipc_net(sub->server->net);
atomic_dec(&tn->subscription_count);
kfree(sub);
@@ -156,11 +154,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
kref_get(&subscription->kref);
}
-static struct tipc_subscription *tipc_subscrp_create(struct net *net,
+static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap)
{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_net *tn = tipc_net(srv->net);
struct tipc_subscription *sub;
u32 filter = htohl(s->filter, swap);
@@ -179,7 +177,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
}
/* Initialize subscription object */
- sub->net = net;
+ sub->server = srv;
sub->conid = conid;
sub->inactive = false;
if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
@@ -197,7 +195,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
return sub;
}
-struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
+struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap,
bool status)
@@ -205,7 +203,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
struct tipc_subscription *sub = NULL;
u32 timeout;
- sub = tipc_subscrp_create(net, s, conid, swap);
+ sub = tipc_subscrp_create(srv, s, conid, swap);
if (!sub)
return NULL;
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index cfd0cb3a1da8..64ffd32d15e6 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -49,7 +49,6 @@ struct tipc_conn;
* struct tipc_subscription - TIPC network topology subscription object
* @subscriber: pointer to its subscriber
* @seq: name sequence associated with subscription
- * @net: point to network namespace
* @timer: timer governing subscription duration (optional)
* @nameseq_list: adjacent subscriptions in name sequence's subscription list
* @subscrp_list: adjacent subscriptions in subscriber's subscription list
@@ -58,7 +57,7 @@ struct tipc_conn;
*/
struct tipc_subscription {
struct kref kref;
- struct net *net;
+ struct tipc_server *server;
struct timer_list timer;
struct list_head nameseq_list;
struct list_head subscrp_list;
@@ -69,7 +68,7 @@ struct tipc_subscription {
spinlock_t lock; /* serialize up/down and timer events */
};
-struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
+struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap,
bool status);