Skip to content

Commit

Permalink
Merge pull request #3859 from Bhashinee/master
Browse files Browse the repository at this point in the history
Migrate the MongoDB data services to MongoDB driver 4.9.1
  • Loading branch information
Bhashinee authored Jan 22, 2025
2 parents d7dfc69 + c86c955 commit 72f9f01
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 285 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,6 @@ private MongoDB() {
public static final String SSL_ENABLED = "mongoDB_ssl_enabled";
public static final String AUTH_SOURCE = "mongoDB_auth_source";
public static final String CONNECTIONS_PER_HOST = "mongoDB_connectionsPerHost";
public static final String THREADS_ALLOWED_TO_BLOCK_CONN_MULTIPLIER = "mongoDB_threadsAllowedToBlockForConnectionMultiplier";
public static final String RESULT_COLUMN_NAME = "Document";

public static class MongoOperationLabels {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@
<description>WSO2 Data Services Core Bundle</description>

<dependencies>

<dependency>
<groupId>org.wso2.orbit.org.jongo.wso2</groupId>
<artifactId>jongo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans</artifactId>
Expand Down Expand Up @@ -183,7 +178,15 @@
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
Expand Down Expand Up @@ -366,9 +369,7 @@
org.apache.poi.openxml4j.exceptions;version="${poi-ooxml.orbit.imp.pkg.version}",
org.apache.poi.ss.usermodel;version="${poi.orbit.imp.pkg.version}",
org.apache.commons.collections4;version="${commons-collections4.orbit.imp.pkg.version}",
com.mongodb;version="${mongo-java-driver.orbit.imp.pkg.version}",
com.mongodb.util;version="${mongo-java-driver.orbit.imp.pkg.version}",
org.jongo;version="${jongo.orbit.imp.pkg.version}",
com.mongodb;version="${mongodb-driver-sync.imp.pkg.version}",
org.wso2.micro.integrator.dataservices.sql.driver;version="${project.version}",
org.wso2.micro.integrator.dataservices.sql.driver.parser;version="${project.version}",
org.wso2.micro.integrator.dataservices.sql.driver.util;version="${project.version}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@
*/
package org.wso2.micro.integrator.dataservices.core.description.config;

import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jongo.Jongo;
import org.wso2.micro.integrator.dataservices.common.DBConstants;
import org.wso2.micro.integrator.dataservices.core.DBUtils;import org.wso2.micro.integrator.dataservices.core.DataServiceFault;import org.wso2.micro.integrator.dataservices.core.engine.DataService;import org.wso2.micro.integrator.dataservices.core.odata.MongoDataHandler;import org.wso2.micro.integrator.dataservices.core.odata.ODataDataHandler;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -45,13 +45,10 @@ public class MongoConfig extends Config {
private static final Log log = LogFactory.getLog(
MongoConfig.class);

private MongoClient mongoClient;

private String [] servers;
private final MongoClient mongoClient;

private MongoClientOptions mongoClientOptions;

private Jongo jongo;
private final MongoClientSettings mongoClientSettings;
private final MongoDatabase mongoDatabase;

public MongoConfig(DataService dataService, String configId, Map<String, String> properties, boolean odataEnable)
throws DataServiceFault {
Expand All @@ -60,39 +57,29 @@ public MongoConfig(DataService dataService, String configId, Map<String, String>
if (DBUtils.isEmptyString(serversParam)) {
throw new DataServiceFault("The data source param '" + DBConstants.MongoDB.SERVERS + "' is required");
}
this.servers = serversParam.split(",");
String[] servers = serversParam.split(",");
String database = properties.get(DBConstants.MongoDB.DATABASE);
if (DBUtils.isEmptyString(database)) {
throw new DataServiceFault("The data source param '" + DBConstants.MongoDB.DATABASE + "' is required");
}
try {
this.mongoClientOptions = extractMongoOptions(properties);
this.mongoClient = createNewMongo(properties);
String writeConcern = properties.get(DBConstants.MongoDB.WRITE_CONCERN);
if (!DBUtils.isEmptyString(writeConcern)) {
this.getMongoClient().setWriteConcern(WriteConcern.valueOf(writeConcern));
}
String readPref = properties.get(DBConstants.MongoDB.READ_PREFERENCE);
if (!DBUtils.isEmptyString(readPref)) {
this.getMongoClient().setReadPreference(ReadPreference.valueOf(readPref));
}
this.getMongoClient().getDatabase(database);
this.jongo = new Jongo(this.getMongoClient().getDB(database));
List<ServerAddress> serverAddresses = createServerAddresses(servers);
MongoCredential mongoCredentials = createCredential(properties);
this.mongoClientSettings = extractMongoOptions(properties, writeConcern, readPref, serverAddresses,
mongoCredentials);
this.mongoClient = createNewMongo(this.mongoClientSettings);
this.mongoDatabase = this.getMongoClient().getDatabase(database);
} catch (Exception e) {
throw new DataServiceFault(e, DBConstants.FaultCodes.CONNECTION_UNAVAILABLE_ERROR, e.getMessage());
}

}

public MongoClient createNewMongo(Map<String, String> properties) throws DataServiceFault {
public MongoClient createNewMongo(MongoClientSettings mongoClientSettings) throws DataServiceFault {
try {
MongoCredential credential = createCredential(properties);
if (credential != null) {
return new MongoClient(this.createServerAddresses(this.getServers()),
Collections.singletonList(credential), getMongoClientOptions());
} else {
return new MongoClient(this.createServerAddresses(this.getServers()), getMongoClientOptions());
}
return MongoClients.create(mongoClientSettings);
} catch (Exception e) {
throw new DataServiceFault(e);
}
Expand All @@ -101,7 +88,7 @@ public MongoClient createNewMongo(Map<String, String> properties) throws DataSer
@Override
public boolean isActive() {
try {
Mongo mongo = this.createNewMongo(getProperties());
MongoClient mongo = this.createNewMongo(this.mongoClientSettings);
return mongo != null;
} catch (Exception e) {
log.error("Error in checking Mongo config availability", e);
Expand All @@ -116,39 +103,47 @@ public void close() {

@Override
public ODataDataHandler createODataHandler() {
return new MongoDataHandler(getConfigId(), getJongo());
return new MongoDataHandler(getConfigId(), this.mongoDatabase);
}

private MongoClientOptions extractMongoOptions(Map<String, String> properties) {
MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
private MongoClientSettings extractMongoOptions(Map<String, String> properties, String writeConcern,
String readPref, List<ServerAddress> serverAddresses,
MongoCredential mongoCredentials) {
MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
String connectionsPerHost = properties.get(DBConstants.MongoDB.CONNECTIONS_PER_HOST);
if (!DBUtils.isEmptyString(connectionsPerHost)) {
builder.connectionsPerHost(Integer.parseInt(connectionsPerHost));
settingsBuilder.applyToConnectionPoolSettings(builder -> builder.maxSize(Integer.parseInt(connectionsPerHost)));
}
String maxWaitTime = properties.get(DBConstants.MongoDB.MAX_WAIT_TIME);
if (!DBUtils.isEmptyString(maxWaitTime)) {
builder.maxWaitTime(Integer.parseInt(maxWaitTime));
settingsBuilder.applyToConnectionPoolSettings(builder -> builder.maxWaitTime(Integer.parseInt(maxWaitTime),
TimeUnit.MILLISECONDS));
}
String connectTimeout = properties.get(DBConstants.MongoDB.CONNECT_TIMEOUT);
if (!DBUtils.isEmptyString(connectTimeout)) {
builder.connectTimeout(Integer.parseInt(connectTimeout));
settingsBuilder.applyToSocketSettings(builder -> builder.connectTimeout(Integer.parseInt(connectTimeout),
TimeUnit.MILLISECONDS));
}
String socketTimeout = properties.get(DBConstants.MongoDB.SOCKET_TIMEOUT);
if (!DBUtils.isEmptyString(socketTimeout)) {
builder.socketTimeout(Integer.parseInt(socketTimeout));
settingsBuilder.applyToSocketSettings(builder -> builder.readTimeout(Integer.parseInt(socketTimeout),
TimeUnit.MILLISECONDS));
}
String threadsAllowedToBlockForConnectionMultiplier = properties.get(
DBConstants.MongoDB.THREADS_ALLOWED_TO_BLOCK_CONN_MULTIPLIER);
if (!DBUtils.isEmptyString(threadsAllowedToBlockForConnectionMultiplier)) {
builder.threadsAllowedToBlockForConnectionMultiplier(
Integer.parseInt(threadsAllowedToBlockForConnectionMultiplier));
}

String sslEnabled = (properties.get(DBConstants.MongoDB.SSL_ENABLED));
if (Boolean.parseBoolean(sslEnabled)) {
builder.sslEnabled(true);
settingsBuilder.applyToSslSettings(builder -> builder.enabled(true));
}
if (!DBUtils.isEmptyString(writeConcern)) {
settingsBuilder.writeConcern(WriteConcern.valueOf(writeConcern));
}
if (!DBUtils.isEmptyString(readPref)) {
settingsBuilder.readPreference(ReadPreference.valueOf(readPref));
}
settingsBuilder.applyToClusterSettings(builder -> builder.hosts(serverAddresses));
if (mongoCredentials != null) {
settingsBuilder.credential(mongoCredentials);
}
return builder.build();
return settingsBuilder.build();
}

public MongoClient getMongoClient() {
Expand Down Expand Up @@ -189,9 +184,6 @@ private MongoCredential createCredential(Map<String, String> properties) throws
case DBConstants.MongoDB.MongoAuthenticationTypes.SCRAM_SHA_1:
credential = MongoCredential.createScramSha1Credential(username, authSource, password.toCharArray());
break;
case DBConstants.MongoDB.MongoAuthenticationTypes.MONGODB_CR:
credential = MongoCredential.createMongoCRCredential(username, authSource, password.toCharArray());
break;
case DBConstants.MongoDB.MongoAuthenticationTypes.GSSAPI:
credential = MongoCredential.createGSSAPICredential(username);
break;
Expand All @@ -207,16 +199,8 @@ private MongoCredential createCredential(Map<String, String> properties) throws
}
}

public String[] getServers() {
return servers;
}

public MongoClientOptions getMongoClientOptions() {
return mongoClientOptions;
}

public Jongo getJongo() {
return jongo;
public MongoDatabase getMongoDatabase() {
return mongoDatabase;
}

@Override
Expand Down
Loading

0 comments on commit 72f9f01

Please sign in to comment.