diff --git a/README.md b/README.md
index 68bc349..765ff61 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ public class Citizen implements HBRecord {
     @HBColumnMultiVersion(family = "tracked", column = "phone_number")
     private NavigableMap phoneNumber;
 
-    @HBColumn(family = "optional", column = "pincode", codecFlags = {@Flag(name = BestSuitCodec.SERIALISE_AS_STRING, value = "true")})
+    @HBColumn(family = "optional", column = "pincode", codecFlags = {@Flag(name = BestSuitCodec.SERIALIZE_AS_STRING, value = "true")})
     private Integer pincode;
 
     @Override
@@ -79,7 +79,7 @@ See source files [Citizen.java](./src/test/java/com/flipkart/hbaseobjectmapper/t
 * serializes `null` as `null`
 * To control/modify serialization/deserialization behavior, you may define your own codec (by implementing the `Codec` interface) or you may extend the default codec (`BestSuitCodec`).
 * The optional parameter `codecFlags` (supported by both `@HBColumn` and `@HBColumnMultiVersion` annotations) can be used to pass custom flags to the underlying codec. (e.g. You may write your codec to serialize field `Integer id` in `Citizen` class differently from field `Integer id` in `Employee` class)
-* The default codec class `BestSuitCodec` takes a flag `BestSuitCodec.SERIALISE_AS_STRING`, whose value is "serializeAsString" (as in the above `Citizen` class example). When this flag is set to `true` on a field, the default codec serializes that field (even numerical fields) as `String`s.
+* The default codec class `BestSuitCodec` takes a flag `BestSuitCodec.SERIALIZE_AS_STRING`, whose value is "serializeAsString" (as in the above `Citizen` class example). When this flag is set to `true` on a field, the default codec serializes that field (even numerical fields) as `String`s.
 * Your custom codec may take other such flags to customize serialization/deserialization behavior at a class field level.
 
 ## MapReduce use-cases
@@ -246,7 +246,7 @@ Add below entry within the `dependencies` section of your `pom.xml`:
 <dependency>
     <groupId>com.flipkart</groupId>
     <artifactId>hbase-object-mapper</artifactId>
-    <version>1.7</version>
+    <version>1.8</version>
 </dependency>
 ```
 
 See artifact details: [com.flipkart:hbase-object-mapper on **Maven Central**](http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22com.flipkart%22%20AND%20a%3A%22hbase-object-mapper%22) or
@@ -255,7 +255,7 @@ See artifact details: [com.flipkart:hbase-object-mapper on **Maven Central**](ht
 To build this project, follow the steps below:
 
 * Do a `git clone` of this repository
-* Checkout latest stable version `git checkout v1.7`
+* Checkout latest stable version `git checkout v1.8`
 * Execute `mvn clean install` from shell
 
 Currently, projects that use this library are running on [Hortonworks Data Platform v2.4](https://hortonworks.com/blog/apache-hadoop-2-4-0-released/) (corresponds to Hadoop 2.7 and HBase 1.1). However, if you're using a different distribution of Hadoop (like [Cloudera](http://www.cloudera.com/)) or if you are using a different version of Hadoop, you may change the versions in [pom.xml](./pom.xml) to desired ones and build the project.
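As the README above notes, serialization behavior can be customized by implementing the `Codec` interface or extending the default `BestSuitCodec`. The following is a minimal, illustrative sketch of such a codec, assuming the `Codec` method signatures shown in the `Codec.java` hunk further down in this diff; the class name `DelegatingCodec` is hypothetical, and the exact packages of `SerializationException` and `DeserializationException` are whatever the library defines.

```java
import com.flipkart.hbaseobjectmapper.codec.*; // Codec, BestSuitCodec and the codec exceptions

import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.Map;

// A sketch of a custom codec (hypothetical class name) that delegates to the
// default BestSuitCodec. A real implementation would branch on 'flags' (the
// Map<String, String> built from @Flag annotations, e.g. SERIALIZE_AS_STRING)
// or on 'type' to serialize specific fields (or, in 1.8, row keys) differently.
public class DelegatingCodec implements Codec {

    private final Codec delegate = new BestSuitCodec();

    @Override
    public byte[] serialize(Serializable object, Map<String, String> flags) throws SerializationException {
        // Inspect 'flags' here to vary serialization per field or per row key
        return delegate.serialize(object, flags);
    }

    @Override
    public Serializable deserialize(byte[] bytes, Type type, Map<String, String> flags) throws DeserializationException {
        return delegate.deserialize(bytes, type, flags);
    }

    @Override
    public boolean canDeserialize(Type type) {
        return delegate.canDeserialize(type);
    }
}
```

Such a codec can then be plugged in via `new HBObjectMapper(myCodec)` or, with this change set, via the new `AbstractHBDAO(Configuration, Codec)` constructor introduced below.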
diff --git a/pom.xml b/pom.xml
index 6bf1d39..1506079 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     HBase Object Mapper
-    Java-annotation based compact utility library for HBase that helps you:
+    A Java-annotation based compact utility library for HBase that helps you:
     [1] convert objects of your bean-like classes to HBase rows and vice-versa, for use in writing MapReduce jobs for HBase tables and writing high-quality unit test cases
     [2] define 'Data Access Object' classes for random access of HBase rows
@@ -12,7 +12,7 @@
     4.0.0
     com.flipkart
     hbase-object-mapper
-    1.7
+    1.8
     https://flipkart-incubator.github.io/hbase-object-mapper/
     https://github.com/flipkart-incubator/hbase-object-mapper
@@ -33,7 +33,7 @@
     development
     Flipkart
-    http://www.flipkart.com/
+    https://www.flipkart.com
diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/AbstractHBDAO.java b/src/main/java/com/flipkart/hbaseobjectmapper/AbstractHBDAO.java
index 92f9d8a..a8cf75e 100644
--- a/src/main/java/com/flipkart/hbaseobjectmapper/AbstractHBDAO.java
+++ b/src/main/java/com/flipkart/hbaseobjectmapper/AbstractHBDAO.java
@@ -1,10 +1,12 @@
 package com.flipkart.hbaseobjectmapper;
 
+import com.flipkart.hbaseobjectmapper.codec.Codec;
 import com.flipkart.hbaseobjectmapper.exceptions.FieldNotMappedToHBaseColumnException;
 import com.google.common.reflect.TypeToken;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -19,19 +21,25 @@
 /**
  * A Data Access Object class that enables simple random access (read/write) of HBase rows.
  * <p>
- * Please note: This class relies heavily on HBase client library's {@link HBTable} class, which isn't thread-safe (See: HBASE-17361). Hence, this class isn't thread-safe.
+ * Please note: This class relies heavily on HBase client library's {@link Table} interface, whose implementations aren't thread-safe. Hence, this class isn't thread-safe.
+ * <p>
+ * To learn more about thread-safe access to HBase, see conversation here: HBASE-17361
 * <p>
* * @param Data type of row key (must be '{@link Comparable} with itself' and must be {@link Serializable}) * @param Entity type that maps to an HBase row (this type must have implemented {@link HBRecord} interface) + * @see Connection#getTable(TableName) + * @see Table + * @see HTable */ public abstract class AbstractHBDAO, T extends HBRecord> implements Closeable { /** - * Default number of versions to fetch + * Default number of versions to fetch. Change this to {@link Integer#MAX_VALUE} if you want the default behavior to be 'all versions'. */ private static final int DEFAULT_NUM_VERSIONS = 1; - protected static final HBObjectMapper hbObjectMapper = new HBObjectMapper(); + protected final HBObjectMapper hbObjectMapper; protected final Connection connection; protected final Table table; protected final Class rowKeyClass; @@ -40,14 +48,18 @@ public abstract class AbstractHBDAO, T ex private final Map fields; /** - * Constructs a data access object. Classes extending this class must call this constructor using super + * Constructs a data access object using a custom codec. Classes extending this class must call this constructor using super. + *

+ * Note: If you want to use the default codec, just use the constructor {@link #AbstractHBDAO(Configuration)} + *

* * @param configuration Hadoop configuration + * @param codec Your custom codec * @throws IOException Exceptions thrown by HBase * @throws IllegalStateException Annotation(s) on base entity may be incorrect */ @SuppressWarnings("unchecked") - protected AbstractHBDAO(Configuration configuration) throws IOException { + protected AbstractHBDAO(Configuration configuration, Codec codec) throws IOException { hbRecordClass = (Class) new TypeToken(getClass()) { }.getRawType(); rowKeyClass = (Class) new TypeToken(getClass()) { @@ -55,12 +67,25 @@ protected AbstractHBDAO(Configuration configuration) throws IOException { if (hbRecordClass == null || rowKeyClass == null) { throw new IllegalStateException(String.format("Unable to resolve HBase record/rowkey type (record class is resolving to %s and rowkey class is resolving to %s)", hbRecordClass, rowKeyClass)); } + hbObjectMapper = HBObjectMapperFactory.construct(codec); hbTable = new WrappedHBTable<>(hbRecordClass); connection = ConnectionFactory.createConnection(configuration); table = connection.getTable(hbTable.getName()); fields = hbObjectMapper.getHBFields(hbRecordClass); } + /** + * Constructs a data access object. Classes extending this class must call this constructor using super. + * + * @param configuration Hadoop configuration + * @throws IOException Exceptions thrown by HBase + * @throws IllegalStateException Annotation(s) on base entity may be incorrect + */ + @SuppressWarnings("unchecked") + protected AbstractHBDAO(Configuration configuration) throws IOException { + this(configuration, null); + } + /** * Get specified number of versions of a row from HBase table by it's row key * @@ -70,7 +95,7 @@ protected AbstractHBDAO(Configuration configuration) throws IOException { * @throws IOException When HBase call fails */ public T get(R rowKey, int versions) throws IOException { - Result result = this.table.get(new Get(hbObjectMapper.rowKeyToBytes(rowKey)).setMaxVersions(versions)); + Result result = this.table.get(new Get(toBytes(rowKey)).setMaxVersions(versions)); return hbObjectMapper.readValue(rowKey, result, hbRecordClass); } @@ -97,7 +122,7 @@ public T get(R rowKey) throws IOException { public T[] get(R[] rowKeys, int versions) throws IOException { List gets = new ArrayList<>(rowKeys.length); for (R rowKey : rowKeys) { - gets.add(new Get(hbObjectMapper.rowKeyToBytes(rowKey)).setMaxVersions(versions)); + gets.add(new Get(toBytes(rowKey)).setMaxVersions(versions)); } Result[] results = this.table.get(gets); @SuppressWarnings("unchecked") T[] records = (T[]) Array.newInstance(hbRecordClass, rowKeys.length); @@ -129,7 +154,7 @@ public T[] get(R[] rowKeys) throws IOException { public List get(List rowKeys, int versions) throws IOException { List gets = new ArrayList<>(rowKeys.size()); for (R rowKey : rowKeys) { - gets.add(new Get(hbObjectMapper.rowKeyToBytes(rowKey)).setMaxVersions(versions)); + gets.add(new Get(toBytes(rowKey)).setMaxVersions(versions)); } Result[] results = this.table.get(gets); List records = new ArrayList<>(rowKeys.size()); @@ -160,7 +185,7 @@ public List get(List rowKeys) throws IOException { * @throws IOException When HBase call fails */ public List get(R startRowKey, R endRowKey, int versions) throws IOException { - Scan scan = new Scan(hbObjectMapper.rowKeyToBytes(startRowKey), hbObjectMapper.rowKeyToBytes(endRowKey)).setMaxVersions(versions); + Scan scan = new Scan(toBytes(startRowKey), toBytes(endRowKey)).setMaxVersions(versions); ResultScanner scanner = table.getScanner(scan); List records = new ArrayList<>(); 
for (Result result : scanner) { @@ -220,7 +245,7 @@ public List persist(List> records) throws IOException { * @throws IOException When HBase call fails */ public void delete(R rowKey) throws IOException { - Delete delete = new Delete(hbObjectMapper.rowKeyToBytes(rowKey)); + Delete delete = new Delete(toBytes(rowKey)); this.table.delete(delete); } @@ -243,7 +268,7 @@ public void delete(HBRecord record) throws IOException { public void delete(R[] rowKeys) throws IOException { List deletes = new ArrayList<>(rowKeys.length); for (R rowKey : rowKeys) { - deletes.add(new Delete(hbObjectMapper.rowKeyToBytes(rowKey))); + deletes.add(new Delete(toBytes(rowKey))); } this.table.delete(deletes); } @@ -257,7 +282,7 @@ public void delete(R[] rowKeys) throws IOException { public void delete(List> records) throws IOException { List deletes = new ArrayList<>(records.size()); for (HBRecord record : records) { - deletes.add(new Delete(hbObjectMapper.rowKeyToBytes(record.composeRowKey()))); + deletes.add(new Delete(toBytes(record.composeRowKey()))); } this.table.delete(deletes); } @@ -316,7 +341,7 @@ private void populateFieldValuesToMap(Field field, Result result, Map cells = result.getColumnCells(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column())); for (Cell cell : cells) { Type fieldType = hbObjectMapper.getFieldType(field, hbColumn.isMultiVersioned()); - final R rowKey = hbObjectMapper.bytesToRowKey(CellUtil.cloneRow(cell), (Class) field.getDeclaringClass()); + final R rowKey = hbObjectMapper.bytesToRowKey(CellUtil.cloneRow(cell), hbTable.getCodecFlags(), (Class) field.getDeclaringClass()); if (!map.containsKey(rowKey)) { map.put(rowKey, new TreeMap()); } @@ -391,7 +416,7 @@ public NavigableMap> fetchFieldValues(R startRowKe Field field = getField(fieldName); WrappedHBColumn hbColumn = new WrappedHBColumn(field); validateFetchInput(field, hbColumn); - Scan scan = new Scan(hbObjectMapper.rowKeyToBytes(startRowKey), hbObjectMapper.rowKeyToBytes(endRowKey)); + Scan scan = new Scan(toBytes(startRowKey), toBytes(endRowKey)); scan.addColumn(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column())); scan.setMaxVersions(versions); ResultScanner scanner = table.getScanner(scan); @@ -430,7 +455,7 @@ public Map> fetchFieldValues(R[] rowKeys, String f validateFetchInput(field, hbColumn); List gets = new ArrayList<>(rowKeys.length); for (R rowKey : rowKeys) { - Get get = new Get(hbObjectMapper.rowKeyToBytes(rowKey)); + Get get = new Get(toBytes(rowKey)); get.setMaxVersions(versions); get.addColumn(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column())); gets.add(get); @@ -443,6 +468,10 @@ public Map> fetchFieldValues(R[] rowKeys, String f return map; } + private byte[] toBytes(R rowKey) { + return hbObjectMapper.rowKeyToBytes(rowKey, hbTable.getCodecFlags()); + } + private void validateFetchInput(Field field, WrappedHBColumn hbColumn) { if (!hbColumn.isPresent()) { throw new FieldNotMappedToHBaseColumnException(hbRecordClass, field.getName()); diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/Family.java b/src/main/java/com/flipkart/hbaseobjectmapper/Family.java index 22770f6..16448b2 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/Family.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/Family.java @@ -1,15 +1,16 @@ package com.flipkart.hbaseobjectmapper; -import java.lang.annotation.ElementType; import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import static 
java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + /** * Represents a column family in HBase */ -@Target(ElementType.TYPE) -@Retention(RetentionPolicy.RUNTIME) +@Target(TYPE) +@Retention(RUNTIME) public @interface Family { /** * Column family name diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/FamilyAndColumn.java b/src/main/java/com/flipkart/hbaseobjectmapper/FamilyAndColumn.java new file mode 100644 index 0000000..e4982fb --- /dev/null +++ b/src/main/java/com/flipkart/hbaseobjectmapper/FamilyAndColumn.java @@ -0,0 +1,9 @@ +package com.flipkart.hbaseobjectmapper; + +import org.apache.hadoop.hbase.util.Pair; + +class FamilyAndColumn extends Pair { + FamilyAndColumn(String family, String column) { + super(family, column); + } +} diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/Flag.java b/src/main/java/com/flipkart/hbaseobjectmapper/Flag.java index d9ab3f0..457a0ae 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/Flag.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/Flag.java @@ -2,18 +2,19 @@ import com.flipkart.hbaseobjectmapper.codec.Codec; -import java.lang.annotation.ElementType; import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + /** * A flag for {@link Codec Codec} (specify parameter name and value) *

* This is to be used exclusively for input to {@link HBColumn#codecFlags() codecFlags} parameter of {@link HBColumn} and {@link HBColumnMultiVersion} annotations */ -@Target(ElementType.FIELD) -@Retention(RetentionPolicy.RUNTIME) +@Target(FIELD) +@Retention(RUNTIME) public @interface Flag { String name(); diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/HBColumn.java b/src/main/java/com/flipkart/hbaseobjectmapper/HBColumn.java index 12101d2..ee6bb7b 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/HBColumn.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/HBColumn.java @@ -3,18 +3,19 @@ import com.flipkart.hbaseobjectmapper.codec.Codec; import java.io.Serializable; -import java.lang.annotation.ElementType; import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.lang.reflect.Type; import java.util.Map; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + /** * Maps an entity field to an HBase column */ -@Target(ElementType.FIELD) -@Retention(RetentionPolicy.RUNTIME) +@Target(FIELD) +@Retention(RUNTIME) public @interface HBColumn { /** diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/HBColumnMultiVersion.java b/src/main/java/com/flipkart/hbaseobjectmapper/HBColumnMultiVersion.java index 93bed47..500be09 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/HBColumnMultiVersion.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/HBColumnMultiVersion.java @@ -3,13 +3,14 @@ import com.flipkart.hbaseobjectmapper.codec.Codec; import java.io.Serializable; -import java.lang.annotation.ElementType; import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.lang.reflect.Type; import java.util.Map; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + /** * Maps an entity field of type NavigableMap<Long, T> to an HBase column whose data type is represented as data type T. *

@@ -17,8 +18,8 @@ *

* Please note: T must be {@link Serializable} */ -@Target(ElementType.FIELD) -@Retention(RetentionPolicy.RUNTIME) +@Target(FIELD) +@Retention(RUNTIME) public @interface HBColumnMultiVersion { /** diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapper.java b/src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapper.java index e4d1017..305a66e 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapper.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapper.java @@ -14,16 +14,14 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import java.io.Serializable; import java.lang.reflect.*; import java.util.*; /** - *

 * <p>An object mapper class that helps convert objects of your bean-like class to HBase's {@link Put} and {@link Result} objects (and vice-versa).</p>
- * <p>This class is for use in MapReduce jobs which read from and/or write to HBase tables and their unit-tests.</p>
- * <p>This class is thread-safe.</p>
+ * <p>An object mapper class that helps convert/serialize objects of your bean-like class to HBase's {@link Put} and {@link Result} objects (and vice-versa). Your 'bean-like class' must implement the {@link HBRecord} interface and should preferably follow JavaBeans conventions.</p>
+ * <p>This class is for use in MapReduce jobs which read from and/or write to HBase tables and their unit-tests. This class is thread-safe.</p>
*/ public class HBObjectMapper { @@ -37,6 +35,9 @@ public class HBObjectMapper { * @param codec Codec to be used for serialization and deserialization of fields */ public HBObjectMapper(Codec codec) { + if (codec == null) { + throw new IllegalArgumentException("Parameter 'codec' cannot be null. If you want to use the default codec, use the no-arg constructor"); + } this.codec = codec; } @@ -54,56 +55,55 @@ public HBObjectMapper() { * @param Data type of row key * @return Byte array */ - > byte[] rowKeyToBytes(R rowKey) { - return valueToByteArray(rowKey); + > byte[] rowKeyToBytes(R rowKey, Map codecFlags) { + return valueToByteArray(rowKey, codecFlags); } @SuppressWarnings("unchecked") - , T extends HBRecord> R bytesToRowKey(byte[] rowKeyBytes, Class entityClass) { + , T extends HBRecord> R bytesToRowKey(byte[] rowKeyBytes, Map codecFlags, Class entityClass) { try { - return (R) byteArrayToValue(rowKeyBytes, entityClass.getDeclaredMethod("composeRowKey").getReturnType(), null); + return (R) byteArrayToValue(rowKeyBytes, entityClass.getDeclaredMethod("composeRowKey").getReturnType(), codecFlags); } catch (NoSuchMethodException e) { throw new InternalError(e); } } private , T extends HBRecord> T mapToObj(byte[] rowKeyBytes, NavigableMap>> map, Class clazz) { - R rowKey = bytesToRowKey(rowKeyBytes, clazz); - T obj; + WrappedHBTable hbTable = new WrappedHBTable<>(clazz); + R rowKey = bytesToRowKey(rowKeyBytes, hbTable.getCodecFlags(), clazz); + T record; validateHBClass(clazz); try { - obj = clazz.newInstance(); + record = clazz.newInstance(); } catch (Exception ex) { throw new ObjectNotInstantiatableException("Error while instantiating empty constructor of " + clazz.getName(), ex); } try { - obj.parseRowKey(rowKey); + record.parseRowKey(rowKey); } catch (Exception ex) { throw new RowKeyCouldNotBeParsedException(String.format("Supplied row key \"%s\" could not be parsed", rowKey), ex); } for (Field field : clazz.getDeclaredFields()) { WrappedHBColumn hbColumn = new WrappedHBColumn(field); - if (hbColumn.isSingleVersioned()) { + if (hbColumn.isPresent()) { NavigableMap> familyMap = map.get(Bytes.toBytes(hbColumn.family())); if (familyMap == null || familyMap.isEmpty()) continue; NavigableMap columnVersionsMap = familyMap.get(Bytes.toBytes(hbColumn.column())); - if (columnVersionsMap == null || columnVersionsMap.isEmpty()) - continue; - Map.Entry lastEntry = columnVersionsMap.lastEntry(); - objectSetFieldValue(obj, field, lastEntry.getValue(), hbColumn.codecFlags()); - } else if (hbColumn.isMultiVersioned()) { - NavigableMap> familyMap = map.get(Bytes.toBytes(hbColumn.family())); - if (familyMap == null || familyMap.isEmpty()) - continue; - NavigableMap columnVersionsMap = familyMap.get(Bytes.toBytes(hbColumn.column())); - objectSetFieldValue(obj, field, columnVersionsMap, hbColumn.codecFlags()); + if (hbColumn.isSingleVersioned()) { + if (columnVersionsMap == null || columnVersionsMap.isEmpty()) + continue; + Map.Entry lastEntry = columnVersionsMap.lastEntry(); + objectSetFieldValue(record, field, lastEntry.getValue(), hbColumn.codecFlags()); + } else { + objectSetFieldValue(record, field, columnVersionsMap, hbColumn.codecFlags()); + } } } - return obj; + return record; } - private , T extends HBRecord> boolean isFieldNull(Field field, HBRecord obj) { + private static > boolean isFieldNull(Field field, HBRecord obj) { try { field.setAccessible(true); return field.get(obj) == null; @@ -127,63 +127,57 @@ private byte[] valueToByteArray(Serializable value, Map codecFla } } - private byte[] 
valueToByteArray(Serializable value) {
-        return valueToByteArray(value, null);
-    }
-
     /**
-     * <p>Converts an object representing an HBase row key into HBase's {@link ImmutableBytesWritable}.</p>
-     * <p>This method is for use in Mappers, uni-tests for Mappers and unit-tests for Reducers.</p>
+     * <p>Serializes an object to HBase's {@link ImmutableBytesWritable}.</p>
+     * <p>This method is for use in Mappers, unit-tests for Mappers and unit-tests for Reducers.</p>
* - * @param rowKey Row key object to be serialized - * @param Data type of row key + * @param value Object to be serialized * @return Byte array, wrapped in HBase's data type + * @see #getRowKey */ - public > ImmutableBytesWritable rowKeyToIbw(R rowKey) { - return new ImmutableBytesWritable(valueToByteArray(rowKey)); + public ImmutableBytesWritable toIbw(Serializable value) { + return new ImmutableBytesWritable(valueToByteArray(value, null)); } private , T extends HBRecord> WrappedHBTable validateHBClass(Class clazz) { Constructor constructor; - WrappedHBTable hbTable; try { constructor = clazz.getDeclaredConstructor(); - int numOfHBColumns = 0, numOfHBRowKeys = 0; - hbTable = new WrappedHBTable<>(clazz); - Set> columns = new HashSet<>(clazz.getDeclaredFields().length, 1.0f); - for (Field field : clazz.getDeclaredFields()) { - if (field.isAnnotationPresent(HBRowKey.class)) { - numOfHBRowKeys++; + } catch (NoSuchMethodException e) { + throw new NoEmptyConstructorException(String.format("Class %s needs to specify an empty (public) constructor", clazz.getName()), e); + } + if (!Modifier.isPublic(constructor.getModifiers())) { + throw new EmptyConstructorInaccessibleException(String.format("Empty constructor of class %s is inaccessible. It needs to be public.", clazz.getName())); + } + int numOfHBColumns = 0, numOfHBRowKeys = 0; + WrappedHBTable hbTable = new WrappedHBTable<>(clazz); + Set columns = new HashSet<>(clazz.getDeclaredFields().length, 1.0f); + for (Field field : clazz.getDeclaredFields()) { + if (field.isAnnotationPresent(HBRowKey.class)) { + numOfHBRowKeys++; + } + WrappedHBColumn hbColumn = new WrappedHBColumn(field); + if (hbColumn.isPresent()) { + if (!hbTable.isColumnFamilyPresent(hbColumn.family())) { + throw new IllegalArgumentException(String.format("Class %s has field '%s' mapped to HBase column %s - but column family %s isn't configured in %s annotation", + clazz.getName(), field.getName(), hbColumn, hbColumn.family(), HBTable.class.getSimpleName())); } - WrappedHBColumn hbColumn = new WrappedHBColumn(field); - if (hbColumn.isPresent()) { - if (!hbTable.isColumnFamilyPresent(hbColumn.family())) { - throw new IllegalArgumentException(String.format("Class %s has field '%s' mapped to HBase column %s - but column family %s isn't configured in %s annotation", - clazz.getName(), field.getName(), hbColumn, hbColumn.family(), HBTable.class.getSimpleName())); - } - if (hbColumn.isSingleVersioned()) { - validateHBColumnSingleVersionField(field); - } else if (hbColumn.isMultiVersioned()) { - validateHBColumnMultiVersionField(field); - } - if (!columns.add(new Pair<>(hbColumn.family(), hbColumn.column()))) { - throw new FieldsMappedToSameColumnException(String.format("Class %s has more than one field (e.g. '%s') mapped to same HBase column %s", clazz.getName(), field.getName(), hbColumn)); - } - numOfHBColumns++; + if (hbColumn.isSingleVersioned()) { + validateHBColumnSingleVersionField(field); + } else if (hbColumn.isMultiVersioned()) { + validateHBColumnMultiVersionField(field); } + if (!columns.add(new FamilyAndColumn(hbColumn.family(), hbColumn.column()))) { + throw new FieldsMappedToSameColumnException(String.format("Class %s has more than one field (e.g. 
'%s') mapped to same HBase column %s", clazz.getName(), field.getName(), hbColumn)); + } + numOfHBColumns++; } - if (numOfHBColumns == 0) { - throw new MissingHBColumnFieldsException(clazz); - } - if (numOfHBRowKeys == 0) { - throw new MissingHBRowKeyFieldsException(clazz); - } - - } catch (NoSuchMethodException e) { - throw new NoEmptyConstructorException(String.format("Class %s needs to specify an empty constructor", clazz.getName()), e); } - if (!Modifier.isPublic(constructor.getModifiers())) { - throw new EmptyConstructorInaccessibleException(String.format("Empty constructor of class %s is inaccessible", clazz.getName())); + if (numOfHBColumns == 0) { + throw new MissingHBColumnFieldsException(clazz); + } + if (numOfHBRowKeys == 0) { + throw new MissingHBRowKeyFieldsException(clazz); } return hbTable; } @@ -191,6 +185,7 @@ private , T extends HBRecord> WrappedH /** * Internal note: This should be in sync with {@link #getFieldType(Field, boolean)} */ + private void validateHBColumnMultiVersionField(Field field) { validationHBColumnField(field); if (!(field.getGenericType() instanceof ParameterizedType)) { @@ -246,6 +241,7 @@ private void validationHBColumnField(Field field) { } } + @SuppressWarnings("unchecked") private , T extends HBRecord> NavigableMap>> objToMap(HBRecord obj) { Class clazz = (Class) obj.getClass(); validateHBClass(clazz); @@ -294,22 +290,22 @@ private , T extends HBRecord> Navigabl return map; } - private > byte[] getFieldValueAsBytes(HBRecord obj, Field field, Map codecFlags) { - R fieldValue; + private > byte[] getFieldValueAsBytes(HBRecord record, Field field, Map codecFlags) { + Serializable fieldValue; try { field.setAccessible(true); - fieldValue = (R) field.get(obj); + fieldValue = (Serializable) field.get(record); } catch (IllegalAccessException e) { throw new BadHBaseLibStateException(e); } return valueToByteArray(fieldValue, codecFlags); } - private > NavigableMap getFieldValuesVersioned(Field field, HBRecord obj, Map codecFlags) { + private > NavigableMap getFieldValuesVersioned(Field field, HBRecord record, Map codecFlags) { try { field.setAccessible(true); @SuppressWarnings("unchecked") - NavigableMap fieldValueVersions = (NavigableMap) field.get(obj); + NavigableMap fieldValueVersions = (NavigableMap) field.get(record); if (fieldValueVersions == null) return null; if (fieldValueVersions.size() == 0) { @@ -422,7 +418,7 @@ public > List writeValueAsResult( * @param clazz {@link Class} to which you want to convert to (must implement {@link HBRecord} interface) * @param Data type of row key * @param Entity type - * @return Bean-like object + * @return Object of bean-like class * @throws CodecException One or more column values is a byte[] that couldn't be deserialized into field type (as defined in your entity class) */ public , T extends HBRecord> T readValue(ImmutableBytesWritable rowKey, Result result, Class clazz) { @@ -439,7 +435,7 @@ public , T extends HBRecord> T readVal * @param clazz {@link Class} to which you want to convert to (must implement {@link HBRecord} interface) * @param Data type of row key * @param Entity type - * @return Bean-like object + * @return Object of bean-like class * @throws CodecException One or more column values is a byte[] that couldn't be deserialized into field type (as defined in your entity class) */ public , T extends HBRecord> T readValue(Result result, Class clazz) { @@ -450,7 +446,7 @@ , T extends HBRecord> T readValue(R ro if (rowKey == null) return readValueFromResult(result, clazz); else - return 
readValueFromRowAndResult(rowKeyToBytes(rowKey), result, clazz); + return readValueFromRowAndResult(rowKeyToBytes(rowKey, WrappedHBTable.getCodecFlags(clazz)), result, clazz); } private boolean isResultEmpty(Result result) { @@ -463,7 +459,9 @@ private , T extends HBRecord> T readVa } private , T extends HBRecord> T readValueFromRowAndResult(byte[] rowKey, Result result, Class clazz) { - if (isResultEmpty(result)) return null; + if (isResultEmpty(result)) { + return null; + } return mapToObj(rowKey, result.getMap(), clazz); } @@ -517,7 +515,7 @@ Object byteArrayToValue(byte[] value, Type type, Map codecFlags) * @param clazz {@link Class} to which you want to convert to (must implement {@link HBRecord} interface) * @param Data type of row key * @param Entity type - * @return Bean-like object + * @return Object of bean-like class * @throws CodecException One or more column values is a byte[] that couldn't be deserialized into field type (as defined in your entity class) */ public , T extends HBRecord> T readValue(ImmutableBytesWritable rowKey, Put put, Class clazz) { @@ -536,14 +534,14 @@ public , T extends HBRecord> T readVal * @param clazz {@link Class} to which you want to convert to (must implement {@link HBRecord} interface) * @param Data type of row key * @param Entity type - * @return Bean-like object + * @return Object of bean-like class * @throws CodecException One or more column values is a byte[] that couldn't be deserialized into field type (as defined in your entity class) */ , T extends HBRecord> T readValue(R rowKey, Put put, Class clazz) { if (rowKey == null) return readValueFromPut(put, clazz); else - return readValueFromRowAndPut(rowKeyToBytes(rowKey), put, clazz); + return readValueFromRowAndPut(rowKeyToBytes(rowKey, WrappedHBTable.getCodecFlags(clazz)), put, clazz); } private , T extends HBRecord> T readValueFromRowAndPut(byte[] rowKey, Put put, Class clazz) { @@ -580,7 +578,7 @@ private , T extends HBRecord> T readVa * @param clazz {@link Class} to which you want to convert to (must implement {@link HBRecord} interface) * @param Data type of row key * @param Entity type - * @return Bean-like object + * @return Object of bean-like class * @throws CodecException One or more column values is a byte[] that couldn't be deserialized into field type (as defined in your entity class) */ public , T extends HBRecord> T readValue(Put put, Class clazz) { @@ -597,7 +595,8 @@ public , T extends HBRecord> T readVal * * @param record object of your bean-like class (of type that extends {@link HBRecord}) * @param Data type of row key - * @return Row key + * @return Serialised row key wrapped in {@link ImmutableBytesWritable} + * @see #toIbw(Serializable) */ public > ImmutableBytesWritable getRowKey(HBRecord record) { if (record == null) { @@ -606,18 +605,18 @@ public > ImmutableBytesWritable getRowKey return new ImmutableBytesWritable(composeRowKey(record)); } - private > byte[] composeRowKey(HBRecord obj) { + private , T extends HBRecord> byte[] composeRowKey(HBRecord record) { R rowKey; try { - rowKey = obj.composeRowKey(); + rowKey = record.composeRowKey(); } catch (Exception ex) { throw new RowKeyCantBeComposedException(ex); } if (rowKey == null || rowKey.toString().isEmpty()) { throw new RowKeyCantBeEmptyException(); } - return valueToByteArray(rowKey); - + WrappedHBTable hbTable = new WrappedHBTable<>((Class) record.getClass()); + return valueToByteArray(rowKey, hbTable.getCodecFlags()); } /** @@ -661,8 +660,9 @@ public , T extends HBRecord> boolean i */ public , T extends 
HBRecord> Map getHBFields(Class clazz) { validateHBClass(clazz); - Map mappings = new HashMap<>(); - for (Field field : clazz.getDeclaredFields()) { + final Field[] declaredFields = clazz.getDeclaredFields(); + Map mappings = new HashMap<>(declaredFields.length, 1.0f); + for (Field field : declaredFields) { if (new WrappedHBColumn(field).isPresent()) { mappings.put(field.getName(), field); } diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapperFactory.java b/src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapperFactory.java new file mode 100644 index 0000000..2b8f0e7 --- /dev/null +++ b/src/main/java/com/flipkart/hbaseobjectmapper/HBObjectMapperFactory.java @@ -0,0 +1,18 @@ +package com.flipkart.hbaseobjectmapper; + +import com.flipkart.hbaseobjectmapper.codec.Codec; + +/** + * Maintains one instance of {@link HBObjectMapper} class. For internal use only. + */ +class HBObjectMapperFactory { + /** + * Default instance + */ + private static HBObjectMapper hbObjectMapper = new HBObjectMapper(); + + static HBObjectMapper construct(Codec codec) { + return codec == null ? hbObjectMapper : new HBObjectMapper(codec); + } +} + diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/HBRowKey.java b/src/main/java/com/flipkart/hbaseobjectmapper/HBRowKey.java index 8d3bfea..1c9688b 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/HBRowKey.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/HBRowKey.java @@ -1,14 +1,17 @@ package com.flipkart.hbaseobjectmapper; -import java.lang.annotation.ElementType; import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + /** - * Indicates that the annotated field (in part or full) forms row key. This is to be treated as a 'marker' annotation. Actual row key composition solely depends on your implementation of {@link HBRecord#composeRowKey()} method + * Indicates that the annotated field (in part or full) forms row key. + *

+ * This is just a 'marker' annotation. Actual row key composition solely depends on your implementation of the {@link HBRecord#composeRowKey()} method
 */
-@Target(ElementType.FIELD)
-@Retention(RetentionPolicy.RUNTIME)
+@Target(FIELD)
+@Retention(RUNTIME)
 public @interface HBRowKey {
 }
diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/HBTable.java b/src/main/java/com/flipkart/hbaseobjectmapper/HBTable.java
index ddc64b1..8399362 100644
--- a/src/main/java/com/flipkart/hbaseobjectmapper/HBTable.java
+++ b/src/main/java/com/flipkart/hbaseobjectmapper/HBTable.java
@@ -1,15 +1,21 @@
 package com.flipkart.hbaseobjectmapper;
 
-import java.lang.annotation.ElementType;
+import com.flipkart.hbaseobjectmapper.codec.Codec;
+
+import java.io.Serializable;
 import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
 /**
  * Maps an entity class to a table in HBase
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
+@Target(TYPE)
+@Retention(RUNTIME)
 public @interface HBTable {
     /**
      * Name of the HBase table
@@ -25,4 +31,13 @@
      * @return Column families and their specs
      */
     Family[] families() default {};
+
+    /**
+     * [optional] flags to be passed to codec's {@link Codec#serialize(Serializable, Map) serialize} and {@link Codec#deserialize(byte[], Type, Map) deserialize} methods
+     * <p>

+ * Note: These flags will be passed as a Map<String, String> (param name and param value) + * + * @return Flags + */ + Flag[] rowKeyCodecFlags() default {}; } diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBColumn.java b/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBColumn.java index 9b0b104..f421124 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBColumn.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBColumn.java @@ -2,21 +2,25 @@ import com.flipkart.hbaseobjectmapper.exceptions.BothHBColumnAnnotationsPresentException; +import com.flipkart.hbaseobjectmapper.exceptions.DuplicateCodecFlagForColumnException; import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; + /** * A wrapper class for {@link HBColumn} and {@link HBColumnMultiVersion} annotations (for internal use only) */ class WrappedHBColumn { - private String family, column; - private boolean multiVersioned = false, singleVersioned = false; - private Class annotationClass; - private Map codecFlags; + private final String family, column; + private final boolean multiVersioned, singleVersioned; + private final Class annotationClass; + private final Map codecFlags; + private final Field field; WrappedHBColumn(Field field) { + this.field = field; HBColumn hbColumn = field.getAnnotation(HBColumn.class); HBColumnMultiVersion hbColumnMultiVersion = field.getAnnotation(HBColumnMultiVersion.class); if (hbColumn != null && hbColumnMultiVersion != null) { @@ -26,21 +30,33 @@ class WrappedHBColumn { family = hbColumn.family(); column = hbColumn.column(); singleVersioned = true; + multiVersioned = false; annotationClass = HBColumn.class; codecFlags = toMap(hbColumn.codecFlags()); } else if (hbColumnMultiVersion != null) { family = hbColumnMultiVersion.family(); column = hbColumnMultiVersion.column(); + singleVersioned = false; multiVersioned = true; annotationClass = HBColumnMultiVersion.class; codecFlags = toMap(hbColumnMultiVersion.codecFlags()); + } else { + family = null; + column = null; + singleVersioned = false; + multiVersioned = false; + annotationClass = null; + codecFlags = null; } } private Map toMap(Flag[] codecFlags) { - Map flagsMap = new HashMap<>(); + Map flagsMap = new HashMap<>(codecFlags.length, 1.0f); for (Flag flag : codecFlags) { - flagsMap.put(flag.name(), flag.value()); + String previousValue = flagsMap.put(flag.name(), flag.value()); + if (previousValue != null) { + throw new DuplicateCodecFlagForColumnException(field.getDeclaringClass(), field.getName(), annotationClass, flag.name()); + } } return flagsMap; } diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBTable.java b/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBTable.java index 9875521..b37a57f 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBTable.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/WrappedHBTable.java @@ -1,5 +1,6 @@ package com.flipkart.hbaseobjectmapper; +import com.flipkart.hbaseobjectmapper.exceptions.DuplicateCodecFlagForRowKeyException; import com.flipkart.hbaseobjectmapper.exceptions.ImproperHBTableAnnotationExceptions; import org.apache.hadoop.hbase.TableName; @@ -17,8 +18,11 @@ class WrappedHBTable, T extends HBRecord< private final TableName tableName; private final Map families; // This should evolve to Map + private final Map codecFlags; + private final Class clazz; WrappedHBTable(Class clazz) { + this.clazz = clazz; final HBTable hbTable = clazz.getAnnotation(HBTable.class); if (hbTable == null) { 
throw new ImproperHBTableAnnotationExceptions.MissingHBTableAnnotationException(String.format("Class %s is missing %s annotation", clazz.getName(), HBTable.class.getSimpleName())); @@ -27,6 +31,7 @@ class WrappedHBTable, T extends HBRecord< throw new ImproperHBTableAnnotationExceptions.EmptyTableNameOnHBTableAnnotationException(String.format("Annotation %s on class %s has empty name", HBTable.class.getName(), clazz.getName())); } tableName = TableName.valueOf(hbTable.name().getBytes()); + codecFlags = toMap(hbTable.rowKeyCodecFlags()); families = new HashMap<>(hbTable.families().length, 1.0f); for (Family family : hbTable.families()) { if (family.name().isEmpty()) { @@ -42,6 +47,17 @@ class WrappedHBTable, T extends HBRecord< } } + private Map toMap(Flag[] codecFlags) { + Map flagsMap = new HashMap<>(codecFlags.length, 1.0f); + for (Flag flag : codecFlags) { + String previousValue = flagsMap.put(flag.name(), flag.value()); + if (previousValue != null) { + throw new DuplicateCodecFlagForRowKeyException(clazz, flag.name()); + } + } + return flagsMap; + } + int getNumVersions(String familyName) { return families.get(familyName); } @@ -58,8 +74,16 @@ TableName getName() { return tableName; } + public Map getCodecFlags() { + return codecFlags; + } + @Override public String toString() { return tableName.getNameAsString(); } + + static , T extends HBRecord> Map getCodecFlags(Class clazz) { + return new WrappedHBTable<>(clazz).codecFlags; + } } diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/codec/BestSuitCodec.java b/src/main/java/com/flipkart/hbaseobjectmapper/codec/BestSuitCodec.java index 6b843c8..4513690 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/codec/BestSuitCodec.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/codec/BestSuitCodec.java @@ -25,12 +25,12 @@ *

 * This codec takes the following {@link Flag Flag}s:
 * <ul>
- * <li>{@link #SERIALISE_AS_STRING}: When passed, it indicates this codec to store field value in it's string representation (e.g. 560034 is serialized into a byte[] that represents the string "560034"). Note that, this flag applies only to fields of data types in point 1 above.</li>
+ * <li>{@link #SERIALIZE_AS_STRING}: When this flag is "true", it instructs this codec ({@link BestSuitCodec}) to store the field/rowkey value in its string representation (e.g. 560034 is serialized into a byte[] that represents the string "560034"). This flag applies only to fields or rowkeys of data types in point 1 above.</li>
 * </ul>
*/ public class BestSuitCodec implements Codec { - public static final String SERIALISE_AS_STRING = "serializeAsString"; + public static final String SERIALIZE_AS_STRING = "serializeAsString"; private static final Map fromBytesMethodNames = new HashMap() { { @@ -115,7 +115,7 @@ public byte[] serialize(Serializable object, Map flags) throws S return null; Class clazz = object.getClass(); if (toBytesMethods.containsKey(clazz)) { - boolean serializeAsString = isSerializeAsStringOn(flags); + boolean serializeAsString = isSerializeAsStringTrue(flags); try { Method toBytesMethod = toBytesMethods.get(clazz); return serializeAsString ? Bytes.toBytes(String.valueOf(object)) : (byte[]) toBytesMethod.invoke(null, object); @@ -139,17 +139,17 @@ public Serializable deserialize(byte[] bytes, Type type, Map fla if (bytes == null) return null; if (type instanceof Class && fromBytesMethods.containsKey(type)) { - boolean serializeAsString = isSerializeAsStringOn(flags); + boolean serializeAsString = isSerializeAsStringTrue(flags); try { - Serializable fieldValue; + Serializable value; if (serializeAsString) { Constructor constructor = constructors.get(type); - fieldValue = (Serializable) constructor.newInstance(Bytes.toString(bytes)); + value = (Serializable) constructor.newInstance(Bytes.toString(bytes)); } else { Method method = fromBytesMethods.get(type); - fieldValue = (Serializable) method.invoke(null, new Object[]{bytes}); + value = (Serializable) method.invoke(null, new Object[]{bytes}); } - return fieldValue; + return value; } catch (Exception e) { throw new DeserializationException("Could not deserialize byte array into an object using HBase's native methods", e); } @@ -174,7 +174,7 @@ public boolean canDeserialize(Type type) { return objectMapper.canDeserialize(javaType); } - private boolean isSerializeAsStringOn(Map flags) { - return flags != null && flags.get(SERIALISE_AS_STRING) != null && flags.get(SERIALISE_AS_STRING).equalsIgnoreCase("true"); + private static boolean isSerializeAsStringTrue(Map flags) { + return flags != null && flags.get(SERIALIZE_AS_STRING) != null && flags.get(SERIALIZE_AS_STRING).equalsIgnoreCase("true"); } } diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/codec/Codec.java b/src/main/java/com/flipkart/hbaseobjectmapper/codec/Codec.java index 0ca3cb5..cff5a99 100644 --- a/src/main/java/com/flipkart/hbaseobjectmapper/codec/Codec.java +++ b/src/main/java/com/flipkart/hbaseobjectmapper/codec/Codec.java @@ -12,12 +12,13 @@ */ public interface Codec { /** - * Serialize object to a byte[] + * Serializes object to a byte[] * * @param object Object to be serialized - * @param flags Flags for tuning serialization behavior + * @param flags Flags for tuning serialization behavior (Implementations of this method are expected to handle null and empty map in the same way) * @return byte array - this would be used 'as is' in setting the column value in HBase row * @throws SerializationException If serialization fails (e.g. 
when input object has a field of data type that isn't serializable by this codec)
+     * @see #deserialize(byte[], Type, Map)
      */
     byte[] serialize(Serializable object, Map flags) throws SerializationException;
@@ -25,10 +26,12 @@ public interface Codec {
      * Deserialize byte[] into an object
      *
      * @param bytes byte array that needs to be deserialized
-     * @param flags Flags for tuning deserialization behavior
+     * @param flags Flags for tuning deserialization behavior (Implementations of this method are expected to handle null and empty map in the same way)
      * @param type  Java type to which this byte[] needs to be deserialized to
-     * @return Object
+     * @return The object
      * @throws DeserializationException If deserialization fails (e.g. malformed string or definition of a data type used isn't available at runtime)
+     * @see #serialize(Serializable, Map)
+     * @see #canDeserialize(Type)
      */
     Serializable deserialize(byte[] bytes, Type type, Map flags) throws DeserializationException;
@@ -36,7 +39,8 @@ public interface Codec {
      * Check whether a specific type can be deserialized using this codec
      *
      * @param type Java type
-     * @return true or false
+     * @return true (if an object of specified type can be deserialized using this codec) or false
+     * @see #deserialize(byte[], Type, Map)
      */
     boolean canDeserialize(Type type);
diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/codec/JavaObjectStreamCodec.java b/src/main/java/com/flipkart/hbaseobjectmapper/codec/JavaObjectStreamCodec.java
index e6d93de..10418de 100644
--- a/src/main/java/com/flipkart/hbaseobjectmapper/codec/JavaObjectStreamCodec.java
+++ b/src/main/java/com/flipkart/hbaseobjectmapper/codec/JavaObjectStreamCodec.java
@@ -4,6 +4,9 @@
 import java.lang.reflect.Type;
 import java.util.Map;
 
+/**
+ * Just a reference implementation, kept here for testing purposes. In the real world, you should never use this codec. Either use the (default) {@link BestSuitCodec} or write your own.
+ */
 public class JavaObjectStreamCodec implements Codec {
     /*
      * @inherit
      */
@@ -39,7 +42,7 @@ public Serializable deserialize(byte[] bytes, Type type, Map fla
      */
     @Override
     public boolean canDeserialize(Type type) {
-        return true;
+        return true; // I'm (maybe wrongly) assuming ObjectInputStream and ObjectOutputStream work on all objects
     }
 
     public static Serializable deepCopy(Serializable object) {
diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/exceptions/DuplicateCodecFlagForColumnException.java b/src/main/java/com/flipkart/hbaseobjectmapper/exceptions/DuplicateCodecFlagForColumnException.java
new file mode 100644
index 0000000..ccc3928
--- /dev/null
+++ b/src/main/java/com/flipkart/hbaseobjectmapper/exceptions/DuplicateCodecFlagForColumnException.java
@@ -0,0 +1,7 @@
+package com.flipkart.hbaseobjectmapper.exceptions;
+
+public class DuplicateCodecFlagForColumnException extends IllegalArgumentException {
+    public DuplicateCodecFlagForColumnException(Class recordClass, String fieldName, Class annotationClass, String flagName) {
+        super(String.format("The @%s annotation on field %s on class %s has duplicate codec flags (See flag %s)", annotationClass, fieldName, recordClass, flagName));
+    }
+}
diff --git a/src/main/java/com/flipkart/hbaseobjectmapper/exceptions/DuplicateCodecFlagForRowKeyException.java b/src/main/java/com/flipkart/hbaseobjectmapper/exceptions/DuplicateCodecFlagForRowKeyException.java
new file mode 100644
index 0000000..4804a19
--- /dev/null
+++ b/src/main/java/com/flipkart/hbaseobjectmapper/exceptions/DuplicateCodecFlagForRowKeyException.java
@@ -0,0 +1,12 @@
+package com.flipkart.hbaseobjectmapper.exceptions;
+
+import com.flipkart.hbaseobjectmapper.HBRecord;
+import com.flipkart.hbaseobjectmapper.HBTable;
+
+import java.io.Serializable;
+
+public class DuplicateCodecFlagForRowKeyException extends IllegalArgumentException {
+    public , T extends HBRecord> DuplicateCodecFlagForRowKeyException(Class clazz, String flagName) {
+        super(String.format("The %s annotation on %s class has duplicate codec flags.
See codec flag '%s'.", HBTable.class.getSimpleName(), clazz.getName(), flagName)); + } +} diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestHBObjectMapper.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestHBObjectMapper.java index a54f202..9767964 100644 --- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestHBObjectMapper.java +++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestHBObjectMapper.java @@ -1,9 +1,6 @@ package com.flipkart.hbaseobjectmapper.testcases; -import com.flipkart.hbaseobjectmapper.HBColumn; -import com.flipkart.hbaseobjectmapper.HBColumnMultiVersion; -import com.flipkart.hbaseobjectmapper.HBObjectMapper; -import com.flipkart.hbaseobjectmapper.HBRecord; +import com.flipkart.hbaseobjectmapper.*; import com.flipkart.hbaseobjectmapper.exceptions.*; import com.flipkart.hbaseobjectmapper.testcases.entities.*; import org.apache.hadoop.hbase.client.Put; @@ -15,6 +12,7 @@ import java.io.Serializable; import java.util.*; +import static com.flipkart.hbaseobjectmapper.testcases.TestObjects.validObjects; import static com.flipkart.hbaseobjectmapper.testcases.util.LiteralsUtil.triplet; import static org.junit.Assert.*; @@ -40,14 +38,13 @@ public class TestHBObjectMapper { ); final HBObjectMapper hbMapper = new HBObjectMapper(); - final List validObjs = TestObjects.validObjects; - final Result someResult = hbMapper.writeValueAsResult(validObjs.get(0)); - final Put somePut = hbMapper.writeValueAsPut(validObjs.get(0)); + final Result someResult = hbMapper.writeValueAsResult(validObjects.get(0)); + final Put somePut = hbMapper.writeValueAsPut(validObjects.get(0)); @Test public void testHBObjectMapper() { - for (HBRecord obj : validObjs) { + for (HBRecord obj : validObjects) { System.out.printf("Original object: %s%n", obj); testResult(obj); testResultWithRow(obj); @@ -72,7 +69,7 @@ public void testResult(HBRecord p) { public > void testResultWithRow(HBRecord p) { long start, end; Result result = hbMapper.writeValueAsResult(l(p, p)).get(0); - ImmutableBytesWritable rowKey = hbMapper.rowKeyToIbw(p.composeRowKey()); + ImmutableBytesWritable rowKey = hbMapper.getRowKey(p); start = System.currentTimeMillis(); HBRecord pFromResult = hbMapper.readValue(rowKey, result, p.getClass()); end = System.currentTimeMillis(); @@ -104,7 +101,7 @@ public void testPut(HBRecord p) { public > void testPutWithRow(HBRecord p) { long start, end; Put put = hbMapper.writeValueAsPut(p); - ImmutableBytesWritable rowKey = hbMapper.rowKeyToIbw(p.composeRowKey()); + ImmutableBytesWritable rowKey = hbMapper.getRowKey(p); start = System.currentTimeMillis(); HBRecord pFromPut = hbMapper.readValue(rowKey, put, p.getClass()); end = System.currentTimeMillis(); @@ -114,7 +111,7 @@ public > void testPutWithRow(HBRecord @Test(expected = RowKeyCouldNotBeParsedException.class) public void testInvalidRowKey() { - hbMapper.readValue(hbMapper.rowKeyToIbw("invalid row key"), hbMapper.writeValueAsPut(TestObjects.validObjects.get(0)), Citizen.class); + hbMapper.readValue(hbMapper.toIbw("invalid row key"), hbMapper.writeValueAsPut(validObjects.get(0)), Citizen.class); } @Test @@ -226,48 +223,49 @@ public void testEmptyPuts() { assertNull("Null Put object should return null", nullCitizen); } - @Test - public void testGetRowKey() { - ImmutableBytesWritable rowKey = hbMapper.getRowKey(new HBRecord() { - @Override - public String composeRowKey() { - return "rowkey"; - } + @HBTable(name = "dummy1") + private static class DummyRowKeyClass implements HBRecord { + private String 
rowKey; - @Override - public void parseRowKey(String rowKey) { + public DummyRowKeyClass(String rowKey) { + this.rowKey = rowKey; + } - } - }); - assertEquals("Row keys don't match", rowKey, hbMapper.rowKeyToIbw("rowkey")); - try { - hbMapper.getRowKey(new HBRecord() { - @Override - public String composeRowKey() { - return null; - } + @Override + public String composeRowKey() { + return rowKey; + } + + @Override + public void parseRowKey(String rowKey) { + this.rowKey = rowKey; + } + } - @Override - public void parseRowKey(String rowKey) { + @HBTable(name = "dummy2") + private class RowKeyComposeThrowsExceptionClass implements HBRecord { + @Override + public String composeRowKey() { + throw new RuntimeException("Some blah"); + } + + @Override + public void parseRowKey(String rowKey) { + + } + } - } - }); + @Test + public void testGetRowKey() { + assertEquals("Row keys don't match", hbMapper.getRowKey(new DummyRowKeyClass("rowkey")), hbMapper.toIbw("rowkey")); + try { + hbMapper.getRowKey(new DummyRowKeyClass(null)); fail("null row key should've thrown a " + RowKeyCantBeEmptyException.class.getName()); } catch (RowKeyCantBeEmptyException ignored) { } try { - hbMapper.getRowKey(new HBRecord() { - @Override - public String composeRowKey() { - throw new RuntimeException("Some blah"); - } - - @Override - public void parseRowKey(String rowKey) { - - } - }); + hbMapper.getRowKey(new RowKeyComposeThrowsExceptionClass()); fail("If row key can't be composed, an " + RowKeyCantBeComposedException.class.getName() + " was expected"); } catch (RowKeyCantBeComposedException ignored) { @@ -275,7 +273,7 @@ public void parseRowKey(String rowKey) { try { HBRecord nullRecord = null; hbMapper.getRowKey(nullRecord); - fail("If object is null, a " + NullPointerException.class.getName() + " was expected"); + fail("If object is null, a " + NullPointerException.class.getSimpleName() + " was expected"); } catch (NullPointerException ignored) { } diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestObjects.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestObjects.java index da4d06b..f0176ee 100644 --- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestObjects.java +++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/TestObjects.java @@ -5,10 +5,7 @@ import com.flipkart.hbaseobjectmapper.exceptions.AllHBColumnFieldsNullException; import com.flipkart.hbaseobjectmapper.exceptions.FieldAnnotatedWithHBColumnMultiVersionCantBeEmpty; import com.flipkart.hbaseobjectmapper.exceptions.HBRowKeyFieldCantBeNullException; -import com.flipkart.hbaseobjectmapper.testcases.entities.Citizen; -import com.flipkart.hbaseobjectmapper.testcases.entities.Contact; -import com.flipkart.hbaseobjectmapper.testcases.entities.Dependents; -import com.flipkart.hbaseobjectmapper.testcases.entities.Employee; +import com.flipkart.hbaseobjectmapper.testcases.entities.*; import org.javatuples.Triplet; import java.math.BigDecimal; @@ -39,11 +36,16 @@ public class TestObjects { new Citizen("IND", 105, "Nilesh", null, null, null, null, null, null, null, null, null, null, new Dependents(null, Arrays.asList(141, 142)), null) ); - public static final List validEmployeeObjectsNoVersion = asList( + public static final List validEmployeeObjects = asList( new Employee(1L, "Raja", (short) 0), new Employee(2L, "Ramnik", (short) 8) ); + public static final List validStudentObjects = asList( + new Student(1, "Ishan"), + new Student(2, "Akshit") + ); + private static List asList(HBRecord... 
hbRecords) { List output = new ArrayList<>(); Collections.addAll(output, hbRecords); @@ -75,7 +77,8 @@ private static List asList(HBRecord... hbRecords) { public static final List validObjects = new ArrayList() { { addAll(TestObjects.validCitizenObjects); - addAll(TestObjects.validEmployeeObjectsNoVersion); + addAll(TestObjects.validEmployeeObjects); + addAll(TestObjects.validStudentObjects); } }; diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/codec/TestCodecs.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/codec/TestCodecs.java index a2d9657..e36dd3b 100644 --- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/codec/TestCodecs.java +++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/codec/TestCodecs.java @@ -38,8 +38,13 @@ public void testBestSuitCodec() { public void testWithCodec(Codec codec) { HBObjectMapper hbObjectMapper = new HBObjectMapper(codec); try { - for (HBRecord object : TestObjects.validObjects) { - Class objectClass = object.getClass(); + for (HBRecord record : TestObjects.validObjects) { + Class objectClass = record.getClass(); + final Serializable rowKey = record.composeRowKey(); + final Map rowKeyCodecFlags = toMap(objectClass.getAnnotation(HBTable.class).rowKeyCodecFlags()); + byte[] bytes = codec.serialize(rowKey, rowKeyCodecFlags); + Serializable deserializedRowKey = codec.deserialize(bytes, rowKey.getClass(), rowKeyCodecFlags); + assertEquals(String.format("Row key got corrupted after serialization and deserialization, for this record:\n%s\n", record), rowKey, deserializedRowKey); for (Object re : hbObjectMapper.getHBFields(objectClass).entrySet()) { Map.Entry e = (Map.Entry) re; String fieldName = e.getKey(); @@ -47,15 +52,15 @@ public void testWithCodec(Codec codec) { field.setAccessible(true); if (field.isAnnotationPresent(HBColumnMultiVersion.class)) { final Type actualFieldType = ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[1]; - Object fieldValuesMap = field.get(object); + Object fieldValuesMap = field.get(record); if (fieldValuesMap == null) continue; for (NavigableMap.Entry entry : ((NavigableMap) fieldValuesMap).entrySet()) { - verifySerDe(codec, objectClass.getSimpleName() + "." + fieldName, actualFieldType, (Serializable) entry.getValue(), toMap(field.getAnnotation(HBColumnMultiVersion.class).codecFlags())); + verifyFieldSerDe(codec, objectClass.getSimpleName() + "." + fieldName, actualFieldType, (Serializable) entry.getValue(), toMap(field.getAnnotation(HBColumnMultiVersion.class).codecFlags())); } } else { - Serializable fieldValue = (Serializable) field.get(object); - verifySerDe(codec, objectClass.getSimpleName() + "." + fieldName, field.getGenericType(), fieldValue, toMap(field.getAnnotation(HBColumn.class).codecFlags())); + Serializable fieldValue = (Serializable) field.get(record); + verifyFieldSerDe(codec, objectClass.getSimpleName() + "." 
diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/entities/Citizen.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/entities/Citizen.java
index a0d991a..ceed31a 100644
--- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/entities/Citizen.java
+++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/entities/Citizen.java
@@ -38,7 +38,7 @@ public class Citizen implements HBRecord {
     private Long f3;
     @HBColumn(family = "optional", column = "f4")
     private BigDecimal f4;
-    @HBColumn(family = "optional", column = "pincode", codecFlags = {@Flag(name = BestSuitCodec.SERIALISE_AS_STRING, value = "true")})
+    @HBColumn(family = "optional", column = "pincode", codecFlags = {@Flag(name = BestSuitCodec.SERIALIZE_AS_STRING, value = "true")})
     private Integer pincode;
     @HBColumnMultiVersion(family = "optional", column = "phone_number")
     private NavigableMap phoneNumber;
diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/entities/Student.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/entities/Student.java
new file mode 100644
index 0000000..d4986f1
--- /dev/null
+++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/entities/Student.java
@@ -0,0 +1,41 @@
+package com.flipkart.hbaseobjectmapper.testcases.entities;
+
+import com.flipkart.hbaseobjectmapper.*;
+import com.flipkart.hbaseobjectmapper.codec.BestSuitCodec;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@SuppressWarnings("unused")
+@ToString
+@EqualsAndHashCode
+@HBTable(name = "students", families = {@Family(name = "a")}, rowKeyCodecFlags = {@Flag(name = BestSuitCodec.SERIALIZE_AS_STRING, value = "true")})
+public class Student implements HBRecord {
+    @HBRowKey
+    private Integer studentId;
+
+    @HBColumn(family = "a", column = "name")
+    private String name;
+
+    @Override
+    public Integer composeRowKey() {
+        return studentId;
+    }
+
+    @Override
+    public void parseRowKey(Integer rowKey) {
+        studentId = rowKey;
+    }
+
+    public Integer getStudentId() {
+        return studentId;
+    }
+
+    public Student() {
+
+    }
+
+    public Student(Integer studentId, String name) {
+        this.studentId = studentId;
+        this.name = name;
+    }
+}
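`Student` is the first test entity with a non-`String` row key (`Integer`), and the first to use the new `rowKeyCodecFlags` attribute of `@HBTable`. A hypothetical test sketching the expected effect, assuming `SERIALIZE_AS_STRING` acts on row keys the same way the README documents it for column fields (the byte layout asserted here is that assumption, not something the patch states):

```java
@Test
public void studentRowKeyIsStoredAsString() { // hypothetical test, not part of this patch
    HBObjectMapper hbObjectMapper = new HBObjectMapper();
    Student student = new Student(1, "Ishan");
    ImmutableBytesWritable rowKey = hbObjectMapper.getRowKey(student);
    // Assumed: the Integer row key 1 is serialized as the string "1",
    // not as Integer's 4 big-endian bytes.
    assertArrayEquals(Bytes.toBytes("1"), rowKey.copyBytes());
}
```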
diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestCitizenMR.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestCitizenMR.java
index c9fc667..1b7ec39 100644
--- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestCitizenMR.java
+++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestCitizenMR.java
@@ -46,7 +46,7 @@ public void testSingleMapper() throws IOException {
                         hbObjectMapper.writeValueAsResult(citizen)
                 )
-                .withOutput(hbObjectMapper.rowKeyToIbw("key"), new IntWritable(citizen.getAge()))
+                .withOutput(hbObjectMapper.toIbw("key"), new IntWritable(citizen.getAge()))
                 .runTest();
     }
 
@@ -61,7 +61,7 @@ public void testMultipleMappers() throws IOException {
 
     @Test
     public void testReducer() throws Exception {
-        Pair reducerResult = citizenReduceDriver.withInput(hbObjectMapper.rowKeyToIbw("key"), Arrays.asList(new IntWritable(1), new IntWritable(5))).run().get(0);
+        Pair reducerResult = citizenReduceDriver.withInput(hbObjectMapper.toIbw("key"), Arrays.asList(new IntWritable(1), new IntWritable(5))).run().get(0);
         CitizenSummary citizenSummary = hbObjectMapper.readValue(reducerResult.getFirst(), (Put) reducerResult.getSecond(), CitizenSummary.class);
         assertEquals("Unexpected result from CitizenReducer", (Float) 3.0f, citizenSummary.getAverageAge());
     }
diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestEmployeeMR.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestEmployeeMR.java
index bf83c6c..992aebb 100644
--- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestEmployeeMR.java
+++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/TestEmployeeMR.java
@@ -39,20 +39,20 @@ public void setUp() {
 
     @Test
     public void testSingleMapper() throws IOException {
-        Employee employee = (Employee) TestObjects.validEmployeeObjectsNoVersion.get(1);
+        Employee employee = (Employee) TestObjects.validEmployeeObjects.get(1);
         employeeMapDriver
                 .withInput(
                         hbObjectMapper.getRowKey(employee),
                         hbObjectMapper.writeValueAsResult(employee)
                 )
-                .withOutput(hbObjectMapper.rowKeyToIbw("key"), new IntWritable(employee.getReporteeCount()))
+                .withOutput(hbObjectMapper.toIbw("key"), new IntWritable(employee.getReporteeCount()))
                 .runTest();
     }
 
     @Test
     public void testMultipleMappers() throws IOException {
-        List<Pair<ImmutableBytesWritable, Result>> hbRecords = MRTestUtil.writeValueAsRowKeyResultPair(TestObjects.validEmployeeObjectsNoVersion);
+        List<Pair<ImmutableBytesWritable, Result>> hbRecords = MRTestUtil.writeValueAsRowKeyResultPair(TestObjects.validEmployeeObjects);
         List<Pair<ImmutableBytesWritable, IntWritable>> mapResults = employeeMapDriver.withAll(hbRecords).run();
         for (Pair mapResult : mapResults) {
             assertEquals("Rowkey got corrupted in Mapper", Bytes.toString(mapResult.getFirst().get()), "key");
@@ -61,7 +61,7 @@ public void testMultipleMappers() throws IOException {
 
     @Test
     public void testReducer() throws Exception {
-        Pair reducerResult = employeeReduceDriver.withInput(hbObjectMapper.rowKeyToIbw("key"), Arrays.asList(new IntWritable(1), new IntWritable(5), new IntWritable(0))).run().get(0);
+        Pair reducerResult = employeeReduceDriver.withInput(hbObjectMapper.toIbw("key"), Arrays.asList(new IntWritable(1), new IntWritable(5), new IntWritable(0))).run().get(0);
         EmployeeSummary employeeSummary = hbObjectMapper.readValue(reducerResult.getFirst(), (Put) reducerResult.getSecond(), EmployeeSummary.class);
         assertEquals("Unexpected result from EmployeeMapper", (Float) 2.0f, employeeSummary.getAverageReporteeCount());
     }
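Since row keys are no longer necessarily `String`s, `rowKeyToIbw(String)` gives way to the more general `toIbw(Serializable)` throughout. Pieced together from the hunks above, the patched single-mapper test now reads as a whole:

```java
@Test
public void testSingleMapper() throws IOException {
    Employee employee = (Employee) TestObjects.validEmployeeObjects.get(1);
    employeeMapDriver
            .withInput(
                    hbObjectMapper.getRowKey(employee),         // row key derived from the record itself
                    hbObjectMapper.writeValueAsResult(employee) // record -> HBase Result
            )
            .withOutput(hbObjectMapper.toIbw("key"), new IntWritable(employee.getReporteeCount()))
            .runTest();
}
```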
diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/CitizenMapper.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/CitizenMapper.java
index aa60158..b46fcd5 100644
--- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/CitizenMapper.java
+++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/CitizenMapper.java
@@ -17,6 +17,6 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
         Citizen e = hbObjectMapper.readValue(key, value, Citizen.class);
         if (e.getAge() == null)
             return;
-        context.write(hbObjectMapper.rowKeyToIbw("key"), new IntWritable(e.getAge().intValue()));
+        context.write(hbObjectMapper.toIbw("key"), new IntWritable(e.getAge().intValue()));
     }
 }
diff --git a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/EmployeeMapper.java b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/EmployeeMapper.java
index 508de5c..b068258 100644
--- a/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/EmployeeMapper.java
+++ b/src/test/java/com/flipkart/hbaseobjectmapper/testcases/mr/samples/EmployeeMapper.java
@@ -16,6 +16,6 @@ public class EmployeeMapper extends TableMapper {
         Employee e = hbObjectMapper.readValue(key, value, Employee.class);
         if (e.getReporteeCount() > 0)
-            context.write(hbObjectMapper.rowKeyToIbw("key"), new IntWritable(e.getReporteeCount().intValue()));
+            context.write(hbObjectMapper.toIbw("key"), new IntWritable(e.getReporteeCount().intValue()));
     }
 }
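For the same rename in the sample mappers, here is `CitizenMapper.map()` pieced together as it reads after the patch (class declaration and imports unchanged):

```java
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    Citizen e = hbObjectMapper.readValue(key, value, Citizen.class); // HBase row -> Citizen object
    if (e.getAge() == null)
        return; // skip records with no age
    context.write(hbObjectMapper.toIbw("key"), new IntWritable(e.getAge().intValue()));
}
```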