[Carbon-commits] [Carbon] svn commit r115469 - in trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher: . src/main/java/org/wso2/carbon/bam/service/data/publisher/data src/main/java/org/wso2/carbon/bam/service/data/publisher/internal src/main/java/org/wso2/carbon/bam/service/data/publisher/modules src/main/java/org/wso2/carbon/bam/service/data/publisher/pool src/main/java/org/wso2/carbon/bam/service/data/publisher/process src/main/java/org/wso2/carbon/bam/service/data/publisher/publish src/main/java/org/wso2/carbon/bam/service/data/publisher/queue src/main/java/org/wso2/carbon/bam/service/data/publisher/util
kasunw at wso2.com
kasunw at wso2.com
Mon Nov 7 07:34:28 EST 2011
Author: kasunw
Date: Mon Nov 7 04:34:27 2011
New Revision: 115469
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=115469
Log:
implementing connection pool and extracting header information
Added:
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java
Modified:
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java
trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/pom.xml Mon Nov 7 04:34:27 2011
@@ -64,6 +64,11 @@
<version>3.2.1</version>
</dependency>
<dependency>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.bam.data.publisher.util</artifactId>
+ <version>${pom.version}</version>
+ </dependency>
+ <dependency>
<groupId>libthrift.wso2</groupId>
<artifactId>libthrift</artifactId>
<version>0.5.wso2v1</version>
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/ActivityData.java Mon Nov 7 04:34:27 2011
@@ -25,10 +25,65 @@
private String msgBody;
private Timestamp timestamp;
private String messageDirection;
- private String ipAddress;
private String serviceName;
private String operationName;
+ private String userAgent;
+ private String remoteAddress;
+ private String host;
+ private String contentType;
+ private String referer;
+ private String requestURL;
+
+
+ public String getUserAgent() {
+ return userAgent;
+ }
+
+ public void setUserAgent(String userAgent) {
+ this.userAgent = userAgent;
+ }
+
+ public String getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ public void setRemoteAddress(String remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getRequestURL() {
+ return requestURL;
+ }
+
+ public void setRequestURL(String requestURL) {
+ this.requestURL = requestURL;
+ }
+
+ public String getReferer() {
+ return referer;
+ }
+
+ public void setReferer(String referer) {
+ this.referer = referer;
+ }
+
public String getMessageId() {
return messageId;
}
@@ -69,14 +124,6 @@
this.messageDirection = messageDirection;
}
- public String getIpAddress() {
- return ipAddress;
- }
-
- public void setIpAddress(String ipAddress) {
- this.ipAddress = ipAddress;
- }
-
public String getOperationName() {
return operationName;
}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/data/StatisticData.java Mon Nov 7 04:34:27 2011
@@ -31,6 +31,61 @@
private String operationName;
private Timestamp timestamp;
+ private String userAgent;
+ private String remoteAddress;
+ private String host;
+ private String contentType;
+ private String referer;
+ private String requestURL;
+
+ public String getRequestURL() {
+ return requestURL;
+ }
+
+ public void setRequestURL(String requestURL) {
+ this.requestURL = requestURL;
+ }
+
+ public String getReferer() {
+ return referer;
+ }
+
+ public void setReferer(String referer) {
+ this.referer = referer;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ public void setRemoteAddress(String remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ }
+
+ public String getUserAgent() {
+ return userAgent;
+ }
+
+ public void setUserAgent(String userAgent) {
+ this.userAgent = userAgent;
+ }
+
public Timestamp getTimestamp() {
return timestamp;
}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/internal/StatisticsServiceComponent.java Mon Nov 7 04:34:27 2011
@@ -21,6 +21,8 @@
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
+import org.wso2.carbon.bam.data.publisher.util.PublisherUtil;
import org.wso2.carbon.bam.service.data.publisher.conf.RegistryPersistenceManager;
import org.wso2.carbon.bam.service.data.publisher.publish.DataPublisher;
import org.wso2.carbon.bam.service.data.publisher.publish.DataPublisherUtil;
@@ -66,9 +68,12 @@
bundleContext.registerService(Axis2ConfigurationContextObserver.class.getName(),
new ServiceStatisticsAxis2ConfigurationContextObserver(), null);
+ PublisherConfiguration configuration = PublisherUtil.readConfigurationFromPublisherConfig();
+ DataPublisherUtil.setPublisherConfiguration(configuration);
+
//use event publisher as the serviceStatsProcessor
StatsProcessor statsProcessor = new DataPublisher();
- serviceStatisticsQueue = new ServiceStatisticsQueue(statsProcessor);
+ serviceStatisticsQueue = new ServiceStatisticsQueue(statsProcessor, configuration);
DataPublisherUtil.setServiceStatisticQueue(serviceStatisticsQueue);
activityQueue = new ActivityQueue(statsProcessor);
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityInHandler.java Mon Nov 7 04:34:27 2011
@@ -162,8 +162,6 @@
}
activityData.setMsgBody(msgBody);
activityData.setMessageDirection(ActivityPublisherConstants.ACTIVITY_DATA_MESSAGE_DIRECTION_IN);
- activityData.setIpAddress(
- messageContext.getProperty(CommonConstants.PROPERTY_REMOTE_ADDRESS).toString());
activityData.setServiceName(messageContext.getAxisService().getName());
activityData.setOperationName(messageContext.getAxisOperation().getName().getLocalPart());
activityData.setMessageId(messageContext.getMessageID());
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/ActivityOutHandler.java Mon Nov 7 04:34:27 2011
@@ -32,8 +32,10 @@
import org.apache.axis2.description.WSDL2Constants;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
import org.wso2.carbon.bam.service.data.publisher.conf.EventingConfigData;
import org.wso2.carbon.bam.service.data.publisher.data.ActivityData;
import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
@@ -44,6 +46,7 @@
import org.wso2.carbon.bam.service.data.publisher.util.TenantEventConfigData;
import org.wso2.carbon.core.multitenancy.SuperTenantCarbonContext;
+import javax.servlet.http.HttpServletRequest;
import javax.xml.namespace.QName;
import java.sql.Timestamp;
import java.util.Date;
@@ -89,8 +92,17 @@
activityID = element.getAttributeValue(new QName(ActivityPublisherConstants.ACTIVITY_ID));
}
+
ActivityData activityData = addDetailsOfTheMessage(timestamp, activityID, messageContext);
BAMServerInfo bamServerInfo = addBAMServerInfo(eventingConfigData);
+
+ MessageContext currentMessageContext = MessageContext.getCurrentMessageContext();
+ if (currentMessageContext != null) {
+ Object requestProperty = currentMessageContext.getProperty(
+ HTTPConstants.MC_HTTP_SERVLETREQUEST);
+ extractInfoFromHttpHeaders(activityData, requestProperty);
+ }
+
PublishData publishData = new PublishData();
publishData.setActivityData(activityData);
publishData.setBamServerInfo(bamServerInfo);
@@ -107,6 +119,23 @@
return InvocationResponse.CONTINUE;
}
+ private void extractInfoFromHttpHeaders(ActivityData activityData, Object requestProperty) {
+
+ if (requestProperty instanceof HttpServletRequest) {
+ HttpServletRequest httpServletRequest = (HttpServletRequest) requestProperty;
+ activityData.setRequestURL(httpServletRequest.getRequestURL().toString());
+ activityData.setRemoteAddress(httpServletRequest.getRemoteAddr());
+ activityData.setContentType(httpServletRequest.getContentType());
+ activityData.setUserAgent(httpServletRequest.getHeader(
+ BAMDataPublisherConstants.HTTP_HEADER_USER_AGENT));
+ activityData.setHost(httpServletRequest.getHeader(
+ BAMDataPublisherConstants.HTTP_HEADER_HOST));
+ activityData.setReferer(httpServletRequest.getHeader(
+ BAMDataPublisherConstants.HTTP_HEADER_REFERER));
+ }
+
+ }
+
private BAMServerInfo addBAMServerInfo(EventingConfigData eventingConfigData) {
BAMServerInfo bamServerInfo = new BAMServerInfo();
bamServerInfo.setBamServerURL(eventingConfigData.getUrl());
@@ -174,10 +203,6 @@
activityData.setActivityId(activityID);
activityData.setOperationName(messageContext.getAxisService().getName());
activityData.setServiceName(messageContext.getAxisOperation().getName().getLocalPart());
- if (messageContext.getProperty(CommonConstants.PROPERTY_REMOTE_ADDRESS) != null) {
- activityData.setIpAddress(messageContext.getProperty(
- CommonConstants.PROPERTY_REMOTE_ADDRESS).toString());
- }
activityData.setMessageId(messageContext.getMessageID());
activityData.setMessageDirection(ActivityPublisherConstants.ACTIVITY_DATA_MESSAGE_DIRECTION_OUT);
activityData.setMsgBody(messageContext.getEnvelope().getBody().toString());
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/modules/StatisticsHandler.java Mon Nov 7 04:34:27 2011
@@ -21,8 +21,10 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.Handler;
import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
import org.wso2.carbon.bam.service.data.publisher.conf.EventingConfigData;
import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
@@ -36,6 +38,8 @@
import org.wso2.carbon.statistics.services.util.ServiceStatistics;
import org.wso2.carbon.statistics.services.util.SystemStatistics;
+import javax.mail.Message;
+import javax.servlet.http.HttpServletRequest;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Map;
@@ -44,7 +48,6 @@
private static Log log = LogFactory.getLog(StatisticsHandler.class);
-
@Override
public Handler.InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
@@ -80,6 +83,13 @@
return Handler.InvocationResponse.CONTINUE;
}
+ MessageContext currentMessageContext = MessageContext.getCurrentMessageContext();
+ if (currentMessageContext != null) {
+ Object requestProperty = currentMessageContext.getProperty(
+ HTTPConstants.MC_HTTP_SERVLETREQUEST);
+ extractInfoFromHttpHeaders(statisticData, requestProperty);
+ }
+
serviceStatistics = systemStatisticsUtil.getServiceStatistics(msgContext.getAxisService());
statisticData.setSystemStatistics(systemStatistics);
@@ -105,6 +115,23 @@
return Handler.InvocationResponse.CONTINUE;
}
+ private void extractInfoFromHttpHeaders(StatisticData statisticData, Object requestProperty) {
+
+ if (requestProperty instanceof HttpServletRequest) {
+ HttpServletRequest httpServletRequest = (HttpServletRequest) requestProperty;
+ statisticData.setRequestURL(httpServletRequest.getRequestURL().toString());
+ statisticData.setRemoteAddress(httpServletRequest.getRemoteAddr());
+ statisticData.setContentType(httpServletRequest.getContentType());
+ statisticData.setUserAgent(httpServletRequest.getHeader(
+ BAMDataPublisherConstants.HTTP_HEADER_USER_AGENT));
+ statisticData.setHost(httpServletRequest.getHeader(
+ BAMDataPublisherConstants.HTTP_HEADER_HOST));
+ statisticData.setReferer(httpServletRequest.getHeader(
+ BAMDataPublisherConstants.HTTP_HEADER_REFERER));
+ }
+
+ }
+
private BAMServerInfo addBAMServerInfo(EventingConfigData eventingConfigData) {
BAMServerInfo bamServerInfo = new BAMServerInfo();
bamServerInfo.setBamServerURL(eventingConfigData.getUrl());
Added: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java?pathrev=115469
==============================================================================
--- (empty file)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPool.java Mon Nov 7 04:34:27 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.wso2.carbon.bam.service.data.publisher.pool;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+
+public class TFramedTransportPool {
+
+ private static volatile GenericKeyedObjectPool socketPool = null;
+
+ public static GenericKeyedObjectPool getClientPool(KeyedPoolableObjectFactory factory,
+ int maxActive,
+ int maxIdle,
+ boolean testOnBorrow,
+ long timeBetweenEvictionRunsMillis,
+ long minEvictableIdleTimeMillis) {
+ if (socketPool == null) {
+ synchronized (TFramedTransportPool.class) {
+ if (socketPool == null) {
+ socketPool = new GenericKeyedObjectPool();
+ socketPool.setFactory(factory);
+ socketPool.setMaxActive(maxActive);
+ socketPool.setTestOnBorrow(testOnBorrow);
+ socketPool.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+ socketPool.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+ socketPool.setMaxIdle(maxIdle);
+ socketPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
+ }
+ }
+ }
+ return socketPool;
+ }
+
+}
Added: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java?pathrev=115469
==============================================================================
--- (empty file)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/pool/TFramedTransportPoolFactory.java Mon Nov 7 04:34:27 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.wso2.carbon.bam.service.data.publisher.pool;
+
+import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+public class TFramedTransportPoolFactory extends BaseKeyedPoolableObjectFactory {
+
+ @Override
+ public TTransport makeObject(Object key) throws Exception {
+ TTransport receiverTransport = new TFramedTransport(new TSocket(key.toString(), 7611));
+ receiverTransport.open();
+ return receiverTransport;
+ }
+
+ @Override
+ public boolean validateObject(Object key, Object obj) {
+ TTransport receiverTransport= (TTransport)obj;
+ boolean isOpen = receiverTransport.isOpen();
+ return isOpen;
+ }
+
+
+}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ActivityWorker.java Mon Nov 7 04:34:27 2011
@@ -17,13 +17,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
+import org.wso2.carbon.bam.service.Event;
import org.wso2.carbon.bam.service.data.publisher.data.ActivityData;
import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
import org.wso2.carbon.bam.service.data.publisher.publish.StatsProcessor;
-import org.wso2.carbon.bam.service.data.publisher.util.ActivityPublisherConstants;
-import org.wso2.carbon.bam.service.data.publisher.util.CommonConstants;
-import org.wso2.carbon.bam.service.Event;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -91,34 +90,60 @@
private void addCorrelationData(Map<String, ByteBuffer> correlationData,
ActivityData activityData) {
- correlationData.put(ActivityPublisherConstants.MSG_ACTIVITY_ID, ByteBuffer.wrap(
+ correlationData.put(BAMDataPublisherConstants.MSG_ACTIVITY_ID, ByteBuffer.wrap(
activityData.getActivityId().getBytes()));
- correlationData.put(ActivityPublisherConstants.MSG_ID, ByteBuffer.wrap(
+ correlationData.put(BAMDataPublisherConstants.MSG_ID, ByteBuffer.wrap(
activityData.getMessageId().getBytes()));
}
private void addMetaData(Map<String, ByteBuffer> metaData, ActivityData activityData) {
- String ipAddress = activityData.getIpAddress();
- if (ipAddress != null) {
- metaData.put(CommonConstants.IP_ADDRESS, ByteBuffer.wrap(ipAddress.getBytes()));
+
+ String remoteAddress = activityData.getRemoteAddress();
+ String host = activityData.getHost();
+ String contentType = activityData.getContentType();
+ String referer = activityData.getReferer();
+ String userAgent = activityData.getUserAgent();
+ String requestURL = activityData.getRequestURL();
+
+ if (remoteAddress != null) {
+ metaData.put(BAMDataPublisherConstants.REMOTE_ADDRESS, ByteBuffer.wrap(
+ remoteAddress.getBytes()));
+ }
+ if (host != null) {
+ metaData.put(BAMDataPublisherConstants.HOST, ByteBuffer.wrap(
+ host.getBytes()));
+ }
+ if (contentType != null) {
+ metaData.put(BAMDataPublisherConstants.CONTENT_TYPE, ByteBuffer.wrap(
+ contentType.getBytes()));
+ }
+ if (referer != null) {
+ metaData.put(BAMDataPublisherConstants.REFERER, ByteBuffer.wrap(
+ referer.getBytes()));
+ }
+ if (userAgent != null) {
+ metaData.put(BAMDataPublisherConstants.USER_AGENT, ByteBuffer.wrap(
+ userAgent.getBytes()));
+ }
+ if (requestURL != null) {
+ metaData.put(BAMDataPublisherConstants.REQUEST_URL, ByteBuffer.wrap(
+ requestURL.getBytes()));
}
}
private void addEventData(Map<String, ByteBuffer> eventData,
ActivityData activityData) {
- eventData.put(ActivityPublisherConstants.MSG_ACTIVITY_ID, ByteBuffer.wrap(
- activityData.getActivityId().getBytes()));
- eventData.put(CommonConstants.TIMESTAMP, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.TIMESTAMP, ByteBuffer.wrap(
activityData.getTimestamp().toString().getBytes()));
if (activityData.getMsgBody() != null) {
- eventData.put(ActivityPublisherConstants.MSG_BODY, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.MSG_BODY, ByteBuffer.wrap(
activityData.getMsgBody().getBytes()));
}
- eventData.put(ActivityPublisherConstants.MSG_DIRECTION, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.MSG_DIRECTION, ByteBuffer.wrap(
activityData.getMessageDirection().getBytes()));
- eventData.put(CommonConstants.SERVICE_NAME, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.SERVICE_NAME, ByteBuffer.wrap(
activityData.getServiceName().getBytes()));
- eventData.put(CommonConstants.OPERATION_NAME, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.OPERATION_NAME, ByteBuffer.wrap(
activityData.getOperationName().getBytes()));
}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/process/ServiceStatsWorker.java Mon Nov 7 04:34:27 2011
@@ -17,11 +17,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.BAMDataPublisherConstants;
import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
import org.wso2.carbon.bam.service.data.publisher.data.StatisticData;
import org.wso2.carbon.bam.service.data.publisher.publish.StatsProcessor;
-import org.wso2.carbon.bam.service.data.publisher.util.CommonConstants;
import org.wso2.carbon.bam.service.data.publisher.util.ServiceStatisticsPublisherConstants;
import org.wso2.carbon.bam.service.Event;
import org.wso2.carbon.statistics.services.util.SystemStatistics;
@@ -74,31 +74,46 @@
StatisticData statisticData = publishData.getStatisticData();
- Map<String, ByteBuffer> correlation = new HashMap<String, ByteBuffer>();
- Map<String, ByteBuffer> meta = new HashMap<String, ByteBuffer>();
+ Map<String, ByteBuffer> correlationData = new HashMap<String, ByteBuffer>();
+ Map<String, ByteBuffer> metaData = new HashMap<String, ByteBuffer>();
Map<String, ByteBuffer> eventData = new HashMap<String, ByteBuffer>();
addDataIntoEventData(eventData, statisticData);
+ addDataIntoMetaData(metaData, statisticData);
Event event = new Event();
- event.setCorrelation(correlation);
- event.setMeta(meta);
+ event.setCorrelation(correlationData);
+ event.setMeta(metaData);
event.setEvent(eventData);
return event;
}
+ private void addDataIntoMetaData(Map<String, ByteBuffer> metaData,
+ StatisticData statisticData) {
+ metaData.put(BAMDataPublisherConstants.REMOTE_ADDRESS, ByteBuffer.wrap(
+ statisticData.getRemoteAddress().getBytes()));
+ metaData.put(BAMDataPublisherConstants.HOST, ByteBuffer.wrap(
+ statisticData.getHost().getBytes()));
+ metaData.put(BAMDataPublisherConstants.CONTENT_TYPE, ByteBuffer.wrap(
+ statisticData.getContentType().getBytes()));
+ metaData.put(BAMDataPublisherConstants.REFERER, ByteBuffer.wrap(
+ statisticData.getReferer().getBytes()));
+ metaData.put(BAMDataPublisherConstants.USER_AGENT, ByteBuffer.wrap(
+ statisticData.getUserAgent().getBytes()));
+ metaData.put(BAMDataPublisherConstants.REQUEST_URL, ByteBuffer.wrap(
+ statisticData.getRequestURL().getBytes()));
+ }
+
private void addDataIntoEventData(Map<String, ByteBuffer> eventData, StatisticData statistic) {
SystemStatistics systemStatistics = statistic.getSystemStatistics();
-/* ServiceStatistics serviceStatistics = statistic.getServiceStatistics();
- OperationStatistics operationStatistics = statistic.getOperationStatistics();*/
eventData.put(ServiceStatisticsPublisherConstants.SERVER_NAME, ByteBuffer.wrap(
systemStatistics.getServerName().getBytes()));
- eventData.put(CommonConstants.SERVICE_NAME, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.SERVICE_NAME, ByteBuffer.wrap(
statistic.getServiceName().getBytes()));
- eventData.put(CommonConstants.OPERATION_NAME, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.OPERATION_NAME, ByteBuffer.wrap(
statistic.getOperationName().getBytes()));
eventData.put(ServiceStatisticsPublisherConstants.TOTAL_SYSTEM_AVG_RESPONSE_TIME, ByteBuffer.wrap(
@@ -121,7 +136,7 @@
eventData.put(ServiceStatisticsPublisherConstants.FAULT_COUNT, ByteBuffer.wrap(
Integer.toString(systemStatistics.getCurrentInvocationFaultCount()).getBytes()));
- eventData.put(CommonConstants.TIMESTAMP, ByteBuffer.wrap(
+ eventData.put(BAMDataPublisherConstants.TIMESTAMP, ByteBuffer.wrap(
statistic.getTimestamp().toString().getBytes()));
}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisher.java Mon Nov 7 04:34:27 2011
@@ -17,16 +17,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
+import org.wso2.carbon.bam.data.publisher.util.stats.AtomicIntSingleton;
import org.wso2.carbon.bam.service.Event;
import org.wso2.carbon.bam.service.ReceiverService;
import org.wso2.carbon.bam.service.SessionTimeOutException;
-import org.wso2.carbon.bam.service.data.publisher.conf.EventingConfigData;
import org.wso2.carbon.bam.service.data.publisher.data.BAMServerInfo;
+import org.wso2.carbon.bam.service.data.publisher.pool.TFramedTransportPool;
+import org.wso2.carbon.bam.service.data.publisher.pool.TFramedTransportPoolFactory;
import java.util.ArrayList;
@@ -35,33 +40,138 @@
private static Log log = LogFactory.getLog(DataPublisher.class);
- public void process(ArrayList<Event> events,BAMServerInfo bamServerInfo) {
+ boolean isSocketTransportUsed = true;
+
+ public void process(ArrayList<Event> events, BAMServerInfo bamServerInfo) {
+ if (isSocketTransportUsed) {
+ publishUsingTSocketTransport(events, bamServerInfo);
+ } else {
+ publishUsingHttp(events, bamServerInfo);
+ }
+ }
+
+ private void publishUsingTSocketTransport(ArrayList<Event> events,
+ BAMServerInfo bamServerInfo) {
+ int i = 0;
+ TTransport transport = null;
+ String sessionId = ThriftUtil.getSessionId(bamServerInfo);
+ PublisherConfiguration configuration = DataPublisherUtil.getPublisherConfiguration();
+ GenericKeyedObjectPool transportPool = TFramedTransportPool.getClientPool(
+ new TFramedTransportPoolFactory(), configuration.getMaxPoolSize(),
+ configuration.getMaxIdleConnections(), true, configuration.getEvictionTimePeriod(),
+ configuration.getMinIdleTimeInPool());
+ try {
+ transport = (TTransport) transportPool.borrowObject("localhost");
+ TProtocol protocol = new TCompactProtocol(transport);
+
+ ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
+ for (Event event : events) {
+ receiverClient.publish(event, sessionId);
+ if (log.isDebugEnabled()) {
+ AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+ }
+ i++;
+ }
+ } catch (TTransportException e) {
+ log.warn("TransportException, retrying to publish again..", e);
+ //Need to clear connection with correct key
+ transportPool.clear("localhost");
+ publishRetryUsingTSocket(events, i, bamServerInfo, transportPool);
+ } catch (TException e) {
+ log.error("Unable to publish event to BAM", e);
+ } catch (SessionTimeOutException e) {
+ publishRetryUsingTSocket(events, i, bamServerInfo, transportPool);
+ log.warn("Session Timeout, retrying .........");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ transportPool.returnObject("localhost", transport);
+ } catch (Exception e) {
+ log.warn("Error occurred while returning object to connection pool");
+ }
+ }
+ }
+
+ private void publishRetryUsingTSocket(ArrayList<Event> events, int i,
+ BAMServerInfo bamServerInfo,
+ GenericKeyedObjectPool transportPool) {
+ ArrayList<Event> newEventList = new ArrayList<Event>();
+ TTransport transport = null;
+ for (int j = i; j < events.size(); j++) {
+ newEventList.add(events.get(j));
+ }
+
+ ThriftUtil.setSessionId(null);
+
+ for (int k = 0; k < 30; k++) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Restoring the interrupted status after catching InterruptedException
+ // instead of Swallowing
+ Thread.currentThread().interrupt();
+ }
+ TProtocol protocol = null;
+ try {
+ String sessionId = ThriftUtil.getSessionId(bamServerInfo);
+ transport = (TTransport) transportPool.borrowObject("localhost");
+ protocol = new TCompactProtocol(transport);
+ ReceiverService.Client senderClient = new ReceiverService.Client(protocol);
+ for (Event event : events) {
+ senderClient.publish(event, sessionId);
+ if (log.isDebugEnabled()) {
+ AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+ }
+ i++;
+ }
+ return;
+ } catch (TTransportException e) {
+ log.error("Unable to publish event to BAM", e);
+ } catch (TException e) {
+ log.error("Unable to publish event to BAM", e);
+ } catch (SessionTimeOutException e) {
+ log.warn("Session Timeout, retrying .........");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ transportPool.returnObject("localhost", transport);
+ } catch (Exception e) {
+ log.warn("Error occurred while returning object to connection pool");
+ }
+ }
+ }
+ }
+
+ private void publishUsingHttp(ArrayList<Event> events, BAMServerInfo bamServerInfo) {
THttpClient client = null;
TProtocol protocol = null;
- EventingConfigData eventingConfigData =null;
String sessionId = ThriftUtil.getSessionId(bamServerInfo);
int i = 0;
try {
client = new THttpClient(bamServerInfo.getBamServerURL() + "thriftReceiver");
- protocol = new TBinaryProtocol(client);
+ protocol = new TCompactProtocol(client);
} catch (TTransportException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
try {
client.open();
for (Event event : events) {
- //System.out.println("Published");
receiverClient.publish(event, sessionId);
+ if (log.isDebugEnabled()) {
+ AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+ }
i++;
}
} catch (TTransportException e) {
- e.printStackTrace();
+ log.error("Unable to publish event to BAM", e);
} catch (TException e) {
- e.printStackTrace();
+ log.error("Unable to publish event to BAM", e);
} catch (SessionTimeOutException e) {
- publishRetry(events,bamServerInfo, i);
+ publishRetryUsingHttp(events, bamServerInfo, i);
log.warn("Session Timeout, retrying .........");
} finally {
client.close();
@@ -69,7 +179,8 @@
}
- private void publishRetry(ArrayList<Event> events, BAMServerInfo bamServerInfo, int i) {
+ private void publishRetryUsingHttp(ArrayList<Event> events, BAMServerInfo bamServerInfo,
+ int i) {
ArrayList<Event> newEventList = new ArrayList<Event>();
for (int j = i; j < events.size(); j++) {
@@ -91,23 +202,21 @@
String sessionId = ThriftUtil.getSessionId(bamServerInfo);
try {
client = new THttpClient(bamServerInfo.getBamServerURL() + "thriftReceiver");
- protocol = new TBinaryProtocol(client);
- } catch (TTransportException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
+ protocol = new TCompactProtocol(client);
+ ReceiverService.Client receiverClient = new ReceiverService.Client(protocol);
- try {
client.open();
for (Event event : events) {
- //System.out.println("Published");
receiverClient.publish(event, sessionId);
+ if (log.isDebugEnabled()) {
+ AtomicIntSingleton.getAtomicInteger().incrementAndGet();
+ }
}
return;
} catch (TTransportException e) {
- e.printStackTrace();
+ log.error("Unable to publish event to BAM", e);
} catch (TException e) {
- e.printStackTrace();
+ log.error("Unable to publish event to BAM", e);
} catch (SessionTimeOutException e) {
log.warn("Session Timeout, retrying .........");
} finally {
@@ -118,6 +227,6 @@
public void destroy() {
- //To change body of implemented methods use File | Settings | File Templates.
+
}
}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/DataPublisherUtil.java Mon Nov 7 04:34:27 2011
@@ -16,6 +16,7 @@
package org.wso2.carbon.bam.service.data.publisher.publish;
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
import org.wso2.carbon.bam.service.data.publisher.queue.ActivityQueue;
import org.wso2.carbon.bam.service.data.publisher.queue.ServiceStatisticsQueue;
@@ -25,6 +26,8 @@
private static ServiceStatisticsQueue serviceStatsQueue;
private static ActivityQueue activityInStatsQueue;
+ private static PublisherConfiguration publisherConfiguration;
+
public static void publishServiceStats(PublishData publishData) {
serviceStatsQueue.enqueue(publishData);
}
@@ -40,4 +43,12 @@
public static void setActivityInQueue(ActivityQueue activityInQueue) {
activityInStatsQueue = activityInQueue;
}
+
+ public static void setPublisherConfiguration(PublisherConfiguration configuration) {
+ publisherConfiguration = configuration;
+ }
+
+ public static PublisherConfiguration getPublisherConfiguration() {
+ return publisherConfiguration;
+ }
}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/publish/ThriftUtil.java Mon Nov 7 04:34:27 2011
@@ -16,7 +16,7 @@
package org.wso2.carbon.bam.service.data.publisher.publish;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TTransport;
@@ -35,7 +35,7 @@
public static String getSessionId(BAMServerInfo bamServerInfo) {
TTransport client = getClient(bamServerInfo);
- TProtocol protocol = new TBinaryProtocol(client);
+ TProtocol protocol = new TCompactProtocol(client);
try {
if (sessionId == null) {
synchronized (ThriftUtil.class) {
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/queue/ServiceStatisticsQueue.java Mon Nov 7 04:34:27 2011
@@ -18,6 +18,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
import org.wso2.carbon.bam.service.data.publisher.data.PublishData;
import org.wso2.carbon.bam.service.data.publisher.process.ServiceStatsWorker;
import org.wso2.carbon.bam.service.data.publisher.publish.StatsProcessor;
@@ -33,22 +34,24 @@
private static final Log log = LogFactory.getLog(ServiceStatisticsQueue.class);
- private BlockingQueue<Runnable> runnableQueue = new ArrayBlockingQueue<Runnable>(100);
- private Queue<PublishData> statisticsQueue = new ArrayBlockingQueue<PublishData>(6000);
+ private BlockingQueue<Runnable> runnableQueue;
+ private Queue<PublishData> statisticsQueue;
private ThreadPoolExecutor threadPool = null;
- int poolSize = 30;
- int maxPoolSize = 150;
- long keepAliveTime = 10;
+ long keepAliveTime = 20;
private StatsProcessor serviceStatsProcessor;
private boolean shutdown = false;
- public ServiceStatisticsQueue(StatsProcessor serviceStatisticProcessor) {
+
+ public ServiceStatisticsQueue(StatsProcessor serviceStatisticProcessor,
+ PublisherConfiguration configuration) {
+ runnableQueue = new ArrayBlockingQueue<Runnable>(configuration.getTaskQueueSize());
+ statisticsQueue = new ArrayBlockingQueue<PublishData>(configuration.getEventQueueSize());
this.serviceStatsProcessor = serviceStatisticProcessor;
- threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
+ threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(),
keepAliveTime, TimeUnit.SECONDS, runnableQueue);
- threadPool.allowCoreThreadTimeOut(true);
+ threadPool.allowCoreThreadTimeOut(false);
}
@@ -89,27 +92,4 @@
threadPool.shutdownNow();
serviceStatsProcessor.destroy();
}
-
-
-/* private class ServiceStatsWorker implements Runnable {
-
- public void run() {
- clearStatisticDataInQueue(statisticsQueue.size());
- }
-
- private void clearStatisticDataInQueue(int size) {
- if (log.isDebugEnabled()) {
- log.debug("Number of events in queue : " + size);
- }
- ArrayList<Event> eventList= new ArrayList<Event>();
- for (int i = 0; i < size; i++) {
- StatisticData statisticData = statisticsQueue.poll();
- //Sometimes other thread may get the last queue object
- if(statisticData!=null){
- ConstructEvent
- }
- }
- serviceStatsProcessor.process(sts);
- }
- }*/
}
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/ActivityPublisherConstants.java Mon Nov 7 04:34:27 2011
@@ -27,11 +27,6 @@
public static final String ACTIVITY_DATA_MESSAGE_DIRECTION_IN = "Request";
public static final String ACTIVITY_DATA_MESSAGE_DIRECTION_OUT = "Response";
- public static final String MSG_ACTIVITY_ID = "bam_activity_id";
- public static final String MSG_ID = "message_id";
- public static final String MSG_BODY = "message_body";
- public static final String MSG_DIRECTION = "message_direction";
-
public static final String ENABLE_ACTIVITY = "EnableActivity";
Modified: trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java
URL: http://wso2.org/svn/browse/wso2/trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java?rev=115469&r1=115468&r2=115469&view=diff
==============================================================================
--- trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java (original)
+++ trunk/carbon/components/bam-data-publishers/org.wso2.carbon.bam.service.data.publisher/src/main/java/org/wso2/carbon/bam/service/data/publisher/util/CommonConstants.java Mon Nov 7 04:34:27 2011
@@ -20,12 +20,7 @@
public class CommonConstants {
- public static final String TIMESTAMP = "timestamp";
public static final String IP_ADDRESS = "ip_address";
- public static final String SERVICE_NAME = "service_name";
- public static final String OPERATION_NAME = "operationName";
-
- public static final String PROPERTY_REMOTE_ADDRESS = "REMOTE_ADDR";
public static final String ADMIN_SERVICE_PARAMETER = "adminService";
public static final String HIDDEN_SERVICE_PARAMETER = "hiddenService";
More information about the Carbon-commits
mailing list