Skip to content

Commit

Permalink
feat(consumer): add offset override
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneD committed Dec 5, 2024
1 parent af5784b commit f9fa15c
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 34 deletions.
8 changes: 4 additions & 4 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.1

nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.2.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.2.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.2.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.2.0

nuget Be.Vlaanderen.Basisregisters.Shaperon 10.0.2

Expand Down
8 changes: 4 additions & 4 deletions paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,19 @@ NUGET
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1)
Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14.1)
Microsoft.CSharp (>= 4.7)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.2)
AWSSDK.Core (>= 3.7.302.15)
AWSSDK.SQS (>= 3.7.300.54)
Microsoft.Extensions.Logging (>= 8.0)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.2)
Confluent.Kafka (>= 2.3)
Microsoft.EntityFrameworkCore.SqlServer (>= 8.0.2)
Microsoft.Extensions.Logging (>= 8.0)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.2)
Confluent.Kafka (>= 2.3)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.2)
Confluent.Kafka (>= 2.3)
Microsoft.Extensions.Logging (>= 8.0)
Be.Vlaanderen.Basisregisters.Middleware.AddProblemJsonHeader (3.0)
Expand Down
24 changes: 4 additions & 20 deletions src/ParcelRegistry.Consumer.Address.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace ParcelRegistry.Consumer.Address.Console
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.Extensions;
using Destructurama;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
Expand Down Expand Up @@ -84,7 +85,7 @@ public static async Task Main(string[] args)
));

services
.AddDbContextFactory<ConsumerAddressContext>((provider, options) => options
.AddDbContextFactory<ConsumerAddressContext>((_, options) => options
.UseLoggerFactory(loggerFactory)
.UseSqlServer(hostContext.Configuration.GetConnectionString("ConsumerAddress"), sqlServerOptions =>
{
Expand Down Expand Up @@ -118,25 +119,8 @@ public static async Task Main(string[] args)
hostContext.Configuration["Kafka:SaslUserName"],
hostContext.Configuration["Kafka:SaslPassword"]));

var offset = hostContext.Configuration["AddressTopicOffset"];

if (!string.IsNullOrWhiteSpace(offset) && long.TryParse(offset, out var result))
{
var ignoreDataCheck = hostContext.Configuration.GetValue<bool>("IgnoreAddressTopicOffsetDataCheck", false);

if (!ignoreDataCheck)
{
using var ctx = c.Resolve<ConsumerAddressContext>();

if (ctx.AddressConsumerItems.Any())
{
throw new InvalidOperationException(
$"Cannot set Kafka offset to {offset} because {nameof(ctx.AddressConsumerItems)} has data.");
}
}

consumerOptions.ConfigureOffset(new Offset(result));
}
using var ctx = c.Resolve<ConsumerAddressContext>();
ctx.OverrideConfigureOffset(consumerOptions);

return consumerOptions;
});
Expand Down
2 changes: 0 additions & 2 deletions src/ParcelRegistry.Consumer.Address.Console/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

"GroupSuffix": "",
"AddressTopic": "dev.address",
"AddressTopicOffset": "",
"IgnoreAddressTopicOffsetDataCheck": false,

"BaseUrl": "https://api.staging-basisregisters.vlaanderen/",

Expand Down
7 changes: 5 additions & 2 deletions src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace ParcelRegistry.Consumer.Address
using System.IO;
using System.Linq;
using System.Reflection;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.SqlServer.MigrationExtensions;
using Microsoft.EntityFrameworkCore;
Expand All @@ -15,11 +16,12 @@ namespace ParcelRegistry.Consumer.Address
using Parcel.DataStructures;
using ParcelRegistry.Infrastructure;

public class ConsumerAddressContext : SqlServerConsumerDbContext<ConsumerAddressContext>, IAddresses
public class ConsumerAddressContext : SqlServerConsumerDbContext<ConsumerAddressContext>, IAddresses, IOffsetOverrideDbSet
{
public override string ProcessedMessagesSchema => Schema.ConsumerAddress;

public DbSet<AddressConsumerItem> AddressConsumerItems { get; set; }
public DbSet<AddressConsumerItem> AddressConsumerItems => Set<AddressConsumerItem>();
public DbSet<OffsetOverride> OffsetOverrides => Set<OffsetOverride>();

// This needs to be here to please EF
public ConsumerAddressContext()
Expand Down Expand Up @@ -90,6 +92,7 @@ protected override void OnModelCreating(ModelBuilder modelBuilder)

modelBuilder
.ApplyConfigurationsFromAssembly(typeof(ConsumerAddressContext).GetTypeInfo().Assembly);
modelBuilder.ApplyConfiguration(new OffsetOverrideConfiguration(ProcessedMessagesSchema));
}
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace ParcelRegistry.Consumer.Address.Migrations
{
/// <inheritdoc />
public partial class AddOffsetOverride : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "OffsetOverrides",
schema: "ParcelRegistryConsumerAddress",
columns: table => new
{
ConsumerGroupId = table.Column<string>(type: "nvarchar(450)", nullable: false),
Offset = table.Column<long>(type: "bigint", nullable: false),
Configured = table.Column<bool>(type: "bit", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_OffsetOverrides", x => x.ConsumerGroupId);
});

migrationBuilder.CreateIndex(
name: "IX_Addresses_Position",
schema: "ParcelRegistryConsumerAddress",
table: "Addresses",
column: "Position");
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "OffsetOverrides",
schema: "ParcelRegistryConsumerAddress");

migrationBuilder.DropIndex(
name: "IX_Addresses_Position",
schema: "ParcelRegistryConsumerAddress",
table: "Addresses");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,26 @@ protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "6.0.3")
.HasAnnotation("ProductVersion", "8.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 128);

SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder);

modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.OffsetOverride", b =>
{
b.Property<string>("ConsumerGroupId")
.HasColumnType("nvarchar(450)");

b.Property<bool>("Configured")
.HasColumnType("bit");

b.Property<long>("Offset")
.HasColumnType("bigint");

b.HasKey("ConsumerGroupId");

b.ToTable("OffsetOverrides", "ParcelRegistryConsumerAddress");
});

modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b =>
{
Expand Down Expand Up @@ -76,6 +92,8 @@ protected override void BuildModel(ModelBuilder modelBuilder)

b.HasIndex("IsRemoved");

b.HasIndex("Position");

b.ToTable("Addresses", "ParcelRegistryConsumerAddress");
});
#pragma warning restore 612, 618
Expand Down

0 comments on commit f9fa15c

Please sign in to comment.