Skip to main content

[xadisk~svn:538] Changes for JIRA issue #76.

  • From: nitin_verma@...
  • To: commits@...
  • Subject: [xadisk~svn:538] Changes for JIRA issue #76.
  • Date: Sun, 18 Aug 2013 19:24:54 +0000

Project:    xadisk
Repository: svn
Revision:   538
Author:     nitin_verma
Date:       2013-08-18 19:24:53 UTC
Link:       

Log Message:
------------
Changes for JIRA issue #76.


Revisions:
----------
538


Modified Paths:
---------------
trunk/src/org/xadisk/bridge/proxies/interfaces/XAFileSystem.java
trunk/src/org/xadisk/filesystem/NativeXAFileSystem.java
trunk/src/org/xadisk/filesystem/TransactionInformation.java
trunk/src/org/xadisk/connector/outbound/XADiskUserLocalTransaction.java
trunk/src/org/xadisk/filesystem/NativeSession.java
trunk/src/org/xadisk/connector/outbound/XADiskLocalTransaction.java
trunk/src/org/xadisk/filesystem/TransactionLogEntry.java
trunk/src/org/xadisk/bridge/proxies/interfaces/Session.java
trunk/src/org/xadisk/filesystem/SessionCommonness.java
trunk/src/org/xadisk/connector/XAResourceImpl.java
trunk/src/org/xadisk/filesystem/utilities/FileIOUtility.java
trunk/src/org/xadisk/filesystem/workers/CrashRecoveryWorker.java
trunk/src/org/xadisk/bridge/proxies/impl/RemoteSession.java
trunk/src/org/xadisk/bridge/proxies/impl/RemoteXAFileSystem.java


Added Paths:
------------
trunk/src/org/xadisk/filesystem/exceptions/TransactionFailedException.java
trunk/src/org/xadisk/filesystem/exceptions/XASystemIOException.java


Diffs:
------
Index: trunk/src/org/xadisk/filesystem/NativeSession.java
===================================================================
--- trunk/src/org/xadisk/filesystem/NativeSession.java  (revision 537)
+++ trunk/src/org/xadisk/filesystem/NativeSession.java  (revision 538)
@@ -33,9 +33,11 @@
 import 
org.xadisk.filesystem.exceptions.InsufficientPermissionOnFileException;
 import org.xadisk.filesystem.exceptions.LockingFailedException;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 import org.xadisk.filesystem.exceptions.TransactionRolledbackException;
 import org.xadisk.filesystem.exceptions.TransactionTimeoutException;
 import org.xadisk.filesystem.exceptions.XASystemException;
+import org.xadisk.filesystem.exceptions.XASystemIOException;
 import org.xadisk.filesystem.exceptions.XASystemNoMoreAvailableException;
 import org.xadisk.filesystem.utilities.FileIOUtility;
 import org.xadisk.filesystem.utilities.MiscUtils;
@@ -126,6 +128,7 @@
             this.rollbackCause = rollbackCause;
         } catch (TransactionRolledbackException trbe) {
         } catch (NoTransactionAssociatedException note) {
+        } catch (TransactionFailedException tfe) {
         }
     }
 
@@ -607,7 +610,8 @@
         }
     }
 
-    public void commit(boolean onePhase) throws 
NoTransactionAssociatedException {
+    public void commit(boolean onePhase) throws 
NoTransactionAssociatedException,
+        TransactionFailedException {
         try {
             asynchronousRollbackLock.lock();
             checkIfCanContinue();
@@ -666,71 +670,80 @@
                     logReaderChannel.position(localPosition);
                     logEntry = 
TransactionLogEntry.getNextTransactionLogEntry(logReaderChannel, 
localPosition, false);
                 }
-                if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_APPEND) {
-                    File f = new File(logEntry.getFileName());
-                    if (filesDirectlyWrittenToDisk.contains(f)) {
-                        continue;
-                    }
-                    checkPointDuringModificationAgainstCopy(i - 2, f, 
srcFilesCopied, srcFilesMoved);
-                    commitFileAppend(logEntry, temp, logReaderChannel, 
logFileIndex, localPosition);
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_DELETE) {
-                    String fileName = logEntry.getFileName();
-                    File f = new File(fileName);
-                    if (filesDirectlyWrittenToDisk.contains(f)) {
-                        continue;
-                    }
-                    checkPointDuringModificationAgainstCopy(i - 2, f, 
srcFilesCopied, srcFilesMoved);
-                    commitDeleteFile(fileName);
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_CREATE) {
-                    String fileName = logEntry.getFileName();
-                    File f = new File(fileName);
-                    if (filesDirectlyWrittenToDisk.contains(f)) {
-                        continue;
-                    }
-                    checkPointDuringCreationAgainstMove(i - 2, f, 
srcFilesCopied, srcFilesMoved);
-                    commitCreateFile(fileName);
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.DIR_CREATE) {
-                    String dirName = logEntry.getFileName();
-                    checkPointDuringCreationAgainstMove(i - 2, new 
File(dirName), srcFilesCopied, srcFilesMoved);
-                    commitCreateDir(dirName);
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_COPY) {
-                    File dest = new File(logEntry.getDestFileName());
-                    if (filesDirectlyWrittenToDisk.contains(dest)) {
-                        continue;
-                    }
-                    checkPointDuringCreationAgainstMove(i - 2, dest, 
srcFilesCopied, srcFilesMoved);
-                    commitFileCopy(logEntry, srcFilesCopied);
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_MOVE) {
-                    File src = new File(logEntry.getFileName());
-                    File dest = new File(logEntry.getDestFileName());
-                    if (filesDirectlyWrittenToDisk.contains(dest)) {
-                        continue;
-                    }
-                    boolean isDirectoryMove = src.isDirectory();
-                    if(isDirectoryMove) {
-                        declareCheckPoint(i - 2, srcFilesCopied, 
srcFilesMoved);
-                        commitMove(logEntry);
-                        declareCheckPoint(i - 2, srcFilesCopied, 
srcFilesMoved);
-                    } else {
+                try {
+                    if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_APPEND) {
+                        File f = new File(logEntry.getFileName());
+                        if (filesDirectlyWrittenToDisk.contains(f)) {
+                            continue;
+                        }
+                        checkPointDuringModificationAgainstCopy(i - 2, f, 
srcFilesCopied, srcFilesMoved);
+                        commitFileAppend(logEntry, temp, logReaderChannel, 
logFileIndex, localPosition);
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_DELETE) {
+                        String fileName = logEntry.getFileName();
+                        File f = new File(fileName);
+                        if (filesDirectlyWrittenToDisk.contains(f)) {
+                            continue;
+                        }
+                        checkPointDuringModificationAgainstCopy(i - 2, f, 
srcFilesCopied, srcFilesMoved);
+                        commitDeleteFile(fileName);
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_CREATE) {
+                        String fileName = logEntry.getFileName();
+                        File f = new File(fileName);
+                        if (filesDirectlyWrittenToDisk.contains(f)) {
+                            continue;
+                        }
+                        checkPointDuringCreationAgainstMove(i - 2, f, 
srcFilesCopied, srcFilesMoved);
+                        commitCreateFile(fileName);
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.DIR_CREATE) {
+                        String dirName = logEntry.getFileName();
+                        checkPointDuringCreationAgainstMove(i - 2, new 
File(dirName), srcFilesCopied, srcFilesMoved);
+                        commitCreateDir(dirName);
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_COPY) {
+                        File dest = new File(logEntry.getDestFileName());
+                        if (filesDirectlyWrittenToDisk.contains(dest)) {
+                            continue;
+                        }
+                        checkPointDuringCreationAgainstMove(i - 2, dest, 
srcFilesCopied, srcFilesMoved);
+                        commitFileCopy(logEntry, srcFilesCopied);
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_MOVE) {
+                        File src = new File(logEntry.getFileName());
+                        File dest = new File(logEntry.getDestFileName());
+                        if (filesDirectlyWrittenToDisk.contains(dest)) {
+                            continue;
+                        }
+                        boolean isDirectoryMove = src.isDirectory();
+                        if(isDirectoryMove) {
+                            declareCheckPoint(i - 2, srcFilesCopied, 
srcFilesMoved);
+                            commitMove(logEntry);
+                            declareCheckPoint(i - 2, srcFilesCopied, 
srcFilesMoved);
+                        } else {
+                            if(!checkPointDuringModificationAgainstCopy(i - 
2, src, srcFilesCopied, srcFilesMoved)) {
+                                checkPointDuringCreationAgainstMove(i - 2, 
dest, srcFilesCopied, srcFilesMoved);
+                            }
+                            commitFileMove(logEntry, srcFilesMoved);
+                        }
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_TRUNCATE) {
+                        File f = new File(logEntry.getFileName());
+                        if (filesDirectlyWrittenToDisk.contains(f)) {
+                            continue;
+                        }
+                        checkPointDuringModificationAgainstCopy(i - 2, f, 
srcFilesCopied, srcFilesMoved);
+                        commitFileTruncate(logEntry);
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_SPECIAL_MOVE) {
+                        File src = new File(logEntry.getFileName());
+                        File dest = new File(logEntry.getDestFileName());
                         if(!checkPointDuringModificationAgainstCopy(i - 2, 
src, srcFilesCopied, srcFilesMoved)) {
                             checkPointDuringCreationAgainstMove(i - 2, dest, 
srcFilesCopied, srcFilesMoved);
                         }
-                        commitFileMove(logEntry, srcFilesMoved);
+                        commitFileSpecialMove(logEntry, srcFilesMoved);
                     }
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_TRUNCATE) {
-                    File f = new File(logEntry.getFileName());
-                    if (filesDirectlyWrittenToDisk.contains(f)) {
-                        continue;
-                    }
-                    checkPointDuringModificationAgainstCopy(i - 2, f, 
srcFilesCopied, srcFilesMoved);
-                    commitFileTruncate(logEntry);
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.FILE_SPECIAL_MOVE) {
-                    File src = new File(logEntry.getFileName());
-                    File dest = new File(logEntry.getDestFileName());
-                    if(!checkPointDuringModificationAgainstCopy(i - 2, src, 
srcFilesCopied, srcFilesMoved)) {
-                        checkPointDuringCreationAgainstMove(i - 2, dest, 
srcFilesCopied, srcFilesMoved);
-                    }
-                    commitFileSpecialMove(logEntry, srcFilesMoved);
+                } catch(XASystemIOException xasioe) {
+                    throw (IOException) xasioe.getCause();
+                } catch(IOException ioe) {
+                    //all these ioexceptions will be transaction specific 
(file_append just
+                    //reads from the txn-log) and so would not affect the 
system.
+                    xaFileSystem.notifyTransactionFailure(xid);
+                    throw new TransactionFailedException(ioe, xid);
                 }
             }
             diskSession.forceToDisk();
@@ -767,8 +780,12 @@
 
     private void declareCheckPoint(int currentLogPosition, HashSet<File> 
srcFilesCopied, HashSet<File> srcFilesMoved) throws IOException {
         diskSession.forceToDisk();
-        ByteBuffer logEntryBytes = 
ByteBuffer.wrap(TransactionLogEntry.getLogEntry(xid, currentLogPosition));
-        xaFileSystem.getTheGatheringDiskWriter().forceLog(logEntryBytes);
+        try {
+            ByteBuffer logEntryBytes = 
ByteBuffer.wrap(TransactionLogEntry.getLogEntry(xid, currentLogPosition));
+            xaFileSystem.getTheGatheringDiskWriter().forceLog(logEntryBytes);
+        } catch(IOException ioe) {
+            throw new XASystemIOException(ioe);
+        }
         srcFilesMoved.clear();
         srcFilesCopied.clear();
     }
@@ -790,6 +807,17 @@
         }
     }
 
+    void completeTheTransaction() {
+        try {
+            asynchronousRollbackLock.lock();
+            cleanup();
+        } catch(IOException ioe) {
+            xaFileSystem.notifySystemFailure(ioe);
+        } finally {
+            asynchronousRollbackLock.unlock();
+        }
+    }
+    
     private void commitFileAppend(TransactionLogEntry logEntry, ByteBuffer 
inMemoryLogEntry,
             FileChannel logReaderChannel, int logFileIndex, long 
localPosition)
             throws IOException {
@@ -907,7 +935,7 @@
         }
     }
 
-    public void rollback() throws NoTransactionAssociatedException {
+    public void rollback() throws NoTransactionAssociatedException, 
TransactionFailedException {
         try {
             asynchronousRollbackLock.lock();
             checkIfCanContinue();
@@ -959,39 +987,44 @@
                 }
 
                 FileOutputStream fos;
-                if (logEntry.getOperationType() == 
TransactionLogEntry.UNDOABLE_FILE_TRUNCATE) {
-                    String fileName = logEntry.getFileName();
-                    fos = new FileOutputStream(fileName, true);
-                    long contentLength = logEntry.getFileContentLength();
-                    FileChannel fc = fos.getChannel();
-                    if (logFileIndex == -1) {
-                    } else {
-                        logReaderChannel.position(localPosition + 
logEntry.getHeaderLength());
-                        long num = 0;
-                        if (logEntry.getFilePosition() <= fc.size()) {
-                            while (num < contentLength) {
-                                num += fc.transferFrom(logReaderChannel, num 
+ logEntry.getFilePosition(),
-                                        
NativeXAFileSystem.maxTransferToChannel(contentLength - num));
+                try {
+                    if (logEntry.getOperationType() == 
TransactionLogEntry.UNDOABLE_FILE_TRUNCATE) {
+                        String fileName = logEntry.getFileName();
+                        fos = new FileOutputStream(fileName, true);
+                        long contentLength = logEntry.getFileContentLength();
+                        FileChannel fc = fos.getChannel();
+                        if (logFileIndex == -1) {
+                        } else {
+                            logReaderChannel.position(localPosition + 
logEntry.getHeaderLength());
+                            long num = 0;
+                            if (logEntry.getFilePosition() <= fc.size()) {
+                                while (num < contentLength) {
+                                    num += fc.transferFrom(logReaderChannel, 
num + logEntry.getFilePosition(),
+                                            
NativeXAFileSystem.maxTransferToChannel(contentLength - num));
+                                }
                             }
                         }
+                        fc.force(false);//improve this. force for every 
piece of content? (same in commit method).
+                        fc.close();
+                    } else if (logEntry.getOperationType() == 
TransactionLogEntry.UNDOABLE_FILE_APPEND) {
+                        String fileName = logEntry.getFileName();
+                        fos = new FileOutputStream(fileName, true);
+                        FileChannel fc = fos.getChannel();
+                        fc.truncate(logEntry.getNewLength());
+                        fc.force(false);//the file length may be part of 
meta-data (not sure). Make "true"?
+                        fc.close();
                     }
-                    fc.force(false);//improve this. force for every piece of 
content? (same in commit method).
-                    fc.close();
-                } else if (logEntry.getOperationType() == 
TransactionLogEntry.UNDOABLE_FILE_APPEND) {
-                    String fileName = logEntry.getFileName();
-                    fos = new FileOutputStream(fileName, true);
-                    FileChannel fc = fos.getChannel();
-                    fc.truncate(logEntry.getNewLength());
-                    fc.force(false);//the file length may be part of 
meta-data (not sure). Make "true"?
-                    fc.close();
+                } catch(IOException ioe) {
+                    //all these ioexceptions will be transaction specific 
(file_append just
+                    //reads from the txn-log) and so would not affect the 
system.
+                    xaFileSystem.notifyTransactionFailure(xid);
+                    throw new TransactionFailedException(ioe, xid);
                 }
-
             }
             
xaFileSystem.getTheGatheringDiskWriter().transactionCompletes(xid, false);
             for (FileChannel logChannel : logReaderChannels.values()) {
                 logChannel.close();
             }
-
             cleanup();
         } catch (IOException ioe) {
             xaFileSystem.notifySystemFailure(ioe);
@@ -1261,7 +1294,7 @@
         }
     }
 
-    public void commit() throws NoTransactionAssociatedException {
+    public void commit() throws NoTransactionAssociatedException, 
TransactionFailedException {
         this.commit(true);
     }
 
Index: trunk/src/org/xadisk/filesystem/TransactionInformation.java
===================================================================
--- trunk/src/org/xadisk/filesystem/TransactionInformation.java (revision 537)
+++ trunk/src/org/xadisk/filesystem/TransactionInformation.java (revision 538)
@@ -116,6 +116,19 @@
         return new TransactionInformation(tidBuffer);
     }
 
+    public byte[] getBytes() {
+        ByteBuffer temp = ByteBuffer.allocate(1 + 1 + 4 + gid.length + 
bqual.length);
+        temp.put((byte) gid.length);
+        temp.put((byte) bqual.length);
+        temp.putInt(formatId);
+        temp.put(gid);
+        temp.put(bqual);
+        byte bytes[] = new byte[temp.capacity()];
+        temp.flip();
+        temp.get(bytes);
+        return bytes;
+    }
+
     public NativeSession getOwningSession() {
         return owningSession;
     }
Index: trunk/src/org/xadisk/filesystem/utilities/FileIOUtility.java
===================================================================
--- trunk/src/org/xadisk/filesystem/utilities/FileIOUtility.java        
(revision 537)
+++ trunk/src/org/xadisk/filesystem/utilities/FileIOUtility.java        
(revision 538)
@@ -24,8 +24,9 @@
     public static void renameTo(File src, File dest) throws IOException {
         if (!src.renameTo(dest)) {
             if (renamePossible(src, dest)) {
+                int retryCount = 1;
                 while (!src.renameTo(dest)) {
-                                       makeSpaceForGC();
+                                       doGCBeforeRetry(retryCount++, src);
                     if (src.renameTo(dest)) {
                         break;
                     }
@@ -56,8 +57,9 @@
         if (!f.exists()) {
             throw new IOException("File does not exist.");
         }
+        int retryCount = 1;
         while (!f.delete()) {
-                       makeSpaceForGC();
+                       doGCBeforeRetry(retryCount++, f);
         }
     }
 
@@ -90,8 +92,9 @@
         if (!f.getParentFile().canWrite()) {
             throw new IOException("Parent directory not writable.");
         }
+        int retryCount = 1;
         while (!f.createNewFile()) {
-                       makeSpaceForGC();
+                       doGCBeforeRetry(retryCount++, f);
         }
     }
 
@@ -105,8 +108,9 @@
         if (!dir.getParentFile().canWrite()) {
             throw new IOException("Parent directory not writable.");
         }
+        int retryCount = 1;
         while (!dir.mkdir()) {
-                       makeSpaceForGC();
+                       doGCBeforeRetry(retryCount++, dir);
         }
     }
 
@@ -126,20 +130,21 @@
             throw new IOException("The directory is not readable.");
         }
         String children[] = dir.list();
+        int retryCount = 1;
         while (children == null) {
-                       makeSpaceForGC();
+                       doGCBeforeRetry(retryCount++, dir);
             children = dir.list();
         }
         return children;
     }
 
-    private static void makeSpaceForGC() throws IOException {
-        /**
-         * I know that this mechanism of doing gc when file operations fail 
is weird. But I had no other option than to use
-         * this workaround which would get triggered very very rarely (when 
that jvm bug gets triggered). Things like
-         * not closing channel/stream are ususally the cause for file 
delete/rename failure, but a check over the
-         * complete xadisk code confirms that the cause here is something 
else (a jvm bug).
-        **/
+    private static void doGCBeforeRetry(int retryCount, File f) throws 
IOException {
+        if(retryCount == 5) {
+            throw new IOException("The i/o operation could not be completed 
for "
+                + "the file/directory with path [" + f.getAbsolutePath() + 
"] due "
+                + "to an unknown reason.");
+        }
+        
         System.gc();
         System.gc();
         System.gc();
Index: 
trunk/src/org/xadisk/filesystem/exceptions/TransactionFailedException.java
===================================================================
--- 
trunk/src/org/xadisk/filesystem/exceptions/TransactionFailedException.java  
(revision 0)
+++ 
trunk/src/org/xadisk/filesystem/exceptions/TransactionFailedException.java  
(revision 538)
@@ -0,0 +1,19 @@
+package org.xadisk.filesystem.exceptions;
+
+import org.xadisk.filesystem.TransactionInformation;
+
+public class TransactionFailedException extends XAApplicationException {
+
+    private static final long serialVersionUID = 1L;
+
+    private byte[] transactionIdentifier;
+
+    public TransactionFailedException(Throwable cause, 
TransactionInformation xid) {
+        super(cause);
+        this.transactionIdentifier = xid.getBytes();
+    }
+
+    public byte[] getTransactionIdentifier() {
+        return transactionIdentifier;
+    }
+}
Index: trunk/src/org/xadisk/filesystem/exceptions/XASystemIOException.java
===================================================================
--- trunk/src/org/xadisk/filesystem/exceptions/XASystemIOException.java 
(revision 0)
+++ trunk/src/org/xadisk/filesystem/exceptions/XASystemIOException.java 
(revision 538)
@@ -0,0 +1,10 @@
+package org.xadisk.filesystem.exceptions;
+
+public class XASystemIOException extends XASystemException {
+
+    private static final long serialVersionUID = 1L;
+    
+    public XASystemIOException(Throwable cause) {
+        super(cause);
+    }
+}
Index: trunk/src/org/xadisk/filesystem/TransactionLogEntry.java
===================================================================
--- trunk/src/org/xadisk/filesystem/TransactionLogEntry.java    (revision 537)
+++ trunk/src/org/xadisk/filesystem/TransactionLogEntry.java    (revision 538)
@@ -88,7 +88,7 @@
         buffer.putInt(0);
         buffer.putInt((int) fileContentLength);
         buffer.put(appendOrUndoTruncate);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
 
         buffer.putInt(filePathLength);
         buffer.put(filePathBytes);
@@ -110,7 +110,7 @@
         buffer.putInt(0);
         buffer.putInt(0);
         buffer.put(createFileOrDirOrDeleteOrUndoCreate);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
 
         buffer.putInt(filePathLength);
         buffer.put(filePathBytes);
@@ -131,7 +131,7 @@
         buffer.putInt(0);
         buffer.putInt(0);
         buffer.put(truncateOrUndoAppend);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
 
         buffer.putInt(filePathLength);
         buffer.put(filePathBytes);
@@ -156,7 +156,7 @@
         buffer.putInt(0);
         buffer.putInt(0);
         buffer.put(moveOrCopyOrUndoDelete);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
         buffer.putInt(srcFilePathLength);
         buffer.put(sourceFilePathBytes);
 
@@ -176,7 +176,7 @@
         buffer.putInt(0);
         buffer.putInt(0);
         buffer.put(commitStatus);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
 
         buffer.putInt(0, buffer.position());
 
@@ -191,7 +191,7 @@
         buffer.putInt(0);
         buffer.putInt(0);
         buffer.put(CHECKPOINT_AVOIDING_COPY_OR_MOVE_REDO);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
         buffer.putInt(checkPointPosition);
 
         buffer.putInt(0, buffer.position());
@@ -218,7 +218,7 @@
         buffer.putInt(0);
         buffer.putInt(0);
         buffer.put(FILES_ALREADY_ONDISK);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
         buffer.putInt(files.size());
         for (i = 0; i < filePathsBytes.length; i++) {
             buffer.putInt(filePathsBytes[i].length);
@@ -244,7 +244,7 @@
         buffer.putInt(0);
         buffer.putInt(0);
         buffer.put(enQ_deQ_prepareDequeue);
-        buffer.put(serializeXid(xid));
+        buffer.put(xid.getBytes());
         buffer.putInt(events.size());
         for (int i = 0; i < eventsBytes.length; i++) {
             buffer.put(eventsBytes[i]);
@@ -321,7 +321,7 @@
         temp.operationType = buffer.get();
         if (temp.operationType != REMOTE_ENDPOINT_ACTIVATES
                 && temp.operationType != REMOTE_ENDPOINT_DEACTIVATES) {
-            temp.xid = deSerializeXid(buffer);
+            temp.xid = new TransactionInformation(buffer);
         }
 
         if (temp.operationType == FILE_APPEND || temp.operationType == 
UNDOABLE_FILE_TRUNCATE) {
@@ -372,7 +372,7 @@
     private static byte[] getBytesFromEvent(FileSystemStateChangeEvent 
event) {
         byte fileNameBytes[] = 
getUTF8Bytes(event.getFile().getAbsolutePath());
         ByteBuffer buffer = ByteBuffer.allocate(fileNameBytes.length + 200);
-        buffer.put(serializeXid(event.getEnqueuingTransaction()));
+        buffer.put(event.getEnqueuingTransaction().getBytes());
         buffer.put(event.getEventType().getByteValue());
         buffer.putInt(fileNameBytes.length);
         buffer.put(fileNameBytes);
@@ -384,7 +384,7 @@
     }
 
     private static FileSystemStateChangeEvent readEvent(ByteBuffer buffer) {
-        TransactionInformation enqueuingTransaction = deSerializeXid(buffer);
+        TransactionInformation enqueuingTransaction = new 
TransactionInformation(buffer);
         byte eventType = buffer.get();
         int fileNameLength = buffer.getInt();
         byte fileNameBytes[] = new byte[fileNameLength];
@@ -463,25 +463,6 @@
         return checkPointPosition;
     }
 
-    static byte[] serializeXid(TransactionInformation xid) {
-        byte[] gid = xid.getGlobalTransactionId();
-        byte[] bqual = xid.getBranchQualifier();
-        ByteBuffer temp = ByteBuffer.allocate(1 + 1 + 4 + gid.length + 
bqual.length);
-        temp.put((byte) gid.length);
-        temp.put((byte) bqual.length);
-        temp.putInt(xid.getFormatId());
-        temp.put(gid);
-        temp.put(bqual);
-        byte bytes[] = new byte[temp.capacity()];
-        temp.flip();
-        temp.get(bytes);
-        return bytes;
-    }
-
-    static TransactionInformation deSerializeXid(ByteBuffer buffer) {
-        return new TransactionInformation(buffer);
-    }
-
     public static void updateContentLength(ByteBuffer buffer, int 
contentLength) {
         buffer.putInt(4, contentLength);
     }
@@ -506,7 +487,7 @@
             TransactionLogEntry logEntry = new TransactionLogEntry();
             logEntry.headerLength = header.getInt();
             logEntry.fileContentLength = header.getInt();
-            logEntry.xid = deSerializeXid(header);
+            logEntry.xid = new TransactionInformation(header);
             logEntry.operationType = header.get();
 
             logChannel.position(position + logEntry.headerLength + 
logEntry.fileContentLength);
Index: trunk/src/org/xadisk/filesystem/workers/CrashRecoveryWorker.java
===================================================================
--- trunk/src/org/xadisk/filesystem/workers/CrashRecoveryWorker.java    
(revision 537)
+++ trunk/src/org/xadisk/filesystem/workers/CrashRecoveryWorker.java    
(revision 538)
@@ -29,6 +29,7 @@
 import org.xadisk.filesystem.TransactionLogEntry;
 import org.xadisk.filesystem.TransactionInformation;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 import org.xadisk.filesystem.utilities.TransactionLogsUtility;
 
 public class CrashRecoveryWorker implements Work {
@@ -343,6 +344,7 @@
                     session.rollback();
                 }
             } catch (NoTransactionAssociatedException note) {
+            } catch (TransactionFailedException tfe) {
             }
         }
     }
Index: trunk/src/org/xadisk/filesystem/NativeXAFileSystem.java
===================================================================
--- trunk/src/org/xadisk/filesystem/NativeXAFileSystem.java     (revision 537)
+++ trunk/src/org/xadisk/filesystem/NativeXAFileSystem.java     (revision 538)
@@ -22,10 +22,12 @@
 import org.xadisk.filesystem.workers.TransactionTimeoutDetector;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.resource.spi.work.Work;
@@ -89,6 +91,8 @@
     private final ConcurrencyControl concurrencyControl;
     private final boolean handleGeneralRemoteInvocations;
     private final boolean handleClusterRemoteInvocations;
+    private final ConcurrentLinkedQueue<TransactionInformation> 
failedTransactions =
+            new ConcurrentLinkedQueue<TransactionInformation>();
     
     //fix for bug XADISK-85 and potentially similar ones.
     public static final int FILE_CHANNEL_MAX_TRANSFER = 1024 * 1024 * 8;
@@ -344,6 +348,38 @@
         return xids;
     }
 
+    public void notifyTransactionFailure(TransactionInformation xid) {
+        failedTransactions.add(xid);
+    }
+    
+    public byte[][] getIdentifiersForFailedTransactions() {
+        TransactionInformation identifiers[] = 
failedTransactions.toArray(new TransactionInformation[0]);
+        byte identifiersBytes[][] = new byte[identifiers.length][];
+        int i = 0;
+        for(TransactionInformation identifier: identifiers) {
+            identifiersBytes[i++] = identifier.getBytes();
+        }
+        return identifiersBytes;
+    }
+    
+    public void declareTransactionAsComplete(byte[] transactionIdentifier) {
+        try {
+            TransactionInformation xid = new 
TransactionInformation(ByteBuffer.wrap(transactionIdentifier));
+            gatheringDiskWriter.transactionCompletes(xid, true);
+            NativeSession session = transactionAndSession.get(xid);
+            if(session != null) {
+                //the xadisk has not gone down after failure.
+                session.completeTheTransaction();
+            } else {
+                //case of recovery going on.
+                recoveryWorker.cleanupTransactionInfo(xid);
+            }
+            failedTransactions.remove(xid);
+        } catch(IOException ioe) {
+            notifySystemFailure(ioe);
+        }
+    }
+    
     public BufferPool getBufferPool() {
         return bufferPool;
     }
Index: trunk/src/org/xadisk/filesystem/SessionCommonness.java
===================================================================
--- trunk/src/org/xadisk/filesystem/SessionCommonness.java      (revision 537)
+++ trunk/src/org/xadisk/filesystem/SessionCommonness.java      (revision 538)
@@ -10,10 +10,11 @@
 
 import org.xadisk.bridge.proxies.interfaces.Session;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 
 public interface SessionCommonness extends Session {
 
-    public void commit(boolean onePhase) throws 
NoTransactionAssociatedException;
+    public void commit(boolean onePhase) throws 
NoTransactionAssociatedException, TransactionFailedException;
     
     public void prepare() throws NoTransactionAssociatedException;
 
Index: trunk/src/org/xadisk/connector/outbound/XADiskUserLocalTransaction.java
===================================================================
--- trunk/src/org/xadisk/connector/outbound/XADiskUserLocalTransaction.java   
  (revision 537)
+++ trunk/src/org/xadisk/connector/outbound/XADiskUserLocalTransaction.java   
  (revision 538)
@@ -11,6 +11,7 @@
 import javax.resource.spi.ConnectionEvent;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
 import org.xadisk.filesystem.FileSystemConfiguration;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 
 /**
  * This class is applicable only when invoking XADisk as a JCA Resource 
Adapter.
@@ -46,7 +47,8 @@
      * Commits the local transaction bound to this object.
      * @throws NoTransactionAssociatedException
      */
-    public void commitLocalTransaction() throws 
NoTransactionAssociatedException {
+    public void commitLocalTransaction() throws 
NoTransactionAssociatedException,
+        TransactionFailedException {
         localTxnImpl._commit();
         
mc.raiseUserLocalTransactionEvent(ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
     }
@@ -55,7 +57,8 @@
      * Rolls back the local transaction bound to this object.
      * @throws NoTransactionAssociatedException
      */
-    public void rollbackLocalTransaction() throws 
NoTransactionAssociatedException {
+    public void rollbackLocalTransaction() throws 
NoTransactionAssociatedException,
+        TransactionFailedException {
         localTxnImpl._rollback();
         
mc.raiseUserLocalTransactionEvent(ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
     }
Index: trunk/src/org/xadisk/connector/outbound/XADiskLocalTransaction.java
===================================================================
--- trunk/src/org/xadisk/connector/outbound/XADiskLocalTransaction.java 
(revision 537)
+++ trunk/src/org/xadisk/connector/outbound/XADiskLocalTransaction.java 
(revision 538)
@@ -12,6 +12,7 @@
 import javax.resource.spi.LocalTransaction;
 import org.xadisk.filesystem.XAFileSystemCommonness;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 
 public class XADiskLocalTransaction implements LocalTransaction {
 
@@ -33,7 +34,7 @@
         _begin();
     }
 
-    void _rollback() throws NoTransactionAssociatedException {
+    void _rollback() throws NoTransactionAssociatedException, 
TransactionFailedException {
         
mc.setTypeOfCurrentTransaction(XADiskManagedConnection.NO_TRANSACTION);
         mc.getSessionOfLocalTransaction().rollback();
     }
@@ -43,10 +44,12 @@
             _rollback();
         } catch (NoTransactionAssociatedException note) {
             throw new ResourceException(note);
+        } catch (TransactionFailedException tfe) {
+            throw new ResourceException(tfe);
         }
     }
 
-    void _commit() throws NoTransactionAssociatedException {
+    void _commit() throws NoTransactionAssociatedException, 
TransactionFailedException {
         
mc.setTypeOfCurrentTransaction(XADiskManagedConnection.NO_TRANSACTION);
         mc.getSessionOfLocalTransaction().commit();
     }
@@ -56,6 +59,8 @@
             _commit();
         } catch (NoTransactionAssociatedException note) {
             throw new ResourceException(note);
+        } catch (TransactionFailedException tfe) {
+            throw new ResourceException(tfe);
         }
     }
 
Index: trunk/src/org/xadisk/connector/XAResourceImpl.java
===================================================================
--- trunk/src/org/xadisk/connector/XAResourceImpl.java  (revision 537)
+++ trunk/src/org/xadisk/connector/XAResourceImpl.java  (revision 538)
@@ -18,6 +18,7 @@
 import org.xadisk.filesystem.XAFileSystemCommonness;
 import org.xadisk.filesystem.TransactionInformation;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 import org.xadisk.filesystem.exceptions.XASystemException;
 import org.xadisk.filesystem.utilities.MiscUtils;
 
@@ -96,7 +97,7 @@
             }
         } catch (NoTransactionAssociatedException note) {
             releaseFromInternalXidMap(xid);
-            throw 
MiscUtils.createXAExceptionWithCause(XAException.XA_RBROLLBACK, note);
+            throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_OUTSIDE, note);
         } catch (XASystemException xase) {
             releaseFromInternalXidMap(xid);
             throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_RMFAIL, xase);
@@ -111,8 +112,10 @@
         }
         try {
             sessionOfTransaction.rollback();
+        } catch(TransactionFailedException tfe) {
+            throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_RMERR, tfe);
         } catch (NoTransactionAssociatedException note) {
-            throw 
MiscUtils.createXAExceptionWithCause(XAException.XA_RBROLLBACK, note);
+            throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_OUTSIDE, note);
         } catch (XASystemException xase) {
             throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_RMFAIL, xase);
         } finally {
@@ -129,8 +132,10 @@
         }
         try {
             ((SessionCommonness)sessionOfTransaction).commit(onePhase);
+        } catch(TransactionFailedException tfe) {
+            throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_RMERR, tfe);
         } catch (NoTransactionAssociatedException note) {
-            throw 
MiscUtils.createXAExceptionWithCause(XAException.XA_RBROLLBACK, note);
+            throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_OUTSIDE, note);
         } catch (XASystemException xase) {
             throw 
MiscUtils.createXAExceptionWithCause(XAException.XAER_RMFAIL, xase);
         } finally {
Index: trunk/src/org/xadisk/bridge/proxies/impl/RemoteSession.java
===================================================================
--- trunk/src/org/xadisk/bridge/proxies/impl/RemoteSession.java (revision 537)
+++ trunk/src/org/xadisk/bridge/proxies/impl/RemoteSession.java (revision 538)
@@ -19,6 +19,7 @@
 import 
org.xadisk.filesystem.exceptions.InsufficientPermissionOnFileException;
 import org.xadisk.filesystem.exceptions.LockingFailedException;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 
 public class RemoteSession extends RemoteObjectProxy implements 
SessionCommonness {
 
@@ -356,25 +357,29 @@
         }
     }
 
-    public void commit() throws NoTransactionAssociatedException {
+    public void commit() throws NoTransactionAssociatedException, 
TransactionFailedException {
         this.commit(true);
     }
 
-    public void commit(boolean onePhase) throws 
NoTransactionAssociatedException {
+    public void commit(boolean onePhase) throws 
NoTransactionAssociatedException, TransactionFailedException {
         try {
             invokeRemoteMethod("commit", onePhase);
         } catch (NoTransactionAssociatedException note) {
             throw note;
+        } catch (TransactionFailedException tfe) {
+            throw tfe;
         } catch (Throwable t) {
             throw assertExceptionHandling(t);
         }
     }
 
-    public void rollback() throws NoTransactionAssociatedException {
+    public void rollback() throws NoTransactionAssociatedException, 
TransactionFailedException {
         try {
             invokeRemoteMethod("rollback");
         } catch (NoTransactionAssociatedException note) {
             throw note;
+        } catch (TransactionFailedException tfe) {
+            throw tfe;
         } catch (Throwable t) {
             throw assertExceptionHandling(t);
         }
Index: trunk/src/org/xadisk/bridge/proxies/impl/RemoteXAFileSystem.java
===================================================================
--- trunk/src/org/xadisk/bridge/proxies/impl/RemoteXAFileSystem.java    
(revision 537)
+++ trunk/src/org/xadisk/bridge/proxies/impl/RemoteXAFileSystem.java    
(revision 538)
@@ -115,6 +115,22 @@
         }
     }
 
+    public byte[][] getIdentifiersForFailedTransactions() {
+        try {
+            return (byte[][]) 
invokeRemoteMethod("getIdentifiersForFailedTransactions");
+        } catch (Throwable th) {
+            throw assertExceptionHandling(th);
+        }
+    }
+
+    public void declareTransactionAsComplete(byte[] transactionIdentifier) {
+        try {
+            invokeRemoteMethod("declareTransactionAsComplete", 
transactionIdentifier);
+        } catch (Throwable th) {
+            throw assertExceptionHandling(th);
+        }
+    }
+
     public void shutdown() {
         disconnect();
     }
Index: trunk/src/org/xadisk/bridge/proxies/interfaces/XAFileSystem.java
===================================================================
--- trunk/src/org/xadisk/bridge/proxies/interfaces/XAFileSystem.java    
(revision 537)
+++ trunk/src/org/xadisk/bridge/proxies/interfaces/XAFileSystem.java    
(revision 538)
@@ -83,4 +83,8 @@
      * @throws IOException
      */
     public void shutdown() throws IOException;
+
+    public byte[][] getIdentifiersForFailedTransactions();
+    
+    public void declareTransactionAsComplete(byte[] transactionIdentifier);
 }
Index: trunk/src/org/xadisk/bridge/proxies/interfaces/Session.java
===================================================================
--- trunk/src/org/xadisk/bridge/proxies/interfaces/Session.java (revision 537)
+++ trunk/src/org/xadisk/bridge/proxies/interfaces/Session.java (revision 538)
@@ -11,6 +11,7 @@
 import org.xadisk.connector.outbound.XADiskConnection;
 import org.xadisk.filesystem.FileSystemConfiguration;
 import org.xadisk.filesystem.exceptions.NoTransactionAssociatedException;
+import org.xadisk.filesystem.exceptions.TransactionFailedException;
 
 /**
  * This interface is used to invoke i/o operations on XADisk and to control 
the transaction 
@@ -50,11 +51,11 @@
      * Rolls back the transaction associated with this Session.
      * @throws NoTransactionAssociatedException
      */
-    public void rollback() throws NoTransactionAssociatedException;
+    public void rollback() throws NoTransactionAssociatedException, 
TransactionFailedException;
 
     /**
      * Commits the transaction associated with this Session.
      * @throws NoTransactionAssociatedException
      */
-    public void commit() throws NoTransactionAssociatedException;
+    public void commit() throws NoTransactionAssociatedException, 
TransactionFailedException;
 }





[xadisk~svn:538] Changes for JIRA issue #76.

nitin_verma 08/18/2013
 
 
Close
loading
Please Confirm
Close