From 6a73fcf4e31732c78ce3e17660dd7ffe39e8eb22 Mon Sep 17 00:00:00 2001 From: Robin Watts Date: Tue, 5 Apr 2016 16:49:04 +0100 Subject: Add threaded operation to mudraw. --- source/tools/mudraw.c | 323 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 295 insertions(+), 28 deletions(-) (limited to 'source/tools') diff --git a/source/tools/mudraw.c b/source/tools/mudraw.c index 95b6765c..1ab7b8a8 100644 --- a/source/tools/mudraw.c +++ b/source/tools/mudraw.c @@ -7,9 +7,20 @@ #ifdef _MSC_VER #include +#include +#define MUDRAW_THREADS 1 #else #include +#ifdef HAVE_PTHREADS +#define MUDRAW_THREADS 2 +#include +#include #endif +#endif + +/* Enable for helpful threading debug */ +/* #define DEBUG_THREADS(A) do { printf A; fflush(stdout); } while (0) */ +#define DEBUG_THREADS(A) do { } while (0) enum { OUT_NONE, @@ -108,6 +119,123 @@ static const format_cs_table_t format_cs_table[] = { OUT_STEXT, CS_RGB, { CS_RGB } }, }; +/* + In the presence of pthreads or Windows threads, we can offer + a multi-threaded option. In the absence, of such, we degrade + nicely. +*/ +#ifdef MUDRAW_THREADS +#if MUDRAW_THREADS == 1 + +/* Windows threads */ +#define SEMAPHORE HANDLE +#define SEMAPHORE_INIT(A) do { A = CreateSemaphore(NULL, 0, 1, NULL); } while (0) +#define SEMAPHORE_FIN(A) do { CloseHandle(A); } while (0) +#define SEMAPHORE_TRIGGER(A) do { (void)ReleaseSemaphore(A, 1, NULL); } while (0) +#define SEMAPHORE_WAIT(A) do { (void)WaitForSingleObject(A, INFINITE); } while (0) +#define THREAD HANDLE +#define THREAD_INIT(A,B,C) do { A = CreateThread(NULL, 0, B, C, 0, NULL); } while (0) +#define THREAD_FIN(A) do { CloseHandle(A); } while (0) +#define THREAD_RETURN_TYPE DWORD WINAPI +#define THREAD_RETURN() return 0 +#define MUTEX CRITICAL_SECTION +#define MUTEX_INIT(A) do { InitializeCriticalSection(&A); } while (0) +#define MUTEX_FIN(A) do { DeleteCriticalSection(&A); } while (0) +#define MUTEX_LOCK(A) do { EnterCriticalSection(&A); } while (0) +#define MUTEX_UNLOCK(A) do { LeaveCriticalSection(&A); } while (0) + +#elif MUDRAW_THREADS == 2 + +/* PThreads */ +#define SEMAPHORE sem_t +#define SEMAPHORE_INIT(A) do { (void)sem_init(&A, 0, 0); } while (0) +#define SEMAPHORE_FIN(A) do { (void)sem_destroy(&A); } while (0) +#define SEMAPHORE_TRIGGER(A) do { (void)sem_post(&A); } while (0) +#define SEMAPHORE_WAIT(A) do { (void)sem_wait(&A); } while (0) +#define THREAD pthread_t +#define THREAD_INIT(A,B,C) do { (void)pthread_create(&A, NULL, B, C); } while (0) +#define THREAD_FIN(A) do { void *res; (void)pthread_join(A, &res); } while (0) +#define THREAD_RETURN_TYPE void * +#define THREAD_RETURN() return NULL +#define MUTEX pthread_mutex_t +#define MUTEX_INIT(A) do { (void)pthread_mutex_init(&A, NULL); } while (0) +#define MUTEX_FIN(A) do { (void)pthread_mutex_destroy(&A); } while (0) +#define MUTEX_LOCK(A) do { (void)pthread_mutex_lock(&A); } while (0) +#define MUTEX_UNLOCK(A) do { (void)pthread_mutex_unlock(&A); } while (0) + +#else +#error Unknown MUDRAW_THREADS setting +#endif + +#define LOCKS_INIT() init_mudraw_locks() +#define LOCKS_FIN() fin_mudraw_locks() + +static MUTEX mutexes[FZ_LOCK_MAX]; + +static void mudraw_lock(void *user, int lock) +{ + MUTEX_LOCK(mutexes[lock]); +} + +static void mudraw_unlock(void *user, int lock) +{ + MUTEX_UNLOCK(mutexes[lock]); +} + +static fz_locks_context mudraw_locks = +{ + NULL, mudraw_lock, mudraw_unlock +}; + +static fz_locks_context *init_mudraw_locks(void) +{ + int i; + + for (i = 0; i < FZ_LOCK_MAX; i++) + MUTEX_INIT(mutexes[i]); + + return &mudraw_locks; +} + +static void fin_mudraw_locks(void) +{ + int i; + + for (i = 0; i < FZ_LOCK_MAX; i++) + MUTEX_FIN(mutexes[i]); +} + +#else + +/* Null Threads implementation */ +#define SEMAPHORE int +#define THREAD int +#define SEMAPHORE_INIT(A) do { A = 0; } while (0) +#define SEMAPHORE_FIN(A) do { A = 0; } while (0) +#define SEMAPHORE_TRIGGER(A) do { A = 0; } while (0) +#define SEMAPHORE_WAIT(A) do { A = 0; } while (0) +#define THREAD_INIT(A,B,C) do { A = 0; (void)C; } while (0) +#define THREAD_FIN(A) do { A = 0; } while (0) +#define LOCKS_INIT() NULL +#define LOCKS_FIN() do { } while (0) + +#endif + +typedef struct worker_t { + fz_context *ctx; + int num; + int band; /* -1 to shutdown, or band to render */ + int savealpha; + fz_display_list *list; + fz_matrix ctm; + fz_rect tbounds; + fz_pixmap *pix; + fz_cookie cookie; + SEMAPHORE start; + SEMAPHORE stop; + THREAD thread; +} worker_t; + static char *output = NULL; fz_output *out = NULL; static int output_pagenum = 0; @@ -153,6 +281,8 @@ static fz_stext_sheet *sheet = NULL; static fz_colorspace *colorspace; static char *filename; static int files = 0; +static int num_workers = 0; +static worker_t *workers; static struct { int count, total; @@ -187,6 +317,9 @@ static void usage(void) "\t-h -\theight (in pixels) (maximum height if -r is specified)\n" "\t-f -\tfit width and/or height exactly; ignore original aspect ratio\n" "\t-B -\tmaximum bandheight (pgm, ppm, pam, png output only)\n" +#ifdef MUDRAW_THREADS + "\t-T -\tnumber of threads to use for rendering (banded mode only)\n" +#endif "\n" "\t-W -\tpage width for EPUB layout\n" "\t-H -\tpage height for EPUB layout\n" @@ -298,6 +431,41 @@ file_level_trailers(fz_context *ctx) fz_drop_stext_sheet(ctx, sheet); } +static void drawband(fz_context *ctx, int savealpha, fz_page *page, fz_display_list *list, const fz_matrix *ctm, const fz_rect *tbounds, fz_cookie *cookie, int band, fz_pixmap *pix) +{ + fz_device *dev = NULL; + + fz_try(ctx) + { + if (savealpha) + fz_clear_pixmap(ctx, pix); + else + fz_clear_pixmap_with_value(ctx, pix, 255); + + dev = fz_new_draw_device(ctx, pix); + if (alphabits == 0) + fz_enable_device_hints(ctx, dev, FZ_DONT_INTERPOLATE_IMAGES); + if (list) + fz_run_display_list(ctx, list, dev, ctm, tbounds, cookie); + else + fz_run_page(ctx, page, dev, ctm, cookie); + fz_drop_device(ctx, dev); + dev = NULL; + + if (invert) + fz_invert_pixmap(ctx, pix); + if (gamma_value != 1) + fz_gamma_pixmap(ctx, pix, gamma_value); + + if (savealpha) + fz_unmultiply_pixmap(ctx, pix); + } + fz_catch(ctx) + { + fz_drop_device(ctx, dev); + fz_rethrow(ctx); + } +} static void drawpage(fz_context *ctx, fz_document *doc, int pagenum) { @@ -614,7 +782,6 @@ static void drawpage(fz_context *ctx, fz_document *doc, int pagenum) fz_round_rect(&ibounds, &tbounds); fz_rect_from_irect(&tbounds, &ibounds); - /* TODO: banded rendering and multi-page ppm */ fz_try(ctx) { int savealpha = (out_cs == CS_GRAY_ALPHA || out_cs == CS_RGB_ALPHA || out_cs == CS_CMYK_ALPHA); @@ -632,10 +799,32 @@ static void drawpage(fz_context *ctx, fz_document *doc, int pagenum) band_ibounds.y1 = band_ibounds.y0 + bandheight; bands = (totalheight + bandheight-1)/bandheight; tbounds.y1 = tbounds.y0 + bandheight + 2; + DEBUG_THREADS(("Using %d Bands\n", bands)); } - pix = fz_new_pixmap_with_bbox(ctx, colorspace, &band_ibounds); - fz_pixmap_set_resolution(pix, resolution); + if (num_workers > 0) + { + for (band = 0; band < fz_mini(num_workers, bands); band++) + { + workers[band].band = band; + workers[band].savealpha = savealpha; /* Constant on a page */ + workers[band].ctm = ctm; + workers[band].tbounds = tbounds; + memset(&workers[band].cookie, 0, sizeof(fz_cookie)); + workers[band].list = list; + workers[band].pix = fz_new_pixmap_with_bbox(ctx, colorspace, &band_ibounds); + fz_pixmap_set_resolution(workers[band].pix, resolution); + DEBUG_THREADS(("Worker %d, Pre-triggering band %d\n", band, band)); + SEMAPHORE_TRIGGER(workers[band].start); + ctm.f -= drawheight; + } + pix = workers[0].pix; + } + else + { + pix = fz_new_pixmap_with_bbox(ctx, colorspace, &band_ibounds); + fz_pixmap_set_resolution(pix, resolution); + } /* Output any page level headers (for banded formats) */ if (output) @@ -663,28 +852,16 @@ static void drawpage(fz_context *ctx, fz_document *doc, int pagenum) for (band = 0; band < bands; band++) { - if (savealpha) - fz_clear_pixmap(ctx, pix); - else - fz_clear_pixmap_with_value(ctx, pix, 255); - - dev = fz_new_draw_device(ctx, pix); - if (alphabits == 0) - fz_enable_device_hints(ctx, dev, FZ_DONT_INTERPOLATE_IMAGES); - if (list) - fz_run_display_list(ctx, list, dev, &ctm, &tbounds, &cookie); + if (num_workers > 0) + { + worker_t *w = &workers[band % num_workers]; + DEBUG_THREADS(("Waiting for worker %d to complete band %d\n", w->num, band)); + SEMAPHORE_WAIT(w->stop); + pix = w->pix; + cookie.errors += w->cookie.errors; + } else - fz_run_page(ctx, page, dev, &ctm, &cookie); - fz_drop_device(ctx, dev); - dev = NULL; - - if (invert) - fz_invert_pixmap(ctx, pix); - if (gamma_value != 1) - fz_gamma_pixmap(ctx, pix, gamma_value); - - if (savealpha) - fz_unmultiply_pixmap(ctx, pix); + drawband(ctx, savealpha, page, list, &ctm, &tbounds, &cookie, band, pix); if (output) { @@ -724,9 +901,21 @@ static void drawpage(fz_context *ctx, fz_document *doc, int pagenum) fz_write_pixmap_as_tga(ctx, out, pix, savealpha); } } + + if (num_workers > 0 && band + num_workers < bands) + { + worker_t *w = &workers[band % num_workers]; + w->band = band + num_workers; + w->ctm = ctm; + w->tbounds = tbounds; + memset(&w->cookie, 0, sizeof(fz_cookie)); + DEBUG_THREADS(("Triggering worker %d for band %d\n", w->num, w->band)); + SEMAPHORE_TRIGGER(w->start); + } ctm.f -= drawheight; } + /* FIXME */ if (showmd5) { unsigned char digest[16]; @@ -758,7 +947,14 @@ static void drawpage(fz_context *ctx, fz_document *doc, int pagenum) { fz_drop_device(ctx, dev); dev = NULL; - fz_drop_pixmap(ctx, pix); + if (num_workers > 0) + { + int band; + for (band = 0; band < num_workers; band++) + fz_drop_pixmap(ctx, workers[band].pix); + } + else + fz_drop_pixmap(ctx, pix); } fz_catch(ctx) { @@ -927,6 +1123,26 @@ trace_realloc(void *arg, void *p_, unsigned int size) return &p[1]; } +#ifdef MUDRAW_THREADS +static THREAD_RETURN_TYPE worker_thread(void *arg) +{ + worker_t *me = (worker_t *)arg; + + do + { + DEBUG_THREADS(("Worker %d waiting\n", me->num)); + SEMAPHORE_WAIT(me->start); + DEBUG_THREADS(("Worker %d woken for band %d\n", me->num, me->band)); + if (me->band >= 0) + drawband(me->ctx, me->savealpha, NULL, me->list, &me->ctm, &me->tbounds, &me->cookie, me->band, me->pix); + DEBUG_THREADS(("Worker %d completed band %d\n", me->num, me->band)); + SEMAPHORE_TRIGGER(me->stop); + } + while (me->band >= 0); + THREAD_RETURN(); +} +#endif + #ifdef MUDRAW_STANDALONE int main(int argc, char **argv) #else @@ -935,13 +1151,13 @@ int mudraw_main(int argc, char **argv) { char *password = ""; fz_document *doc = NULL; - int c; + int c, i; fz_context *ctx; fz_alloc_context alloc_ctx = { NULL, trace_malloc, trace_realloc, trace_free }; fz_var(doc); - while ((c = fz_getopt(argc, argv, "p:o:F:R:r:w:h:fB:c:G:I:s:A:DiW:H:S:U:v")) != -1) + while ((c = fz_getopt(argc, argv, "p:o:F:R:r:w:h:fB:c:G:I:s:A:DiW:H:S:T:U:v")) != -1) { switch (c) { @@ -979,6 +1195,14 @@ int mudraw_main(int argc, char **argv) case 'D': uselist = 0; break; case 'i': ignore_errors = 1; break; + case 'T': +#ifdef MUDRAW_THREADS + num_workers = atoi(fz_optarg); break; +#else + fprintf(stderr, "Threads not enabled in this build\n"); + break; +#endif + case 'v': fprintf(stderr, "mudraw version %s\n", FZ_VERSION); return 1; } } @@ -986,13 +1210,40 @@ int mudraw_main(int argc, char **argv) if (fz_optind == argc) usage(); - ctx = fz_new_context((showmemory == 0 ? NULL : &alloc_ctx), NULL, FZ_STORE_DEFAULT); + if (num_workers > 0) + { + if (uselist == 0) + { + fprintf(stderr, "cannot use multiple threads without using display list\n"); + exit(1); + } + + if (bandheight == 0) + { + fprintf(stderr, "Using multiple threads without banding is pointless\n"); + } + } + + ctx = fz_new_context((showmemory == 0 ? NULL : &alloc_ctx), LOCKS_INIT(), FZ_STORE_DEFAULT); if (!ctx) { fprintf(stderr, "cannot initialise context\n"); exit(1); } + if (num_workers > 0) + { + workers = fz_calloc(ctx, num_workers, sizeof(*workers)); + for (i = 0; i < num_workers; i++) + { + workers[i].ctx = fz_clone_context(ctx); + workers[i].num = i; + SEMAPHORE_INIT(workers[i].start); + SEMAPHORE_INIT(workers[i].stop); + THREAD_INIT(workers[i].thread, worker_thread, &workers[i]); + } + } + fz_set_aa_level(ctx, alphabits); if (layout_css) @@ -1231,7 +1482,23 @@ int mudraw_main(int argc, char **argv) } } + if (num_workers > 0) + { + for (i = 0; i < num_workers; i++) + { + workers[i].band = -1; + SEMAPHORE_TRIGGER(workers[i].start); + SEMAPHORE_WAIT(workers[i].stop); + SEMAPHORE_FIN(workers[i].start); + SEMAPHORE_FIN(workers[i].stop); + fz_drop_context(workers[i].ctx); + THREAD_FIN(workers[i].thread); + } + fz_free(ctx, workers); + } + fz_drop_context(ctx); + LOCKS_FIN(); if (showmemory) { -- cgit v1.2.3