* gnu/packages/patches/java-jeromq-fix-tests.patch: New file. * gnu/local.mk (dist_patch_DATA): Add it. * gnu/packages/java.scm (java-jeromq)[source](patches): Add it. [arguments](#test-exclude): Disable more failing tests.
		
			
				
	
	
		
			253 lines
		
	
	
	
		
			9.1 KiB
		
	
	
	
		
			Diff
		
	
	
	
	
	
			
		
		
	
	
			253 lines
		
	
	
	
		
			9.1 KiB
		
	
	
	
		
			Diff
		
	
	
	
	
	
| From 5803aadd3f209eba1ffbb2cf7bf16778019dbee1 Mon Sep 17 00:00:00 2001
 | |
| From: fredoboulo <fredoboulo@users.noreply.github.com>
 | |
| Date: Fri, 23 Feb 2018 23:55:57 +0100
 | |
| Subject: [PATCH] Fix #524 : V1 and V2 protocol downgrades handle received data
 | |
|  in handshake buffer
 | |
| 
 | |
| This patch is upstream pull request, see:
 | |
| https://gihub.com/zeromq/jeromq/pull/527.
 | |
| 
 | |
| It is merged on commit c2afa9c, and we can drop it on the
 | |
| 0.4.4 release.
 | |
| 
 | |
| ---
 | |
|  src/main/java/zmq/io/StreamEngine.java            | 21 ++++++++++--
 | |
|  src/test/java/zmq/io/AbstractProtocolVersion.java | 41 +++++++++++++----------
 | |
|  src/test/java/zmq/io/V0ProtocolTest.java          | 12 +++++++
 | |
|  src/test/java/zmq/io/V1ProtocolTest.java          | 16 +++++++--
 | |
|  src/test/java/zmq/io/V2ProtocolTest.java          | 16 +++++++--
 | |
|  5 files changed, 81 insertions(+), 25 deletions(-)
 | |
| 
 | |
| diff --git a/src/main/java/zmq/io/StreamEngine.java b/src/main/java/zmq/io/StreamEngine.java
 | |
| index b8933c92..fe2f2d8d 100644
 | |
| --- a/src/main/java/zmq/io/StreamEngine.java
 | |
| +++ b/src/main/java/zmq/io/StreamEngine.java
 | |
| @@ -816,9 +816,7 @@ private boolean handshake()
 | |
|              assert (bufferSize == headerSize);
 | |
|  
 | |
|              //  Make sure the decoder sees the data we have already received.
 | |
| -            greetingRecv.flip();
 | |
| -            inpos = greetingRecv;
 | |
| -            insize = greetingRecv.limit();
 | |
| +            decodeDataAfterHandshake(0);
 | |
|  
 | |
|              //  To allow for interoperability with peers that do not forward
 | |
|              //  their subscriptions, we inject a phantom subscription message
 | |
| @@ -846,6 +844,8 @@ else if (greetingRecv.get(revisionPos) == Protocol.V1.revision) {
 | |
|              }
 | |
|              encoder = new V1Encoder(errno, Config.OUT_BATCH_SIZE.getValue());
 | |
|              decoder = new V1Decoder(errno, Config.IN_BATCH_SIZE.getValue(), options.maxMsgSize, options.allocator);
 | |
| +
 | |
| +            decodeDataAfterHandshake(V2_GREETING_SIZE);
 | |
|          }
 | |
|          else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) {
 | |
|              //  ZMTP/2.0 framing.
 | |
| @@ -859,6 +859,8 @@ else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) {
 | |
|              }
 | |
|              encoder = new V2Encoder(errno, Config.OUT_BATCH_SIZE.getValue());
 | |
|              decoder = new V2Decoder(errno, Config.IN_BATCH_SIZE.getValue(), options.maxMsgSize, options.allocator);
 | |
| +
 | |
| +            decodeDataAfterHandshake(V2_GREETING_SIZE);
 | |
|          }
 | |
|          else {
 | |
|              zmtpVersion = Protocol.V3;
 | |
| @@ -904,6 +906,19 @@ else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) {
 | |
|          return true;
 | |
|      }
 | |
|  
 | |
| +    private void decodeDataAfterHandshake(int greetingSize)
 | |
| +    {
 | |
| +        final int pos = greetingRecv.position();
 | |
| +        if (pos > greetingSize) {
 | |
| +            // data is present after handshake
 | |
| +            greetingRecv.position(greetingSize).limit(pos);
 | |
| +
 | |
| +            //  Make sure the decoder sees this extra data.
 | |
| +            inpos = greetingRecv;
 | |
| +            insize = greetingRecv.remaining();
 | |
| +        }
 | |
| +    }
 | |
| +
 | |
|      private Msg identityMsg()
 | |
|      {
 | |
|          Msg msg = new Msg(options.identitySize);
 | |
| diff --git a/src/test/java/zmq/io/AbstractProtocolVersion.java b/src/test/java/zmq/io/AbstractProtocolVersion.java
 | |
| index e60db403..aa06b4a7 100644
 | |
| --- a/src/test/java/zmq/io/AbstractProtocolVersion.java
 | |
| +++ b/src/test/java/zmq/io/AbstractProtocolVersion.java
 | |
| @@ -18,15 +18,18 @@
 | |
|  import zmq.SocketBase;
 | |
|  import zmq.ZError;
 | |
|  import zmq.ZMQ;
 | |
| +import zmq.ZMQ.Event;
 | |
|  import zmq.util.Utils;
 | |
|  
 | |
|  public abstract class AbstractProtocolVersion
 | |
|  {
 | |
| +    protected static final int REPETITIONS = 1000;
 | |
| +
 | |
|      static class SocketMonitor extends Thread
 | |
|      {
 | |
| -        private final Ctx             ctx;
 | |
| -        private final String          monitorAddr;
 | |
| -        private final List<ZMQ.Event> events = new ArrayList<>();
 | |
| +        private final Ctx         ctx;
 | |
| +        private final String      monitorAddr;
 | |
| +        private final ZMQ.Event[] events = new ZMQ.Event[1];
 | |
|  
 | |
|          public SocketMonitor(Ctx ctx, String monitorAddr)
 | |
|          {
 | |
| @@ -41,15 +44,15 @@ public void run()
 | |
|              boolean rc = s.connect(monitorAddr);
 | |
|              assertThat(rc, is(true));
 | |
|              // Only some of the exceptional events could fire
 | |
| -            while (true) {
 | |
| -                ZMQ.Event event = ZMQ.Event.read(s);
 | |
| -                if (event == null && s.errno() == ZError.ETERM) {
 | |
| -                    break;
 | |
| -                }
 | |
| -                assertThat(event, notNullValue());
 | |
| -
 | |
| -                events.add(event);
 | |
| +
 | |
| +            ZMQ.Event event = ZMQ.Event.read(s);
 | |
| +            if (event == null && s.errno() == ZError.ETERM) {
 | |
| +                s.close();
 | |
| +                return;
 | |
|              }
 | |
| +            assertThat(event, notNullValue());
 | |
| +
 | |
| +            events[0] = event;
 | |
|              s.close();
 | |
|          }
 | |
|      }
 | |
| @@ -69,11 +72,12 @@ public void run()
 | |
|          boolean rc = ZMQ.setSocketOption(receiver, ZMQ.ZMQ_LINGER, 0);
 | |
|          assertThat(rc, is(true));
 | |
|  
 | |
| -        SocketMonitor monitor = new SocketMonitor(ctx, "inproc://monitor");
 | |
| -        monitor.start();
 | |
|          rc = ZMQ.monitorSocket(receiver, "inproc://monitor", ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL);
 | |
|          assertThat(rc, is(true));
 | |
|  
 | |
| +        SocketMonitor monitor = new SocketMonitor(ctx, "inproc://monitor");
 | |
| +        monitor.start();
 | |
| +
 | |
|          rc = ZMQ.bind(receiver, host);
 | |
|          assertThat(rc, is(true));
 | |
|  
 | |
| @@ -81,17 +85,18 @@ public void run()
 | |
|          OutputStream out = sender.getOutputStream();
 | |
|          for (ByteBuffer raw : raws) {
 | |
|              out.write(raw.array());
 | |
| -            ZMQ.msleep(100);
 | |
|          }
 | |
|  
 | |
|          Msg msg = ZMQ.recv(receiver, 0);
 | |
|          assertThat(msg, notNullValue());
 | |
|          assertThat(new String(msg.data(), ZMQ.CHARSET), is(payload));
 | |
|  
 | |
| -        ZMQ.msleep(500);
 | |
| -        assertThat(monitor.events.size(), is(1));
 | |
| -        assertThat(monitor.events.get(0).event, is(ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL));
 | |
| -        assertThat((Integer) monitor.events.get(0).arg, is(version));
 | |
| +        monitor.join();
 | |
| +
 | |
| +        final Event event = monitor.events[0];
 | |
| +        assertThat(event, notNullValue());
 | |
| +        assertThat(event.event, is(ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL));
 | |
| +        assertThat((Integer) event.arg, is(version));
 | |
|  
 | |
|          InputStream in = sender.getInputStream();
 | |
|          byte[] data = new byte[255];
 | |
| diff --git a/src/test/java/zmq/io/V0ProtocolTest.java b/src/test/java/zmq/io/V0ProtocolTest.java
 | |
| index bd547d23..1a5b7aef 100644
 | |
| --- a/src/test/java/zmq/io/V0ProtocolTest.java
 | |
| +++ b/src/test/java/zmq/io/V0ProtocolTest.java
 | |
| @@ -10,6 +10,18 @@
 | |
|  
 | |
|  public class V0ProtocolTest extends AbstractProtocolVersion
 | |
|  {
 | |
| +    @Test
 | |
| +    public void testFixIssue524() throws IOException, InterruptedException
 | |
| +    {
 | |
| +        for (int idx = 0; idx < REPETITIONS; ++idx) {
 | |
| +            if (idx % 100 == 0) {
 | |
| +                System.out.print(idx + " ");
 | |
| +            }
 | |
| +            testProtocolVersion0short();
 | |
| +        }
 | |
| +        System.out.println();
 | |
| +    }
 | |
| +
 | |
|      @Test(timeout = 2000)
 | |
|      public void testProtocolVersion0short() throws IOException, InterruptedException
 | |
|      {
 | |
| diff --git a/src/test/java/zmq/io/V1ProtocolTest.java b/src/test/java/zmq/io/V1ProtocolTest.java
 | |
| index e1045f34..764159d0 100644
 | |
| --- a/src/test/java/zmq/io/V1ProtocolTest.java
 | |
| +++ b/src/test/java/zmq/io/V1ProtocolTest.java
 | |
| @@ -10,7 +10,19 @@
 | |
|  
 | |
|  public class V1ProtocolTest extends AbstractProtocolVersion
 | |
|  {
 | |
| -    @Test(timeout = 2000)
 | |
| +    @Test
 | |
| +    public void testFixIssue524() throws IOException, InterruptedException
 | |
| +    {
 | |
| +        for (int idx = 0; idx < REPETITIONS; ++idx) {
 | |
| +            if (idx % 100 == 0) {
 | |
| +                System.out.print(idx + " ");
 | |
| +            }
 | |
| +            testProtocolVersion1short();
 | |
| +        }
 | |
| +        System.out.println();
 | |
| +    }
 | |
| +
 | |
| +    @Test
 | |
|      public void testProtocolVersion1short() throws IOException, InterruptedException
 | |
|      {
 | |
|          List<ByteBuffer> raws = raws(0);
 | |
| @@ -25,7 +37,7 @@ public void testProtocolVersion1short() throws IOException, InterruptedException
 | |
|          assertProtocolVersion(1, raws, "abcdefg");
 | |
|      }
 | |
|  
 | |
| -    @Test(timeout = 2000)
 | |
| +    @Test
 | |
|      public void testProtocolVersion1long() throws IOException, InterruptedException
 | |
|      {
 | |
|          List<ByteBuffer> raws = raws(0);
 | |
| diff --git a/src/test/java/zmq/io/V2ProtocolTest.java b/src/test/java/zmq/io/V2ProtocolTest.java
 | |
| index d5e64bce..7fda31bc 100644
 | |
| --- a/src/test/java/zmq/io/V2ProtocolTest.java
 | |
| +++ b/src/test/java/zmq/io/V2ProtocolTest.java
 | |
| @@ -21,7 +21,19 @@ protected ByteBuffer identity()
 | |
|                  .put((byte) 0);
 | |
|      }
 | |
|  
 | |
| -    @Test(timeout = 2000)
 | |
| +    @Test
 | |
| +    public void testFixIssue524() throws IOException, InterruptedException
 | |
| +    {
 | |
| +        for (int idx = 0; idx < REPETITIONS; ++idx) {
 | |
| +            if (idx % 100 == 0) {
 | |
| +                System.out.print(idx + " ");
 | |
| +            }
 | |
| +            testProtocolVersion2short();
 | |
| +        }
 | |
| +        System.out.println();
 | |
| +    }
 | |
| +
 | |
| +    @Test
 | |
|      public void testProtocolVersion2short() throws IOException, InterruptedException
 | |
|      {
 | |
|          List<ByteBuffer> raws = raws(1);
 | |
| @@ -38,7 +50,7 @@ public void testProtocolVersion2short() throws IOException, InterruptedException
 | |
|          assertProtocolVersion(2, raws, "abcdefg");
 | |
|      }
 | |
|  
 | |
| -    @Test(timeout = 2000)
 | |
| +    @Test
 | |
|      public void testProtocolVersion2long() throws IOException, InterruptedException
 | |
|      {
 | |
|          List<ByteBuffer> raws = raws(1);
 |