[grizzly~git:c7ff6fcd] [master] + implement task # #1512

  • From: oleksiys@...
  • To: commits@...
  • Subject: [grizzly~git:c7ff6fcd] [master] + implement task # #1512
  • Date: Tue, 21 May 2013 23:13:22 +0000

Project:    grizzly
Repository: git
Revision:   c7ff6fcd92bd62a2eeb677ca567fcdbb9b4c11a7
Author:     oleksiys
Date:       2013-05-21 22:50:08 UTC
Link:       

Log Message:
------------
[master] + implement task # #1512
https://java.net/jira/browse/GRIZZLY-1512

"SPDY server push is limited by Integer.MAX_VALUE"



Revisions:
----------
c7ff6fcd92bd62a2eeb677ca567fcdbb9b4c11a7


Modified Paths:
---------------
modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Source.java
modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyOutputSink.java


Diffs:
------
--- a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Source.java
+++ b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/Source.java
@@ -58,7 +58,7 @@ public abstract class Source {
     /**
      * Returns the number of bytes remaining to be written.
      */
-    public abstract int remaining();
+    public abstract long remaining();
     
     /**
      * Returns the number of bytes to be written.
@@ -208,18 +208,14 @@ public abstract class Source {
                 final SpdyStream spdyStream)
                 throws FileNotFoundException {
             fileLengthRemaining = file.length();
-            if (fileLengthRemaining > Integer.MAX_VALUE) {
-                throw new IllegalStateException("Files larger than " + 
Integer.MAX_VALUE + " are not supported");
-            }
-            
             this.fis = new FileInputStream(file);
             this.fileChannel = fis.getChannel();
             this.spdyStream = spdyStream;
         }
 
         @Override
-        public int remaining() {
-            return (int) fileLengthRemaining;
+        public long remaining() {
+            return fileLengthRemaining;
         }
 
         @Override
@@ -291,7 +287,7 @@ public abstract class Source {
         }
         
         @Override
-        public int remaining() {
+        public long remaining() {
             return buffer.remaining();
         }
 
@@ -352,7 +348,7 @@ public abstract class Source {
         }
         
         @Override
-        public int remaining() {
+        public long remaining() {
             return remaining;
         }
 --- 
a/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyOutputSink.java
+++ 
b/modules/spdy/src/main/java/org/glassfish/grizzly/spdy/SpdyOutputSink.java
@@ -75,10 +75,10 @@ import static org.glassfish.grizzly.spdy.Constants.*;
  * @author Alexey Stashok
  */
 final class SpdyOutputSink {
-    private static final int EMPTY_QUEUE_RECORD_SIZE = 1;
+    private static final int ATOMIC_QUEUE_RECORD_SIZE = 1;
     
     private static final OutputQueueRecord TERMINATING_QUEUE_RECORD =
-            new OutputQueueRecord(null, null, null, true);
+            new OutputQueueRecord(null, null, null, true, true);
     
     // async output queue
     final TaskQueue<OutputQueueRecord> outputQueue =
@@ -130,7 +130,8 @@ final class SpdyOutputSink {
             
             // if it's terminating record - processFin
             if (outputQueueRecord == TERMINATING_QUEUE_RECORD) {
-                outputQueue.releaseSpace(EMPTY_QUEUE_RECORD_SIZE);
+                // if it's TERMINATING_QUEUE_RECORD - don't forget to 
release ATOMIC_QUEUE_RECORD_SIZE
+                releaseWriteQueueSpace(0, true, true);
                 writeEmptyFin();
                 return;
             }
@@ -140,6 +141,7 @@ final class SpdyOutputSink {
             AggregatingLifeCycleHandler lifeCycleHandler =
                     outputQueueRecord.lifeCycleHandler;
             boolean isLast = outputQueueRecord.isLast;
+            final boolean isAtomic = outputQueueRecord.isAtomic;
             final Source resource = outputQueueRecord.resource;
             
             // check if output record's buffer is fitting into window size
@@ -159,7 +161,7 @@ final class SpdyOutputSink {
                 // reset isLast for the current chunk
                 isLast = false;
             } else {
-                outputQueueRecord.complete();
+                outputQueueRecord.release();
                 outputQueueRecord = null;
             }
 
@@ -168,8 +170,9 @@ final class SpdyOutputSink {
                     (dataChunkToSend.hasRemaining() || isLast)) {
                 final int dataChunkToSendSize = dataChunkToSend.remaining();
                 
-                DataFrame dataFrame = 
DataFrame.builder().data(dataChunkToSend).
-                        
last(isLast).streamId(spdyStream.getStreamId()).build();
+                final DataFrame dataFrame = DataFrame.builder()
+                        .data(dataChunkToSend).last(isLast)
+                        .streamId(spdyStream.getStreamId()).build();
 
                 // send a spdydata frame
                 writeDownStream(dataFrame, completionHandler,
@@ -178,11 +181,14 @@ final class SpdyOutputSink {
                 
                 // update unconfirmed bytes counter
                 unconfirmedBytes.addAndGet(dataChunkToSendSize);
-                outputQueue.releaseSpace(dataChunkToSendSize);
+                releaseWriteQueueSpace(dataChunkToSendSize,
+                        isAtomic, outputQueueRecord == null);
                 
-                // pass peer-window-size as max, even though these values 
are independent.
-                // later we may want to decouple outputQueue's max-size and 
peer-window-size
                 outputQueue.onSizeDecreased(windowSizeLimit);
+            } else if (isAtomic && outputQueueRecord == null) {
+                // if it's atomic and no remainder left - don't forget to 
release ATOMIC_QUEUE_RECORD_SIZE
+                releaseWriteQueueSpace(0, true, true);
+                outputQueue.onSizeDecreased(ATOMIC_QUEUE_RECORD_SIZE);
             }
             
             if (outputQueueRecord != null) {
@@ -323,8 +329,11 @@ final class SpdyOutputSink {
 
             boolean isDataCloned = false;
 
+            final boolean isAtomic = (dataSize == 0);
+            final int spaceToReserve = isAtomic ? ATOMIC_QUEUE_RECORD_SIZE : 
dataSize;
+
             // Check if output queue is not empty - add new element
-            if (outputQueue.reserveSpace(dataSize) > dataSize) {
+            if (reserveWriteQueueSpace(spaceToReserve) > spaceToReserve) {
                 // if the queue is not empty - the headers should have been 
sent
                 assert headerFrame == null;
             
@@ -340,7 +349,8 @@ final class SpdyOutputSink {
                 outputQueueRecord = new OutputQueueRecord(
                         Source.factory(spdyStream)
                             .createBufferSource(data),
-                        aggrCompletionHandler, aggrLifeCycleHandler, isLast);
+                        aggrCompletionHandler, aggrLifeCycleHandler,
+                        isLast, isAtomic);
                 // Should be called before flushing headerFrame, so
                 // AggregatingCompletionHanlder will not pass completed 
event to the parent
                 outputQueueRecord.incCompletionCounter();
@@ -348,7 +358,7 @@ final class SpdyOutputSink {
                 outputQueue.offer(outputQueueRecord);
 
                 // check if our element wasn't forgotten (async)
-                if (outputQueue.size() != dataSize ||
+                if (outputQueue.size() != spaceToReserve ||
                         !outputQueue.remove(outputQueueRecord)) {
                     // if not - return
                     return;
@@ -384,7 +394,8 @@ final class SpdyOutputSink {
                 outputQueueRecord = new OutputQueueRecord(
                         Source.factory(spdyStream)
                             .createBufferSource(dataChunkToStore),
-                        aggrCompletionHandler, aggrLifeCycleHandler, isLast);
+                        aggrCompletionHandler, aggrLifeCycleHandler,
+                        isLast, isAtomic);
                 outputQueueRecord.incCompletionCounter();
                 // reset completion handler and isLast for the current chunk
                 isLast = false;
@@ -399,7 +410,8 @@ final class SpdyOutputSink {
 
                 // update unconfirmed bytes counter
                 unconfirmedBytes.addAndGet(dataChunkToSendSize);
-                outputQueue.releaseSpace(dataChunkToSendSize);
+                releaseWriteQueueSpace(dataChunkToSendSize,
+                        isAtomic, outputQueueRecord == null);
 
                 // encode spdydata frame
                 dataFrame = DataFrame.builder()
@@ -443,6 +455,7 @@ final class SpdyOutputSink {
         if (outputQueueRecord == null) {
             return;
         }
+        
         addOutputQueueRecord(outputQueueRecord);
     }
     
@@ -523,7 +536,7 @@ final class SpdyOutputSink {
                 return;
             }
 
-            final int dataSize = source.remaining();
+            final long dataSize = source.remaining();
 
             if (dataSize == 0) {
                 close();
@@ -531,7 +544,7 @@ final class SpdyOutputSink {
             }
 
             // our element is first in the output queue
-            outputQueue.reserveSpace(dataSize);
+            reserveWriteQueueSpace(ATOMIC_QUEUE_RECORD_SIZE);
 
             boolean isLast = true;
 
@@ -543,7 +556,7 @@ final class SpdyOutputSink {
             if (fitWindowLen < dataSize) {
                 // Create output record for the chunk to be stored
                 outputQueueRecord = new OutputQueueRecord(source,
-                        null, null, true);
+                        null, null, true, true);
                 isLast = false;
             }
 
@@ -555,7 +568,8 @@ final class SpdyOutputSink {
 
             // update unconfirmed bytes counter
             unconfirmedBytes.addAndGet(dataChunkToSendSize);
-            outputQueue.releaseSpace(dataChunkToSendSize);
+            releaseWriteQueueSpace(dataChunkToSendSize, true,
+                    outputQueueRecord == null);
 
             // encode spdydata frame
             final SpdyFrame dataFrame = DataFrame.builder()
@@ -585,7 +599,7 @@ final class SpdyOutputSink {
      * 
      * @param data the output queue data.
      */
-    private int checkOutputWindow(final int size) {
+    private int checkOutputWindow(final long size) {
         // take a snapshot of the current output window state
         final int unconfirmedBytesNow = unconfirmedBytes.get();
         final int windowSizeLimit = spdyStream.getPeerWindowSize();
@@ -598,7 +612,7 @@ final class SpdyOutputSink {
             return dataSizeAllowedToSend;
         }
 
-        return size;
+        return (int) size;
     }
     
     private Buffer splitOutputBufferIfNeeded(final Buffer buffer,
@@ -683,10 +697,10 @@ final class SpdyOutputSink {
                 return;
             }
             
-            outputQueue.reserveSpace(EMPTY_QUEUE_RECORD_SIZE);
+            outputQueue.reserveSpace(ATOMIC_QUEUE_RECORD_SIZE);
             outputQueue.offer(TERMINATING_QUEUE_RECORD);
             
-            if (outputQueue.size() == EMPTY_QUEUE_RECORD_SIZE &&
+            if (outputQueue.size() == ATOMIC_QUEUE_RECORD_SIZE &&
                     outputQueue.remove(TERMINATING_QUEUE_RECORD)) {
                 writeEmptyFin();
             }
@@ -758,6 +772,7 @@ final class SpdyOutputSink {
                         outputQueueRecord.lifeCycleHandler;
 
                 boolean isLast = outputQueueRecord.isLast;
+                final boolean isAtomic = outputQueueRecord.isAtomic;
                 
                 final Source currentResource = outputQueueRecord.resource;
                 
@@ -776,7 +791,7 @@ final class SpdyOutputSink {
                     // reset isLast for the current chunk
                     isLast = false;
                 } else {
-                    outputQueueRecord.complete();
+                    outputQueueRecord.release();
                     outputQueueRecord = null;
                 }
 
@@ -784,23 +799,41 @@ final class SpdyOutputSink {
                 if (dataChunkToSend != null &&
                         (dataChunkToSend.hasRemaining() || isLast)) {
                     final int dataChunkToSendSize = 
dataChunkToSend.remaining();
-                    
-                    // update unconfirmed bytes counter
-                    unconfirmedBytes.addAndGet(dataChunkToSendSize);
-                    outputQueue.releaseSpace(dataChunkToSendSize);
 
                     // encode spdydata frame
-                    DataFrame frame =
-                            
DataFrame.builder().streamId(spdyStream.getStreamId()).
+                    final DataFrame frame = DataFrame.builder()
+                            .streamId(spdyStream.getStreamId()).
                             data(dataChunkToSend).last(isLast).build();
                     writeDownStream(frame, aggrCompletionHandler,
                             aggrLifeCycleHandler, isLast);
+                    
+                    // update unconfirmed bytes counter
+                    unconfirmedBytes.addAndGet(dataChunkToSendSize);
+                    releaseWriteQueueSpace(dataChunkToSendSize, isAtomic,
+                            outputQueueRecord == null);
+                    
+                } else if (isAtomic && outputQueueRecord == null) {
+                    // if it's atomic and no remainder left - don't forget 
to release ATOMIC_QUEUE_RECORD_SIZE
+                    releaseWriteQueueSpace(0, true, true);
                 }
             } else {
                 break; // will be (or already) written asynchronously
             }
         } while (outputQueueRecord != null);
     }
+    
+    private int reserveWriteQueueSpace(final int spaceToReserve) {
+        return outputQueue.reserveSpace(spaceToReserve);
+    }
+
+    private void releaseWriteQueueSpace(final int justSentBytes, final 
boolean isAtomic,
+            final boolean isEndOfChunk) {
+        if (isEndOfChunk) {
+            outputQueue.releaseSpace(isAtomic ? ATOMIC_QUEUE_RECORD_SIZE : 
justSentBytes);
+        } else if (!isAtomic) {
+            outputQueue.releaseSpace(justSentBytes);
+        }
+    }
 
     private static class OutputQueueRecord extends 
AsyncQueueRecord<WriteResult>{
         private Source resource;
@@ -809,16 +842,19 @@ final class SpdyOutputSink {
         
         private boolean isLast;
         
+        private final boolean isAtomic;
+        
         public OutputQueueRecord(final Source resource,
                 final AggregatingCompletionHandler completionHandler,
                 final AggregatingLifeCycleHandler lifeCycleHandler,
-                final boolean isLast) {
+                final boolean isLast, final boolean isAtomic) {
             super(null, null, null, null);
             
             this.resource = resource;
             this.aggrCompletionHandler = completionHandler;
             this.lifeCycleHandler = lifeCycleHandler;
             this.isLast = isLast;
+            this.isAtomic = isAtomic;
         }
         
         private void incCompletionCounter() {
@@ -843,7 +879,7 @@ final class SpdyOutputSink {
             this.isLast = last;
         }
         
-        public void complete() {
+        public void release() {
             if (resource != null) {
                 resource.release();
                 resource = null;
@@ -859,7 +895,7 @@ final class SpdyOutputSink {
                     chLocal.failed(e);
                 }
             } finally {
-                complete();
+                release();
             }
         }
         





[grizzly~git:c7ff6fcd] [master] + implement task # #1512

oleksiys 05/21/2013
Terms of Use; Privacy Policy; Copyright ©2013-2015 (revision 20150626.29986a4)
 
 
Close
loading
Please Confirm
Close