Having worked on eMule-protocol development for some time now, I regularly receive mail from readers asking about piece selection in P2P networks. A typical question: if a file is split into many chunks and many clients are requesting it, who asks whom for which chunks, given that both the clients and the providers of the file keep changing? How does eMule handle this? At any given moment the set of clients and providers is fixed, so by what rules are the chunks requested?

When a downloader picks the next piece to fetch, it usually chooses the piece held by the fewest of its peers, the so-called "rarest first" policy. This ensures that every downloader holds the pieces its peers most want, so uploading can begin as soon as there is demand. It also pushes the most common pieces to the end of the download, reducing the chance that a peer currently uploading will soon have nothing left that anyone else wants. In other words, every peer preferentially downloads the pieces that are scarcest in the whole system and defers the relatively plentiful ones, which drives the system toward a better overall state. Without this policy, everyone would grab the most widespread pieces first; those would become ever more abundant while the scarce ones stayed scarce, until some peers held nothing the others wanted, participation dropped off, and overall performance degraded. A download typically goes through several phases (first-piece selection, an endgame mode at the finish), and piece selection follows one basic rule throughout: once a sub-piece of some piece has been requested, the remaining sub-pieces of that piece are requested first, so that complete pieces are assembled as quickly as possible.
 
For eMule specifically, the client picks the download order of chunks carefully. The chunk-selection rules used on the eMule network are listed below; the concrete implementation lives in the source file PartFile.cpp (class CPartFile).
Each file is split into 9.28 MB parts (chunks), and each part into 180 KB blocks.
The order in which blocks are downloaded is decided by the downloading client, which sends a Request Parts message (section 6.4.4). At any given moment a client downloads a single set of blocks from each source, and all blocks requested from the same source must lie within the same chunk. The following criteria, applied in this order, determine a chunk's download rank:
1. Frequency of the chunk (availability): very rare chunks are downloaded as quickly as possible, so that a new source is formed.
2. Chunks used for preview (the first and last chunks), to preview or check the file (e.g. movie, mp3).
3. Request state (download in progress): try to ask each source for a different chunk, spreading the requests across all sources.
4. Completion (shortest to complete): partially retrieved chunks should be completed before another one is started.
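The part/block geometry can be made concrete with eMule's actual constants (PARTSIZE = 9728000 bytes, i.e. 9500 KB ≈ 9.28 MB, and EMBLOCKSIZE = 184320 bytes = 180 KB, as defined in the eMule source); the helper functions themselves are hypothetical:

```cpp
#include <cstdint>

// eMule's geometry constants (values as defined in the eMule source):
// a file is split into 9500 KB parts, each part into 180 KB blocks.
const uint64_t PARTSIZE    = 9728000;  // 9500 KB, the "9.28 MB" chunk
const uint32_t EMBLOCKSIZE = 184320;   // 180 KB block

// Hypothetical helpers mapping a file offset to part/block indices.
uint64_t PartIndex(uint64_t offset)  { return offset / PARTSIZE; }
uint64_t BlockIndex(uint64_t offset) { return offset / EMBLOCKSIZE; }

// Number of parts needed for a file of the given size (last part may be short).
uint64_t PartCount(uint64_t fileSize)
{
    return (fileSize + PARTSIZE - 1) / PARTSIZE;
}
```

Note that PARTSIZE is not an exact multiple of EMBLOCKSIZE: a full part holds 52 complete blocks plus one shorter final block.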
 
The frequency criterion defines three zones: very rare, rare, and common. Within each zone the criteria carry specific weights used to compute the chunk rank; chunks with a lower rank are downloaded first. The list below gives the rank ranges that follow from the criteria above:

- 0-9999: unrequested and requested very rare chunks
- 10000-19999: unrequested rare and preview chunks
- 20000-29999: unrequested common chunks that are mostly complete
- 30000-39999: requested rare and preview chunks
- 40000-49999: requested uncompleted common chunks
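The rank bands above can be condensed into a small scoring function. This is a hypothetical simplification: the real CPartFile::GetNextRequestedBlock adds frequency multipliers, completion percentages, same-chunk bonuses and bandwidth terms on top of such bases, as the listing below shows.

```cpp
#include <cstdint>

enum class Zone { VeryRare, Rare, Common };

// Hypothetical condensation of the rank bands listed above (lower rank is
// downloaded first). The real algorithm refines each band with frequency,
// completion, same-chunk and bandwidth terms.
uint16_t BaseRank(Zone zone, bool requested, bool preview, bool mostlyComplete)
{
    if (zone == Zone::VeryRare)
        return 0;        //     0- 9999: very rare, requested or not
    if (!requested && (zone == Zone::Rare || preview))
        return 10000;    // 10000-19999: unrequested rare / preview chunks
    if (!requested && mostlyComplete)
        return 20000;    // 20000-29999: unrequested, mostly complete common
    if (requested && (zone == Zone::Rare || preview))
        return 30000;    // 30000-39999: requested rare / preview chunks
    return 40000;        // 40000-49999: remaining (uncompleted common) chunks
}
```

The key property is the ordering: rarity dominates, preview status comes next, and common chunks are deferred unless they are nearly finished (priority inversion).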
 
This algorithm usually selects the rarest chunk first. However, a partially complete chunk that is close to completion may also be chosen, and for common chunks the download is spread across the different sources. In theory one could tally the owners of every chunk in the network, but in practice only a locally optimal view is achievable, much as small-world theory suggests: a peer learns about other peers from the servers or the Kad network, exchanges chunk-status information with them, and can thereby determine chunk request frequencies within its own neighborhood. In eMule the concrete chunk-selection strategy is implemented in CPartFile::GetNextRequestedBlock (the walk-through below follows the official eMule 0.48a; modified builds differ only in details and are out of scope here).
This function is called whenever an eMule peer, given the distribution of missing ranges in its gap list, must decide which concrete blocks to request next. Let's walk through its implementation:
bool CPartFile::GetNextRequestedBlock(CUpDownClient* sender,
Requested_Block_Struct** newblocks,
uint16* count) /*const*/
{
// The purpose of this function is to return a list of blocks (~180KB) to
// download. To avoid a prematurely stop of the downloading, all blocks that
// are requested from the same source must be located within the same
// chunk (=> part ~9MB).
// The selection of the chunk to download is one of the CRITICAL parts of the
// edonkey network. The selection algorithm must insure the best spreading
// of files.
// The selection is based on several criteria:
// - Frequency of the chunk (availability), very rare chunks must be downloaded
// as quickly as possible to become a new available source.
// - Parts used for preview (first + last chunk), preview or check a
// file (e.g. movie, mp3)
// - Completion (shortest-to-complete), partially retrieved chunks should be
// completed before starting to download other one.
//
// The frequency criterion defines several zones: very rare, rare, almost rare,
// and common. Inside each zone, the criteria have a specific 'weight' used
// to calculate the priority of chunks. The chunk(s) with the highest
// priority (highest=0, lowest=0xffff) is/are selected first.
//
// This algorithm usually selects first the rarest chunk(s). However, partially
// complete chunk(s) that is/are close to completion may overtake the priority
// (priority inversion). For common chunks, it also tries to put the transferring
// clients on the same chunk, to complete it sooner.
//

// Check input parameters
if(count == 0)
return false;
if(sender->GetPartStatus() == NULL)
return false;

//AddDebugLogLine(DLP_VERYLOW, false, _T("Evaluating chunks for file: \"%s\" Client: %s"), GetFileName(), sender->DbgGetClientInfo());

// Define and create the list of the chunks to download
const uint16 partCount = GetPartCount();
CList<Chunk> chunksList(partCount);

uint16 tempLastPartAsked = (uint16)-1;
if(sender->m_lastPartAsked != ((uint16)-1) && sender->GetClientSoft() == SO_EMULE && sender->GetVersion() < MAKE_CLIENT_VERSION(0, 43, 1)){
tempLastPartAsked = sender->m_lastPartAsked; // the chunk most recently requested from this source
}

// Main loop
uint16 newBlockCount = 0;
while(newBlockCount != *count){
// Create a request block structure if a chunk has been previously selected
if(tempLastPartAsked != (uint16)-1){
Requested_Block_Struct* pBlock = new Requested_Block_Struct;
if(GetNextEmptyBlockInPart(tempLastPartAsked, pBlock) == true){
//AddDebugLogLine(false, _T("Got request block. Interval %i-%i. File %s. Client: %s"), pBlock->StartOffset, pBlock->EndOffset, GetFileName(), sender->DbgGetClientInfo());
// Keep a track of all pending requested blocks
requestedblocks_list.AddTail(pBlock);
// Update list of blocks to return
newblocks[newBlockCount++] = pBlock;
// Skip end of loop (=> CPU load)
continue;
}
else {
   // All blocks for this chunk have been already requested
    delete pBlock;
   // => Try to select another chunk
    sender->m_lastPartAsked = tempLastPartAsked = (uint16)-1;
  }
}

// Check if a new chunk must be selected (e.g. download starting, previous chunk complete)
if(tempLastPartAsked == (uint16)-1){

// Quantify all chunks (create list of chunks to download)
// This is done only one time and only if it is necessary (=> CPU load)
if(chunksList.IsEmpty() == TRUE){
// Identify the locally missing part(s) that this source has
   for(uint16 i = 0; i < partCount; i++){
if(sender->IsPartAvailable(i) == true && GetNextEmptyBlockInPart(i, NULL) == true){
  // Create a new entry for this chunk and add it to the list
Chunk newEntry;
  newEntry.part = i;
  newEntry.frequency = m_SrcpartFrequency[i];
  chunksList.AddTail(newEntry);
}
}

// Check if any block(s) could be downloaded
if(chunksList.IsEmpty() == TRUE){
break; // Exit main loop while()
}

// Define the bounds of the zones (very rare, rare etc)
// more depending on available sources
uint16 limit = (uint16)ceil(GetSourceCount()/ 10.0);
if (limit<3) limit=3;

const uint16 veryRareBound = limit;
const uint16 rareBound = 2*limit;
const uint16 almostRareBound = 4*limit;

// Cache Preview state (Criterion 2)
const bool isPreviewEnable = (thePrefs.GetPreviewPrio() || thePrefs.IsExtControlsEnabled() && GetPreviewPrio()) && IsPreviewableFileType();

// Collect and calculate criteria for all chunks
for(POSITION pos = chunksList.GetHeadPosition(); pos != NULL; ){
Chunk& cur_chunk = chunksList.GetNext(pos);

// Offsets of chunk
UINT uCurChunkPart = cur_chunk.part; // help VC71...
const uint64 uStart = (uint64)uCurChunkPart * PARTSIZE;
const uint64 uEnd = ((GetFileSize() - (uint64)1) < (uStart + PARTSIZE - 1)) ?
(GetFileSize() - (uint64)1) : (uStart + PARTSIZE - 1);
ASSERT( uStart <= uEnd );

// Criterion 2. Parts used for preview
// Remark: - We need to download the first part and the last part(s).
// - When the last part is very small, it's necessary to
// download the two last parts.
bool critPreview = false;
if(isPreviewEnable == true){
if(cur_chunk.part == 0){
critPreview = true; // First chunk
}
else if(cur_chunk.part == partCount-1){
critPreview = true; // Last chunk
}
else if(cur_chunk.part == partCount-2){
// Last chunk - 1 (only if last chunk is too small)
if( (GetFileSize() - uEnd) < (uint64)PARTSIZE/3){
critPreview = true; // Last chunk - 1
}
}
}

// Criterion 3. Request state (downloading in process from other source(s))
//const bool critRequested = IsAlreadyRequested(uStart, uEnd);
bool critRequested = false; // <--- This is set as a part of the second critCompletion loop below

// Criterion 4. Completion
uint64 partSize = uEnd - uStart + 1; //If all is covered by gaps, we have downloaded PARTSIZE, or possibly less for the last chunk;
ASSERT(partSize <= PARTSIZE);
for(POSITION pos = gaplist.GetHeadPosition(); pos != NULL; ) {
const Gap_Struct* cur_gap = gaplist.GetNext(pos);
// Check if Gap is into the limit
if(cur_gap->start < uStart) {
if(cur_gap->end > uStart && cur_gap->end < uEnd) {
ASSERT(partSize >= (cur_gap->end - uStart + 1));
partSize -= cur_gap->end - uStart + 1;
}
else if(cur_gap->end >= uEnd) {
partSize = 0;
break; // exit loop for()
}
}
else if(cur_gap->start <= uEnd) {
if(cur_gap->end < uEnd) {
ASSERT(partSize >= (cur_gap->end - cur_gap->start + 1));
partSize -= cur_gap->end - cur_gap->start + 1;
}
else {
ASSERT(partSize >= (uEnd - cur_gap->start + 1));
partSize -= uEnd - cur_gap->start + 1;
}
}
}
//ASSERT(partSize <= PARTSIZE && partSize <= (uEnd - uStart + 1));

// requested blocks from sources we are currently downloading from is counted as if already downloaded
// this code will cause bytes that has been requested AND transferred to be counted twice, so we can end
// up with a completion number > PARTSIZE. That's ok, since it's just a relative number to compare chunks.
for(POSITION reqPos = requestedblocks_list.GetHeadPosition(); reqPos != NULL; ) {
const Requested_Block_Struct* reqBlock = requestedblocks_list.GetNext(reqPos);
if(reqBlock->StartOffset < uStart) {
if(reqBlock->EndOffset > uStart) {
if(reqBlock->EndOffset < uEnd) {
//ASSERT(partSize + (reqBlock->EndOffset - uStart + 1) <= (uEnd - uStart + 1));
partSize += reqBlock->EndOffset - uStart + 1;
critRequested = true;
} else if(reqBlock->EndOffset >= uEnd) {
//ASSERT(partSize + (uEnd - uStart + 1) <= uEnd - uStart);
partSize += uEnd - uStart + 1;
critRequested = true;
}
}
} else if(reqBlock->StartOffset <= uEnd) {
if(reqBlock->EndOffset < uEnd) {
//ASSERT(partSize + (reqBlock->EndOffset - reqBlock->StartOffset + 1) <= (uEnd - uStart + 1));
partSize += reqBlock->EndOffset - reqBlock->StartOffset + 1;
critRequested = true;
} else {
//ASSERT(partSize + (uEnd - reqBlock->StartOffset + 1) <= (uEnd - uStart + 1));
partSize += uEnd - reqBlock->StartOffset + 1;
critRequested = true;
}
}
}
//Don't check this (see comment above for explanation): ASSERT(partSize <= PARTSIZE && partSize <= (uEnd - uStart + 1));

if(partSize > PARTSIZE) partSize = PARTSIZE;

uint16 critCompletion = (uint16)ceil((double)(partSize*100)/PARTSIZE); // in [%]. Last chunk is always counted as a full size chunk, to not give it any advantage in this comparison due to smaller size. So a 1/3 of PARTSIZE downloaded in last chunk will give 33% even if there's just one more byte do download to complete the chunk.
if(critCompletion > 100) critCompletion = 100;

// Criterion 5. Prefer to continue the same chunk
const bool sameChunk = (cur_chunk.part == sender->m_lastPartAsked);

// Criterion 6. The more transferring clients that has this part, the better (i.e. lower).
uint16 transferringClientsScore = (uint16)m_downloadingSourceList.GetSize();

// Criterion 7. Sooner to completion (how much of a part is completed, how fast can be transferred to this part, if all currently transferring clients with this part are put on it. Lower is better.)
uint16 bandwidthScore = 2000;

// Calculate criterion 6 and 7
if(m_downloadingSourceList.GetSize() > 1) {
UINT totalDownloadDatarateForThisPart = 1;
for(POSITION downloadingClientPos = m_downloadingSourceList.GetHeadPosition(); downloadingClientPos != NULL; ) {
const CUpDownClient* downloadingClient = m_downloadingSourceList.GetNext(downloadingClientPos);
if(downloadingClient->IsPartAvailable(cur_chunk.part)) {
transferringClientsScore--;
totalDownloadDatarateForThisPart += downloadingClient->GetDownloadDatarate() + 500; // + 500 to make sure that an unstarted chunk available at two clients will end up just barely below 2000 (max limit)
}
}

bandwidthScore = (uint16)min((UINT)((PARTSIZE-partSize)/(totalDownloadDatarateForThisPart*5)), 2000);
//AddDebugLogLine(DLP_VERYLOW, false,
// _T("BandwidthScore for chunk %i: bandwidthScore = %u = min((PARTSIZE-partSize)/(totalDownloadDatarateForThisChunk*5), 2000) = min((PARTSIZE-%I64u)/(%u*5), 2000)"),
// cur_chunk.part, bandwidthScore, partSize, totalDownloadDatarateForThisChunk);
}

//AddDebugLogLine(DLP_VERYLOW, false, _T("Evaluating chunk number: %i, SourceCount: %u/%i, critPreview: %s, critRequested: %s, critCompletion: %i%%, sameChunk: %s"), cur_chunk.part, cur_chunk.frequency, GetSourceCount(), ((critPreview == true) ? _T("true") : _T("false")), ((critRequested == true) ? _T("true") : _T("false")), critCompletion, ((sameChunk == true) ? _T("true") : _T("false")));

// Calculate priority with all criteria
if(partSize > 0 && GetSourceCount() <= GetSrcA4AFCount()) {
// If there are too many a4af sources, the completion of blocks have very high prio
cur_chunk.rank = (cur_chunk.frequency) + // Criterion 1
((critPreview == true) ? 0 : 200) + // Criterion 2
((critRequested == true) ? 0 : 1) + // Criterion 3
(100 - critCompletion) + // Criterion 4
((sameChunk == true) ? 0 : 1) + // Criterion 5
bandwidthScore; // Criterion 7
} else if(cur_chunk.frequency <= veryRareBound){
// 3000..xxxx unrequested + requested very rare chunks
cur_chunk.rank = (75 * cur_chunk.frequency) + // Criterion 1
((critPreview == true) ? 0 : 1) + // Criterion 2
((critRequested == true) ? 3000 : 3001) + // Criterion 3
(100 - critCompletion) + // Criterion 4
((sameChunk == true) ? 0 : 1) + // Criterion 5
transferringClientsScore; // Criterion 6
}
else if(critPreview == true){
// 10000..10100 unrequested preview chunks
// 20000..20100 requested preview chunks
cur_chunk.rank = ((critRequested == true &&
sameChunk == false) ? 20000 : 10000) + // Criterion 3
(100 - critCompletion); // Criterion 4
}
else if(cur_chunk.frequency <= rareBound){
// 10101..1xxxx requested rare chunks
// 10102..1xxxx unrequested rare chunks
//ASSERT(cur_chunk.frequency >= veryRareBound);

cur_chunk.rank = (25 * cur_chunk.frequency) + // Criterion 1
((critRequested == true) ? 10101 : 10102) + // Criterion 3
(100 - critCompletion) + // Criterion 4
((sameChunk == true) ? 0 : 1) + // Criterion 5
transferringClientsScore; // Criterion 6
}
else if(cur_chunk.frequency <= almostRareBound){
// 20101..1xxxx requested almost rare chunks
// 20150..1xxxx unrequested almost rare chunks
//ASSERT(cur_chunk.frequency >= rareBound);

// used to slightly lessen the importance of frequency
uint16 randomAdd = 1 + (uint16)((((uint32)rand()*(almostRareBound-rareBound))+(RAND_MAX/2))/RAND_MAX);
//AddDebugLogLine(LP_VERYLOW, false, _T("RandomAdd: %i, (%i-%i=%i)"), randomAdd, rareBound, almostRareBound, almostRareBound-rareBound);

cur_chunk.rank = (cur_chunk.frequency) + // Criterion 1
((critRequested == true) ? 20101 : (20201+almostRareBound-rareBound)) + // Criterion 3
((partSize > 0) ? 0 : 500) + // Criterion 4
(5*100 - (5*critCompletion)) + // Criterion 4
((sameChunk == true) ? (uint16)0 : randomAdd) + // Criterion 5
bandwidthScore; // Criterion 7
}
else { // common chunk
// 30000..30100 requested common chunks
// 30001..30101 unrequested common chunks
cur_chunk.rank = ((critRequested == true) ? 30000 : 30001) + // Criterion 3
(100 - critCompletion) + // Criterion 4
((sameChunk == true) ? 0 : 1) + // Criterion 5
bandwidthScore; // Criterion 7
}

//AddDebugLogLine(DLP_VERYLOW, false, _T("Rank: %u"), cur_chunk.rank);
}
}

// Select the next chunk to download
if(chunksList.IsEmpty() == FALSE){
// Find and count the chunk(s) with the highest priority
uint16 count = 0; // Number of found chunks with same priority
uint16 rank = 0xffff; // Highest priority found
for(POSITION pos = chunksList.GetHeadPosition(); pos != NULL; ){
const Chunk& cur_chunk = chunksList.GetNext(pos);
if(cur_chunk.rank < rank){
count = 1;
rank = cur_chunk.rank;
}
else if(cur_chunk.rank == rank){
count++;
}
}

// Use a random access to avoid that everybody tries to download the
// same chunks at the same time (=> spread the selected chunk among clients)
uint16 randomness = 1 + (uint16)((((uint32)rand()*(count-1))+(RAND_MAX/2))/RAND_MAX);
for(POSITION pos = chunksList.GetHeadPosition(); ; ){
POSITION cur_pos = pos;
const Chunk& cur_chunk = chunksList.GetNext(pos);
if(cur_chunk.rank == rank){
randomness--;
if(randomness == 0){
// Selection process is over
sender->m_lastPartAsked = tempLastPartAsked = cur_chunk.part;
//AddDebugLogLine(DLP_VERYLOW, false, _T("Chunk number %i selected. Rank: %u"), cur_chunk.part, cur_chunk.rank);

// Remark: this list might be reused up to *count* times
chunksList.RemoveAt(cur_pos);
break; // exit loop for()
}
}
}
}
else {
// There is no remaining chunk to download
break; // Exit main loop while()
}
}
}
// Return the number of the blocks
*count = newBlockCount;


// Return
return (newBlockCount > 0);
}
The code and comments above explain the selection process in detail; interested readers should consult the full source. BitTorrent and eMule differ in many implementation details because their designers took different approaches, but the core idea is the same. :)
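As an aside, the chunk frequency that drives Criterion 1 (m_SrcpartFrequency in the listing) is simply a count over the part-status bitfields that known sources have reported. A minimal sketch with hypothetical names:

```cpp
#include <cstdint>
#include <vector>

// Sketch of how a client can maintain per-chunk availability counts from
// the part-status bitfields its known sources report. Names here are
// hypothetical; eMule keeps a comparable array in m_SrcpartFrequency.
std::vector<uint16_t> CountPartFrequency(
    const std::vector<std::vector<bool>>& sourcePartStatus,
    uint16_t partCount)
{
    std::vector<uint16_t> freq(partCount, 0);
    for (const auto& status : sourcePartStatus)
        for (uint16_t part = 0; part < partCount; ++part)
            if (part < status.size() && status[part])
                ++freq[part]; // one more known source holds this chunk
    return freq;
}
```

In the real client these counts are updated incrementally as sources connect, disconnect, and announce newly completed chunks, rather than recomputed from scratch.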