[Carbon-commits] [Carbon] svn commit r112175 - in trunk/carbon/components/bam: . org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence
tharindu at wso2.com
tharindu at wso2.com
Wed Aug 31 13:25:09 EDT 2011
Author: tharindu
Date: Wed Aug 31 10:25:09 2011
New Revision: 112175
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=112175
Log:
refactoring and cleaning up code
Modified:
trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java
trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java
trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java
trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java
trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java
trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java
trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java
trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java
trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java
trunk/carbon/components/bam/pom.xml
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.analyzer/src/main/java/org/wso2/carbon/bam/analyzer/analyzers/PutAnalyzer.java Wed Aug 31 10:25:09 2011
@@ -77,7 +77,7 @@
indexRow = getColumnFamilyIndexRow();
}
- store.startTransaction();
+ store.startBatchCommit();
if (rows != null) {
// Persist rows to column family
for (ResultRow row : rows) {
@@ -105,7 +105,7 @@
}
}
- store.finishTransaction();
+ store.endBatchCommit();
}
public void setAnalyzerSeqeunceName(String analyzerSequence) {
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.presentation/src/main/java/org/wso2/carbon/bam/presentation/QueryService.java Wed Aug 31 10:25:09 2011
@@ -19,10 +19,10 @@
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMText;
-import org.wso2.carbon.bam.utils.QueryIndex;
-import org.wso2.carbon.bam.utils.QueryUtils;
-import org.wso2.carbon.bam.utils.ResultColumn;
-import org.wso2.carbon.bam.utils.ResultRow;
+import org.wso2.carbon.bam.utils.persistence.QueryIndex;
+import org.wso2.carbon.bam.utils.persistence.QueryUtils;
+import org.wso2.carbon.bam.utils.persistence.ResultColumn;
+import org.wso2.carbon.bam.utils.persistence.ResultRow;
import org.wso2.carbon.bam.utils.config.CFConfigBean;
import org.wso2.carbon.bam.utils.config.KeyPart;
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/event/EventProcessor.java Wed Aug 31 10:25:09 2011
@@ -18,15 +18,12 @@
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPBody;
import org.apache.commons.collections.map.UnmodifiableMap;
-import org.wso2.carbon.bam.receiver.ReceiverConstants;
-
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class EventProcessor {
- protected Map<String, String> data;
private RawEvent rawEvent;
public EventProcessor(RawEvent rawEvent) {
@@ -43,6 +40,7 @@
Object object = eventChildren.next();
if (object instanceof OMElement) {
OMElement elem = (OMElement) object;
+
// map event elements, to column keys and values
// <key1>value1</key1> ----> { AnyRowKey : { key1 : value1 }
processedMap.put(elem.getLocalName(), elem.getText());
@@ -58,9 +56,9 @@
if (mandatoryDataMap == null) {
return;
}
- for (String mandatoryKey : mandatoryDataMap.keySet()) {
- if (!processedMap.containsKey(mandatoryKey)) {
- processedMap.put(mandatoryKey, mandatoryDataMap.get(mandatoryKey));
+ for (Map.Entry<String, String> mandatoryEntry : mandatoryDataMap.entrySet()) {
+ if (!processedMap.containsKey(mandatoryEntry.getKey())) {
+ processedMap.put(mandatoryEntry.getKey(), mandatoryEntry.getValue());
}
}
}
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/internal/BAMReceiverServiceComponent.java Wed Aug 31 10:25:09 2011
@@ -92,11 +92,11 @@
protected void setConfigurationContextService(ConfigurationContextService ccService) {
- ConfigurationContext serverCtx = ccService.getServerConfigContext();
- AxisConfiguration serverConfig = serverCtx.getAxisConfiguration();
- LocalTransportReceiver.CONFIG_CONTEXT = new ConfigurationContext(serverConfig);
- LocalTransportReceiver.CONFIG_CONTEXT.setServicePath("services");
- LocalTransportReceiver.CONFIG_CONTEXT.setContextRoot("local:/");
+// ConfigurationContext serverCtx = ccService.getServerConfigContext();
+// AxisConfiguration serverConfig = serverCtx.getAxisConfiguration();
+// LocalTransportReceiver.CONFIG_CONTEXT = new ConfigurationContext(serverConfig);
+// LocalTransportReceiver.CONFIG_CONTEXT.setServicePath("services");
+// LocalTransportReceiver.CONFIG_CONTEXT.setContextRoot("local:/");
ReceiverUtils.setConfigurationContextService(ccService);
if (log.isDebugEnabled()) {
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.receiver/src/main/java/org/wso2/carbon/bam/receiver/persistence/PersistenceManager.java Wed Aug 31 10:25:09 2011
@@ -42,11 +42,11 @@
private static NoSQLDataStore noSQLDataStore;
- private static List<CFConfigBean> cfConfigs;
+ private static List<CFConfigBean> columnFamilyConfigs;
private PersistenceManager() throws ConfigurationException {
noSQLDataStore = NoSQLDataStore.getNoSQLDataStore();
- cfConfigs = ConfigurationUtils.getCFConfigurations();
+ columnFamilyConfigs = ConfigurationUtils.getCFConfigurations();
}
public static PersistenceManager getManager() throws ConfigurationException {
@@ -56,14 +56,24 @@
return persistenceManager;
}
+ /**
+ * When given a map of column keys and column values this method will insert into necessary column families and to necessary rows
+ * @param cfData - Map where key is the column name and value is the column value
+ * @return success of operation
+ * @throws BAMReceiverException
+ */
public boolean persistEvent(Map<String, String> cfData) throws BAMReceiverException {
String defaultRowKey = null;
- noSQLDataStore.startTransaction();
+ // Note: Performance gain by starting a pseudo batch commit for Cassandra,
+ // One event, will only have one commit, independent of the number of CFs and Rows being written to
+ noSQLDataStore.startBatchCommit();
- for (CFConfigBean cfConfig : cfConfigs) {
+ // Iterate through all the CF configs and decide which one to insert event or index event
+ for (CFConfigBean cfConfig : columnFamilyConfigs) {
String rowKey;
+ // All events will be at least committed to the default CF, which usually is the Event CF
if (cfConfig.isDefaultCF()) {
defaultRowKey = createRowKey(cfConfig.getRowKeyParts(), cfConfig.getGranularity(),
cfData, true);
@@ -75,18 +85,23 @@
rowKey = createRowKey(cfConfig.getRowKeyParts(), cfConfig.getGranularity(), cfData,
false);
if (rowKey == null) {
- // this row key is does not apply to this event, skipt it
+ // this row key is does not apply to this event, skip it
continue;
}
+ // We are creating a data map, with the default row key as the column name, i.e. default row key is stored
+ // as a pointer
Map<String, String> nonDefaultDataMap = createNonDefaultDataMap(defaultRowKey);
noSQLDataStore.persistData(cfConfig.getCfName(), rowKey, nonDefaultDataMap);
+ // persist any indexes if they are given
noSQLDataStore.persistIndexes(cfConfig.getCfName(), cfConfig.getRowKeyParts(),
cfData);
if (cfConfig.getIndexRowKey() != null) {
-
+ // Cassandra does not sort in rows, but Cassandra columns are sorted.
+ // So we store a separate column, i.e. defaults to 'allKeys', that stores pointers to all the row keys
+ // in the same CF
Map<String, String> indexRowKeyDataMap = createNonDefaultDataMap(rowKey);
noSQLDataStore.persistData(cfConfig.getCfName(), cfConfig.getIndexRowKey(),
indexRowKeyDataMap);
@@ -95,57 +110,77 @@
}
}
- noSQLDataStore.finishTransaction();
+ noSQLDataStore.endBatchCommit();
return true;
}
+ /**
+ * This will create a map that will be used as a pointer to the entry created in the default CF (i.e. usually the Event CF)
+ * @param rowKeyToIndex String that is the row key to index - i.e. usually, the row key in the event CF
+ * @return
+ */
private Map<String, String> createNonDefaultDataMap(String rowKeyToIndex) {
- Map<String, String> map = new HashMap<String, String>();
- map.put(rowKeyToIndex, "");
- return UnmodifiableMap.decorate(map);
+ Map<String, String> nonDefaultMap = new HashMap<String, String>();
+ nonDefaultMap.put(rowKeyToIndex, "");
+ return UnmodifiableMap.decorate(nonDefaultMap);
}
+ /**
+ * Creates the row key which is probably the most important entry in an inserted row. This row key allows us to later find this
+ * row which will be an event or a pointer to the event
+ * @param rowKeyParts
+ * @param granularity
+ * @param cfData
+ * @param appendUUID
+ * @return
+ * @throws BAMReceiverException
+ */
private String createRowKey(List<KeyPart> rowKeyParts, String granularity,
Map<String, String> cfData, boolean appendUUID)
throws BAMReceiverException {
StringBuffer buffer = new StringBuffer();
- int i = 1;
-
- for (KeyPart rkPart : rowKeyParts) {
- if (cfData.containsKey(rkPart.getName())) {
- String rowKeyPartValue = cfData.get(rkPart.getName());
- // handle timestamp case
- if (rkPart.getName().equals(ReceiverConstants.TIMESTAMP_KEY_NAME)) {
+ for (int i = 0; i < rowKeyParts.size(); i++) {
+ KeyPart rowKeyPart = rowKeyParts.get(i);
+ String rowKeyPartName = rowKeyPart.getName();
+ if (cfData.containsKey(rowKeyPartName)) {
+ String rowKeyPartValue = cfData.get(rowKeyPartName);
+ // handle timestamp case according to granularity
+ if (rowKeyPartName.equals(ReceiverConstants.TIMESTAMP_KEY_NAME)) {
try {
+ // we use the time stamp factory to generate the time stamp according to granularity
rowKeyPartValue = TimeStampFactory.getFactory().getTimeStamp(
- cfData.get(rkPart.getName()), granularity);
+ cfData.get(rowKeyPartName), granularity);
} catch (ParseException e) {
throw new BAMReceiverException("Cannot parse time stamp : " +
- cfData.get(rkPart));
+ cfData.get(rowKeyPartName));
}
}
buffer.append(rowKeyPartValue);
- if (i != rowKeyParts.size()) {
+
+ // Skip appending row key delimiter for the last row key part
+ if ((i + 1) != rowKeyParts.size()) {
buffer.append("---");
}
- i++;
+
} else {
+ // if there is no column name that corresponds to the row key parts, that means this event should not be inserted,
+ // we return null, in that case
return null;
}
}
- if (appendUUID) {
+ // Add an optional uuid
+ if (appendUUID) {
buffer.append("---");
buffer.append(UUID.randomUUID());
-
}
buffer.trimToSize();
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/CassandraUtils.java Wed Aug 31 10:25:09 2011
@@ -40,6 +40,8 @@
import java.util.HashMap;
import java.util.Map;
+// This class is used as a workaround for not being able to use data access service for Cassandra
+// The code is directly copied from data access service classes
public class CassandraUtils {
private static final Log log = LogFactory.getLog(CassandraUtils.class);
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/NoSQLDataStore.java Wed Aug 31 10:25:09 2011
@@ -80,36 +80,35 @@
private KeyspaceDefinition bamKeyspaceDefinition;
- private Cluster initializeBAMNoSQLDataStore() throws ConfigurationException {
-
+ private synchronized Cluster initializeBAMNoSQLDataStore() throws ConfigurationException {
if (cluster == null) {
- synchronized (this) {
+// Todo: Use the data access service, once the related bug is fixed in Cassandra component
// cluster = ReceiverUtils.getDataAccessService().getCluster(new ClusterInformation("admin", "admin"));
- cluster = CassandraUtils.createCluster(new ClusterInformation("admin", "admin"));
+// Todo : Get rid of hard coded username/ password when implementing MT
+ cluster = CassandraUtils.createCluster(new ClusterInformation("admin", "admin"));
+ bamKeyspaceDefinition = cluster.describeKeyspace(BAM_KEYSPACE);
+ if (bamKeyspaceDefinition == null) {
+ cluster.addKeyspace(HFactory.createKeyspaceDefinition(BAM_KEYSPACE));
bamKeyspaceDefinition = cluster.describeKeyspace(BAM_KEYSPACE);
- if (bamKeyspaceDefinition == null) {
- cluster.addKeyspace(HFactory.createKeyspaceDefinition(BAM_KEYSPACE));
- bamKeyspaceDefinition = cluster.describeKeyspace(BAM_KEYSPACE);
- }
- setBamKeyspace(HFactory.createKeyspace(BAM_KEYSPACE, cluster));
-
- List<CFConfigBean> cfConfigurations = ConfigurationUtils.getCFConfigurations();
- for (CFConfigBean cfConfiguration : cfConfigurations) {
- createCF(cfConfiguration.getCfName());
- }
+ }
+ setBamKeyspace(HFactory.createKeyspace(BAM_KEYSPACE, cluster));
- // Create meta data column families
- createCF(META_COLUMN_FAMILY_NAME);
- createCF(INDEX_COLUMN_FAMILY_NAME);
- createCF(CURSORS_COLUMN_FAMILY_NAME);
+ List<CFConfigBean> cfConfigurations = ConfigurationUtils.getCFConfigurations();
+ for (CFConfigBean cfConfiguration : cfConfigurations) {
+ createCF(cfConfiguration.getCfName());
+ }
- persistColumnFamilyInformation(cfConfigurations);
+ // Create meta data column families
+ createCF(META_COLUMN_FAMILY_NAME);
+ createCF(INDEX_COLUMN_FAMILY_NAME);
+ createCF(CURSORS_COLUMN_FAMILY_NAME);
- }
+ persistColumnFamilyInformation(cfConfigurations);
}
return cluster;
}
+
public void persistColumnFamilyConfiguration(CFConfigBean cfConfig) {
List<CFConfigBean> config = new ArrayList<CFConfigBean>();
config.add(cfConfig);
@@ -162,9 +161,8 @@
Map<String, String> cfData = new HashMap<String, String>();
cfData.put(INDEX_ROW_KEY, indexRowKey);
cfData.put(GRANULARITY, granularity);
- cfData.put(DEFAULT_COLUMN_FAMILY, new Boolean(
- cfConfig.isDefaultCF()).toString());
- cfData.put(SECONDARY_COLUMN_FAMILY, new Boolean(cfConfig.isSecondaryCF()).toString());
+ cfData.put(DEFAULT_COLUMN_FAMILY, Boolean.valueOf(cfConfig.isDefaultCF()).toString());
+ cfData.put(SECONDARY_COLUMN_FAMILY, Boolean.valueOf(cfConfig.isSecondaryCF()).toString());
int counter = 0;
if (cfConfig.getRowKeyParts() != null) {
@@ -180,7 +178,7 @@
}
public void setLastCursorForColumnFamily(String cfName, String sequenceName,
- int analyzerIndex, String lastCursor) {
+ int analyzerIndex, String lastCursor) {
String key = sequenceName + analyzerIndex;
Map<String, String> cfData = new HashMap<String, String>();
@@ -190,39 +188,42 @@
}
- private ThreadLocal<Boolean> startTransaction = new ThreadLocal<Boolean>() {
+ private ThreadLocal<Boolean> startBatchCommit = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
- private ThreadLocal<Mutator> mutatorThreadLocal = new ThreadLocal<Mutator>();
- public void startTransaction() {
- startTransaction.set(true);
+ private ThreadLocal<Mutator<String>> mutatorThreadLocal = new ThreadLocal<Mutator<String>>();
+
+ public void startBatchCommit() {
+ startBatchCommit.set(true);
mutatorThreadLocal.set(HFactory.createMutator(getBamKeyspace(), stringSerializer));
}
- public void finishTransaction() {
+ public void endBatchCommit() {
mutatorThreadLocal.get().execute();
- startTransaction.set(false);
+ startBatchCommit.set(false);
}
public boolean persistData(String CFName, String rowKey, Map<String, String> data) {
if (!cfExists(CFName)) {
createCF(CFName);
}
+
Mutator<String> mutator;
- if (startTransaction.get()) {
+ if (startBatchCommit.get()) {
mutator = mutatorThreadLocal.get();
} else {
mutator = HFactory.createMutator(getBamKeyspace(), stringSerializer);
}
- for (String columnKey : data.keySet()) {
- mutator.addInsertion(rowKey, CFName, HFactory.createStringColumn(columnKey,
- data.get(columnKey)));
+
+ for (Map.Entry<String, String> column : data.entrySet()) {
+ mutator.addInsertion(rowKey, CFName, HFactory.createStringColumn(column.getKey(), column.getValue()));
}
- if (!startTransaction.get()) {
+
+ if (!startBatchCommit.get()) {
mutator.execute();
}
return true;
@@ -231,10 +232,7 @@
private List<String> cfList = new ArrayList<String>();
private boolean cfExists(String CFName) {
- if (cfList.contains(CFName)) {
- return true;
- }
- return false;
+ return cfList.contains(CFName);
}
private boolean createCF(String CFName) {
@@ -262,89 +260,6 @@
return true;
}
-// public void persistEvent(List<EventData> eventDataList) {
-// Cluster bamCluster = initializeBAMNoSQLDataStore();
-//
-// Mutator<String> mutator = HFactory.createMutator(getBamKeyspace(), stringSerializer );
-// Iterator<EventData> eventDataIterator = eventDataList.iterator();
-//
-// boolean setKey = false;
-// String superColumnUUID = null;
-// while (eventDataIterator.hasNext()) {
-// EventData eventData = eventDataIterator.next();
-// if (!setKey) {
-// superColumnUUID = getMessageKey(eventData);
-// setKey = true;
-// }
-// mutator.addInsertion(superColumnUUID,
-// ReceiverConstants.EVENT_COLUMNFAMILY_NAME,
-// HFactory.createSuperColumn(eventData.getDataCategory().name(),
-// createColumnList(eventData), stringSerializer, stringSerializer, stringSerializer));
-// if (!eventDataIterator.hasNext()) {
-//// dateSerializer = DateSerializer.get();
-//// mutator.addInsertion(superColumnUUID,
-//// ReceiverConstants.EVENT_COLUMNFAMILY_NAME,
-//// HFactory.createColumn(ReceiverConstants.MSG_RECEIVED_TIME_KEY, eventData.getTimeStamp(), stringSerializer, dateSerializer));
-// }
-// }
-// mutator.execute();
-//
-//// RangeSuperSlicesQuery<String, String, String, String> rangeQuery = HFactory.createRangeSuperSlicesQuery(getBamKeyspace(),
-//// stringSerializer, stringSerializer, stringSerializer, stringSerializer);
-//// rangeQuery.setColumnFamily(ReceiverConstants.EVENT_COLUMNFAMILY_NAME);
-//// rangeQuery.setRange("", "", false, 4);
-//// rangeQuery.setKeys("", "");
-//// rangeQuery.setRowCount(1000 * 1000);
-////
-////
-////
-////// SuperColumnQuery<String, String, String, String> query = HFactory.createSuperColumnQuery(getBamKeyspace(),
-////// stringSerializer, stringSerializer, stringSerializer, stringSerializer);
-//////
-////// query.setKey(null).setSuperName(DataType.CorrelationData.name()).setColumnFamily(ReceiverConstants.EVENT_COLUMNFAMILY_NAME);
-//// QueryResult<OrderedSuperRows<String, String, String, String>> result = rangeQuery.execute();
-//
-//
-//// for (SuperRow<String, String, String, String> superRow : result.get().getList()) {
-//// System.out.println("Super row : " + superRow + " \n");
-//// }
-//
-//// System.out.println("Number of entried in Cassandra : " + result.get().getCount() );
-////
-//// for (Iterator<SuperRow<String, String, String, String>> iterator = superRowList.iterator(); iterator.hasNext();) {
-//// SuperRow<String, String, String, String> superRow = iterator.next();
-//// for ()
-//// List<HSuperColumn<String, String, String>> superColumns = superRow.getSuperSlice().getSuperColumns();
-//// for (Iterator<HSuperColumn<String, String, String>> superColumnIterator = superColumns.iterator(); superColumnIterator.hasNext();) {
-////
-//// }
-//// }
-////
-//// for (int i = 0; i < superColumn.getSize(); i++) {
-//// HColumn<String, String> column = superColumn.get(i);
-//// System.out.println("Super Column name : " + superColumn.getName() + "Column name : " + column.getName() + "Column value : " + column.getValue() + "\n");
-////
-//// }
-//
-//
-// }
-//
-// private String getMessageKey(EventData data) {
-// return data.getMessageKey() + "-" + data.getTimeStamp() + "-" + UUID.randomUUID();
-// }
-//
-// private List<HColumn<String, String>> createColumnList(EventData data) {
-// Iterator<String> dataIterator = data.getData().keySet().iterator();
-// List<HColumn<String, String>> columnList = new ArrayList<HColumn<String, String>>();
-// while (dataIterator.hasNext()) {
-// String elemLocalPart = dataIterator.next();
-// columnList.add(HFactory.createColumn(elemLocalPart, data.getData().get(elemLocalPart), stringSerializer, stringSerializer));
-//
-// }
-// return columnList;
-// }
-
-
}
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/ResultRow.java Wed Aug 31 10:25:09 2011
@@ -39,10 +39,10 @@
}
public ResultColumn[] getColumns() {
- return columns;
+ return columns.clone();
}
public void setColumns(ResultColumn[] columns) {
- this.columns = columns;
+ this.columns = columns.clone();
}
}
Modified: trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java (original)
+++ trunk/carbon/components/bam/org.wso2.carbon.bam.utils/src/main/java/org/wso2/carbon/bam/utils/persistence/TimeStampFactory.java Wed Aug 31 10:25:09 2011
@@ -22,6 +22,9 @@
import java.util.HashMap;
import java.util.Map;
+/**
+ * Encapsulated time stamp creation according to granularity
+ */
public class TimeStampFactory {
private TimeStampFactory() {
@@ -39,26 +42,27 @@
private Map<String, Integer[]> granularityMap = new HashMap<String, Integer[]>();
+
private void populateGranularityMap() {
granularityMap.put("year", new Integer[]{Calendar.MINUTE, Calendar.SECOND, Calendar.HOUR_OF_DAY, Calendar.DAY_OF_MONTH, Calendar.MONTH});
granularityMap.put("month",new Integer[]{Calendar.MINUTE, Calendar.SECOND, Calendar.HOUR_OF_DAY, Calendar.DAY_OF_MONTH});
- granularityMap.put("hour", new Integer[]{Calendar.MINUTE, Calendar.SECOND});
granularityMap.put("day", new Integer[]{Calendar.MINUTE, Calendar.SECOND, Calendar.HOUR_OF_DAY});
+ granularityMap.put("hour", new Integer[]{Calendar.MINUTE, Calendar.SECOND});
granularityMap.put("minute", new Integer[]{Calendar.SECOND});
granularityMap.put("none", new Integer[]{0});
}
public String getTimeStamp(String eventTimeStamp, String granularity)
throws IllegalArgumentException, ParseException {
- //SimpleDateFormat format = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy");
+
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (!granularityMap.containsKey(granularity)) {
throw new IllegalArgumentException("Invalid granularity value - " + granularity);
}
-
Date date = formatter.parse(eventTimeStamp);
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
+ // Reduce values according to granularity
Integer[] deltaFactors = granularityMap.get(granularity);
for (Integer deltaFactor : deltaFactors) {
calendar.add(deltaFactor, calendar.get(deltaFactor) * -1);
Modified: trunk/carbon/components/bam/pom.xml
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam/pom.xml?rev=112175&r1=112174&r2=112175&view=diff
==============================================================================
--- trunk/carbon/components/bam/pom.xml (original)
+++ trunk/carbon/components/bam/pom.xml Wed Aug 31 10:25:09 2011
@@ -43,16 +43,16 @@
<activeByDefault>true</activeByDefault>
</activation>
<modules>
- <module>org.wso2.carbon.bam.util</module>
- <module>org.wso2.carbon.bam.core</module>
- <module>org.wso2.carbon.bam.cep</module>
- <module>org.wso2.carbon.bam.gauges.ui</module>
- <module>org.wso2.carbon.bam.common</module>
- <module>org.wso2.carbon.bam.ui</module>
- <!--<module>org.wso2.carbon.bam.analyzer</module>-->
- <!--<module>org.wso2.carbon.bam.receiver</module>-->
- <!--<module>org.wso2.carbon.bam.utils</module>-->
- <!--<module>org.wso2.carbon.bam.presentation</module>-->
+ <!--<module>org.wso2.carbon.bam.util</module>-->
+ <!--<module>org.wso2.carbon.bam.core</module>-->
+ <!--<module>org.wso2.carbon.bam.cep</module>-->
+ <!--<module>org.wso2.carbon.bam.gauges.ui</module>-->
+ <!--<module>org.wso2.carbon.bam.common</module>-->
+ <!--<module>org.wso2.carbon.bam.ui</module>-->
+ <module>org.wso2.carbon.bam.analyzer</module>
+ <module>org.wso2.carbon.bam.receiver</module>
+ <module>org.wso2.carbon.bam.utils</module>
+ <module>org.wso2.carbon.bam.presentation</module>
</modules>
</profile>
<profile>
More information about the Carbon-commits
mailing list