Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
return resolve();
}
const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress);
const populateReceipt = new PopulateReceiptTransform();
const populateEffects = new PopulateEffectsTransform();
const populateReceipt = new PopulateReceiptTransform(this);
const populateEffects = new PopulateEffectsTransform(this);

const streamParams: BuildWalletTxsStreamParams = {
transactionStream,
Expand Down Expand Up @@ -582,13 +582,33 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
let { transactionStream } = streamParams;
const { populateEffects } = streamParams;

transactionStream = EVMTransactionStorage.collection
// Store cursor reference for cleanup
const cursor = EVMTransactionStorage.collection
.find(query)
.sort({ blockTimeNormalized: 1 })
.addCursorFlag('noCursorTimeout', true)
.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true }));
.addCursorFlag('noCursorTimeout', true);

// Add cleanup handlers when client disconnects
let cursorClosed = false;
const cleanupCursor = () => {
if (!cursorClosed) {
cursorClosed = true;
try {
cursor.close();
} catch {
// Cursor might already be closed, ignore
}
}
};

const { req, res } = params;
req.on('close', cleanupCursor);
res.on('close', cleanupCursor);

// Pipe cursor to transform stream
transactionStream = cursor.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true }));

transactionStream = transactionStream.eventPipe(populateEffects); // For old db entires
transactionStream = transactionStream.eventPipe(populateEffects); // For old db entries

if (params.args.tokenAddress) {
const erc20Transform = new Erc20RelatedFilterTransform(params.args.tokenAddress);
Expand Down
27 changes: 23 additions & 4 deletions packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,34 @@ export class GnosisApi {

let transactionStream = new Readable({ objectMode: true });
const ethTransactionTransform = new EVMListTransactionsStream([multisigContractAddress, args.tokenAddress]);
const populateReceipt = new PopulateReceiptTransform();
const populateEffects = new PopulateEffectsTransform();
const EVM = getCSP(chain, network);
const populateReceipt = new PopulateReceiptTransform(EVM);
const populateEffects = new PopulateEffectsTransform(EVM);

transactionStream = EVMTransactionStorage.collection
// Store cursor reference for cleanup
const cursor = EVMTransactionStorage.collection
.find(query)
.sort({ blockTimeNormalized: 1 })
.addCursorFlag('noCursorTimeout', true);

transactionStream = transactionStream.pipe(populateEffects); // For old db entires
// Add cleanup handlers when client disconnects
let cursorClosed = false;
const cleanupCursor = () => {
if (!cursorClosed) {
cursorClosed = true;
try {
cursor.close();
} catch {
// Cursor might already be closed, ignore
}
}
};

const { req } = params;
req.on('close', cleanupCursor);
res.on('close', cleanupCursor);

transactionStream = cursor.pipe(populateEffects); // For old db entries

if (multisigContractAddress) {
const multisigTransform = new MultisigRelatedFilterTransform(multisigContractAddress, args.tokenAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@ export class InternalTxRelatedFilterTransform extends TransformWithEventPipe {

async getWalletAddresses(tx) {
if (!this.walletAddresses.length) {
this.walletAddresses = await WalletAddressStorage.collection
.find({ chain: tx.chain, network: tx.network, wallet: this.walletId })
.toArray();
const cursor = WalletAddressStorage.collection
.find({ chain: tx.chain, network: tx.network, wallet: this.walletId });
try {
this.walletAddresses = await cursor.toArray();
} finally {
await cursor.close();
}
}
return this.walletAddresses.map(walletAddress => this.web3.utils.toChecksumAddress(walletAddress.address));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import { IEVMTransaction } from '../types';
import { BaseEVMStateProvider } from './csp';

export class PopulateEffectsTransform extends TransformWithEventPipe {
constructor() {
constructor(private evm: BaseEVMStateProvider) {
super({ objectMode: true });
}

async _transform(tx: MongoBound<IEVMTransaction>, _, done) {
// Add effects to old db entries
const EVM = new BaseEVMStateProvider(tx.chain);
tx = EVM.populateEffects(tx);
tx = this.evm.populateEffects(tx);
this.push(tx);
return done();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import { IEVMTransaction } from '../types';
import { BaseEVMStateProvider } from './csp';

export class PopulateReceiptTransform extends TransformWithEventPipe {
constructor() {
constructor(private evm: BaseEVMStateProvider) {
super({ objectMode: true });
}

async _transform(tx: MongoBound<IEVMTransaction>, _, done) {
try {
const EVM = new BaseEVMStateProvider(tx.chain);
tx = await EVM.populateReceipt(tx);
tx = await this.evm.populateReceipt(tx);
} catch {/* ignore error */}
this.push(tx);
return done();
Expand Down
248 changes: 248 additions & 0 deletions packages/bitcore-node/test/integration/ethereum/memory-leaks.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
import { ObjectId } from 'bson';
import { expect } from 'chai';
import { Request, Response } from 'express-serve-static-core';
import * as sinon from 'sinon';
import { PassThrough } from 'stream';
import { MongoBound } from '../../../src/models/base';
import { IWallet, WalletStorage } from '../../../src/models/wallet';
import { WalletAddressStorage } from '../../../src/models/walletAddress';
import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction';
import { intAfterHelper, intBeforeHelper } from '../../helpers/integration';

const chain = 'ETH';
const network = 'regtest';

describe('EVM Memory Leak Prevention', function() {
const suite = this;
this.timeout(30000);
let globalSandbox: sinon.SinonSandbox;
let ETH: any;

before(intBeforeHelper);
after(async () => intAfterHelper(suite));

beforeEach(async () => {
globalSandbox = sinon.createSandbox();
// Use the ETH module instance
const { ETH: ETHModule } = await import('../../../src/modules/ethereum/api/csp');
ETH = ETHModule;
await EVMTransactionStorage.collection.deleteMany({});
await WalletStorage.collection.deleteMany({});
await WalletAddressStorage.collection.deleteMany({});
});

afterEach(() => {
globalSandbox.restore();
});

function createMockReqRes() {
const reqStream = new PassThrough();
const req = reqStream as unknown as Request;

const resStream = new PassThrough();
const res = resStream as unknown as Response;

(res as any).write = resStream.write.bind(resStream);
(res as any).end = resStream.end.bind(resStream);

res.type = () => res;
res.status = () => res;
res.send = () => res;

// Consume data to keep stream flowing
resStream.on('data', () => {});

return { req, res, reqEmitter: reqStream, resEmitter: resStream };
}

describe('Cursor Cleanup on Client Disconnect', () => {
it('should not crash when client disconnects during streamWalletTransactions', async () => {
const address = '0x7F17aF79AABC4A297A58D389ab5905fEd4Ec9502';
const objectId = ObjectId.createFromHexString('60f9abed0e32086bf9903bb5');
const wallet = {
_id: objectId,
chain,
network,
name: 'test-wallet',
pubKey: '0x029ec2ebdebe6966259cf3c6f35c4f126b82fe072bf9d0e81dad375f1d6d2d9054',
path: 'm/44\'/60\'/0\'/0/0',
singleAddress: true
} as MongoBound<IWallet>;

await WalletStorage.collection.insertOne(wallet as any);
await WalletAddressStorage.collection.insertOne({
chain,
network,
wallet: objectId,
address,
processed: true
});

const txCount = 5;
const txs = new Array(txCount).fill({}).map((_, i) => ({
chain,
network,
blockHeight: 1,
gasPrice: 10 * 1e9,
data: Buffer.from(''),
from: address,
to: '0xRecipient123',
txid: '0x' + (i + 1).toString(16).padStart(64, '0'),
wallets: [objectId]
} as any));

await EVMTransactionStorage.collection.insertMany(txs);

const { req, res, reqEmitter } = createMockReqRes();

const streamPromise = ETH.streamWalletTransactions({
chain,
network,
wallet,
req,
res,
args: {}
});

// Wait for stream to start
await new Promise(resolve => setTimeout(resolve, 100));

// Simulate client disconnect
reqEmitter.emit('close');

// Wait for cleanup
await new Promise(resolve => setTimeout(resolve, 100));

// Should not throw - the implementation should handle cleanup gracefully
await streamPromise.catch(() => {});

// If we get here without crashing, the test passes
expect(true).to.be.true;
});

it('should handle multiple interrupted requests without accumulating resources', async () => {
const address = '0x7F17aF79AABC4A297A58D389ab5905fEd4Ec9502';
const objectId = ObjectId.createFromHexString('60f9abed0e32086bf9903bb5');
const wallet = {
_id: objectId,
chain,
network,
name: 'test-wallet-multi',
pubKey: '0x029ec2ebdebe6966259cf3c6f35c4f126b82fe072bf9d0e81dad375f1d6d2d9054',
path: 'm/44\'/60\'/0\'/0/0',
singleAddress: true
} as MongoBound<IWallet>;

await WalletStorage.collection.insertOne(wallet as any);
await WalletAddressStorage.collection.insertOne({
chain,
network,
wallet: objectId,
address,
processed: true
});

const txCount = 5;
const txs = new Array(txCount).fill({}).map((_, i) => ({
chain,
network,
blockHeight: 1,
gasPrice: 10 * 1e9,
data: Buffer.from(''),
from: address,
to: '0xRecipient456',
txid: '0x' + (i + 100).toString(16).padStart(64, '0'),
wallets: [objectId]
} as any));

await EVMTransactionStorage.collection.insertMany(txs);

// Make 3 requests that all get interrupted
const numRequests = 3;
for (let i = 0; i < numRequests; i++) {
const { req, res, reqEmitter } = createMockReqRes();

const streamPromise = ETH.streamWalletTransactions({
chain,
network,
wallet,
req,
res,
args: {}
});

await new Promise(resolve => setTimeout(resolve, 50));
reqEmitter.emit('close');
await new Promise(resolve => setTimeout(resolve, 50));
await streamPromise.catch(() => {});
}

// If we completed all requests without issues, test passes
expect(true).to.be.true;
});
});

describe('Provider Instance Reuse', () => {
it('should complete streaming without errors', async () => {
const address = '0x7F17aF79AABC4A297A58D389ab5905fEd4Ec9502';
const objectId = ObjectId.createFromHexString('60f9abed0e32086bf9903bb5');
const wallet = {
_id: objectId,
chain,
network,
name: 'test-wallet-provider',
pubKey: '0x029ec2ebdebe6966259cf3c6f35c4f126b82fe072bf9d0e81dad375f1d6d2d9054',
path: 'm/44\'/60\'/0\'/0/0',
singleAddress: true
} as MongoBound<IWallet>;

await WalletStorage.collection.insertOne(wallet as any);
await WalletAddressStorage.collection.insertOne({
chain,
network,
wallet: objectId,
address,
processed: true
});

const txCount = 5;
const txs = new Array(txCount).fill({}).map((_, i) => ({
chain,
network,
blockHeight: 1,
gasPrice: 10 * 1e9,
data: Buffer.from(''),
from: address,
to: '0xRecipient789',
txid: '0x' + (i + 200).toString(16).padStart(64, '0'),
wallets: [objectId]
} as any));

await EVMTransactionStorage.collection.insertMany(txs);

const { req, res, resEmitter } = createMockReqRes();

let receivedTxCount = 0;
resEmitter.on('data', () => {
receivedTxCount++;
});

await new Promise((resolve, reject) => {
resEmitter.on('finish', resolve);
resEmitter.on('error', reject);

ETH.streamWalletTransactions({
chain,
network,
wallet,
req,
res,
args: {}
}).catch(reject);
});

// Verify that we received some transactions (stream worked)
expect(receivedTxCount).to.be.greaterThan(0);
});
});
});