From 15133f6e67d8d646d0744336b4daa3135452cb0d Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 14:33:38 -0800 Subject: RDS: Implement atomic operations Implement a CMSG-based interface to do FADD and CSWP ops. Alter send routines to handle atomic ops. Add atomic counters to stats. Add xmit_atomic() to struct rds_transport Inline rds_ib_send_unmap_rdma into unmap_rm Signed-off-by: Andy Grover --- net/rds/send.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 68 insertions(+), 3 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index b751a8e77c41..f3f4e79274bf 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -73,6 +73,7 @@ void rds_send_reset(struct rds_connection *conn) conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; conn->c_map_queued = 0; @@ -171,6 +172,7 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; /* Release the reference to the previous message. */ rds_message_put(rm); @@ -262,6 +264,17 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_rm = rm; } + + if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); + if (ret) + break; + conn->c_xmit_atomic_sent = 1; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); + } + /* * Try and send an rdma message. Let's see if we can * keep this simple and require that the transport either @@ -442,6 +455,41 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) } EXPORT_SYMBOL_GPL(rds_rdma_send_complete); +/* + * Just like above, except looks at atomic op + */ +void rds_atomic_send_complete(struct rds_message *rm, int status) +{ + struct rds_sock *rs = NULL; + struct rm_atomic_op *ao; + struct rds_notifier *notifier; + + spin_lock(&rm->m_rs_lock); + + ao = &rm->atomic; + if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) + && ao->op_active && ao->op_notify && ao->op_notifier) { + notifier = ao->op_notifier; + rs = rm->m_rs; + sock_hold(rds_rs_to_sk(rs)); + + notifier->n_status = status; + spin_lock(&rs->rs_lock); + list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); + spin_unlock(&rs->rs_lock); + + ao->op_notifier = NULL; + } + + spin_unlock(&rm->m_rs_lock); + + if (rs) { + rds_wake_sk_sleep(rs); + sock_put(rds_rs_to_sk(rs)); + } +} +EXPORT_SYMBOL_GPL(rds_atomic_send_complete); + /* * This is the same as rds_rdma_send_complete except we * don't do any locking - we have all the ingredients (message, @@ -788,6 +836,11 @@ static int rds_rm_size(struct msghdr *msg, int data_len) /* these are valid but do no add any size */ break; + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + size += sizeof(struct scatterlist); + break; + default: return -EINVAL; } @@ -813,7 +866,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, continue; /* As a side effect, RDMA_DEST and RDMA_MAP will set - * rm->m_rdma_cookie and rm->m_rdma_mr. + * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr. */ switch (cmsg->cmsg_type) { case RDS_CMSG_RDMA_ARGS: @@ -829,6 +882,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, if (!ret) *allocated_mr = 1; break; + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + ret = rds_cmsg_atomic(rs, rm, cmsg); + break; default: return -EINVAL; @@ -926,10 +983,18 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op.r_active) && - !conn->c_trans->xmit_rdma) { + !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", - &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + ret = -EOPNOTSUPP; + goto out; + } + + if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) { + if (printk_ratelimit()) + printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n", + &rm->atomic, conn->c_trans->xmit_atomic); ret = -EOPNOTSUPP; goto out; } -- cgit v1.2.3-59-g8ed1b