diff --git a/ot_livesync.c b/ot_livesync.c index 87fe5cf..cded0f7 100644 --- a/ot_livesync.c +++ b/ot_livesync.c @@ -46,13 +46,14 @@ static int64 g_socket_in = -1; /* For incoming packets */ static int64 g_socket_out = -1; +static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; static size_t g_outbuf_data; static ot_time g_next_packet_time; static pthread_t thread_id; void livesync_init( ) { - + if( g_socket_in == -1 ) exerr( "No socket address for live sync specified." ); @@ -62,7 +63,7 @@ void livesync_init( ) { g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; - + pthread_create( &thread_id, NULL, livesync_worker, NULL ); } @@ -105,10 +106,20 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { socket_mcloop4(g_socket_out, 0); } +/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ static void livesync_issue_peersync( ) { - socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT); + char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; + size_t data = g_outbuf_data; + + memcpy( mycopy, g_outbuf, data ); g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + + /* From now this thread has a local copy of the buffer and + has modified the protected element */ + pthread_mutex_unlock(&g_outbuf_mutex); + + socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT); } static void livesync_handle_peersync( struct ot_workstruct *ws ) { @@ -140,13 +151,17 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) { enough */ void livesync_ticker( ) { /* livesync_issue_peersync sets g_next_packet_time */ + pthread_mutex_lock(&g_outbuf_mutex); if( g_now_seconds > g_next_packet_time && g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) livesync_issue_peersync(); + else + pthread_mutex_unlock(&g_outbuf_mutex); } /* Inform live sync about whats going on. */ void livesync_tell( struct ot_workstruct *ws ) { + pthread_mutex_lock(&g_outbuf_mutex); memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); @@ -155,6 +170,8 @@ void livesync_tell( struct ot_workstruct *ws ) { if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) livesync_issue_peersync(); + else + pthread_mutex_unlock(&g_outbuf_mutex); } static void * livesync_worker( void * args ) { @@ -162,11 +179,11 @@ static void * livesync_worker( void * args ) { ot_ip6 in_ip; uint16_t in_port; (void)args; - + /* Initialize our "thread local storage" */ ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); ws.outbuf = ws.reply = 0; - + memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); while( 1 ) {