aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c198
1 files changed, 115 insertions, 83 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index f0993af2ae4d..524f4e4f598b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -653,54 +653,57 @@ static void prepare_write_keepalive(struct ceph_connection *con)
* Connection negotiation.
*/
-static int prepare_connect_authorizer(struct ceph_connection *con)
+static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
+ int *auth_proto)
{
- void *auth_buf;
- int auth_len = 0;
- int auth_protocol = 0;
+ struct ceph_auth_handshake *auth;
+
+ if (!con->ops->get_authorizer) {
+ con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
+ con->out_connect.authorizer_len = 0;
+
+ return NULL;
+ }
+
+ /* Can't hold the mutex while getting authorizer */
mutex_unlock(&con->mutex);
- if (con->ops->get_authorizer)
- con->ops->get_authorizer(con, &auth_buf, &auth_len,
- &auth_protocol, &con->auth_reply_buf,
- &con->auth_reply_buf_len,
- con->auth_retry);
+
+ auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
+
mutex_lock(&con->mutex);
- if (test_bit(CLOSED, &con->state) ||
- test_bit(OPENING, &con->state))
- return -EAGAIN;
+ if (IS_ERR(auth))
+ return auth;
+ if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
+ return ERR_PTR(-EAGAIN);
- con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
- con->out_connect.authorizer_len = cpu_to_le32(auth_len);
+ con->auth_reply_buf = auth->authorizer_reply_buf;
+ con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
- if (auth_len)
- ceph_con_out_kvec_add(con, auth_len, auth_buf);
- return 0;
+ return auth;
}
/*
* We connected to a peer and are saying hello.
*/
-static void prepare_write_banner(struct ceph_messenger *msgr,
- struct ceph_connection *con)
+static void prepare_write_banner(struct ceph_connection *con)
{
- ceph_con_out_kvec_reset(con);
ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
- ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
- &msgr->my_enc_addr);
+ ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
+ &con->msgr->my_enc_addr);
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
-static int prepare_write_connect(struct ceph_messenger *msgr,
- struct ceph_connection *con,
- int include_banner)
+static int prepare_write_connect(struct ceph_connection *con)
{
- unsigned global_seq = get_global_seq(con->msgr, 0);
+ unsigned int global_seq = get_global_seq(con->msgr, 0);
int proto;
+ int auth_proto;
+ struct ceph_auth_handshake *auth;
switch (con->peer_name.type) {
case CEPH_ENTITY_TYPE_MON:
@@ -719,23 +722,32 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
con->connect_seq, global_seq, proto);
- con->out_connect.features = cpu_to_le64(msgr->supported_features);
+ con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(global_seq);
con->out_connect.protocol_version = cpu_to_le32(proto);
con->out_connect.flags = 0;
- if (include_banner)
- prepare_write_banner(msgr, con);
- else
- ceph_con_out_kvec_reset(con);
- ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
+ auth_proto = CEPH_AUTH_UNKNOWN;
+ auth = get_connect_authorizer(con, &auth_proto);
+ if (IS_ERR(auth))
+ return PTR_ERR(auth);
+
+ con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
+ con->out_connect.authorizer_len = auth ?
+ cpu_to_le32(auth->authorizer_buf_len) : 0;
+
+ ceph_con_out_kvec_add(con, sizeof (con->out_connect),
+ &con->out_connect);
+ if (auth && auth->authorizer_buf_len)
+ ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
+ auth->authorizer_buf);
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
- return prepare_connect_authorizer(con);
+ return 0;
}
/*
@@ -816,7 +828,7 @@ static void iter_bio_next(struct bio **bio_iter, int *seg)
static int write_partial_msg_pages(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+ unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
size_t len;
bool do_datacrc = !con->msgr->nocrc;
int ret;
@@ -992,11 +1004,10 @@ static int prepare_read_message(struct ceph_connection *con)
static int read_partial(struct ceph_connection *con,
- int *to, int size, void *object)
+ int end, int size, void *object)
{
- *to += size;
- while (con->in_base_pos < *to) {
- int left = *to - con->in_base_pos;
+ while (con->in_base_pos < end) {
+ int left = end - con->in_base_pos;
int have = size - left;
int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
if (ret <= 0)
@@ -1012,37 +1023,52 @@ static int read_partial(struct ceph_connection *con,
*/
static int read_partial_banner(struct ceph_connection *con)
{
- int ret, to = 0;
+ int size;
+ int end;
+ int ret;
dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
/* peer's banner */
- ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
+ size = strlen(CEPH_BANNER);
+ end = size;
+ ret = read_partial(con, end, size, con->in_banner);
if (ret <= 0)
goto out;
- ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
- &con->actual_peer_addr);
+
+ size = sizeof (con->actual_peer_addr);
+ end += size;
+ ret = read_partial(con, end, size, &con->actual_peer_addr);
if (ret <= 0)
goto out;
- ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
- &con->peer_addr_for_me);
+
+ size = sizeof (con->peer_addr_for_me);
+ end += size;
+ ret = read_partial(con, end, size, &con->peer_addr_for_me);
if (ret <= 0)
goto out;
+
out:
return ret;
}
static int read_partial_connect(struct ceph_connection *con)
{
- int ret, to = 0;
+ int size;
+ int end;
+ int ret;
dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
- ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
+ size = sizeof (con->in_reply);
+ end = size;
+ ret = read_partial(con, end, size, &con->in_reply);
if (ret <= 0)
goto out;
- ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
- con->auth_reply_buf);
+
+ size = le32_to_cpu(con->in_reply.authorizer_len);
+ end += size;
+ ret = read_partial(con, end, size, con->auth_reply_buf);
if (ret <= 0)
goto out;
@@ -1377,7 +1403,8 @@ static int process_connect(struct ceph_connection *con)
return -1;
}
con->auth_retry = 1;
- ret = prepare_write_connect(con->msgr, con, 0);
+ ceph_con_out_kvec_reset(con);
+ ret = prepare_write_connect(con);
if (ret < 0)
return ret;
prepare_read_connect(con);
@@ -1397,7 +1424,10 @@ static int process_connect(struct ceph_connection *con)
ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr.in_addr));
reset_connection(con);
- prepare_write_connect(con->msgr, con, 0);
+ ceph_con_out_kvec_reset(con);
+ ret = prepare_write_connect(con);
+ if (ret < 0)
+ return ret;
prepare_read_connect(con);
/* Tell ceph about it. */
@@ -1420,7 +1450,10 @@ static int process_connect(struct ceph_connection *con)
le32_to_cpu(con->out_connect.connect_seq),
le32_to_cpu(con->in_connect.connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
- prepare_write_connect(con->msgr, con, 0);
+ ceph_con_out_kvec_reset(con);
+ ret = prepare_write_connect(con);
+ if (ret < 0)
+ return ret;
prepare_read_connect(con);
break;
@@ -1434,7 +1467,10 @@ static int process_connect(struct ceph_connection *con)
le32_to_cpu(con->in_connect.global_seq));
get_global_seq(con->msgr,
le32_to_cpu(con->in_connect.global_seq));
- prepare_write_connect(con->msgr, con, 0);
+ ceph_con_out_kvec_reset(con);
+ ret = prepare_write_connect(con);
+ if (ret < 0)
+ return ret;
prepare_read_connect(con);
break;
@@ -1491,10 +1527,10 @@ static int process_connect(struct ceph_connection *con)
*/
static int read_partial_ack(struct ceph_connection *con)
{
- int to = 0;
+ int size = sizeof (con->in_temp_ack);
+ int end = size;
- return read_partial(con, &to, sizeof(con->in_temp_ack),
- &con->in_temp_ack);
+ return read_partial(con, end, size, &con->in_temp_ack);
}
@@ -1554,7 +1590,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
static int read_partial_message_pages(struct ceph_connection *con,
struct page **pages,
- unsigned data_len, bool do_datacrc)
+ unsigned int data_len, bool do_datacrc)
{
void *p;
int ret;
@@ -1587,7 +1623,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
#ifdef CONFIG_BLOCK
static int read_partial_message_bio(struct ceph_connection *con,
struct bio **bio_iter, int *bio_seg,
- unsigned data_len, bool do_datacrc)
+ unsigned int data_len, bool do_datacrc)
{
struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
void *p;
@@ -1627,9 +1663,10 @@ static int read_partial_message_bio(struct ceph_connection *con,
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
+ int size;
+ int end;
int ret;
- int to, left;
- unsigned front_len, middle_len, data_len;
+ unsigned int front_len, middle_len, data_len;
bool do_datacrc = !con->msgr->nocrc;
int skip;
u64 seq;
@@ -1638,15 +1675,11 @@ static int read_partial_message(struct ceph_connection *con)
dout("read_partial_message con %p msg %p\n", con, m);
/* header */
- while (con->in_base_pos < sizeof(con->in_hdr)) {
- left = sizeof(con->in_hdr) - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->in_hdr + con->in_base_pos,
- left);
- if (ret <= 0)
- return ret;
- con->in_base_pos += ret;
- }
+ size = sizeof (con->in_hdr);
+ end = size;
+ ret = read_partial(con, end, size, &con->in_hdr);
+ if (ret <= 0)
+ return ret;
crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
if (cpu_to_le32(crc) != con->in_hdr.crc) {
@@ -1759,16 +1792,12 @@ static int read_partial_message(struct ceph_connection *con)
}
/* footer */
- to = sizeof(m->hdr) + sizeof(m->footer);
- while (con->in_base_pos < to) {
- left = to - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
- (con->in_base_pos - sizeof(m->hdr)),
- left);
- if (ret <= 0)
- return ret;
- con->in_base_pos += ret;
- }
+ size = sizeof (m->footer);
+ end += size;
+ ret = read_partial(con, end, size, &m->footer);
+ if (ret <= 0)
+ return ret;
+
dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
m, front_len, m->footer.front_crc, middle_len,
m->footer.middle_crc, data_len, m->footer.data_crc);
@@ -1835,7 +1864,6 @@ static void process_message(struct ceph_connection *con)
*/
static int try_write(struct ceph_connection *con)
{
- struct ceph_messenger *msgr = con->msgr;
int ret = 1;
dout("try_write start %p state %lu nref %d\n", con, con->state,
@@ -1846,7 +1874,11 @@ more:
/* open the socket first? */
if (con->sock == NULL) {
- prepare_write_connect(msgr, con, 1);
+ ceph_con_out_kvec_reset(con);
+ prepare_write_banner(con);
+ ret = prepare_write_connect(con);
+ if (ret < 0)
+ goto out;
prepare_read_banner(con);
set_bit(CONNECTING, &con->state);
clear_bit(NEGOTIATING, &con->state);
@@ -2345,9 +2377,9 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
{
mutex_lock(&con->mutex);
if (con->in_msg && con->in_msg == msg) {
- unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
- unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
- unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+ unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+ unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+ unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
/* skip rest of message */
dout("con_revoke_pages %p msg %p revoked\n", con, msg);