[Carbon-commits] [Carbon] svn commit r115469 - in trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher: . src/main/java/org/wso2/carbon/bam/service/data/publisher/data src/main/java/org/wso2/carbon/bam/service/data/publisher/internal src/main/java/org/wso2/carbon/bam/service/data/publisher/modules src/main/java/org/wso2/carbon/bam/service/data/publisher/pool src/main/java/org/wso2/carbon/bam/service/data/publisher/process src/main/java/org/wso2/carbon/bam/service/data/publisher/publish src/main/java/org/wso2/carbon/bam/service/data/publisher/queue src/main/java/org/wso2/carbon/bam/service/data/publisher/util

kasunw at wso2.com kasunw at wso2.com
Mon Nov 7 07:34:28 EST 2011


Author: kasunw
Date: Mon Nov  7 04:34:27 2011
New Revision: 115469
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=115469

Log:
implementing connection pool and extracting header information 

Added:
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java
Modified:
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java
   trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml	Mon Nov  7 04:34:27 2011
@@ -64,6 +64,11 @@
             <version>3.2.1</version>
         </dependency>
         <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.bam.data.publisher.util</artifactId>
+            <version>${pom.version}</version>
+        </dependency>
+        <dependency>
             <groupId>libthrift.wso2</groupId>
             <artifactId>libthrift</artifactId>
             <version>0.5.wso2v1</version>

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java	Mon Nov  7 04:34:27 2011
@@ -25,10 +25,65 @@
     private String msgBody;
     private Timestamp timestamp;
     private String messageDirection;
-    private String ipAddress;
     private String serviceName;
     private String operationName;
 
+    private String userAgent;
+    private String remoteAddress;
+    private String host;
+    private String contentType;
+    private String referer;
+    private String requestURL;
+
+
+    public String getUserAgent() {
+        return userAgent;
+    }
+
+    public void setUserAgent(String userAgent) {
+        this.userAgent = userAgent;
+    }
+
+    public String getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    public void setRemoteAddress(String remoteAddress) {
+        this.remoteAddress = remoteAddress;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+
+    public String getRequestURL() {
+        return requestURL;
+    }
+
+    public void setRequestURL(String requestURL) {
+        this.requestURL = requestURL;
+    }
+
+    public String getReferer() {
+        return referer;
+    }
+
+    public void setReferer(String referer) {
+        this.referer = referer;
+    }
+
     public String getMessageId() {
         return messageId;
     }
@@ -69,14 +124,6 @@
         this.messageDirection = messageDirection;
     }
 
-    public String getIpAddress() {
-        return ipAddress;
-    }
-
-    public void setIpAddress(String ipAddress) {
-        this.ipAddress = ipAddress;
-    }
-
     public String getOperationName() {
         return operationName;
     }

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java	Mon Nov  7 04:34:27 2011
@@ -31,6 +31,61 @@
     private String operationName;
     private Timestamp timestamp;
 
+    private String userAgent;
+    private String remoteAddress;
+    private String host;
+    private String contentType;
+    private String referer;
+    private String requestURL;
+
+    public String getRequestURL() {
+        return requestURL;
+    }
+
+    public void setRequestURL(String requestURL) {
+        this.requestURL = requestURL;
+    }
+
+    public String getReferer() {
+        return referer;
+    }
+
+    public void setReferer(String referer) {
+        this.referer = referer;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+
+    public String getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    public void setRemoteAddress(String remoteAddress) {
+        this.remoteAddress = remoteAddress;
+    }
+
+    public String getUserAgent() {
+        return userAgent;
+    }
+
+    public void setUserAgent(String userAgent) {
+        this.userAgent = userAgent;
+    }
+
     public Timestamp getTimestamp() {
         return timestamp;
     }

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java	Mon Nov  7 04:34:27 2011
@@ -21,6 +21,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
+import org.wso2.carbon.bam.data.publisher.util.PublisherUtil;
 import org.wso2.carbon.bam.service.data.publisher.conf.RegistryPersistenceManager;
 import org.wso2.carbon.bam.service.data.publisher.publish.DataPublisher;
 import org.wso2.carbon.bam.service.data.publisher.publish.DataPublisherUtil;
@@ -66,9 +68,12 @@
             bundleContext.registerService(Axis2ConfigurationContextObserver.class.getName(),
                                           new ServiceStatisticsAxis2ConfigurationContextObserver(), null);
 
+            PublisherConfiguration configuration = PublisherUtil.readConfigurationFromPublisherConfig();
+            DataPublisherUtil.setPublisherConfiguration(configuration);
+
             //use event publisher as the serviceStatsProcessor
             StatsProcessor statsProcessor = new DataPublisher();
-            serviceStatisticsQueue = new ServiceStatisticsQueue(statsProcessor);
+            serviceStatisticsQueue = new ServiceStatisticsQueue(statsProcessor, configuration);
             DataPublisherUtil.setServiceStatisticQueue(serviceStatisticsQueue);
 
             activityQueue = new ActivityQueue(statsProcessor);

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java	Mon Nov  7 04:34:27 2011
@@ -162,8 +162,6 @@
         }
         activityData.setMsgBody(msgBody);
         activityData.setMessageDirection(ActivityPublisherConstants.ACTIVITY_DATA_MESSAGE_DIRECTION_IN);
-        activityData.setIpAddress(
-                messageContext.getProperty(CommonConstants.PROPERTY_REMOTE_ADDRESS).toString());
         activityData.setServiceName(messageContext.getAxisService().getName());
         activityData.setOperationName(messageContext.getAxisOperation().getName().getLocalPart());
         activityData.setMessageId(messageContext.getMessageID());

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java	Mon Nov  7 04:34:27 2011
@@ -32,8 +32,10 @@
 import org.apache.axis2.description.WSDL2Constants;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.http.HTTPConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
 import org.wso2.carbon.bam.service.data.publisher.conf.EventingConfigData;
 import org.wso2.carbon.bam.service.data.publisher.data.ActivityData;
 import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
@@ -44,6 +46,7 @@
 import org.wso2.carbon.bam.service.data.publisher.util.TenantEventConfigData;
 import org.wso2.carbon.core.multitenancy.SuperTenantCarbonContext;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.xml.namespace.QName;
 import java.sql.Timestamp;
 import java.util.Date;
@@ -89,8 +92,17 @@
                     activityID = element.getAttributeValue(new QName(ActivityPublisherConstants.ACTIVITY_ID));
                 }
 
+
                 ActivityData activityData = addDetailsOfTheMessage(timestamp, activityID, messageContext);
                 BAMServerInfo bamServerInfo = addBAMServerInfo(eventingConfigData);
+
+                MessageContext currentMessageContext = MessageContext.getCurrentMessageContext();
+                if (currentMessageContext != null) {
+                    Object requestProperty = currentMessageContext.getProperty(
+                            HTTPConstants.MC_HTTP_SERVLETREQUEST);
+                    extractInfoFromHttpHeaders(activityData, requestProperty);
+                }
+
                 PublishData publishData = new PublishData();
                 publishData.setActivityData(activityData);
                 publishData.setBamServerInfo(bamServerInfo);
@@ -107,6 +119,23 @@
         return InvocationResponse.CONTINUE;
     }
 
+    private void extractInfoFromHttpHeaders(ActivityData activityData, Object requestProperty) {
+
+        if (requestProperty instanceof HttpServletRequest) {
+            HttpServletRequest httpServletRequest = (HttpServletRequest) requestProperty;
+            activityData.setRequestURL(httpServletRequest.getRequestURL().toString());
+            activityData.setRemoteAddress(httpServletRequest.getRemoteAddr());
+            activityData.setContentType(httpServletRequest.getContentType());
+            activityData.setUserAgent(httpServletRequest.getHeader(
+                    BAMDataPublisherConstants.HTTP_HEADER_USER_AGENT));
+            activityData.setHost(httpServletRequest.getHeader(
+                    BAMDataPublisherConstants.HTTP_HEADER_HOST));
+            activityData.setReferer(httpServletRequest.getHeader(
+                    BAMDataPublisherConstants.HTTP_HEADER_REFERER));
+        }
+
+    }
+
     private BAMServerInfo addBAMServerInfo(EventingConfigData eventingConfigData) {
         BAMServerInfo bamServerInfo = new BAMServerInfo();
         bamServerInfo.setBamServerURL(eventingConfigData.getUrl());
@@ -174,10 +203,6 @@
         activityData.setActivityId(activityID);
         activityData.setOperationName(messageContext.getAxisService().getName());
         activityData.setServiceName(messageContext.getAxisOperation().getName().getLocalPart());
-        if (messageContext.getProperty(CommonConstants.PROPERTY_REMOTE_ADDRESS) != null) {
-            activityData.setIpAddress(messageContext.getProperty(
-                    CommonConstants.PROPERTY_REMOTE_ADDRESS).toString());
-        }
         activityData.setMessageId(messageContext.getMessageID());
         activityData.setMessageDirection(ActivityPublisherConstants.ACTIVITY_DATA_MESSAGE_DIRECTION_OUT);
         activityData.setMsgBody(messageContext.getEnvelope().getBody().toString());

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java	Mon Nov  7 04:34:27 2011
@@ -21,8 +21,10 @@
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.engine.Handler;
 import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.http.HTTPConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
 import org.wso2.carbon.bam.service.data.publisher.conf.EventingConfigData;
 import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
 import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
@@ -36,6 +38,8 @@
 import org.wso2.carbon.statistics.services.util.ServiceStatistics;
 import org.wso2.carbon.statistics.services.util.SystemStatistics;
 
+import javax.mail.Message;
+import javax.servlet.http.HttpServletRequest;
 import java.sql.Timestamp;
 import java.util.Date;
 import java.util.Map;
@@ -44,7 +48,6 @@
 
     private static Log log = LogFactory.getLog(StatisticsHandler.class);
 
-
     @Override
     public Handler.InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
 
@@ -80,6 +83,13 @@
                     return Handler.InvocationResponse.CONTINUE;
                 }
 
+                MessageContext currentMessageContext = MessageContext.getCurrentMessageContext();
+                if (currentMessageContext != null) {
+                    Object requestProperty = currentMessageContext.getProperty(
+                            HTTPConstants.MC_HTTP_SERVLETREQUEST);
+                    extractInfoFromHttpHeaders(statisticData, requestProperty);
+                }
+
                 serviceStatistics = systemStatisticsUtil.getServiceStatistics(msgContext.getAxisService());
 
                 statisticData.setSystemStatistics(systemStatistics);
@@ -105,6 +115,23 @@
         return Handler.InvocationResponse.CONTINUE;
     }
 
+    private void extractInfoFromHttpHeaders(StatisticData statisticData, Object requestProperty) {
+
+        if (requestProperty instanceof HttpServletRequest) {
+            HttpServletRequest httpServletRequest = (HttpServletRequest) requestProperty;
+            statisticData.setRequestURL(httpServletRequest.getRequestURL().toString());
+            statisticData.setRemoteAddress(httpServletRequest.getRemoteAddr());
+            statisticData.setContentType(httpServletRequest.getContentType());
+            statisticData.setUserAgent(httpServletRequest.getHeader(
+                    BAMDataPublisherConstants.HTTP_HEADER_USER_AGENT));
+            statisticData.setHost(httpServletRequest.getHeader(
+                    BAMDataPublisherConstants.HTTP_HEADER_HOST));
+            statisticData.setReferer(httpServletRequest.getHeader(
+                    BAMDataPublisherConstants.HTTP_HEADER_REFERER));
+        }
+
+    }
+
     private BAMServerInfo addBAMServerInfo(EventingConfigData eventingConfigData) {
         BAMServerInfo bamServerInfo = new BAMServerInfo();
         bamServerInfo.setBamServerURL(eventingConfigData.getUrl());

Added: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java?pathrev=115469
==============================================================================
--- (empty file)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java	Mon Nov  7 04:34:27 2011
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.wso2.carbon.bam.service.data.publisher.pool;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+
+public class TFramedTransportPool {
+
+    private static volatile GenericKeyedObjectPool socketPool = null;
+
+    public static GenericKeyedObjectPool getClientPool(KeyedPoolableObjectFactory factory,
+                                                       int maxActive,
+                                                       int maxIdle,
+                                                       boolean testOnBorrow,
+                                                       long timeBetweenEvictionRunsMillis,
+                                                       long minEvictableIdleTimeMillis) {
+        if (socketPool == null) {
+            synchronized (TFramedTransportPool.class) {
+                if (socketPool == null) {
+                    socketPool = new GenericKeyedObjectPool();
+                    socketPool.setFactory(factory);
+                    socketPool.setMaxActive(maxActive);
+                    socketPool.setTestOnBorrow(testOnBorrow);
+                    socketPool.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+                    socketPool.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+                    socketPool.setMaxIdle(maxIdle);
+                    socketPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
+                }
+            }
+        }
+        return socketPool;
+    }
+
+}

Added: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java?pathrev=115469
==============================================================================
--- (empty file)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java	Mon Nov  7 04:34:27 2011
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.wso2.carbon.bam.service.data.publisher.pool;
+
+import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+public class TFramedTransportPoolFactory extends BaseKeyedPoolableObjectFactory {
+
+    @Override
+    public TTransport makeObject(Object key) throws Exception {
+        TTransport receiverTransport = new TFramedTransport(new TSocket(key.toString(), 7611));
+        receiverTransport.open();
+        return receiverTransport;
+    }
+
+    @Override
+    public boolean validateObject(Object key, Object obj) {
+        TTransport receiverTransport= (TTransport)obj;
+        boolean isOpen = receiverTransport.isOpen();
+        return isOpen;
+    }
+
+
+}

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java	Mon Nov  7 04:34:27 2011
@@ -17,13 +17,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
+import org.wso2.carbon.bam.service.Event;
 import org.wso2.carbon.bam.service.data.publisher.data.ActivityData;
 import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
 import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
 import org.wso2.carbon.bam.service.data.publisher.publish.StatsProcessor;
-import org.wso2.carbon.bam.service.data.publisher.util.ActivityPublisherConstants;
-import org.wso2.carbon.bam.service.data.publisher.util.CommonConstants;
-import org.wso2.carbon.bam.service.Event;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -91,34 +90,60 @@
 
     private void addCorrelationData(Map<String, ByteBuffer> correlationData,
                                     ActivityData activityData) {
-        correlationData.put(ActivityPublisherConstants.MSG_ACTIVITY_ID, ByteBuffer.wrap(
+        correlationData.put(BAMDataPublisherConstants.MSG_ACTIVITY_ID, ByteBuffer.wrap(
                 activityData.getActivityId().getBytes()));
-        correlationData.put(ActivityPublisherConstants.MSG_ID, ByteBuffer.wrap(
+        correlationData.put(BAMDataPublisherConstants.MSG_ID, ByteBuffer.wrap(
                 activityData.getMessageId().getBytes()));
     }
 
     private void addMetaData(Map<String, ByteBuffer> metaData, ActivityData activityData) {
-        String ipAddress = activityData.getIpAddress();
-        if (ipAddress != null) {
-            metaData.put(CommonConstants.IP_ADDRESS, ByteBuffer.wrap(ipAddress.getBytes()));
+
+        String remoteAddress = activityData.getRemoteAddress();
+        String host = activityData.getHost();
+        String contentType = activityData.getContentType();
+        String referer = activityData.getReferer();
+        String userAgent = activityData.getUserAgent();
+        String requestURL = activityData.getRequestURL();
+
+        if (remoteAddress != null) {
+            metaData.put(BAMDataPublisherConstants.REMOTE_ADDRESS, ByteBuffer.wrap(
+                    remoteAddress.getBytes()));
+        }
+        if (host != null) {
+            metaData.put(BAMDataPublisherConstants.HOST, ByteBuffer.wrap(
+                    host.getBytes()));
+        }
+        if (contentType != null) {
+            metaData.put(BAMDataPublisherConstants.CONTENT_TYPE, ByteBuffer.wrap(
+                    contentType.getBytes()));
+        }
+        if (referer != null) {
+            metaData.put(BAMDataPublisherConstants.REFERER, ByteBuffer.wrap(
+                    referer.getBytes()));
+        }
+        if (userAgent != null) {
+            metaData.put(BAMDataPublisherConstants.USER_AGENT, ByteBuffer.wrap(
+                    userAgent.getBytes()));
+        }
+        if (requestURL != null) {
+            metaData.put(BAMDataPublisherConstants.REQUEST_URL, ByteBuffer.wrap(
+                    requestURL.getBytes()));
         }
     }
 
     private void addEventData(Map<String, ByteBuffer> eventData,
                               ActivityData activityData) {
-        eventData.put(ActivityPublisherConstants.MSG_ACTIVITY_ID, ByteBuffer.wrap(
-                activityData.getActivityId().getBytes()));
-        eventData.put(CommonConstants.TIMESTAMP, ByteBuffer.wrap(
+        eventData.put(BAMDataPublisherConstants.TIMESTAMP, ByteBuffer.wrap(
                 activityData.getTimestamp().toString().getBytes()));
         if (activityData.getMsgBody() != null) {
-            eventData.put(ActivityPublisherConstants.MSG_BODY, ByteBuffer.wrap(
+            eventData.put(BAMDataPublisherConstants.MSG_BODY, ByteBuffer.wrap(
                     activityData.getMsgBody().getBytes()));
         }
-        eventData.put(ActivityPublisherConstants.MSG_DIRECTION, ByteBuffer.wrap(
+        eventData.put(BAMDataPublisherConstants.MSG_DIRECTION, ByteBuffer.wrap(
                 activityData.getMessageDirection().getBytes()));
-        eventData.put(CommonConstants.SERVICE_NAME, ByteBuffer.wrap(
+        eventData.put(BAMDataPublisherConstants.SERVICE_NAME, ByteBuffer.wrap(
                 activityData.getServiceName().getBytes()));
-        eventData.put(CommonConstants.OPERATION_NAME, ByteBuffer.wrap(
+        eventData.put(BAMDataPublisherConstants.OPERATION_NAME, ByteBuffer.wrap(
                 activityData.getOperationName().getBytes()));
 
     }

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java	Mon Nov  7 04:34:27 2011
@@ -17,11 +17,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
 import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
 import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
 import org.wso2.carbon.bam.service.data.publisher.data.StatisticData;
 import org.wso2.carbon.bam.service.data.publisher.publish.StatsProcessor;
-import org.wso2.carbon.bam.service.data.publisher.util.CommonConstants;
 import org.wso2.carbon.bam.service.data.publisher.util.ServiceStatisticsPublisherConstants;
 import org.wso2.carbon.bam.service.Event;
 import org.wso2.carbon.statistics.services.util.SystemStatistics;
@@ -74,31 +74,46 @@
 
         StatisticData statisticData = publishData.getStatisticData();
 
-        Map<String, ByteBuffer> correlation = new HashMap<String, ByteBuffer>();
-        Map<String, ByteBuffer> meta = new HashMap<String, ByteBuffer>();
+        Map<String, ByteBuffer> correlationData = new HashMap<String, ByteBuffer>();
+        Map<String, ByteBuffer> metaData = new HashMap<String, ByteBuffer>();
         Map<String, ByteBuffer> eventData = new HashMap<String, ByteBuffer>();
 
         addDataIntoEventData(eventData, statisticData);
+        addDataIntoMetaData(metaData, statisticData);
 
         Event event = new Event();
-        event.setCorrelation(correlation);
-        event.setMeta(meta);
+        event.setCorrelation(correlationData);
+        event.setMeta(metaData);
         event.setEvent(eventData);
 
         return event;
     }
 
+    private void addDataIntoMetaData(Map<String, ByteBuffer> metaData,
+                                     StatisticData statisticData) {
+        metaData.put(BAMDataPublisherConstants.REMOTE_ADDRESS, ByteBuffer.wrap(
+                statisticData.getRemoteAddress().getBytes()));
+        metaData.put(BAMDataPublisherConstants.HOST, ByteBuffer.wrap(
+                statisticData.getHost().getBytes()));
+        metaData.put(BAMDataPublisherConstants.CONTENT_TYPE, ByteBuffer.wrap(
+                statisticData.getContentType().getBytes()));
+        metaData.put(BAMDataPublisherConstants.REFERER, ByteBuffer.wrap(
+                statisticData.getReferer().getBytes()));
+        metaData.put(BAMDataPublisherConstants.USER_AGENT, ByteBuffer.wrap(
+                statisticData.getUserAgent().getBytes()));
+        metaData.put(BAMDataPublisherConstants.REQUEST_URL, ByteBuffer.wrap(
+                statisticData.getRequestURL().getBytes()));
+    }
+
     private void addDataIntoEventData(Map<String, ByteBuffer> eventData, StatisticData statistic) {
 
         SystemStatistics systemStatistics = statistic.getSystemStatistics();
-/*        ServiceStatistics serviceStatistics = statistic.getServiceStatistics();
-        OperationStatistics operationStatistics = statistic.getOperationStatistics();*/
 
         eventData.put(ServiceStatisticsPublisherConstants.SERVER_NAME, ByteBuffer.wrap(
                 systemStatistics.getServerName().getBytes()));
-        eventData.put(CommonConstants.SERVICE_NAME, ByteBuffer.wrap(
+        eventData.put(BAMDataPublisherConstants.SERVICE_NAME, ByteBuffer.wrap(
                 statistic.getServiceName().getBytes()));
-        eventData.put(CommonConstants.OPERATION_NAME, ByteBuffer.wrap(
+        eventData.put(BAMDataPublisherConstants.OPERATION_NAME, ByteBuffer.wrap(
                 statistic.getOperationName().getBytes()));
 
         eventData.put(ServiceStatisticsPublisherConstants.TOTAL_SYSTEM_AVG_RESPONSE_TIME, ByteBuffer.wrap(
@@ -121,7 +136,7 @@
         eventData.put(ServiceStatisticsPublisherConstants.FAULT_COUNT, ByteBuffer.wrap(
                 Integer.toString(systemStatistics.getCurrentInvocationFaultCount()).getBytes()));
 
-        eventData.put(CommonConstants.TIMESTAMP, ByteBuffer.wrap(
+        eventData.put(BAMDataPublisherConstants.TIMESTAMP, ByteBuffer.wrap(
                 statistic.getTimestamp().toString().getBytes()));
 
     }

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java	Mon Nov  7 04:34:27 2011
@@ -17,16 +17,21 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
+import org.wso2.carbon.bam.data.publisher.util.stats.AtomicIntSingleton;
 import org.wso2.carbon.bam.service.Event;
 import org.wso2.carbon.bam.service.ReceiverService;
 import org.wso2.carbon.bam.service.SessionTimeOutException;
-import org.wso2.carbon.bam.service.data.publisher.conf.EventingConfigData;
 import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
+import org.wso2.carbon.bam.service.data.publisher.pool.TFramedTransportPool;
+import org.wso2.carbon.bam.service.data.publisher.pool.TFramedTransportPoolFactory;
 
 import java.util.ArrayList;
 
@@ -35,33 +40,138 @@
 
     private static Log log = LogFactory.getLog(DataPublisher.class);
 
-    public void process(ArrayList<Event> events,BAMServerInfo bamServerInfo) {
+    boolean isSocketTransportUsed = true;
+
+    public void process(ArrayList<Event> events, BAMServerInfo bamServerInfo) {
+        if (isSocketTransportUsed) {
+            publishUsingTSocketTransport(events, bamServerInfo);
+        } else {
+            publishUsingHttp(events, bamServerInfo);
+        }
+    }
+
+    private void publishUsingTSocketTransport(ArrayList<Event> events,
+                                              BAMServerInfo bamServerInfo) {
+        int i = 0;
+        TTransport transport = null;
+        String sessionId = ThriftUtil.getSessionId(bamServerInfo);
+        PublisherConfiguration configuration = DataPublisherUtil.getPublisherConfiguration();
+        GenericKeyedObjectPool transportPool = TFramedTransportPool.getClientPool(
+                new TFramedTransportPoolFactory(), configuration.getMaxPoolSize(),
+                configuration.getMaxIdleConnections(), true, configuration.getEvictionTimePeriod(),
+                configuration.getMinIdleTimeInPool());
+        try {
+            transport = (TTransport) transportPool.borrowObject("localhost");
+            TProtocol protocol = new TCompactProtocol(transport);
+
+            ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
+            for (Event event : events) {
+                receiverClient.publish(event, sessionId);
+                if (log.isDebugEnabled()) {
+                    AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+                }
+                i++;
+            }
+        } catch (TTransportException e) {
+            log.warn("TransportException, retrying to publish again..", e);
+            // TODO: clear the pooled connections under the correct key ("localhost" is hard-coded here)
+            transportPool.clear("localhost");
+            publishRetryUsingTSocket(events, i, bamServerInfo, transportPool);
+        } catch (TException e) {
+            log.error("Unable to publish event to BAM", e);
+        } catch (SessionTimeOutException e) {
+            publishRetryUsingTSocket(events, i, bamServerInfo, transportPool);
+            log.warn("Session Timeout, retrying .........");
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                transportPool.returnObject("localhost", transport);
+            } catch (Exception e) {
+                log.warn("Error occurred while returning object to connection pool");
+            }
+        }
+    }
+
+    private void publishRetryUsingTSocket(ArrayList<Event> events, int i,
+                                          BAMServerInfo bamServerInfo,
+                                          GenericKeyedObjectPool transportPool) {
+        ArrayList<Event> newEventList = new ArrayList<Event>();
+        TTransport transport = null;
+        for (int j = i; j < events.size(); j++) {
+            newEventList.add(events.get(j));
+        }
+
+        ThriftUtil.setSessionId(null);
+
+        for (int k = 0; k < 30; k++) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // Restore the interrupted status after catching InterruptedException
+                // instead of swallowing it
+                Thread.currentThread().interrupt();
+            }
+            TProtocol protocol = null;
+            try {
+                String sessionId = ThriftUtil.getSessionId(bamServerInfo);
+                transport = (TTransport) transportPool.borrowObject("localhost");
+                protocol = new TCompactProtocol(transport);
+                ReceiverService.Client senderClient = new ReceiverService.Client(protocol);
+                for (Event event : events) {
+                    senderClient.publish(event, sessionId);
+                    if (log.isDebugEnabled()) {
+                        AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+                    }
+                    i++;
+                }
+                return;
+            } catch (TTransportException e) {
+                log.error("Unable to publish event to BAM", e);
+            } catch (TException e) {
+                log.error("Unable to publish event to BAM", e);
+            } catch (SessionTimeOutException e) {
+                log.warn("Session Timeout, retrying .........");
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                try {
+                    transportPool.returnObject("localhost", transport);
+                } catch (Exception e) {
+                    log.warn("Error occurred while returning object to connection pool");
+                }
+            }
+        }
+    }
+
+    private void publishUsingHttp(ArrayList<Event> events, BAMServerInfo bamServerInfo) {
         THttpClient client = null;
         TProtocol protocol = null;
-        EventingConfigData eventingConfigData =null;
         String sessionId = ThriftUtil.getSessionId(bamServerInfo);
         int i = 0;
         try {
             client = new THttpClient(bamServerInfo.getBamServerURL() + "thriftReceiver");
-            protocol = new TBinaryProtocol(client);
+            protocol = new TCompactProtocol(client);
         } catch (TTransportException e) {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            e.printStackTrace();
         }
         ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
 
         try {
             client.open();
             for (Event event : events) {
-                //System.out.println("Published");
                 receiverClient.publish(event, sessionId);
+                if (log.isDebugEnabled()) {
+                    AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+                }
                 i++;
             }
         } catch (TTransportException e) {
-            e.printStackTrace();
+            log.error("Unable to publish event to BAM", e);
         } catch (TException e) {
-            e.printStackTrace();
+            log.error("Unable to publish event to BAM", e);
         } catch (SessionTimeOutException e) {
-            publishRetry(events,bamServerInfo, i);
+            publishRetryUsingHttp(events, bamServerInfo, i);
             log.warn("Session Timeout, retrying .........");
         } finally {
             client.close();
@@ -69,7 +179,8 @@
     }
 
 
-    private void publishRetry(ArrayList<Event> events, BAMServerInfo bamServerInfo, int i) {
+    private void publishRetryUsingHttp(ArrayList<Event> events, BAMServerInfo bamServerInfo,
+                                       int i) {
 
         ArrayList<Event> newEventList = new ArrayList<Event>();
         for (int j = i; j < events.size(); j++) {
@@ -91,23 +202,21 @@
             String sessionId = ThriftUtil.getSessionId(bamServerInfo);
             try {
                 client = new THttpClient(bamServerInfo.getBamServerURL() + "thriftReceiver");
-                protocol = new TBinaryProtocol(client);
-            } catch (TTransportException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            }
-            ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
+                protocol = new TCompactProtocol(client);
+                ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
 
-            try {
                 client.open();
                 for (Event event : events) {
-                    //System.out.println("Published");
                     receiverClient.publish(event, sessionId);
+                    if (log.isDebugEnabled()) {
+                        AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+                    }
                 }
                 return;
             } catch (TTransportException e) {
-                e.printStackTrace();
+                log.error("Unable to publish event to BAM", e);
             } catch (TException e) {
-                e.printStackTrace();
+                log.error("Unable to publish event to BAM", e);
             } catch (SessionTimeOutException e) {
                 log.warn("Session Timeout, retrying .........");
             } finally {
@@ -118,6 +227,6 @@
 
 
     public void destroy() {
-        //To change body of implemented methods use File | Settings | File Templates.
+
     }
 }

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java	Mon Nov  7 04:34:27 2011
@@ -16,6 +16,7 @@
 package org.wso2.carbon.bam.service.data.publisher.publish;
 
 
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
 import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
 import org.wso2.carbon.bam.service.data.publisher.queue.ActivityQueue;
 import org.wso2.carbon.bam.service.data.publisher.queue.ServiceStatisticsQueue;
@@ -25,6 +26,8 @@
     private static ServiceStatisticsQueue serviceStatsQueue;
     private static ActivityQueue activityInStatsQueue;
 
+    private static PublisherConfiguration publisherConfiguration;
+
     public static void publishServiceStats(PublishData publishData) {
         serviceStatsQueue.enqueue(publishData);
     }
@@ -40,4 +43,12 @@
     public static void setActivityInQueue(ActivityQueue activityInQueue) {
         activityInStatsQueue = activityInQueue;
     }
+
+    public static void setPublisherConfiguration(PublisherConfiguration configuration) {
+        publisherConfiguration = configuration;
+    }
+
+    public static PublisherConfiguration getPublisherConfiguration() {
+        return publisherConfiguration;
+    }
 }

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java	Mon Nov  7 04:34:27 2011
@@ -16,7 +16,7 @@
 package org.wso2.carbon.bam.service.data.publisher.publish;
 
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.THttpClient;
 import org.apache.thrift.transport.TTransport;
@@ -35,7 +35,7 @@
     public static String getSessionId(BAMServerInfo bamServerInfo) {
 
         TTransport client = getClient(bamServerInfo);
-        TProtocol protocol = new TBinaryProtocol(client);
+        TProtocol protocol = new TCompactProtocol(client);
         try {
             if (sessionId == null) {
                 synchronized (ThriftUtil.class) {

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java	Mon Nov  7 04:34:27 2011
@@ -18,6 +18,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
 import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
 import org.wso2.carbon.bam.service.data.publisher.process.ServiceStatsWorker;
 import org.wso2.carbon.bam.service.data.publisher.publish.StatsProcessor;
@@ -33,22 +34,24 @@
 
     private static final Log log = LogFactory.getLog(ServiceStatisticsQueue.class);
 
-    private BlockingQueue<Runnable> runnableQueue = new ArrayBlockingQueue<Runnable>(100);
-    private Queue<PublishData> statisticsQueue = new ArrayBlockingQueue<PublishData>(6000);
+    private BlockingQueue<Runnable> runnableQueue;
+    private Queue<PublishData> statisticsQueue;
     private ThreadPoolExecutor threadPool = null;
 
-    int poolSize = 30;
-    int maxPoolSize = 150;
-    long keepAliveTime = 10;
+    long keepAliveTime = 20;
 
     private StatsProcessor serviceStatsProcessor;
     private boolean shutdown = false;
 
-    public ServiceStatisticsQueue(StatsProcessor serviceStatisticProcessor) {
+
+    public ServiceStatisticsQueue(StatsProcessor serviceStatisticProcessor,
+                                  PublisherConfiguration configuration) {
+        runnableQueue = new ArrayBlockingQueue<Runnable>(configuration.getTaskQueueSize());
+        statisticsQueue = new ArrayBlockingQueue<PublishData>(configuration.getEventQueueSize());
         this.serviceStatsProcessor = serviceStatisticProcessor;
-        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
+        threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(),
                                             keepAliveTime, TimeUnit.SECONDS, runnableQueue);
-        threadPool.allowCoreThreadTimeOut(true);
+        threadPool.allowCoreThreadTimeOut(false);
     }
 
 
@@ -89,27 +92,4 @@
         threadPool.shutdownNow();
         serviceStatsProcessor.destroy();
     }
-
-
-/*    private class ServiceStatsWorker implements Runnable {
-
-        public void run() {
-            clearStatisticDataInQueue(statisticsQueue.size());
-        }
-
-        private void clearStatisticDataInQueue(int size) {
-            if (log.isDebugEnabled()) {
-                log.debug("Number of events in queue : " + size);
-            }
-            ArrayList<Event> eventList= new ArrayList<Event>();
-            for (int i = 0; i < size; i++) {
-                StatisticData statisticData = statisticsQueue.poll();
-                //Sometimes other thread may get the last queue object
-                if(statisticData!=null){
-                    ConstructEvent
-                }
-            }
-            serviceStatsProcessor.process(sts);
-        }
-    }*/
 }

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java	Mon Nov  7 04:34:27 2011
@@ -27,11 +27,6 @@
     public static final String ACTIVITY_DATA_MESSAGE_DIRECTION_IN = "Request";
     public static final String ACTIVITY_DATA_MESSAGE_DIRECTION_OUT = "Response";
 
-    public static final String MSG_ACTIVITY_ID = "bam_activity_id";
-    public static final String MSG_ID = "message_id";
-    public static final String MSG_BODY = "message_body";
-    public static final String MSG_DIRECTION = "message_direction";
-
     public static final String ENABLE_ACTIVITY = "EnableActivity";
 
 

Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java	(original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java	Mon Nov  7 04:34:27 2011
@@ -20,12 +20,7 @@
 
 public class CommonConstants {
 
-    public static final String TIMESTAMP = "timestamp";
     public static final String IP_ADDRESS = "ip_address";
-    public static final String SERVICE_NAME = "service_name";
-    public static final String OPERATION_NAME = "operationName";
-
-    public static final String PROPERTY_REMOTE_ADDRESS = "REMOTE_ADDR";
 
     public static final String ADMIN_SERVICE_PARAMETER = "adminService";
     public static final String HIDDEN_SERVICE_PARAMETER = "hiddenService";


More information about the Carbon-commits mailing list