00001 00036 #include <itpp/protocol/tcp.h> 00037 #include <itpp/base/itfile.h> 00038 #include <limits> 00039 #include <cstdlib> 00040 #include <ctime> 00041 00042 00043 #ifdef _MSC_VER 00044 #pragma warning(disable:4355) 00045 #endif 00046 00047 namespace itpp { 00048 00049 // -------------------- Default parameters ---------------------------------- 00050 00051 // TCP sender and receiver 00052 00053 #define TCP_HEADERLENGTH 40 00054 00055 // TCP sender 00056 00057 #define TCP_VERSION kReno 00058 #define TCP_SMSS 1460 00059 #define TCP_INITIALCWNDREL 2 // related to MSS 00060 #define TCP_INITIALSSTHRESHREL 1 // related to MaxCWnd 00061 #define TCP_MAXCWNDREL 32 // related to MSS 00062 #define TCP_DUPACKS 3 00063 #define TCP_INITIALRTT 1 00064 const double TCP_STIMERGRAN = 0.2; 00065 const double TCP_SWSATIMERVALUE = 0.2; 00066 #define TCP_MAXBACKOFF 64 00067 const double TCP_MAXRTO = std::numeric_limits<double>::max(); 00068 #define TCP_IMMEDIATEBACKOFFRESET false 00069 #define TCP_TIMESTAMPS false 00070 #define TCP_KARN true 00071 #define TCP_NAGLE false 00072 #define TCP_GOBACKN true 00073 #define TCP_FLIGHTSIZERECOVERY false 00074 #define TCP_RENOCONSERVATION true 00075 #define TCP_CAREFULSSTHRESHREDUCTION true 00076 #define TCP_IGNOREDUPACKONTORECOVERY true 00077 #define TCP_CAREFULMULFASTRTXAVOIDANCE true 00078 #define TCP_RESTARTAFTERIDLE true 00079 00080 // TCP receiver 00081 00082 #define TCP_RMSS 1460 00083 const int TCP_BUFFERSIZE = std::numeric_limits<int>::max()/4; 00084 #define TCP_DELAYEDACK true 00085 const double TCP_ACKDELAYTIME = 0.2; 00086 #define TCP_SENDPERIODICACKS false 00087 #define TCP_STRICTPERIODICACKS false 00088 #define TCP_PERIODICACKINTERVAL 1 00089 #define TCP_ACKSCHEDULINGDELAY 0 00090 #define TCP_ACKBUFFERWRITE false 00091 #define TCP_ACKBUFFERREAD true 00092 const int TCP_MAXUSERBLOCKSIZE = std::numeric_limits<int>::max()/4; 00093 #define TCP_MINUSERBLOCKSIZE 1 00094 #define TCP_USERBLOCKPROCDELAY 0 00095 00096 // TCP generator 00097 00098 #define TCPGEN_BLOCKSIZE 1460 00099 00100 // TCP applications 00101 00102 #define TCPAPP_MAXNOOFACTIVEAPPS 500 00103 #define TCPAPP_DISTSTATARRAYSIZE 100 00104 #define TCPAPP_DISTSTATMAXGOODPUT 1000 00105 #define TCPAPP_DISTSTATMAXTRANSFERTIME 10000 00106 #define TCPAPP_CONDMEANSTATARRAYSIZE 100 00107 #define TCPAPP_CONDMEANSTATMAXREQSIZE 100000 00108 00109 00110 00111 inline int min(int opd1, int opd2) 00112 { 00113 return (opd1 < opd2)? opd1 : opd2; 00114 } 00115 00116 00117 inline int max(int opd1, int opd2) 00118 { 00119 return (opd1 > opd2)? opd1 : opd2; 00120 } 00121 00122 00123 // round is used to map a double value (e.g. RTO in TTCPSender) to the 00124 // next higher value of a certain granularity (e.g. timer granularity). 00125 inline double round (const double value, const double granularity) 00126 { 00127 return ceil(value / granularity) * granularity; 00128 } 00129 00130 // -------------------- TCP_Segment ---------------------------------------- 00131 00132 TCP_Segment::TCP_Segment() : 00133 seq_begin(), 00134 seq_end() 00135 { 00136 } 00137 00138 TCP_Segment::TCP_Segment(const Sequence_Number &sn_begin, const Sequence_Number &sn_end) : 00139 seq_begin(sn_begin), 00140 seq_end(sn_end) 00141 { 00142 it_assert(seq_begin <= seq_end, "TCP_Segment::TCP_Segment, end byte " + to_str(seq_end.value()) + 00143 " < begin byte " + to_str(seq_begin.value())); 00144 } 00145 00146 00147 TCP_Segment::TCP_Segment(const TCP_Segment &segment) : 00148 seq_begin(segment.seq_begin), 00149 seq_end(segment.seq_end) 00150 { 00151 } 00152 00153 00154 TCP_Segment &TCP_Segment::operator=(const TCP_Segment &segment) 00155 { 00156 this->seq_begin = segment.seq_begin; 00157 this->seq_end = segment.seq_end; 00158 00159 return *this; 00160 } 00161 00162 00163 void TCP_Segment::combine(const TCP_Segment &segment) 00164 { 00165 it_assert(can_be_combined(segment), "TCP_Segment::CombineWith, segments cannot be combined"); 00166 00167 seq_begin = min(seq_begin, segment.seq_begin); 00168 seq_end = max(seq_end, segment.seq_end); 00169 } 00170 00171 00172 std::ostream & operator<<(std::ostream &os, const TCP_Segment &segment) 00173 { 00174 os << "(" << segment.seq_begin << "," << segment.seq_end << ")"; 00175 return os; 00176 } 00177 00178 00179 // -------------------- TCP_Packet ---------------------------------------- 00180 TCP_Packet::TCP_Packet() : 00181 fSegment(), 00182 fACK(), 00183 fWnd(0), 00184 fSessionId(0), 00185 fInfo(0) 00186 { 00187 } 00188 00189 00190 TCP_Packet::TCP_Packet(const TCP_Packet &packet) : 00191 fSegment(packet.fSegment), 00192 fACK(packet.fACK), 00193 fWnd(packet.fWnd), 00194 fSessionId(packet.fSessionId), 00195 fInfo(0) 00196 { 00197 std::cout << "TCP_Packet::TCP_Packet ############" << " "; 00198 00199 if (packet.fInfo != 0) { 00200 std::cout << "TCP_Packet::TCP_Packet rhs.fInfo ###########" << " "; 00201 fInfo = new TDebugInfo(*packet.fInfo); 00202 } 00203 } 00204 00205 00206 TCP_Packet::~TCP_Packet() 00207 { 00208 delete fInfo; 00209 } 00210 00211 00212 TCP_Packet & TCP_Packet::clone() const 00213 { 00214 return *new TCP_Packet(*this); 00215 } 00216 00217 00218 void TCP_Packet::set_info(unsigned ssThresh, unsigned recWnd, unsigned cWnd, 00219 double estRTT, Sequence_Number sndUna, 00220 Sequence_Number sndNxt, bool isRtx) 00221 { 00222 if (fInfo == 0) { 00223 fInfo = new TDebugInfo; 00224 } 00225 00226 fInfo->fSSThresh = ssThresh; 00227 fInfo->fRecWnd = recWnd; 00228 fInfo->fCWnd = cWnd; 00229 fInfo->fRTTEstimate = estRTT; 00230 fInfo->fSndUna = sndUna; 00231 fInfo->fSndNxt = sndNxt; 00232 fInfo->fRtxFlag = isRtx; 00233 } 00234 00235 00236 void TCP_Packet::print_header(std::ostream &out) const 00237 { 00238 std::cout << "Hello!\n"; 00239 00240 std::cout << "Ses = " << get_session_id() << " "; 00241 00242 std::cout << "Segment = " << get_segment() << " " 00243 << "ACK = " << get_ACK() << " " 00244 << "Wnd = " << get_wnd() << " "; 00245 00246 std::cout << "DestPort = " << fDestinationPort << " " 00247 << "SourcePort = " << fSourcePort << " "; 00248 00249 00250 if (fInfo != 0) { 00251 std::cout << "SndSSThresh = " << fInfo->fSSThresh << " "; 00252 std::cout << "RecWnd = " << fInfo->fRecWnd << " "; 00253 std::cout << "SndCWnd = " << fInfo->fCWnd << " "; 00254 std::cout << "RTTEstimate = " << fInfo->fRTTEstimate << " "; 00255 std::cout << "RtxFlag = " << fInfo->fRtxFlag; 00256 } 00257 else 00258 std::cout << "fInfo = " << fInfo << " "; 00259 00260 std::cout << std::endl; 00261 00262 } 00263 00264 00265 00266 std::ostream & operator<<(std::ostream & out, TCP_Packet & msg) 00267 { 00268 msg.print_header(out); 00269 return out; 00270 } 00271 00272 00273 // -------------------- TCP_Sender ---------------------------------------- 00274 TCP_Sender::TCP_Sender(int label) : 00275 fLabel(label), 00276 fTCPVersion(TCP_VERSION), 00277 fMSS(TCP_SMSS), 00278 fTCPIPHeaderLength(TCP_HEADERLENGTH), 00279 fInitialRTT(TCP_INITIALRTT), 00280 fInitialCWnd(0), // default initialization see below 00281 fInitialSSThresh(0), // default initialization see below 00282 fMaxCWnd(0), // default initialization see below 00283 fDupACKThreshold(TCP_DUPACKS), 00284 fTimerGranularity(TCP_STIMERGRAN), 00285 fMaxRTO(TCP_MAXRTO), 00286 fMaxBackoff(TCP_MAXBACKOFF), 00287 fImmediateBackoffReset(TCP_IMMEDIATEBACKOFFRESET), 00288 fKarn(TCP_KARN), 00289 fGoBackN(TCP_GOBACKN), 00290 fFlightSizeRecovery(TCP_FLIGHTSIZERECOVERY), 00291 fRenoConservation(TCP_RENOCONSERVATION), 00292 fCarefulSSThreshReduction(TCP_CAREFULSSTHRESHREDUCTION), 00293 fIgnoreDupACKOnTORecovery(TCP_IGNOREDUPACKONTORECOVERY), 00294 fCarefulMulFastRtxAvoidance(TCP_CAREFULMULFASTRTXAVOIDANCE), 00295 fNagle(TCP_NAGLE), 00296 fSWSATimerValue(TCP_SWSATIMERVALUE), 00297 fRestartAfterIdle(TCP_RESTARTAFTERIDLE), 00298 fDebug(false), 00299 fTrace(false), 00300 fSessionId(0), 00301 fRtxTimer(*this, &TCP_Sender::HandleRtxTimeout), 00302 fSWSATimer(*this, &TCP_Sender::HandleSWSATimeout)/*,*/ 00303 { 00304 00305 // default values and parameter check for MaxCWND, InitCWND, InitSSThresh 00306 if (fMaxCWnd == 0) { 00307 fMaxCWnd = (unsigned)(TCP_MAXCWNDREL * fMSS); 00308 } else if (fMaxCWnd < fMSS) { 00309 // throw (UL_CException("TCP_Sender::TCP_Sender", 00310 // "MaxCWnd must be >= MSS")); 00311 } 00312 00313 if (fInitialCWnd == 0) { 00314 fInitialCWnd = (unsigned)(TCP_INITIALCWNDREL * fMSS); 00315 } else if ((fInitialCWnd < fMSS) || (fInitialCWnd > fMaxCWnd)) { 00316 // throw (UL_CException("TCP_Sender::TCP_Sender", 00317 // "initial CWnd must be >= MSS and <= MaxCWnd")); 00318 } 00319 00320 if ((fInitialSSThresh == 0) && (fMaxCWnd >= 2 * fMSS)) { 00321 fInitialSSThresh = (unsigned)(TCP_INITIALSSTHRESHREL * fMaxCWnd); 00322 } else if ((fInitialSSThresh < 2*fMSS) || (fInitialCWnd > fMaxCWnd)) { 00323 // throw (UL_CException("TCP_Sender::TCP_Sender", 00324 // "initial CWnd must be >= 2*MSS and <= MaxCWnd")); 00325 } 00326 00327 setup(); 00328 00329 InitStatistics(); 00330 00331 00332 tcp_send.set_name("TCP Send"); 00333 tcp_receive_ack.forward(this, &TCP_Sender::ReceiveMessageFromNet); 00334 tcp_receive_ack.set_name("TCP ACK"); 00335 tcp_socket_write.forward(this, &TCP_Sender::HandleUserMessageIndication); 00336 tcp_socket_write.set_name("SocketWrite"); 00337 tcp_release.forward(this, &TCP_Sender::release); 00338 tcp_release.set_name("Release"); 00339 00340 } 00341 00342 00343 TCP_Sender::~TCP_Sender () 00344 { 00345 } 00346 00347 void TCP_Sender::set_debug(const bool enable_debug) 00348 { 00349 fDebug = enable_debug; 00350 tcp_send.set_debug(enable_debug); 00351 } 00352 00353 void TCP_Sender::set_debug(bool enable_debug, bool enable_signal_debug) 00354 { 00355 fDebug = enable_debug; 00356 tcp_send.set_debug(enable_signal_debug); 00357 } 00358 00359 void TCP_Sender::set_trace(const bool enable_trace) 00360 { 00361 fTrace = enable_trace; 00362 } 00363 00364 void TCP_Sender::set_label(int label) 00365 { 00366 fLabel = label; 00367 } 00368 00369 void TCP_Sender::setup() 00370 { 00371 fSndUna = 0; 00372 fSndNxt = 0; 00373 fSndMax = 0; 00374 fMaxRecWnd = 0; 00375 fRecWnd = fMaxCWnd; 00376 fUserNxt = 0; 00377 fCWnd = fInitialCWnd; 00378 fSSThresh = fInitialSSThresh; 00379 fRecoveryDupACK = 0; 00380 fRecoveryTO = 0; 00381 fDupACKCnt = 0; 00382 00383 // timers 00384 fBackoff = 1; 00385 fPendingBackoffReset = false; 00386 fLastSendTime = Event_Queue::now(); 00387 00388 // RTT measurement 00389 fTimUna = 0; 00390 fSRTT = 0; 00391 fRTTVar = 0; 00392 fRTTEstimate = fInitialRTT; 00393 fRTTMPending = false; 00394 fRTTMByte = 0; 00395 00396 CWnd_val.set_size(1000); 00397 CWnd_val.zeros(); 00398 CWnd_time.set_size(1000); 00399 CWnd_time.zeros(); 00400 CWnd_val(0) = fInitialCWnd; 00401 CWnd_time(0) = 0; 00402 CWnd_index=1; 00403 00404 SSThresh_val.set_size(1000); 00405 SSThresh_val.zeros(); 00406 SSThresh_time.set_size(1000); 00407 SSThresh_time.zeros(); 00408 SSThresh_val(0) = fInitialSSThresh; 00409 SSThresh_time(0) = 0; 00410 SSThresh_index=1; 00411 00412 sent_seq_num_val.set_size(1000); 00413 sent_seq_num_val.zeros(); 00414 sent_seq_num_time.set_size(1000); 00415 sent_seq_num_time.zeros(); 00416 sent_seq_num_val(0) = 0; 00417 sent_seq_num_time(0) = 0; 00418 sent_seq_num_index=1; 00419 00420 sender_recv_ack_seq_num_val.set_size(1000); 00421 sender_recv_ack_seq_num_val.zeros(); 00422 sender_recv_ack_seq_num_time.set_size(1000); 00423 sender_recv_ack_seq_num_time.zeros(); 00424 sender_recv_ack_seq_num_val(0) = 0; 00425 sender_recv_ack_seq_num_time(0) = 0; 00426 sender_recv_ack_seq_num_index=1; 00427 00428 RTTEstimate_val.set_size(1000); 00429 RTTEstimate_val.zeros(); 00430 RTTEstimate_time.set_size(1000); 00431 RTTEstimate_time.zeros(); 00432 RTTEstimate_val(0) = fInitialRTT; 00433 RTTEstimate_time(0) = 0; 00434 RTTEstimate_index=1; 00435 00436 RTTsample_val.set_size(1000); 00437 RTTsample_val.zeros(); 00438 RTTsample_time.set_size(1000); 00439 RTTsample_time.zeros(); 00440 RTTsample_val(0) = 0; 00441 RTTsample_time(0) = 0; 00442 RTTsample_index=1; 00443 00444 } 00445 00446 std::string TCP_Sender::GenerateFilename() 00447 { 00448 time_t rawtime; 00449 struct tm *timeinfo; 00450 timeinfo = localtime(&rawtime); 00451 std::ostringstream filename_stream; 00452 filename_stream << "trace_tcp_sender_u" << fLabel 00453 << "_" << 1900+timeinfo->tm_year 00454 << "_" << timeinfo->tm_mon 00455 << "_" << timeinfo->tm_mday 00456 << "__" << timeinfo->tm_hour 00457 << "_" << timeinfo->tm_min 00458 << "_" << timeinfo->tm_sec 00459 << "_.it"; 00460 return filename_stream.str(); 00461 } 00462 00463 00464 void TCP_Sender::release(std::string file) 00465 { 00466 std::string filename; 00467 fSessionId++; 00468 00469 fRtxTimer.Reset(); 00470 fSWSATimer.Reset(); 00471 00472 if (fTrace) { 00473 if (file == "") 00474 filename = GenerateFilename(); 00475 else 00476 filename = file; 00477 00478 save_trace(filename); 00479 } 00480 } 00481 00482 00483 void TCP_Sender::InitStatistics() 00484 { 00485 fNumberOfTimeouts = 0; 00486 fNumberOfIdleTimeouts = 0; 00487 fNumberOfFastRetransmits = 0; 00488 fNumberOfRTTMeasurements = 0; 00489 fNumberOfReceivedACKs = 0; 00490 } 00491 00492 00493 void TCP_Sender::StopTransientPhase() 00494 { 00495 InitStatistics(); 00496 } 00497 00498 00499 void TCP_Sender::HandleUserMessageIndication(itpp::Packet *user_data_p) 00500 { 00501 if (fDebug) { 00502 std::cout << "TCP_Sender::HandleUserMessageIndication" 00503 << " byte_size=" << user_data_p->bit_size()/8 00504 << " ptr=" << user_data_p 00505 << " time=" << Event_Queue::now() << std::endl; 00506 } 00507 00508 SocketWriteQueue.push(user_data_p); 00509 00510 SendNewData(); // will call GetMessage (via GetNextSegmentSize) 00511 // if new data can be sent 00512 } 00513 00514 00515 void TCP_Sender::ReceiveMessageFromNet(itpp::Packet *msg) 00516 { 00517 TCP_Packet & packet = (TCP_Packet &)*msg; 00518 00519 if (fDebug) { 00520 std::cout << "TCP_Sender::ReceiveMessageFromNet" 00521 << " byte_size=" << msg->bit_size()/8 00522 << " ptr=" << msg 00523 << " time=" << Event_Queue::now() << std::endl; 00524 } 00525 00526 if((packet.get_session_id() == fSessionId) && // ACK of current session 00527 (packet.get_ACK() >= fSndUna)) { // ACK is OK 00528 HandleACK(packet); 00529 } 00530 00531 delete &packet; 00532 } 00533 00534 00535 void TCP_Sender::HandleACK(TCP_Packet &msg) 00536 { 00537 it_assert(msg.get_ACK() <= fSndMax, "TCP_Sender::HandleACK, received ACK > SndMax at "); 00538 00539 fNumberOfReceivedACKs++; 00540 00541 if (fTrace) { 00542 TraceACKedSeqNo(msg.get_ACK()); 00543 } 00544 00545 if (fDebug) { 00546 std::cout << "sender " << fLabel << ": " 00547 << "receive ACK: " 00548 << " t = " << Event_Queue::now() << ", " 00549 << msg << std::endl; 00550 } 00551 00552 // update receiver advertised window size 00553 fRecWnd = msg.get_wnd(); 00554 fMaxRecWnd = max(fRecWnd, fMaxRecWnd); 00555 00556 if (msg.get_ACK() == fSndUna) { // duplicate ACK 00557 00558 bool ignoreDupACK = (fSndMax == fSndUna); // no outstanding data 00559 00560 if (fIgnoreDupACKOnTORecovery) { 00561 // don't count dupacks during TO recovery! 00562 if (fCarefulMulFastRtxAvoidance) { // see RFC 2582, Section 5 00563 // like in Solaris 00564 ignoreDupACK = ignoreDupACK || (fSndUna <= fRecoveryTO); 00565 } else { 00566 // like in ns 00567 ignoreDupACK = ignoreDupACK || (fSndUna < fRecoveryTO); 00568 } 00569 } 00570 00571 if (!ignoreDupACK) { 00572 fDupACKCnt++; // count the number of duplicate ACKs 00573 00574 if (fDupACKCnt == fDupACKThreshold) { 00575 // dupack threshold is reached 00576 fNumberOfFastRetransmits++; 00577 00578 fRecoveryDupACK = fSndMax; 00579 00580 ReduceSSThresh(); // halve ssthresh (in most cases) 00581 00582 if ((fTCPVersion == kReno) || (fTCPVersion == kNewReno)) { 00583 fCWnd = fSSThresh; 00584 } else if (fTCPVersion == kTahoe) { 00585 fCWnd = fMSS; 00586 } 00587 00588 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00589 // conservation of packets: 00590 if (fRenoConservation) { 00591 fCWnd += fDupACKThreshold * fMSS; 00592 } 00593 } else if (fTCPVersion == kTahoe) { 00594 if (fGoBackN) { 00595 fSndNxt = fSndUna; // Go-Back-N (like in ns) 00596 } 00597 } 00598 00599 UnaRetransmit(); // initiate retransmission 00600 } else if (fDupACKCnt > fDupACKThreshold) { 00601 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00602 // conservation of packets 00603 // CWnd may exceed MaxCWnd during fast recovery, 00604 // however, the result of SendWindow() is always <= MaxCwnd 00605 if (fRenoConservation) { 00606 fCWnd += fMSS; 00607 } 00608 } 00609 } 00610 } 00611 } else { // new ACK 00612 Sequence_Number oldSndUna = fSndUna; // required for NewReno partial ACK 00613 fSndUna = msg.get_ACK(); 00614 fSndNxt = max(fSndNxt, fSndUna); // required in case of "Go-Back-N" 00615 00616 // reset retransmission timer 00617 00618 if ((fSndUna > fTimUna) && fRtxTimer.IsPending()) { 00619 // seq. no. for which rtx timer is running has been received 00620 fRtxTimer.Reset(); 00621 } 00622 00623 // backoff reset 00624 00625 if (fImmediateBackoffReset) { 00626 fBackoff = 1; 00627 } else { 00628 if (fPendingBackoffReset) { 00629 fBackoff = 1; 00630 fPendingBackoffReset = false; 00631 } else if (fBackoff > 1) { 00632 // reset backoff counter only on next new ACK (this is probably 00633 // the way to operate intended by Karn) 00634 fPendingBackoffReset = true; 00635 } 00636 } 00637 00638 // RTT measurement 00639 00640 if ((fSndUna > fRTTMByte) && fRTTMPending) { 00641 UpdateRTTVariables(Event_Queue::now() - fRTTMStartTime); 00642 fRTTMPending = false; 00643 } 00644 00645 // update CWnd and reset dupack counter 00646 00647 if (fDupACKCnt >= fDupACKThreshold) { 00648 // we are in fast recovery 00649 if (fTCPVersion == kNewReno && fSndUna < fRecoveryDupACK) { 00650 // New Reno partial ACK handling 00651 if (fRenoConservation) { 00652 fCWnd = max(fMSS, fCWnd - (fSndUna - oldSndUna) + fMSS); 00653 } 00654 UnaRetransmit(); // start retransmit immediately 00655 } else { 00656 FinishFastRecovery(); 00657 } 00658 } else { 00659 // no fast recovery 00660 fDupACKCnt = 0; 00661 if (fCWnd < fSSThresh) { 00662 // slow start phase 00663 fCWnd = min (fCWnd + fMSS, fMaxCWnd); 00664 } else { 00665 // congestion avoidance phase 00666 fCWnd += max (fMSS * fMSS / fCWnd, 1); // RFC 2581 00667 fCWnd = min (fCWnd, fMaxCWnd); 00668 } 00669 } 00670 } // new ACK 00671 00672 SendNewData(); // try to send new data (even in the case that a retransmit 00673 // had to be performed) 00674 00675 if (fTrace) { 00676 TraceCWnd(); 00677 } 00678 } 00679 00680 00681 void TCP_Sender::SendNewData(bool skipSWSA) 00682 { 00683 unsigned nextSegmentSize; 00684 00685 it_assert(fSndUna <= fSndNxt, "TCP_Sender::SendNewData, SndUna > SndNxt in sender " + to_str(fLabel) + "!"); 00686 00687 if (fRestartAfterIdle) { 00688 IdleCheck(); 00689 } 00690 00691 bool sillyWindowAvoidanceFailed = false; 00692 00693 while (!sillyWindowAvoidanceFailed && 00694 ((nextSegmentSize = GetNextSegmentSize(fSndNxt)) > 0)) 00695 { 00696 // there is new data to send and window is large enough 00697 00698 // SWSA and Nagle (RFC 1122): assume PUSH to be set 00699 unsigned queuedUnsent = fUserNxt - fSndNxt; 00700 unsigned usableWindow = max(0, (fSndUna + SendWindow()) - fSndNxt); 00701 00702 if (((unsigned)min(queuedUnsent, usableWindow) >= fMSS) || 00703 ((!fNagle || (fSndUna == fSndNxt)) && 00704 ((queuedUnsent <= usableWindow) || // Silly W. A. 00705 ((unsigned)min(queuedUnsent, usableWindow) >= fMaxRecWnd / 2) 00706 ) 00707 ) || 00708 skipSWSA 00709 ) { 00710 // Silly Window Syndrome Avoidance (SWSA) and Nagle passed 00711 00712 TCP_Segment nextSegment(fSndNxt, fSndNxt + nextSegmentSize); 00713 TCP_Packet & msg = * new TCP_Packet (); 00714 00715 msg.set_segment(nextSegment); 00716 msg.set_session_id(fSessionId); 00717 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00718 msg.set_source_port(fLabel); // number for simplicity. 00719 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00720 00721 if (fDebug) { 00722 std::cout << "TCP_Sender::SendNewData," 00723 << " nextSegmentSize=" << nextSegmentSize 00724 << " fTCPIPHeaderLength=" << fTCPIPHeaderLength 00725 << " byte_size=" << msg.bit_size()/8 00726 << " ptr=" << &msg 00727 << " time=" << Event_Queue::now() << std::endl; 00728 } 00729 00730 // no RTT measurement for retransmitted segments 00731 // changes on Dec. 13. 2002 (Ga, Bo, Scharf) 00732 00733 if (!fRTTMPending && fSndNxt >= fSndMax) { // ##Bo## 00734 fRTTMStartTime = Event_Queue::now(); 00735 fRTTMByte = nextSegment.begin(); 00736 fRTTMPending = true; 00737 } 00738 00739 fSndNxt += nextSegmentSize; 00740 fSndMax = max(fSndNxt, fSndMax); 00741 00742 // reset SWSA timer if necessary 00743 if (skipSWSA) { 00744 skipSWSA = false; 00745 } else if (fSWSATimer.IsPending()) { 00746 fSWSATimer.Reset(); 00747 } 00748 00749 // set rtx timer if necessary 00750 if (!fRtxTimer.IsPending()) { 00751 SetRtxTimer(); 00752 } 00753 00754 00755 if (fDebug) { 00756 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00757 fSndUna, fSndNxt, false); 00758 std::cout << "sender " << fLabel 00759 << ": send new data: " 00760 << " t = " << Event_Queue::now() << ", " 00761 << msg << std::endl; 00762 } 00763 00764 SendMsg(msg); 00765 } else { 00766 sillyWindowAvoidanceFailed = true; 00767 // set SWSA timer 00768 if (!fSWSATimer.IsPending()) { 00769 fSWSATimer.Set(fSWSATimerValue); 00770 } 00771 } 00772 } 00773 00774 // set timers in case that no new data could have been sent 00775 if (!fRtxTimer.IsPending()) { 00776 if (fSndMax > fSndUna) { // there is outstanding data 00777 if (!fImmediateBackoffReset && fPendingBackoffReset) { 00778 // backoff is reset if no new data could have been sent since last 00779 // (successfull) retransmission; this is useful in case of 00780 // Reno recovery and multiple losses to avoid that in 00781 // the (unavoidable) series of timeouts the timer value 00782 // increases exponentially as this is not the intention 00783 // of the delayed backoff reset in Karn's algorithm 00784 fBackoff = 1; 00785 fPendingBackoffReset = false; 00786 } 00787 SetRtxTimer(); 00788 } 00789 } 00790 } 00791 00792 00793 void TCP_Sender::UnaRetransmit() 00794 { 00795 // resend after timeout or fast retransmit 00796 unsigned nextSegmentSize = GetNextSegmentSize(fSndUna); 00797 00798 if (nextSegmentSize > 0) { 00799 TCP_Segment nextSegment(fSndUna, fSndUna + nextSegmentSize); 00800 TCP_Packet & msg = *new TCP_Packet(); 00801 msg.set_segment(nextSegment); 00802 msg.set_session_id(fSessionId); 00803 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00804 msg.set_source_port(fLabel); // number for simplicity. 00805 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00806 00807 fSndNxt = max(fSndNxt, fSndUna + nextSegmentSize); 00808 fSndMax = max(fSndNxt, fSndMax); 00809 00810 // The RTT measurement is cancelled if the RTTM byte has a sequence 00811 // number higher or equal than the first retransmitted byte as 00812 // the ACK for the RTTM byte will be delayed by the rtx for at least 00813 // one round 00814 if (fKarn && (nextSegment.begin() <= fRTTMByte) && fRTTMPending) { 00815 fRTTMPending = false; 00816 } 00817 00818 SetRtxTimer(); 00819 00820 if (fDebug) { 00821 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00822 fSndUna, fSndNxt, true); 00823 std::cout << "sender " << fLabel; 00824 if (fDupACKCnt >= fDupACKThreshold) { 00825 std::cout << ": fast rtx: "; 00826 } else { 00827 std::cout << ": TO rtx: "; 00828 } 00829 std::cout << " t = " << Event_Queue::now() << ", " 00830 << msg << std::endl; 00831 } 00832 00833 SendMsg(msg); 00834 } else { 00835 // throw(UL_CException("TCP_Sender::UnaRetransmit", "no bytes to send")); 00836 } 00837 } 00838 00839 00840 void TCP_Sender::FinishFastRecovery() 00841 { 00842 if (fTCPVersion == kTahoe) { 00843 fDupACKCnt = 0; 00844 } else if (fTCPVersion == kReno) { 00845 // Reno fast recovery 00846 fDupACKCnt = 0; 00847 if (fFlightSizeRecovery) { 00848 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00849 } else { 00850 fCWnd = fSSThresh; 00851 } 00852 } else if (fTCPVersion == kNewReno) { 00853 // New Reno fast recovery 00854 // "Set CWnd to ... min (ssthresh, FlightSize + MSS) 00855 // ... or ssthresh" (RFC 2582) 00856 if (fFlightSizeRecovery) { 00857 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00858 } else { 00859 fCWnd = fSSThresh; 00860 } 00861 fDupACKCnt = 0; 00862 } 00863 } 00864 00865 00866 void TCP_Sender::ReduceSSThresh() 00867 { 00868 if (fCarefulSSThreshReduction) { 00869 // If Reno conservation is enabled the amount of 00870 // outstanding data ("flight size") might be rather large 00871 // and even larger than twice the old ssthresh value; 00872 // so this corresponds more to the ns behaviour where always cwnd is 00873 // taken instead of flight size. 00874 fSSThresh = max(2 * fMSS, 00875 min(min(fCWnd, fSndMax - fSndUna), fRecWnd) / 2); 00876 } else { 00877 // use filght size / 2 as recommended in RFC 2581 00878 fSSThresh = max(2 * fMSS, min(fSndMax - fSndUna, fRecWnd) / 2); 00879 } 00880 00881 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleACK, internal error: SndSSThresh is > MaxCWnd"); 00882 00883 if (fTrace) { 00884 TraceSSThresh(); 00885 } 00886 } 00887 00888 00889 void TCP_Sender::SendMsg(TCP_Packet &msg) 00890 { 00891 if (fTrace) { 00892 TraceSentSeqNo(msg.get_segment().end()); 00893 } 00894 00895 if (fRestartAfterIdle) { 00896 fLastSendTime = Event_Queue::now(); // needed for idle detection 00897 } 00898 00899 tcp_send(&msg); 00900 } 00901 00902 00903 void TCP_Sender::IdleCheck() 00904 { 00905 // idle detection according to Jacobson, SIGCOMM, 1988: 00906 // sender is currently idle and nothing has been send since RTO 00907 00908 if (fSndMax == fSndUna && Event_Queue::now() - fLastSendTime > CalcRTOValue()) { 00909 fCWnd = fInitialCWnd; // see RFC2581 00910 00911 fNumberOfIdleTimeouts++; 00912 00913 if (fTrace) { 00914 TraceCWnd(); 00915 } 00916 00917 if (fDebug) { 00918 std::cout << "sender " << fLabel 00919 << ": idle timeout: " 00920 << "t = " << Event_Queue::now() 00921 << ", SndNxt = " << fSndNxt 00922 << ", SndUna = " << fSndUna 00923 << ", Backoff = " << fBackoff 00924 << std::endl; 00925 } 00926 } 00927 } 00928 00929 00930 void TCP_Sender::HandleRtxTimeout(Ttype time) 00931 { 00932 fNumberOfTimeouts++; 00933 00934 // update backoff 00935 fBackoff = min(fMaxBackoff, fBackoff * 2); 00936 if (!fImmediateBackoffReset) { 00937 fPendingBackoffReset = false; 00938 } 00939 00940 if (fDupACKCnt >= fDupACKThreshold) { 00941 FinishFastRecovery(); // reset dup ACK cnt and CWnd 00942 } else if (fDupACKCnt > 0) { 00943 fDupACKCnt = 0; // don't allow dupack action during TO recovery 00944 } 00945 00946 // update CWnd and SSThresh 00947 ReduceSSThresh(); // halve ssthresh (in most cases) 00948 fCWnd = fMSS; // not initial CWnd, see RFC 2581 00949 00950 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleRtxTimeout, internal error: SndSSThresh is > MaxCWnd"); 00951 00952 fRecoveryTO = fSndMax; 00953 00954 if (fGoBackN) { 00955 // go back N is mainly relevant in the case of multiple losses 00956 // which would lead to a series of timeouts without resetting sndnxt 00957 fSndNxt = fSndUna; 00958 } 00959 00960 if (fDebug) { 00961 std::cout << "sender " << fLabel 00962 << ": rtx timeout: " 00963 << "t = " << Event_Queue::now() 00964 << ", SndNxt = " << fSndNxt 00965 << ", SndUna = " << fSndUna 00966 << std::endl; 00967 } 00968 00969 if (fTrace) { 00970 TraceCWnd(); 00971 } 00972 00973 UnaRetransmit(); // initiate retransmission 00974 } 00975 00976 00977 void TCP_Sender::HandleSWSATimeout(Ttype) 00978 { 00979 SendNewData(true); 00980 } 00981 00982 00983 unsigned TCP_Sender::GetNextSegmentSize(const Sequence_Number & begin) 00984 { 00985 // try to get new user messages if available and necessary 00986 while ((fUserNxt < begin + fMSS) && (!SocketWriteQueue.empty())) { 00987 itpp::Packet *packet_p = SocketWriteQueue.front(); 00988 SocketWriteQueue.pop(); 00989 fUserNxt += (unsigned) packet_p->bit_size()/8; 00990 delete packet_p; 00991 } 00992 00993 Sequence_Number end = min(min(fUserNxt, begin + fMSS), 00994 fSndUna + SendWindow()); 00995 00996 if (fDebug) { 00997 std::cout << "TCP_Sender::GetNextSegmentSize," 00998 << " fUserNxt=" << fUserNxt 00999 << " begin_seq_num=" << begin 01000 << " fMSS=" << fMSS 01001 << " fSndUna=" << fSndUna 01002 << " SendWindow()=" << SendWindow() 01003 << " end_seq_num=" << end 01004 << " time=" << Event_Queue::now() << std::endl; 01005 } 01006 01007 return max(0, end - begin); 01008 } 01009 01010 01011 unsigned TCP_Sender::SendWindow() const 01012 { 01013 return min(fRecWnd, min (fMaxCWnd, fCWnd)); 01014 } 01015 01016 01017 double TCP_Sender::CalcRTOValue() const 01018 { 01019 static const double factor = 1 + 1e-8; 01020 // to avoid "simultaneous" TO/receive ACK events in case of const. RTT 01021 01022 double rto = fBackoff * fRTTEstimate * factor; 01023 01024 if (rto > fMaxRTO) { 01025 rto = fMaxRTO; 01026 } 01027 01028 return rto; 01029 } 01030 01031 01032 void TCP_Sender::SetRtxTimer() 01033 { 01034 double rto = CalcRTOValue(); 01035 fRtxTimer.Set(rto); 01036 fTimUna = fSndUna; 01037 if (fDebug) { 01038 std::cout << "sender " << fLabel 01039 << ": set rtx timer: " 01040 << "t = " << Event_Queue::now() 01041 << ", RTO = " << rto 01042 << ", Backoff = " << fBackoff 01043 << ", TimUna = " << fTimUna 01044 << std::endl; 01045 } 01046 } 01047 01048 01049 void TCP_Sender::UpdateRTTVariables(double sampleRTT) 01050 { 01051 if (fSRTT == 0) { 01052 fSRTT = sampleRTT; 01053 fRTTVar = sampleRTT / 2; 01054 } else { 01055 // see, e.g., Comer for the values used as weights 01056 fSRTT = 0.875 * fSRTT + 0.125 * sampleRTT; 01057 fRTTVar = 0.75 * fRTTVar + 0.25 * fabs(sampleRTT - fSRTT); 01058 } 01059 01060 fRTTEstimate = round(fSRTT + 4 * fRTTVar, fTimerGranularity); 01061 01062 if (fTrace) { 01063 TraceRTTVariables(sampleRTT); 01064 } 01065 01066 fNumberOfRTTMeasurements++; 01067 } 01068 01069 01070 void TCP_Sender::TraceRTTVariables(double sampleRTT) 01071 { 01072 if (fDebug) { 01073 std::cout << "sender " << fLabel 01074 << ": RTT update: " 01075 << "t = " << Event_Queue::now() 01076 << ", sample = " << sampleRTT 01077 << ", SRTT = " << fSRTT 01078 << ", RTTVar = " << fRTTVar 01079 << ", RTTEstimate = " << fRTTEstimate 01080 << std::endl; 01081 } 01082 01083 if (RTTsample_index >= RTTsample_time.size()) { 01084 RTTsample_time.set_size(2*RTTsample_time.size(),true); 01085 RTTsample_val.set_size(2*RTTsample_val.size(),true); 01086 } 01087 RTTsample_val(RTTsample_index) = sampleRTT; 01088 RTTsample_time(RTTsample_index) = Event_Queue::now(); 01089 RTTsample_index++; 01090 01091 if (RTTEstimate_index >= RTTEstimate_time.size()) { 01092 RTTEstimate_time.set_size(2*RTTEstimate_time.size(),true); 01093 RTTEstimate_val.set_size(2*RTTEstimate_val.size(),true); 01094 } 01095 RTTEstimate_val(RTTEstimate_index) = fRTTEstimate; 01096 RTTEstimate_time(RTTEstimate_index) = Event_Queue::now(); 01097 RTTEstimate_index++; 01098 } 01099 01100 01101 void TCP_Sender::TraceCWnd() 01102 { 01103 if (fDebug) { 01104 std::cout << "sender " << fLabel 01105 << " t = " << Event_Queue::now() 01106 << " cwnd = " << fCWnd << std::endl; 01107 } 01108 if (CWnd_index >= CWnd_time.size()) { 01109 CWnd_time.set_size(2*CWnd_time.size(),true); 01110 CWnd_val.set_size(2*CWnd_val.size(),true); 01111 } 01112 CWnd_val(CWnd_index) = fCWnd; 01113 CWnd_time(CWnd_index) = Event_Queue::now(); 01114 CWnd_index++; 01115 01116 } 01117 01118 void TCP_Sender::TraceSSThresh() 01119 { 01120 if (fDebug) { 01121 std::cout << "sender " << fLabel 01122 << " t = " << Event_Queue::now() 01123 << " cwnd = " << fSSThresh << std::endl; 01124 } 01125 if (SSThresh_index >= SSThresh_time.size()) { 01126 SSThresh_time.set_size(2*SSThresh_time.size(),true); 01127 SSThresh_val.set_size(2*SSThresh_val.size(),true); 01128 } 01129 SSThresh_val(SSThresh_index) = fSSThresh; 01130 SSThresh_time(SSThresh_index) = Event_Queue::now(); 01131 SSThresh_index++; 01132 01133 } 01134 01135 void TCP_Sender::TraceSentSeqNo(const Sequence_Number sn) 01136 { 01138 if (fDebug) { 01139 std::cout << "sender " << fLabel 01140 << " t = " << Event_Queue::now() 01141 << " sent = " << sn 01142 << std::endl; 01143 } 01144 if (sent_seq_num_index >= sent_seq_num_time.size()) { 01145 sent_seq_num_time.set_size(2*sent_seq_num_time.size(),true); 01146 sent_seq_num_val.set_size(2*sent_seq_num_val.size(),true); 01147 } 01148 sent_seq_num_val(sent_seq_num_index) = sn.value(); 01149 sent_seq_num_time(sent_seq_num_index) = Event_Queue::now(); 01150 sent_seq_num_index++; 01151 } 01152 01153 01154 void TCP_Sender::TraceACKedSeqNo(const Sequence_Number sn) 01155 { 01156 if (fDebug) { 01157 std::cout << "sender " << fLabel 01158 << " t = " << Event_Queue::now() 01159 << " ACK = " << sn 01160 << std::endl; 01161 } 01162 01163 if (sender_recv_ack_seq_num_index >= sender_recv_ack_seq_num_time.size()) { 01164 sender_recv_ack_seq_num_time.set_size(2*sender_recv_ack_seq_num_time.size(),true); 01165 sender_recv_ack_seq_num_val.set_size(2*sender_recv_ack_seq_num_val.size(),true); 01166 } 01167 sender_recv_ack_seq_num_val(sender_recv_ack_seq_num_index) = sn.value(); 01168 sender_recv_ack_seq_num_time(sender_recv_ack_seq_num_index) = Event_Queue::now(); 01169 sender_recv_ack_seq_num_index++; 01170 } 01171 01172 01173 void TCP_Sender::save_trace(std::string filename) { 01174 01175 CWnd_val.set_size(CWnd_index, true); 01176 CWnd_time.set_size(CWnd_index,true); 01177 01178 SSThresh_val.set_size(SSThresh_index, true); 01179 SSThresh_time.set_size(SSThresh_index,true); 01180 01181 sent_seq_num_val.set_size(sent_seq_num_index, true); 01182 sent_seq_num_time.set_size(sent_seq_num_index,true); 01183 01184 sender_recv_ack_seq_num_val.set_size(sender_recv_ack_seq_num_index, true); 01185 sender_recv_ack_seq_num_time.set_size(sender_recv_ack_seq_num_index,true); 01186 01187 RTTEstimate_val.set_size(RTTEstimate_index, true); 01188 RTTEstimate_time.set_size(RTTEstimate_index,true); 01189 01190 RTTsample_val.set_size(RTTsample_index, true); 01191 RTTsample_time.set_size(RTTsample_index,true); 01192 01193 if (fDebug) { 01194 std::cout << "CWnd_val" << CWnd_val << std::endl; 01195 std::cout << "CWnd_time" << CWnd_time << std::endl; 01196 std::cout << "CWnd_index" << CWnd_index << std::endl; 01197 01198 std::cout << "SSThresh_val" << SSThresh_val << std::endl; 01199 std::cout << "SSThresh_time" << SSThresh_time << std::endl; 01200 std::cout << "SSThresh_index" << SSThresh_index << std::endl; 01201 01202 std::cout << "sent_seq_num_val" << sent_seq_num_val << std::endl; 01203 std::cout << "sent_seq_num_time" << sent_seq_num_time << std::endl; 01204 std::cout << "sent_seq_num_index" << sent_seq_num_index << std::endl; 01205 01206 std::cout << "sender_recv_ack_seq_num_val" << sender_recv_ack_seq_num_val << std::endl; 01207 std::cout << "sender_recv_ack_seq_num_time" << sender_recv_ack_seq_num_time << std::endl; 01208 std::cout << "sender_recv_ack_seq_num_index" << sender_recv_ack_seq_num_index << std::endl; 01209 01210 std::cout << "RTTEstimate_val" << RTTEstimate_val << std::endl; 01211 std::cout << "RTTEstimate_time" << RTTEstimate_time << std::endl; 01212 std::cout << "RTTEstimate_index" << RTTEstimate_index << std::endl; 01213 01214 std::cout << "RTTsample_val" << RTTsample_val << std::endl; 01215 std::cout << "RTTsample_time" << RTTsample_time << std::endl; 01216 std::cout << "RTTsample_index" << RTTsample_index << std::endl; 01217 01218 std::cout << "TCP_Sender::saving to file: " << filename << std::endl; 01219 } 01220 01221 it_file ff2; 01222 ff2.open(filename); 01223 01224 ff2 << Name("CWnd_val") << CWnd_val; 01225 ff2 << Name("CWnd_time") << CWnd_time; 01226 ff2 << Name("CWnd_index") << CWnd_index; 01227 01228 ff2 << Name("SSThresh_val") << SSThresh_val; 01229 ff2 << Name("SSThresh_time") << SSThresh_time; 01230 ff2 << Name("SSThresh_index") << SSThresh_index; 01231 01232 ff2 << Name("sent_seq_num_val") << sent_seq_num_val; 01233 ff2 << Name("sent_seq_num_time") << sent_seq_num_time; 01234 ff2 << Name("sent_seq_num_index") << sent_seq_num_index; 01235 01236 ff2 << Name("sender_recv_ack_seq_num_val") << sender_recv_ack_seq_num_val; 01237 ff2 << Name("sender_recv_ack_seq_num_time") << sender_recv_ack_seq_num_time; 01238 ff2 << Name("sender_recv_ack_seq_num_index") << sender_recv_ack_seq_num_index; 01239 01240 ff2 << Name("RTTEstimate_val") << RTTEstimate_val; 01241 ff2 << Name("RTTEstimate_time") << RTTEstimate_time; 01242 ff2 << Name("RTTEstimate_index") << RTTEstimate_index; 01243 01244 ff2 << Name("RTTsample_val") << RTTsample_val; 01245 ff2 << Name("RTTsample_time") << RTTsample_time; 01246 ff2 << Name("RTTsample_index") << RTTsample_index; 01247 01248 ff2.flush(); 01249 ff2.close(); 01250 } 01251 01252 01253 void TCP_Sender::print_item(std::ostream & out, const std::string & keyword) 01254 { 01255 if (keyword == "Label") { 01256 std::cout << fLabel; 01257 } else if (keyword == "CWnd") { 01258 std::cout << fCWnd; 01259 } else if (keyword == "SSThresh") { 01260 std::cout << fSSThresh; 01261 } else if (keyword == "SRTT") { 01262 std::cout << fSRTT; 01263 } else if (keyword == "RTTvar") { 01264 std::cout << fRTTVar; 01265 } else if (keyword == "Backoff") { 01266 std::cout << fBackoff; 01267 } else if (keyword == "RTO") { 01268 std::cout << CalcRTOValue(); 01269 } else if (keyword == "NoOfFastRets") { 01270 std::cout << fNumberOfFastRetransmits; 01271 } else if (keyword == "NoOfRetTOs") { 01272 std::cout << fNumberOfTimeouts; 01273 } else if (keyword == "NoOfIdleTOs") { 01274 std::cout << fNumberOfIdleTimeouts; 01275 } else if (keyword == "NoOfRTTMs") { 01276 std::cout << fNumberOfRTTMeasurements; 01277 } else if (keyword == "NoOfRecACKs") { 01278 std::cout << fNumberOfReceivedACKs; 01279 } else { 01280 } 01281 } 01282 01283 01284 // -------------------- TCP_Receiver_Buffer ---------------------------------------- 01285 TCP_Receiver_Buffer::TCP_Receiver_Buffer() : 01286 fFirstByte() 01287 { 01288 } 01289 01290 01291 TCP_Receiver_Buffer::TCP_Receiver_Buffer(const TCP_Receiver_Buffer & rhs) : 01292 fFirstByte(rhs.fFirstByte), 01293 fBufList(rhs.fBufList) 01294 { 01295 } 01296 01297 01298 void TCP_Receiver_Buffer::reset() 01299 { 01300 fBufList.clear(); 01301 fFirstByte = 0; 01302 } 01303 01304 01305 TCP_Receiver_Buffer::~TCP_Receiver_Buffer() 01306 { 01307 } 01308 01309 01310 void TCP_Receiver_Buffer::write(TCP_Segment newBlock) 01311 { 01312 // error cases 01313 it_assert(newBlock.begin() <= newBlock.end(), "TCP_Receiver_Buffer::Write, no valid segment"); 01314 01315 // cut blocks beginning before fFirstByte 01316 if (newBlock.begin() < fFirstByte) { 01317 if (newBlock.end() > fFirstByte) { 01318 newBlock.set_begin(fFirstByte); 01319 } else { 01320 return; //// TODO: Is this strange? 01321 } 01322 } 01323 01324 if (newBlock.length() == 0) { // empty block, nothing to do 01325 return; 01326 } 01327 01328 if (fBufList.empty() || (newBlock.begin() > fBufList.back().end())) { 01329 // new block is behind last block in buffer 01330 fBufList.push_back(newBlock); 01331 } else { 01332 // skip list entries if beginning of newBlock > end of current one 01333 // (search for correct list position) 01334 std::list<TCP_Segment>::iterator iter; 01335 iter = fBufList.begin(); 01336 while (newBlock.begin() > iter->end()) { 01337 iter++; 01338 it_assert(iter != fBufList.end(), "TCP_Receiver_Buffer::Write, internal error"); 01339 } 01340 01341 TCP_Segment & exBlock = *iter; 01342 01343 if (exBlock.can_be_combined(newBlock)) { 01344 // overlapping or contiguous blocks -> combine 01345 exBlock.combine(newBlock); 01346 01347 // check following blocks 01348 iter++; 01349 while ((iter != fBufList.end()) && 01350 exBlock.can_be_combined(*iter)) { 01351 exBlock.combine(*iter); 01352 iter = fBufList.erase(iter); 01353 } 01354 } else { 01355 // no overlap, newBlock lies between two existing list entries 01356 // new list entry has to be created 01357 01358 fBufList.insert(iter, newBlock); 01359 } 01360 } 01361 01362 it_assert(!fBufList.empty() && fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Write, internal error"); 01363 01364 } 01365 01366 01367 // The amount of data read from the buffer is given as parameter. It has 01368 // to be less than or equal to the size of the first block stored. This 01369 // mean the caller of Read should first check how much data is available 01370 // by calling FirstBlockSize. 01371 void TCP_Receiver_Buffer::read(unsigned noOfBytes) 01372 { 01373 it_assert(first_block_size() > 0, "TCP_Receiver_Buffer::Read, No block to read"); 01374 it_assert(noOfBytes <= first_block_size(), "TCP_Receiver_Buffer::Read, submitted block size not valid"); 01375 01376 01377 if (noOfBytes < first_block_size()) { 01378 fBufList.front().set_begin(fBufList.front().begin() + noOfBytes); 01379 } else { // first block will be read completely 01380 fBufList.pop_front(); 01381 } 01382 fFirstByte += noOfBytes; 01383 01384 it_assert(fBufList.empty() || fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Read, internal error"); 01385 } 01386 01387 01388 // FirstBlockSize returns the size of the first block stored in the 01389 // buffer or 0 if the buffer is empty 01390 unsigned TCP_Receiver_Buffer::first_block_size() const 01391 { 01392 if (!fBufList.empty() && (fBufList.front().begin() == fFirstByte)) { 01393 return fBufList.front().length(); 01394 } else { 01395 return 0; 01396 } 01397 } 01398 01399 01400 std::ostream & TCP_Receiver_Buffer::info(std::ostream &os, int detail) const 01401 { 01402 os << "receiver buffer information" << std::endl 01403 << "number of blocks: " << fBufList.size() << std::endl 01404 << "first byte stored: " << fFirstByte << std::endl 01405 << "last byte stored +1: " << last_byte() << std::endl 01406 << "next byte expected: " << next_expected() << std::endl; 01407 01408 if (detail>0) { 01409 os << "segments in receiver buffer:" << std::endl; 01410 01411 typedef std::list<TCP_Segment>::const_iterator LI; 01412 for (LI i = fBufList.begin(); i != fBufList.end(); ++i) { 01413 const TCP_Segment & block = *i; 01414 os << ". segment: " << block << std::endl; 01415 } 01416 01417 } 01418 01419 return os; 01420 } 01421 01422 01423 // -------------------- TCP_Receiver ---------------------------------------- 01424 TCP_Receiver::TCP_Receiver(int label) : 01425 fReceiverBuffer(), 01426 fLabel(label), 01427 fTCPIPHeaderLength(TCP_HEADERLENGTH), 01428 fMSS(TCP_RMSS), 01429 fBufferSize(TCP_BUFFERSIZE), 01430 fDelayedACK(TCP_DELAYEDACK), 01431 fACKDelayTime(TCP_ACKDELAYTIME), 01432 fSendPeriodicACKs(TCP_SENDPERIODICACKS), 01433 fStrictPeriodicACKs(TCP_STRICTPERIODICACKS), 01434 fPeriodicACKInterval(TCP_PERIODICACKINTERVAL), 01435 fACKSchedulingDelay(TCP_ACKSCHEDULINGDELAY), 01436 fACKOnBufferWrite(TCP_ACKBUFFERWRITE), 01437 fACKOnBufferRead(TCP_ACKBUFFERREAD), 01438 fMaxUserBlockSize(TCP_MAXUSERBLOCKSIZE), 01439 fMinUserBlockSize(TCP_MINUSERBLOCKSIZE), 01440 fUserBlockProcDelay(TCP_USERBLOCKPROCDELAY), 01441 fTrace(false), 01442 fDebug(false), 01443 fSessionId(0), 01444 fDelayedACKTimer(*this, &TCP_Receiver::DelayedACKHandler), 01445 fPeriodicACKTimer(*this, &TCP_Receiver::PeriodicACKHandler), 01446 fACKSchedulingTimer(*this, &TCP_Receiver::SendACKMessage), 01447 fWaitingACKMsg(0), 01448 fUserBlockProcTimer(*this, &TCP_Receiver::HandleEndOfProcessing) 01449 { 01450 fUserMessage = NULL; 01451 01452 01453 if (!fACKOnBufferRead && !fACKOnBufferWrite) { 01454 // throw(UL_CException("TCP_Receiver::TCP_Receiver", 01455 // "ACKs must be sent on buffer read or write or both")); 01456 } 01457 01458 setup(); 01459 01460 tcp_receive.forward(this, &TCP_Receiver::ReceiveMessageFromNet); 01461 tcp_receive.set_name("TCP Receive"); 01462 tcp_send_ack.set_name("TCP send ACK"); 01463 tcp_new_data.set_name("TCP New Data"); 01464 tcp_release.forward(this, &TCP_Receiver::release); 01465 tcp_release.set_name("TCP Release"); 01466 01467 } 01468 01469 01470 TCP_Receiver::~TCP_Receiver () 01471 { 01472 delete fWaitingACKMsg; 01473 delete fUserMessage; 01474 } 01475 01476 01477 void TCP_Receiver::set_debug(const bool enable_debug) 01478 { 01479 fDebug = enable_debug; 01480 tcp_send_ack.set_debug(enable_debug); 01481 tcp_new_data.set_debug(); 01482 } 01483 01484 void TCP_Receiver::set_debug(bool enable_debug, bool enable_signal_debug) 01485 { 01486 fDebug = enable_debug; 01487 tcp_send_ack.set_debug(enable_signal_debug); 01488 tcp_new_data.set_debug(); 01489 } 01490 01491 void TCP_Receiver::set_trace(const bool enable_trace) 01492 { 01493 fTrace = enable_trace; 01494 } 01495 01496 01497 01498 void TCP_Receiver::setup() 01499 { 01500 fAdvRcvWnd = 0; 01501 fAdvRcvNxt = 0; 01502 01503 if (fSendPeriodicACKs) { 01504 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01505 } 01506 01507 fReceiverBuffer.reset(); 01508 01509 received_seq_num_val.set_size(1000); 01510 received_seq_num_val.zeros(); 01511 received_seq_num_time.set_size(1000); 01512 received_seq_num_time.zeros(); 01513 received_seq_num_val(0) = 0; 01514 received_seq_num_time(0) = 0; 01515 received_seq_num_index=1; 01516 } 01517 01518 std::string TCP_Receiver::GenerateFilename() 01519 { 01520 time_t rawtime; 01521 struct tm *timeinfo; 01522 timeinfo = localtime(&rawtime); 01523 std::ostringstream filename_stream; 01524 filename_stream << "trace_tcp_receiver_u" << fLabel 01525 << "_" << 1900+timeinfo->tm_year 01526 << "_" << timeinfo->tm_mon 01527 << "_" << timeinfo->tm_mday 01528 << "__" << timeinfo->tm_hour 01529 << "_" << timeinfo->tm_min 01530 << "_" << timeinfo->tm_sec 01531 << "_.it"; 01532 return filename_stream.str(); 01533 } 01534 01535 void TCP_Receiver::release(std::string file) 01536 { 01537 std::string filename; 01538 fSessionId++; 01539 01540 if (fWaitingACKMsg != 0) { 01541 delete fWaitingACKMsg; 01542 fWaitingACKMsg = 0; 01543 } 01544 if (fUserMessage != 0) { 01545 delete fUserMessage; 01546 fUserMessage = 0; 01547 } 01548 01549 fUserBlockProcTimer.Reset(); 01550 fDelayedACKTimer.Reset(); 01551 fPeriodicACKTimer.Reset(); 01552 fACKSchedulingTimer.Reset(); 01553 01554 if (fTrace) { 01555 if (file == "") 01556 filename = GenerateFilename(); 01557 else 01558 filename = file; 01559 01560 save_trace(filename); 01561 } 01562 } 01563 01564 01565 void TCP_Receiver::ReceiveMessageFromNet(itpp::Packet *msg) 01566 { 01567 TCP_Packet & packet = (TCP_Packet &) *msg; 01568 if (packet.get_destination_port() == fLabel) { 01569 if (packet.get_session_id() == fSessionId) { 01570 ReceiveDataPacket(packet); 01571 } 01572 else { 01573 it_warning("Received a TCP packet with wrong SessionId"); 01574 std::cout << "TCP_Receiver::ReceiveMessageFromNet, " 01575 << "fLabel= " << fLabel 01576 << "fSessionId= " << fSessionId << std::endl; 01577 std::cout << "packet=" << packet 01578 << ", next exp. = " << fReceiverBuffer.next_expected() 01579 << std::endl; 01580 exit(0); 01581 } 01582 } 01583 else { 01584 it_warning("Received a TCP packet with label"); 01585 exit(0); 01586 } 01587 } 01588 01589 01590 void TCP_Receiver::ReceiveDataPacket(TCP_Packet &msg) 01591 { 01592 TCP_Segment segment = msg.get_segment(); 01593 01594 bool isOutOfOrder = (segment.begin() > fReceiverBuffer.next_expected()) || 01595 (segment.end() <= fReceiverBuffer.next_expected()); 01596 01597 if (fDebug) { 01598 std::cout << "TCP_Receiver::ReceiveDataPacket receiver: " << fLabel << ": " 01599 << "receive msg: " 01600 << "t = " << Event_Queue::now() 01601 << ", next exp. = " << fReceiverBuffer.next_expected() 01602 << ", " << msg << std::endl; 01603 } 01604 01605 if (fTrace) { 01606 TraceReceivedSeqNo(segment.end()); 01607 } 01608 01609 it_assert(segment.end() <= fReceiverBuffer.first_byte() + fBufferSize, "TCP_Receiver::ReceiveTCPPacket, packet exceeds window at "); 01610 it_assert(segment.begin() < segment.end(), "TCP_Receiver::ReceiveTCPPacket, silly packet received at "); 01611 01612 fReceiverBuffer.write(segment); 01613 01614 if (isOutOfOrder) { 01615 SendACK(true); // create dupack conditionless 01616 } else { 01617 if (fACKOnBufferWrite) { 01618 SendACK(false); 01619 } 01620 IndicateUserMessage(); 01621 } 01622 01623 delete &msg; 01624 } 01625 01626 01627 void TCP_Receiver::IndicateUserMessage() 01628 { 01629 if (fUserMessage == 0) { 01630 // receive a block 01631 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01632 fMaxUserBlockSize); 01633 01634 if (fDebug) { 01635 std::cout << "TCP_Receiver::IndicateUserMessage " 01636 << "t = " << Event_Queue::now() 01637 << " noOfBytes = " << noOfBytes 01638 << " firstBlock = " << fReceiverBuffer.first_block_size() 01639 << std::endl; 01640 } 01641 01642 if (noOfBytes >= fMinUserBlockSize) { 01643 fUserMessage = new Packet(); 01644 fUserMessage->set_bit_size(8*noOfBytes); 01645 fUserBlockProcTimer.Set(fUserBlockProcDelay); 01646 } 01647 } 01648 } 01649 01650 01651 bool TCP_Receiver::is_user_message_available() 01652 { 01653 if (fUserMessage != 0) { 01654 return true; 01655 } 01656 01657 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01658 fMaxUserBlockSize); 01659 01660 if (noOfBytes >= fMinUserBlockSize) { 01661 fUserMessage = new Packet(); 01662 fUserMessage->set_bit_size(8*noOfBytes); 01663 return true; 01664 } else { 01665 return false; 01666 } 01667 } 01668 01669 01670 itpp::Packet & TCP_Receiver::get_user_message() 01671 { 01672 it_assert(fUserMessage != 0, "TCP_Receiver::GetUserMessage, no message available"); 01673 if (fDebug) { 01674 std::cout << "TCP_Receiver::GetUserMessage " 01675 << "receiver: " << fLabel << ": " 01676 << "read from buffer: " 01677 << "t = " << Event_Queue::now() 01678 << ", user msg length = " << (fUserMessage->bit_size()/8) 01679 << ", first byte = " << fReceiverBuffer.first_byte() 01680 << ", first block size = " << fReceiverBuffer.first_block_size() 01681 << std::endl; 01682 } 01683 01684 fReceiverBuffer.read(fUserMessage->bit_size()/8); 01685 if (fACKOnBufferRead) { 01686 SendACK(false); // send acknowledgement 01687 } 01688 01689 itpp::Packet & msg = *fUserMessage; 01690 fUserMessage = 0; 01691 01692 if (fReceiverBuffer.first_block_size() > 0) { 01693 IndicateUserMessage(); 01694 } 01695 01696 return msg; 01697 } 01698 01699 01700 01701 void TCP_Receiver::HandleEndOfProcessing(Ttype) 01702 { 01703 it_assert(fUserMessage != 0, "TCP_Receiver::HandleEndOfProcessing, no message available"); 01704 01705 01706 tcp_new_data(fLabel); 01707 } 01708 01709 01710 void TCP_Receiver::DelayedACKHandler(Ttype) 01711 { 01712 if (fDebug) { 01713 std::cout << "TCP_Receiver::DelayedACKHandler " 01714 << "receiver " << fLabel 01715 << ": delACK TO: " 01716 << "t = " << Event_Queue::now() << std::endl; 01717 } 01718 01719 SendACK(true); 01720 } 01721 01722 01723 void TCP_Receiver::PeriodicACKHandler(Ttype) 01724 { 01725 if (fDebug) { 01726 std::cout << "TCP_Receiver::PeriodicACKHandler" 01727 << "receiver " << fLabel 01728 << ": periodicACK TO: " 01729 << "t = " << Event_Queue::now() << std::endl; 01730 } 01731 01732 SendACK(true); 01733 } 01734 01735 01736 void TCP_Receiver::SendACK(bool sendConditionless) 01737 { 01738 // sendConditionless is set 01739 // ... if packet was received out of order or 01740 // ... if delayed ACK timer has expired 01741 01742 // Bei eingeschaltetem "delayed ACK" wird ein ACK nur 01743 // gesendet, wenn das Fenster um 2MSS oder 35% der 01744 // maximalen Fenstergroesse verschoben worden ist 01745 // ... oder nach delayed ACK Timeout 01746 // ... oder wenn es das ACK fur ein Out of Order Segment ist 01747 // ... oder (in der Realitat), wenn ich auch was zu senden habe. 01748 01749 if (sendConditionless || !fDelayedACK || 01750 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= (int)(2 * fMSS)) || 01751 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= 01752 (int)(0.35 * fBufferSize))) { 01753 // Remark: RFC2581 recommends to acknowledge every second 01754 // packet conditionless (without setting this as a requirement) 01755 // in order to avoid excessive ack delays when the receiver MSS 01756 // is larger than the sender MSS. In this uni-directional 01757 // implementation, the receiver's MSS is not actively 01758 // used for sending but only for deciding when acknowledgments 01759 // have to be returned. Thus, the best solution to account for 01760 // RFC2581 is to set the receiver's MSS always equal to the 01761 // sender's MSS. 01762 01763 // Receiver Silly Window Syndrome Avoidance: 01764 01765 if (fAdvRcvNxt + fAdvRcvWnd + min(fBufferSize / 2, fMSS) 01766 <= fReceiverBuffer.first_byte() + fBufferSize) { 01767 // Die rechte Grenze des Empfangerfensters wird nur anders angezeigt 01768 // als beim letzten ACK, wenn sie sich seither um mindestens 01769 // min (BufferSize/ 2, MSS) geandert hat. 01770 fAdvRcvWnd = fBufferSize - fReceiverBuffer.first_block_size(); 01771 } else { 01772 fAdvRcvWnd = fAdvRcvNxt + fAdvRcvWnd - fReceiverBuffer.next_expected(); 01773 } 01774 01775 fAdvRcvNxt = fReceiverBuffer.next_expected(); 01776 01777 if (fSendPeriodicACKs && 01778 (!fStrictPeriodicACKs || !fPeriodicACKTimer.IsPending())) { 01779 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01780 } 01781 01782 if (fDelayedACK && fDelayedACKTimer.IsPending()) { 01783 fDelayedACKTimer.Reset(); 01784 } 01785 01786 ScheduleACKMessage(); 01787 } else { 01788 if (!fDelayedACKTimer.IsPending()) { 01789 fDelayedACKTimer.Set(fACKDelayTime); 01790 if (fDebug) { 01791 std::cout << "TCP_Receiver::SendACK" 01792 << "receiver " << fLabel 01793 << ": set delACK timer: " 01794 << "t = " << Event_Queue::now() << std::endl; 01795 } 01796 } 01797 } 01798 } 01799 01800 01801 void TCP_Receiver::ScheduleACKMessage() 01802 { 01803 if (fWaitingACKMsg == 0) { 01804 fWaitingACKMsg = new TCP_Packet; 01805 } 01806 01807 fWaitingACKMsg->set_ACK(fAdvRcvNxt); 01808 fWaitingACKMsg->set_wnd(fAdvRcvWnd); 01809 fWaitingACKMsg->set_session_id(fSessionId); 01810 fWaitingACKMsg->set_destination_port(fLabel); 01811 fWaitingACKMsg->set_source_port(fLabel); 01812 fWaitingACKMsg->set_bit_size(8*fTCPIPHeaderLength); 01813 01814 if (fACKSchedulingDelay > 0) { 01815 if (!fACKSchedulingTimer.IsPending()) { 01816 fACKSchedulingTimer.Set(fACKSchedulingDelay); 01817 } 01818 } else { 01819 SendACKMessage(Event_Queue::now()); 01820 } 01821 } 01822 01823 01824 void TCP_Receiver::SendACKMessage(Ttype) 01825 { 01826 it_assert(fWaitingACKMsg != 0, "TCP_Receiver::SendACKMessage, no ACK message waiting"); 01827 01828 if (fDebug) { 01829 std::cout << "TCP_Receiver::SendACKMessage Ack sent" 01830 << "receiver " << fLabel 01831 << ": send ACK: " 01832 << "t = " << Event_Queue::now() 01833 << ", " << (*fWaitingACKMsg) 01834 << " byte_size=" << fWaitingACKMsg->bit_size()/8 01835 << " ptr=" << fWaitingACKMsg << std::endl; 01836 } 01837 01838 tcp_send_ack(fWaitingACKMsg); 01839 01840 fWaitingACKMsg = 0; 01841 } 01842 01843 01844 void TCP_Receiver::TraceReceivedSeqNo(const Sequence_Number &sn) 01845 { 01846 if (fDebug) { 01847 std::cout << "TCP_Receiver::TraceReceivedSeqNo " 01848 << "receiver " << fLabel 01849 << " t = " << Event_Queue::now() 01850 << " sn = " << sn << std::endl; 01851 } 01852 if (received_seq_num_index >= received_seq_num_time.size()) { 01853 received_seq_num_time.set_size(2*received_seq_num_time.size(),true); 01854 received_seq_num_val.set_size(2*received_seq_num_val.size(),true); 01855 } 01856 received_seq_num_val(received_seq_num_index) = sn.value(); 01857 received_seq_num_time(received_seq_num_index) = Event_Queue::now(); 01858 received_seq_num_index++; 01859 } 01860 01861 01862 void TCP_Receiver::save_trace(std::string filename) { 01863 01864 received_seq_num_val.set_size(received_seq_num_index, true); 01865 received_seq_num_time.set_size(received_seq_num_index,true); 01866 01867 if (fDebug) { 01868 std::cout << "received_seq_num_val" << received_seq_num_val << std::endl; 01869 std::cout << "received_seq_num_time" << received_seq_num_time << std::endl; 01870 std::cout << "received_seq_num_index" << received_seq_num_index << std::endl; 01871 std::cout << "TCP_Receiver::saving to file: " << filename << std::endl; 01872 } 01873 01874 it_file ff2; 01875 ff2.open(filename); 01876 01877 ff2 << Name("received_seq_num_val") << received_seq_num_val; 01878 ff2 << Name("received_seq_num_time") << received_seq_num_time; 01879 ff2 << Name("received_seq_num_index") << received_seq_num_index; 01880 01881 ff2.flush(); 01882 ff2.close(); 01883 } 01884 01885 01886 } //namespace itpp 01887 01888 #ifdef _MSC_VER 01889 #pragma warning(default:4355) 01890 #endif
Generated on Fri Jun 8 01:07:14 2007 for IT++ by Doxygen 1.5.2