mirror of
git://erdgeist.org/opentracker
synced 2025-02-17 06:31:30 +08:00
Network connection code seems to be working now
This commit is contained in:
parent
cde8cf0559
commit
ed1673eb10
92
proxy.c
92
proxy.c
@ -211,7 +211,7 @@ static void livesync_handle_peersync( ssize_t datalen ) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int usage( char *self ) {
|
int usage( char *self ) {
|
||||||
fprintf( stderr, "Usage: %s -i ip -p port\n", self );
|
fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self );
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,6 +274,7 @@ static void handle_reconnects( void ) {
|
|||||||
for( i=0; i<g_connection_count; ++i )
|
for( i=0; i<g_connection_count; ++i )
|
||||||
if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
|
if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
|
||||||
int64 newfd = socket_tcp6( );
|
int64 newfd = socket_tcp6( );
|
||||||
|
fprintf( stderr, "(Re)connecting to peer..." );
|
||||||
if( newfd < 0 ) continue; /* No socket for you */
|
if( newfd < 0 ) continue; /* No socket for you */
|
||||||
io_fd(newfd);
|
io_fd(newfd);
|
||||||
if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
|
if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
|
||||||
@ -389,6 +390,7 @@ close_socket:
|
|||||||
indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
|
indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
|
||||||
datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
|
datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
|
||||||
if( !datalen || datalen < -1 ) {
|
if( !datalen || datalen < -1 ) {
|
||||||
|
fprintf( stderr, "Connection closed by remote peer.\n" );
|
||||||
io_close( peersocket );
|
io_close( peersocket );
|
||||||
reset_info_block( peer );
|
reset_info_block( peer );
|
||||||
} else if( datalen > 0 ) {
|
} else if( datalen > 0 ) {
|
||||||
@ -402,6 +404,7 @@ close_socket:
|
|||||||
/* Can write new sync data to the stream */
|
/* Can write new sync data to the stream */
|
||||||
static void handle_write( int64 peersocket ) {
|
static void handle_write( int64 peersocket ) {
|
||||||
proxy_peer *peer = io_getcookie( peersocket );
|
proxy_peer *peer = io_getcookie( peersocket );
|
||||||
|
|
||||||
if( !peer ) {
|
if( !peer ) {
|
||||||
/* Can't happen ;) */
|
/* Can't happen ;) */
|
||||||
io_close( peersocket );
|
io_close( peersocket );
|
||||||
@ -416,25 +419,32 @@ static void handle_write( int64 peersocket ) {
|
|||||||
case FLAG_CONNECTING:
|
case FLAG_CONNECTING:
|
||||||
/* Ensure that the connection is established and handle connection error */
|
/* Ensure that the connection is established and handle connection error */
|
||||||
if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
|
if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
|
||||||
|
fprintf( stderr, "failed\n" );
|
||||||
|
reset_info_block( peer );
|
||||||
io_close( peersocket );
|
io_close( peersocket );
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
|
io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
|
||||||
PROXYPEER_SETWAITTRACKERID( peer->state );
|
PROXYPEER_SETWAITTRACKERID( peer->state );
|
||||||
|
fprintf( stderr, " succeeded.\n" );
|
||||||
|
|
||||||
io_dontwantwrite( peersocket );
|
io_dontwantwrite( peersocket );
|
||||||
io_wantread( peersocket );
|
io_wantread( peersocket );
|
||||||
break;
|
break;
|
||||||
case FLAG_CONNECTED:
|
case FLAG_CONNECTED:
|
||||||
switch( iob_send( peersocket, &peer->outdata ) ) {
|
switch( iob_send( peersocket, &peer->outdata ) ) {
|
||||||
case 0: /* all data sent */
|
case 0: /* all data sent */
|
||||||
|
fprintf( stderr, "EMPTY\n" );
|
||||||
io_dontwantwrite( peersocket );
|
io_dontwantwrite( peersocket );
|
||||||
break;
|
break;
|
||||||
case -3: /* an error occured */
|
case -3: /* an error occured */
|
||||||
|
fprintf( stderr, "ERROR\n" );
|
||||||
io_close( peersocket );
|
io_close( peersocket );
|
||||||
reset_info_block( peer );
|
reset_info_block( peer );
|
||||||
break;
|
break;
|
||||||
default: /* Normal operation or eagain */
|
default: /* Normal operation or eagain */
|
||||||
|
fprintf( stderr, "EGAIN\n" );
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -445,7 +455,6 @@ static void handle_write( int64 peersocket ) {
|
|||||||
|
|
||||||
static void server_mainloop() {
|
static void server_mainloop() {
|
||||||
int64 sock;
|
int64 sock;
|
||||||
tai6464 now;
|
|
||||||
|
|
||||||
/* inlined livesync_init() */
|
/* inlined livesync_init() */
|
||||||
memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
|
memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
|
||||||
@ -461,9 +470,7 @@ static void server_mainloop() {
|
|||||||
handle_reconnects( );
|
handle_reconnects( );
|
||||||
|
|
||||||
/* Wait for io events until next approx reconn check time */
|
/* Wait for io events until next approx reconn check time */
|
||||||
taia_now( &now );
|
io_waituntil2( 30*1000 );
|
||||||
taia_addsec( &now, &now, 30 );
|
|
||||||
io_waituntil( now );
|
|
||||||
|
|
||||||
/* Loop over readable sockets */
|
/* Loop over readable sockets */
|
||||||
while( ( sock = io_canread( ) ) != -1 ) {
|
while( ( sock = io_canread( ) ) != -1 ) {
|
||||||
@ -482,31 +489,90 @@ static void server_mainloop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void panic( const char *routine ) {
|
||||||
|
fprintf( stderr, "%s: %s\n", routine, strerror(errno) );
|
||||||
|
exit( 111 );
|
||||||
|
}
|
||||||
|
|
||||||
|
static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) {
|
||||||
|
int64 sock = socket_tcp6( );
|
||||||
|
|
||||||
|
if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 )
|
||||||
|
panic( "socket_bind6_reuse" );
|
||||||
|
|
||||||
|
if( socket_listen( sock, SOMAXCONN) == -1 )
|
||||||
|
panic( "socket_listen" );
|
||||||
|
|
||||||
|
if( !io_fd( sock ) )
|
||||||
|
panic( "io_fd" );
|
||||||
|
|
||||||
|
io_setcookie( sock, (void*)FLAG_SERVERSOCKET );
|
||||||
|
io_wantread( sock );
|
||||||
|
return sock;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
|
||||||
|
const char *s = src;
|
||||||
|
int off, bracket = 0;
|
||||||
|
while( isspace(*s) ) ++s;
|
||||||
|
if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */
|
||||||
|
if( !(off = scan_ip6( s, ip ) ) )
|
||||||
|
return 0;
|
||||||
|
s += off;
|
||||||
|
if( *s == 0 || isspace(*s)) return s-src;
|
||||||
|
if( *s == ']' && bracket ) ++s;
|
||||||
|
if( !ip6_isv4mapped(ip)){
|
||||||
|
if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0;
|
||||||
|
s++;
|
||||||
|
} else {
|
||||||
|
if( *(s++) != ':' ) return 0;
|
||||||
|
}
|
||||||
|
if( !(off = scan_ushort (s, port ) ) )
|
||||||
|
return 0;
|
||||||
|
return off+s-src;
|
||||||
|
}
|
||||||
|
|
||||||
int main( int argc, char **argv ) {
|
int main( int argc, char **argv ) {
|
||||||
static pthread_t sync_in_thread_id;
|
static pthread_t sync_in_thread_id;
|
||||||
static pthread_t sync_out_thread_id;
|
static pthread_t sync_out_thread_id;
|
||||||
ot_ip6 serverip;
|
ot_ip6 serverip;
|
||||||
uint16_t tmpport;
|
uint16_t tmpport;
|
||||||
int scanon = 1, bound = 0;
|
int scanon = 1, lbound = 0, sbound = 0;
|
||||||
|
|
||||||
srandom( time(NULL) );
|
srandom( time(NULL) );
|
||||||
g_tracker_id = random();
|
g_tracker_id = random();
|
||||||
|
noipv6=1;
|
||||||
|
|
||||||
while( scanon ) {
|
while( scanon ) {
|
||||||
switch( getopt( argc, argv, ":i:p:vh" ) ) {
|
switch( getopt( argc, argv, ":l:c:L:h" ) ) {
|
||||||
case -1: scanon = 0; break;
|
case -1: scanon = 0; break;
|
||||||
case 'S':
|
case 'l':
|
||||||
if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); }
|
tmpport = 0;
|
||||||
|
if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
|
||||||
|
ot_try_bind( serverip, tmpport );
|
||||||
|
++sbound;
|
||||||
break;
|
break;
|
||||||
case 'p':
|
case 'c':
|
||||||
if( !scan_ushort( optarg, &tmpport)) { usage( argv[0] ); exit( 1 ); }
|
if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" );
|
||||||
livesync_bind_mcast( serverip, tmpport); bound++; break;
|
tmpport = 0;
|
||||||
|
if( !scan_ip6_port( optarg,
|
||||||
|
g_connections[g_connection_count].ip,
|
||||||
|
&g_connections[g_connection_count].port ) ||
|
||||||
|
!g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); }
|
||||||
|
g_connections[g_connection_count++].state = FLAG_OUTGOING;
|
||||||
|
break;
|
||||||
|
case 'L':
|
||||||
|
tmpport = 9696;
|
||||||
|
if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
|
||||||
|
livesync_bind_mcast( serverip, tmpport); ++lbound; break;
|
||||||
default:
|
default:
|
||||||
case '?': usage( argv[0] ); exit( 1 );
|
case '?': usage( argv[0] ); exit( 1 );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if( !bound ) exerr( "No port bound." );
|
if( !lbound ) exerr( "No livesync port bound." );
|
||||||
|
if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." );
|
||||||
pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
|
pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
|
||||||
pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
|
pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user