diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs index ee1bac11c..e9fc38809 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs @@ -72,6 +72,12 @@ public void Initialize(ExtensionConfigContext context) context.AddBindingRule() .BindToCollector(BuildFromAttribute); + context.AddBindingRule() + .BindToInput(attribute => + { + return _options.Value.GetEventHubClient(attribute.EventHubName, attribute.Connection); + }); + ExceptionHandler = (e => { LogExceptionReceivedEvent(e, _loggerFactory); diff --git a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/EventHubEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/EventHubEndToEndTests.cs index 30a7ea155..505c6c81b 100644 --- a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/EventHubEndToEndTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/EventHubEndToEndTests.cs @@ -5,8 +5,10 @@ using System.Collections.Generic; using System.Reflection; using System.Text; +using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.EventHubs; +using Microsoft.Azure.WebJobs.EventHubs; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; @@ -14,62 +16,68 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { - public class EventHubEndToEndTests : IClassFixture + public class EventHubEndToEndTests { private const string TestHubName = "webjobstesthub"; - private const string TestHub2Name = "webjobstesthub2"; - private const string TestHub2Connection = "AzureWebJobsTestHubConnection2"; + private const int Timeout = 30000; + private static EventWaitHandle _eventWait; + private static string _testId; + private static List _results; - public EventHubEndToEndTests(TestFixture fixture) + public EventHubEndToEndTests() { - Fixture = fixture; - - EventHubTestJobs.Result = null; + _results = new List(); + _testId = Guid.NewGuid().ToString(); + _eventWait = new ManualResetEvent(initialState: false); } - private TestFixture Fixture { get; } - [Fact] public async Task EventHub_SingleDispatch() { - var method = typeof(EventHubTestJobs).GetMethod("SendEvent_TestHub", BindingFlags.Static | BindingFlags.Public); - var id = Guid.NewGuid().ToString(); - EventHubTestJobs.EventId = id; - await Fixture.Host.CallAsync(method, new { input = id }); - - await TestHelpers.Await(() => + using (JobHost host = BuildHost()) { - return EventHubTestJobs.Result != null; - }); + var method = typeof(EventHubTestSingleDispatchJobs).GetMethod("SendEvent_TestHub", BindingFlags.Static | BindingFlags.Public); + var id = Guid.NewGuid().ToString(); + await host.CallAsync(method, new { input = _testId }); - Assert.Equal(id, EventHubTestJobs.Result); + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + } } [Fact] public async Task EventHub_MultipleDispatch() { - // send some events BEFORE starting the host, to ensure - // the events are received in batch - var method = typeof(EventHubTestJobs).GetMethod("SendEvents_TestHub2", BindingFlags.Static | BindingFlags.Public); - var id = Guid.NewGuid().ToString(); - EventHubTestJobs.EventId = id; - int numEvents = 5; - await Fixture.Host.CallAsync(method, new { numEvents = numEvents, input = id }); - - await TestHelpers.Await(() => + using (JobHost host = BuildHost()) { - return EventHubTestJobs.Result != null; - }); - - var eventsProcessed = (string[])EventHubTestJobs.Result; - Assert.True(eventsProcessed.Length >= 1); + // send some events BEFORE starting the host, to ensure + // the events are received in batch + var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public); + int numEvents = 5; + await host.CallAsync(method, new { numEvents = numEvents, input = _testId }); + + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + } } - public class EventHubTestJobs + [Fact] + public async Task EventHub_PartitionKey() { - public static string EventId; - public static object Result { get; set; } + using (JobHost host = BuildHost()) + { + var method = typeof(EventHubParitionKeyTestJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public); + _eventWait = new ManualResetEvent(initialState: false); + await host.CallAsync(method, new { input = _testId }); + bool result = _eventWait.WaitOne(Timeout); + + Assert.True(result); + } + } + + public class EventHubTestSingleDispatchJobs + { public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt) { evt = new EventData(Encoding.UTF8.GetBytes(input)); @@ -77,36 +85,41 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out E evt.Properties.Add("TestProp2", "value2"); } - public static void SendEvents_TestHub2(int numEvents, string input, [EventHub(TestHub2Name, Connection = TestHub2Connection)] out EventData[] events) - { - events = new EventData[numEvents]; - for (int i = 0; i < numEvents; i++) - { - var evt = new EventData(Encoding.UTF8.GetBytes(input)); - evt.Properties.Add("TestIndex", i); - evt.Properties.Add("TestProp1", "value1"); - evt.Properties.Add("TestProp2", "value2"); - events[i] = evt; - } - } public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt, - string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, - IDictionary systemProperties) + string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, + IDictionary systemProperties) { // filter for the ID the current test is using - if (evt == EventId) + if (evt == _testId) { Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30); Assert.Equal("value1", properties["TestProp1"]); Assert.Equal("value2", properties["TestProp2"]); - Result = evt; + _eventWait.Set(); + } + } + } + + public class EventHubTestMultipleDispatchJobs + { + + public static void SendEvents_TestHub(int numEvents, string input, [EventHub(TestHubName)] out EventData[] events) + { + events = new EventData[numEvents]; + for (int i = 0; i < numEvents; i++) + { + var evt = new EventData(Encoding.UTF8.GetBytes(input)); + evt.Properties.Add("TestIndex", i); + evt.Properties.Add("TestProp1", "value1"); + evt.Properties.Add("TestProp2", "value2"); + events[i] = evt; } } - public static void ProcessMultipleEvents([EventHubTrigger(TestHub2Name, Connection = TestHub2Connection)] string[] events, + public static void ProcessMultipleEvents([EventHubTrigger(TestHubName)] string[] events, string[] partitionKeyArray, DateTime[] enqueuedTimeUtcArray, IDictionary[] propertiesArray, IDictionary[] systemPropertiesArray) { @@ -121,47 +134,82 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHub2Name, Connecti } // filter for the ID the current test is using - if (events[0] == EventId) + if (events[0] == _testId) { - Result = events; + _results.AddRange(events); + _eventWait.Set(); } } } - public class TestFixture : IDisposable + public class EventHubParitionKeyTestJobs { - public JobHost Host { get; } - - public TestFixture() + public static async Task SendEvents_TestHub( + string input, + [EventHub(TestHubName)] EventHubClient client) { - var config = new ConfigurationBuilder() - .AddEnvironmentVariables() - .AddTestSettings() - .Build(); + List list = new List(); + EventData evt = new EventData(Encoding.UTF8.GetBytes(input)); - string connection = config.GetConnectionStringOrSetting("AzureWebJobsTestHubConnection"); - Assert.True(!string.IsNullOrEmpty(connection), "Required test connection string is missing."); + // Send event without PK + await client.SendAsync(evt); - var host = new HostBuilder() - .ConfigureDefaultTestHost(b => - { - b.AddAzureStorage() - .AddEventHubs(options => - { - options.AddSender(TestHubName, connection); - options.AddReceiver(TestHubName, connection); - }); - }) - .Build(); - - Host = host.GetJobHost(); - Host.StartAsync().GetAwaiter().GetResult(); + // Send event with different PKs + for (int i = 0; i < 5; i++) + { + evt = new EventData(Encoding.UTF8.GetBytes(input)); + await client.SendAsync(evt, "test_pk" + i); + } } - public void Dispose() + public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)] EventData[] events) { - Host?.Dispose(); + foreach (EventData eventData in events) + { + string message = Encoding.UTF8.GetString(eventData.Body); + + // filter for the ID the current test is using + if (message == _testId) + { + _results.Add(eventData.SystemProperties.PartitionKey); + _results.Sort(); + + if (_results.Count == 6 && _results[5] == "test_pk4") + { + _eventWait.Set(); + } + } + } } } + + private JobHost BuildHost() + { + JobHost jobHost = null; + + var config = new ConfigurationBuilder() + .AddEnvironmentVariables() + .AddTestSettings() + .Build(); + + string connection = config.GetConnectionStringOrSetting("AzureWebJobsTestHubConnection"); + Assert.True(!string.IsNullOrEmpty(connection), "Required test connection string is missing."); + + var host = new HostBuilder() + .ConfigureDefaultTestHost(b => + { + b.AddEventHubs(options => + { + options.AddSender(TestHubName, connection); + options.AddReceiver(TestHubName, connection); + }); + }) + .Build(); + + jobHost = host.GetJobHost(); + jobHost.StartAsync().GetAwaiter().GetResult(); + + return jobHost; + } } } \ No newline at end of file