Added outbound part of sync Proposed format: d4:syncd20:<info_hash>8*N:(xxxxyy)*Nee, therefore had to refactor torrent cleanup now that it will hit all torrents once every OT_POOL_TIMEOUT units.

dynamic-accesslists
erdgeist 18 years ago
parent 1d2d3c9d95
commit b38104b986

@ -200,31 +200,9 @@ static void httpresponse( const int64 s, char *data ) {
switch( scan_urlencoded_query( &c, data = c, SCAN_PATH ) ) { switch( scan_urlencoded_query( &c, data = c, SCAN_PATH ) ) {
case 4: /* sync ? */ case 4: /* sync ? */
if( byte_diff( data, 4, "sync") ) HTTPERROR_404; if( byte_diff( data, 4, "sync") ) HTTPERROR_404;
scanon = 1; if( !( reply_size = return_changeset_for_tracker( &reply ) ) ) HTTPERROR_500;
while( scanon ) {
switch( scan_urlencoded_query( &c, data = c, SCAN_SEARCHPATH_PARAM ) ) {
case -2: scanon = 0; break; /* TERMINATOR */
case -1: HTTPERROR_400_PARAM; /* PARSE ERROR */
case 9:
if(byte_diff(data,9,"info_hash")) {
scan_urlencoded_query( &c, NULL, SCAN_SEARCHPATH_VALUE );
continue;
}
/* ignore this, when we have less than 20 bytes */
if( scan_urlencoded_query( &c, data = c, SCAN_SEARCHPATH_VALUE ) != 20 ) HTTPERROR_400_PARAM;
hash = (ot_hash*)data; /* Fall through intended */
break;
default:
scan_urlencoded_query( &c, NULL, SCAN_SEARCHPATH_VALUE );
break;
}
}
if( !hash ) HTTPERROR_400_PARAM;
if( !( reply_size = return_sync_for_torrent( hash, &reply ) ) ) HTTPERROR_500;
return sendmallocdata( s, reply, reply_size ); return sendmallocdata( s, reply, reply_size );
case 5: /* stats ? */ case 5: /* stats ? */
if( byte_diff(data,5,"stats")) HTTPERROR_404; if( byte_diff(data,5,"stats")) HTTPERROR_404;
scanon = 1; scanon = 1;
@ -523,11 +501,13 @@ static void handle_read( const int64 clientsocket ) {
array_catb( &h->request, static_inbuf, l ); array_catb( &h->request, static_inbuf, l );
if( array_failed( &h->request ) ) if( array_failed( &h->request ) )
httperror( clientsocket, "500 Server Error", "Request too long."); return httperror( clientsocket, "500 Server Error", "Request too long.");
else if( array_bytes( &h->request ) > 8192 )
httperror( clientsocket, "500 request too long", "You sent too much headers"); if( array_bytes( &h->request ) > 8192 )
else if( memchr( array_start( &h->request ), '\n', array_length( &h->request, 1 ) ) ) return httperror( clientsocket, "500 request too long", "You sent too much headers");
httpresponse( clientsocket, array_start( &h->request ) );
if( memchr( array_start( &h->request ), '\n', array_length( &h->request, 1 ) ) )
return httpresponse( clientsocket, array_start( &h->request ) );
} }
static void handle_write( const int64 clientsocket ) { static void handle_write( const int64 clientsocket ) {
@ -701,6 +681,9 @@ static void server_mainloop( ) {
taia_now( &next_timeout_check ); taia_now( &next_timeout_check );
taia_addsec( &next_timeout_check, &next_timeout_check, OT_CLIENT_TIMEOUT_CHECKINTERVAL); taia_addsec( &next_timeout_check, &next_timeout_check, OT_CLIENT_TIMEOUT_CHECKINTERVAL);
} }
/* See if we need to move our pools */
clean_all_torrents();
} }
} }

@ -25,6 +25,10 @@
/* GLOBAL VARIABLES */ /* GLOBAL VARIABLES */
static ot_vector all_torrents[256]; static ot_vector all_torrents[256];
static ot_vector changeset;
size_t changeset_size = 0;
time_t last_clean_time = 0;
#ifdef WANT_CLOSED_TRACKER #ifdef WANT_CLOSED_TRACKER
int g_closedtracker = 1; int g_closedtracker = 1;
static ot_torrent* const OT_TORRENT_NOT_ON_WHITELIST = (ot_torrent*)1; static ot_torrent* const OT_TORRENT_NOT_ON_WHITELIST = (ot_torrent*)1;
@ -158,33 +162,6 @@ static int vector_remove_torrent( ot_vector *vector, ot_hash *hash ) {
return 1; return 1;
} }
/* This function deallocates all timedouted pools and shifts all other pools
it Returns 1 if torrent itself has not seen an announce for more than OT_TORRENT_TIMEOUT time units
0 if torrent is not yet timed out
Note: We expect NOW as a parameter since calling time() may be expensive
*/
static int clean_peerlist( time_t time_now, ot_peerlist *peer_list ) {
int i, timedout = (int)( time_now - peer_list->base );
if( !timedout ) return 0;
if( timedout > OT_POOLS_COUNT ) timedout = OT_POOLS_COUNT;
for( i = OT_POOLS_COUNT - timedout; i < OT_POOLS_COUNT; ++i )
free( peer_list->peers[i].data);
memmove( peer_list->peers + timedout, peer_list->peers, sizeof( ot_vector ) * (OT_POOLS_COUNT-timedout) );
byte_zero( peer_list->peers, sizeof( ot_vector ) * timedout );
memmove( peer_list->seed_count + timedout, peer_list->seed_count, sizeof( size_t ) * ( OT_POOLS_COUNT - timedout) );
byte_zero( peer_list->seed_count, sizeof( size_t ) * timedout );
if( timedout == OT_POOLS_COUNT )
return time_now - peer_list->base > OT_TORRENT_TIMEOUT;
peer_list->base = time_now;
return 0;
}
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer ) { ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer ) {
int exactmatch; int exactmatch;
ot_torrent *torrent; ot_torrent *torrent;
@ -219,8 +196,7 @@ ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer ) {
byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); byte_zero( torrent->peer_list, sizeof( ot_peerlist ) );
torrent->peer_list->base = NOW; torrent->peer_list->base = NOW;
} else }
clean_peerlist( NOW, torrent->peer_list );
/* Sanitize flags: Whoever claims to have completed download, must be a seeder */ /* Sanitize flags: Whoever claims to have completed download, must be a seeder */
if( ( OT_FLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED ) if( ( OT_FLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED )
@ -294,7 +270,9 @@ size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply
peer_count += torrent->peer_list->peers[index].size; peer_count += torrent->peer_list->peers[index].size;
seed_count += torrent->peer_list->seed_count[index]; seed_count += torrent->peer_list->seed_count[index];
} }
if( peer_count < amount ) amount = peer_count;
if( peer_count < amount )
amount = peer_count;
if( is_tcp ) if( is_tcp )
r += sprintf( r, "d8:completei%zde10:incompletei%zde8:intervali%ie5:peers%zd:", seed_count, peer_count-seed_count, OT_CLIENT_REQUEST_INTERVAL_RANDOM, 6*amount ); r += sprintf( r, "d8:completei%zde10:incompletei%zde8:intervali%ie5:peers%zd:", seed_count, peer_count-seed_count, OT_CLIENT_REQUEST_INTERVAL_RANDOM, 6*amount );
@ -348,10 +326,8 @@ size_t return_fullscrape_for_tracker( char **reply ) {
int i, k; int i, k;
char *r; char *r;
for( i=0; i<256; ++i ) { for( i=0; i<256; ++i )
ot_vector *torrents_list = &all_torrents[i]; torrent_count += all_torrents[i].size;
torrent_count += torrents_list->size;
}
if( !( r = *reply = malloc( 128*torrent_count ) ) ) return 0; if( !( r = *reply = malloc( 128*torrent_count ) ) ) return 0;
@ -380,7 +356,6 @@ size_t return_memstat_for_tracker( char **reply ) {
size_t torrent_count = 0, j; size_t torrent_count = 0, j;
int i, k; int i, k;
char *r; char *r;
time_t time_now = NOW;
for( i=0; i<256; ++i ) { for( i=0; i<256; ++i ) {
ot_vector *torrents_list = &all_torrents[i]; ot_vector *torrents_list = &all_torrents[i];
@ -398,7 +373,6 @@ size_t return_memstat_for_tracker( char **reply ) {
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list; ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list;
ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[j] ).hash; ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[j] ).hash;
r += sprintf( r, "\n%s:\n", to_hex( (ot_byte*)hash ) ); r += sprintf( r, "\n%s:\n", to_hex( (ot_byte*)hash ) );
clean_peerlist( time_now, peer_list );
for( k=0; k<OT_POOLS_COUNT; ++k ) for( k=0; k<OT_POOLS_COUNT; ++k )
r += sprintf( r, "\t%04X %04X\n", ((unsigned int)peer_list->peers[k].size), (unsigned int)peer_list->peers[k].space ); r += sprintf( r, "\t%04X %04X\n", ((unsigned int)peer_list->peers[k].size), (unsigned int)peer_list->peers[k].space );
} }
@ -418,7 +392,6 @@ size_t return_udp_scrape_for_torrent( ot_hash *hash, char *reply ) {
memset( reply, 0, 12); memset( reply, 0, 12);
} else { } else {
unsigned long *r = (unsigned long*) reply; unsigned long *r = (unsigned long*) reply;
clean_peerlist( NOW, torrent->peer_list );
for( i=0; i<OT_POOLS_COUNT; ++i ) { for( i=0; i<OT_POOLS_COUNT; ++i ) {
peers += torrent->peer_list->peers[i].size; peers += torrent->peer_list->peers[i].size;
@ -440,7 +413,6 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash, char *reply ) {
ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
if( !exactmatch ) return sprintf( r, "d5:filesdee" ); if( !exactmatch ) return sprintf( r, "d5:filesdee" );
clean_peerlist( NOW, torrent->peer_list );
for( i=0; i<OT_POOLS_COUNT; ++i ) { for( i=0; i<OT_POOLS_COUNT; ++i ) {
peers += torrent->peer_list->peers[i].size; peers += torrent->peer_list->peers[i].size;
@ -453,36 +425,145 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash, char *reply ) {
return r - reply; return r - reply;
} }
size_t return_sync_for_torrent( ot_hash *hash, char **reply ) { /* Throw away old changeset */
int exactmatch; static void release_changeset( void ) {
size_t peers = 0; ot_byte **changeset_ptrs = (ot_byte**)(changeset.data);
char *r; int i;
ot_vector *torrents_list = &all_torrents[*hash[0]];
ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); for( i = 0; i < changeset.size; ++i )
free( changeset_ptrs[i] );
free( changeset_ptrs );
byte_zero( &changeset, sizeof( changeset ) );
changeset_size = 0;
}
static void add_pool_to_changeset( ot_hash *hash, ot_peer *peers, size_t peer_count ) {
ot_byte *pool_copy = (ot_byte *)malloc( sizeof( size_t ) + sizeof( ot_hash ) + sizeof( ot_peer ) * peer_count + 13 );
size_t r = 0;
if( !pool_copy )
return;
if( exactmatch ) { memmove( pool_copy + sizeof( size_t ), "20:", 3 );
clean_peerlist( NOW, torrent->peer_list ); memmove( pool_copy + sizeof( size_t ) + 3, hash, sizeof( ot_hash ) );
peers = torrent->peer_list->peers[0].size; r = sizeof( size_t ) + 3 + sizeof( ot_hash );
r += sprintf( (char*)pool_copy + r, "%zd:", sizeof( ot_peer ) * peer_count );
memmove( pool_copy + r, peers, sizeof( ot_peer ) * peer_count );
r += sizeof( ot_peer ) * peer_count;
/* Without the length field */
*(size_t*)pool_copy = r - sizeof( size_t );
if( changeset.size + 1 >= changeset.space ) {
size_t new_space = changeset.space ? OT_VECTOR_GROW_RATIO * changeset.space : OT_VECTOR_MIN_MEMBERS;
ot_byte *new_data = realloc( changeset.data, new_space * sizeof( ot_byte *) );
if( !new_data )
return free( pool_copy );
changeset.data = new_data;
changeset.space = new_space;
} }
if( !( r = *reply = malloc( 10 + peers * sizeof( ot_peer ) ) ) ) return 0; ((ot_byte**)changeset.data)[changeset.size++] = pool_copy;
memmove( r, "d4:sync", 7 ); /* Without the length field */
r += 7; changeset_size += r - sizeof( size_t );
r += sprintf( r, "%zd:", peers * sizeof( ot_peer ) ); }
if( peers ) {
memmove( r, torrent->peer_list->peers[0].data, peers * sizeof( ot_peer ) ); /* Proposed output format
r += peers * sizeof( ot_peer ); d4:syncd20:<info_hash>8*N:(xxxxyy)*Nee
*/
size_t return_changeset_for_tracker( char **reply ) {
size_t i, r = 8;
clean_all_torrents();
*reply = malloc( 8 + changeset_size + 2 );
if( !*reply )
return 0;
memmove( *reply, "d4:syncd", 8 );
for( i = 0; i < changeset.size; ++i ) {
ot_byte *data = ((ot_byte**)changeset.data)[i];
memmove( *reply + r, data + sizeof( size_t ), *(size_t*)data );
r += *(size_t*)data;
}
(*reply)[r++] = 'e';
(*reply)[r++] = 'e';
return r;
}
/* Clean up all torrents, remove timedout pools and
torrents, also prepare new changeset */
void clean_all_torrents( void ) {
int i, j, k;
time_t time_now = NOW;
size_t peers_count;
if( time_now <= last_clean_time )
return;
last_clean_time = time_now;
release_changeset();
for( i=0; i<256; ++i ) {
ot_vector *torrents_list = &all_torrents[i];
for( j=0; j<torrents_list->size; ++j ) {
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list;
ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[j] ).hash;
time_t timedout = (int)( time_now - peer_list->base );
/* Torrent has idled out */
if( timedout > OT_TORRENT_TIMEOUT ) {
vector_remove_torrent( torrents_list, hash );
--j;
}
/* If nothing to be cleaned here, handle next torrent */
if( timedout > OT_POOLS_COUNT )
continue;
/* Release vectors that have timed out */
for( k = OT_POOLS_COUNT - timedout; k < OT_POOLS_COUNT; ++k )
free( peer_list->peers[k].data);
/* Shift vectors back by the amount of pools that were shifted out */
memmove( peer_list->peers + timedout, peer_list->peers, sizeof( ot_vector ) * ( OT_POOLS_COUNT - timedout ) );
byte_zero( peer_list->peers, sizeof( ot_vector ) * timedout );
/* Shift back seed counts as well */
memmove( peer_list->seed_count + timedout, peer_list->seed_count, sizeof( size_t ) * ( OT_POOLS_COUNT - timedout ) );
byte_zero( peer_list->seed_count, sizeof( size_t ) * timedout );
/* Save the block modified within last OT_POOLS_TIMEOUT */
if( peer_list->peers[1].size )
add_pool_to_changeset( hash, peer_list->peers[1].data, peer_list->peers[1].size );
peers_count = 0;
for( k = 0; k < OT_POOLS_COUNT; ++k )
peers_count += peer_list->peers[k].size;
if( peers_count ) {
peer_list->base = time_now;
} else {
/* When we got here, the last time that torrent
has been touched is OT_POOLS_COUNT units before */
peer_list->base = time_now - OT_POOLS_COUNT;
}
}
} }
*r++ = 'e';
return r - *reply;
} }
typedef struct { int val; ot_torrent * torrent; } ot_record; typedef struct { int val; ot_torrent * torrent; } ot_record;
/* Fetches stats from tracker */ /* Fetches stats from tracker */
size_t return_stats_for_tracker( char *reply, int mode ) { size_t return_stats_for_tracker( char *reply, int mode ) {
time_t time_now = NOW;
size_t torrent_count = 0, peer_count = 0, seed_count = 0, j; size_t torrent_count = 0, peer_count = 0, seed_count = 0, j;
ot_record top5s[5], top5c[5]; ot_record top5s[5], top5c[5];
char *r = reply; char *r = reply;
@ -498,12 +579,6 @@ size_t return_stats_for_tracker( char *reply, int mode ) {
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list; ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list;
size_t local_peers = 0, local_seeds = 0; size_t local_peers = 0, local_seeds = 0;
if( clean_peerlist( time_now, peer_list ) ) {
ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[j] ).hash;
vector_remove_torrent( torrents_list, hash );
--j;
continue;
}
for( k=0; k<OT_POOLS_COUNT; ++k ) { for( k=0; k<OT_POOLS_COUNT; ++k ) {
local_peers += peer_list->peers[k].size; local_peers += peer_list->peers[k].size;
local_seeds += peer_list->seed_count[k]; local_seeds += peer_list->seed_count[k];
@ -549,12 +624,6 @@ void remove_peer_from_torrent( ot_hash *hash, ot_peer *peer ) {
if( !exactmatch ) return; if( !exactmatch ) return;
/* Maybe this does the job */
if( clean_peerlist( NOW, torrent->peer_list ) ) {
vector_remove_torrent( torrents_list, hash );
return;
}
for( i=0; i<OT_POOLS_COUNT; ++i ) for( i=0; i<OT_POOLS_COUNT; ++i )
switch( vector_remove_peer( &torrent->peer_list->peers[i], peer, i == 0 ) ) { switch( vector_remove_peer( &torrent->peer_list->peers[i], peer, i == 0 ) ) {
case 0: continue; case 0: continue;
@ -573,6 +642,8 @@ int init_logic( const char * const serverdir ) {
/* Initialize control structures */ /* Initialize control structures */
byte_zero( all_torrents, sizeof( all_torrents ) ); byte_zero( all_torrents, sizeof( all_torrents ) );
byte_zero( &changeset, sizeof( changeset ) );
changeset_size = 0;
return 0; return 0;
} }
@ -591,4 +662,6 @@ void deinit_logic( void ) {
} }
} }
byte_zero( all_torrents, sizeof (all_torrents)); byte_zero( all_torrents, sizeof (all_torrents));
byte_zero( &changeset, sizeof( changeset ) );
changeset_size = 0;
} }

@ -98,9 +98,10 @@ size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply
size_t return_fullscrape_for_tracker( char **reply ); size_t return_fullscrape_for_tracker( char **reply );
size_t return_tcp_scrape_for_torrent( ot_hash *hash, char *reply ); size_t return_tcp_scrape_for_torrent( ot_hash *hash, char *reply );
size_t return_udp_scrape_for_torrent( ot_hash *hash, char *reply ); size_t return_udp_scrape_for_torrent( ot_hash *hash, char *reply );
size_t return_sync_for_torrent( ot_hash *hash, char **reply );
size_t return_stats_for_tracker( char *reply, int mode ); size_t return_stats_for_tracker( char *reply, int mode );
size_t return_memstat_for_tracker( char **reply ); size_t return_memstat_for_tracker( char **reply );
size_t return_changeset_for_tracker( char **reply );
void clean_all_torrents( void );
void remove_peer_from_torrent( ot_hash *hash, ot_peer *peer ); void remove_peer_from_torrent( ot_hash *hash, ot_peer *peer );
#endif #endif

Loading…
Cancel
Save