Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,26 @@ private SafeFileHandle CreateHandle(int segmentId)

fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
if (deleteOnClose)
{
fileFlags = fileFlags | Native32.FILE_FLAG_DELETE_ON_CLOSE;

// FILE_SHARE_DELETE allows multiple FASTER instances to share a single log directory and each can specify deleteOnClose.
// This will allow the files to persist until all handles across all instances have been closed.
fileShare = fileShare | Native32.FILE_SHARE_DELETE;
}

var logHandle = Native32.CreateFileW(
GetSegmentName(segmentId),
fileAccess, fileShare,
IntPtr.Zero, fileCreation,
fileFlags, IntPtr.Zero);

if (logHandle.IsInvalid)
{
var error = Marshal.GetLastWin32Error();
throw new IOException($"Error creating log file for {GetSegmentName(segmentId)}, error: {error}", Native32.MakeHRFromErrorCode(error));
}

if (preallocateFile)
SetFileSize(FileName, logHandle, segmentSize);

Expand Down
7 changes: 7 additions & 0 deletions cs/src/core/Utilities/Native32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ private struct MARK_HANDLE_INFO
internal const uint FILE_FLAG_DELETE_ON_CLOSE = 0x04000000;
internal const uint FILE_FLAG_NO_BUFFERING = 0x20000000;
internal const uint FILE_FLAG_OVERLAPPED = 0x40000000;

internal const uint FILE_SHARE_DELETE = 0x00000004;
#endregion

#region io functions
Expand Down Expand Up @@ -320,6 +322,11 @@ public static bool SetFileSize(SafeFileHandle file_handle, long file_size)

return true;
}

internal static int MakeHRFromErrorCode(int errorCode)
{
return unchecked(((int)0x80070000) | errorCode);
}
#endregion
}
}
232 changes: 232 additions & 0 deletions cs/test/SharedDirectoryTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using NUnit.Framework;
using NUnit.Framework.Internal;
using System;
using System.IO;
using System.Linq;

namespace FASTER.test.recovery.sumstore
{
[TestFixture]
internal class SharedDirectoryTests
{
const long numUniqueKeys = (1 << 14);
const long keySpace = (1L << 14);
const long numOps = (1L << 19);
const long refreshInterval = (1L << 8);
const long completePendingInterval = (1L << 10);
private string rootPath;
private string sharedLogDirectory;
FasterTestInstance original;
FasterTestInstance clone;

[SetUp]
public void Setup()
{
this.rootPath = $"{TestContext.CurrentContext.TestDirectory}\\{Path.GetRandomFileName()}";
Directory.CreateDirectory(this.rootPath);
this.sharedLogDirectory = $"{this.rootPath}\\SharedLogs";
Directory.CreateDirectory(this.sharedLogDirectory);

this.original = new FasterTestInstance();
this.clone = new FasterTestInstance();
}

[TearDown]
public void TearDown()
{
this.original.TearDown();
this.clone.TearDown();
try
{
Directory.Delete(this.rootPath, recursive: true);
}
catch
{
}
}

[Test]
public void SharedLogDirectory()
{
this.original.Initialize($"{this.rootPath}\\OriginalCheckpoint", this.sharedLogDirectory);
this.original.Faster.StartSession();
Assert.IsTrue(IsDirectoryEmpty(this.sharedLogDirectory)); // sanity check
Populate(this.original.Faster);

// Take checkpoint from original to start the clone from
Assert.IsTrue(this.original.Faster.TakeFullCheckpoint(out var checkpointGuid));
Assert.IsTrue(this.original.Faster.CompleteCheckpoint(wait: true));

// Sanity check against original
Assert.IsFalse(IsDirectoryEmpty(this.sharedLogDirectory));
Test(this.original, checkpointGuid);

// Copy checkpoint directory
var cloneCheckpointDirectory = $"{this.rootPath}\\CloneCheckpoint";
CopyDirectory(new DirectoryInfo(this.original.CheckpointDirectory), new DirectoryInfo(cloneCheckpointDirectory));

// Recover from original checkpoint
this.clone.Initialize(cloneCheckpointDirectory, this.sharedLogDirectory);
this.clone.Faster.Recover(checkpointGuid);
this.clone.Faster.StartSession();

// Both sessions should work concurrently
Test(this.original, checkpointGuid);
Test(this.clone, checkpointGuid);

// Dispose original, files should not be deleted
this.original.TearDown();

// Clone should still work
Assert.IsFalse(IsDirectoryEmpty(this.sharedLogDirectory));
Test(this.clone, checkpointGuid);

this.clone.TearDown();

// Files should be deleted after both instances are closed
Assert.IsTrue(IsDirectoryEmpty(this.sharedLogDirectory));
}

private struct FasterTestInstance
{
public string CheckpointDirectory { get; private set; }
public string LogDirectory { get; private set; }
public FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> Faster { get; private set; }
public IDevice LogDevice { get; private set; }

public void Initialize(string checkpointDirectory, string logDirectory)
{
this.CheckpointDirectory = checkpointDirectory;
this.LogDirectory = logDirectory;

this.LogDevice = Devices.CreateLogDevice($"{this.LogDirectory}\\log", deleteOnClose: true);
this.Faster = new FasterKV<AdId, NumClicks, Input, Output, Empty, Functions>(
keySpace,
new Functions(),
new LogSettings { LogDevice = this.LogDevice },
new CheckpointSettings { CheckpointDir = this.CheckpointDirectory, CheckPointType = CheckpointType.FoldOver });
}

public void TearDown()
{
this.Faster?.StopSession();
this.Faster?.Dispose();
this.Faster = null;
this.LogDevice?.Close();
this.LogDevice = null;
}
}

private void Populate(FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fasterInstance)
{
// Prepare the dataset
var inputArray = new Input[numOps];
for (int i = 0; i < numOps; i++)
{
inputArray[i].adId.adId = i % numUniqueKeys;
inputArray[i].numClicks.numClicks = 1;
}

// Process the batch of input data
for (int i = 0; i < numOps; i++)
{
fasterInstance.RMW(ref inputArray[i].adId, ref inputArray[i], Empty.Default, i);

if (i % completePendingInterval == 0)
{
fasterInstance.CompletePending(false);
}
else if (i % refreshInterval == 0)
{
fasterInstance.Refresh();
}
}

// Make sure operations are completed
fasterInstance.CompletePending(true);
}

private void Test(FasterTestInstance fasterInstance, Guid checkpointToken)
{
var checkpointInfo = default(HybridLogRecoveryInfo);
Assert.IsTrue(checkpointInfo.Recover(checkpointToken, new DirectoryConfiguration(fasterInstance.CheckpointDirectory)));

// Create array for reading
var inputArray = new Input[numUniqueKeys];
for (int i = 0; i < numUniqueKeys; i++)
{
inputArray[i].adId.adId = i;
inputArray[i].numClicks.numClicks = 0;
}

var input = default(Input);
var output = default(Output);

// Issue read requests
for (var i = 0; i < numUniqueKeys; i++)
{
var status = fasterInstance.Faster.Read(ref inputArray[i].adId, ref input, ref output, Empty.Default, i);
Assert.IsTrue(status == Status.OK);
inputArray[i].numClicks = output.value;
}

// Complete all pending requests
fasterInstance.Faster.CompletePending(true);


// Compute expected array
long[] expected = new long[numUniqueKeys];
foreach (var guid in checkpointInfo.continueTokens.Keys)
{
var sno = checkpointInfo.continueTokens[guid];
for (long i = 0; i <= sno; i++)
{
var id = i % numUniqueKeys;
expected[id]++;
}
}

int threadCount = 1; // single threaded test
int numCompleted = threadCount - checkpointInfo.continueTokens.Count;
for (int t = 0; t < numCompleted; t++)
{
var sno = numOps;
for (long i = 0; i < sno; i++)
{
var id = i % numUniqueKeys;
expected[id]++;
}
}

// Assert that expected is same as found
for (long i = 0; i < numUniqueKeys; i++)
{
Assert.IsTrue(
expected[i] == inputArray[i].numClicks.numClicks,
"Debug error for AdId {0}: Expected ({1}), Found({2})", inputArray[i].adId.adId, expected[i], inputArray[i].numClicks.numClicks);
}
}

private bool IsDirectoryEmpty(string path) => !Directory.Exists(path) || !Directory.EnumerateFileSystemEntries(path).Any();

private static void CopyDirectory(DirectoryInfo source, DirectoryInfo target)
{
// Copy each file
foreach (var file in source.GetFiles())
{
file.CopyTo(Path.Combine(target.FullName, file.Name), true);
}

// Copy each subdirectory
foreach (var sourceSubDirectory in source.GetDirectories())
{
var targetSubDirectory = target.CreateSubdirectory(sourceSubDirectory.Name);
CopyDirectory(sourceSubDirectory, targetSubDirectory);
}
}
}
}