introducing multithreaded full scrape creation.

dynamic-accesslists
erdgeist 17 years ago
parent d3963803ca
commit 6458a37d82

@ -193,15 +193,29 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec
size_t header_size, size = iovec_length( &iovec_entries, &iovector ); size_t header_size, size = iovec_length( &iovec_entries, &iovector );
tai6464 t; tai6464 t;
/* No cookie? Bad socket. Leave. */
if( !h ) { if( !h ) {
iovec_free( &iovec_entries, &iovector ); iovec_free( &iovec_entries, &iovector );
return; HTTPERROR_500;
} }
/* If this socket collected request in a buffer,
free it now */
if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) { if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) {
h->flag &= ~STRUCT_HTTP_FLAG_ARRAY_USED; h->flag &= ~STRUCT_HTTP_FLAG_ARRAY_USED;
array_reset( &h->request ); array_reset( &h->request );
} }
/* If we came here, wait for the answer is over */
h->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK;
/* Our answers never are 0 bytes. Return an error. */
if( !iovec_entries || !iovector[0].iov_len ) {
iovec_free( &iovec_entries, &iovector );
HTTPERROR_500;
}
/* Prepare space for http header */
header = malloc( SUCCESS_HTTP_HEADER_LENGTH ); header = malloc( SUCCESS_HTTP_HEADER_LENGTH );
if( !header ) { if( !header ) {
iovec_free( &iovec_entries, &iovector ); iovec_free( &iovec_entries, &iovector );
@ -390,19 +404,15 @@ LOG_TO_STDERR( "sync: %d.%d.%d.%d\n", h->ip[0], h->ip[1], h->ip[2], h->ip[3] );
/* Full scrape... you might want to limit that */ /* Full scrape... you might want to limit that */
if( !byte_diff( data, 12, "scrape HTTP/" ) ) { if( !byte_diff( data, 12, "scrape HTTP/" ) ) {
int iovec_entries = 0;
struct iovec * iovector = NULL;
reply_size = return_fullscrape_for_tracker( &iovec_entries, &iovector );
LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now - ot_start_time), h->ip[0], h->ip[1], h->ip[2], h->ip[3] ); LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now - ot_start_time), h->ip[0], h->ip[1], h->ip[2], h->ip[3] );
#ifdef _DEBUG_HTTPERROR #ifdef _DEBUG_HTTPERROR
write( 2, debug_request, l ); write( 2, debug_request, l );
#endif #endif
if( !reply_size ) HTTPERROR_500; /* Pass this task to the worker thread */
h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
/* Stat keeping */ fullscrape_deliver( s );
stats_issue_event( EVENT_FULLSCRAPE, 1, reply_size); io_dontwantread( s );
return sendiovecdata( s, iovec_entries, iovector ); return;
} }
SCRAPE_WORKAROUND: SCRAPE_WORKAROUND:
@ -714,9 +724,8 @@ static void handle_timeouted( void ) {
static void server_mainloop( ) { static void server_mainloop( ) {
time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL; time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
/* Later we will poll for finished tasks
struct iovec *iovector; struct iovec *iovector;
int iovec_entries;*/ int iovec_entries;
for( ; ; ) { for( ; ; ) {
int64 i; int64 i;
@ -733,9 +742,8 @@ static void server_mainloop( ) {
handle_read( i ); handle_read( i );
} }
/* Later we will poll for finished tasks
while( ( i = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 ) while( ( i = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 )
sendiovecdata( i, iovec_entries, iovector ); */ sendiovecdata( i, iovec_entries, iovector );
while( ( i = io_canwrite( ) ) != -1 ) while( ( i = io_canwrite( ) ) != -1 )
handle_write( i ); handle_write( i );
@ -835,6 +843,8 @@ int main( int argc, char **argv ) {
if( trackerlogic_init( serverdir ) == -1 ) if( trackerlogic_init( serverdir ) == -1 )
panic( "Logic not started" ); panic( "Logic not started" );
fullscrape_init( );
g_now = ot_start_time = time( NULL ); g_now = ot_start_time = time( NULL );
alarm(5); alarm(5);

@ -5,6 +5,7 @@
#include <sys/uio.h> #include <sys/uio.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <pthread.h>
/* Libowfat */ /* Libowfat */
@ -23,14 +24,45 @@
/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
#define OT_FULLSCRAPE_MAXENTRYLEN 100 #define OT_FULLSCRAPE_MAXENTRYLEN 100
size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector ) { /* Forward declaration */
static void fullscrape_make( int *iovec_entries, struct iovec **iovector );
/* This is the entry point into this worker thread
It grabs tasks from mutex_tasklist and delivers results back
*/
static void * fullscrape_worker( void * args) {
int iovec_entries;
struct iovec *iovector;
args = args;
while( 1 ) {
ot_taskid taskid = mutex_workqueue_poptask( OT_TASKTYPE_FULLSCRAPE );
fullscrape_make( &iovec_entries, &iovector );
if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
iovec_free( &iovec_entries, &iovector );
}
return NULL;
}
void fullscrape_init( ) {
pthread_t thread_id;
pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
}
void fullscrape_deliver( int64 socket ) {
mutex_workqueue_pushtask( socket, OT_TASKTYPE_FULLSCRAPE );
}
static void fullscrape_make( int *iovec_entries, struct iovec **iovector ) {
int bucket; int bucket;
char *r, *re; char *r, *re;
/* Setup return vector... */ /* Setup return vector... */
*iovec_entries = 0; *iovec_entries = 0;
*iovector = NULL;
if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
return 0; return;
/* ... and pointer to end of current output buffer. /* ... and pointer to end of current output buffer.
This works as a low watermark */ This works as a low watermark */
@ -76,7 +108,7 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto
/* Release lock on current bucket and return */ /* Release lock on current bucket and return */
mutex_bucket_unlock( bucket ); mutex_bucket_unlock( bucket );
return 0; return;
} }
/* Adjust new end of output buffer */ /* Adjust new end of output buffer */
@ -93,7 +125,4 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto
/* Release unused memory in current output buffer */ /* Release unused memory in current output buffer */
iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) ); iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
/* Return answer size */
return iovec_length( iovec_entries, iovector );
} }

@ -4,8 +4,9 @@
#ifndef __OT_FULLSCRAPE_H__ #ifndef __OT_FULLSCRAPE_H__
#define __OT_FULLSCRAPE_H__ #define __OT_FULLSCRAPE_H__
#include <sys/uio.h> #include <io.h>
size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector ); void fullscrape_init( );
void fullscrape_deliver( int64 socket );
#endif #endif

Loading…
Cancel
Save