Skip to content

Commit

Permalink
Exposing EventHubClient to support sending Partition Key. Fixes #1643.
Browse files Browse the repository at this point in the history
  • Loading branch information
alrod committed Jun 17, 2019
1 parent 9692d21 commit c17b391
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public void Initialize(ExtensionConfigContext context)
context.AddBindingRule<EventHubAttribute>()
.BindToCollector(BuildFromAttribute);

context.AddBindingRule<EventHubAttribute>()
.BindToInput(attribute =>
{
return _options.Value.GetEventHubClient(attribute.EventHubName, attribute.Connection);
});

ExceptionHandler = (e =>
{
LogExceptionReceivedEvent(e, _loggerFactory);
Expand Down
208 changes: 128 additions & 80 deletions test/Microsoft.Azure.WebJobs.Host.EndToEndTests/EventHubEndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,108 +5,121 @@
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Xunit;

namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
{
public class EventHubEndToEndTests : IClassFixture<EventHubEndToEndTests.TestFixture>
public class EventHubEndToEndTests
{
private const string TestHubName = "webjobstesthub";
private const string TestHub2Name = "webjobstesthub2";
private const string TestHub2Connection = "AzureWebJobsTestHubConnection2";
private const int Timeout = 30000;
private static EventWaitHandle _eventWait;
private static string _testId;
private static List<string> _results;

public EventHubEndToEndTests(TestFixture fixture)
public EventHubEndToEndTests()
{
Fixture = fixture;

EventHubTestJobs.Result = null;
_results = new List<string>();
_testId = Guid.NewGuid().ToString();
_eventWait = new ManualResetEvent(initialState: false);
}

private TestFixture Fixture { get; }

[Fact]
public async Task EventHub_SingleDispatch()
{
var method = typeof(EventHubTestJobs).GetMethod("SendEvent_TestHub", BindingFlags.Static | BindingFlags.Public);
var id = Guid.NewGuid().ToString();
EventHubTestJobs.EventId = id;
await Fixture.Host.CallAsync(method, new { input = id });

await TestHelpers.Await(() =>
using (JobHost host = BuildHost<EventHubTestSingleDispatchJobs>())
{
return EventHubTestJobs.Result != null;
});
var method = typeof(EventHubTestSingleDispatchJobs).GetMethod("SendEvent_TestHub", BindingFlags.Static | BindingFlags.Public);
var id = Guid.NewGuid().ToString();
await host.CallAsync(method, new { input = _testId });

Assert.Equal(id, EventHubTestJobs.Result);
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
}
}

[Fact]
public async Task EventHub_MultipleDispatch()
{
// send some events BEFORE starting the host, to ensure
// the events are received in batch
var method = typeof(EventHubTestJobs).GetMethod("SendEvents_TestHub2", BindingFlags.Static | BindingFlags.Public);
var id = Guid.NewGuid().ToString();
EventHubTestJobs.EventId = id;
int numEvents = 5;
await Fixture.Host.CallAsync(method, new { numEvents = numEvents, input = id });

await TestHelpers.Await(() =>
using (JobHost host = BuildHost<EventHubTestMultipleDispatchJobs>())
{
return EventHubTestJobs.Result != null;
});

var eventsProcessed = (string[])EventHubTestJobs.Result;
Assert.True(eventsProcessed.Length >= 1);
// send some events BEFORE starting the host, to ensure
// the events are received in batch
var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public);
int numEvents = 5;
await host.CallAsync(method, new { numEvents = numEvents, input = _testId });

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
}
}

public class EventHubTestJobs
[Fact]
public async Task EventHub_PartitionKey()
{
public static string EventId;
public static object Result { get; set; }
using (JobHost host = BuildHost<EventHubParitionKeyTestJobs>())
{
var method = typeof(EventHubParitionKeyTestJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public);
_eventWait = new ManualResetEvent(initialState: false);
await host.CallAsync(method, new { input = _testId });

bool result = _eventWait.WaitOne(Timeout);

Assert.True(result);
}
}

public class EventHubTestSingleDispatchJobs
{
public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt)
{
evt = new EventData(Encoding.UTF8.GetBytes(input));
evt.Properties.Add("TestProp1", "value1");
evt.Properties.Add("TestProp2", "value2");
}

public static void SendEvents_TestHub2(int numEvents, string input, [EventHub(TestHub2Name, Connection = TestHub2Connection)] out EventData[] events)
{
events = new EventData[numEvents];
for (int i = 0; i < numEvents; i++)
{
var evt = new EventData(Encoding.UTF8.GetBytes(input));
evt.Properties.Add("TestIndex", i);
evt.Properties.Add("TestProp1", "value1");
evt.Properties.Add("TestProp2", "value2");
events[i] = evt;
}
}

public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt,
string partitionKey, DateTime enqueuedTimeUtc, IDictionary<string, object> properties,
IDictionary<string, object> systemProperties)
string partitionKey, DateTime enqueuedTimeUtc, IDictionary<string, object> properties,
IDictionary<string, object> systemProperties)
{
// filter for the ID the current test is using
if (evt == EventId)
if (evt == _testId)
{
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);

Assert.Equal("value1", properties["TestProp1"]);
Assert.Equal("value2", properties["TestProp2"]);

Result = evt;
_eventWait.Set();
}
}
}

public class EventHubTestMultipleDispatchJobs
{

public static void SendEvents_TestHub(int numEvents, string input, [EventHub(TestHubName)] out EventData[] events)
{
events = new EventData[numEvents];
for (int i = 0; i < numEvents; i++)
{
var evt = new EventData(Encoding.UTF8.GetBytes(input));
evt.Properties.Add("TestIndex", i);
evt.Properties.Add("TestProp1", "value1");
evt.Properties.Add("TestProp2", "value2");
events[i] = evt;
}
}

public static void ProcessMultipleEvents([EventHubTrigger(TestHub2Name, Connection = TestHub2Connection)] string[] events,
public static void ProcessMultipleEvents([EventHubTrigger(TestHubName)] string[] events,
string[] partitionKeyArray, DateTime[] enqueuedTimeUtcArray, IDictionary<string, object>[] propertiesArray,
IDictionary<string, object>[] systemPropertiesArray)
{
Expand All @@ -121,47 +134,82 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHub2Name, Connecti
}

// filter for the ID the current test is using
if (events[0] == EventId)
if (events[0] == _testId)
{
Result = events;
_results.AddRange(events);
_eventWait.Set();
}
}
}

public class TestFixture : IDisposable
public class EventHubParitionKeyTestJobs
{
public JobHost Host { get; }

public TestFixture()
public static async Task SendEvents_TestHub(
string input,
[EventHub(TestHubName)] EventHubClient client)
{
var config = new ConfigurationBuilder()
.AddEnvironmentVariables()
.AddTestSettings()
.Build();
List<EventData> list = new List<EventData>();
EventData evt = new EventData(Encoding.UTF8.GetBytes(input));

string connection = config.GetConnectionStringOrSetting("AzureWebJobsTestHubConnection");
Assert.True(!string.IsNullOrEmpty(connection), "Required test connection string is missing.");
// Send event without PK
await client.SendAsync(evt);

var host = new HostBuilder()
.ConfigureDefaultTestHost<EventHubTestJobs>(b =>
{
b.AddAzureStorage()
.AddEventHubs(options =>
{
options.AddSender(TestHubName, connection);
options.AddReceiver(TestHubName, connection);
});
})
.Build();

Host = host.GetJobHost();
Host.StartAsync().GetAwaiter().GetResult();
// Send event with different PKs
for (int i = 0; i < 5; i++)
{
evt = new EventData(Encoding.UTF8.GetBytes(input));
await client.SendAsync(evt, "test_pk" + i);
}
}

public void Dispose()
public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)] EventData[] events)
{
Host?.Dispose();
foreach (EventData eventData in events)
{
string message = Encoding.UTF8.GetString(eventData.Body);

// filter for the ID the current test is using
if (message == _testId)
{
_results.Add(eventData.SystemProperties.PartitionKey);
_results.Sort();

if (_results.Count == 6 && _results[5] == "test_pk4")
{
_eventWait.Set();
}
}
}
}
}

private JobHost BuildHost<T>()
{
JobHost jobHost = null;

var config = new ConfigurationBuilder()
.AddEnvironmentVariables()
.AddTestSettings()
.Build();

string connection = config.GetConnectionStringOrSetting("AzureWebJobsTestHubConnection");
Assert.True(!string.IsNullOrEmpty(connection), "Required test connection string is missing.");

var host = new HostBuilder()
.ConfigureDefaultTestHost<T>(b =>
{
b.AddEventHubs(options =>
{
options.AddSender(TestHubName, connection);
options.AddReceiver(TestHubName, connection);
});
})
.Build();

jobHost = host.GetJobHost();
jobHost.StartAsync().GetAwaiter().GetResult();

return jobHost;
}
}
}

0 comments on commit c17b391

Please sign in to comment.