From 5168a3314c822a74389011d44277e70c329590b0 Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Tue, 29 Sep 2009 06:03:39 +0000 Subject: [PATCH] Reaching completion soon --- proxy.c | 468 +++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 413 insertions(+), 55 deletions(-) diff --git a/proxy.c b/proxy.c index 3577f44..c27599b 100644 --- a/proxy.c +++ b/proxy.c @@ -4,6 +4,7 @@ $Id$ */ /* System */ +#include #include #include #include @@ -14,6 +15,7 @@ #include #include #include +#include /* Libowfat */ #include "socket.h" @@ -26,30 +28,59 @@ /* Opentracker */ #include "trackerlogic.h" +#include "ot_vector.h" +#include "ot_mutex.h" #include "ot_livesync.h" +#include "ot_stats.h" +ot_ip6 g_serverip; +uint16_t g_serverport = 9009; uint32_t g_tracker_id; -char groupip_1[4] = { 224,0,23,5 }; +char groupip_1[4] = { 224,0,23,5 }; +int g_self_pipe[2]; + +/* If you have more than 10 peers, don't use this proxy + Use 20 slots for 10 peers to have room for 10 incoming connection slots + */ +#define MAX_PEERS 20 #define LIVESYNC_INCOMING_BUFFSIZE (256*256) +#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) +/* The amount of time a complete sync cycle should take */ +#define OT_SYNC_INTERVAL_MINUTES 2 + +/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ +#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) + enum { OT_SYNC_PEER }; +enum { FLAG_SERVERSOCKET = 1 }; -/* For outgoing packets */ +/* For incoming packets */ static int64 g_socket_in = -1; +static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; -/* For incoming packets */ +/* For outgoing packets */ static int64 g_socket_out = -1; -static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; +//static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE]; + +static void * livesync_worker( void * args ); +static void * streamsync_worker( void * args ); void exerr( char * message ) { fprintf( stderr, "%s\n", message ); exit( 111 ); } +void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { + (void) event; + (void) proto; + (void) event_data; +} + void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { char tmpip[4] = {0,0,0,0}; char *v4ip; @@ -80,16 +111,6 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { socket_mcloop4(g_socket_out, 1); } -static ot_vector all_torrents[OT_BUCKET_COUNT]; -ot_vector *mutex_bucket_lock_by_hash( ot_hash hash ) { - return all_torrents + ( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT ); -} -ot_vector *mutex_bucket_lock( int bucket ) { - return all_torrents + bucket; -} -#define mutex_bucket_unlock_by_hash(A,B) -#define mutex_bucket_unlock(A) - size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { int exactmatch; ot_torrent *torrent; @@ -106,6 +127,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { vector_remove_torrent( torrents_list, torrent ); + mutex_bucket_unlock_by_hash( hash, 0 ); return -1; } @@ -114,8 +136,10 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { /* Check for peer in torrent */ peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); - if( !peer_dest ) return -1; - + if( !peer_dest ) { + mutex_bucket_unlock_by_hash( hash, 0 ); + return -1; + } /* Tell peer that it's fresh */ OT_PEERTIME( peer ) = 0; @@ -126,6 +150,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { torrent->peer_list->seed_count++; } memcpy( peer_dest, peer, sizeof(ot_peer) ); + mutex_bucket_unlock_by_hash( hash, 0 ); return 0; } @@ -143,6 +168,7 @@ size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) { } } + mutex_bucket_unlock_by_hash( hash, 0 ); return 0; } @@ -182,16 +208,243 @@ int usage( char *self ) { return 0; } -static uint32_t peer_counts[1024]; -#ifdef WANT_SCROOOOOOOLL -static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d>4];*d++=m[*s++&15];}*d=0;return t;} -#endif +enum { + FLAG_OUTGOING = 0x80, + + FLAG_DISCONNECTED = 0x00, + FLAG_CONNECTING = 0x01, + FLAG_WAITTRACKERID = 0x02, + FLAG_CONNECTED = 0x03, + + FLAG_MASK = 0x07 +}; + +#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) +#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) +#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) +#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) +#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) + +typedef struct { + int state; /* Whether we want to connect, how far our handshake is, etc. */ + ot_ip6 ip; /* The peer to connect to */ + uint16_t port; /* The peers port */ + uint8_t *indata; /* Any data not processed yet */ + size_t indata_length; /* Length of unprocessed data */ + uint32_t tracker_id; /* How the other end greeted */ + int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ + io_batch outdata; /* The iobatch containing our sync data */ +} proxy_peer; + +/* Number of connections to peers + * If a peer's IP is set, we try to reconnect, when the connection drops + * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it + * Multiple connections to/from the same ip are okay, if tracker_id doesn't match + * Reconnect attempts occur only twice a minute +*/ +static int g_connection_count; +static ot_time g_connection_reconn; +static proxy_peer g_connections[MAX_PEERS]; + +static void handle_reconnects( void ) { + int i; + for( i=0; istate & FLAG_MASK ) { + case FLAG_DISCONNECTED: break; /* Shouldnt happen */ + case FLAG_CONNECTING: + case FLAG_WAITTRACKERID: + /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */ + if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) + goto close_socket; + + /* See, if we already have a connection to that peer */ + for( i=0; istate == FLAG_CONNECTING ) + io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); + + peer->tracker_id = tracker_id; + PROXYPEER_SETCONNECTED( peer->state ); + + break; +close_socket: + io_close( peersocket ); + PROXYPEER_SETDISCONNECTED( peer->state ); + break; + case FLAG_CONNECTED: + + break; + + } +} + +/* Can write new sync data to the stream */ +static void handle_write( int64 peersocket ) { + proxy_peer *peer = io_getcookie( peersocket ); + if( !peer ) { + /* Can't happen ;) */ + close( peersocket ); + return; + } + + switch( peer->state & FLAG_MASK ) { + case FLAG_DISCONNECTED: break; /* Shouldnt happen */ + case FLAG_CONNECTING: + io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); + PROXYPEER_SETWAITTRACKERID( peer->state ); + io_dontwantwrite( peersocket ); + io_wantread( peersocket ); + break; + case FLAG_CONNECTED: + switch( iob_send( peersocket, &peer->outdata ) ) { + case 0: /* all data sent */ + io_dontwantwrite( peersocket ); + break; + case -3: /* an error occured */ + io_close( peersocket ); + PROXYPEER_SETDISCONNECTED( peer->state ); + iob_reset( &peer->outdata ); + free( peer->indata ); + default: /* Normal operation or eagain */ + break; + } + break; + default: + break; + } + + return; +} + +static void server_mainloop() { + int64 sock; + tai6464 now; + + while(1) { + /* See, if we need to connect to anyone */ + if( time(NULL) > g_connection_reconn ) + handle_reconnects( ); + + /* Wait for io events until next approx reconn check time */ + taia_now( &now ); + taia_addsec( &now, &now, 30 ); + io_waituntil( now ); + + /* Loop over readable sockets */ + while( ( sock = io_canread( ) ) != -1 ) { + const void *cookie = io_getcookie( sock ); + if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) + handle_accept( sock ); + else + handle_read( sock ); + } + + /* Loop over writable sockets */ + while( ( sock = io_canwrite( ) ) != -1 ) + handle_write( sock ); + } +} int main( int argc, char **argv ) { + static pthread_t sync_in_thread_id; + static pthread_t sync_out_thread_id; ot_ip6 serverip; uint16_t tmpport; int scanon = 1, bound = 0; - time_t next_dump = time(NULL)+1; srandom( time(NULL) ); g_tracker_id = random(); @@ -199,7 +452,7 @@ int main( int argc, char **argv ) { while( scanon ) { switch( getopt( argc, argv, ":i:p:vh" ) ) { case -1: scanon = 0; break; - case 'i': + case 'S': if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } break; case 'p': @@ -211,7 +464,144 @@ int main( int argc, char **argv ) { } if( !bound ) exerr( "No port bound." ); + pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); + pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); + + server_mainloop(); + return 0; +} + +static void * streamsync_worker( void * args ) { + (void)args; + while( 1 ) { + int bucket; + /* For each bucket... */ + for( bucket=0; bucketsize; ++tor_offset ) { + /* Address torrents members */ + ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; + switch( peer_list->peer_count ) { + case 2: count_two++; break; + case 1: count_one++; break; + case 0: break; + default: + count_peers += peer_list->peer_count; + count_def += 1 + ( peer_list->peer_count >> 8 ); + } + } + + /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ + mem = 3 * ( 4 + 1 + 1 + 2 ) + ( count_one + count_two ) * 19 + count_def * 20 + + ( count_one + 2 * count_two + count_peers ) * 7; + + ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); + if( !ptr ) goto unlock_continue; + + if( count_one > 8 ) { + mem_a = 4 + 1 + 1 + 2 + count_one * ( 19 + 7 ); + ptr_b += mem_a; ptr_c += mem_a; + memcpy( ptr_a, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ + ptr_a[4] = 1; /* Offset 4: packet type 1 */ + ptr_a[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ + ptr_a[6] = count_one >> 8; + ptr_a[7] = count_one & 255; + ptr_a += 8; + } else { + count_def += count_one; + count_peers += count_one; + } + + if( count_two > 8 ) { + mem_b = 4 + 1 + 1 + 2 + count_two * ( 19 + 14 ); + ptr_c += mem_b; + memcpy( ptr_b, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ + ptr_b[4] = 2; /* Offset 4: packet type 2 */ + ptr_b[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ + ptr_b[6] = count_two >> 8; + ptr_b[7] = count_two & 255; + ptr_b += 8; + } else { + count_def += count_two; + count_peers += 2 * count_two; + } + + if( count_def ) { + memcpy( ptr_c, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ + ptr_c[4] = 0; /* Offset 4: packet type 0 */ + ptr_c[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ + ptr_c[6] = count_def >> 8; + ptr_c[7] = count_def & 255; + ptr_c += 8; + } + + /* For each torrent in this bucket.. */ + for( tor_offset=0; tor_offsetsize; ++tor_offset ) { + /* Address torrents members */ + ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; + ot_peerlist *peer_list = torrent->peer_list; + ot_peer *peers = (ot_peer*)(peer_list->peers.data); + uint8_t **dst; + int multi = 0; + switch( peer_list->peer_count ) { + case 0: continue; + case 1: dst = mem_a ? &ptr_a : &ptr_c; break; + case 2: dst = mem_b ? &ptr_b : &ptr_c; break; + default: dst = &ptr_c; multi = 1; break; + } + + do { + size_t i, pc = peer_list->peer_count; + if( pc > 255 ) pc = 255; + memcpy( *dst, torrent->hash + 1, sizeof( ot_hash ) - 1); + *dst += sizeof( ot_hash ) - 1; + if( multi ) *(*dst)++ = pc; + for( i=0; i < pc; ++i ) { + memcpy( *dst, peers++, OT_IP_SIZE + 3 ); + *dst += OT_IP_SIZE + 3; + } + peer_list->peer_count -= pc; + } while( peer_list->peer_count ); + free_peerlist(peer_list); + } + + free( torrents_list->data ); + memset( torrents_list, 0, sizeof(*torrents_list ) ); +unlock_continue: + mutex_bucket_unlock( bucket, 0 ); + + if( ptr ) { + int i; + + if( ptr_b > ptr_c ) ptr_c = ptr_b; + if( ptr_a > ptr_c ) ptr_c = ptr_a; + mem = ptr_c - ptr; + + for( i=0; i next_dump ) { - int bucket, i; - /* For each bucket... */ - for( bucket=0; bucketsize; ++tor_offset ) { - /* Address torrents members */ - ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; -#ifdef WANT_SCROOOOOOOLL - ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; - char hash_out[41]; - to_hex(hash_out,*hash); - printf( "%s %08zd\n", hash_out, peer_list->peer_count ); -#endif - if(peer_list->peer_count<1024) peer_counts[peer_list->peer_count]++; else peer_counts[1023]++; - free_peerlist(peer_list); - } - free( torrents_list->data ); - memset( torrents_list, 0, sizeof(*torrents_list ) ); - } - for( i=1023; i>=0; --i ) - if( peer_counts[i] ) { - printf( "%d:%d ", i, peer_counts[i] ); - peer_counts[i] = 0; - } - printf( "\n" ); - next_dump = time(NULL) + 1; - } } + return 0; }