[前][次][番号順一覧][スレッド一覧]

ruby-changes:4367

From: ko1@a...
Date: Sun, 30 Mar 2008 15:38:22 +0900 (JST)
Subject: [ruby-changes:4367] akr - Ruby:r15858 (trunk): * io.c: IO.copy_stream implemented.

akr	2008-03-30 15:38:05 +0900 (Sun, 30 Mar 2008)

  New Revision: 15858

  Modified files:
    trunk/ChangeLog
    trunk/configure.in
    trunk/io.c
    trunk/test/ruby/test_io.rb
    trunk/thread.c

  Log:
    * io.c: IO.copy_stream implemented.  [ruby-dev:33843]
    
    * thread.c (rb_fd_select): new function.
    
    * configure.in (sys/sendfile.h): check the header file.
      (sendfile): check the function.
      (pread): check the function.


  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/trunk/test/ruby/test_io.rb?r1=15858&r2=15857&diff_format=u
  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/trunk/ChangeLog?r1=15858&r2=15857&diff_format=u
  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/trunk/thread.c?r1=15858&r2=15857&diff_format=u
  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/trunk/io.c?r1=15858&r2=15857&diff_format=u
  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/trunk/configure.in?r1=15858&r2=15857&diff_format=u

Index: configure.in
===================================================================
--- configure.in	(revision 15857)
+++ configure.in	(revision 15858)
@@ -592,7 +592,7 @@
 		 fcntl.h sys/fcntl.h sys/select.h sys/time.h sys/times.h sys/param.h\
 		 syscall.h pwd.h grp.h a.out.h utime.h memory.h direct.h sys/resource.h \
 		 sys/mkdev.h sys/utime.h xti.h netinet/in_systm.h float.h ieeefp.h pthread.h \
-		 ucontext.h intrinsics.h langinfo.h locale.h)
+		 ucontext.h intrinsics.h langinfo.h locale.h sys/sendfile.h)
 
 dnl Check additional types.
 AC_CHECK_SIZEOF(rlim_t, 0, [
@@ -705,7 +705,8 @@
 	      dlopen sigprocmask sigaction sigsetjmp _setjmp vsnprintf snprintf\
 	      setsid telldir seekdir fchmod cosh sinh tanh log2 round signbit\
 	      setuid setgid daemon select_large_fdset setenv unsetenv\
-	      mktime timegm clock_gettime gettimeofday)
+              mktime timegm clock_gettime gettimeofday\
+              pread sendfile)
 AC_ARG_ENABLE(setreuid,
        [  --enable-setreuid       use setreuid()/setregid() according to need even if obsolete.],
        [use_setreuid=$enableval])
Index: ChangeLog
===================================================================
--- ChangeLog	(revision 15857)
+++ ChangeLog	(revision 15858)
@@ -1,3 +1,13 @@
+Sun Mar 30 15:33:29 2008  Tanaka Akira  <akr@f...>
+
+	* io.c: IO.copy_stream implemented.  [ruby-dev:33843]
+
+	* thread.c (rb_fd_select): new function.
+
+	* configure.in (sys/sendfile.h): check the header file.
+	  (sendfile): check the function.
+	  (pread): check the function.
+
 Sat Mar 29 14:18:41 2008  Hidetoshi NAGAI  <nagai@a...>
 
 	* ext/tk/*: full update Ruby/Tk to support Ruby(1.9|1.8) and Tc/Tk8.5.
Index: io.c
===================================================================
--- io.c	(revision 15857)
+++ io.c	(revision 15858)
@@ -14,6 +14,7 @@
 #include "ruby/ruby.h"
 #include "ruby/io.h"
 #include "ruby/signal.h"
+#include "vm_core.h"
 #include <ctype.h>
 #include <errno.h>
 
@@ -6219,9 +6220,479 @@
     return rb_ensure(io_s_read, (VALUE)&arg, rb_io_close, arg.io);
 }
 
+struct copy_stream_struct {
+    VALUE src; /* source as passed by the caller: converted via to_io, else treated as a path */
+    VALUE dst; /* destination as passed by the caller: same handling as src */
+    int src_fd; /* descriptor the data is read from */
+    int dst_fd; /* descriptor the data is written to */
+    off_t copy_length; /* bytes still to copy; (off_t)-1 means copy until EOF */
+    off_t src_offset; /* start offset in src; (off_t)-1 means use the current file position */
+    int close_src; /* nonzero when src_fd was opened here from a path and must be closed */
+    int close_dst; /* nonzero when dst_fd was opened here from a path and must be closed */
+    off_t total; /* running count of bytes copied; becomes the method's return value */
+    char *syserr; /* name of the system call that failed, or NULL */
+    int error_no; /* errno saved together with syserr */
+    char *notimp; /* name of a function reported as not implemented, or NULL */
+    rb_fdset_t fds; /* scratch fd set used by the select-based wait helpers */
+    rb_thread_t *th; /* calling thread, polled for pending interrupts */
+};
 
+static void
+copy_stream_rbuf_to_dst(struct copy_stream_struct *stp,
+        rb_io_t *src_fptr, rb_io_t *dst_fptr, const char *dst_path)
+{ /* Write bytes already held in src's read buffer to the destination (at most copy_length); raises via rb_sys_fail on failure. */
+    ssize_t r;
+    int len;
+retry:
+    len = src_fptr->rbuf_len;
+    if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
+        len = stp->copy_length;
+    }
+    if (len == 0)
+        return;
+    r = rb_write_internal(stp->dst_fd, src_fptr->rbuf + src_fptr->rbuf_off, len); /* dst_fptr is NULL when dst is a filename, so use the fd stored in stp */
+    if (len == r) {
+        src_fptr->rbuf_off += len; /* consume from the front so any bytes left in the buffer stay addressable */
+        src_fptr->rbuf_len -= len;
+        if (stp->copy_length != (off_t)-1) stp->copy_length -= len;
+        stp->total += len;
+        return;
+    }
+    else if (0 <= r) {
+        src_fptr->rbuf_off += r;
+        src_fptr->rbuf_len -= r;
+        if (stp->copy_length != (off_t)-1) stp->copy_length -= r;
+        stp->total += r;
+        errno = EAGAIN; /* treat a short write like "would block" so we wait and retry below */
+    }
+    if (rb_io_wait_writable(stp->dst_fd)) {
+        if (dst_fptr) rb_io_check_closed(dst_fptr); /* only an IO destination has an rb_io_t to check */
+        if (src_fptr->rbuf_len)
+            goto retry;
+    }
+    rb_sys_fail(dst_path);
+}
+
+static int
+copy_stream_wait_read(struct copy_stream_struct *stp)
+{ /* Wait in select until src_fd is reported readable; returns 0, or -1 after recording the failure in stp. */
+    int ret;
+    rb_fd_zero(&stp->fds);
+    rb_fd_set(stp->src_fd, &stp->fds);
+    ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL); /* NULL timeout: no time limit */
+    if (ret == -1) {
+        stp->syserr = "select"; /* reported later by copy_stream_finalize */
+        stp->error_no = errno;
+        return -1;
+    }
+    return 0;
+}
+
+static int
+copy_stream_wait_write(struct copy_stream_struct *stp)
+{ /* Wait in select until dst_fd is reported writable; returns 0, or -1 after recording the failure in stp. */
+    int ret;
+    rb_fd_zero(&stp->fds);
+    rb_fd_set(stp->dst_fd, &stp->fds);
+    ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL); /* NULL timeout: no time limit */
+    if (ret == -1) {
+        stp->syserr = "select"; /* reported later by copy_stream_finalize */
+        stp->error_no = errno;
+        return -1;
+    }
+    return 0;
+}
+
+#ifdef HAVE_SENDFILE
+
+#ifdef __linux__
+#define USE_SENDFILE
+
+#ifdef HAVE_SYS_SENDFILE_H
+#include <sys/sendfile.h>
+#endif
+
+static ssize_t
+simple_sendfile(int out_fd, int in_fd, off_t *offset, size_t count)
+{ /* Thin wrapper over sendfile(); only compiled under __linux__ with HAVE_SENDFILE. */
+    return sendfile(out_fd, in_fd, offset, count);
+}
+
+#endif
+
+#endif
+
+#ifdef USE_SENDFILE
+static int
+copy_stream_sendfile(struct copy_stream_struct *stp)
+{ /* Try sendfile for regular file -> socket. Returns 0: not applicable, caller falls back; 1: finished; -1: stopped (failure recorded in stp, or interrupt pending). */
+    struct stat src_stat, dst_stat;
+    ssize_t ss;
+    int ret;
+
+    off_t copy_length;
+    off_t src_offset;
+    int use_pread;
+
+    ret = fstat(stp->src_fd, &src_stat);
+    if (ret == -1) {
+        stp->syserr = "fstat";
+        stp->error_no = errno;
+        return -1;
+    }
+    if (!S_ISREG(src_stat.st_mode)) /* source must be a regular file */
+        return 0;
+
+    ret = fstat(stp->dst_fd, &dst_stat);
+    if (ret == -1) {
+        stp->syserr = "fstat";
+        stp->error_no = errno;
+        return -1;
+    }
+    if ((dst_stat.st_mode & S_IFMT) != S_IFSOCK) /* destination must be a socket */
+        return 0;
+
+    src_offset = stp->src_offset;
+    use_pread = src_offset != (off_t)-1; /* explicit offset: pass it to sendfile instead of using the fd position */
+
+    copy_length = stp->copy_length;
+    if (copy_length == (off_t)-1) { /* no length given: copy from the start position to the current file size */
+        if (use_pread)
+            copy_length = src_stat.st_size - src_offset;
+        else {
+            off_t cur = lseek(stp->src_fd, 0, SEEK_CUR);
+            if (cur == (off_t)-1) {
+                stp->syserr = "lseek";
+                stp->error_no = errno;
+                return -1;
+            }
+            copy_length = src_stat.st_size - cur;
+        }
+    }
+
+retry_sendfile:
+    if (use_pread) {
+        ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, copy_length);
+    }
+    else {
+        ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, copy_length);
+    }
+    if (0 < ss) {
+        stp->total += ss;
+        copy_length -= ss;
+        if (0 < copy_length) { /* short transfer: handle it like EAGAIN so we wait and retry */
+            ss = -1;
+            errno = EAGAIN;
+        }
+    }
+    if (ss == -1) {
+        if (errno == EINVAL || errno == ENOSYS) /* sendfile unusable for this pair: let the caller fall back */
+            return 0;
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+            if (copy_stream_wait_write(stp) == -1)
+                return -1;
+            if (RUBY_VM_INTERRUPTED(stp->th))
+                return -1; /* must be nonzero: a 0 would trigger the read/write fallback, which restarts from stp's unchanged offset/length */
+            goto retry_sendfile;
+        }
+        stp->syserr = "sendfile";
+        stp->error_no = errno;
+        return -1;
+    }
+    return 1;
+}
+#endif
+
+static ssize_t
+copy_stream_read(struct copy_stream_struct *stp, char *buf, int len, off_t offset)
+{ /* Read up to len bytes from src_fd; offset (off_t)-1 selects read(), otherwise pread() at offset. Returns bytes read, 0 at EOF, -1 with the failure recorded in stp. */
+    ssize_t ss;
+retry_read:
+    if (offset == (off_t)-1)
+        ss = read(stp->src_fd, buf, len);
+    else {
+#ifdef HAVE_PREAD
+        ss = pread(stp->src_fd, buf, len, offset);
+#else
+        stp->notimp = "pread"; /* no pread at build time: reported as NotImplementedError by copy_stream_finalize */
+        return -1;
+#endif
+    }
+    if (ss == 0) {
+        return 0;
+    }
+    if (ss == -1) {
+        if (errno == EAGAIN || errno == EWOULDBLOCK) { /* nothing available yet: wait for readability, then retry */
+            if (copy_stream_wait_read(stp) == -1)
+                return -1;
+            goto retry_read;
+        }
+        if (errno == ENOSYS) {
+            stp->notimp = "pread";
+            return -1;
+        }
+        stp->syserr = offset == (off_t)-1 ?  "read" : "pread";
+        stp->error_no = errno;
+        return -1;
+    }
+    return ss;
+}
+
+static int
+copy_stream_write(struct copy_stream_struct *stp, char *buf, int len)
+{ /* Write all len bytes of buf to dst_fd, looping over short writes; returns 0, or -1 with the failure recorded in stp. */
+    ssize_t ss;
+    int off = 0;
+    while (len) {
+        ss = write(stp->dst_fd, buf+off, len);
+        if (ss == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK) { /* destination full: wait for writability, then retry */
+                if (copy_stream_wait_write(stp) == -1)
+                    return -1;
+                continue;
+            }
+            stp->syserr = "write";
+            stp->error_no = errno;
+            return -1;
+        }
+        off += ss;
+        len -= ss;
+        stp->total += ss; /* count bytes as they are actually written */
+    }
+    return 0;
+}
+
+static void
+copy_stream_read_write(struct copy_stream_struct *stp)
+{ /* Generic copy loop: read a chunk from src_fd and write it to dst_fd until EOF, the length limit, an error, or a pending interrupt. */
+    char buf[1024*16];
+    int len;
+    ssize_t ss;
+    int ret;
+    off_t copy_length;
+    int use_eof;
+    off_t src_offset;
+    int use_pread;
+
+    copy_length = stp->copy_length;
+    use_eof = copy_length == (off_t)-1; /* no length limit: run until read reports EOF */
+    src_offset = stp->src_offset;
+    use_pread = src_offset != (off_t)-1; /* explicit offset requested */
+
+    if (use_pread && stp->close_src) { /* fd was opened here, so moving its position is harmless: seek once and use plain read() */
+        off_t r;
+        r = lseek(stp->src_fd, src_offset, SEEK_SET);
+        if (r == (off_t)-1) {
+            stp->syserr = "lseek";
+            stp->error_no = errno;
+            return;
+        }
+        src_offset = (off_t)-1;
+        use_pread = 0;
+    }
+
+    while (use_eof || 0 < copy_length) {
+        if (!use_eof && copy_length < sizeof(buf)) { /* last chunk: do not read past the requested length */
+            len = copy_length;
+        }
+        else {
+            len = sizeof(buf);
+        }
+        if (use_pread) {
+            ss = copy_stream_read(stp, buf, len, src_offset);
+            if (0 < ss)
+                src_offset += ss; /* track the position ourselves; the fd offset is not used on this path */
+        }
+        else {
+            ss = copy_stream_read(stp, buf, len, (off_t)-1);
+        }
+        if (ss <= 0) /* EOF or error */
+            return;
+
+        ret = copy_stream_write(stp, buf, ss);
+        if (ret < 0)
+            return;
+
+        if (!use_eof)
+            copy_length -= ss;
+
+        if (RUBY_VM_INTERRUPTED(stp->th)) /* stop between chunks when the thread has a pending interrupt */
+            return;
+    }
+}
+
+static VALUE
+copy_stream_func(void *arg)
+{ /* Callback run via rb_thread_blocking_region: tries sendfile when available, otherwise the read/write loop. Results are left in *stp; always returns Qnil. */
+    struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+    int ret;
+
+#ifdef USE_SENDFILE
+    ret = copy_stream_sendfile(stp);
+    if (ret != 0)
+        goto finish; /* error or success */
+#endif
+
+    copy_stream_read_write(stp); /* reached when sendfile is not compiled in or returned 0 (not applicable) */
+
+finish:
+    return Qnil;
+}
+
+static VALUE
+copy_stream_body(VALUE arg)
+{ /* Body half of the rb_ensure pair: resolves src/dst to descriptors, moves already-buffered data, then runs the copy in a blocking region. */
+    struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+    VALUE src_io, dst_io;
+    rb_io_t *src_fptr, *dst_fptr;
+    int src_fd, dst_fd;
+    char *src_path = 0, *dst_path = 0;
+
+    stp->th = GET_THREAD();
+
+    src_io = rb_check_convert_type(stp->src, T_FILE, "IO", "to_io"); /* nil result means src is handled as a path below */
+    if (!NIL_P(src_io)) {
+        GetOpenFile(src_io, src_fptr);
+        src_fd = src_fptr->fd;
+    }
+    else {
+        src_fptr = 0;
+        FilePathValue(stp->src);
+        src_path = StringValueCStr(stp->src);
+        src_fd = rb_sysopen_internal(src_path, O_RDONLY|O_NOCTTY, 0);
+        if (src_fd == -1) { rb_sys_fail(src_path); }
+        stp->close_src = 1; /* we opened it, so copy_stream_finalize closes it */
+    }
+    stp->src_fd = src_fd;
+
+    dst_io = rb_check_convert_type(stp->dst, T_FILE, "IO", "to_io"); /* nil result means dst is handled as a path below */
+    if (!NIL_P(dst_io)) {
+        dst_io = GetWriteIO(dst_io);
+        GetOpenFile(dst_io, dst_fptr);
+        dst_fd = dst_fptr->fd;
+    }
+    else {
+        dst_fptr = 0;
+        FilePathValue(stp->dst);
+        dst_path = StringValueCStr(stp->dst);
+        dst_fd = rb_sysopen_internal(dst_path, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0600); /* destination path is created or truncated */
+        if (dst_fd == -1) { rb_sys_fail(dst_path); }
+        stp->close_dst = 1; /* we opened it, so copy_stream_finalize closes it */
+    }
+    stp->dst_fd = dst_fd;
+
+    stp->total = 0;
+
+    if (src_fptr && dst_fptr && src_fptr->rbuf_len && dst_fptr->wbuf_len) { /* both IOs hold buffered data: route src's buffered bytes through dst's write buffer */
+        long len = src_fptr->rbuf_len;
+        VALUE str;
+        if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
+            len = stp->copy_length;
+        }
+        str = rb_str_buf_new(len);
+        rb_str_resize(str,len);
+        read_buffered_data(RSTRING_PTR(str), len, src_fptr);
+        io_fwrite(str, dst_fptr);
+        stp->total += len;
+        if (stp->copy_length != (off_t)-1)
+            stp->copy_length -= len;
+    }
+
+    if (dst_fptr && io_fflush(dst_fptr) < 0) { /* flush dst's write buffer before writing to its fd directly */
+	rb_raise(rb_eIOError, "flush failed");
+    }
+
+    if (src_fptr) {
+        copy_stream_rbuf_to_dst(stp, src_fptr, dst_fptr, dst_path); /* NOTE(review): dst_fptr is 0 here when dst was given as a path */
+    }
+
+    if (stp->copy_length == 0) /* requested length already satisfied: nothing left to copy */
+        return Qnil;
+
+    rb_fd_init(&stp->fds);
+    rb_fd_set(src_fd, &stp->fds);
+    rb_fd_set(dst_fd, &stp->fds);
+
+    return rb_thread_blocking_region(copy_stream_func, (void*)stp, RB_UBF_DFL, 0);
+}
+
+static VALUE
+copy_stream_finalize(VALUE arg)
+{ /* Ensure half of the rb_ensure pair: closes descriptors opened by copy_stream_body, then raises any failure recorded in stp. */
+    struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+    if (stp->close_src)
+        close(stp->src_fd);
+    if (stp->close_dst)
+        close(stp->dst_fd);
+    rb_fd_term(&stp->fds); /* NOTE(review): also reached when rb_fd_init was never called; fds is then still zero-filled by the caller's MEMZERO */
+    if (stp->syserr) {
+        errno = stp->error_no; /* restore the errno saved at the failure site before raising */
+        rb_sys_fail(stp->syserr);
+    }
+    if (stp->notimp) {
+	rb_raise(rb_eNotImpError, "%s() not implemented", stp->notimp);
+    }
+    return Qnil;
+}
+
 /*
  *  call-seq:
+ *     IO.copy_stream(src, dst)
+ *     IO.copy_stream(src, dst, copy_length)
+ *     IO.copy_stream(src, dst, copy_length, src_offset)
+ *
+ *  IO.copy_stream copies <i>src</i> to <i>dst</i>.
+ *  <i>src</i> and <i>dst</i> are each either a filename or an IO.
+ *
+ *  This method returns the number of bytes copied.
+ *
+ *  If optional arguments are not given,
+ *  the start position of the copy is
+ *  the beginning of the filename or
+ *  the current file offset of the IO.
+ *  The end position of the copy is the end of file.
+ *
+ *  If <i>copy_length</i> is given,
+ *  no more than <i>copy_length</i> bytes are copied.
+ *
+ *  If <i>src_offset</i> is given,
+ *  it specifies the start position of the copy.
+ *
+ *  When <i>src_offset</i> is specified and
+ *  <i>src</i> is an IO,
+ *  IO.copy_stream doesn't move the current file offset.
+ *
+ */
+static VALUE
+rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io)
+{ /* Entry point for IO.copy_stream: parses the arguments, runs the copy under rb_ensure, and returns the byte count as a Ruby number. */
+    VALUE src, dst, length, src_offset;
+    struct copy_stream_struct st;
+
+    MEMZERO(&st, struct copy_stream_struct, 1); /* close_* flags and syserr/notimp start out cleared */
+
+    rb_scan_args(argc, argv, "22", &src, &dst, &length, &src_offset); /* two required, two optional arguments */
+
+    if (NIL_P(length))
+        st.copy_length = (off_t)-1; /* sentinel: copy until EOF */
+    else
+        st.copy_length = NUM2OFFT(length);
+
+    if (NIL_P(src_offset))
+        st.src_offset = (off_t)-1; /* sentinel: start at the current position */
+    else
+        st.src_offset = NUM2OFFT(src_offset);
+
+    st.src = src;
+    st.dst = dst;
+
+    rb_ensure(copy_stream_body, (VALUE)&st, copy_stream_finalize, (VALUE)&st); /* finalize runs even if the body raises */
+
+    return OFFT2NUM(st.total);
+}
+
+/*
+ *  call-seq:
  *     io.external_encoding   => encoding
  *
  *  Returns the Encoding object that represents the encoding of the file.
@@ -6874,6 +7345,7 @@
     rb_define_singleton_method(rb_cIO, "select", rb_f_select, -1);
     rb_define_singleton_method(rb_cIO, "pipe", rb_io_s_pipe, -1);
     rb_define_singleton_method(rb_cIO, "try_convert", rb_io_s_try_convert, 1);
+    rb_define_singleton_method(rb_cIO, "copy_stream", rb_io_s_copy_stream, -1);
 
     rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1);
 
Index: thread.c
===================================================================
--- thread.c	(revision 15857)
+++ thread.c	(revision 15858)
@@ -1702,6 +1702,25 @@
     memcpy(dst->fdset, src, size);
 }
 
+int
+rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
+{ /* select() taking rb_fdset_t sets; a NULL set is passed to select() as NULL. Returns select()'s result. */
+    fd_set *r = NULL, *w = NULL, *e = NULL;
+    if (readfds) {
+        rb_fd_resize(n - 1, readfds); /* presumably sizes the set so fd n-1 is representable -- confirm against rb_fd_resize */
+        r = rb_fd_ptr(readfds);
+    }
+    if (writefds) {
+        rb_fd_resize(n - 1, writefds);
+        w = rb_fd_ptr(writefds);
+    }
+    if (exceptfds) {
+        rb_fd_resize(n - 1, exceptfds);
+        e = rb_fd_ptr(exceptfds);
+    }
+    return select(n, r, w, e, timeout);
+}
+
 #undef FD_ZERO
 #undef FD_SET
 #undef FD_CLR
Index: test/ruby/test_io.rb
===================================================================
--- test/ruby/test_io.rb	(revision 15857)
+++ test/ruby/test_io.rb	(revision 15858)
@@ -1,4 +1,7 @@
 require 'test/unit'
+require 'tmpdir'
+require 'io/nonblock'
+require 'socket'
 
 class TestIO < Test::Unit::TestCase
   def test_gets_rs
@@ -61,4 +64,337 @@
       File.read("empty", nil, nil, {})
     end
   end
+
+  def with_pipe # yields both ends of a fresh pipe and closes whichever ends are still open afterwards
+    r, w = IO.pipe
+    begin
+      yield r, w
+    ensure
+      r.close unless r.closed?
+      w.close unless w.closed?
+    end
+  end
+
+  def with_read_pipe(content) # yields the read end of a pipe pre-filled with content; the write end is already closed
+    r, w = IO.pipe
+    w << content
+    w.close
+    begin
+      yield r
+    ensure
+      r.close
+    end
+  end
+
+  def mkcdtmpdir # runs the block with a fresh temporary directory as the working directory
+    Dir.mktmpdir {|d|
+      Dir.chdir(d) {
+        yield # NOTE(review): yields no argument, so a caller's |d| block parameter is nil
+      }
+    }
+  end
+
+  def test_copy_stream # exercises IO.copy_stream between files and pipes, with and without length/offset
+    mkcdtmpdir {|d|
+
+      content = "foobar"
+      File.open("src", "w") {|f| f << content }
+      ret = IO.copy_stream("src", "dst") # filename -> filename
+      assert_equal(content.bytesize, ret)
+      assert_equal(content, File.read("dst"))
+
+      # overwrite by smaller file.
+      content = "baz"
+      File.open("src", "w") {|f| f << content }
+      ret = IO.copy_stream("src", "dst")
+      assert_equal(content.bytesize, ret)
+      assert_equal(content, File.read("dst"))
+
+      ret = IO.copy_stream("src", "dst", 2) # copy_length limits the copy
+      assert_equal(2, ret)
+      assert_equal(content[0,2], File.read("dst"))
+
+      ret = IO.copy_stream("src", "dst", 0) # zero length still truncates dst
+      assert_equal(0, ret)
+      assert_equal("", File.read("dst"))
+
+      ret = IO.copy_stream("src", "dst", nil, 1) # nil length with src_offset 1
+      assert_equal(content.bytesize-1, ret)
+      assert_equal(content[1..-1], File.read("dst"))
+
+      assert_raise(Errno::ENOENT) {
+        IO.copy_stream("nodir/foo", "dst")
+      }
+
+      assert_raise(Errno::ENOENT) {
+        IO.copy_stream("src", "nodir/bar")
+      }
+
+      with_pipe {|r, w|
+        ret = IO.copy_stream("src", w) # filename -> IO
+        assert_equal(content.bytesize, ret)
+        w.close
+        assert_equal(content, r.read)
+      }
+
+      with_pipe {|r, w|
+        w.close
+        assert_raise(IOError) { IO.copy_stream("src", w) } # closed destination
+      }
+
+      pipe_content = "abc"
+      with_read_pipe(pipe_content) {|r|
+        ret = IO.copy_stream(r, "dst") # IO -> filename
+        assert_equal(pipe_content.bytesize, ret)
+        assert_equal(pipe_content, File.read("dst"))
+      }
+
+      with_read_pipe("abc") {|r1|
+        assert_equal("a", r1.getc) # getc leaves "bc" in r1's read buffer
+        with_pipe {|r2, w2|
+          w2.sync = false
+          w2 << "def" # leaves "def" in w2's write buffer
+          ret = IO.copy_stream(r1, w2)
+          assert_equal(2, ret)
+          w2.close
+          assert_equal("defbc", r2.read) # buffered output must precede the copied bytes
+        }
+      }
+
+      with_read_pipe("abc") {|r1|
+        assert_equal("a", r1.getc)
+        with_pipe {|r2, w2|
+          w2.sync = false
+          w2 << "def"
+          ret = IO.copy_stream(r1, w2, 1)
+          assert_equal(1, ret)
+          w2.close
+          assert_equal("defb", r2.read)
+        }
+      }
+
+      with_read_pipe("abc") {|r1|
+        assert_equal("a", r1.getc)
+        with_pipe {|r2, w2|
+          ret = IO.copy_stream(r1, w2)
+          assert_equal(2, ret)
+          w2.close
+          assert_equal("bc", r2.read)
+        }
+      }
+
+      with_read_pipe("abc") {|r1|
+        assert_equal("a", r1.getc)
+        with_pipe {|r2, w2|
+          ret = IO.copy_stream(r1, w2, 1)
+          assert_equal(1, ret)
+          w2.close
+          assert_equal("b", r2.read)
+        }
+      }
+
+      with_read_pipe("abc") {|r1|
+        assert_equal("a", r1.getc)
+        with_pipe {|r2, w2|
+          ret = IO.copy_stream(r1, w2, 0)
+          assert_equal(0, ret)
+          w2.close
+          assert_equal("", r2.read)
+        }
+      }
+
+      with_pipe {|r1, w1|
+        w1 << "abc"
+        assert_equal("a", r1.getc)
+        with_pipe {|r2, w2|
+          w1 << "def" # arrives in the pipe after r1's buffer was filled
+          w1.close
+          ret = IO.copy_stream(r1, w2)
+          assert_equal(5, ret)
+          w2.close
+          assert_equal("bcdef", r2.read)
+        }
+      }
+
+      with_pipe {|r, w|
+        ret = IO.copy_stream("src", w, 1, 1) # length and offset together
+        assert_equal(1, ret)
+        w.close
+        assert_equal(content[1,1], r.read)
+      }
+
+      with_read_pipe("abc") {|r1|
+        assert_equal("a", r1.getc)
+        with_pipe {|r2, w2|
+          w2.nonblock = true
+          s = w2.syswrite("a" * 100000) # s is how many bytes the pipe accepted
+          t = Thread.new { sleep 0.1; r2.read }
+          ret = IO.copy_stream(r1, w2)
+          w2.close
+          assert_equal(2, ret)
+          assert_equal("a" * s + "bc", t.value)
+        }
+      }
+
+
+      bigcontent = "abc" * 123456
+      File.open("bigsrc", "w") {|f| f << bigcontent }
+      ret = IO.copy_stream("bigsrc", "bigdst")
+      assert_equal(bigcontent.bytesize, ret)
+      assert_equal(bigcontent, File.read("bigdst"))
+
+      File.unlink("bigdst")
+      ret = IO.copy_stream("bigsrc", "bigdst", nil, 100)
+      assert_equal(bigcontent.bytesize-100, ret)
+      assert_equal(bigcontent[100..-1], File.read("bigdst"))
+
+      File.unlink("bigdst")
+      ret = IO.copy_stream("bigsrc", "bigdst", 30000, 100)
+      assert_equal(30000, ret)
+      assert_equal(bigcontent[100, 30000], File.read("bigdst"))
+
+      File.open("bigsrc") {|f|
+        assert_equal(0, f.pos)
+        ret = IO.copy_stream(f, "bigdst", nil, 10)
+        assert_equal(bigcontent.bytesize-10, ret)
+        assert_equal(bigcontent[10..-1], File.read("bigdst"))
+        assert_equal(0, f.pos) # src_offset on an IO must not move its file position
+        ret = IO.copy_stream(f, "bigdst", 40, 30)
+        assert_equal(40, ret)
+        assert_equal(bigcontent[30, 40], File.read("bigdst"))
+        assert_equal(0, f.pos)
+      }
+
+      with_pipe {|r, w|
+        w.close
+        assert_raise(IOError) { IO.copy_stream("src", w) }
+      }
+
+      megacontent = "abc" * 1234567
+      File.open("megasrc", "w") {|f| f << megacontent }
+
+      with_pipe {|r1, w1|
+        with_pipe {|r2, w2|
+          t1 = Thread.new { w1 << megacontent; w1.close }
+          t2 = Thread.new { r2.read }
+          r1.nonblock = true # both ends of the copy are non-blocking
+          w2.nonblock = true
+          ret = IO.copy_stream(r1, w2)
+          assert_equal(megacontent.bytesize, ret)
+          w2.close
+          t1.join
+          assert_equal(megacontent, t2.value)
+        }
+      }
+
+      with_pipe {|r1, w1|
+        with_pipe {|r2, w2|
+          t1 = Thread.new { w1 << megacontent; w1.close }
+          t2 = Thread.new { r2.read }
+          ret = IO.copy_stream(r1, w2) # same as above without nonblock set
+          assert_equal(megacontent.bytesize, ret)
+          w2.close
+          t1.join
+          assert_equal(megacontent, t2.value)
+        }
+      }
+
+      with_pipe {|r, w|
+        t = Thread.new { r.read }
+        ret = IO.copy_stream("megasrc", w)
+        assert_equal(megacontent.bytesize, ret)
+        w.close
+        assert_equal(megacontent, t.value)
+      }
+    }
+  end
+
+  def with_socketpair # yields a connected UNIXSocket pair and closes whichever ends are still open afterwards
+    s1, s2 = UNIXSocket.pair
+    begin
+      yield s1, s2
+    ensure
+      s1.close unless s1.closed?
+      s2.close unless s2.closed?
+    end
+  end
+
+  def test_copy_stream_socket # exercises IO.copy_stream from files into a UNIX socket pair
+    return unless defined? UNIXSocket # skipped where UNIXSocket is not defined
+    mkcdtmpdir {|d|
+
+      content = "foobar"
+      File.open("src", "w") {|f| f << content }
+
+      with_socketpair {|s1, s2|
+        ret = IO.copy_stream("src", s1)
+        assert_equal(content.bytesize, ret)
+        s1.close
+        assert_equal(content, s2.read)
+      }
+
+      bigcontent = "abc" * 123456
+      File.open("bigsrc", "w") {|f| f << bigcontent }
+
+      with_socketpair {|s1, s2|
+        t = Thread.new { s2.read } # reader thread drains the peer while the copy runs
+        ret = IO.copy_stream("bigsrc", s1)
+        assert_equal(bigcontent.bytesize, ret)
+        s1.close
+        result = t.value
+        assert_equal(bigcontent, result)
+      }
+
+      with_socketpair {|s1, s2|
+        t = Thread.new { s2.read }
+        ret = IO.copy_stream("bigsrc", s1, 10000)
+        assert_equal(10000, ret)
+        s1.close
+        result = t.value
+        assert_equal(bigcontent[0,10000], result)
+      }
+
+      File.open("bigsrc") {|f|
+        assert_equal(0, f.pos)
+        with_socketpair {|s1, s2|
+          t = Thread.new { s2.read }
+          ret = IO.copy_stream(f, s1, nil, 100)
+          assert_equal(bigcontent.bytesize-100, ret)
+          assert_equal(0, f.pos) # src_offset on an IO must not move its file position
+          s1.close
+          result = t.value
+          assert_equal(bigcontent[100..-1], result)
+        }
+      }
+
+      File.open("bigsrc") {|f|
+        assert_equal(bigcontent[0,100], f.read(100))
+        assert_equal(100, f.pos)
+        with_socketpair {|s1, s2|
+          t = Thread.new { s2.read }
+          ret = IO.copy_stream(f, s1)
+          assert_equal(bigcontent.bytesize-100, ret)
+          assert_equal(bigcontent.length, f.pos) # without src_offset the file position ends at EOF
+          s1.close
+          result = t.value
+          assert_equal(bigcontent[100..-1], result)
+        }
+      }
+
+      megacontent = "abc" * 1234567
+      File.open("megasrc", "w") {|f| f << megacontent }
+
+      with_socketpair {|s1, s2|
+        t = Thread.new { s2.read }
+        s1.nonblock = true # non-blocking destination socket
+        ret = IO.copy_stream("megasrc", s1)
+        assert_equal(megacontent.bytesize, ret)
+        s1.close
+        result = t.value
+        assert_equal(megacontent, result)
+      }
+
+
+    }
+  end
 end

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]