[grizzly~git:ef70908e] [2.3.x] + fix issue #1469

  • From: oleksiys@...
  • To: commits@...
  • Subject: [grizzly~git:ef70908e] [2.3.x] + fix issue #1469
  • Date: Wed, 17 Apr 2013 01:30:07 +0000

Project:    grizzly
Repository: git
Revision:   ef70908ea0696250c71acc76aa2472b8813415b5
Author:     oleksiys
Date:       2013-04-17 01:28:14 UTC
Link:       

Log Message:
------------
[2.3.x] + fix issue #1469
http://java.net/jira/browse/GRIZZLY-1469
"Properly process the situation when DataFrame comes before SynReply"


Revisions:
----------
ef70908ea0696250c71acc76aa2472b8813415b5


Modified Paths:
---------------
modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Constants.java
modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyHandlerFilter.java
modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdySession.java
modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStream.java
modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStreamException.java
modules/spdy/src/test/java/org/glassfish/grizzly/spdy/SpdySemanticsTest.java


Diffs:
------
diff --git 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Constants.java 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Constants.java
index 6e74966..3ca3dbb 100644
--- a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Constants.java
+++ b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Constants.java
@@ -87,6 +87,9 @@ public class Constants {
     static final Termination RESET_TERMINATION =
             new Termination(TerminationType.RST, "Reset by peer");
 
+    static final Termination UNEXPECTED_FRAME_TERMINATION =
+            new Termination(TerminationType.LOCAL_CLOSE, "Unexpected SPDY 
frame");
+
     static final Termination FRAME_TOO_LARGE_TERMINATION =
             new Termination(TerminationType.LOCAL_CLOSE, "SpdyFrame sent by 
peer is too big");
     
diff --git 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyHandlerFilter.java
 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyHandlerFilter.java
index 368bfd6..a8d1a93 100644
--- 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyHandlerFilter.java
+++ 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyHandlerFilter.java
@@ -261,7 +261,7 @@ public class SpdyHandlerFilter extends HttpBaseFilter {
         
         return ctx.getStopAction();
     }
-
+    
     private void processInFrame(final SpdySession spdySession,
                                 final FilterChainContext context,
                                 final SpdyFrame frame)
@@ -461,11 +461,13 @@ public class SpdyHandlerFilter extends HttpBaseFilter {
             spdyRequest.setExpectContent(false);
         }
         
+        spdyStream.onSynFrameCome();
+        
         final boolean isExpectContent = spdyRequest.isExpectContent();
         if (!isExpectContent) {
             spdyStream.inputBuffer.close(IN_FIN_TERMINATION);
         }
-        
+
         sendUpstream(spdySession, spdyStream, spdyRequest, isExpectContent);
     }
 
@@ -710,7 +712,6 @@ public class SpdyHandlerFilter extends HttpBaseFilter {
         final HttpResponsePacket response = spdyRequest.getResponse();
         if (response == null) {
             spdyResponse = SpdyResponse.create();
-            spdyResponse.setRequest(spdyRequest);
         } else if (response instanceof SpdyResponse) {
             spdyResponse = (SpdyResponse) response;
         } else {
@@ -731,8 +732,9 @@ public class SpdyHandlerFilter extends HttpBaseFilter {
             frame.recycle();
         }
 
-        bind(spdyRequest, spdyResponse);
+        spdyStream.onSynFrameCome();
         
+        bind(spdyRequest, spdyResponse);
         sendUpstream(spdySession, spdyStream, spdyResponse, !isFin);
     }
 
@@ -745,53 +747,62 @@ public class SpdyHandlerFilter extends HttpBaseFilter {
         
         final SpdySession spdySession = checkSpdySession(ctx, false);
 
-        if (spdySession != null &&
-                HttpPacket.isHttp(message)) {
+        if (spdySession != null) {
+            if (HttpPacket.isHttp(message)) {
             
-            // Get HttpPacket
-            final HttpPacket httpPacket = ctx.getMessage();
-            final HttpHeader httpHeader = httpPacket.getHttpHeader();
+                // Get HttpPacket
+                final HttpPacket httpPacket = ctx.getMessage();
+                final HttpHeader httpHeader = httpPacket.getHttpHeader();
 
-            if (!httpHeader.isCommitted()) {
-                if (httpHeader.isRequest()) {
-                    prepareOutgoingRequest((HttpRequestPacket) httpHeader);
-                } else {
-                    prepareOutgoingResponse((HttpResponsePacket) httpHeader);
+                if (!httpHeader.isCommitted()) {
+                    if (httpHeader.isRequest()) {
+                        prepareOutgoingRequest((HttpRequestPacket) 
httpHeader);
+                    } else {
+                        prepareOutgoingResponse((HttpResponsePacket) 
httpHeader);
+                    }
                 }
-            }
-            
-            final boolean isNewStream = httpHeader.isRequest() &&
-                    !httpHeader.isCommitted();
-            
-            final Lock newStreamLock = spdySession.getNewClientStreamLock();
-            
-            if (isNewStream) {
-                newStreamLock.lock();
-            }
-            
-            try {
-                SpdyStream spdyStream = SpdyStream.getSpdyStream(httpHeader);
-                
-                if (isNewStream && spdyStream == null) {
-                    spdyStream = spdySession.openStream(
-                            (HttpRequestPacket) httpHeader,
-                            spdySession.getNextLocalStreamId(),
-                            0, 0, 0, !httpHeader.isExpectContent());
+
+                final boolean isNewStream = httpHeader.isRequest() &&
+                        !httpHeader.isCommitted();
+
+                final Lock newStreamLock = 
spdySession.getNewClientStreamLock();
+
+                if (isNewStream) {
+                    newStreamLock.lock();
                 }
 
-                assert spdyStream != null;
-                
-                final TransportContext transportContext = 
ctx.getTransportContext();
+                try {
+                    SpdyStream spdyStream = 
SpdyStream.getSpdyStream(httpHeader);
+
+                    if (isNewStream && spdyStream == null) {
+                        spdyStream = spdySession.openStream(
+                                (HttpRequestPacket) httpHeader,
+                                spdySession.getNextLocalStreamId(),
+                                0, 0, 0, !httpHeader.isExpectContent());
+                    }
+
+                    assert spdyStream != null;
+
+                    final TransportContext transportContext = 
ctx.getTransportContext();
 
-                spdyStream.writeDownStream(httpPacket,
+                    spdyStream.writeDownStream(httpPacket,
+                            transportContext.getCompletionHandler(),
+                            transportContext.getMessageCloner());
+
+                    return ctx.getStopAction();
+                } finally {
+                    if (isNewStream) {
+                        newStreamLock.unlock();
+                    }
+                }
+            } else {
+                final TransportContext transportContext = 
ctx.getTransportContext();
+                spdySession.writeDownStream(message,
                         transportContext.getCompletionHandler(),
                         transportContext.getMessageCloner());
+                        
                 
                 return ctx.getStopAction();
-            } finally {
-                if (isNewStream) {
-                    newStreamLock.unlock();
-                }
             }
         }
 
@@ -1362,7 +1373,7 @@ public class SpdyHandlerFilter extends HttpBaseFilter {
     }
 
     private static void processDataFrame(final SpdyStream spdyStream,
-                                         final SpdyFrame frame) {
+            final SpdyFrame frame) throws SpdyStreamException {
 
         DataFrame dataFrame = (DataFrame) frame;
         spdyStream.offerInputData(dataFrame.getData(), 
dataFrame.isFlagSet(DataFrame.FLAG_FIN));
diff --git 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdySession.java 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdySession.java
index b809500..6e9f42e 100644
--- a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdySession.java
+++ b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdySession.java
@@ -351,6 +351,14 @@ final class SpdySession {
                 null, frame, completionHandler, (MessageCloner) null);       
 
     }
 
+    <K> void writeDownStream(final K anyMessage,
+            final CompletionHandler<WriteResult> completionHandler,
+            final MessageCloner<K> messageCloner) {
+        
+        downstreamChain.write(connection,
+                null, anyMessage, completionHandler, messageCloner);        
+    }
+
     boolean initCommunication(final FilterChainContext context,
             final boolean isUpStream) {
         
diff --git 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStream.java 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStream.java
index 033a970..b5da8f3 100644
--- a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStream.java
+++ b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStream.java
@@ -69,6 +69,7 @@ import org.glassfish.grizzly.impl.FutureImpl;
 import org.glassfish.grizzly.memory.Buffers;
 import org.glassfish.grizzly.memory.CompositeBuffer;
 import org.glassfish.grizzly.memory.CompositeBuffer.DisposeOrder;
+import org.glassfish.grizzly.spdy.frames.RstStreamFrame;
 import org.glassfish.grizzly.utils.Futures;
 
 import static org.glassfish.grizzly.spdy.Constants.*;
@@ -78,7 +79,7 @@ import static org.glassfish.grizzly.spdy.Constants.*;
  */
 public class SpdyStream implements AttributeStorage, OutputSink, Closeable {
     public static final String SPDY_STREAM_ATTRIBUTE = 
SpdyStream.class.getName();
-    
+
     private enum CompletionUnit {
         Input, Output, Complete
     }
@@ -111,7 +112,10 @@ public class SpdyStream implements AttributeStorage, 
OutputSink, Closeable {
     
     // flag, which is indicating if SpdyStream processing has been marked as 
complete by external code
     volatile boolean isProcessingComplete;
-            
+    
+    // flag, which is indicating if SynStream or SynReply frames have 
already come for this SpdyStream
+    private boolean isSynFrameCame;
+    
     public static SpdyStream getSpdyStream(final HttpHeader httpHeader) {
         final HttpRequestPacket request = httpHeader.isRequest() ?
                 (HttpRequestPacket) httpHeader :
@@ -253,6 +257,7 @@ public class SpdyStream implements AttributeStorage, 
OutputSink, Closeable {
         outputSink.writeDownStream(httpPacket, completionHandler);
     }
     
+    @SuppressWarnings("unchecked")
     void writeDownStream(final HttpPacket httpPacket,
             final CompletionHandler<WriteResult> completionHandler,
             final MessageCloner messageCloner)
@@ -347,6 +352,7 @@ public class SpdyStream implements AttributeStorage, 
OutputSink, Closeable {
     }
     
     @Override
+    @SuppressWarnings("unchecked")
     public void addCloseListener(CloseListener closeListener) {
         CloseType closeType = closeTypeFlag.get();
         
@@ -407,11 +413,29 @@ public class SpdyStream implements AttributeStorage, 
OutputSink, Closeable {
             localWindowSize = spdySession.getLocalInitialWindowSize();
         }
     }
+
+    void onSynFrameCome() throws SpdyStreamException {
+        if (!isSynFrameCame) {
+            isSynFrameCame = true;
+        } else {
+            inputBuffer.close(UNEXPECTED_FRAME_TERMINATION);
+            throw new SpdyStreamException(getStreamId(),
+                    RstStreamFrame.PROTOCOL_ERROR, "Only one syn frame is 
allowed");
+        }
+    }
     
     private Buffer cachedInputBuffer;
     private boolean cachedIsLast;
     
-    void offerInputData(final Buffer data, final boolean isLast) {
+    void offerInputData(final Buffer data, final boolean isLast)
+            throws SpdyStreamException {
+        if (!isSynFrameCame) {
+            close(null, true);
+            
+            throw new SpdyStreamException(getStreamId(),
+                    RstStreamFrame.PROTOCOL_ERROR, "DataFrame came before 
SynReply");
+        }
+        
         onDataFrameReceive();
         
         final boolean isFirstBufferCached = (cachedInputBuffer == null);
@@ -465,6 +489,7 @@ public class SpdyStream implements AttributeStorage, 
OutputSink, Closeable {
     /**
      * Notify all close listeners
      */
+    @SuppressWarnings("unchecked")
     private void notifyCloseListeners() {
         final CloseType closeType = closeTypeFlag.get();
         
diff --git 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStreamException.java
 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStreamException.java
index 726334f..16c2c97 100644
--- 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStreamException.java
+++ 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyStreamException.java
@@ -51,6 +51,13 @@ public final class SpdyStreamException extends IOException 
{
     private final int rstReason;
 
     public SpdyStreamException(final int streamId, final int rstReason) {
+        this(streamId, rstReason, null);
+    }
+
+    public SpdyStreamException(final int streamId, final int rstReason,
+            final String description) {
+        super(description);
+        
         this.streamId = streamId;
         this.rstReason = rstReason;
     }
diff --git 
a/modules/spdy/src/test/java/org/glassfish/grizzly/spdy/SpdySemanticsTest.java
 
b/modules/spdy/src/test/java/org/glassfish/grizzly/spdy/SpdySemanticsTest.java
index 9f13dcc..d229e34 100644
--- 
a/modules/spdy/src/test/java/org/glassfish/grizzly/spdy/SpdySemanticsTest.java
+++ 
b/modules/spdy/src/test/java/org/glassfish/grizzly/spdy/SpdySemanticsTest.java
@@ -48,10 +48,13 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 import java.util.zip.Deflater;
 import org.glassfish.grizzly.Buffer;
+import org.glassfish.grizzly.CloseListener;
+import org.glassfish.grizzly.CloseType;
 import org.glassfish.grizzly.Connection;
 import org.glassfish.grizzly.Grizzly;
 import org.glassfish.grizzly.WriteResult;
@@ -60,6 +63,7 @@ import org.glassfish.grizzly.filterchain.FilterChain;
 import org.glassfish.grizzly.filterchain.FilterChainBuilder;
 import org.glassfish.grizzly.filterchain.FilterChainContext;
 import org.glassfish.grizzly.filterchain.NextAction;
+import org.glassfish.grizzly.http.HttpPacket;
 import org.glassfish.grizzly.http.HttpRequestPacket;
 import org.glassfish.grizzly.http.Method;
 import org.glassfish.grizzly.http.Protocol;
@@ -72,6 +76,7 @@ import org.glassfish.grizzly.http.server.Response;
 import org.glassfish.grizzly.http.server.StaticHttpHandler;
 import org.glassfish.grizzly.http.util.Header;
 import org.glassfish.grizzly.impl.FutureImpl;
+import org.glassfish.grizzly.memory.Buffers;
 import org.glassfish.grizzly.memory.CompositeBuffer;
 import org.glassfish.grizzly.memory.MemoryManager;
 import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
@@ -697,7 +702,7 @@ public class SpdySemanticsTest extends AbstractSpdyTest {
                             clientInQueue.poll(10, TimeUnit.SECONDS);
                     
                     assertNotNull("We expect more frames. Expected=" + 
normalRequestsCounter + " got=" + repliesGot, frame);
-                    assertTrue("Connection  was unexpectedly closed", frame 
!= CLOSE_FRAME);
+                    assertTrue("Connection was unexpectedly closed", frame 
!= CLOSE_FRAME);
                     
                     if (!frame.getHeader().isControl()) {
                         // skip DataFrame
@@ -729,6 +734,94 @@ public class SpdySemanticsTest extends AbstractSpdyTest {
         }
     }
     
+    /**
+     * Testing client side to properly process incorrect server side 
response,
+     * where DataFrame comes before SynReply
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testRstOnMissedSynReply() throws Exception {
+        final TCPNIOTransport clientTransport =
+                TCPNIOTransportBuilder.newInstance().build();
+        final HttpServer server = createServer(
+                HttpHandlerRegistration.of(new HttpHandler() {
+            @Override
+            public void service(Request request, Response response) throws 
Exception {
+                final SpdyStream spdyStream =
+                        (SpdyStream) 
request.getAttribute(SpdyStream.SPDY_STREAM_ATTRIBUTE);
+                
+                final FilterChainContext ctx = request.getContext();
+                
+                final DataFrame dataFrame = DataFrame.builder()
+                        .streamId(spdyStream.getStreamId())
+                        .data(Buffers.wrap(ctx.getMemoryManager(), 
"Unexpected data frame"))
+                        .last(false)
+                        .build();
+                
+                ctx.write(dataFrame);
+            }
+        }, "/test"));
+        
+        try {
+            final BlockingQueue<HttpPacket> clientQueue =
+                    new LinkedTransferQueue<HttpPacket>();
+            final FutureImpl<CloseType> closeFuture =
+                    Futures.<CloseType>createSafeFuture();
+            
+            server.start();
+            final FilterChainBuilder clientFilterChainBuilder =
+                    createClientFilterChainAsBuilder(SpdyMode.PLAIN, false);
+            
+            clientFilterChainBuilder.add(new BaseFilter() {
+                @Override
+                public NextAction handleRead(final FilterChainContext ctx)
+                        throws IOException {
+                    final HttpPacket packet = ctx.getMessage();
+                    clientQueue.add(packet);
+                    return ctx.getInvokeAction();
+                }
+            });
+            
+            clientTransport.setProcessor(clientFilterChainBuilder.build());
+
+            clientTransport.start();
+
+            Future<Connection> connectFuture = 
clientTransport.connect("localhost", PORT);
+            Connection connection = null;
+            try {
+                connection = connectFuture.get(10, TimeUnit.SECONDS);
+                final SpdySession spdySession = SpdySession.get(connection);
+                
+                final SpdyStream spdyStream = spdySession.getStreamBuilder()
+                        .method(Method.GET)
+                        .uri("/test")
+                        .protocol(Protocol.HTTP_1_1)
+                        .header(Header.Host, "localhost:" + PORT)
+                        .fin(true)
+                        .open();
+                
+                spdyStream.addCloseListener(new CloseListener<SpdyStream, 
CloseType>() {
+                    @Override
+                    public void onClosed(SpdyStream closeable, CloseType 
type)
+                            throws IOException {
+                        closeFuture.result(type);
+                    }
+                });
+
+                assertEquals(CloseType.LOCALLY, closeFuture.get(10, 
TimeUnit.SECONDS));
+                assertTrue(clientQueue.isEmpty());
+            } finally {
+                // Close the client connection
+                if (connection != null) {
+                    connection.closeSilently();
+                }
+            }
+        } finally {
+            clientTransport.stop();
+            server.stop();
+        }
+    }
+    
     private HttpServer createServer(final HttpHandlerRegistration... 
registrations) {
         return createServer(".", PORT, SpdyMode.PLAIN, false, registrations);
     }




[grizzly~git:ef70908e] [2.3.x] + fix issue #1469

oleksiys 04/17/2013
Terms of Use; Privacy Policy; Copyright ©2013-2015 (revision 20150226.965aeb8)
 
 
Close
loading
Please Confirm
Close