[前][次][番号順一覧][スレッド一覧]

ruby-changes:24219

From: drbrain <ko1@a...>
Date: Tue, 3 Jul 2012 06:03:27 +0900 (JST)
Subject: [ruby-changes:24219] drbrain:r36270 (trunk): * ext/zlib/zlib.c (zstream_run): Process zlib streams without GVL.

drbrain	2012-07-03 06:03:15 +0900 (Tue, 03 Jul 2012)

  New Revision: 36270

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=36270

  Log:
    * ext/zlib/zlib.c (zstream_run):  Process zlib streams without GVL.
      [Feature #6615]
    * NEWS:  ditto.

  Modified files:
    trunk/ChangeLog
    trunk/NEWS
    trunk/ext/zlib/zlib.c

Index: ChangeLog
===================================================================
--- ChangeLog	(revision 36269)
+++ ChangeLog	(revision 36270)
@@ -1,3 +1,9 @@
+Tue Jul  3 06:02:54 2012  Eric Hodel  <drbrain@s...>
+
+	* ext/zlib/zlib.c (zstream_run):  Process zlib streams without GVL.
+	  [Feature #6615]
+	* NEWS:  ditto.
+
 Mon Jul  2 22:13:04 2012  Tanaka Akira  <akr@f...>
 
 	* thread.c (rb_thread_aref): add explanation for why Thread#[] and
Index: ext/zlib/zlib.c
===================================================================
--- ext/zlib/zlib.c	(revision 36269)
+++ ext/zlib/zlib.c	(revision 36270)
@@ -72,6 +72,7 @@
 
 struct zstream;
 struct zstream_funcs;
+struct zstream_run_args;
 static void zstream_init(struct zstream*, const struct zstream_funcs*);
 static void zstream_expand_buffer(struct zstream*);
 static void zstream_expand_buffer_into(struct zstream*, unsigned long);
@@ -564,6 +565,11 @@
     inflateReset, inflateEnd, inflate,
 };
 
+struct zstream_run_args {
+    struct zstream * z;
+    int flush;
+    int interrupt;
+};
 
 static voidpf
 zlib_mem_alloc(voidpf opaque, uInt items, uInt size)
@@ -655,6 +661,42 @@
     }
 }
 
+static int
+zstream_expand_buffer_without_gvl(struct zstream *z)
+{
+    char * new_str;
+    long inc, len;
+
+    if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) {
+	z->stream.avail_out = ZSTREAM_AVAIL_OUT_STEP_MAX;
+    }
+    else {
+	inc = z->buf_filled / 2;
+	if (inc < ZSTREAM_AVAIL_OUT_STEP_MIN) {
+	    inc = ZSTREAM_AVAIL_OUT_STEP_MIN;
+	}
+
+	len = z->buf_filled + inc;
+
+	new_str = realloc(RSTRING(z->buf)->as.heap.ptr, len + 1);
+
+	if (!new_str)
+	    return 0;
+
+	/* from rb_str_resize */
+	RSTRING(z->buf)->as.heap.ptr = new_str;
+	RSTRING(z->buf)->as.heap.ptr[len] = '\0'; /* sentinel */
+	RSTRING(z->buf)->as.heap.len =
+	    RSTRING(z->buf)->as.heap.aux.capa = len;
+
+	z->stream.avail_out = (inc < ZSTREAM_AVAIL_OUT_STEP_MAX) ?
+	    (int)inc : ZSTREAM_AVAIL_OUT_STEP_MAX;
+    }
+    z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf) + z->buf_filled;
+
+    return 1;
+}
+
 static void
 zstream_append_buffer(struct zstream *z, const Bytef *src, long len)
 {
@@ -871,13 +913,62 @@
     return Qnil;
 }
 
+static VALUE
+zstream_run_func(void *ptr) {
+    struct zstream_run_args *args = (struct zstream_run_args *)ptr;
+    int err, flush = args->flush;
+    struct zstream *z = args->z;
+    uInt n;
+
+    while (!args->interrupt) {
+	n = z->stream.avail_out;
+	err = z->func->run(&z->stream, flush);
+	z->buf_filled += n - z->stream.avail_out;
+
+	if (err == Z_STREAM_END) {
+	    z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
+	    z->flags |= ZSTREAM_FLAG_FINISHED;
+	    break;
+	}
+
+	if (err != Z_OK)
+	    break;
+
+	if (z->stream.avail_out > 0) {
+	    z->flags |= ZSTREAM_FLAG_IN_STREAM;
+	    break;
+	}
+
+	if (!zstream_expand_buffer_without_gvl(z)) {
+	    err = Z_MEM_ERROR; /* realloc failed */
+	    break;
+	}
+    }
+
+    return (VALUE)err;
+}
+
+/*
+ * There is no safe way to interrupt z->func->run().
+ */
 static void
+zstream_unblock_func(void *ptr) {
+    struct zstream_run_args *args = (struct zstream_run_args *)ptr;
+
+    args->interrupt = 1;
+}
+
+static void
 zstream_run(struct zstream *z, Bytef *src, long len, int flush)
 {
-    uInt n;
+    struct zstream_run_args args;
     int err;
     volatile VALUE guard = Qnil;
 
+    args.z = z;
+    args.flush = flush;
+    args.interrupt = 0;
+
     if (NIL_P(z->input) && len == 0) {
 	z->stream.next_in = (Bytef*)"";
 	z->stream.avail_in = 0;
@@ -896,49 +987,34 @@
 	zstream_expand_buffer(z);
     }
 
-    for (;;) {
-	/* VC allocates err and guard to same address.  accessing err and guard
-	   in same scope prevents it. */
-	RB_GC_GUARD(guard);
-	n = z->stream.avail_out;
-	err = z->func->run(&z->stream, flush);
-	z->buf_filled += n - z->stream.avail_out;
-	rb_thread_schedule();
+loop:
+    err = (int)rb_thread_blocking_region(
+	    zstream_run_func, (void *)&args,
+	    zstream_unblock_func, (void *)&args);
 
-	if (err == Z_STREAM_END) {
-	    z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
-	    z->flags |= ZSTREAM_FLAG_FINISHED;
-	    break;
+    if (flush != Z_FINISH && err == Z_BUF_ERROR
+	    && z->stream.avail_out > 0) {
+	z->flags |= ZSTREAM_FLAG_IN_STREAM;
+    }
+
+    zstream_reset_input(z);
+
+    if (err != Z_OK && err != Z_STREAM_END) {
+	if (z->stream.avail_in > 0) {
+	    zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
 	}
-	if (err != Z_OK) {
-	    if (flush != Z_FINISH && err == Z_BUF_ERROR
-		&& z->stream.avail_out > 0) {
-		z->flags |= ZSTREAM_FLAG_IN_STREAM;
-		break;
+	if (err == Z_NEED_DICT) {
+	    VALUE self = (VALUE)z->stream.opaque;
+	    VALUE dicts = rb_ivar_get(self, id_dictionaries);
+	    VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler));
+	    if (!NIL_P(dict)) {
+		rb_inflate_set_dictionary(self, dict);
+		goto loop;
 	    }
-	    zstream_reset_input(z);
-	    if (z->stream.avail_in > 0) {
-		zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
-	    }
-	    if (err == Z_NEED_DICT) {
-		VALUE self = (VALUE)z->stream.opaque;
-		VALUE dicts = rb_ivar_get(self, id_dictionaries);
-		VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler));
-		if (!NIL_P(dict)) {
-		    rb_inflate_set_dictionary(self, dict);
-		    continue;
-		}
-	    }
-	    raise_zlib_error(err, z->stream.msg);
 	}
-	if (z->stream.avail_out > 0) {
-	    z->flags |= ZSTREAM_FLAG_IN_STREAM;
-	    break;
-	}
-	zstream_expand_buffer(z);
+	raise_zlib_error(err, z->stream.msg);
     }
 
-    zstream_reset_input(z);
     if (z->stream.avail_in > 0) {
 	zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
         guard = Qnil; /* prevent tail call to make guard effective */
Index: NEWS
===================================================================
--- NEWS	(revision 36269)
+++ NEWS	(revision 36270)
@@ -117,6 +117,8 @@
 
 * zlib
   * Added support for the new deflate strategies Zlib::RLE and Zlib::FIXED.
+  * Zlib streams are now processed without the GVL.  This allows gzip, zlib and
+    deflate streams to be processed in parallel.
 
 * openssl
   * Consistently raise an error when trying to encode nil values. All instances

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]