ADR-013: Concurrent Request Handling Implementation

Context

Gatekit's MCPProxy initially processed requests sequentially, using a simple send-then-wait pattern:

await self._upstream_transport.send_message(upstream_request)
response = await self._upstream_transport.receive_message()

This approach had significant limitations:

  1. receive_message() returned ANY available response, not the specific response for the request
  2. Multiple concurrent requests led to mismatched request/response pairs
  3. The proxy could not leverage upstream servers' concurrent processing capabilities
  4. Performance degraded significantly under concurrent load (1.0s for 10 requests vs 0.1s optimal)

Decision

We implemented a concurrent request handling system with proper request/response correlation:

Core Changes

  1. New Transport Interface Method:

    async def send_and_receive(self, request: MCPRequest) -> MCPResponse:
        """Send a request and wait for its specific response."""
    
  2. Request Correlation System:

    • Request ID-based Future mapping in _pending_requests
    • Thread-safe async locks for concurrent access
    • Automatic cleanup on completion, timeout, or failure
  3. Concurrent Request Limiting:

    • Configurable maximum concurrent requests (default: 100)
    • Graceful error handling when limits exceeded
    • Request tracking and metrics
  4. Resource Management:

    • Proper cleanup of completed requests
    • Exception handling and propagation
    • No memory leaks or hanging tasks

Implementation Architecture

# New concurrent-safe pattern
async def handle_request(self, request: MCPRequest) -> MCPResponse:
    self._concurrent_requests += 1
    try:
        # Process request with proper correlation
        response = await self._upstream_transport.send_and_receive(upstream_request)
        return response
    finally:
        self._concurrent_requests -= 1

Transport Layer Enhancement

async def send_and_receive(self, request: MCPRequest) -> MCPResponse:
    # Check concurrent limit
    if self._concurrent_request_count >= self._max_concurrent_requests:
        raise RuntimeError(f"Maximum concurrent requests exceeded")
    
    # Register request for correlation
    future = asyncio.Future()
    self._pending_requests[request.id] = future
    
    # Send request and wait for specific response
    await self.send_message(request)
    return await asyncio.wait_for(future, timeout=self.request_timeout)

Performance Impact

The implementation delivers significant performance improvements:

Alternatives Considered

Alternative 1: Message Queuing with Correlation IDs

# Maintain request queue and response matching
request_queue = asyncio.Queue()
response_handlers = {}

Rejected: Added complexity without significant benefits over Future-based approach.

Alternative 2: Per-Request Transport Instances

# Create new transport connection per request
transport = create_transport_for_request(request)

Rejected: Resource-intensive and doesn't leverage existing connection multiplexing.

Alternative 3: Request Batching

# Batch multiple requests into single upstream call
batch = await collect_requests(timeout=0.1)
responses = await send_batch(batch)

Rejected: Requires upstream server batch support and adds latency for single requests.

Migration Strategy

This is a non-breaking change:

Testing Strategy

Comprehensive test coverage with 8 test scenarios:

  1. Basic concurrent functionality (10 requests)
  2. Medium load testing (50 requests)
  3. High load stress testing (100 requests)
  4. Request/response correlation verification
  5. Plugin state isolation under concurrency
  6. Resource cleanup validation
  7. Error handling under concurrent load
  8. Concurrent request limit enforcement

Configuration

No configuration changes required - the feature works with existing configurations:

Future Enhancements

  1. Configurable Limits: YAML configuration for concurrent request limits
  2. Request Prioritization: Priority queues for different request types
  3. Advanced Batching: Optional request batching for supporting servers
  4. Monitoring Integration: Metrics and observability for concurrent operations
  5. Adaptive Limits: Dynamic adjustment based on server performance

Consequences

Positive

Negative

Neutral

Decision Rationale

The concurrent request handling implementation was chosen because:

  1. Significant Performance Gains: Up to 250x improvement for concurrent scenarios
  2. Maintains Compatibility: No breaking changes to existing APIs
  3. Production Ready: Comprehensive testing and error handling
  4. Leverages Existing Architecture: Builds on async-first design principles
  5. Future-Proof: Enables advanced features like batching and prioritization

This enhancement directly addresses the scalability requirements for production deployments while maintaining the reliability and simplicity that are core to Gatekit's design philosophy.