Skip to content

Commit

Permalink
feat(grain storage): added compression modules (#20)
Browse files Browse the repository at this point in the history
Co-authored-by: Derek Grech <derek.grech@river.tech>
  • Loading branch information
djgrech and djgrech authored Feb 21, 2023
1 parent 7a9868c commit bb0037a
Show file tree
Hide file tree
Showing 18 changed files with 409 additions and 36 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
## [1.4.1](https://github.com/jonathansant/orleans.persistence.redis/compare/1.4.0...1.4.1) (2023-01-10)
## [1.5.0](https://github.com/jonathansant/orleans.persistence.redis/compare/1.4.1...1.5.0) (2023-02-21)

### Features

- Added a deflate unit tests (that verifies file integrity)
- Added BrotliCompression, DeflateCompression, GZipCompression and RawDeflateCompression modules that
can be used to compress human serialized data

## [1.4.1](https://github.com/jonathansant/orleans.persistence.redis/compare/1.4.0...1.4.1) (2023-01-10)

### Features

Expand Down
28 changes: 28 additions & 0 deletions DeflateCompressionTest/DeflateCompressionTest.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.1.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Orleans.Persistence.Redis\Orleans.Persistence.Redis.csproj" />
</ItemGroup>

</Project>
22 changes: 22 additions & 0 deletions DeflateCompressionTest/DeflateCompressionUnitTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Orleans.Persistence.Redis.Compression;
using System.Text;
using Xunit;

namespace DeflateCompressionTest
{
public class DeflateCompressionUnitTest
{
[Fact]
public void DeflateCompressionTest1()
{
const string test = "hello world";
var deflateCompression = new DeflateCompression();

var compressed = deflateCompression.Compress(Encoding.UTF8.GetBytes(test));
var decompressed = deflateCompression.Decompress(compressed);

Assert.NotNull(decompressed);
Assert.Equal(test, Encoding.UTF8.GetString(decompressed));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,32 @@ public async Task TestSmallDataSet()
}
}

public class CompressedTestHumanSerializable : RedisSegmentTests<SiloConfigurator.SiloBuilderConfiguratorHumanSerializerCompressed>
{
public CompressedTestHumanSerializable(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task Test()
{
await PerformTest("HumanSerializerCompressedTest");
}
}

public class CompressedTestHumanSerializableWithSegments : RedisSegmentTests<SiloConfigurator.SiloBuilderConfiguratorHumanSerializerCompressedWithSegments>
{
public CompressedTestHumanSerializableWithSegments(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task Test()
{
await PerformTest("HumanSerializerCompressedTestWithSegments");
}
}

public class RedisSegmentTests<T> : TestBase<T, SiloConfigurator.ClientBuilderConfigurator> where T : ISiloBuilderConfigurator, new()
{
private readonly ITestOutputHelper _output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,47 @@ public void Configure(ISiloHostBuilder hostBuilder)
;
}

public class SiloBuilderConfiguratorHumanSerializerCompressed : ISiloBuilderConfigurator
{
public void Configure(ISiloHostBuilder hostBuilder)
=> hostBuilder
.ConfigureApplicationParts(parts =>
parts.AddApplicationPart(typeof(ITestGrainSegments).Assembly).WithReferences())
.AddRedisGrainStorage("TestingProvider")
.AddCompression<Compression.RawDeflateCompression>()
//.AddCompression<Compression.BrotliCompression>()
//.AddCompression<Compression.GZipCompression>()

.Build(builder => builder.Configure(opts =>
{
opts.Servers = new List<string> { "localhost" };
opts.ClientName = "testing";
opts.ThrowExceptionOnInconsistentETag = false;
opts.HumanReadableSerialization = true;
}));
}

public class SiloBuilderConfiguratorHumanSerializerCompressedWithSegments : ISiloBuilderConfigurator
{
public void Configure(ISiloHostBuilder hostBuilder)
=> hostBuilder
.ConfigureApplicationParts(parts =>
parts.AddApplicationPart(typeof(ITestGrainSegments).Assembly).WithReferences())
.AddRedisGrainStorage("TestingProvider")
.AddCompression<Compression.RawDeflateCompression>()
//.AddCompression<Compression.BrotliCompression>()
//.AddCompression<Compression.GZipCompression>()

.Build(builder => builder.Configure(opts =>
{
opts.Servers = new List<string> { "localhost" };
opts.ClientName = "testing";
opts.ThrowExceptionOnInconsistentETag = false;
opts.HumanReadableSerialization = true;
opts.SegmentSize = (int)(100.Kilobytes().Bytes);
}));
}

public class ClientBuilderConfigurator : IClientBuilderConfigurator
{
public virtual void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
Expand Down
11 changes: 9 additions & 2 deletions Orleans.Persistence.Redis.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29418.71
# Visual Studio Version 17
VisualStudioVersion = 17.4.33205.214
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Persistence.Redis", "Orleans.Persistence.Redis\Orleans.Persistence.Redis.csproj", "{D2FCB20D-DE90-4DF1-B6F6-9E4A06196EF6}"
EndProject
Expand All @@ -28,6 +28,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestGrains", "Samples\TestG
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestClient", "Samples\TestClient\TestClient.csproj", "{7FD09B53-98ED-4F3F-B5C3-E811A04FFBF8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DeflateCompressionTest", "DeflateCompressionTest\DeflateCompressionTest.csproj", "{1042E086-A520-4D68-BF68-2B766BC74D52}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -58,6 +60,10 @@ Global
{7FD09B53-98ED-4F3F-B5C3-E811A04FFBF8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7FD09B53-98ED-4F3F-B5C3-E811A04FFBF8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7FD09B53-98ED-4F3F-B5C3-E811A04FFBF8}.Release|Any CPU.Build.0 = Release|Any CPU
{1042E086-A520-4D68-BF68-2B766BC74D52}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1042E086-A520-4D68-BF68-2B766BC74D52}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1042E086-A520-4D68-BF68-2B766BC74D52}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1042E086-A520-4D68-BF68-2B766BC74D52}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -68,6 +74,7 @@ Global
{785B4D38-5101-4DA3-AA13-0264F4DE8A8A} = {0C057C82-36AC-42A2-BBEC-5AF33395A04B}
{BE13263C-FCF6-4F0E-B4D7-61545C8C717C} = {0C057C82-36AC-42A2-BBEC-5AF33395A04B}
{7FD09B53-98ED-4F3F-B5C3-E811A04FFBF8} = {0C057C82-36AC-42A2-BBEC-5AF33395A04B}
{1042E086-A520-4D68-BF68-2B766BC74D52} = {0C057C82-36AC-42A2-BBEC-5AF33395A04B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {19F423B8-DD85-4046-BBA2-EBE4F74867AA}
Expand Down
31 changes: 31 additions & 0 deletions Orleans.Persistence.Redis/Compression/BrotliCompression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System.IO;
using System.IO.Compression;

namespace Orleans.Persistence.Redis.Compression
{
public class BrotliCompression : ICompression
{
public byte[] Decompress(byte[] bytes)
{
using var memoryStream = new MemoryStream(bytes);
using var outputStream = new MemoryStream();
using (var decompressStream = new Brotli.BrotliStream(memoryStream, CompressionMode.Decompress))
{
decompressStream.CopyTo(outputStream);
}

return outputStream.ToArray();
}

public byte[] Compress(byte[] bytes)
{
using var memoryStream = new MemoryStream();
using (var brotliStream = new Brotli.BrotliStream(memoryStream, CompressionMode.Compress))
{
brotliStream.Write(bytes, 0, bytes.Length);
}

return memoryStream.ToArray();
}
}
}
75 changes: 75 additions & 0 deletions Orleans.Persistence.Redis/Compression/DeflateCompression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;

namespace Orleans.Persistence.Redis.Compression
{
public class DeflateCompression : ICompression
{
public byte[] Decompress(byte[] buffer)
{
var tempBuffer = new byte[buffer.Length - 6]; // 2 header + 4 checksum
if (buffer[0] != 0x78 && buffer[0] != 0x9c)
return null;

Array.Copy(buffer, 2, tempBuffer, 0, tempBuffer.Length);

using var memoryStream = new MemoryStream(tempBuffer);
using var outputStream = new MemoryStream();
using (var decompressStream = new DeflateStream(memoryStream, CompressionMode.Decompress))
{
decompressStream.CopyTo(outputStream);
}

var decompressed = outputStream.ToArray();

var checksum = CalculateChecksum(decompressed);

return checksum.Where((t, i) => t != buffer[buffer.Length - 4 + i]).Any()
? null
: decompressed;
}

public byte[] Compress(byte[] buffer)
{
byte[] tempData;
using (var memoryStream = new MemoryStream())
{
// header
memoryStream.WriteByte(0x78);
memoryStream.WriteByte(0x9C);
using (var compressStream = new DeflateStream(memoryStream, CompressionMode.Compress))
compressStream.Write(buffer, 0, buffer.Length);

tempData = memoryStream.ToArray();
}

var n = tempData.Length;
Array.Resize(ref tempData, n + 4);
var checksum = CalculateChecksum(buffer);
Array.Copy(checksum, 0, tempData, n, checksum.Length);
return tempData;
}

private byte[] CalculateChecksum(byte[] buffer)
{
var a1 = 1;
var a2 = 0;
var checksum = new byte[4];

foreach (var b in buffer)
{
a1 = (a1 + b) % 65521;
a2 = (a2 + a1) % 65521;
}

checksum[0] = (byte)(a2 >> 8);
checksum[1] = (byte)a2;
checksum[2] = (byte)(a1 >> 8);
checksum[3] = (byte)a1;

return checksum;
}
}
}
31 changes: 31 additions & 0 deletions Orleans.Persistence.Redis/Compression/GZipCompression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System.IO;
using System.IO.Compression;

namespace Orleans.Persistence.Redis.Compression
{
public class GZipCompression : ICompression
{
public byte[] Decompress(byte[] buffer)
{
using var memoryStream = new MemoryStream(buffer);
using var outputStream = new MemoryStream();
using (var decompressStream = new GZipStream(memoryStream, CompressionMode.Decompress))
{
decompressStream.CopyTo(outputStream);
}

return outputStream.ToArray();
}

public byte[] Compress(byte[] buffer)
{
using var memoryStream = new MemoryStream();
using (var compressStream = new GZipStream(memoryStream, CompressionMode.Compress))
{
compressStream.Write(buffer, 0, buffer.Length);
}

return memoryStream.ToArray();
}
}
}
8 changes: 8 additions & 0 deletions Orleans.Persistence.Redis/Compression/ICompression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Orleans.Persistence.Redis.Compression
{
public interface ICompression
{
byte[] Decompress(byte[] buffer);
byte[] Compress(byte[] buffer);
}
}
31 changes: 31 additions & 0 deletions Orleans.Persistence.Redis/Compression/RawDeflateCompression.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System.IO;
using System.IO.Compression;

namespace Orleans.Persistence.Redis.Compression
{
public class RawDeflateCompression : ICompression
{
public byte[] Decompress(byte[] bytes)
{
using var memoryStream = new MemoryStream(bytes);
using var outputStream = new MemoryStream();
using (var decompressStream = new DeflateStream(memoryStream, CompressionMode.Decompress))
{
decompressStream.CopyTo(outputStream);
}

return outputStream.ToArray();
}

public byte[] Compress(byte[] bytes)
{
using var memoryStream = new MemoryStream();
using (var compressStream = new DeflateStream(memoryStream, CompressionMode.Compress))
{
compressStream.Write(bytes, 0, bytes.Length);
}

return memoryStream.ToArray();
}
}
}
Loading

0 comments on commit bb0037a

Please sign in to comment.