#include "rx_locl.h" RCSID("$arla: rx_rdwr.c,v 1.9 2002/12/15 11:45:19 lha Exp $"); int rx_Read(struct rx_call *call, void *vbuf, int nbytes) { struct rx_packet *rp; int requestCount; char *buf = (char *)vbuf; SPLVAR; /* XXXX took out clock_NewTime from here. Was it needed? */ requestCount = nbytes; NETPRI; GLOBAL_LOCK(); RX_MUTEX_ENTER(&call->lock); while (nbytes) { if (call->nLeft == 0) { /* Get next packet */ for (;;) { if (call->error || (call->mode != RX_MODE_RECEIVING)) { if (call->error) { RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return 0; } if (call->mode == RX_MODE_SENDING) { rx_FlushWrite(call); continue; } } if (queue_IsNotEmpty(&call->rq)) { /* Check that next packet available is next in sequence */ rp = queue_First(&call->rq, rx_packet); if (rp->header.seq == call->rnext) { long error; struct rx_connection *conn = call->conn; queue_Remove(rp); /* * RXS_CheckPacket called to undo RXS_PreparePacket's * work. It may reduce the length of the packet by * up to conn->maxTrailerSize, to reflect the length * of the data + the header. */ if ((error = RXS_CheckPacket(conn->securityObject, call, rp)) != 0) { /* * Used to merely shut down the call, but now we * shut down the whole connection since this may * indicate an attempt to hijack it */ rxi_ConnectionError(conn, error); rp = rxi_SendConnectionAbort(conn, rp); rxi_FreePacket(rp); RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return 0; } call->rnext++; call->currentPacket = rp; call->curvec = 1; /* 0th vec is always header */ /* * begin at the beginning [ more or less ], continue * on until the end, then stop. */ call->curpos = call->conn->securityHeaderSize; /* * Notice that this code works correctly if the data * size is 0 (which it may be--no reply arguments * from server, for example). This relies heavily on * the fact that the code below immediately frees the * packet (no yields, etc.). If it didn't, this * would be a problem because a value of zero for * call->nLeft normally means that there is no read * packet */ call->nLeft = rp->length; if (rp->header.flags & RX_LAST_PACKET) call->flags |= RX_CALL_RECEIVE_DONE; /* * now, if we haven't send a hard ack for window/2 * packets we spontaneously generate one, to take * care of the case where (most likely in the kernel) * we receive a window-full of packets, and ack all * of them before any are read by the user, thus not * hard-acking any of them. The sender retransmits * in this case only under a timer, which is a real * loser */ { int ack_window; #ifdef ADAPT_WINDOW ack_window = call->conn->peer->maxWindow >> 1; #else /* !ADAPT_WINDOW */ ack_window = rx_Window >> 1; #endif/* ADAPT_WINDOW */ if (call->rnext > (call->lastAcked + ack_window)) rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY); } break; } } /* MTUXXX doesn't there need to be an "else" here ??? */ /* Are there ever going to be any more packets? */ if (call->flags & RX_CALL_RECEIVE_DONE) { RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return requestCount - nbytes; } /* Wait for in-sequence packet */ call->flags |= RX_CALL_READER_WAIT; clock_NewTime(); call->startWait = clock_Sec(); RX_MUTEX_EXIT(&call->lock); RX_MUTEX_ENTER(&call->lockq); #ifdef RX_ENABLE_LOCKS while (call->flags & RX_CALL_READER_WAIT) cv_wait(&call->cv_rq, &call->lockq); #else osi_rxSleep(&call->rq); #endif RX_MUTEX_EXIT(&call->lockq); RX_MUTEX_ENTER(&call->lock); call->startWait = 0; } } else { /* assert(call->currentPacket); */ /* * MTUXXX this should be replaced by * some error-recovery code before * shipping */ /* * It's possible for call->nLeft to be smaller than * any particular iov_len. Usually, recvmsg doesn't change the * iov_len, since it reflects the size of the buffer. We have to * keep track of the number of bytes read in the length field of * the packet struct. On the final portion of a received packet, * it's almost certain that call->nLeft will be smaller than the * final buffer. */ unsigned int t, l1; caddr_t p1; while (nbytes && call->currentPacket) { p1 = (char*)call->currentPacket->wirevec[call->curvec].iov_base + call->curpos; l1 = call->currentPacket->wirevec[call->curvec].iov_len - call->curpos; t = MIN(l1, nbytes); t = MIN(t, call->nLeft); memcpy(buf, p1, t); p1 += t; buf += t; l1 -= t; nbytes -= t; call->curpos += t; call->nLeft -= t; if (call->nLeft == 0) { /* out of packet. Get another one. */ rxi_FreePacket(call->currentPacket); call->currentPacket = NULL; } else if (l1 == 0) { /* need to get another struct iov */ if (++call->curvec > call->currentPacket->niovecs) { /* * current packet is exhausted, get ready for another */ /* * don't worry about curvec and stuff, they get set * somewhere else */ rxi_FreePacket(call->currentPacket); call->currentPacket = NULL; call->nLeft = 0; } else call->curpos = 0; } } if (nbytes == 0) { /* user buffer is full, return */ RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return requestCount; } } } /* while (nbytes) ... */ RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return requestCount; } int rx_Write(struct rx_call *call, const void *vbuf, int nbytes) { struct rx_connection *conn = call->conn; int requestCount = nbytes; const char *buf = (const char *)vbuf; SPLVAR; GLOBAL_LOCK(); RX_MUTEX_ENTER(&call->lock); NETPRI; if (call->mode != RX_MODE_SENDING) { if ((conn->type == RX_SERVER_CONNECTION) && (call->mode == RX_MODE_RECEIVING)) { call->mode = RX_MODE_SENDING; if (call->currentPacket) { rxi_FreePacket(call->currentPacket); call->currentPacket = NULL; call->nLeft = 0; call->nFree = 0; } } else { RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return 0; } } /* * Loop condition is checked at end, so that a write of 0 bytes will * force a packet to be created--specially for the case where there are 0 * bytes on the stream, but we must send a packet anyway. */ do { if (call->nFree == 0) { struct rx_packet *cp = call->currentPacket; if (!call->error && call->currentPacket) { clock_NewTime(); /* Bogus: need new time package */ /* * The 0, below, specifies that it is not the last packet: * there will be others. PrepareSendPacket may alter the * packet length by up to conn->securityMaxTrailerSize */ rxi_PrepareSendPacket(call, cp, 0); queue_Append(&call->tq, cp); rxi_Start(0, call); } /* Wait for transmit window to open up */ while (!call->error && call->tnext + 1 > call->tfirst + call->twind) { clock_NewTime(); call->startWait = clock_Sec(); RX_MUTEX_EXIT(&call->lock); RX_MUTEX_ENTER(&call->lockw); #ifdef RX_ENABLE_LOCKS cv_wait(&call->cv_twind, &call->lockw); #else call->flags |= RX_CALL_WAIT_WINDOW_ALLOC; osi_rxSleep(&call->twind); #endif RX_MUTEX_EXIT(&call->lockw); RX_MUTEX_ENTER(&call->lock); call->startWait = 0; } if ((call->currentPacket = rxi_AllocSendPacket(call, nbytes)) != 0) { call->nFree = call->currentPacket->length; call->curvec = 1; /* 0th vec is always header */ /* * begin at the beginning [ more or less ], continue on until * the end, then stop. */ call->curpos = call->conn->securityHeaderSize; } if (call->error) { if (call->currentPacket) { rxi_FreePacket(call->currentPacket); call->currentPacket = NULL; } RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return 0; } } /* * If the remaining bytes fit in the buffer, then store them and * return. Don't ship a buffer that's full immediately to the * peer--we don't know if it's the last buffer yet */ if (!(call->currentPacket)) { call->nFree = 0; } { struct rx_packet *cp = call->currentPacket; unsigned int t, l1; caddr_t p1; while (nbytes && call->nFree) { p1 = (char *)cp->wirevec[call->curvec].iov_base + call->curpos; l1 = cp->wirevec[call->curvec].iov_len - call->curpos; t = MIN(call->nFree, MIN(l1, nbytes)); memcpy(p1, buf, t); p1 += t; buf += t; l1 -= t; nbytes -= t; call->curpos += t; call->nFree -= t; if (!l1) { call->curpos = 0; /* need to get another struct iov */ if (++call->curvec >= cp->niovecs) { /* current packet is full, extend or send it */ call->nFree = 0; } } if (!call->nFree) { int len, mud; len = cp->length; mud = rx_MaxUserDataSize(conn); if (mud > len) { int want; if (nbytes) want = MIN(nbytes, mud - len); else want = mud - len; rxi_AllocDataBuf(cp, want); if (cp->length > mud) cp->length = mud; call->nFree += (cp->length - len); } } } /* * while bytes to send and room to * send them */ /* might be out of space now */ if (!nbytes) { RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return requestCount; } else; /* * more data to send, so get another * packet and keep going */ } } while (nbytes); RX_MUTEX_EXIT(&call->lock); GLOBAL_UNLOCK(); USERPRI; return requestCount - nbytes; } /* * Flush any buffered data to the stream, switch to read mode * (clients) or to EOF mode (servers) */ void rx_FlushWrite(struct rx_call *call) { SPLVAR; NETPRI; if (call->mode == RX_MODE_SENDING) { struct rx_packet *cp; call->mode = (call->conn->type == RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF); if (call->currentPacket) { /* cp->length is only supposed to be the user's data */ cp = call->currentPacket; /* * cp->length was already set to (then-current) MaxUserDataSize * or less. */ cp->length -= call->nFree; call->currentPacket = (struct rx_packet *) 0; call->nFree = 0; } else { cp = rxi_AllocSendPacket(call, 0); if (!cp) { /* Mode can no longer be MODE_SENDING */ USERPRI; return; } cp->length = 0; cp->niovecs = 1; /* just the header */ call->nFree = 0; } /* The 1 specifies that this is the last packet */ rxi_PrepareSendPacket(call, cp, 1); queue_Append(&call->tq, cp); rxi_Start(0, call); } USERPRI; }