Skip to content

Commit

Permalink
Merge pull request wso2#2114 from malakaganga/add_separate_worker_poo…
Browse files Browse the repository at this point in the history
…l_183

Fix Passthrough Threads getting stuck due to request message discard
  • Loading branch information
malakaganga authored Jun 1, 2023
2 parents 54d212a + 994a198 commit cc0adfd
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,6 @@ public void run() {
getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis());
}
try {
// If an error has happened in the request processing, consumes the data in pipe completely and discard it
// If the consumeAndDiscard property is set to true
if (response.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) {
RelayUtils.discardRequestMessage(requestMessageContext);
}
if (expectEntityBody) {
String cType = response.getHeader(HTTP.CONTENT_TYPE);
if(cType == null){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.synapse.transport.passthru;

import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.util.RelayUtils;

public class MessageDiscardWorker implements Runnable {

private Log log = LogFactory.getLog(MessageDiscardWorker.class);
private ClientWorker clientWorker = null;

TargetConfiguration targetConfiguration = null;

private TargetResponse response = null;

private MessageContext requestMessageContext;

NHttpClientConnection conn = null;

public MessageDiscardWorker(MessageContext requestMsgContext, TargetResponse response,
TargetConfiguration targetConfiguration, ClientWorker clientWorker, NHttpClientConnection conn) {
this.response = response;
this.requestMessageContext = requestMsgContext;
this.targetConfiguration = targetConfiguration;
this.clientWorker = clientWorker;
this.conn = conn;
}

public void run() {

// If an error has happened in the request processing, consumes the data in pipe completely and discard it
try {
RelayUtils.discardRequestMessage(requestMessageContext);
} catch (AxisFault af) {
log.error("Fault discarding request message", af);
}

targetConfiguration.getWorkerPool().execute(clientWorker);

targetConfiguration.getMetrics().incrementMessagesReceived();

NHttpServerConnection sourceConn = (NHttpServerConnection) requestMessageContext.getProperty(
PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION);
if (sourceConn != null) {
sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME);

sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME)
);

conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME);

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,18 @@ public void responseReceived(NHttpClientConnection conn) {
if (statusCode == HttpStatus.SC_ACCEPTED && handle202(requestMsgContext)) {
return;
}
if (targetResponse.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) {
ClientWorker clientWorker = new ClientWorker(targetConfiguration, requestMsgContext, targetResponse,
allowedResponseProperties);
targetConfiguration.getSecondaryWorkerPool().execute(new MessageDiscardWorker(requestMsgContext,
targetResponse, targetConfiguration, clientWorker, conn));
return;
}

WorkerPool workerPool = targetConfiguration.getWorkerPool();
workerPool.execute(
new ClientWorker(targetConfiguration, requestMsgContext, targetResponse));
if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) {
if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) {
conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_SIDE_QUEUED_TIME,
System.currentTimeMillis());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public abstract class BaseConfiguration {
/** The thread pool for executing the messages passing through */
private WorkerPool workerPool = null;

/** The secondary thread pool for executing the messages passing through */
private WorkerPool secondaryWorkerPool = null;

/** The Axis2 ConfigurationContext */
protected ConfigurationContext configurationContext = null;

Expand All @@ -75,6 +78,10 @@ public abstract class BaseConfiguration {
private static final String PASSTHROUGH_THREAD_GROUP = "Pass-through Message Processing Thread Group";
private static final String PASSTHROUGH_THREAD_ID ="PassThroughMessageProcessor";

private static final String SECONDARY_PASSTHROUGH_THREAD_GROUP = "Secondary Pass-through Message Processing "
+ "Thread Group";
private static final String SECONDARY_PASSTHROUGH_THREAD_ID = "PassThroughMessageSecondaryProcessor";

private Integer socketTimeout = null;
private Integer connectionTimeout = null;

Expand All @@ -101,6 +108,16 @@ public void build() throws AxisFault {
PASSTHROUGH_THREAD_ID);
}

if (secondaryWorkerPool == null) {
secondaryWorkerPool = WorkerPoolFactory.getWorkerPool(
conf.getSecondaryWorkerPoolCoreSize(),
conf.getSecondaryWorkerPoolMaxSize(),
conf.getSecondaryWorkerThreadKeepaliveSec(),
conf.getSecondaryWorkerPoolQueueLen(),
SECONDARY_PASSTHROUGH_THREAD_GROUP,
SECONDARY_PASSTHROUGH_THREAD_ID);
}

httpParams = buildHttpParams();
ioReactorConfig = buildIOReactorConfig();
String sysCorrelationStatus = System.getProperty(PassThroughConstants.CORRELATION_LOGS_SYS_PROPERTY);
Expand Down Expand Up @@ -147,6 +164,9 @@ public int getIOBufferSize() {
public WorkerPool getWorkerPool() {
return workerPool;
}
public WorkerPool getSecondaryWorkerPool() {
return secondaryWorkerPool;
}

public ConfigurationContext getConfigurationContext() {
return configurationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ public interface PassThroughConfigPNames {
*/
public String IO_THREADS_PER_REACTOR = "io_threads_per_reactor";

/**
* Defines the core size (number of threads) of the secondary worker thread pool.
*/
public String SECONDARY_WORKER_POOL_SIZE_CORE = "secondary_worker_pool_size_core";

/**
* Defines the maximum size (number of threads) of the secondary worker thread pool.
*/
public String SECONDARY_WORKER_POOL_SIZE_MAX = "secondary_worker_pool_size_max";

/**
* Defines the keep-alive time for extra threads in the secondary worker pool.
*/
public String SECONDARY_WORKER_THREAD_KEEPALIVE_SEC = "secondary_worker_thread_keepalive_sec";

/**
* Defines the length of the queue that is used to hold Runnable tasks to be executed by the
* secondary worker pool.
*/
public String SECONDARY_WORKER_POOL_QUEUE_LENGTH = "secondary_worker_pool_queue_length";
/**
* Defines the IO buffer size
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,40 @@ public int getWorkerPoolCoreSize() {
return getIntProperty(PassThroughConfigPNames.WORKER_POOL_SIZE_CORE,
DEFAULT_WORKER_POOL_SIZE_CORE);
}
public int getSecondaryWorkerPoolCoreSize() {
return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_POOL_SIZE_CORE,
DEFAULT_WORKER_POOL_SIZE_CORE, props);
}

public int getWorkerPoolMaxSize() {
return getIntProperty(PassThroughConfigPNames.WORKER_POOL_SIZE_MAX,
DEFAULT_WORKER_POOL_SIZE_MAX);
}
public int getSecondaryWorkerPoolMaxSize() {
return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_POOL_SIZE_MAX,
DEFAULT_WORKER_POOL_SIZE_MAX, props);
}

public int getWorkerThreadKeepaliveSec() {
return getIntProperty(PassThroughConfigPNames.WORKER_THREAD_KEEP_ALIVE_SEC,
DEFAULT_WORKER_THREAD_KEEPALIVE_SEC);
}

public int getSecondaryWorkerThreadKeepaliveSec() {
return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_THREAD_KEEPALIVE_SEC,
DEFAULT_WORKER_THREAD_KEEPALIVE_SEC, props);
}

public int getWorkerPoolQueueLen() {
return getIntProperty(PassThroughConfigPNames.WORKER_POOL_QUEUE_LENGTH,
DEFAULT_WORKER_POOL_QUEUE_LENGTH);
}

public int getSecondaryWorkerPoolQueueLen() {
return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_POOL_QUEUE_LENGTH,
DEFAULT_WORKER_POOL_QUEUE_LENGTH, props);
}

public int getIOThreadsPerReactor() {
return getIntProperty(PassThroughConfigPNames.IO_THREADS_PER_REACTOR,
DEFAULT_IO_THREADS_PER_REACTOR);
Expand Down

0 comments on commit cc0adfd

Please sign in to comment.