Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor producer #817

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector.Testing 14.0.0
nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.Testing.Xunit 14.0.0
nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication 14.0.0

nuget Be.Vlaanderen.Basisregisters.Projector 15.1.0
nuget Be.Vlaanderen.Basisregisters.Projector 15.2.0

nuget Be.Vlaanderen.Basisregisters.Crab 4.0.0

Expand Down
2 changes: 1 addition & 1 deletion paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ NUGET
Microsoft.EntityFrameworkCore (>= 8.0.2)
Microsoft.Extensions.Logging (>= 8.0)
xunit (>= 2.7)
Be.Vlaanderen.Basisregisters.Projector (15.1)
Be.Vlaanderen.Basisregisters.Projector (15.2)
Autofac (>= 8.0)
Autofac.Extensions.DependencyInjection (>= 9.0)
Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector (>= 14.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@ namespace ParcelRegistry.Producer.Snapshot.Oslo.Infrastructure.Modules
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.SqlServer.MigrationExtensions;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Autofac;
using Be.Vlaanderen.Basisregisters.Projector;
using Be.Vlaanderen.Basisregisters.Projector.ConnectedProjections;
using Be.Vlaanderen.Basisregisters.Projector.Modules;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NodaTime;
using ParcelRegistry.Infrastructure;

public class ApiModule : Module
public class ProducerModule : Module
{
private readonly IConfiguration _configuration;
private readonly IServiceCollection _services;
private readonly ILoggerFactory _loggerFactory;

public ApiModule(
public ProducerModule(
IConfiguration configuration,
IServiceCollection services,
ILoggerFactory loggerFactory)
Expand Down Expand Up @@ -69,12 +71,26 @@ private void RegisterProjectionSetup(ContainerBuilder builder)

private void RegisterProjections(ContainerBuilder builder)
{
builder
.RegisterModule(
new ProducerModule(
_configuration,
_services,
_loggerFactory));
var logger = _loggerFactory.CreateLogger<ProducerModule>();
var connectionString = _configuration.GetConnectionString("ProducerSnapshotProjections");

var hasConnectionString = !string.IsNullOrWhiteSpace(connectionString);
if (hasConnectionString)
{
RunOnSqlServer(_services, _loggerFactory, connectionString);
}
else
{
RunInMemoryDb(_services, _loggerFactory, logger);
}

logger.LogInformation(
"Added {Context} to services:" +
Environment.NewLine +
"\tSchema: {Schema}" +
Environment.NewLine +
"\tTableName: {TableName}",
nameof(ProducerContext), Schema.ProducerSnapshotOslo, MigrationTables.ProducerSnapshotOslo);

var connectedProjectionSettings = ConnectedProjectionSettings.Configure(x =>
{
Expand Down Expand Up @@ -149,5 +165,34 @@ private ProducerOptions CreateProducerOptions()

return producerOptions;
}

private static void RunOnSqlServer(
IServiceCollection services,
ILoggerFactory loggerFactory,
string producerConnectionString)
{
services
.AddDbContext<ProducerContext>((_, options) => options
.UseLoggerFactory(loggerFactory)
.UseSqlServer(producerConnectionString, sqlServerOptions =>
{
sqlServerOptions.EnableRetryOnFailure();
sqlServerOptions.MigrationsHistoryTable(MigrationTables.ProducerSnapshotOslo, Schema.ProducerSnapshotOslo);
})
.UseExtendedSqlServerMigrations());
}

private static void RunInMemoryDb(
IServiceCollection services,
ILoggerFactory loggerFactory,
ILogger logger)
{
services
.AddDbContext<ProducerContext>(options => options
.UseLoggerFactory(loggerFactory)
.UseInMemoryDatabase(Guid.NewGuid().ToString(), sqlServerOptions => { }));

logger.LogWarning("Running InMemory for {Context}!", nameof(ProducerContext));
}
}
}
168 changes: 148 additions & 20 deletions src/ParcelRegistry.Producer.Snapshot.Oslo/Infrastructure/Program.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,167 @@
namespace ParcelRegistry.Producer.Snapshot.Oslo.Infrastructure
{
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Autofac;
using Autofac.Extensions.DependencyInjection;
using Be.Vlaanderen.Basisregisters.Api;
using Be.Vlaanderen.Basisregisters.Aws.DistributedMutex;
using Destructurama;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Modules;
using Serilog;
using Serilog.Debugging;
using Serilog.Extensions.Logging;

public sealed class Program
{
private Program()
{ }

public static void Main(string[] args)
=> Run(new ProgramOptions
public static async Task Main(string[] args)
{
AppDomain.CurrentDomain.FirstChanceException += (_, eventArgs) =>
Log.Debug(
eventArgs.Exception,
"FirstChanceException event raised in {AppDomain}.",
AppDomain.CurrentDomain.FriendlyName);

AppDomain.CurrentDomain.UnhandledException += (_, eventArgs) =>
Log.Fatal((Exception)eventArgs.ExceptionObject, "Encountered a fatal exception, exiting program.");

Log.Information("Initializing ParcelRegistry.Producer.Snapshot.Oslo");

var host = new HostBuilder()
.ConfigureAppConfiguration((_, configurationBuilder) =>
{
Hosting =
{
HttpPort = 7016
},
Logging =
configurationBuilder
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: false)
.AddJsonFile($"appsettings.{Environment.MachineName.ToLowerInvariant()}.json", optional: true, reloadOnChange: false)
.AddEnvironmentVariables()
.AddCommandLine(args);
})
.ConfigureLogging((hostContext, loggingBuilder) =>
{
SelfLog.Enable(Console.WriteLine);
Log.Logger = new LoggerConfiguration() //NOSONAR logging configuration is safe
.ReadFrom.Configuration(hostContext.Configuration)
.Enrich.FromLogContext()
.Enrich.WithMachineName()
.Enrich.WithThreadId()
.Enrich.WithEnvironmentUserName()
.Destructure.JsonNetTypes()
.CreateLogger();
loggingBuilder.ClearProviders();
loggingBuilder.AddSerilog(Log.Logger);
})
.ConfigureServices((hostContext, services) =>
{
var healthChecksBuilder = services.AddHealthChecks();
var connectionStrings = hostContext.Configuration
.GetSection("ConnectionStrings")
.GetChildren();

foreach (var connectionString in connectionStrings
.Where(x => !x.Value.Contains("host", StringComparison.OrdinalIgnoreCase)))
{
WriteTextToConsole = false,
WriteJsonToConsole = false
},
Runtime =
healthChecksBuilder.AddSqlServer(
connectionString.Value,
name: $"sqlserver-{connectionString.Key.ToLowerInvariant()}",
tags: new[] { "db", "sql", "sqlserver" });
}

foreach (var connectionString in connectionStrings
.Where(x => x.Value.Contains("host", StringComparison.OrdinalIgnoreCase)))
{
CommandLineArgs = args
},
MiddlewareHooks =
healthChecksBuilder.AddNpgSql(
connectionString.Value,
name: $"npgsql-{connectionString.Key.ToLowerInvariant()}",
tags: new[] { "db", "sql", "npgsql" });
}

healthChecksBuilder.AddDbContextCheck<ProducerContext>(
$"dbcontext-{nameof(ProducerContext).ToLowerInvariant()}",
tags: new[] { "db", "sql", "sqlserver" });

var origins = hostContext.Configuration
.GetSection("Cors")
.GetChildren()
.Select(c => c.Value)
.ToArray();

foreach (var origin in origins)
{
ConfigureDistributedLock = DistributedLockOptions.LoadFromConfiguration
services.AddCors(options =>
{
options.AddDefaultPolicy(builder =>
{
builder.WithOrigins(origin);
});
});
}
});
})
.UseServiceProviderFactory(new AutofacServiceProviderFactory())
.ConfigureContainer<ContainerBuilder>((hostContext, builder) =>
{
var services = new ServiceCollection();
var loggerFactory = new SerilogLoggerFactory(Log.Logger);

builder.RegisterModule(new ProducerModule(hostContext.Configuration, services, loggerFactory));

builder
.RegisterType<SnapshotProducer>()
.As<IHostedService>()
.SingleInstance();

builder.Populate(services);
})
.ConfigureWebHostDefaults(webHostBuilder =>
webHostBuilder
.UseStartup<Startup>()
.UseKestrel())
.UseConsoleLifetime()
.Build();

Log.Information("Starting ParcelRegistry.Producer.Snapshot.Oslo");

var logger = host.Services.GetRequiredService<ILogger<Program>>();
var configuration = host.Services.GetRequiredService<IConfiguration>();

try
{
await DistributedLock<Program>.RunAsync(
async () => { await host.RunAsync().ConfigureAwait(false); },
DistributedLockOptions.LoadFromConfiguration(configuration),
logger)
.ConfigureAwait(false);
}
catch (AggregateException aggregateException)
{
foreach (var innerException in aggregateException.InnerExceptions)
{
logger.LogCritical(innerException, "Encountered a fatal exception, exiting program.");
}
}
catch (Exception e)
{
logger.LogCritical(e, "Encountered a fatal exception, exiting program.");
Log.CloseAndFlush();

private static void Run(ProgramOptions options)
=> new WebHostBuilder()
.UseDefaultForApi<Startup>(options)
.RunWithLock<Program>();
// Allow some time for flushing before shutdown.
await Task.Delay(500, default);
throw;
}
finally
{
logger.LogInformation("Stopping...");
}
}
}
}
Loading
Loading