Completely rewritten fullscrape code. All assumptions on how deflate() works were a little naive. Needs more error checking and testing.

dynamic-accesslists
erdgeist 17 years ago
parent 54560fdcd3
commit 8f7ef4b2ea

@ -25,10 +25,18 @@
Full scrapes usually are huge and one does not want to
allocate more memory. So lets get them in 512k units
*/
#define OT_SCRAPE_CHUNK_SIZE (512*1024)
#define OT_SCRAPE_CHUNK_SIZE (1024)
/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
#define OT_FULLSCRAPE_MAXENTRYLEN 256
#define OT_SCRAPE_MAXENTRYLEN 256
#ifdef WANT_COMPRESSION_GZIP
#define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK
#define WANT_COMPRESSION_GZIP_PARAM( param1, param2 ) , param1, param2
#else
#define IF_COMPRESSION( TASK )
#define WANT_COMPRESSION_GZIP_PARAM( param1, param2 )
#endif
/* Forward declaration */
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
@ -69,11 +77,38 @@ void fullscrape_deliver( int64 socket, ot_tasktype tasktype ) {
mutex_workqueue_pushtask( socket, tasktype );
}
static int fullscrape_increase( int *iovec_entries, struct iovec **iovector,
char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode ) ) {
/* Allocate a fresh output buffer at the end of our buffers list */
if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) {
/* Deallocate gzip buffers */
IF_COMPRESSION( deflateEnd(strm); )
/* Release lock on current bucket and return */
return -1;
}
/* Adjust new end of output buffer */
*re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
/* When compressing, we have all the bytes in output buffer */
IF_COMPRESSION( { \
*re -= OT_SCRAPE_MAXENTRYLEN; \
strm->next_out = (ot_byte*)*r; \
strm->avail_out = OT_SCRAPE_CHUNK_SIZE; \
deflate( strm, Z_NO_FLUSH ); \
*r = (char*)strm->next_out; \
} )
return 0;
}
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
int bucket;
char *r, *re;
int bucket;
char *r, *re;
#ifdef WANT_COMPRESSION_GZIP
char compress_buffer[OT_FULLSCRAPE_MAXENTRYLEN];
char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
z_stream strm;
#endif
@ -83,28 +118,24 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
return;
/* ... and pointer to end of current output buffer.
This works as a low watermark */
re = r + OT_SCRAPE_CHUNK_SIZE;
/* re points to low watermark */
re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
#ifdef WANT_COMPRESSION_GZIP
if( mode & TASK_FLAG_GZIP ) {
re += OT_SCRAPE_MAXENTRYLEN;
byte_zero( &strm, sizeof(strm) );
strm.next_in = (ot_byte*)r;
strm.next_in = (ot_byte*)compress_buffer;
strm.next_out = (ot_byte*)r;
strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
fprintf( stderr, "not ok.\n" );
strm.next_out = (unsigned char*)r;
strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
r = compress_buffer;
}
#endif
/* Reply dictionary only needed for bencoded fullscrape */
if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
memmove( r, "d5:filesd", 9 );
r += 9;
}
if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
r += sprintf( r, "d5:filesd" );
/* For each bucket... */
for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
@ -121,13 +152,15 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
switch( mode & TASK_TASK_MASK ) {
case TASK_FULLSCRAPE:
default:
/* push hash as bencoded string */
*r++='2'; *r++='0'; *r++=':';
memmove( r, hash, 20 ); r+=20;
/* push rest of the scrape string */
r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count );
break;
break;
case TASK_FULLSCRAPE_TPB_ASCII:
to_hex( r, *hash ); r+=40;
r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
@ -144,73 +177,41 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
}
#ifdef WANT_COMPRESSION_GZIP
if( mode & TASK_FLAG_GZIP ) {
if( mode & TASK_FLAG_GZIP ) {
strm.next_in = (ot_byte*)compress_buffer;
strm.avail_in = r - compress_buffer;
if( deflate( &strm, Z_NO_FLUSH ) != Z_OK )
fprintf( stderr, "Not ok.\n" );
deflate( &strm, Z_NO_FLUSH );
r = (char*)strm.next_out;
}
#endif
/* If we reached our low watermark in buffer... */
if( re - r <= OT_FULLSCRAPE_MAXENTRYLEN ) {
/* crop current output buffer to the amount really used */
iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
/* And allocate a fresh output buffer at the end of our buffers list */
if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) {
/* If this fails: free buffers */
iovec_free( iovec_entries, iovector );
#ifdef WANT_COMPRESSION_GZIP
deflateEnd(&strm);
#endif
/* Release lock on current bucket and return */
mutex_bucket_unlock( bucket );
return;
}
/* Adjust new end of output buffer */
re = r + OT_SCRAPE_CHUNK_SIZE;
/* Check if there still is enough buffer left */
while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) )
return mutex_bucket_unlock( bucket );
#ifdef WANT_COMPRESSION_GZIP
if( mode & TASK_FLAG_GZIP ) {
strm.next_out = (ot_byte*)r;
strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
}
#endif
}
#ifdef WANT_COMPRESSION_GZIP
if( mode & TASK_FLAG_GZIP ) {
r = compress_buffer;
}
#endif
IF_COMPRESSION( r = compress_buffer; )
}
/* All torrents done: release lock on currenct bucket */
mutex_bucket_unlock( bucket );
}
/* Close bencoded scrape dictionary if necessary */
if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
*r++='e'; *r++='e';
}
if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
r += sprintf( r, "ee" );
#ifdef WANT_COMPRESSION_GZIP
if( mode & TASK_FLAG_GZIP ) {
strm.next_in = (ot_byte*) compress_buffer;
strm.next_in = (ot_byte*)compress_buffer;
strm.avail_in = r - compress_buffer;
if( deflate( &strm, Z_FINISH ) != Z_STREAM_END )
fprintf( stderr, "Not ok.\n" );
deflate( &strm, Z_FINISH );
r = (char*)strm.next_out;
deflateEnd(&strm);
while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) )
return mutex_bucket_unlock( bucket );
deflateEnd(&strm);
}
#endif
/* Release unused memory in current output buffer */
iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
iovec_fixlast( iovec_entries, iovector, r );
}

Loading…
Cancel
Save