Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/pg-bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# `pg-bench`

> Benchmarks and memory-usage demonstrations for node-postgres (`pg` / `pg-protocol`).

## Usage

```
import pgBench from 'pg-bench'; // package is ESM ("type": "module"), so use import, not require

// TODO: DEMONSTRATE API
```
36 changes: 36 additions & 0 deletions packages/pg-bench/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "pg-bench",
"version": "1.0.0",
"type": "module",
"description": "> TODO: description",
"author": "Herman J. Radtke III <herman@hermanradtke.com>",
"homepage": "https://github.com/brianc/node-postgres#readme",
"license": "MIT",
"main": "lib/pg-bench.js",
"directories": {
"lib": "lib",
"test": "__tests__"
},
"files": [
"lib"
],
"publishConfig": {
"registry": "https://registry.yarnpkg.com"
},
"repository": {
"type": "git",
"url": "git+https://github.com/brianc/node-postgres.git"
},
"scripts": {
"test": "echo \"Error: run tests from root\" && exit 1"
},
"bugs": {
"url": "https://github.com/brianc/node-postgres/issues"
},
"dependencies": {
"mitata": "^1.0.34",
"pg": "^8.16.2",
"tsx": "^4.20.3",
"typescript": "^4.0.3"
}
}
149 changes: 149 additions & 0 deletions packages/pg-bench/src/memory-leak-demo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import { run, bench } from "mitata";
import { Client } from "pg";

// Postgres connection used by every benchmark below.
// NOTE(review): the password is read from MONGO_USER_PASSWORD_DEV even though
// this connects to Postgres — looks like a copy/paste from another service's
// config; confirm and rename the env var. Host/user values are also
// environment-specific and presumably placeholders — verify before merging.
const client = new Client({
  host: 'localhost',
  port: 5432,
  database: 'postgres-bench',
  user: 'clickup_admin',
  password: process.env.MONGO_USER_PASSWORD_DEV,
});

await client.connect();

// Helper function to get memory usage in MB
function getMemoryUsageMB() {
const usage = process.memoryUsage();
return {
rss: Math.round(usage.rss / 1024 / 1024 * 100) / 100,
heapUsed: Math.round(usage.heapUsed / 1024 / 1024 * 100) / 100,
heapTotal: Math.round(usage.heapTotal / 1024 / 1024 * 100) / 100,
external: Math.round(usage.external / 1024 / 1024 * 100) / 100
};
}

// Simulate the memory leak scenario with COPY operations and large data chunks
bench("memory leak simulation - large COPY operations", async () => {
// Generate a large amount of data that would cause buffer growth
const largeDataSize = 500000; // 500k rows
const largeTextPayload = 'X'.repeat(500); // 500 char string per row

// Create a large dataset that will stress the parser buffer management
const result = await client.query({
text: `
WITH large_dataset AS (
SELECT
generate_series(1, $1) as id,
$2::text as large_text,
random()::text as random_data,
NOW()::text as timestamp_data,
(random() * 1000000)::bigint as large_number
)
SELECT * FROM large_dataset
`,
values: [largeDataSize, largeTextPayload]
});

return result.rows.length;
});

// Benchmark that simulates continuous processing with intermittent large payloads
// This would trigger the buffer growth and partial message scenarios
bench("memory leak simulation - mixed payload sizes", async () => {
const results = [];

// Simulate the problematic pattern: large payload followed by small ones
// This creates the scenario where large buffers remain allocated

// First: Large payload (grows buffer significantly)
const largeResult = await client.query({
text: `
SELECT
generate_series(1, 100000) as id,
'Large payload data that will grow the parser buffer significantly: ' || repeat('X', 200) as large_data,
md5(random()::text) as hash_data
`
});
results.push(largeResult.rows.length);

// Then: Several smaller payloads (would previously keep large buffer allocated)
for (let i = 0; i < 10; i++) {
const smallResult = await client.query({
text: `SELECT $1::int as small_id, 'small payload' as data`,
values: [i]
});
results.push(smallResult.rows.length);
}

return results.reduce((sum, count) => sum + count, 0);
});

// Monitor memory usage during benchmark execution
// Sample process memory on an interval while the benchmarks run so the
// section below can analyse growth and retention afterwards.
// Fix: `memorySnapshots` was declared with `let` but is never reassigned
// (only mutated via push) — use `const`.
const memorySnapshots: Array<{ time: number; memory: ReturnType<typeof getMemoryUsageMB> }> = [];
const startTime = Date.now();

// Take memory snapshot every 100ms during benchmarks
const memoryMonitor = setInterval(() => {
  memorySnapshots.push({
    time: Date.now() - startTime,
    memory: getMemoryUsageMB()
  });
}, 100);

console.log("🧠 Starting memory leak demonstration benchmark...");
console.log("📊 Initial memory usage:", getMemoryUsageMB());

// Run all registered mitata benchmarks; `throw: true` propagates failures.
await run({
  format: 'mitata',
  colors: true,
  throw: true,
});

clearInterval(memoryMonitor);

console.log("\n📈 Memory Usage Analysis:");
console.log("========================");

// Post-run analysis: how much of the peak heap growth is still retained at
// the end, plus a timeline of significant heap movements.
if (memorySnapshots.length > 0) {
  const initial = memorySnapshots[0].memory;
  const final = memorySnapshots[memorySnapshots.length - 1].memory;
  // Snapshot with the highest heapUsed seen during the run.
  const peak = memorySnapshots.reduce((max, snapshot) =>
    snapshot.memory.heapUsed > max.heapUsed ? snapshot.memory : max, initial);

  console.log(`Initial Heap: ${initial.heapUsed} MB`);
  console.log(`Peak Heap: ${peak.heapUsed} MB`);
  console.log(`Final Heap: ${final.heapUsed} MB`);
  console.log(`RSS Growth: ${final.rss - initial.rss} MB`);
  console.log(`Heap Growth: ${final.heapUsed - initial.heapUsed} MB`);

  // Memory retention = share of the peak growth still held at the end.
  // Fix: guard the denominator — when the heap never rose above its initial
  // level, the original divided by zero and printed "NaN%".
  const peakGrowth = peak.heapUsed - initial.heapUsed;
  const memoryRetained = peakGrowth > 0
    ? ((final.heapUsed - initial.heapUsed) / peakGrowth) * 100
    : 0;
  console.log(`Memory Retention: ${Math.round(memoryRetained)}% of peak usage`);

  if (memoryRetained > 80) {
    console.log("⚠️ HIGH MEMORY RETENTION - Potential memory leak detected!");
  } else if (memoryRetained > 50) {
    console.log("⚡ MODERATE MEMORY RETENTION - Some memory not released");
  } else {
    console.log("✅ LOW MEMORY RETENTION - Good memory management");
  }

  // Show memory timeline for significant changes
  console.log("\n📊 Memory Timeline (significant changes):");
  let lastSignificantUsage = initial.heapUsed;
  memorySnapshots.forEach(snapshot => {
    // Fix: keep the SIGNED delta for display and use Math.abs only for the
    // threshold. The original applied Math.abs first, so the "+" sign test
    // was always true and heap shrinkage was printed as growth.
    const heapDelta = snapshot.memory.heapUsed - lastSignificantUsage;
    if (Math.abs(heapDelta) > 5) { // Show changes > 5MB
      console.log(`  ${snapshot.time}ms: ${snapshot.memory.heapUsed} MB heap (${heapDelta > 0 ? '+' : ''}${Math.round(heapDelta)} MB)`);
      lastSignificantUsage = snapshot.memory.heapUsed;
    }
  });
}

console.log("\nπŸ”§ This benchmark demonstrates:");
console.log("β€’ Large buffer allocation during big queries");
console.log("β€’ Buffer management with mixed payload sizes");
console.log("β€’ Memory retention patterns that would indicate leaks");
console.log("β€’ The effectiveness of our gradual buffer shrinking fix");

await client.end();
25 changes: 25 additions & 0 deletions packages/pg-bench/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"compilerOptions": {
"module": "es2022",
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"strict": true,
"target": "es6",
"noImplicitAny": true,
"moduleResolution": "node16",
"sourceMap": true,
"outDir": "dist",
"incremental": true,
"baseUrl": ".",
"declaration": true,
"paths": {
"*": [
"node_modules/*",
"src/types/*"
]
}
},
"include": [
"src/**/*"
]
}
23 changes: 20 additions & 3 deletions packages/pg-protocol/src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,26 @@ export class Parser {
this.bufferLength = 0
this.bufferOffset = 0
} else {
// Adjust the cursors of remainingBuffer
this.bufferLength = bufferFullLength - offset
this.bufferOffset = offset
// A partial message remains.
// Use a gradual shrinking strategy: only shrink if buffer is at least half empty,
// and when shrinking, reduce to half size (not exact size) to provide wiggle room.
const remainingLength = bufferFullLength - offset
const bufferUtilization = remainingLength / this.buffer.byteLength

if (bufferUtilization < 0.5) {
// Buffer is more than half empty - shrink it to half its current size
const newBufferSize = Math.max(this.buffer.byteLength / 2, remainingLength * 2)
const newBuffer = Buffer.allocUnsafe(newBufferSize)
this.buffer.copy(newBuffer, 0, offset, offset + remainingLength)

this.buffer = newBuffer
this.bufferOffset = 0
this.bufferLength = remainingLength
} else {
// Buffer utilization is reasonable - use existing cursor strategy
this.bufferLength = remainingLength
this.bufferOffset = offset
}
}
}

Expand Down
Loading