Transport-level routing for MCP/ACP protocols
This guide describes how to embed stdio Bus in an IDE, platform, or infrastructure system. It covers operating mode selection, configuration, worker implementation, error handling, and graceful shutdown.
stdio Bus can be embedded in your platform in several ways, depending on your architecture and requirements.
Subprocess
Spawn stdio Bus as a child process. Ideal for IDE extensions and desktop applications where stdio Bus runs alongside your main process.
Sidecar
Run stdio Bus alongside your service in containerized deployments. Connect via Unix socket or TCP for multi-process architectures.
Embedded (TBD)
Link stdio Bus as a library for deep integration. This mode is planned for future releases and not yet supported.
The most common approach is spawning stdio Bus as a subprocess. This provides process isolation while maintaining simple communication via stdin/stdout.
subprocess-embedding.js
```js
const { spawn } = require('child_process');
const readline = require('readline');

// Spawn stdio Bus as a subprocess
const kernel = spawn('./build/kernel', [
  '--config', './config.json',
  '--stdio'
], {
  stdio: ['pipe', 'pipe', 'inherit'] // stdin, stdout, stderr
});

// Send messages to stdio Bus via stdin
kernel.stdin.write(JSON.stringify({
  jsonrpc: '2.0',
  id: '1',
  method: 'initialize',
  params: {},
  sessionId: 'session-001'
}) + '\n');

// Read responses from stdio Bus via stdout
const rl = readline.createInterface({ input: kernel.stdout });
rl.on('line', (line) => {
  const response = JSON.parse(line);
  console.log('Response:', response);
});
```
Key Points
When using subprocess embedding, stdio Bus reads from its stdin and writes to its stdout. Your platform sends NDJSON messages to stdio Bus's stdin and reads responses from its stdout. Diagnostic output goes to stderr.
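Because stdout is a byte stream, a single chunk may carry several messages or only part of one. The sketch below shows one way to frame NDJSON by buffering until a newline arrives. `createNdjsonDecoder` is a hypothetical helper name used for illustration; it is not part of stdio Bus.

```js
// Hypothetical helper: turns arbitrary chunks into parsed NDJSON messages.
function createNdjsonDecoder(onMessage) {
  let buffer = '';
  return function push(chunk) {
    buffer += chunk.toString();
    let newlineIndex;
    while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
      const line = buffer.slice(0, newlineIndex).trim();
      buffer = buffer.slice(newlineIndex + 1);
      if (line) onMessage(JSON.parse(line)); // skip blank lines
    }
  };
}

// A message split across two chunks is delivered once, when it is complete.
const received = [];
const push = createNdjsonDecoder((msg) => received.push(msg));
push('{"jsonrpc":"2.0","id":"1",');
push('"result":{}}\n{"jsonrpc":"2.0","id":"2","result":{}}\n');
console.log(received.map((m) => m.id)); // [ '1', '2' ]
```

When reading from a real stream, calling `setEncoding('utf8')` on it first keeps multi-byte characters from being split across chunks.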
For multi-client scenarios, use Unix socket or TCP mode. This allows multiple processes to connect to a single stdio Bus instance.
unix-socket-client.js
```js
const net = require('net');

// Connect to stdio Bus via Unix socket
const client = net.createConnection('/tmp/stdio_bus.sock', () => {
  console.log('Connected to stdio Bus');

  // Send a request
  client.write(JSON.stringify({
    jsonrpc: '2.0',
    id: '1',
    method: 'test',
    params: {},
    sessionId: 'session-001'
  }) + '\n');
});

// Handle responses with NDJSON parsing
let buffer = '';
client.on('data', (data) => {
  buffer += data.toString();
  let newlineIndex;
  while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
    const line = buffer.slice(0, newlineIndex);
    buffer = buffer.slice(newlineIndex + 1);
    const response = JSON.parse(line);
    console.log('Response:', response);
  }
});
```
tcp-client.js
```js
const net = require('net');

// Connect to stdio Bus via TCP
const client = net.createConnection({ host: '127.0.0.1', port: 9000 }, () => {
  console.log('Connected to stdio Bus via TCP');
  client.write(JSON.stringify({
    jsonrpc: '2.0',
    id: '1',
    method: 'process',
    params: { data: 'test' },
    sessionId: 'session-001'
  }) + '\n');
});

// Handle NDJSON responses. Partial lines are buffered because a TCP chunk
// can end in the middle of a message.
let buffer = '';
client.on('data', (data) => {
  buffer += data.toString();
  const lines = buffer.split('\n');
  buffer = lines.pop(); // keep the trailing incomplete line for the next chunk
  lines.filter(Boolean).forEach((line) => {
    const response = JSON.parse(line);
    console.log('Response:', response);
  });
});
```
stdio Bus supports three operating modes, each suited to different integration scenarios. Choose based on your deployment requirements.
| Mode | Flag | Use Case |
|---|---|---|
| stdio | --stdio | Subprocess embedding, single client, IDE extensions |
| Unix socket | --unix <path> | Local IPC, multiple clients, security-sensitive deployments |
| TCP | --tcp <host:port> | Network access, remote clients, distributed systems |
| Requirement | Recommended Mode |
|---|---|
| Single client, subprocess embedding | stdio |
| Multiple local clients, security-sensitive | Unix socket |
| Multiple clients, network access needed | TCP |
| IDE extension | stdio |
| Containerized microservice | Unix socket or TCP |
| Development/testing | Any (TCP is easiest to debug) |
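The flags in the mode table map directly onto the arguments passed when launching stdio Bus. The following sketch builds that argument list for each mode. `buildBusArgs` and its parameter names are hypothetical; only the `--config`, `--stdio`, `--unix`, and `--tcp` flags come from the tables above.

```js
// Hypothetical helper: maps an operating mode to the documented CLI flags.
function buildBusArgs(configPath, mode, address) {
  const args = ['--config', configPath];
  switch (mode) {
    case 'stdio':
      return [...args, '--stdio'];
    case 'unix':
      if (!address) throw new Error('unix mode needs a socket path');
      return [...args, '--unix', address];
    case 'tcp':
      if (!address) throw new Error('tcp mode needs host:port');
      return [...args, '--tcp', address];
    default:
      throw new Error(`Unknown mode: ${mode}`);
  }
}

console.log(buildBusArgs('./config.json', 'unix', '/tmp/stdio_bus.sock'));
console.log(buildBusArgs('./config.json', 'tcp', '127.0.0.1:9000'));
```

The resulting array can be passed as the second argument to `spawn`.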
stdio Bus reads configuration from a JSON file specified via --config <path>. Configure worker pools and operational limits based on your requirements.
| Field | Required | Description |
|---|---|---|
| pools[].id | Yes | Unique identifier for the pool (used in logs) |
| pools[].command | Yes | Path to the worker executable |
| pools[].args | No | Array of command-line arguments |
| pools[].instances | Yes | Number of worker processes to spawn (≥1) |
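A platform may want to catch configuration mistakes before launching stdio Bus. The sketch below checks a parsed config object against the pool fields listed in the table. `validatePools` is a hypothetical helper, and treating `id` and `command` as non-empty strings is an assumption drawn from the example configurations rather than a documented rule.

```js
// Hypothetical pre-flight check based on the documented pool fields.
function validatePools(config) {
  if (!config || !Array.isArray(config.pools)) {
    return ['pools must be an array'];
  }
  const errors = [];
  const seenIds = new Set();
  config.pools.forEach((pool, i) => {
    const where = `pools[${i}]`;
    if (typeof pool.id !== 'string' || pool.id === '') {
      errors.push(`${where}.id is required`);
    } else if (seenIds.has(pool.id)) {
      errors.push(`${where}.id "${pool.id}" is not unique`);
    } else {
      seenIds.add(pool.id);
    }
    if (typeof pool.command !== 'string' || pool.command === '') {
      errors.push(`${where}.command is required`);
    }
    if (pool.args !== undefined && !Array.isArray(pool.args)) {
      errors.push(`${where}.args must be an array`);
    }
    if (!Number.isInteger(pool.instances) || pool.instances < 1) {
      errors.push(`${where}.instances must be an integer >= 1`);
    }
  });
  return errors;
}

console.log(validatePools({ pools: [{ id: 'worker', command: '/usr/bin/node', instances: 0 }] }));
```

An empty array means the pool definitions passed these checks; stdio Bus itself may enforce additional rules.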
| Field | Default | Description |
|---|---|---|
| max_input_buffer | 1 MB | Maximum bytes buffered per input connection |
| max_output_queue | 4 MB | Maximum bytes queued per output connection |
| max_restarts | 5 | Maximum worker restarts within time window |
| restart_window_sec | 60 | Time window for counting restarts |
| drain_timeout_sec | 30 | Graceful shutdown timeout before SIGKILL |
| backpressure_timeout_sec | 60 | Backpressure timeout before closing connection |
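Together, max_restarts and restart_window_sec describe a rate limit on worker restarts. The sketch below models one plausible reading, a sliding window over restart timestamps. How stdio Bus actually counts restarts is not specified here, so treat `RestartBudget` (a hypothetical class) as an illustration of the idea, not as the real implementation.

```js
// Hypothetical model of "at most maxRestarts within windowSec seconds".
class RestartBudget {
  constructor(maxRestarts = 5, windowSec = 60) {
    this.maxRestarts = maxRestarts;
    this.windowMs = windowSec * 1000;
    this.timestamps = [];
  }

  // Returns true and records the restart if it fits within the budget.
  tryRestart(nowMs = Date.now()) {
    // Drop restarts that have aged out of the window.
    this.timestamps = this.timestamps.filter((t) => nowMs - t < this.windowMs);
    if (this.timestamps.length >= this.maxRestarts) return false;
    this.timestamps.push(nowMs);
    return true;
  }
}

const budget = new RestartBudget(2, 60);
console.log(budget.tryRestart(0));     // true
console.log(budget.tryRestart(1000));  // true
console.log(budget.tryRestart(2000));  // false: two restarts already in the window
console.log(budget.tryRestart(61000)); // true: both earlier restarts have aged out
```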
Minimal Configuration — Single worker, default limits:
config-minimal.json
```json
{
  "pools": [
    {
      "id": "worker",
      "command": "/usr/bin/node",
      "args": ["./worker.js"],
      "instances": 1
    }
  ]
}
```
High-Throughput Configuration — Multiple workers, increased buffers:
config-high-throughput.json
```json
{
  "pools": [
    {
      "id": "worker",
      "command": "/usr/bin/node",
      "args": ["./worker.js"],
      "instances": 8
    }
  ],
  "limits": {
    "max_input_buffer": 4194304,
    "max_output_queue": 16777216,
    "backpressure_timeout_sec": 120
  }
}
```
Fault-Tolerant Configuration — Aggressive restart policy:
config-fault-tolerant.json
```json
{
  "pools": [
    {
      "id": "worker",
      "command": "/usr/bin/node",
      "args": ["./worker.js"],
      "instances": 4
    }
  ],
  "limits": {
    "max_restarts": 10,
    "restart_window_sec": 300,
    "drain_timeout_sec": 60
  }
}
```
Workers are child processes that communicate with stdio Bus via stdin/stdout pipes using NDJSON format. A compliant worker must:
- Preserve id and sessionId in responses

worker.js
```js
#!/usr/bin/env node
const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
  terminal: false
});

rl.on('line', (line) => {
  try {
    const msg = JSON.parse(line);

    // Only respond to requests (messages with id and method)
    if (msg.id !== undefined && msg.method !== undefined) {
      const response = {
        jsonrpc: '2.0',
        id: msg.id,
        // handleMethod returns either { result } or { error }
        ...handleMethod(msg.method, msg.params)
      };

      // Preserve sessionId if present
      if (msg.sessionId) {
        response.sessionId = msg.sessionId;
      }

      console.log(JSON.stringify(response));
    }
  } catch (err) {
    // Diagnostics go to stderr so stdout carries only NDJSON
    console.error(`Parse error: ${err.message}`);
  }
});

function handleMethod(method, params) {
  switch (method) {
    case 'initialize':
      return { result: { capabilities: {} } };
    case 'echo':
      return { result: { echo: params } };
    default:
      // JSON-RPC 2.0 "Method not found"
      return { error: { code: -32601, message: `Unknown method: ${method}` } };
  }
}

// Graceful shutdown on SIGTERM
process.on('SIGTERM', () => {
  console.error('Received SIGTERM, shutting down');
  process.exit(0);
});
```
worker.py
```python
#!/usr/bin/env python3
import sys
import json
import signal


def handle_method(method, params):
    """Implement your method handlers here.

    Returns a dict holding either a 'result' or a JSON-RPC 'error' member.
    """
    if method == 'initialize':
        return {'result': {'capabilities': {}}}
    elif method == 'echo':
        return {'result': {'echo': params}}
    else:
        # JSON-RPC 2.0 "Method not found"
        return {'error': {'code': -32601, 'message': f'Unknown method: {method}'}}


def process_message(line):
    """Process a single NDJSON message."""
    try:
        msg = json.loads(line)

        # Only respond to requests (messages with id and method)
        if 'id' in msg and 'method' in msg:
            response = {'jsonrpc': '2.0', 'id': msg['id']}
            response.update(handle_method(msg['method'], msg.get('params', {})))

            # Preserve sessionId if present
            if 'sessionId' in msg:
                response['sessionId'] = msg['sessionId']

            print(json.dumps(response), flush=True)
    except json.JSONDecodeError as e:
        # Diagnostics go to stderr so stdout carries only NDJSON
        print(f'Parse error: {e}', file=sys.stderr)


def main():
    # Graceful shutdown on SIGTERM
    signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit(0))

    for line in sys.stdin:
        process_message(line.strip())


if __name__ == '__main__':
    main()
```
Platform integrators should handle connection errors, message errors, and worker failures gracefully.
| Error | Cause | Platform Action |
|---|---|---|
| Connection refused | stdio Bus not running or wrong address | Retry with backoff, check stdio Bus status |
| Connection reset | stdio Bus closed connection | Check for malformed messages, reconnect |
| Timeout | stdio Bus or worker unresponsive | Implement client-side timeouts |
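"Retry with backoff" for a refused connection can be sketched as follows. The helper names, the 200 ms base delay, the 10 s cap, and the default of five attempts are all illustrative assumptions, not values prescribed by stdio Bus.

```js
// Hypothetical helper: exponential backoff with a cap, in milliseconds.
function backoffDelayMs(attempt, baseMs = 200, maxMs = 10000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical helper: `connect` is a function returning a new net.Socket.
// Resolves with the connected socket, or rejects after maxAttempts failures.
function connectWithRetry(connect, maxAttempts = 5, attempt = 0) {
  return new Promise((resolve, reject) => {
    const socket = connect();
    const onError = (err) => {
      if (attempt + 1 >= maxAttempts) return reject(err);
      setTimeout(() => {
        connectWithRetry(connect, maxAttempts, attempt + 1).then(resolve, reject);
      }, backoffDelayMs(attempt));
    };
    socket.once('error', onError);
    socket.once('connect', () => {
      // Hand error handling over to the caller once connected.
      socket.removeListener('error', onError);
      resolve(socket);
    });
  });
}

// Usage (not executed here):
// const net = require('net');
// connectWithRetry(() => net.createConnection('/tmp/stdio_bus.sock'))
//   .then((client) => client.on('error', (err) => console.error(err.message)));
```

After the promise resolves, the caller should attach its own `error` listener, since an unhandled `error` event on a socket terminates the Node.js process.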
| Exit Code | Meaning | Platform Action |
|---|---|---|
| 0 | Graceful shutdown | Normal termination |
| 1 | Configuration or startup error | Check config file and logs |
| Non-zero | Unexpected error | Check stderr, restart |
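The exit-code table can be turned into a small decision function for a supervising platform. This is a sketch: `classifyExit` and the action labels are hypothetical, and treating termination by a signal as a restart case is an assumption. In Node.js, the `exit` event reports `code === null` when the child was terminated by a signal.

```js
// Hypothetical mapping from the exit-code table to a platform decision.
function classifyExit(code, signal) {
  if (code === 0) {
    return { action: 'none', reason: 'graceful shutdown' };
  }
  if (code === 1) {
    return { action: 'check-config', reason: 'configuration or startup error' };
  }
  if (code === null) {
    // Assumption: a signal the platform did not send is treated as a failure.
    return { action: 'restart', reason: `terminated by signal ${signal}` };
  }
  return { action: 'restart', reason: `unexpected exit code ${code}` };
}

console.log(classifyExit(0, null).action); // none
console.log(classifyExit(1, null).action); // check-config
console.log(classifyExit(2, null).action); // restart
```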
error-handling.js
```js
const pendingRequests = new Map();

function sendRequest(client, request, timeoutMs = 30000) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      pendingRequests.delete(request.id);
      reject(new Error('Request timeout'));
    }, timeoutMs);

    // Store pending request
    pendingRequests.set(request.id, { resolve, reject, timer });

    // Send request
    client.write(JSON.stringify(request) + '\n');
  });
}

function handleResponse(response) {
  const pending = pendingRequests.get(response.id);
  if (pending) {
    clearTimeout(pending.timer);
    pendingRequests.delete(response.id);
    if (response.error) {
      pending.reject(new Error(response.error.message));
    } else {
      pending.resolve(response.result);
    }
  }
}
```
Proper shutdown handling ensures that no data is lost and that resources are released cleanly.
When stdio Bus receives SIGTERM or SIGINT:
- Waits up to drain_timeout_sec for workers to exit

graceful-shutdown.js
```js
const { spawn } = require('child_process');

const kernel = spawn('./build/kernel', ['--config', 'config.json', '--stdio']);

// Graceful shutdown function
function shutdown() {
  return new Promise((resolve) => {
    // Force kill after timeout (drain_timeout_sec + buffer)
    const forceKill = setTimeout(() => {
      // `killed` only reports that a signal was sent, so check the exit state
      if (kernel.exitCode === null && kernel.signalCode === null) {
        console.warn('stdio Bus did not exit, sending SIGKILL');
        kernel.kill('SIGKILL');
      }
    }, 35000);

    kernel.on('exit', (code) => {
      clearTimeout(forceKill);
      console.log(`stdio Bus exited with code ${code}`);
      resolve(code);
    });

    // Send SIGTERM for graceful shutdown
    kernel.kill('SIGTERM');
  });
}

// Handle process signals
process.on('SIGTERM', async () => {
  await shutdown();
  process.exit(0);
});
```
⏱️ Timeout Recommendation
Set your platform timeout slightly higher than drain_timeout_sec (default: 30s) to allow stdio Bus to complete its shutdown sequence. A platform timeout of 35 seconds is recommended.
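Rather than hard-coding 35 seconds, the platform timeout can be derived from the same configuration file that stdio Bus reads. A minimal sketch, assuming the config has already been parsed into an object: `forceKillTimeoutMs` and the 5-second buffer parameter are hypothetical, while the `limits.drain_timeout_sec` key and its 30-second default come from the configuration reference.

```js
// Documented default for drain_timeout_sec.
const DEFAULT_DRAIN_TIMEOUT_SEC = 30;

// Hypothetical helper: force-kill timeout = drain timeout + safety buffer.
function forceKillTimeoutMs(config, bufferSec = 5) {
  const limits = (config && config.limits) || {};
  const drainSec = limits.drain_timeout_sec ?? DEFAULT_DRAIN_TIMEOUT_SEC;
  return (drainSec + bufferSec) * 1000;
}

console.log(forceKillTimeoutMs({}));                                    // 35000
console.log(forceKillTimeoutMs({ limits: { drain_timeout_sec: 60 } })); // 65000
```

The result can be passed to `setTimeout` in place of a fixed delay, so the two values stay in step when the configuration changes.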
The following code snippets demonstrate common patterns for integrating with stdio Bus in production environments.
Sessions provide worker affinity for related messages. All messages with the same sessionId route to the same worker.
session-management.js
```js
// Generate unique session IDs
function generateSessionId() {
  return `session-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}

// Generate unique request IDs
let requestCounter = 0;
function generateRequestId() {
  return `req-${++requestCounter}`;
}

// Track sessions per user/context
const userSessions = new Map();

function getOrCreateSession(userId) {
  if (!userSessions.has(userId)) {
    userSessions.set(userId, generateSessionId());
  }
  return userSessions.get(userId);
}

// Send request with session affinity
function sendRequest(client, userId, method, params) {
  const sessionId = getOrCreateSession(userId);
  const request = {
    jsonrpc: '2.0',
    id: generateRequestId(),
    method,
    params,
    sessionId
  };
  client.write(JSON.stringify(request) + '\n');
}
```
Track pending requests for response matching using the request id field.
request-correlation.js
```js
const pendingRequests = new Map();
let requestCounter = 0;

function generateRequestId() {
  return `req-${++requestCounter}`;
}

function sendRequest(client, method, params, sessionId) {
  return new Promise((resolve, reject) => {
    const id = generateRequestId();
    const request = { jsonrpc: '2.0', id, method, params };
    if (sessionId) request.sessionId = sessionId;
    pendingRequests.set(id, { resolve, reject });
    client.write(JSON.stringify(request) + '\n');
  });
}

function handleResponse(response) {
  const pending = pendingRequests.get(response.id);
  if (pending) {
    pendingRequests.delete(response.id);
    if (response.error) {
      pending.reject(new Error(response.error.message));
    } else {
      pending.resolve(response.result);
    }
  }
}
```
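If the connection drops, responses for in-flight requests never arrive, and the promises stored in the map would stay pending indefinitely. The sketch below shows one way to fail them all at once. `rejectAllPending` is a hypothetical helper that assumes map entries shaped like `{ resolve, reject }` with an optional `timer`.

```js
// Hypothetical helper: fail every in-flight request when the connection closes.
function rejectAllPending(pendingMap, reason) {
  for (const [id, entry] of pendingMap) {
    if (entry.timer) clearTimeout(entry.timer);
    entry.reject(new Error(`${reason} (request ${id})`));
  }
  pendingMap.clear();
}

// Demonstration with a stand-in map of two pending requests.
const inFlight = new Map();
const failures = [];
inFlight.set('req-1', { resolve: () => {}, reject: (err) => failures.push(err.message) });
inFlight.set('req-2', { resolve: () => {}, reject: (err) => failures.push(err.message) });
rejectAllPending(inFlight, 'Connection closed');
console.log(failures.length, inFlight.size); // 2 0

// In a client: client.on('close', () => rejectAllPending(pendingRequests, 'Connection closed'));
```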
Manage multiple connections for high throughput in TCP or Unix socket mode.
connection-pool.js
```js
const net = require('net');

class ConnectionPool {
  constructor(socketPath, poolSize = 4) {
    this.socketPath = socketPath;
    this.connections = [];
    this.nextIndex = 0;

    for (let i = 0; i < poolSize; i++) {
      this.connections.push(this.createConnection());
    }
  }

  createConnection() {
    const conn = net.createConnection(this.socketPath);
    conn.buffer = '';
    conn.on('data', (data) => {
      conn.buffer += data.toString();
      this.processBuffer(conn);
    });
    return conn;
  }

  processBuffer(conn) {
    let newlineIndex;
    while ((newlineIndex = conn.buffer.indexOf('\n')) !== -1) {
      const line = conn.buffer.slice(0, newlineIndex);
      conn.buffer = conn.buffer.slice(newlineIndex + 1);
      this.handleResponse(JSON.parse(line));
    }
  }

  // Override to route responses, e.g. by matching response.id
  // against a map of pending requests
  handleResponse(response) {
    console.log('Response:', response);
  }

  // Round-robin selection across the pool
  getConnection() {
    const conn = this.connections[this.nextIndex];
    this.nextIndex = (this.nextIndex + 1) % this.connections.length;
    return conn;
  }

  send(message) {
    const conn = this.getConnection();
    conn.write(JSON.stringify(message) + '\n');
  }

  close() {
    this.connections.forEach(conn => conn.end());
  }
}
```
Monitor stdio Bus process health and implement automatic restart on failure.
health-monitoring.js
```js
const { spawn } = require('child_process');

class COSManager {
  constructor(configPath, mode = 'stdio') {
    this.configPath = configPath;
    this.mode = mode;
    this.process = null;
    this.restartCount = 0;
    this.maxRestarts = 5;
    this.stopping = false;
  }

  start() {
    // Note: --unix and --tcp also take an address argument
    const args = ['--config', this.configPath, `--${this.mode}`];
    this.process = spawn('./build/kernel', args, {
      stdio: ['pipe', 'pipe', 'inherit']
    });

    this.process.on('exit', (code, signal) => {
      console.error(`stdio Bus exited: code=${code}, signal=${signal}`);
      this.handleExit(code, signal);
    });

    this.process.on('error', (err) => {
      console.error(`stdio Bus error: ${err.message}`);
    });

    return this.process;
  }

  handleExit(code, signal) {
    // Intentional shutdown: stop() was called, the process exited with
    // code 0 (graceful shutdown), or it was terminated by SIGTERM/SIGINT
    if (this.stopping || code === 0 || signal === 'SIGTERM' || signal === 'SIGINT') {
      return;
    }

    // Unexpected exit - attempt restart with a linearly growing delay
    if (this.restartCount < this.maxRestarts) {
      this.restartCount++;
      console.log(`Restarting stdio Bus (attempt ${this.restartCount})`);
      setTimeout(() => this.start(), 1000 * this.restartCount);
    } else {
      console.error('Max restarts exceeded, giving up');
    }
  }

  stop() {
    if (this.process) {
      this.stopping = true;
      this.process.kill('SIGTERM');
    }
  }
}
```
Capture stdio Bus stderr output and forward it to your logging system.
logging-integration.js
```js
const { spawn } = require('child_process');
const readline = require('readline');

const kernel = spawn('./build/kernel', ['--config', 'config.json', '--stdio'], {
  stdio: ['pipe', 'pipe', 'pipe'] // Capture stderr
});

// Forward stdio Bus logs to your logging system.
// `yourLogger` is a placeholder for your application's logger.
const stderrReader = readline.createInterface({ input: kernel.stderr });
stderrReader.on('line', (line) => {
  // Parse stdio Bus log format: [TIMESTAMP] [LEVEL] message
  const match = line.match(/^\[([^\]]+)\] \[([^\]]+)\] (.*)$/);
  if (match) {
    const [, timestamp, level, message] = match;
    yourLogger.log(level.toLowerCase(), message, {
      timestamp,
      source: 'stdio Bus'
    });
  } else {
    yourLogger.info(line, { source: 'stdio Bus' });
  }
});
```