[shoal~svn:1782] Merge Mahesh's commit of svn revision 1764 into glassfish3.1.2patch1 branch

  • From: jfialli@...
  • To: commits@...
  • Subject: [shoal~svn:1782] Merge Mahesh's commit of svn revision 1764 into glassfish3.1.2patch1 branch
  • Date: Fri, 18 May 2012 19:42:14 +0000

Project:    shoal
Repository: svn
Revision:   1782
Author:     jfialli
Date:       2012-05-18 19:42:11 UTC
Link:       

Log Message:
------------
Merge Mahesh's commit of svn revision 1764 into glassfish3.1.2patch1 branch 
to the trunk.

Commit comment was "Flush pending batched messages".
This change addresses oracle bugdb issue 13459758, which was fixed in glassfish 3.1.2 patch 
1.




Revisions:
----------
1782


Modified Paths:
---------------
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterManager.java
trunk/cache/src/main/java/org/shoal/ha/cache/impl/command/CommandManager.java
trunk/cache/src/main/java/org/shoal/ha/cache/api/AbstractCommandInterceptor.java
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithMap.java
trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStore.java
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithList.java
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/CommandCollector.java
trunk/cache/src/main/java/org/shoal/ha/cache/impl/store/ReplicatedDataStore.java
trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStoreContext.java


Diffs:
------
Index: 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/store/ReplicatedDataStore.java
===================================================================
--- 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/store/ReplicatedDataStore.java
    (revision 1781)
+++ 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/store/ReplicatedDataStore.java
    (revision 1782)
@@ -62,6 +62,7 @@
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -101,6 +102,8 @@
 
     private MBeanServer mbs;
     private ObjectName mbeanObjectName;
+    
+    private AtomicBoolean closed = new AtomicBoolean(false);
 
     public ReplicatedDataStore(DataStoreContext<K, V> conf, GroupService gs) 
{
         this.dsc = conf;
@@ -266,237 +269,276 @@
         }
     }
 
-
     @Override
     public String put(K k, V v)
-            throws DataStoreException {
+        throws DataStoreException {
         String result = "";
-        DataStoreEntry<K, V> entry = replicaStore.getOrCreateEntry(k);
-        synchronized (entry) {
-            if (!entry.isRemoved()) {
-                if (dsc.isCacheLocally()) {
-                    entry.setV(v);
-                }
-                KeyMapper keyMapper = dsc.getKeyMapper();
 
-                // fix for GLASSFISH-18085
-                String[] members = keyMapper.getCurrentMembers();
-                if (members.length == 0) {
-                    _saveLogger.log(Level.FINE, "Skipped replication of " + 
k + " since there is only one instance running in the cluster.");
-                    return result;
-                }
+        try {
+            dsc.acquireReadLock();
+            if (closed.get()) {
+                throw new DataStoreAlreadyClosedException("put() failed. 
Store " + dsc.getStoreName() + " already closed");
+            }
 
-                SaveCommand<K, V> cmd = 
dsc.getDataStoreEntryUpdater().createSaveCommand(entry, k, v);
-                cm.execute(cmd);
-                dscMBean.incrementSaveCount();
+            DataStoreEntry<K, V> entry = replicaStore.getOrCreateEntry(k);
+            synchronized (entry) {
+                if (!entry.isRemoved()) {
+                    if (dsc.isCacheLocally()) {
+                        entry.setV(v);
+                    }
+                    KeyMapper keyMapper = dsc.getKeyMapper();
 
-                String staleLocation = 
entry.setReplicaInstanceName(cmd.getTargetName());
+                    // fix for GLASSFISH-18085
+                    String[] members = keyMapper.getCurrentMembers();
+                    if (members.length == 0) {
+                        _saveLogger.log(Level.FINE, "Skipped replication of 
" + k + " since there is only one instance running in the cluster.");
+                        return result;
+                    }
 
-                result = cmd.getKeyMappingInfo();
+                    SaveCommand<K, V> cmd = 
dsc.getDataStoreEntryUpdater().createSaveCommand(entry, k, v);
+                    cm.execute(cmd);
+                    dscMBean.incrementSaveCount();
 
-                if ((staleLocation != null) && (! 
staleLocation.equals(cmd.getTargetName()))) {
-                    StaleCopyRemoveCommand<K, V> staleCmd = new 
StaleCopyRemoveCommand<K, V>(k);
-                    staleCmd.setStaleTargetName(staleLocation);
-                    cm.execute(staleCmd);
+                    String staleLocation = 
entry.setReplicaInstanceName(cmd.getTargetName());
+
+                    result = cmd.getKeyMappingInfo();
+
+                    if ((staleLocation != null) && (! 
staleLocation.equals(cmd.getTargetName()))) {
+                        StaleCopyRemoveCommand<K, V> staleCmd = new 
StaleCopyRemoveCommand<K, V>(k);
+                        staleCmd.setStaleTargetName(staleLocation);
+                        cm.execute(staleCmd);
+                    }
+                } else {
+                    _logger.log(Level.WARNING, "ReplicatedDataStore.put(" + 
k + ") AFTER remove?");
+                    return result;
                 }
-            } else {
-                _logger.log(Level.WARNING, "ReplicatedDataStore.put(" + k + 
") AFTER remove?");
-                return result;
             }
-        }
 
-        if (_saveLogger.isLoggable(Level.FINE)) {
-            _saveLogger.log(Level.FINE, debugName + " done save(" + k + ") 
to " + result);
+            if (_saveLogger.isLoggable(Level.FINE)) {
+                _saveLogger.log(Level.FINE, debugName + " done save(" + k + 
") to " + result);
+            }
+        } finally {
+            dsc.releaseReadLock();
         }
         return result;
     }
 
     @Override
     public V get(K key)
-            throws DataStoreException {
+        throws DataStoreException {
         dscMBean.incrementLoadCount();
         V v = null;
-        boolean foundLocally = false;
-        DataStoreEntry<K, V> entry = replicaStore.getEntry(key);
-        if (entry != null) {
-            if (!entry.isRemoved()) {
-                v = dsc.getDataStoreEntryUpdater().getV(entry);
-                if (v != null) {
-                    foundLocally = true;
-                    dscMBean.incrementLocalLoadSuccessCount();
-                    if (_loadLogger.isLoggable(Level.FINE)) {
-                        _loadLogger.log(Level.FINE, debugName + "load(" + key
+
+        try {
+            dsc.acquireReadLock();
+            if (closed.get()) {
+                throw new DataStoreAlreadyClosedException("get() failed. 
Store " + dsc.getStoreName() + " already closed");
+            }
+
+            dscMBean.incrementLoadCount();
+            boolean foundLocally = false;
+            DataStoreEntry<K, V> entry = replicaStore.getEntry(key);
+            if (entry != null) {
+                if (!entry.isRemoved()) {
+                    v = dsc.getDataStoreEntryUpdater().getV(entry);
+                    if (v != null) {
+                        foundLocally = true;
+                        dscMBean.incrementLocalLoadSuccessCount();
+                        if (_loadLogger.isLoggable(Level.FINE)) {
+                            _loadLogger.log(Level.FINE, debugName + "load(" 
+ key
                                 + "); FOUND IN LOCAL CACHE!!");
+                        }
                     }
+                } else {
+                    return null; //Because it is already removed
                 }
-            } else {
-                return null; //Because it is already removed
             }
-        }
 
-        if (v == null) {
-            KeyMapper keyMapper = dsc.getKeyMapper();
-            String replicachoices = 
keyMapper.getReplicaChoices(dsc.getGroupName(), key);
-            String[] replicaHint = replicachoices.split(":");
-            if (_loadLogger.isLoggable(Level.FINE)) {
-                _loadLogger.log(Level.FINE, debugName + "load(" + key
-                        + "); ReplicaChoices: " + replicachoices);
-            }
-
-            // fix for GLASSFISH-18085
-            String[] members = keyMapper.getCurrentMembers();
-            if (members.length == 0) {
-                _loadLogger.log(Level.FINE, "Skipped replication of " + key 
+ " since there is only one instance running in the cluster.");
-                return null;
-            }
-            String respondingInstance = null;
-            for (int replicaIndex = 0; (replicaIndex < replicaHint.length) 
&& (replicaIndex < MAX_REPLICA_TRIES); replicaIndex++) {
-                String target = replicaHint[replicaIndex];
-                if (target == null || target.trim().length() == 0 || 
target.equals(dsc.getInstanceName())) {
-                    continue;
-                }
-                LoadRequestCommand<K, V> command= new LoadRequestCommand<K, 
V>(key,
-                        entry == null ? DataStoreEntry.MIN_VERSION : 
entry.getVersion(), target);
+            if (v == null) {
+                KeyMapper keyMapper = dsc.getKeyMapper();
+                String replicachoices = 
keyMapper.getReplicaChoices(dsc.getGroupName(), key);
+                String[] replicaHint = replicachoices.split(":");
                 if (_loadLogger.isLoggable(Level.FINE)) {
                     _loadLogger.log(Level.FINE, debugName + "load(" + key
-                        + ") Trying to load from Replica[" + replicaIndex + 
"]: " + replicaHint[replicaIndex]);
+                        + "); ReplicaChoices: " + replicachoices);
                 }
 
-                cm.execute(command);
-                v = command.getResult(3, TimeUnit.SECONDS);
-                if (v != null) {
-                    respondingInstance = command.getRespondingInstanceName();
-                    dscMBean.incrementSimpleLoadSuccessCount();
-                    break;
+                // fix for GLASSFISH-18085
+                String[] members = keyMapper.getCurrentMembers();
+                if (members.length == 0) {
+                _loadLogger.log(Level.FINE, "Skipped replication of " + key 
+ " since there is only one instance running in the cluster.");
+                    return null;
                 }
-            }
-
-            if (v == null) {
-                if (_loadLogger.isLoggable(Level.FINE)) {
-                    _loadLogger.log(Level.FINE, debugName + "*load(" + key
-                        + ") Performing broadcast load");
-                }
-                String[] targetInstances = 
dsc.getKeyMapper().getCurrentMembers();
-                for (String targetInstance : targetInstances) {
-                    if (targetInstance.equals(dsc.getInstanceName())) {
+                String respondingInstance = null;
+                for (int replicaIndex = 0; (replicaIndex < 
replicaHint.length) && (replicaIndex < MAX_REPLICA_TRIES); replicaIndex++) {
+                    String target = replicaHint[replicaIndex];
+                    if (target == null || target.trim().length() == 0 || 
target.equals(dsc.getInstanceName())) {
                         continue;
                     }
-                    LoadRequestCommand<K, V> lrCmd = new 
LoadRequestCommand<K, V>(key,
-                            entry == null ? DataStoreEntry.MIN_VERSION : 
entry.getVersion(), targetInstance);
+                    LoadRequestCommand<K, V> command= new 
LoadRequestCommand<K, V>(key,
+                        entry == null ? DataStoreEntry.MIN_VERSION : 
entry.getVersion(), target);
                     if (_loadLogger.isLoggable(Level.FINE)) {
-                        _loadLogger.log(Level.FINE, debugName + "*load(" + 
key
-                            + ") Trying to load from " + targetInstance);
+                        _loadLogger.log(Level.FINE, debugName + "load(" + key
+                            + ") Trying to load from Replica[" + 
replicaIndex + "]: " + replicaHint[replicaIndex]);
                     }
 
-                    cm.execute(lrCmd);
-                    v = lrCmd.getResult(3, TimeUnit.SECONDS);
+                    cm.execute(command);
+                    v = command.getResult(3, TimeUnit.SECONDS);
                     if (v != null) {
-                        respondingInstance = targetInstance;
-                        dscMBean.incrementBroadcastLoadSuccessCount();
+                        respondingInstance = 
command.getRespondingInstanceName();
+                        dscMBean.incrementSimpleLoadSuccessCount();
                         break;
                     }
                 }
-            }
 
-            if (v != null) {
-                entry = replicaStore.getEntry(key);
-                if (entry != null) {
-                    synchronized (entry) {
-                        if (!entry.isRemoved()) {
-                            if (dsc.isCacheLocally()) {
-                                entry.setV(v);
-                            }
+                if (v == null) {
+                    if (_loadLogger.isLoggable(Level.FINE)) {
+                        _loadLogger.log(Level.FINE, debugName + "*load(" + 
key
+                            + ") Performing broadcast load");
+                    }
+                    String[] targetInstances = 
dsc.getKeyMapper().getCurrentMembers();
+                    for (String targetInstance : targetInstances) {
+                        if (targetInstance.equals(dsc.getInstanceName())) {
+                            continue;
+                        }
+                        LoadRequestCommand<K, V> lrCmd = new 
LoadRequestCommand<K, V>(key,
+                            entry == null ? DataStoreEntry.MIN_VERSION : 
entry.getVersion(), targetInstance);
+                        if (_loadLogger.isLoggable(Level.FINE)) {
+                            _loadLogger.log(Level.FINE, debugName + "*load(" 
+ key
+                                + ") Trying to load from " + targetInstance);
+                        }
 
-                            
entry.setLastAccessedAt(System.currentTimeMillis());
-                            entry.setReplicaInstanceName(respondingInstance);
-                            //Note: Do not remove the stale replica now. We 
will
-                            //  do that in save
-                            if (_loadLogger.isLoggable(Level.FINE)) {
-                                _loadLogger.log(Level.FINE, debugName + 
"load(" + key
+                        cm.execute(lrCmd);
+                        v = lrCmd.getResult(3, TimeUnit.SECONDS);
+                        if (v != null) {
+                            respondingInstance = targetInstance;
+                            dscMBean.incrementBroadcastLoadSuccessCount();
+                            break;
+                        }
+                    }
+                }
+
+                if (v != null) {
+                    entry = replicaStore.getEntry(key);
+                    if (entry != null) {
+                        synchronized (entry) {
+                            if (!entry.isRemoved()) {
+                                if (dsc.isCacheLocally()) {
+                                    entry.setV(v);
+                                }
+
+                                
entry.setLastAccessedAt(System.currentTimeMillis());
+                                
entry.setReplicaInstanceName(respondingInstance);
+                                //Note: Do not remove the stale replica now. 
We will
+                                //  do that in save
+                                if (_loadLogger.isLoggable(Level.FINE)) {
+                                    _loadLogger.log(Level.FINE, debugName + 
"load(" + key
                                         + "; Successfully loaded data from " 
+ respondingInstance);
-                            }
+                                }
 
-                            dscMBean.incrementLoadSuccessCount();
-                        } else {
-                            if (_loadLogger.isLoggable(Level.FINE)) {
-                                _loadLogger.log(Level.FINE, debugName + 
"load(" + key
+                                dscMBean.incrementLoadSuccessCount();
+                            } else {
+                                if (_loadLogger.isLoggable(Level.FINE)) {
+                                    _loadLogger.log(Level.FINE, debugName + 
"load(" + key
                                         + "; Got data from " + 
respondingInstance + ", but another concurrent thread removed the entry");
+                                }
+                                dscMBean.incrementLoadFailureCount();
                             }
-                            dscMBean.incrementLoadFailureCount();
                         }
                     }
+                } else {
+                    dscMBean.incrementLoadFailureCount();
                 }
-            } else {
-                dscMBean.incrementLoadFailureCount();
             }
-        }
 
-        if (_loadLogger.isLoggable(Level.FINE)) {
-            _loadLogger.log(Level.FINE, debugName + "load(" + key + ") Final 
result: " + v);
-        }
+            if (_loadLogger.isLoggable(Level.FINE)) {
+                _loadLogger.log(Level.FINE, debugName + "load(" + key + ") 
Final result: " + v);
+            }
 
-        if ((v != null) && foundLocally) {
-            //Because we did a successful load, to ensure that the data 
lives in another instance
-            //  lets do a save
+            if ((v != null) && foundLocally) {
+                //Because we did a successful load, to ensure that the data 
lives in another instance
+                //  lets do a save
 
-            try {
-                String secondaryReplica = put(key, v);
-                if (_logger.isLoggable(Level.FINE)) {
-                    _saveLogger.log(Level.FINE, "(SaveOnLoad) Saved the data 
to replica: "  + secondaryReplica);
+                try {
+                    String secondaryReplica = put(key, v);
+                    if (_logger.isLoggable(Level.FINE)) {
+                        _saveLogger.log(Level.FINE, "(SaveOnLoad) Saved the 
data to replica: "  + secondaryReplica);
+                    }
+                } catch (DataStoreException dsEx) {
+                    _saveLogger.log(Level.WARNING, "(SaveOnLoad) Failed to 
save data after a load", dsEx);
                 }
-            } catch (DataStoreException dsEx) {
-                _saveLogger.log(Level.WARNING, "(SaveOnLoad) Failed to save 
data after a load", dsEx);
+
             }
-
+            return v;
+        } finally {
+            dsc.releaseReadLock();
         }
-        return v;
     }
 
     @Override
     public void remove(K k)
-            throws DataStoreException {
+        throws DataStoreException {
 
-        if (_logger.isLoggable(Level.FINE)) {
-               _logger.log(Level.FINE, "DataStore.remove(" + k + ") CALLED 
****");
-       }
+        try {
+            dsc.acquireReadLock();
+            if (closed.get()) {
+                throw new DataStoreAlreadyClosedException("remove() failed. 
Store " + dsc.getStoreName() + " already closed");
+            }
 
-        replicaStore.remove(k);
-        dscMBean.incrementRemoveCount();
-        
-        String[] targets = dsc.getKeyMapper().getCurrentMembers();
+            if (_logger.isLoggable(Level.FINE)) {
+                _logger.log(Level.FINE, "DataStore.remove(" + k + ") CALLED 
****");
+            }
 
-        if (targets != null) {
-            for (String target : targets) {
-                RemoveCommand<K, V> cmd = new RemoveCommand<K, V>(k);
-                cmd.setTarget(target);
-                cm.execute(cmd);
+            replicaStore.remove(k);
+            dscMBean.incrementRemoveCount();
+
+            String[] targets = dsc.getKeyMapper().getCurrentMembers();
+
+            if (targets != null) {
+                for (String target : targets) {
+                    RemoveCommand<K, V> cmd = new RemoveCommand<K, V>(k);
+                    cmd.setTarget(target);
+                    cm.execute(cmd);
+                }
             }
+        } finally {
+            dsc.releaseReadLock();
         }
-
     }
 
     @Override
     public String touch(K k, long version, long ts, long ttl)
-            throws DataStoreException {
+        throws DataStoreException {
         String location = "";
-        DataStoreEntry<K, V> entry = replicaStore.getEntry(k);
-        if (entry != null) {
-            synchronized (entry) {
-                long now = System.currentTimeMillis();
-                entry.setLastAccessedAt(now);
-                String target = entry.getReplicaInstanceName();
-                TouchCommand<K, V> cmd = new TouchCommand<K, V>(k, version, 
now, defaultIdleTimeoutInMillis);
-                cm.execute(cmd);
+        try {
+            dsc.acquireReadLock();
+            if (closed.get()) {
+                throw new DataStoreAlreadyClosedException("touch() failed. 
Store " + dsc.getStoreName() + " already closed");
+            }
 
-                location = cmd.getKeyMappingInfo();
+            DataStoreEntry<K, V> entry = replicaStore.getEntry(k);
+            if (entry != null) {
+                synchronized (entry) {
+                    long now = System.currentTimeMillis();
+                    entry.setLastAccessedAt(now);
+                    String target = entry.getReplicaInstanceName();
+                    TouchCommand<K, V> cmd = new TouchCommand<K, V>(k, 
version, now, defaultIdleTimeoutInMillis);
+                    cm.execute(cmd);
+
+                    location = cmd.getKeyMappingInfo();
+                }
             }
+        } finally {
+            dsc.releaseReadLock();
         }
         return location;
     }
 
+
     @Override
     public int removeIdleEntries(long idleFor) {
 
+        int finalResult = 0;
         String[] targets = dsc.getKeyMapper().getCurrentMembers();
 
         ResponseMediator respMed = dsc.getResponseMediator();
@@ -505,8 +547,11 @@
         Future<Integer> future = resp.getFuture();
         resp.setTransientResult(new Integer(0));
 
-        int finalResult = 0;
         try {
+            dsc.acquireReadLock();
+            if (closed.get()) {
+                throw new 
DataStoreAlreadyClosedException("removeIdleEntries() failed. Store " + 
dsc.getStoreName() + " already closed");
+            }
             if (targets != null && dsc.isBroadcastRemovedExpired()) {
                 resp.setExpectedUpdateCount(targets.length);
                 for (String target : targets) {
@@ -534,6 +579,7 @@
             //TODO
         } finally {
             respMed.removeCommandResponse(tokenId);
+            dsc.releaseReadLock();
         }
 
         return finalResult;
@@ -544,6 +590,9 @@
     @Override
     public void close() {
         try {
+            dsc.acquireWriteLock();
+            closed.set(true);
+            dsc.getCommandManager().close();
             if (mbs != null && mbeanObjectName != null) {
                 mbs.unregisterMBean(mbeanObjectName);
             }
@@ -551,30 +600,48 @@
             //TODO
         } catch (MBeanRegistrationException mbRegEx) {
             //TODO
+        } finally {
+            dsc.releaseWriteLock();
         }
     }
 
+    @Override
+    public void destroy() {
+        close();
+    }
+
     public int size() {
+
         int result = 0;
-        KeyMapper km = dsc.getKeyMapper();
-        String[] targets = km.getCurrentMembers();
+        try {
+            dsc.acquireReadLock();
+            if (closed.get()) {
+                //Since we cannot throw an Exception and since the store is 
already closed
+                // we return 0
+                return 0;
+            }
 
-        int targetCount = targets.length;
-        SizeRequestCommand[] commands = new SizeRequestCommand[targetCount];
-
-        for (int i = 0; i < targetCount; i++) {
-            commands[i] = new SizeRequestCommand(targets[i]);
-            try {
-                dsc.getCommandManager().execute(commands[i]);
-            } catch (DataStoreException dse) {
-                //TODO:
+            KeyMapper km = dsc.getKeyMapper();
+            String[] targets = km.getCurrentMembers();
+    
+            int targetCount = targets.length;
+            SizeRequestCommand[] commands = new 
SizeRequestCommand[targetCount];
+    
+            for (int i = 0; i < targetCount; i++) {
+                commands[i] = new SizeRequestCommand(targets[i]);
+                try {
+                    dsc.getCommandManager().execute(commands[i]);
+                } catch (DataStoreException dse) {
+                    //TODO:
+                }
             }
+    
+            for (int i = 0; i < targetCount; i++) {
+                result += commands[i].getResult();
+            }
+        } finally {
+            dsc.releaseReadLock();
         }
-
-        for (int i = 0; i < targetCount; i++) {
-            result += commands[i].getResult();
-        }
-
         return result;
     }
 }
Index: 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/command/CommandManager.java
===================================================================
--- 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/command/CommandManager.java 
      (revision 1781)
+++ 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/command/CommandManager.java 
      (revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -162,5 +162,11 @@
            try {ois.close();} catch (Exception ex) 
{_logger.log(Level.FINEST, "Ignorable error while closing 
ObjectInputStream");}
         }
     }
+
+    public void close() {
+        for (AbstractCommandInterceptor<K, V> h = head; h != null; h = 
h.getNext()) {
+            h.close();
+        }
+    }
     
 }
Index: 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithMap.java
===================================================================
--- 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithMap.java
     (revision 1781)
+++ 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithMap.java
     (revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -42,15 +42,15 @@
 
 import org.shoal.adapter.store.commands.NoOpCommand;
 import org.shoal.adapter.store.commands.SaveCommand;
+import org.shoal.ha.cache.api.DataStoreAlreadyClosedException;
 import org.shoal.ha.cache.api.DataStoreContext;
+import org.shoal.ha.cache.api.DataStoreException;
 import org.shoal.ha.cache.api.ShoalCacheLoggerConstants;
 import org.shoal.ha.cache.impl.command.Command;
 import org.shoal.ha.cache.impl.command.ReplicationCommandOpcode;
 import org.shoal.ha.cache.impl.util.ASyncReplicationManager;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -92,7 +92,13 @@
     private long timeStamp = System.currentTimeMillis();
 
     ThreadPoolExecutor executor;
+    
+    private AtomicBoolean openStatus = new AtomicBoolean(true);
 
+    private AtomicInteger activeBatchCount = new AtomicInteger(1);
+
+    private CountDownLatch latch = new CountDownLatch(1);
+
     static {
         try {
             TRANSMITTER_FREQUECNCY_IN_MILLIS =
@@ -122,56 +128,102 @@
         this.targetName = targetName;
         this.dsc = rsInfo;
 
-
-
-        BatchedCommandMapDataFrame batch = new BatchedCommandMapDataFrame();
+        BatchedCommandMapDataFrame batch = new 
BatchedCommandMapDataFrame(openStatus.get());
         mapRef = new AtomicReference<BatchedCommandMapDataFrame>(batch);
 
-
         future = 
asyncReplicationManager.getScheduledThreadPoolExecutor().scheduleAtFixedRate(this,
 TRANSMITTER_FREQUECNCY_IN_MILLIS,
                 TRANSMITTER_FREQUECNCY_IN_MILLIS, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void close() {
+        //We have a write lock here.
+        // So no other request threads OR background thread are active
         try {
-            future.cancel(false);
+
+            //Mark this as closed to prevent new valid batches
+            if (openStatus.compareAndSet(true, false)) {
+
+                //First cancel the background task
+                future.cancel(false);
+
+                //Now flush all pending batched data
+                if (_logger.isLoggable(Level.FINE)) {
+                    _logger.log(Level.FINE, 
"(ReplicationCommandTransmitterWithMap) BEGIN Flushing all batched data upon 
shutdown..."
+                        + activeBatchCount.get() + " to be flushed...");
+                }
+
+                BatchedCommandMapDataFrame closedBatch
+                        = new BatchedCommandMapDataFrame(false);
+                BatchedCommandMapDataFrame batch = 
mapRef.getAndSet(closedBatch);
+                //Note that the above batch is a valid batch
+                asyncReplicationManager.getExecutorService().submit(batch);
+                dsc.getDataStoreMBean().incrementBatchSentCount();
+
+                for (int loopCount = 0; loopCount < 5; loopCount++) {
+                    if (activeBatchCount.get() > 0) {
+                        try {
+                            latch.await(5, TimeUnit.SECONDS);
+                        } catch (InterruptedException inEx) {
+                            //Ignore...
+                        }
+                    }
+                }
+
+                if (_logger.isLoggable(Level.FINE)) {
+                    _logger.log(Level.FINE, 
"(ReplicationCommandTransmitterWithMap) DONE Flushing all batched data upon 
shutdown...");
+                }
+            }
         } catch (Exception ex) {
             //Ignore
         }
     }
 
     @Override
-    public void addCommand(Command<K, V> cmd) {
-
-        for (boolean done = false; !done;) {
-            BatchedCommandMapDataFrame batch = mapRef.get();
-            done = batch.doAddOrRemove(cmd, true);
-            if (!done) {
-                BatchedCommandMapDataFrame frame = new 
BatchedCommandMapDataFrame();
-                frame.doAddOrRemove(cmd, true);
-                done = mapRef.compareAndSet(batch, frame);
-            }
-        }
+    public void addCommand(Command<K, V> cmd)
+        throws DataStoreException {
+        addCommandToBatch(cmd, true);
     }
 
     @Override
-    public void removeCommand(Command<K, V> cmd) {
+    public void removeCommand(Command<K, V> cmd)
+        throws DataStoreException {
+        addCommandToBatch(cmd, false);
+    }
 
+    private void addCommandToBatch(Command<K, V> cmd, boolean isAdd)
+        throws DataStoreException {
         for (boolean done = false; !done;) {
             BatchedCommandMapDataFrame batch = mapRef.get();
-            done = batch.doAddOrRemove(cmd, false);
+            done = batch.doAddOrRemove(cmd, isAdd);
             if (!done) {
-                BatchedCommandMapDataFrame frame = new 
BatchedCommandMapDataFrame();
-                frame.doAddOrRemove(cmd, false);
+                BatchedCommandMapDataFrame frame = new 
BatchedCommandMapDataFrame(openStatus.get());
+                frame.doAddOrRemove(cmd, isAdd);
                 done = mapRef.compareAndSet(batch, frame);
+                if (done && frame.isValid()) {
+                    activeBatchCount.incrementAndGet();
+                }
             }
         }
     }
 
     public void run() {
-        BatchedCommandMapDataFrame batch = mapRef.get();
-        batch.flushAndTransmit();
+        try {
+            dsc.acquireReadLock();
+            if (openStatus.get()) {
+                BatchedCommandMapDataFrame batch = mapRef.get();
+
+                //Since this called by a async thread
+                //   OR upon close, it is OK to not rethrow the exceptions
+                batch.flushAndTransmit();
+            }
+        } catch (DataStoreAlreadyClosedException dsEx) {
+            //Ignore....
+        } catch (DataStoreException dsEx) {
+            _logger.log(Level.WARNING, "Error during flush...");
+        } finally {
+            dsc.releaseReadLock();
+        }
     }
 
 
@@ -180,7 +232,7 @@
 
     private class BatchedCommandMapDataFrame
             implements Runnable {
-
+        
         private int myBatchNumber;
 
         private AtomicInteger inFlightCount = new AtomicInteger(0);
@@ -198,12 +250,23 @@
 
         private volatile long lastTS = System.currentTimeMillis();
 
-        BatchedCommandMapDataFrame() {
+        private boolean validBatch;
+        
+        BatchedCommandMapDataFrame(boolean validBatch) {
+            this.validBatch = validBatch;
             myBatchNumber = _sendBatchCount.incrementAndGet();
         }
 
-        public boolean doAddOrRemove(Command cmd, boolean isAdd) {
+        private boolean isValid() {
+            return validBatch;
+        }
 
+        private boolean doAddOrRemove(Command cmd, boolean isAdd)
+            throws DataStoreException {
+
+            if (! validBatch) {
+                throw new DataStoreAlreadyClosedException("Cannot add a 
command to a Batch after the DataStore has been closed");
+            }
             boolean result = false;
             if (! batchThresholdReached.get()) {
                 
@@ -258,7 +321,8 @@
         }
 
         //Called by periodic task
-        void flushAndTransmit() {
+        void flushAndTransmit()
+            throws DataStoreException {
             dsc.getDataStoreMBean().incrementFlushThreadWakeupCount();
             if ((!alreadySent.get()) && ((map.size() > 0) || 
(removedKeysSize.get() > 0))) {
                 if (lastTS == timeStamp) {
@@ -283,14 +347,18 @@
                     }
                     timeStamp = lastTS;
                 }
+            } else {
+//                if (_statsLogger.isLoggable(Level.FINEST)) {
+//                    _statsLogger.log(Level.FINEST, "flushAndTransmit 
visited a new Batch");
+//                }
             }
         }
 
         public void run() {
-
-            ReplicationFramePayloadCommand rfCmd = new 
ReplicationFramePayloadCommand();
-            rfCmd.setTargetInstance(targetName);
             try {
+                ReplicationFramePayloadCommand rfCmd = new 
ReplicationFramePayloadCommand();
+                rfCmd.setTargetInstance(targetName);
+                try {
                 for (ConcurrentLinkedQueue<Command> cmdList : map.values()) {
                     SaveCommand saveCmd = null;
                     for (Command cmd : cmdList) {
@@ -306,17 +374,32 @@
                             rfCmd.addComamnd(cmd);
                         }
                     }
-
+    
                     if (saveCmd != null) {
                         rfCmd.addComamnd(saveCmd);
                     }
                 }
 
-                rfCmd.setRemovedKeys(removedKeys);
-                dsc.getCommandManager().execute(rfCmd);
+                    rfCmd.setRemovedKeys(removedKeys);
+                    dsc.getCommandManager().execute(rfCmd);
+    
+                } catch (IOException ioEx) {
+                    _logger.log(Level.WARNING, "Batch operation 
(ASyncCommandList failed...", ioEx);
+                }
+            } finally {
+                //We want to decrement only if we transmitted a valid batch
+                //   Otherwise we should not decrement the activeBatchCount.
+                //  Also, we decrement even if there was an IOException
+                if (validBatch && activeBatchCount.decrementAndGet() <= 0) {
+                    if (! openStatus.get()) {
+                        latch.countDown();
+                    }
+                }
 
-            } catch (IOException ioEx) {
-                _logger.log(Level.WARNING, "Batch operation 
(ASyncCommandList failed...", ioEx);
+                if (_logger.isLoggable(Level.FINE)) {
+                    _logger.log(Level.FINE, 
"(ReplicationCommandTransmitterWithMap) Completed one batch. Still "
+                            + activeBatchCount.get() + " to be flushed...");
+                }
             }
         }
 
Index: 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterManager.java
===================================================================
--- 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterManager.java
     (revision 1781)
+++ 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterManager.java
     (revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -40,6 +40,7 @@
 
 package org.shoal.ha.cache.impl.interceptor;
 
+import org.shoal.adapter.store.commands.NoOpCommand;
 import org.shoal.ha.cache.api.DataStoreContext;
 import org.shoal.ha.cache.api.DataStoreException;
 import org.shoal.ha.cache.api.ShoalCacheLoggerConstants;
@@ -112,4 +113,13 @@
         }
     }
 
+    public void close() {
+        for (CommandCollector<K, V> cc : transmitters.values()) {
+            cc.close();
+        }
+
+       try { broadcastTransmitter.addCommand(new NoOpCommand()); } catch 
(DataStoreException dsEx) {}
+       broadcastTransmitter.close();
+    }
+
 }
Index: 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/CommandCollector.java
===================================================================
--- 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/CommandCollector.java
 (revision 1781)
+++ 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/CommandCollector.java
 (revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -41,6 +41,7 @@
 package org.shoal.ha.cache.impl.interceptor;
 
 import org.shoal.ha.cache.api.DataStoreContext;
+import org.shoal.ha.cache.api.DataStoreException;
 import org.shoal.ha.cache.impl.command.Command;
 
 /**
@@ -52,7 +53,9 @@
 
     void close();
 
-    void addCommand(Command<K, V> cmd);
+    void addCommand(Command<K, V> cmd)
+            throws DataStoreException;
 
-    void removeCommand(Command<K, V> cmd);
+    void removeCommand(Command<K, V> cmd)
+            throws DataStoreException;
 }
Index: 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithList.java
===================================================================
--- 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithList.java
    (revision 1781)
+++ 
trunk/cache/src/main/java/org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithList.java
    (revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -41,18 +41,17 @@
 package org.shoal.ha.cache.impl.interceptor;
 
 import org.shoal.adapter.store.commands.NoOpCommand;
+import org.shoal.ha.cache.api.DataStoreAlreadyClosedException;
 import org.shoal.ha.cache.api.DataStoreContext;
+import org.shoal.ha.cache.api.DataStoreException;
 import org.shoal.ha.cache.api.ShoalCacheLoggerConstants;
 import org.shoal.ha.cache.impl.command.Command;
 import org.shoal.ha.cache.impl.command.ReplicationCommandOpcode;
 import org.shoal.ha.cache.impl.util.ASyncReplicationManager;
-import org.shoal.ha.group.GroupService;
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
@@ -90,6 +89,13 @@
 
     ThreadPoolExecutor executor;
 
+    private AtomicBoolean openStatus = new AtomicBoolean(true);
+
+    private AtomicInteger activeBatchCount = new AtomicInteger(1);
+
+    private CountDownLatch latch = new CountDownLatch(1);
+
+
     public void initialize(String targetName, DataStoreContext<K, V> rsInfo) 
{
 
         this.executor = 
ASyncReplicationManager._getInstance().getExecutorService();
@@ -112,7 +118,7 @@
             //Ignore
         }
 
-        BatchedCommandListDataFrame batch = new 
BatchedCommandListDataFrame();
+        BatchedCommandListDataFrame batch = new 
BatchedCommandListDataFrame(openStatus.get());
         mapRef = new AtomicReference<BatchedCommandListDataFrame>(batch);
 
 
@@ -120,51 +126,93 @@
                 TRANSMITTER_FREQUECNCY_IN_MILLIS, TimeUnit.MILLISECONDS);
     }
 
+    @Override
     public void close() {
+        //We have a write lock here.
+        // So no other request threads OR background threads are active
         try {
-            future.cancel(false);
+
+            //Mark this as closed to prevent new valid batches
+            if (openStatus.compareAndSet(true, false)) {
+
+                //First cancel the background task
+                future.cancel(false);
+
+                //Now flush all pending batched data
+                if (_logger.isLoggable(Level.FINE)) {
+                    _logger.log(Level.FINE, 
"(ReplicationCommandTransmitterWithList) BEGIN Flushing all batched data upon 
shutdown..."
+                        + activeBatchCount.get() + " to be flushed...");
+                }
+
+                BatchedCommandListDataFrame closedBatch
+                        = new BatchedCommandListDataFrame(false);
+                BatchedCommandListDataFrame batch = 
mapRef.getAndSet(closedBatch);
+                //Note that the above batch is a valid batch
+                asyncReplicationManager.getExecutorService().submit(batch);
+                dsc.getDataStoreMBean().incrementBatchSentCount();
+
+                for (int loopCount = 0; loopCount < 5; loopCount++) {
+                    if (activeBatchCount.get() > 0) {
+                        try {
+                            latch.await(5, TimeUnit.SECONDS);
+                        } catch (InterruptedException inEx) {
+                            //Ignore...
+                        }
+                    }
+                }
+
+                if (_logger.isLoggable(Level.FINE)) {
+                    _logger.log(Level.FINE, 
"(ReplicationCommandTransmitterWithList) DONE Flushing all batched data upon 
shutdown...");
+                }
+            }
         } catch (Exception ex) {
             //Ignore
         }
     }
 
-    public void addCommand(Command<K, V> cmd) {
+    public void addCommand(Command<K, V> cmd)
+        throws DataStoreException {
 
         for (boolean done = false; !done;) {
             BatchedCommandListDataFrame batch = mapRef.get();
             done = batch.addCommand(cmd);
             if (!done) {
-                BatchedCommandListDataFrame frame = new 
BatchedCommandListDataFrame();
+                BatchedCommandListDataFrame frame = new 
BatchedCommandListDataFrame(openStatus.get());
                 frame.addCommand(cmd);
                 done = mapRef.compareAndSet(batch, frame);
+                if (done && frame.isValidBatch()) {
+                    activeBatchCount.incrementAndGet();
+                }
             }
         }
     }
 
     @Override
-    public void removeCommand(Command<K, V> cmd) {
+    public void removeCommand(Command<K, V> cmd)
+        throws DataStoreException {
         addCommand(cmd);
     }
 
-    private void sendMessage(byte[] data) {
-        GroupService gs = dsc.getGroupService();
-        gs.sendMessage(targetName, dsc.getServiceName(), data);
-        if (_logger.isLoggable(Level.FINE)) {
-            _logger.log(Level.FINE, dsc.getServiceName() + ": 
ReplicationCommandTransmitterWithList.onTransmit() Sent "
-                    + (targetName == null ? " ALL MEMBERS " : targetName)
-                    + "; size: " + data.length);
-        }
-    }
-
     public void run() {
-        BatchedCommandListDataFrame batch = mapRef.get();
-        if (batch.isTimeToFlush(timeStamp)) {
-            NoOpCommand noop = new NoOpCommand();
-            while (batch.addCommand(noop)) {
-                ;
+        try {
+            dsc.acquireReadLock();
+            BatchedCommandListDataFrame batch = mapRef.get();
+            //Since this is called by an async thread
+            //   OR upon close, it is OK to not rethrow the exceptions
+            if (batch.isTimeToFlush(timeStamp) || (! openStatus.get())) {
+                NoOpCommand noop = new NoOpCommand();
+                while (batch.addCommand(noop)) {
+                    ;
+                }
             }
+            timeStamp = batch.getBatchCreationTime();
+        } catch (DataStoreAlreadyClosedException dsEx) {
+            //Ignore....
+        } catch (DataStoreException dsEx) {
+            _logger.log(Level.WARNING, "Error during flush...");
+        } finally {
+            dsc.releaseReadLock();
         }
-        timeStamp = batch.getBatchCreationTime();
     }
 
     private class BatchedCommandListDataFrame
@@ -176,7 +224,22 @@
 
         private long batchCreationTime = System.currentTimeMillis();
 
-        public boolean addCommand(Command cmd) {
+        private boolean validBatch;
+
+        BatchedCommandListDataFrame(boolean valid) {
+            this.validBatch = valid;
+        }
+
+        private boolean isValidBatch() {
+            return validBatch;
+        }
+
+        public boolean addCommand(Command cmd)
+            throws DataStoreException {
+            if (! validBatch) {
+                throw new DataStoreAlreadyClosedException("Cannot add a 
command to a Batch after the DataStore has been closed");
+            }
+
             int value = current.incrementAndGet();
             if (value < MAX_BATCH_SIZE) {
                 list.add(cmd);
@@ -213,6 +276,20 @@
                 dsc.getCommandManager().execute(rfCmd);
             } catch (IOException ioEx) {
                 _logger.log(Level.WARNING, "Batch operation 
(ASyncCommandList failed...", ioEx);
+            } finally {
+                //We want to decrement only if we transmitted a valid batch
+                //   Otherwise we should not decrement the activeBatchCount.
+                //  Also, we decrement even if there was an IOException
+                if (validBatch && activeBatchCount.decrementAndGet() <= 0) {
+                    if (! openStatus.get()) {
+                        latch.countDown();
+                    }
+                }
+
+                if (_logger.isLoggable(Level.FINE)) {
+                    _logger.log(Level.FINE, 
"(ReplicationCommandTransmitterWithList) Completed one batch. Still "
+                        + activeBatchCount.get() + " to be flushed...");
+                }
             }
         }
     }
Index: 
trunk/cache/src/main/java/org/shoal/ha/cache/api/AbstractCommandInterceptor.java
===================================================================
--- 
trunk/cache/src/main/java/org/shoal/ha/cache/api/AbstractCommandInterceptor.java
    (revision 1781)
+++ 
trunk/cache/src/main/java/org/shoal/ha/cache/api/AbstractCommandInterceptor.java
    (revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -107,4 +107,6 @@
         }
     }
 
+    public void close() {}
+
 }
Index: trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStore.java
===================================================================
--- trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStore.java     
(revision 1781)
+++ trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStore.java     
(revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -95,6 +95,8 @@
      */
     public void close();
 
+    public void destroy();
+
     public int size();
     
 }
Index: trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStoreContext.java
===================================================================
--- trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStoreContext.java    
  (revision 1781)
+++ trunk/cache/src/main/java/org/shoal/ha/cache/api/DataStoreContext.java    
  (revision 1782)
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright (c) 1997-2010 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997-2012 Oracle and/or its affiliates. All rights reserved.
  *
  * The contents of this file are subject to the terms of either the GNU
  * General Public License Version 2 only ("GPL") or the Common Development
@@ -51,6 +51,7 @@
 import org.shoal.ha.mapper.KeyMapper;
 
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -72,6 +73,8 @@
 
     private ReplicatedDataStoreStatsHolder dscMBean;
 
+    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
     public DataStoreContext(String serviceName, GroupService gs, ClassLoader 
loader) {
         super.setStoreName(serviceName);
         super.setInstanceName(gs.getMemberName());
@@ -83,6 +86,22 @@
         super();
     }
 
+    public void acquireReadLock() {
+        rwLock.readLock().lock();
+    }
+
+    public void releaseReadLock() {
+        rwLock.readLock().unlock();
+    }
+
+    public void acquireWriteLock() {
+        rwLock.writeLock().lock();
+    }
+
+    public void releaseWriteLock() {
+        rwLock.writeLock().unlock();
+    }
+
     public DataStoreContext(BackingStoreConfiguration conf) {
         setInstanceName(conf.getInstanceName())
                 .setGroupName(conf.getClusterName())





[shoal~svn:1782] Merge Mahesh's commit of svn revision 1764 into glassfish3.1.2patch1 bran

jfialli 05/18/2012
Terms of Use; Privacy Policy; Copyright ©2013-2017 (revision 20160708.bf2ac18)
 
 
Close
loading
Please Confirm
Close