-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-28484: Add ability to replicate to a different tableName #6578
base: master
Are you sure you want to change the base?
Conversation
0ad9568
to
670844c
Compare
670844c
to
0448f31
Compare
@Apache9 Happy New Year! Do you have any thoughts on the new design and implementation? Thank you for all of your help so far! |
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
🎊 +1 overall
This message was automatically generated. |
Will take a look soon. Thanks for preparing the design doc and also the PR. |
@Apache9 Would you mind taking a look at this when you get a chance? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The solution looks OK, but there are still some concerns about how to make use of the ReplicationSinkTranslator...
@@ -139,6 +140,8 @@ public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerH | |||
Class<? extends SourceFSConfigurationProvider> c = | |||
Class.forName(className).asSubclass(SourceFSConfigurationProvider.class); | |||
this.provider = c.getDeclaredConstructor().newInstance(); | |||
} catch (RuntimeException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for debug?
@@ -153,6 +156,8 @@ private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException { | |||
filter = walEntryFilterClass == null | |||
? null | |||
: (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance(); | |||
} catch (RuntimeException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here too.
@@ -968,6 +968,9 @@ public enum OperationStatusCode { | |||
public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT = | |||
"org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl"; | |||
public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled"; | |||
public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better not put things in HConstants, put them into the package where we use it?
import org.apache.hadoop.hbase.TableName; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
|
||
@InterfaceAudience.Public |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be IA.LimitedPrivate("CONFIG")? Do we expected users to use it directly in their code?
Class<?> translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, | ||
IdentityReplicationSinkTranslator.class, ReplicationSinkTranslator.class); | ||
try { | ||
return (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC we have a ReflectionUtils or something to call constructors of a class, here we do not need to pass the Configuration object to it? Maybe the translator needs to load some configurations?
throw e; | ||
} catch (Exception e) { | ||
LOG.warn("Failed to instantiate " + translatorClass); | ||
return new IdentityReplicationSinkTranslator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the right choice to fallback to default implementation? I'm not sure...
import org.apache.yetus.audience.InterfaceAudience; | ||
|
||
@InterfaceAudience.Public | ||
public interface ReplicationSinkTranslator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better add some javadoc's to explain the meanings of the methods and the usage of this class?
mutation.setClusterIds(clusterIds); | ||
mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, | ||
TableName sinkTableName = translator.getSinkTableName(tableName); | ||
ExtendedCell sinkCell = translator.getSinkExtendedCell(tableName, cell); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the reason why I want to see the javadoc for this method,as why do we need to pass the original table name in? And I think we will just do tableName mapping, so we do not need to call the above getSinkTableName everytime as all the cells from the WALEntry are for the same table?
Design document
Jira
Currently, replication can only occur if the source and sink clusters both house tables with the same (tableName, family) pairs. This requirement exists so that the sink cluster knows where to persist the data it receives from the source cluster. In this PR, we loosen the naming constraint and give clients more configuration power over the name of their sink namespaces and tableNames.