Skip to content

Commit

Permalink
Resolving Oauthbearer Authentication Attributes from Settings for Out…
Browse files Browse the repository at this point in the history
…put Binding (#511)

* Resolving secure settings from settings for oauthbearer authentication

* Adding Unit Test cases for oauthbearer settings
  • Loading branch information
jainharsh98 authored Jul 30, 2024
1 parent 690daec commit 6ba749b
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
if (entity.Attribute.AuthenticationMode == BrokerAuthenticationMode.OAuthBearer)
{
conf.SaslOauthbearerMethod = (SaslOauthbearerMethod)entity.Attribute.OAuthBearerMethod;
conf.SaslOauthbearerClientId = entity.Attribute.OAuthBearerClientId;
conf.SaslOauthbearerClientSecret = entity.Attribute.OAuthBearerClientSecret;
conf.SaslOauthbearerScope = entity.Attribute.OAuthBearerScope;
conf.SaslOauthbearerTokenEndpointUrl = entity.Attribute.OAuthBearerTokenEndpointUrl;
conf.SaslOauthbearerExtensions = entity.Attribute.OAuthBearerExtensions;
conf.SaslOauthbearerClientId = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.OAuthBearerClientId);
conf.SaslOauthbearerClientSecret = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.OAuthBearerClientSecret);
conf.SaslOauthbearerScope = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.OAuthBearerScope);
conf.SaslOauthbearerTokenEndpointUrl = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.OAuthBearerTokenEndpointUrl);
conf.SaslOauthbearerExtensions = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.OAuthBearerExtensions);
}

return conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,5 +372,87 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_From_AppSetting_InAzure
Assert.Equal(sslCa.FullName, config.SslCaLocation);
Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation);
}

[Fact]
public void GetProducerConfig_When_OAuthBearer_Auth_Defined_Should_Contain_Them()
{
var attribute = new KafkaAttribute("brokers:9092", "myTopic")
{
AuthenticationMode = BrokerAuthenticationMode.OAuthBearer,
Protocol = BrokerProtocol.SaslSsl,
OAuthBearerClientId = "clientId",
OAuthBearerClientSecret = "secret",
OAuthBearerMethod = Config.OAuthBearerMethod.Oidc,
OAuthBearerScope = "scope",
OAuthBearerExtensions = "key=value",
OAuthBearerTokenEndpointUrl = "endpointUrl",
};

var entity = new KafkaProducerEntity()
{
Attribute = attribute,
ValueType = typeof(ProtoUser),
};

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Equal(16, config.Count());
Assert.Equal("brokers:9092", config.BootstrapServers);
Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol);
Assert.Equal(SaslMechanism.OAuthBearer, config.SaslMechanism);
Assert.Equal("secret", config.SaslOauthbearerClientSecret);
Assert.Equal("clientId", config.SaslOauthbearerClientId);
Assert.Equal(SaslOauthbearerMethod.Oidc, config.SaslOauthbearerMethod);
Assert.Equal("scope", config.SaslOauthbearerScope);
Assert.Equal("key=value", config.SaslOauthbearerExtensions);
Assert.Equal("endpointUrl", config.SaslOauthbearerTokenEndpointUrl);
}

[Fact]
public void GetProducerConfig_When_OauthSettings_Resolve_From_AppSetting_InAzure()
{
AzureEnvironment.SetRunningInAzureEnvVars();

var attribute = new KafkaAttribute("brokers:9092", "myTopic")
{
AuthenticationMode = BrokerAuthenticationMode.OAuthBearer,
Protocol = BrokerProtocol.SaslSsl,
OAuthBearerClientId = "OAuthBearerClientId",
OAuthBearerClientSecret = "OAuthBearerClientSecret",
OAuthBearerMethod = Config.OAuthBearerMethod.Oidc,
OAuthBearerScope = "OAuthBearerScope",
OAuthBearerExtensions = "OAuthBearerExtensions",
OAuthBearerTokenEndpointUrl = "OAuthBearerTokenEndpointUrl",
};

var entity = new KafkaProducerEntity
{
Attribute = attribute,
ValueType = typeof(ProtoUser)
};

var configSslLocations = new Dictionary<string, string>
{
{"OAuthBearerClientId", "clientId"},
{"OAuthBearerClientSecret", "secret"},
{"OAuthBearerScope", "scope"},
{"OAuthBearerExtensions", "key=value"},
{"OAuthBearerTokenEndpointUrl", "endpointUrl"},
};

var configuration = new ConfigurationBuilder().AddInMemoryCollection(configSslLocations).Build();

var factory = new KafkaProducerFactory(configuration, new DefaultNameResolver(configuration), NullLoggerFactory.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Equal("brokers:9092", config.BootstrapServers);
Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol);
Assert.Equal(SaslMechanism.OAuthBearer, config.SaslMechanism);
Assert.Equal("secret", config.SaslOauthbearerClientSecret);
Assert.Equal("clientId", config.SaslOauthbearerClientId);
Assert.Equal(SaslOauthbearerMethod.Oidc, config.SaslOauthbearerMethod);
Assert.Equal("scope", config.SaslOauthbearerScope);
Assert.Equal("key=value", config.SaslOauthbearerExtensions);
Assert.Equal("endpointUrl", config.SaslOauthbearerTokenEndpointUrl);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,5 +510,93 @@ public void GetConsumerConfig_When_Protocol_is_Not_SSL()
Assert.Equal(result.SslCaLocation, null);
Assert.Equal(result.SslCertificateLocation, null);
}

[Fact]
public void GetConsumerConfig_When_OAuthBearer_Auth_Defined_Should_Contain_Them()
{
var attribute = new KafkaTriggerAttribute("brokers:9092", "myTopic")
{
AuthenticationMode = BrokerAuthenticationMode.OAuthBearer,
Protocol = BrokerProtocol.SaslSsl,
OAuthBearerClientId = "clientId",
OAuthBearerClientSecret = "secret",
OAuthBearerMethod = Config.OAuthBearerMethod.Oidc,
OAuthBearerScope = "scope",
OAuthBearerExtensions = "key=value",
OAuthBearerTokenEndpointUrl = "endpointUrl",
};

var config = this.emptyConfiguration;

var bindingProvider = new KafkaTriggerAttributeBindingProvider(
config,
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
NullLoggerFactory.Instance);

MethodInfo consumerConfigMethod = typeof(KafkaTriggerAttributeBindingProvider).GetMethod("CreateConsumerConfiguration", BindingFlags.NonPublic | BindingFlags.Instance);

KafkaListenerConfiguration result = (KafkaListenerConfiguration)consumerConfigMethod.Invoke(bindingProvider, new object[] { attribute });
Assert.Equal("brokers:9092", result.BrokerList);
Assert.Equal(SecurityProtocol.SaslSsl, result.SecurityProtocol);
Assert.Equal(SaslMechanism.OAuthBearer, result.SaslMechanism);
Assert.Equal("secret", result.SaslOAuthBearerClientSecret);
Assert.Equal("clientId", result.SaslOAuthBearerClientId);
Assert.Equal(SaslOauthbearerMethod.Oidc, result.SaslOAuthBearerMethod);
Assert.Equal("scope", result.SaslOAuthBearerScope);
Assert.Equal("key=value", result.SaslOAuthBearerExtensions);
Assert.Equal("endpointUrl", result.SaslOAuthBearerTokenEndpointUrl);
}

[Fact]
public void GetConsumerConfig_When_OauthBearer_Settings_Resolve_From_AppSetting_InAzure()
{
AzureEnvironment.SetRunningInAzureEnvVars();

var attribute = new KafkaTriggerAttribute("brokers:9092", "myTopic")
{
AuthenticationMode = BrokerAuthenticationMode.OAuthBearer,
Protocol = BrokerProtocol.SaslSsl,
OAuthBearerClientId = "OAuthBearerClientId",
OAuthBearerClientSecret = "OAuthBearerClientSecret",
OAuthBearerMethod = Config.OAuthBearerMethod.Oidc,
OAuthBearerScope = "OAuthBearerScope",
OAuthBearerExtensions = "OAuthBearerExtensions",
OAuthBearerTokenEndpointUrl = "OAuthBearerTokenEndpointUrl",
};

var configSslLocations = new Dictionary<string, string>
{
{"OAuthBearerClientId", "clientId"},
{"OAuthBearerClientSecret", "secret"},
{"OAuthBearerScope", "scope"},
{"OAuthBearerExtensions", "key=value"},
{"OAuthBearerTokenEndpointUrl", "endpointUrl"},
};

var config = new ConfigurationBuilder().AddInMemoryCollection(configSslLocations).Build();

var bindingProvider = new KafkaTriggerAttributeBindingProvider(
config,
Options.Create(new KafkaOptions()),
new KafkaEventDataConvertManager(NullLogger.Instance),
new DefaultNameResolver(config),
NullLoggerFactory.Instance);

MethodInfo consumerConfigMethod = typeof(KafkaTriggerAttributeBindingProvider).GetMethod("CreateConsumerConfiguration", BindingFlags.NonPublic | BindingFlags.Instance);

KafkaListenerConfiguration result = (KafkaListenerConfiguration)consumerConfigMethod.Invoke(bindingProvider, new object[] { attribute });

Assert.Equal("brokers:9092", result.BrokerList);
Assert.Equal(SecurityProtocol.SaslSsl, result.SecurityProtocol);
Assert.Equal(SaslMechanism.OAuthBearer, result.SaslMechanism);
Assert.Equal("secret", result.SaslOAuthBearerClientSecret);
Assert.Equal("clientId", result.SaslOAuthBearerClientId);
Assert.Equal(SaslOauthbearerMethod.Oidc, result.SaslOAuthBearerMethod);
Assert.Equal("scope", result.SaslOAuthBearerScope);
Assert.Equal("key=value", result.SaslOAuthBearerExtensions);
Assert.Equal("endpointUrl", result.SaslOAuthBearerTokenEndpointUrl);
}
}
}

0 comments on commit 6ba749b

Please sign in to comment.