Reenabled syncing

dynamic-accesslists
erdgeist 17 years ago
parent b19bbd6a85
commit 31eada6168

@ -49,6 +49,10 @@ static char *accesslist_filename = NULL;
#define WANT_ACCESS_CONTROL #define WANT_ACCESS_CONTROL
#endif #endif
#ifndef WANT_TRACKER_SYNC
#define add_peer_to_torrent(A,B,C) add_peer_to_torrent(A,B)
#endif
#ifndef NO_FULLSCRAPE_LOGGING #ifndef NO_FULLSCRAPE_LOGGING
#define LOG_TO_STDERR( ... ) fprintf( stderr, __VA_ARGS__ ) #define LOG_TO_STDERR( ... ) fprintf( stderr, __VA_ARGS__ )
#else #else
@ -91,7 +95,12 @@ struct http_data {
int main( int argc, char **argv ); int main( int argc, char **argv );
static void httperror( const int64 s, const char *title, const char *message ); static void httperror( const int64 s, const char *title, const char *message );
#ifdef _DEBUG_HTTPERROR
static void httpresponse( const int64 s, char *data, size_t l ); static void httpresponse( const int64 s, char *data, size_t l );
#else
static void httpresponse( const int64 s, char *data );
#endif
static void sendmmapdata( const int64 s, char *buffer, const size_t size ); static void sendmmapdata( const int64 s, char *buffer, const size_t size );
static void senddata( const int64 s, char *buffer, const size_t size ); static void senddata( const int64 s, char *buffer, const size_t size );
@ -213,7 +222,11 @@ static void senddata( const int64 s, char *buffer, size_t size ) {
} }
} }
#ifdef _DEBUG_HTTPERROR
static void httpresponse( const int64 s, char *data, size_t l ) { static void httpresponse( const int64 s, char *data, size_t l ) {
#else
static void httpresponse( const int64 s, char *data ) {
#endif
struct http_data* h = io_getcookie( s ); struct http_data* h = io_getcookie( s );
char *c, *reply; char *c, *reply;
ot_peer peer; ot_peer peer;
@ -643,8 +656,13 @@ static void handle_read( const int64 clientsocket ) {
/* If we get the whole request in one packet, handle it without copying */ /* If we get the whole request in one packet, handle it without copying */
if( !array_start( &h->request ) ) { if( !array_start( &h->request ) ) {
if( memchr( static_inbuf, '\n', l ) ) if( memchr( static_inbuf, '\n', l ) ) {
return httpresponse( clientsocket, static_inbuf, l ); return httpresponse( clientsocket, static_inbuf
#ifdef _DEBUG_HTTPERROR
, l
#endif
);
}
h->flag |= STRUCT_HTTP_FLAG_ARRAY_USED; h->flag |= STRUCT_HTTP_FLAG_ARRAY_USED;
return array_catb( &h->request, static_inbuf, l ); return array_catb( &h->request, static_inbuf, l );
} }
@ -658,8 +676,13 @@ static void handle_read( const int64 clientsocket ) {
if( ( array_bytes( &h->request ) > 8192 ) && NOTBLESSED( h ) ) if( ( array_bytes( &h->request ) > 8192 ) && NOTBLESSED( h ) )
return httperror( clientsocket, "500 request too long", "You sent too much headers"); return httperror( clientsocket, "500 request too long", "You sent too much headers");
if( memchr( array_start( &h->request ), '\n', array_bytes( &h->request ) ) ) if( memchr( array_start( &h->request ), '\n', array_bytes( &h->request ) ) ) {
return httpresponse( clientsocket, array_start( &h->request ), array_bytes( &h->request ) ); return httpresponse( clientsocket, array_start( &h->request )
#ifdef _DEBUG_HTTPERROR
, array_bytes( &h->request )
#endif
);
}
} }
static void handle_write( const int64 clientsocket ) { static void handle_write( const int64 clientsocket ) {

@ -22,14 +22,11 @@
/* GLOBAL VARIABLES */ /* GLOBAL VARIABLES */
static ot_vector all_torrents[OT_BUCKET_COUNT]; static ot_vector all_torrents[OT_BUCKET_COUNT];
static ot_time all_torrents_clean[OT_BUCKET_COUNT]; static ot_time all_torrents_clean[OT_BUCKET_COUNT];
static ot_vector changeset;
#if defined ( WANT_BLACKLISTING ) || defined( WANT_CLOSED_TRACKER ) #if defined ( WANT_BLACKLISTING ) || defined( WANT_CLOSED_TRACKER )
static ot_vector accesslist; static ot_vector accesslist;
#define WANT_ACCESS_CONTROL #define WANT_ACCESS_CONTROL
#endif #endif
static size_t changeset_size = 0;
static int clean_single_torrent( ot_torrent *torrent ); static int clean_single_torrent( ot_torrent *torrent );
/* Converter function from memory to human readable hex strings /* Converter function from memory to human readable hex strings
@ -126,6 +123,9 @@ static void free_peerlist( ot_peerlist *peer_list ) {
for( i=0; i<OT_POOLS_COUNT; ++i ) for( i=0; i<OT_POOLS_COUNT; ++i )
if( peer_list->peers[i].data ) if( peer_list->peers[i].data )
free( peer_list->peers[i].data ); free( peer_list->peers[i].data );
#ifdef WANT_TRACKER_SYNC
free( peer_list->changeset.data );
#endif
free( peer_list ); free( peer_list );
} }
@ -145,7 +145,11 @@ static void vector_remove_torrent( ot_vector *vector, ot_torrent *match ) {
} }
} }
#ifdef WANT_TRACKER_SYNC
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer, int from_changeset ) { ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer, int from_changeset ) {
#else
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer ) {
#endif
int exactmatch; int exactmatch;
ot_torrent *torrent; ot_torrent *torrent;
ot_peer *peer_dest; ot_peer *peer_dest;
@ -184,6 +188,7 @@ ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer, int from_changese
if( ( OT_FLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED ) if( ( OT_FLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED )
OT_FLAG( peer ) ^= PEER_FLAG_COMPLETED; OT_FLAG( peer ) ^= PEER_FLAG_COMPLETED;
#ifdef WANT_TRACKER_SYNC
if( from_changeset ) { if( from_changeset ) {
/* Check, whether peer already is in current pool, do nothing if so */ /* Check, whether peer already is in current pool, do nothing if so */
peer_pool = &torrent->peer_list->peers[0]; peer_pool = &torrent->peer_list->peers[0];
@ -192,6 +197,7 @@ ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer, int from_changese
return torrent; return torrent;
base_pool = 1; base_pool = 1;
} }
#endif
peer_pool = &torrent->peer_list->peers[ base_pool ]; peer_pool = &torrent->peer_list->peers[ base_pool ];
peer_dest = vector_find_or_insert( peer_pool, (void*)peer, sizeof( ot_peer ), OT_PEER_COMPARE_SIZE, &exactmatch ); peer_dest = vector_find_or_insert( peer_pool, (void*)peer, sizeof( ot_peer ), OT_PEER_COMPARE_SIZE, &exactmatch );
@ -431,54 +437,6 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash_list, int amount, char *repl
} }
#ifdef WANT_TRACKER_SYNC #ifdef WANT_TRACKER_SYNC
/* Throw away old changeset */
static void release_changeset( void ) {
ot_byte **changeset_ptrs = (ot_byte**)(changeset.data);
size_t i;
for( i = 0; i < changeset.size; ++i )
free( changeset_ptrs[i] );
free( changeset_ptrs );
byte_zero( &changeset, sizeof( changeset ) );
changeset_size = 0;
}
static void add_pool_to_changeset( ot_hash *hash, ot_peer *peers, size_t peer_count ) {
ot_byte *pool_copy = (ot_byte *)malloc( sizeof( size_t ) + sizeof( ot_hash ) + sizeof( ot_peer ) * peer_count + 13 );
size_t r = 0;
if( !pool_copy )
return;
memmove( pool_copy + sizeof( size_t ), "20:", 3 );
memmove( pool_copy + sizeof( size_t ) + 3, hash, sizeof( ot_hash ) );
r = sizeof( size_t ) + 3 + sizeof( ot_hash );
r += sprintf( (char*)pool_copy + r, "%zd:", sizeof( ot_peer ) * peer_count );
memmove( pool_copy + r, peers, sizeof( ot_peer ) * peer_count );
r += sizeof( ot_peer ) * peer_count;
/* Without the length field */
*(size_t*)pool_copy = r - sizeof( size_t );
if( changeset.size + 1 >= changeset.space ) {
size_t new_space = changeset.space ? OT_VECTOR_GROW_RATIO * changeset.space : OT_VECTOR_MIN_MEMBERS;
ot_byte *new_data = realloc( changeset.data, new_space * sizeof( ot_byte *) );
if( !new_data )
return free( pool_copy );
changeset.data = new_data;
changeset.space = new_space;
}
((ot_byte**)changeset.data)[changeset.size++] = pool_copy;
/* Without the length field */
changeset_size += r - sizeof( size_t );
}
/* Import Changeset from an external authority /* Import Changeset from an external authority
format: d4:syncd[..]ee format: d4:syncd[..]ee
[..]: ( 20:01234567890abcdefghij16:XXXXYYYY )+ [..]: ( 20:01234567890abcdefghij16:XXXXYYYY )+
@ -523,23 +481,45 @@ int add_changeset_to_tracker( ot_byte *data, size_t len ) {
d4:syncd20:<info_hash>8*N:(xxxxyyyy)*Nee d4:syncd20:<info_hash>8*N:(xxxxyyyy)*Nee
*/ */
size_t return_changeset_for_tracker( char **reply ) { size_t return_changeset_for_tracker( char **reply ) {
size_t i, r = 8; size_t allocated = 0, i, replysize;
int bucket;
char *r;
/* Maybe there is time to clean_all_torrents(); */
/* Determine space needed for whole changeset */
for( bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket ) {
ot_vector *torrents_list = all_torrents + bucket;
for( i=0; i<torrents_list->size; ++i ) {
ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + i;
allocated += sizeof( ot_hash ) + sizeof(ot_peer) * torrent->peer_list->changeset.size + 13;
}
}
clean_all_torrents(); /* add "d4:syncd" and "ee" */
allocated += 8 + 2;
if( !( *reply = mmap( NULL, 8 + changeset_size + 2, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0 ) ) ) return 0; if( !( r = *reply = mmap( NULL, allocated, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0 ) ) )
return 0;
memmove( *reply, "d4:syncd", 8 ); memmove( r, "d4:syncd", 8 ); r += 8;
for( i = 0; i < changeset.size; ++i ) { for( bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket ) {
ot_byte *data = ((ot_byte**)changeset.data)[i]; ot_vector *torrents_list = all_torrents + bucket;
memmove( *reply + r, data + sizeof( size_t ), *(size_t*)data ); for( i=0; i<torrents_list->size; ++i ) {
r += *(size_t*)data; ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + i;
const size_t byte_count = sizeof(ot_peer) * torrent->peer_list->changeset.size;
*r++ = '2'; *r++ = '0'; *r++ = ':';
memmove( r, torrent->hash, sizeof( ot_hash ) ); r += sizeof( ot_hash );
r += sprintf( r, "%zd:", byte_count );
memmove( r, torrent->peer_list->changeset.data, byte_count ); r += byte_count;
}
} }
*r++ = 'e'; *r++ = 'e';
(*reply)[r++] = 'e'; replysize = ( r - *reply );
(*reply)[r++] = 'e'; fix_mmapallocation( *reply, allocated, replysize );
return r; return replysize;
} }
#endif #endif
@ -551,6 +531,9 @@ static int clean_single_torrent( ot_torrent *torrent ) {
size_t peers_count = 0, seeds_count; size_t peers_count = 0, seeds_count;
time_t timedout = (int)( NOW - peer_list->base ); time_t timedout = (int)( NOW - peer_list->base );
int i; int i;
#ifdef WANT_TRACKER_SYNC
char *new_peers;
#endif
/* Torrent has idled out */ /* Torrent has idled out */
if( timedout > OT_TORRENT_TIMEOUT ) if( timedout > OT_TORRENT_TIMEOUT )
@ -575,10 +558,20 @@ static int clean_single_torrent( ot_torrent *torrent ) {
memmove( peer_list->seed_counts + timedout, peer_list->seed_counts, sizeof( size_t ) * ( OT_POOLS_COUNT - timedout ) ); memmove( peer_list->seed_counts + timedout, peer_list->seed_counts, sizeof( size_t ) * ( OT_POOLS_COUNT - timedout ) );
byte_zero( peer_list->seed_counts, sizeof( size_t ) * timedout ); byte_zero( peer_list->seed_counts, sizeof( size_t ) * timedout );
/* Save the block modified within last OT_POOLS_TIMEOUT --- XXX no sync for now #ifdef WANT_TRACKER_SYNC
if( peer_list->peers[1].size ) /* Save the block modified within last OT_POOLS_TIMEOUT */
add_pool_to_changeset( hash, peer_list->peers[1].data, peer_list->peers[1].size ); if( peer_list->peers[1].size &&
*/ ( new_peers = realloc( peer_list->changeset.data, sizeof( ot_peer ) * peer_list->peers[1].size ) ) )
{
memmove( new_peers, peer_list->peers[1].data, peer_list->peers[1].size );
peer_list->changeset.data = new_peers;
peer_list->changeset.size = sizeof( ot_peer ) * peer_list->peers[1].size;
} else {
free( peer_list->changeset.data );
memset( &peer_list->changeset, 0, sizeof( ot_vector ) );
}
#endif
peers_count = seeds_count = 0; peers_count = seeds_count = 0;
for( i = 0; i < OT_POOLS_COUNT; ++i ) { for( i = 0; i < OT_POOLS_COUNT; ++i ) {
@ -606,10 +599,6 @@ void clean_all_torrents( void ) {
static int bucket; static int bucket;
ot_time time_now = NOW; ot_time time_now = NOW;
/* No sync for now
release_changeset();
*/
/* Search for an uncleaned bucked */ /* Search for an uncleaned bucked */
while( ( all_torrents_clean[bucket] == time_now ) && ( ++bucket < OT_BUCKET_COUNT ) ); while( ( all_torrents_clean[bucket] == time_now ) && ( ++bucket < OT_BUCKET_COUNT ) );
if( bucket >= OT_BUCKET_COUNT ) { if( bucket >= OT_BUCKET_COUNT ) {
@ -869,8 +858,6 @@ int init_logic( const char * const serverdir ) {
/* Initialize control structures */ /* Initialize control structures */
byte_zero( all_torrents, sizeof( all_torrents ) ); byte_zero( all_torrents, sizeof( all_torrents ) );
byte_zero( &changeset, sizeof( changeset ) );
changeset_size = 0;
return 0; return 0;
} }
@ -890,8 +877,6 @@ void deinit_logic( void ) {
} }
byte_zero( all_torrents, sizeof (all_torrents)); byte_zero( all_torrents, sizeof (all_torrents));
byte_zero( all_torrents_clean, sizeof (all_torrents_clean)); byte_zero( all_torrents_clean, sizeof (all_torrents_clean));
byte_zero( &changeset, sizeof( changeset ) );
changeset_size = 0;
} }
#ifdef WANT_ACCESS_CONTROL #ifdef WANT_ACCESS_CONTROL

@ -81,6 +81,9 @@ typedef struct {
size_t down_count; size_t down_count;
size_t seed_counts[ OT_POOLS_COUNT ]; size_t seed_counts[ OT_POOLS_COUNT ];
ot_vector peers[ OT_POOLS_COUNT ]; ot_vector peers[ OT_POOLS_COUNT ];
#ifdef WANT_TRACKER_SYNC
ot_vector changeset;
#endif
} ot_peerlist; } ot_peerlist;
typedef struct { typedef struct {
@ -97,7 +100,11 @@ void deinit_logic( void );
enum { STATS_MRTG, STATS_TOP5, STATS_DMEM, STATS_TCP, STATS_UDP, STATS_SLASH24S, STATS_SLASH24S_OLD, SYNC_IN, SYNC_OUT }; enum { STATS_MRTG, STATS_TOP5, STATS_DMEM, STATS_TCP, STATS_UDP, STATS_SLASH24S, STATS_SLASH24S_OLD, SYNC_IN, SYNC_OUT };
#ifdef WANT_TRACKER_SYNC
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer, int from_changeset ); ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer, int from_changeset );
#else
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer );
#endif
size_t remove_peer_from_torrent( ot_hash *hash, ot_peer *peer, char *reply, int is_tcp ); size_t remove_peer_from_torrent( ot_hash *hash, ot_peer *peer, char *reply, int is_tcp );
size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply, int is_tcp ); size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply, int is_tcp );
size_t return_fullscrape_for_tracker( char **reply ); size_t return_fullscrape_for_tracker( char **reply );

Loading…
Cancel
Save