aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/connection.c')
-rw-r--r--net/rds/connection.c157
1 files changed, 114 insertions, 43 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 7619b671ca28..870992e08cae 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -37,7 +37,6 @@
#include "rds.h"
#include "loop.h"
-#include "rdma.h"
#define RDS_CONNECTION_HASH_BITS 12
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
@@ -63,18 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
var |= RDS_INFO_CONNECTION_FLAG_##suffix; \
} while (0)
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
- int ret = 0;
-
- if (!mutex_trylock(&conn->c_send_lock))
- ret = 1;
- else
- mutex_unlock(&conn->c_send_lock);
-
- return ret;
-}
-
+/* rcu read lock must be held or the connection spinlock */
static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
__be32 laddr, __be32 faddr,
struct rds_transport *trans)
@@ -82,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
struct rds_connection *conn, *ret = NULL;
struct hlist_node *pos;
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
conn->c_trans == trans) {
ret = conn;
@@ -129,10 +117,11 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
{
struct rds_connection *conn, *parent = NULL;
struct hlist_head *head = rds_conn_bucket(laddr, faddr);
+ struct rds_transport *loop_trans;
unsigned long flags;
int ret;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
conn = rds_conn_lookup(head, laddr, faddr, trans);
if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
!is_outgoing) {
@@ -143,12 +132,12 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
parent = conn;
conn = parent->c_passive;
}
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
if (conn)
goto out;
conn = kmem_cache_zalloc(rds_conn_slab, gfp);
- if (conn == NULL) {
+ if (!conn) {
conn = ERR_PTR(-ENOMEM);
goto out;
}
@@ -159,7 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
- mutex_init(&conn->c_send_lock);
+ init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
@@ -175,7 +164,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
* can bind to the destination address then we'd rather the messages
* flow through loopback rather than either transport.
*/
- if (rds_trans_get_preferred(faddr)) {
+ loop_trans = rds_trans_get_preferred(faddr);
+ if (loop_trans) {
+ rds_trans_put(loop_trans);
conn->c_loopback = 1;
if (is_outgoing && trans->t_prefer_loopback) {
/* "outgoing" connection - and the transport
@@ -238,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
- hlist_add_head(&conn->c_hash_node, head);
+ hlist_add_head_rcu(&conn->c_hash_node, head);
rds_cong_add_conn(conn);
rds_conn_count++;
}
@@ -263,21 +254,91 @@ struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
}
EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
+void rds_conn_shutdown(struct rds_connection *conn)
+{
+ /* shut it down unless it's down already */
+ if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+ /*
+ * Quiesce the connection mgmt handlers before we start tearing
+ * things down. We don't hold the mutex for the entire
+ * duration of the shutdown operation, else we may be
+ * deadlocking with the CM handler. Instead, the CM event
+ * handler is supposed to check for state DISCONNECTING
+ */
+ mutex_lock(&conn->c_cm_lock);
+ if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
+ && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
+ rds_conn_error(conn, "shutdown called in state %d\n",
+ atomic_read(&conn->c_state));
+ mutex_unlock(&conn->c_cm_lock);
+ return;
+ }
+ mutex_unlock(&conn->c_cm_lock);
+
+ wait_event(conn->c_waitq,
+ !test_bit(RDS_IN_XMIT, &conn->c_flags));
+
+ conn->c_trans->conn_shutdown(conn);
+ rds_conn_reset(conn);
+
+ if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+ /* This can happen - eg when we're in the middle of tearing
+ * down the connection, and someone unloads the rds module.
+ * Quite reproduceable with loopback connections.
+ * Mostly harmless.
+ */
+ rds_conn_error(conn,
+ "%s: failed to transition to state DOWN, "
+ "current state is %d\n",
+ __func__,
+ atomic_read(&conn->c_state));
+ return;
+ }
+ }
+
+ /* Then reconnect if it's still live.
+ * The passive side of an IB loopback connection is never added
+ * to the conn hash, so we never trigger a reconnect on this
+ * conn - the reconnect is always triggered by the active peer. */
+ cancel_delayed_work_sync(&conn->c_conn_w);
+ rcu_read_lock();
+ if (!hlist_unhashed(&conn->c_hash_node)) {
+ rcu_read_unlock();
+ rds_queue_reconnect(conn);
+ } else {
+ rcu_read_unlock();
+ }
+}
+
+/*
+ * Stop and free a connection.
+ *
+ * This can only be used in very limited circumstances. It assumes that once
+ * the conn has been shutdown that no one else is referencing the connection.
+ * We can only ensure this in the rmmod path in the current code.
+ */
void rds_conn_destroy(struct rds_connection *conn)
{
struct rds_message *rm, *rtmp;
+ unsigned long flags;
rdsdebug("freeing conn %p for %pI4 -> "
"%pI4\n", conn, &conn->c_laddr,
&conn->c_faddr);
- hlist_del_init(&conn->c_hash_node);
+ /* Ensure conn will not be scheduled for reconnect */
+ spin_lock_irq(&rds_conn_lock);
+ hlist_del_init_rcu(&conn->c_hash_node);
+ spin_unlock_irq(&rds_conn_lock);
+ synchronize_rcu();
- /* wait for the rds thread to shut it down */
- atomic_set(&conn->c_state, RDS_CONN_ERROR);
- cancel_delayed_work(&conn->c_conn_w);
- queue_work(rds_wq, &conn->c_down_w);
- flush_workqueue(rds_wq);
+ /* shut the connection down */
+ rds_conn_drop(conn);
+ flush_work(&conn->c_down_w);
+
+ /* make sure lingering queued work won't try to ref the conn */
+ cancel_delayed_work_sync(&conn->c_send_w);
+ cancel_delayed_work_sync(&conn->c_recv_w);
/* tear down queued messages */
list_for_each_entry_safe(rm, rtmp,
@@ -302,7 +363,9 @@ void rds_conn_destroy(struct rds_connection *conn)
BUG_ON(!list_empty(&conn->c_retrans));
kmem_cache_free(rds_conn_slab, conn);
+ spin_lock_irqsave(&rds_conn_lock, flags);
rds_conn_count--;
+ spin_unlock_irqrestore(&rds_conn_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_conn_destroy);
@@ -316,23 +379,23 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
struct list_head *list;
struct rds_connection *conn;
struct rds_message *rm;
- unsigned long flags;
unsigned int total = 0;
+ unsigned long flags;
size_t i;
len /= sizeof(struct rds_info_message);
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
if (want_send)
list = &conn->c_send_queue;
else
list = &conn->c_retrans;
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
/* XXX too lazy to maintain counts.. */
list_for_each_entry(rm, list, m_conn_item) {
@@ -343,11 +406,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
conn->c_faddr, 0);
}
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
lens->nr = total;
lens->each = sizeof(struct rds_info_message);
@@ -377,19 +439,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
uint64_t buffer[(item_len + 7) / 8];
struct hlist_head *head;
struct hlist_node *pos;
- struct hlist_node *tmp;
struct rds_connection *conn;
- unsigned long flags;
size_t i;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
lens->nr = 0;
lens->each = item_len;
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
/* XXX no c_lock usage.. */
if (!visitor(conn, buffer))
@@ -405,8 +465,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
lens->nr++;
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
@@ -423,8 +482,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
sizeof(cinfo->transport));
cinfo->flags = 0;
- rds_conn_info_set(cinfo->flags,
- rds_conn_is_sending(conn), SENDING);
+ rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+ SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
@@ -444,12 +503,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
sizeof(struct rds_info_connection));
}
-int __init rds_conn_init(void)
+int rds_conn_init(void)
{
rds_conn_slab = kmem_cache_create("rds_connection",
sizeof(struct rds_connection),
0, 0, NULL);
- if (rds_conn_slab == NULL)
+ if (!rds_conn_slab)
return -ENOMEM;
rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
@@ -487,6 +546,18 @@ void rds_conn_drop(struct rds_connection *conn)
EXPORT_SYMBOL_GPL(rds_conn_drop);
/*
+ * If the connection is down, trigger a connect. We may have scheduled a
+ * delayed reconnect however - in this case we should not interfere.
+ */
+void rds_conn_connect_if_down(struct rds_connection *conn)
+{
+ if (rds_conn_state(conn) == RDS_CONN_DOWN &&
+ !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
+ queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+}
+EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
+
+/*
* An error occurred on the connection
*/
void