From 684982b6eb84b431f239c85067eb50620b432a57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20P=C4=99czek?= Date: Wed, 28 Feb 2024 17:17:48 +0100 Subject: [PATCH] Adding isolated worker model trigger binding --- .github/workflows/continuous-integration.yml | 2 +- .../ThreadStatsFunctions.cs | 13 +++ .../local.settings.json | 10 +++ .../Model/DocumentChange.cs | 57 ++++++++++++ .../Model/DocumentChangeType.cs | 38 ++++++++ ...b.Azure.Functions.Worker.Extensions.csproj | 1 + .../RethinkDbTriggerAttribute.cs | 88 +++++++++++++++++++ .../RethinkDbExtensionConfigProvider.cs | 19 +++- ...ethinkDbTriggerAttributeBindingProvider.cs | 15 +++- .../Trigger/RethinkDbTriggerBinding.cs | 12 +-- .../Trigger/RethinkDbTriggerValueBinder.cs | 46 ++++++++++ 11 files changed, 289 insertions(+), 12 deletions(-) create mode 100644 demos/Demo.Azure.Functions.Worker.RethinkDb/local.settings.json create mode 100644 src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChange.cs create mode 100644 src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChangeType.cs create mode 100644 src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDbTriggerAttribute.cs create mode 100644 src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerValueBinder.cs diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml index 631535e..0dbad30 100644 --- a/.github/workflows/continuous-integration.yml +++ b/.github/workflows/continuous-integration.yml @@ -20,7 +20,7 @@ jobs: - name: Restore run: dotnet restore src/RethinkDb.Azure.WebJobs.Extensions - name: Build - run: dotnet build src/RethinkDb.Azure.WebJobs.Extensions --configuration Release --no-restore + run: dotnet build src/RethinkDb.Azure.WebJobs.Extensions --configuration Release --no-restore - name: Test run: dotnet test test/RethinkDb.Azure.WebJobs.Extensions.Tests --configuration Release code-scanning: diff --git a/demos/Demo.Azure.Functions.Worker.RethinkDb/ThreadStatsFunctions.cs b/demos/Demo.Azure.Functions.Worker.RethinkDb/ThreadStatsFunctions.cs index ceeb963..a25a04a 100644 --- a/demos/Demo.Azure.Functions.Worker.RethinkDb/ThreadStatsFunctions.cs +++ b/demos/Demo.Azure.Functions.Worker.RethinkDb/ThreadStatsFunctions.cs @@ -5,6 +5,7 @@ using Microsoft.Azure.Functions.Worker.Http; using Microsoft.Extensions.Logging; using RethinkDb.Azure.Functions.Worker.Extensions; +using RethinkDb.Azure.Functions.Worker.Extensions.Model; using Demo.Azure.Functions.Worker.RethinkDb.Model; namespace Demo.Azure.Functions.Worker.RethinkDb @@ -128,5 +129,17 @@ public async Task StoreThreadStatsSeries( RethinkDbDocuments = documents }; } + + [Function("HandleThreadStatsChange")] + public void HandleThreadStatsChange( + [RethinkDbTrigger( + databaseName: "Demo", + tableName: "ThreadStats", + HostnameSetting = "RethinkDbHostname", + UserSetting = "RethinkDbUser", + PasswordSetting = "RethinkDbPassword")]DocumentChange change) + { + _logger.LogInformation($"[ThreadStats Change Received] {change.NewValue}"); + } } } diff --git a/demos/Demo.Azure.Functions.Worker.RethinkDb/local.settings.json b/demos/Demo.Azure.Functions.Worker.RethinkDb/local.settings.json new file mode 100644 index 0000000..d9a0800 --- /dev/null +++ b/demos/Demo.Azure.Functions.Worker.RethinkDb/local.settings.json @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated", + "RethinkDbHostname": "[Enter RethinkDB hostname here]", + "RethinkDbUser": "[Enter RethinkDB user name here]", + "RethinkDbPassword": "[Enter RethinkDB user password here]" + } +} \ No newline at end of file diff --git a/src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChange.cs b/src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChange.cs new file mode 100644 index 0000000..1027ec7 --- /dev/null +++ b/src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChange.cs @@ -0,0 +1,57 @@ +using System.Collections.Generic; +using System.Text.Json.Serialization; + +namespace RethinkDb.Azure.Functions.Worker.Extensions.Model +{ + /// + /// Represents a document change in RethinkDB table. + /// + public class DocumentChange + { + #region Properties + /// + /// Gets or sets the old value. When a document is inserted, old value will be null. + /// + [JsonPropertyName("old_val")] + public dynamic OldValue { get; set; } + + /// + /// Gets or sets the new value. When a document is deleted, new value will be null. + /// + [JsonPropertyName("new_val")] + public dynamic NewValue { get; set; } + + /// + /// If is set to true, this property will indicate the type of change. + /// + [JsonPropertyName("type")] + public DocumentChangeType? Type { get; set; } + #endregion + } + + /// + /// Represents a document change in RethinkDB table. + /// + public class DocumentChange + { + #region Properties + /// + /// Gets or sets the old value. When a document is inserted, old value will be null. + /// + [JsonPropertyName("old_val")] + public T OldValue { get; set; } + + /// + /// Gets or sets the new value. When a document is deleted, new value will be null. + /// + [JsonPropertyName("new_val")] + public T NewValue { get; set; } + + /// + /// If is set to true, this property will indicate the type of change. + /// + [JsonPropertyName("type")] + public DocumentChangeType? Type { get; set; } + #endregion + } +} diff --git a/src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChangeType.cs b/src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChangeType.cs new file mode 100644 index 0000000..6eb3dc8 --- /dev/null +++ b/src/RethinkDb.Azure.Functions.Worker.Extensions/Model/DocumentChangeType.cs @@ -0,0 +1,38 @@ +using System.Text.Json.Serialization; + +namespace RethinkDb.Azure.Functions.Worker.Extensions.Model +{ + /// + /// The type of . + /// + [JsonConverter(typeof(JsonStringEnumConverter))] + public enum DocumentChangeType + { + /// + /// Document was added. + /// + Add, + /// + /// Document was removed. + /// + Remove, + /// + /// Document was changed. + /// + Change, + /// + /// Initial document. + /// + Initial, + /// + /// If an initial result for a document has been sent and a change is made to that document that would move it to the unsent part of the result set + /// (e.g., a changefeed monitors the top 100 posters, the first 50 have been sent, and poster 48 has become poster 52), + /// an 'uninitial' notification will be sent, with an old_val field but no new_val field. + /// + Uninitial, + /// + /// A state document. + /// + State + } +} diff --git a/src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDb.Azure.Functions.Worker.Extensions.csproj b/src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDb.Azure.Functions.Worker.Extensions.csproj index 5ce10f4..c8b6d44 100644 --- a/src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDb.Azure.Functions.Worker.Extensions.csproj +++ b/src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDb.Azure.Functions.Worker.Extensions.csproj @@ -30,6 +30,7 @@ + diff --git a/src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDbTriggerAttribute.cs b/src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDbTriggerAttribute.cs new file mode 100644 index 0000000..45ac89f --- /dev/null +++ b/src/RethinkDb.Azure.Functions.Worker.Extensions/RethinkDbTriggerAttribute.cs @@ -0,0 +1,88 @@ +using System; +using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; + +namespace RethinkDb.Azure.Functions.Worker.Extensions +{ + /// + /// Attribute used to define a trigger that binds to a RethinkDB table. + /// + public sealed class RethinkDbTriggerAttribute : TriggerBindingAttribute + { + /// + /// The name of the database containing the table to which the parameter applies (may include binding parameters). + /// + public string DatabaseName { get; private set; } + + /// + /// The name of the table to which the parameter applies (may include binding parameters). + /// + public string TableName { get; private set; } + + /// + /// The name of the app setting containing the hostname or IP address of the server to which the parameter applies. + /// + public string HostnameSetting { get; set; } + + /// + /// The name of the app setting containing the TCP port of the server to which the parameter applies. + /// + public string PortSetting { get; set; } + + /// + /// The name of the app setting containing the authorization key to the server to which the parameter applies. + /// + public string AuthorizationKeySetting { get; set; } + + /// + /// The name of the app setting containing the user account to connect as to the server to which the parameter applies. + /// + public string UserSetting { get; set; } + + /// + /// The name of the app setting containing the user account password to connect as to the server to which the parameter applies. + /// + public string PasswordSetting { get; set; } + + /// + /// The name of the app setting containing the value indicating if SSL/TLS encryption should be enabled for connection to the server. + /// + /// The underlying driver (RethinkDb.Driver) requires a commercial license for SSL/TLS encryption. + public string EnableSslSetting { get; set; } + + /// + /// The name of the app setting containing the "license to" of underlying driver (RethinkDb.Driver) commercial license. + /// + public string LicenseToSetting { get; set; } + + /// + /// The name of the app setting containing the "license key" of underlying driver (RethinkDb.Driver) commercial license. + /// + public string LicenseKeySetting { get; set; } + + /// + /// The value indicating if field should be included for . + /// + public bool IncludeTypes { get; set; } = false; + + /// + /// Defines a trigger that binds to a RethinkDB table. + /// + /// Name of the database of the table to which the parameter applies (may include binding parameters). + /// Name of the table to which the parameter applies (may include binding parameters). + public RethinkDbTriggerAttribute(string databaseName, string tableName) + { + if (String.IsNullOrWhiteSpace(databaseName)) + { + throw new ArgumentNullException(nameof(databaseName)); + } + + if (String.IsNullOrWhiteSpace(tableName)) + { + throw new ArgumentNullException(nameof(tableName)); + } + + DatabaseName = databaseName; + TableName = tableName; + } + } +} diff --git a/src/RethinkDb.Azure.WebJobs.Extensions/Config/RethinkDbExtensionConfigProvider.cs b/src/RethinkDb.Azure.WebJobs.Extensions/Config/RethinkDbExtensionConfigProvider.cs index f569ab5..c011525 100644 --- a/src/RethinkDb.Azure.WebJobs.Extensions/Config/RethinkDbExtensionConfigProvider.cs +++ b/src/RethinkDb.Azure.WebJobs.Extensions/Config/RethinkDbExtensionConfigProvider.cs @@ -17,6 +17,21 @@ namespace RethinkDb.Azure.WebJobs.Extensions.Config [Extension("RethinkDB")] internal class RethinkDbExtensionConfigProvider : IExtensionConfigProvider { + #region Classes + private class RethinkDbOpenType : OpenType.Poco + { + public override bool IsMatch(Type type, OpenTypeMatchContext context) + { + if (type.FullName == "System.Object") + { + return true; + } + + return base.IsMatch(type, context); + } + } + #endregion + #region Fields private readonly IConfiguration _configuration; private readonly RethinkDbOptions _options; @@ -47,13 +62,13 @@ public void Initialize(ExtensionConfigContext context) // RethinkDB Bindings var bindingAttributeBindingRule = context.AddBindingRule(); bindingAttributeBindingRule.AddValidator(ValidateHost); - bindingAttributeBindingRule.BindToCollector(typeof(RethinkDbCollectorConverter<>), _options, _rethinkDBConnectionFactory); + bindingAttributeBindingRule.BindToCollector(typeof(RethinkDbCollectorConverter<>), _options, _rethinkDBConnectionFactory); bindingAttributeBindingRule.WhenIsNotNull(nameof(RethinkDbAttribute.Id)) .BindToValueProvider(CreateValueBinderAsync); // RethinkDB Trigger var triggerAttributeBindingRule = context.AddBindingRule(); - triggerAttributeBindingRule.BindToTrigger(new RethinkDbTriggerAttributeBindingProvider(_configuration, _options, _rethinkDBConnectionFactory, _nameResolver, _loggerFactory)); + triggerAttributeBindingRule.BindToTrigger(new RethinkDbTriggerAttributeBindingProvider(_configuration, _options, _rethinkDBConnectionFactory, _nameResolver, _loggerFactory)); } private void ValidateHost(RethinkDbAttribute attribute, Type paramType) diff --git a/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerAttributeBindingProvider.cs b/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerAttributeBindingProvider.cs index d312565..4c0770f 100644 --- a/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerAttributeBindingProvider.cs +++ b/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerAttributeBindingProvider.cs @@ -15,6 +15,10 @@ namespace RethinkDb.Azure.WebJobs.Extensions.Trigger internal class RethinkDbTriggerAttributeBindingProvider : ITriggerBindingProvider { #region Fields + private static readonly Type DOCUMENTCHANGE_TYPE = typeof(DocumentChange); + private static readonly Type OBJECT_TYPE = typeof(object); + private static readonly Type STRING_TYPE = typeof(string); + private const string UNABLE_TO_RESOLVE_APP_SETTING_FORMAT = "Unable to resolve app setting for property '{0}.{1}'. Make sure the app setting exists and has a valid value."; private readonly IConfiguration _configuration; @@ -45,20 +49,23 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex throw new ArgumentNullException(nameof(context)); } - ParameterInfo parameter = context.Parameter; - - RethinkDbTriggerAttribute triggerAttribute = parameter.GetCustomAttribute(inherit: false); + RethinkDbTriggerAttribute triggerAttribute = context.Parameter.GetCustomAttribute(inherit: false); if (triggerAttribute is null) { return _nullTriggerBindingTask; } + if ((context.Parameter.ParameterType != DOCUMENTCHANGE_TYPE) && (context.Parameter.ParameterType != OBJECT_TYPE) && (context.Parameter.ParameterType != STRING_TYPE)) + { + return _nullTriggerBindingTask; + } + ConnectionOptions triggerConnectionOptions = ResolveTriggerConnectionOptions(triggerAttribute); Task triggerConnectionTask = _rethinkDBConnectionFactory.GetConnectionAsync(triggerConnectionOptions); TableOptions triggerTableOptions = ResolveTriggerTableOptions(triggerAttribute); - return Task.FromResult(new RethinkDbTriggerBinding(parameter, triggerConnectionTask, triggerTableOptions, triggerAttribute.IncludeTypes)); + return Task.FromResult(new RethinkDbTriggerBinding(context.Parameter, triggerConnectionTask, triggerTableOptions, triggerAttribute.IncludeTypes)); } private ConnectionOptions ResolveTriggerConnectionOptions(RethinkDbTriggerAttribute triggerAttribute) diff --git a/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerBinding.cs b/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerBinding.cs index 58bb131..df0f232 100644 --- a/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerBinding.cs +++ b/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerBinding.cs @@ -14,17 +14,18 @@ namespace RethinkDb.Azure.WebJobs.Extensions.Trigger internal class RethinkDbTriggerBinding : ITriggerBinding { #region Fields + private static readonly Type DOCUMENTCHANGE_TYPE = typeof(DocumentChange); + private static readonly IReadOnlyDictionary EMPTY_BINDING_DATA = new Dictionary(); + private readonly ParameterInfo _parameter; private readonly Task _rethinkDbConnectionTask; private readonly TableOptions _rethinkDbTableOptions; private readonly Driver.Ast.Table _rethinkDbTable; private readonly bool _includeTypes; - - private readonly Task _emptyBindingDataTask = Task.FromResult(new TriggerData(null, new Dictionary())); #endregion #region Properties - public Type TriggerValueType => typeof(DocumentChange); + public Type TriggerValueType => DOCUMENTCHANGE_TYPE; public IReadOnlyDictionary BindingDataContract { get; } = new Dictionary(); #endregion @@ -43,8 +44,9 @@ public RethinkDbTriggerBinding(ParameterInfo parameter, Task rethin #region Methods public Task BindAsync(object value, ValueBindingContext context) { - // ValueProvider is via binding rules. - return _emptyBindingDataTask; + IValueProvider valueBinder = new RethinkDbTriggerValueBinder(_parameter, value); + + return Task.FromResult(new TriggerData(valueBinder, EMPTY_BINDING_DATA)); } public Task CreateListenerAsync(ListenerFactoryContext context) diff --git a/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerValueBinder.cs b/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerValueBinder.cs new file mode 100644 index 0000000..8d4eea5 --- /dev/null +++ b/src/RethinkDb.Azure.WebJobs.Extensions/Trigger/RethinkDbTriggerValueBinder.cs @@ -0,0 +1,46 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Bindings; +using Newtonsoft.Json; + +namespace RethinkDb.Azure.WebJobs.Extensions.Trigger +{ + internal class RethinkDbTriggerValueBinder : IValueProvider + { + #region Fields + private static readonly Type STRING_TYPE = typeof(string); + + private readonly ParameterInfo _parameter; + private readonly object _value; + private readonly bool _parameterTypeIsString; + #endregion + + #region Properties + public Type Type => _parameter.ParameterType; + #endregion + + #region Constructor + public RethinkDbTriggerValueBinder(ParameterInfo parameter, object value) + { + _parameter = parameter; + _value = value; + _parameterTypeIsString = parameter.ParameterType == STRING_TYPE; + } + #endregion + + #region Methods + public Task GetValueAsync() + { + if (_parameterTypeIsString) + { + return Task.FromResult((object)JsonConvert.SerializeObject(_value)); + } + + return Task.FromResult(_value); + } + + public string ToInvokeString() => String.Empty; + #endregion + } +}