[Carbon-commits] [Carbon] svn commit r112175 - in trunk/carbon/components/bam: . org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence

tharindu at wso2.com tharindu at wso2.com
Wed Aug 31 13:25:09 EDT 2011


Author: tharindu
Date: Wed Aug 31 10:25:09 2011
New Revision: 112175
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=112175

Log:
refactoring and cleaning up code

Modified:
   trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java
   trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java
   trunk/carbon/components/bam/pom.xml

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java	Wed Aug 31 10:25:09 2011
@@ -77,7 +77,7 @@
             indexRow = getColumnFamilyIndexRow();
         }
 
-        store.startTransaction();
+        store.startBatchCommit();
         if (rows != null) {
             // Persist rows to column family
             for (ResultRow row : rows) {
@@ -105,7 +105,7 @@
             }
         }
 
-        store.finishTransaction();
+        store.endBatchCommit();
     }
 
     public void setAnalyzerSeqeunceName(String analyzerSequence) {

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java	Wed Aug 31 10:25:09 2011
@@ -19,10 +19,10 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMFactory;
 import org.apache.axiom.om.OMText;
-import org.wso2.carbon.bam.utils.QueryIndex;
-import org.wso2.carbon.bam.utils.QueryUtils;
-import org.wso2.carbon.bam.utils.ResultColumn;
-import org.wso2.carbon.bam.utils.ResultRow;
+import org.wso2.carbon.bam.utils.persistence.QueryIndex;
+import org.wso2.carbon.bam.utils.persistence.QueryUtils;
+import org.wso2.carbon.bam.utils.persistence.ResultColumn;
+import org.wso2.carbon.bam.utils.persistence.ResultRow;
 import org.wso2.carbon.bam.utils.config.CFConfigBean;
 import org.wso2.carbon.bam.utils.config.KeyPart;
 

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java	Wed Aug 31 10:25:09 2011
@@ -18,15 +18,12 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPBody;
 import org.apache.commons.collections.map.UnmodifiableMap;
-import org.wso2.carbon.bam.receiver.ReceiverConstants;
-
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
 public class EventProcessor {
 
-    protected Map<String, String> data;
     private RawEvent rawEvent;
 
     public EventProcessor(RawEvent rawEvent) {
@@ -43,6 +40,7 @@
             Object object = eventChildren.next();
             if (object instanceof OMElement) {
                 OMElement elem = (OMElement) object;
+
                 // map event elements, to column keys and values
                 // <key1>value1</key1> ----> { AnyRowKey : { key1 : value1 }
                 processedMap.put(elem.getLocalName(), elem.getText());
@@ -58,9 +56,9 @@
         if (mandatoryDataMap == null) {
             return;
         }
-        for (String mandatoryKey : mandatoryDataMap.keySet()) {
-            if (!processedMap.containsKey(mandatoryKey)) {
-                processedMap.put(mandatoryKey, mandatoryDataMap.get(mandatoryKey));
+        for (Map.Entry<String, String> mandatoryEntry : mandatoryDataMap.entrySet()) {
+            if (!processedMap.containsKey(mandatoryEntry.getKey())) {
+                processedMap.put(mandatoryEntry.getKey(), mandatoryEntry.getValue());
             }
         }
     }

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java	Wed Aug 31 10:25:09 2011
@@ -92,11 +92,11 @@
 
     protected void setConfigurationContextService(ConfigurationContextService ccService) {
 
-        ConfigurationContext serverCtx = ccService.getServerConfigContext();
-        AxisConfiguration serverConfig = serverCtx.getAxisConfiguration();
-        LocalTransportReceiver.CONFIG_CONTEXT = new ConfigurationContext(serverConfig);
-        LocalTransportReceiver.CONFIG_CONTEXT.setServicePath("services");
-        LocalTransportReceiver.CONFIG_CONTEXT.setContextRoot("local:/");
+//        ConfigurationContext serverCtx = ccService.getServerConfigContext();
+//        AxisConfiguration serverConfig = serverCtx.getAxisConfiguration();
+//        LocalTransportReceiver.CONFIG_CONTEXT = new ConfigurationContext(serverConfig);
+//        LocalTransportReceiver.CONFIG_CONTEXT.setServicePath("services");
+//        LocalTransportReceiver.CONFIG_CONTEXT.setContextRoot("local:/");
 
         ReceiverUtils.setConfigurationContextService(ccService);
         if (log.isDebugEnabled()) {

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java	Wed Aug 31 10:25:09 2011
@@ -42,11 +42,11 @@
 
     private static NoSQLDataStore noSQLDataStore;
 
-    private static List<CFConfigBean> cfConfigs;
+    private static List<CFConfigBean> columnFamilyConfigs;
 
     private PersistenceManager() throws ConfigurationException {
         noSQLDataStore = NoSQLDataStore.getNoSQLDataStore();
-        cfConfigs = ConfigurationUtils.getCFConfigurations();
+        columnFamilyConfigs = ConfigurationUtils.getCFConfigurations();
     }
 
     public static PersistenceManager getManager() throws ConfigurationException {
@@ -56,14 +56,24 @@
         return persistenceManager;
     }
 
+    /**
+     * When given a map of column keys and column values this method will insert into necessary column families and to necessary rows
+     * @param cfData - Map where key is the column name and value is the column value
+     * @return success of operation
+     * @throws BAMReceiverException
+     */
     public boolean persistEvent(Map<String, String> cfData) throws BAMReceiverException {
         String defaultRowKey = null;
 
-        noSQLDataStore.startTransaction();
+        // Note: Performance gain by starting a pseudo batch commit for Cassandra,
+        // One event, will only have one commit, independent of the number of CFs and Rows being written to
+        noSQLDataStore.startBatchCommit();
 
-        for (CFConfigBean cfConfig : cfConfigs) {
+        // Iterate through all the CF configs and decide which one to insert event or index event
+        for (CFConfigBean cfConfig : columnFamilyConfigs) {
             String rowKey;
 
+            // All events will be at least committed to the default CF, which usually is the Event CF
             if (cfConfig.isDefaultCF()) {
                 defaultRowKey = createRowKey(cfConfig.getRowKeyParts(), cfConfig.getGranularity(),
                                              cfData, true);
@@ -75,18 +85,23 @@
                 rowKey = createRowKey(cfConfig.getRowKeyParts(), cfConfig.getGranularity(), cfData,
                                       false);
                 if (rowKey == null) {
-                    // this row key is does not apply to this event, skipt it
+                    // this row key is does not apply to this event, skip it
                     continue;
                 }
 
+                // We are creating a data map, with the default row key as the column name, i.e. default row key is stored
+                // as a pointer
                 Map<String, String> nonDefaultDataMap = createNonDefaultDataMap(defaultRowKey);
 
                 noSQLDataStore.persistData(cfConfig.getCfName(), rowKey, nonDefaultDataMap);
+                // persist any indexes if they are given
                 noSQLDataStore.persistIndexes(cfConfig.getCfName(), cfConfig.getRowKeyParts(),
                                               cfData);
 
                 if (cfConfig.getIndexRowKey() != null) {
-
+                    // Cassandra does not sort in rows, but Cassandra columns are sorted.
+                    // So we store a separate column, i.e. defaults to 'allKeys', that stores pointers to all the row keys
+                    // in the same CF
                     Map<String, String> indexRowKeyDataMap = createNonDefaultDataMap(rowKey);
                     noSQLDataStore.persistData(cfConfig.getCfName(), cfConfig.getIndexRowKey(),
                                                indexRowKeyDataMap);
@@ -95,57 +110,77 @@
             }
         }
 
-        noSQLDataStore.finishTransaction();
+        noSQLDataStore.endBatchCommit();
 
         return true;
     }
 
+    /**
+     * This will create a map that will be used as a pointer to the entry created in the default CF (i.e. usually the Event CF)
+     * @param rowKeyToIndex String that is the row key to index - i.e. usually, the row key in the event CF
+     * @return
+     */
     private Map<String, String> createNonDefaultDataMap(String rowKeyToIndex) {
 
-        Map<String, String> map = new HashMap<String, String>();
-        map.put(rowKeyToIndex, "");
-        return UnmodifiableMap.decorate(map);
+        Map<String, String> nonDefaultMap = new HashMap<String, String>();
+        nonDefaultMap.put(rowKeyToIndex, "");
+        return UnmodifiableMap.decorate(nonDefaultMap);
 
     }
 
+    /**
+     * Creates the row key which is probably the most important entry in an inserted row. This row key allows us to later find this
+     * row which will be an event or a pointer to the event
+     * @param rowKeyParts
+     * @param granularity
+     * @param cfData
+     * @param appendUUID
+     * @return
+     * @throws BAMReceiverException
+     */
     private String createRowKey(List<KeyPart> rowKeyParts, String granularity,
                                 Map<String, String> cfData, boolean appendUUID)
             throws BAMReceiverException {
 
         StringBuffer buffer = new StringBuffer();
-        int i = 1;
-
-        for (KeyPart rkPart : rowKeyParts) {
 
-            if (cfData.containsKey(rkPart.getName())) {
-                String rowKeyPartValue = cfData.get(rkPart.getName());
-                // handle timestamp case
-                if (rkPart.getName().equals(ReceiverConstants.TIMESTAMP_KEY_NAME)) {
+        for (int i = 0; i < rowKeyParts.size(); i++) {
+            KeyPart rowKeyPart = rowKeyParts.get(i);
+            String rowKeyPartName = rowKeyPart.getName();
+            if (cfData.containsKey(rowKeyPartName)) {
+                String rowKeyPartValue = cfData.get(rowKeyPartName);
 
+                // handle timestamp case according to granularity
+                if (rowKeyPartName.equals(ReceiverConstants.TIMESTAMP_KEY_NAME)) {
                     try {
+                        // we use the time stamp factory to generate the time stamp according to granularity
                         rowKeyPartValue = TimeStampFactory.getFactory().getTimeStamp(
-                                cfData.get(rkPart.getName()), granularity);
+                                cfData.get(rowKeyPartName), granularity);
                     } catch (ParseException e) {
                         throw new BAMReceiverException("Cannot parse time stamp : " +
-                                                       cfData.get(rkPart));
+                                                       cfData.get(rowKeyPartName));
                     }
 
                 }
                 buffer.append(rowKeyPartValue);
-                if (i != rowKeyParts.size()) {
+
+                // Skip appending row key delimiter for the last row key part
+                if ((i + 1) != rowKeyParts.size()) {
                     buffer.append("---");
                 }
 
-                i++;
+
             } else {
+                // if there is no column name that corresponds to the row key parts, that means this event should not be inserted,
+                // we return null, in that case
                 return null;
             }
         }
-        if (appendUUID) {
 
+        // Add an optional uuid
+        if (appendUUID) {
             buffer.append("---");
             buffer.append(UUID.randomUUID());
-
         }
 
         buffer.trimToSize();

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java	Wed Aug 31 10:25:09 2011
@@ -40,6 +40,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
+// This class is used as a workaround for not being able to use data access service for Cassandra
+// The code is directly copied from data access service classes
 public class CassandraUtils {
 
     private static final Log log = LogFactory.getLog(CassandraUtils.class);

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java	Wed Aug 31 10:25:09 2011
@@ -80,36 +80,35 @@
 
     private KeyspaceDefinition bamKeyspaceDefinition;
 
-    private Cluster initializeBAMNoSQLDataStore() throws ConfigurationException {
-
+    private synchronized Cluster initializeBAMNoSQLDataStore() throws ConfigurationException {
         if (cluster == null) {
-            synchronized (this) {
+//                Todo: Use the data access service, once the related bug is fixed in Cassandra component
 //            cluster = ReceiverUtils.getDataAccessService().getCluster(new ClusterInformation("admin", "admin"));
-                cluster = CassandraUtils.createCluster(new ClusterInformation("admin", "admin"));
+//                Todo : Get rid of hard coded username/ password when implementing MT
+            cluster = CassandraUtils.createCluster(new ClusterInformation("admin", "admin"));
+            bamKeyspaceDefinition = cluster.describeKeyspace(BAM_KEYSPACE);
+            if (bamKeyspaceDefinition == null) {
+                cluster.addKeyspace(HFactory.createKeyspaceDefinition(BAM_KEYSPACE));
                 bamKeyspaceDefinition = cluster.describeKeyspace(BAM_KEYSPACE);
-                if (bamKeyspaceDefinition == null) {
-                    cluster.addKeyspace(HFactory.createKeyspaceDefinition(BAM_KEYSPACE));
-                    bamKeyspaceDefinition = cluster.describeKeyspace(BAM_KEYSPACE);
-                }
-                setBamKeyspace(HFactory.createKeyspace(BAM_KEYSPACE, cluster));
-
-                List<CFConfigBean> cfConfigurations = ConfigurationUtils.getCFConfigurations();
-                for (CFConfigBean cfConfiguration : cfConfigurations) {
-                    createCF(cfConfiguration.getCfName());
-                }
+            }
+            setBamKeyspace(HFactory.createKeyspace(BAM_KEYSPACE, cluster));
 
-                // Create meta data column families
-                createCF(META_COLUMN_FAMILY_NAME);
-                createCF(INDEX_COLUMN_FAMILY_NAME);
-                createCF(CURSORS_COLUMN_FAMILY_NAME);
+            List<CFConfigBean> cfConfigurations = ConfigurationUtils.getCFConfigurations();
+            for (CFConfigBean cfConfiguration : cfConfigurations) {
+                createCF(cfConfiguration.getCfName());
+            }
 
-                persistColumnFamilyInformation(cfConfigurations);
+            // Create meta data column families
+            createCF(META_COLUMN_FAMILY_NAME);
+            createCF(INDEX_COLUMN_FAMILY_NAME);
+            createCF(CURSORS_COLUMN_FAMILY_NAME);
 
-            }
+            persistColumnFamilyInformation(cfConfigurations);
         }
         return cluster;
     }
 
+
     public void persistColumnFamilyConfiguration(CFConfigBean cfConfig) {
         List<CFConfigBean> config = new ArrayList<CFConfigBean>();
         config.add(cfConfig);
@@ -162,9 +161,8 @@
             Map<String, String> cfData = new HashMap<String, String>();
             cfData.put(INDEX_ROW_KEY, indexRowKey);
             cfData.put(GRANULARITY, granularity);
-            cfData.put(DEFAULT_COLUMN_FAMILY, new Boolean(
-                    cfConfig.isDefaultCF()).toString());
-            cfData.put(SECONDARY_COLUMN_FAMILY, new Boolean(cfConfig.isSecondaryCF()).toString());
+            cfData.put(DEFAULT_COLUMN_FAMILY, Boolean.valueOf(cfConfig.isDefaultCF()).toString());
+            cfData.put(SECONDARY_COLUMN_FAMILY, Boolean.valueOf(cfConfig.isSecondaryCF()).toString());
 
             int counter = 0;
             if (cfConfig.getRowKeyParts() != null) {
@@ -180,7 +178,7 @@
     }
 
     public void setLastCursorForColumnFamily(String cfName, String sequenceName,
-                                                    int analyzerIndex, String lastCursor) {
+                                             int analyzerIndex, String lastCursor) {
 
         String key = sequenceName + analyzerIndex;
         Map<String, String> cfData = new HashMap<String, String>();
@@ -190,39 +188,42 @@
 
     }
 
-    private ThreadLocal<Boolean> startTransaction = new ThreadLocal<Boolean>() {
+    private ThreadLocal<Boolean> startBatchCommit = new ThreadLocal<Boolean>() {
         @Override
         protected Boolean initialValue() {
             return false;
         }
     };
-    private ThreadLocal<Mutator> mutatorThreadLocal = new ThreadLocal<Mutator>();
 
-    public void startTransaction() {
-        startTransaction.set(true);
+    private ThreadLocal<Mutator<String>> mutatorThreadLocal = new ThreadLocal<Mutator<String>>();
+
+    public void startBatchCommit() {
+        startBatchCommit.set(true);
         mutatorThreadLocal.set(HFactory.createMutator(getBamKeyspace(), stringSerializer));
     }
 
-    public void finishTransaction() {
+    public void endBatchCommit() {
         mutatorThreadLocal.get().execute();
-        startTransaction.set(false);
+        startBatchCommit.set(false);
     }
 
     public boolean persistData(String CFName, String rowKey, Map<String, String> data) {
         if (!cfExists(CFName)) {
             createCF(CFName);
         }
+
         Mutator<String> mutator;
-        if (startTransaction.get()) {
+        if (startBatchCommit.get()) {
             mutator = mutatorThreadLocal.get();
         } else {
             mutator = HFactory.createMutator(getBamKeyspace(), stringSerializer);
         }
-        for (String columnKey : data.keySet()) {
-            mutator.addInsertion(rowKey, CFName, HFactory.createStringColumn(columnKey,
-                                                                             data.get(columnKey)));
+
+        for (Map.Entry<String, String> column : data.entrySet()) {
+            mutator.addInsertion(rowKey, CFName, HFactory.createStringColumn(column.getKey(), column.getValue()));
         }
-        if (!startTransaction.get()) {
+
+        if (!startBatchCommit.get()) {
             mutator.execute();
         }
         return true;
@@ -231,10 +232,7 @@
     private List<String> cfList = new ArrayList<String>();
 
     private boolean cfExists(String CFName) {
-        if (cfList.contains(CFName)) {
-            return true;
-        }
-        return false;
+        return cfList.contains(CFName);
     }
 
     private boolean createCF(String CFName) {
@@ -262,89 +260,6 @@
         return true;
     }
 
-//    public void persistEvent(List<EventData> eventDataList) {
-//        Cluster bamCluster = initializeBAMNoSQLDataStore();
-//
-//        Mutator<String> mutator = HFactory.createMutator(getBamKeyspace(), stringSerializer );
-//        Iterator<EventData> eventDataIterator = eventDataList.iterator();
-//
-//        boolean setKey = false;
-//        String superColumnUUID = null;
-//        while (eventDataIterator.hasNext()) {
-//            EventData eventData = eventDataIterator.next();
-//            if (!setKey) {
-//                superColumnUUID = getMessageKey(eventData);
-//                setKey = true;
-//            }
-//            mutator.addInsertion(superColumnUUID,
-//                                 ReceiverConstants.EVENT_COLUMNFAMILY_NAME,
-//                                 HFactory.createSuperColumn(eventData.getDataCategory().name(),
-//                                                            createColumnList(eventData), stringSerializer, stringSerializer, stringSerializer));
-//            if (!eventDataIterator.hasNext()) {
-////                dateSerializer = DateSerializer.get();
-////                mutator.addInsertion(superColumnUUID,
-////                    ReceiverConstants.EVENT_COLUMNFAMILY_NAME,
-////                        HFactory.createColumn(ReceiverConstants.MSG_RECEIVED_TIME_KEY, eventData.getTimeStamp(), stringSerializer, dateSerializer));
-//            }
-//        }
-//        mutator.execute();
-//
-////        RangeSuperSlicesQuery<String, String, String, String> rangeQuery = HFactory.createRangeSuperSlicesQuery(getBamKeyspace(),
-////                stringSerializer, stringSerializer, stringSerializer, stringSerializer);
-////        rangeQuery.setColumnFamily(ReceiverConstants.EVENT_COLUMNFAMILY_NAME);
-////        rangeQuery.setRange("", "", false, 4);
-////        rangeQuery.setKeys("", "");
-////        rangeQuery.setRowCount(1000 * 1000);
-////
-////
-////
-//////        SuperColumnQuery<String, String, String, String> query = HFactory.createSuperColumnQuery(getBamKeyspace(),
-//////                stringSerializer, stringSerializer, stringSerializer, stringSerializer);
-//////
-//////        query.setKey(null).setSuperName(DataType.CorrelationData.name()).setColumnFamily(ReceiverConstants.EVENT_COLUMNFAMILY_NAME);
-////        QueryResult<OrderedSuperRows<String, String, String, String>> result = rangeQuery.execute();
-//
-//
-////        for (SuperRow<String, String, String, String> superRow : result.get().getList())  {
-////            System.out.println("Super row : " + superRow + " \n");
-////        }
-//
-////        System.out.println("Number of entried in Cassandra : " + result.get().getCount() );
-////
-////        for (Iterator<SuperRow<String, String, String, String>> iterator = superRowList.iterator(); iterator.hasNext();) {
-////            SuperRow<String, String, String, String> superRow = iterator.next();
-////            for ()
-////            List<HSuperColumn<String, String, String>> superColumns = superRow.getSuperSlice().getSuperColumns();
-////            for (Iterator<HSuperColumn<String, String, String>> superColumnIterator = superColumns.iterator(); superColumnIterator.hasNext();) {
-////
-////            }
-////        }
-////
-////        for (int i = 0; i < superColumn.getSize(); i++) {
-////            HColumn<String, String> column = superColumn.get(i);
-////            System.out.println("Super Column name : " + superColumn.getName() + "Column name : " + column.getName() + "Column value : " + column.getValue() + "\n");
-////
-////        }
-//
-//
-//    }
-//
-//    private String getMessageKey(EventData data) {
-//        return data.getMessageKey() + "-" + data.getTimeStamp() + "-" + UUID.randomUUID();
-//    }
-//
-//    private List<HColumn<String, String>> createColumnList(EventData data) {
-//        Iterator<String> dataIterator = data.getData().keySet().iterator();
-//        List<HColumn<String, String>> columnList = new ArrayList<HColumn<String, String>>();
-//        while (dataIterator.hasNext()) {
-//            String elemLocalPart = dataIterator.next();
-//            columnList.add(HFactory.createColumn(elemLocalPart, data.getData().get(elemLocalPart), stringSerializer, stringSerializer));
-//
-//        }
-//        return columnList;
-//    }
-
-
 }
 
 

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java	Wed Aug 31 10:25:09 2011
@@ -39,10 +39,10 @@
     }
 
     public ResultColumn[] getColumns() {
-        return columns;
+        return columns.clone();
     }
 
     public void setColumns(ResultColumn[] columns) {
-        this.columns = columns;
+        this.columns = columns.clone();
     }
 }

Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java	(original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java	Wed Aug 31 10:25:09 2011
@@ -22,6 +22,9 @@
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Encapsulated time stamp creation according to granularity
+ */
 public class TimeStampFactory {
 
     private TimeStampFactory() {
@@ -39,26 +42,27 @@
 
     private Map<String, Integer[]> granularityMap = new HashMap<String, Integer[]>();
 
+
     private void populateGranularityMap() {
         granularityMap.put("year", new Integer[]{Calendar.MINUTE, Calendar.SECOND, Calendar.HOUR_OF_DAY, Calendar.DAY_OF_MONTH, Calendar.MONTH});
         granularityMap.put("month",new Integer[]{Calendar.MINUTE, Calendar.SECOND, Calendar.HOUR_OF_DAY, Calendar.DAY_OF_MONTH});
-        granularityMap.put("hour", new Integer[]{Calendar.MINUTE, Calendar.SECOND});
         granularityMap.put("day", new Integer[]{Calendar.MINUTE, Calendar.SECOND, Calendar.HOUR_OF_DAY});
+        granularityMap.put("hour", new Integer[]{Calendar.MINUTE, Calendar.SECOND});
         granularityMap.put("minute", new Integer[]{Calendar.SECOND});
         granularityMap.put("none", new Integer[]{0});
     }
 
     public String getTimeStamp(String eventTimeStamp, String granularity)
             throws IllegalArgumentException, ParseException {
-        //SimpleDateFormat format = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy");
+
         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         if (!granularityMap.containsKey(granularity)) {
             throw new IllegalArgumentException("Invalid granularity value - " + granularity);
         }
-
         Date date = formatter.parse(eventTimeStamp);
         Calendar calendar = Calendar.getInstance();
         calendar.setTime(date);
+        // Reduce values according to granularity
         Integer[] deltaFactors = granularityMap.get(granularity);
         for (Integer deltaFactor : deltaFactors) {
             calendar.add(deltaFactor, calendar.get(deltaFactor) * -1);

Modified: trunk/carbon/components/bam/pom.xml
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/pom.xml?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/pom.xml	(original)
+++ trunk/carbon/components/bam/pom.xml	Wed Aug 31 10:25:09 2011
@@ -43,16 +43,16 @@
                 <activeByDefault>true</activeByDefault>
             </activation>
 			<modules>       
-				<module>org.wso2.carbon.bam.util</module>
-				<module>org.wso2.carbon.bam.core</module>
-				<module>org.wso2.carbon.bam.cep</module>
-				<module>org.wso2.carbon.bam.gauges.ui</module>
-				<module>org.wso2.carbon.bam.common</module>
-				<module>org.wso2.carbon.bam.ui</module>
-				<!--<module>org.wso2.carbon.bam.analyzer</module>-->
-				<!--<module>org.wso2.carbon.bam.receiver</module>-->
-                <!--<module>org.wso2.carbon.bam.utils</module>-->
-                <!--<module>org.wso2.carbon.bam.presentation</module>-->
+				<!--<module>org.wso2.carbon.bam.util</module>-->
+				<!--<module>org.wso2.carbon.bam.core</module>-->
+				<!--<module>org.wso2.carbon.bam.cep</module>-->
+				<!--<module>org.wso2.carbon.bam.gauges.ui</module>-->
+				<!--<module>org.wso2.carbon.bam.common</module>-->
+				<!--<module>org.wso2.carbon.bam.ui</module>-->
+				<module>org.wso2.carbon.bam.analyzer</module>
+				<module>org.wso2.carbon.bam.receiver</module>
+                <module>org.wso2.carbon.bam.utils</module>
+                <module>org.wso2.carbon.bam.presentation</module>
 			</modules>
 		</profile>
         <profile>


More information about the Carbon-commits mailing list