본문 바로가기

프로젝트/XRP 환전 서비스

[일관성 보장] Outbox Pattern을 적용해보자

Outbox Pattern

1. Outbox Pattern을 사용한 이유?

아래는 trustline을 생성하는 비즈니스 로직으로

데이터베이스와 블록체인의 상태를 한번에 변경하고 있다.

@Transactional()
async createTrustLine(currency: string, userId: number) {
    const savedWallet = await this.prismaService.tx.wallet.findUnique({
      where: { userId },
      select: { id: true, seed: true },
    });
    if (!savedWallet) {
      throw new NotFoundException('존재하지 않는 지갑입니다.');
    }

    const adminWallet = this.walletService.createFromSeed(
      process.env.IOU_ADMIN_SEED!,
    );
    const userWallet = this.walletService.createFromSeed(savedWallet.seed);

    await this.prismaService.tx.trustLine.upsert({
      where: {
        address_currency_issuer: {
          address: userWallet.address,
          currency,
          issuer: adminWallet.address,
        },
      },
      update: {},
      create: {
        currency,
        issuer: adminWallet.address,
        address: userWallet.address,
        walletId: savedWallet.id,
        limit: TRUST_LIMIT,
      },
    });

    const result = await this.walletService.setTrust(
      adminWallet.address,
      userWallet,
      currency,
    );
    const meta = (result as any)?.result?.meta;
    if (meta?.TransactionResult !== 'tesSUCCESS') {
      throw new InternalServerErrorException('XRPL 트랜잭션 실패');
    }
}

 

데이터베이스의 상태 변경은 실패했지만 블록체인의 상태 변경은 성공한 경우 일관성 문제가 발생한다.

아래는 테스트 코드로 블록체인 트랜잭션은 성공하는 것으로 mocking해서 서비스 로직을 호출하고

데이터베이스에 commit 하기 전에 예외를 발생시키도록 했다.

이후 데이터베이스에 저장된 데이터가 없는지와 블록체인 트랜잭션은 호출되었는지를 검증했다.

rollback이 되어서 데이터베이스에 데이터는 저장이 안되었고 블록체인 트랜잭션은 성공적으로 호출되었다.

it('commit 실패시 데이터베이스와 XRPL의 일관성 검증', async () => {
    const email = 'test@naver.com';
    const password = 'test';
    const name = 'test';
    const address = 'test';
    const publicKey = 'test';
    const privateKey = 'test';
    const seed = 'test';
    const currency = 'test';
    
    const user = await prismaService.user.create({
      data: { email, password, name },
    });
    const wallet = await prismaService.wallet.create({
      data: {
        address,
        publicKey,
        privateKey,
        seed,
        userId: user.id,
      },
    });

    jest.spyOn(walletService, 'setTrust').mockResolvedValue({
      result: { meta: { TransactionResult: 'tesSUCCESS' } },
    });
    jest.spyOn(walletService, 'createFromSeed').mockReturnValue({
      address,
      seed,
      publicKey,
      privateKey,
    });

    await expect(
      prismaService.$transaction(async (tx) => {
        return asyncLocalStorage.run(tx, async () => {
          await userService.createTrustLineTest(currency, user.id);
          throw new InternalServerErrorException('서버 이상으로 인한 실패');
        });
      }),
    ).rejects.toThrow();

    const trustLines = await prismaService.trustLine.findMany({
      where: { walletId: wallet.id },
    });
    expect(trustLines.length).toBe(0);

    expect(walletService.setTrust).toHaveBeenCalledTimes(1);
});

 

이런 일관성이 깨지는 문제를 해결하기 위해 outbox pattern을 적용하기로 했다.

 

2. Outbox Pattern이란?

outbox는 전송되지 않았거나 전송에 실패한 메시지를 보관하고 있는 보관함이다.

데이터 변경 작업과 발행할 이벤트 내용을 outbox 테이블에 기록하는 작업을 동일한 트랜잭션으로 처리한다.

이후 별도의 프로세스가 주기적으로 outbox 테이블을 조회해서 이벤트를 발행한다.

 

Outbox Pattern 적용

1. Outbox 테이블

id 식별자
type XRPL 트랜잭션 종류
payload XRPL 트랜잭션 서명값
status outbox 상태로 PENDING, SUCCESS, FAILED가 있음
retryCount 재시도 횟수
createdAt 생성 시간
updatedAt 수정 시간

 

2. 서비스 로직

기존과 달리진 점은 XRPL 트랜잭션을 실행시키지 않고
서명으로 인해 생기는 txBlob을 outbox로 저장한다는 점이다.

@Transactional()
async createTrustLine(currency: string, userId: number) {
    const savedWallet = await this.prismaService.tx.wallet.findUnique({
      where: { userId: userId },
      select: {
        id: true,
        seed: true,
      },
    });
    if (!savedWallet) {
      throw new NotFoundException('존재하지 않는 지갑입니다.');
    }

    const adminWallet = this.walletService.createFromSeed(
      process.env.IOU_ADMIN_SEED!,
    );
    const userWallet = this.walletService.createFromSeed(savedWallet.seed);
    await this.prismaService.tx.trustLine.upsert({
      where: {
        address_currency_issuer: {
          address: userWallet.address,
          currency: currency,
          issuer: adminWallet.address,
        },
      },
      update: {},
      create: {
        currency: currency,
        issuer: adminWallet.address,
        address: userWallet.address,
        walletId: savedWallet.id,
        limit: TRUST_LIMIT,
      },
    });

    const txBlob = await this.walletService.setTrust(
      adminWallet.address,
      userWallet,
      currency,
    );
    await this.outboxSerice.create(
      'TRUST_SET',
      txBlob,
      userWallet.address,
      savedWallet.id,
    );
}

 

3. Worker 로직

scheduler를 통해 아래 로직을 일정 시간 주기로 호출한다.

PENDING 상태를 가진 outbox를 조회해서 실행시킨다.

async processPending() {
    const messages = await this.outboxService.findPending(5);
    for (const message of messages) {
      try {
        const result = await this.walletService.executeSignedTx(
          message.payload,
        );

        const meta = result?.result?.meta as any;
        if (meta?.TransactionResult === 'tesSUCCESS') {
          await this.outboxService.markSuccess(message.id);
        } else {
          await this.outboxService.markFailed(message.id);
        }
      } catch (error) {
        await this.outboxService.markFailed(message.id);
        this.logger.error(
          `Outbox ${message.id} 실패: ${message.type} - ${error}`,
        );
      }
    }
}

 

4. 테스트 

블록체인 트랜잭션은 성공하는 것으로 mocking해서 서비스 로직을 호출하고

데이터베이스에 commit 하기 전에 예외를 발생시키도록 했다.

이후 데이터베이스에 저장된 데이터가 없는지와 블록체인 트랜잭션은 호출되었는지를 검증했다.

it('commit 실패시 데이터베이스와 XRPL의 일관성 검증', async () => {
    const email = 'test@naver.com';
    const password = 'test';
    const name = 'test';
    const address = 'test';
    const publicKey = 'test';
    const privateKey = 'test';
    const seed = 'test';
    const currency = 'test';

    jest.spyOn(walletService, 'setTrust').mockResolvedValue('SIGNED_TX_BLOB');
    jest.spyOn(walletService, 'executeSignedTx').mockResolvedValue({
      result: { meta: { TransactionResult: 'tesSUCCESS' } },
    });
    jest.spyOn(walletService, 'createFromSeed').mockReturnValue({
      address,
      seed,
      publicKey,
      privateKey,
    });

    const user = await prismaService.user.create({
      data: { email, password, name },
    });
    const wallet = await prismaService.wallet.create({
      data: {
        address,
        publicKey,
        privateKey,
        seed,
        userId: user.id,
      },
    });

    await expect(
      prismaService.$transaction(async (tx) => {
        return asyncLocalStorage.run(tx, async () => {
          await userService.createTrustLine(currency, user.id);
          throw new InternalServerErrorException('서버 이상으로 인한 실패');
        });
      }),
    ).rejects.toThrow();

    const trustLines = await prismaService.trustLine.findMany({
      where: { walletId: wallet.id },
    });
    expect(trustLines.length).toBe(0);

    const outboxes = await prismaService.outbox.findMany({
      where: { walletId: wallet.id },
    });
    expect(outboxes.length).toBe(0);

    await outboxWorker.processPending();
    expect(walletService.executeSignedTx).toHaveBeenCalledTimes(0);
});