From 465cc2ecdf14909144debe70a4833642e11697da Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Mon, 6 Oct 2008 02:05:53 +0000 Subject: [PATCH] Live sync is now handled in its own thread. Therefore it now creates and handles its own sockets. --- ot_livesync.c | 134 ++++++++++++++++++++++++++++++++------------------ ot_livesync.h | 18 ++----- 2 files changed, 90 insertions(+), 62 deletions(-) diff --git a/ot_livesync.c b/ot_livesync.c index 577bb5f..92c947c 100644 --- a/ot_livesync.c +++ b/ot_livesync.c @@ -7,9 +7,11 @@ #include #include #include +#include /* Libowfat */ #include "socket.h" +#include "ndelay.h" /* Opentracker */ #include "trackerlogic.h" @@ -17,10 +19,23 @@ #include "ot_accesslist.h" #ifdef WANT_SYNC_LIVE -char groupip_1[4] = { LIVESYNC_MCASTDOMAIN_1 }; + +char groupip_1[4] = { 224,0,23,42 }; + +#define LIVESYNC_BUFFINSIZE (256*256) +#define LIVESYNC_BUFFSIZE 1504 +#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash)) + +#define LIVESYNC_MAXDELAY 15 + +/* Forward declaration */ +static void * livesync_worker( void * args ); /* For outgoing packets */ -int64 g_livesync_socket = -1; +static int64 g_livesync_socket_in = -1; + +/* For incoming packets */ +static int64 g_livesync_socket_out = -1; static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE]; static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ]; @@ -28,34 +43,49 @@ static uint8_t *livesync_outbuffer_pos; static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER; static ot_time livesync_lastpacket_time; +static pthread_t thread_id; void livesync_init( ) { - if( g_livesync_socket == -1 ) + if( g_livesync_socket_in == -1 ) exerr( "No socket address for live sync specified." ); livesync_outbuffer_pos = livesync_outbuffer_start; memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); livesync_outbuffer_pos += sizeof( g_tracker_id ); livesync_lastpacket_time = g_now; + + pthread_create( &thread_id, NULL, livesync_worker, NULL ); } void livesync_deinit() { - + pthread_cancel( thread_id ); } void livesync_bind_mcast( char *ip, uint16_t port) { char tmpip[4] = {0,0,0,0}; - if( g_livesync_socket != -1 ) - exerr("Livesync listen ip specified twice."); - if( socket_mcjoin4( ot_try_bind(tmpip, port, FLAG_MCA ), groupip_1, ip ) ) - exerr("Cant join mcast group."); - g_livesync_socket = ot_try_bind( ip, port, FLAG_UDP ); - io_dontwantread(g_livesync_socket); - - socket_mcttl4(g_livesync_socket, 1); - socket_mcloop4(g_livesync_socket, 0); + + if( g_livesync_socket_in != -1 ) + exerr("Error: Livesync listen ip specified twice."); + + if( ( g_livesync_socket_in = socket_udp4( )) < 0) + exerr("Error: Cant create live sync incoming socket." ); + ndelay_off(g_livesync_socket_in); + + if( socket_bind4_reuse( g_livesync_socket_in, tmpip, port ) == -1 ) + exerr("Error: Cant bind live sync incoming socket." ); + + if( socket_mcjoin4( g_livesync_socket_in, groupip_1, ip ) ) + exerr("Error: Cant make live sync incoming socket join mcast group."); + + if( ( g_livesync_socket_out = socket_udp4()) < 0) + exerr("Error: Cant create live sync outgoing socket." ); + if( socket_bind4_reuse( g_livesync_socket_out, ip, port ) == -1 ) + exerr("Error: Cant bind live sync outgoing socket." ); + + socket_mcttl4(g_livesync_socket_out, 1); + socket_mcloop4(g_livesync_socket_out, 0); } static void livesync_issuepacket( ) { - socket_send4(g_livesync_socket, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start, + socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start, groupip_1, LIVESYNC_PORT); livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id ); livesync_lastpacket_time = g_now; @@ -81,41 +111,51 @@ void livesync_ticker( ) { livesync_issuepacket(); } -/* Handle an incoming live sync packet */ -void handle_livesync( int64 serversocket ) { +static void * livesync_worker( void * args ) { uint8_t in_ip[4]; uint16_t in_port; - ssize_t datalen = socket_recv4(serversocket, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port); - int off = 4; - - if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) { - // TODO: log invalid sync packet - return; - } - - if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) { - // TODO: log invalid sync packet - return; - } - - if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { - // TODO: log packet coming from ourselves - return; - } - - // Now basic sanity checks have been done on the live sync packet - // We might add more testing and logging. - while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { - ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash)); - ot_hash *hash = (ot_hash*)(livesync_inbuffer + off); - - if( OT_FLAG(peer) & PEER_FLAG_STOPPED ) - remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA); - else - add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1)); - - off += sizeof( ot_hash ) + sizeof( ot_peer ); - } + ssize_t datalen; + int off; + args = args; + + while( 1 ) { + datalen = socket_recv4(g_livesync_socket_in, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port); + off = 4; + + if( datalen <= 0 ) + continue; + + if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) { + // TODO: log invalid sync packet + continue; + } + + if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) { + // TODO: log invalid sync packet + continue; + } + + if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { + // TODO: log packet coming from ourselves + continue; + } + + // Now basic sanity checks have been done on the live sync packet + // We might add more testing and logging. + while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { + ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash)); + ot_hash *hash = (ot_hash*)(livesync_inbuffer + off); + + if( OT_FLAG(peer) & PEER_FLAG_STOPPED ) + remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA); + else + add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1)); + + off += sizeof( ot_hash ) + sizeof( ot_peer ); + } + } + /* Never returns. */ + return NULL; } #endif diff --git a/ot_livesync.h b/ot_livesync.h index 4dc6b60..27070d6 100644 --- a/ot_livesync.h +++ b/ot_livesync.h @@ -10,14 +10,14 @@ #include "trackerlogic.h" /* - Syncing is done as udp packets in the multicast domain 224.23.42.N port 9696 + Syncing is done as udp packets in the multicast domain 224.0.42.N port 9696 Each tracker should join the multicast group and send its live sync packets to that group, using a ttl of 1 Format of a live sync packet is straight forward and depends on N: - For N == 1: (simple tracker2tracker sync) + For N == 23: (simple tracker2tracker sync) 0x0000 0x04 id of tracker instance [ 0x0004 0x14 info_hash 0x0018 0x04 peer's ipv4 address @@ -25,7 +25,7 @@ 0x0020 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) ]* - For N == 2: (aggregator syncs) + For N == 24: (aggregator syncs) 0x0000 0x04 id of tracker instance [ 0x0004 0x14 info_hash 0x0018 0x01 number of peers @@ -41,18 +41,6 @@ #ifdef WANT_SYNC_LIVE #define LIVESYNC_PORT 9696 -#define LIVESYNC_MCASTDOMAIN_1 224,23,42,1 -#define LIVESYNC_MCASTDOMAIN_2 224,23,42,2 -extern char groupip_1[4]; -extern char groupip_2[4]; - -extern int64 g_livesync_socket; - -#define LIVESYNC_BUFFINSIZE (256*256) -#define LIVESYNC_BUFFSIZE 1504 -#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash)) - -#define LIVESYNC_MAXDELAY 15 void livesync_init(); void livesync_deinit();