Merge branch origin/v18:
 * Row key codec flags
 * AbstractHBDAO with Codec as input
 * Javadoc improvements
 * Test cases improvements
m-manu committed Sep 22, 2017
1 parent 07e1a12 commit 1e85caa
Showing 28 changed files with 424 additions and 226 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -42,7 +42,7 @@ public class Citizen implements HBRecord<String> {
@HBColumnMultiVersion(family = "tracked", column = "phone_number")
private NavigableMap<Long, Integer> phoneNumber;

@HBColumn(family = "optional", column = "pincode", codecFlags = {@Flag(name = BestSuitCodec.SERIALISE_AS_STRING, value = "true")})
@HBColumn(family = "optional", column = "pincode", codecFlags = {@Flag(name = BestSuitCodec.SERIALIZE_AS_STRING, value = "true")})
private Integer pincode;

@Override
@@ -79,7 +79,7 @@ See source files [Citizen.java](./src/test/java/com/flipkart/hbaseobjectmapper/t
* serializes `null` as `null`
* To control/modify serialization/deserialization behavior, you may define your own codec (by implementing the `Codec` interface) or you may extend the default codec (`BestSuitCodec`).
* The optional parameter `codecFlags` (supported by both `@HBColumn` and `@HBColumnMultiVersion` annotations) can be used to pass custom flags to the underlying codec. (e.g. you may write your codec to serialize the field `Integer id` in the `Citizen` class differently from the field `Integer id` in the `Employee` class)
* The default codec class `BestSuitCodec` takes a flag `BestSuitCodec.SERIALISE_AS_STRING`, whose value is "serializeAsString" (as in the above `Citizen` class example). When this flag is set to `true` on a field, the default codec serializes that field (even numerical fields) as `String`s.
* The default codec class `BestSuitCodec` takes a flag `BestSuitCodec.SERIALIZE_AS_STRING`, whose value is "serializeAsString" (as in the above `Citizen` class example). When this flag is set to `true` on a field, the default codec serializes that field (even numerical fields) as `String`s.
* Your custom codec may take other such flags to customize serialization/deserialization behavior at a class field level.
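
As an illustration of such field-level flags, the sketch below annotates a field with a flag intended for a user-defined codec. The flag name `date_format` and the codec that would interpret it are assumptions made for this example; only the `@HBColumn`/`@Flag` syntax comes from this library.

```java
// Hypothetical example: assumes you plug in your own Codec implementation that
// understands a "date_format" flag (this flag is NOT part of BestSuitCodec)
@HBColumn(family = "optional", column = "date_of_birth",
        codecFlags = {@Flag(name = "date_format", value = "yyyy-MM-dd")})
private String dateOfBirth;
```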

## MapReduce use-cases
@@ -246,7 +246,7 @@ Add the following entry within the `dependencies` section of your `pom.xml`:
<dependency>
<groupId>com.flipkart</groupId>
<artifactId>hbase-object-mapper</artifactId>
<version>1.7</version>
<version>1.8</version>
</dependency>
```
See artifact details: [com.flipkart:hbase-object-mapper on **Maven Central**](http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22com.flipkart%22%20AND%20a%3A%22hbase-object-mapper%22) or
@@ -255,7 +255,7 @@ See artifact details: [com.flipkart:hbase-object-mapper on **Maven Central**](ht
To build this project, follow the steps below:

* Do a `git clone` of this repository
* Checkout latest stable version `git checkout v1.7`
* Checkout latest stable version `git checkout v1.8`
* Execute `mvn clean install` from shell

Currently, projects that use this library run on [Hortonworks Data Platform v2.4](https://hortonworks.com/blog/apache-hadoop-2-4-0-released/) (which corresponds to Hadoop 2.7 and HBase 1.1). However, if you're using a different distribution of Hadoop (such as [Cloudera](http://www.cloudera.com/)) or a different version of Hadoop, you may change the versions in [pom.xml](./pom.xml) to the desired ones and build the project.
6 changes: 3 additions & 3 deletions pom.xml
@@ -4,15 +4,15 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<name>HBase Object Mapper</name>
<description>
Java-annotation based compact utility library for HBase that helps you:
A Java-annotation based compact utility library for HBase that helps you:
[1] convert objects of your bean-like classes to HBase rows and vice-versa, for use in writing MapReduce jobs
for HBase tables and writing high-quality unit test cases
[2] define 'Data Access Object' classes for random access of HBase rows
</description>
<modelVersion>4.0.0</modelVersion>
<groupId>com.flipkart</groupId>
<artifactId>hbase-object-mapper</artifactId>
<version>1.7</version>
<version>1.8</version>
<url>https://flipkart-incubator.github.io/hbase-object-mapper/</url>
<scm>
<url>https://github.com/flipkart-incubator/hbase-object-mapper</url>
Expand All @@ -33,7 +33,7 @@
<role>development</role>
</roles>
<organization>Flipkart</organization>
<organizationUrl>http://www.flipkart.com/</organizationUrl>
<organizationUrl>https://www.flipkart.com</organizationUrl>
</developer>
</developers>
<repositories>
59 changes: 44 additions & 15 deletions src/main/java/com/flipkart/hbaseobjectmapper/AbstractHBDAO.java
@@ -1,10 +1,12 @@
package com.flipkart.hbaseobjectmapper;

import com.flipkart.hbaseobjectmapper.codec.Codec;
import com.flipkart.hbaseobjectmapper.exceptions.FieldNotMappedToHBaseColumnException;
import com.google.common.reflect.TypeToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

@@ -19,19 +21,25 @@
/**
* A <i>Data Access Object</i> class that enables simple random access (read/write) of HBase rows.
* <p>
* Please note: This class relies heavily on HBase client library's {@link HBTable} class, which isn't thread-safe (See: <a href="https://issues.apache.org/jira/browse/HBASE-17361">HBASE-17361</a>). Hence, this class isn't thread-safe.
* Please note: This class relies heavily on HBase client library's {@link Table} interface, whose implementations aren't thread-safe. Hence, this class isn't thread-safe.
* </p>
* <p>
* To learn more about thread-safe access to HBase, see conversation here: <a href="https://issues.apache.org/jira/browse/HBASE-17361">HBASE-17361</a>
* </p>
*
* @param <R> Data type of row key (must be '{@link Comparable} with itself' and must be {@link Serializable})
* @param <T> Entity type that maps to an HBase row (this type must have implemented {@link HBRecord} interface)
* @see Connection#getTable(TableName)
* @see Table
* @see HTable
*/
public abstract class AbstractHBDAO<R extends Serializable & Comparable<R>, T extends HBRecord<R>> implements Closeable {

/**
* Default number of versions to fetch
* Default number of versions to fetch. Change this to {@link Integer#MAX_VALUE} if you want the default behavior to be 'all versions'.
*/
private static final int DEFAULT_NUM_VERSIONS = 1;
protected static final HBObjectMapper hbObjectMapper = new HBObjectMapper();
protected final HBObjectMapper hbObjectMapper;
protected final Connection connection;
protected final Table table;
protected final Class<R> rowKeyClass;
@@ -40,27 +48,44 @@ public abstract class AbstractHBDAO<R extends Serializable & Comparable<R>, T ex
private final Map<String, Field> fields;

/**
* Constructs a data access object. Classes extending this class <strong>must</strong> call this constructor using <code>super</code>
* Constructs a data access object using a custom codec. Classes extending this class <strong>must</strong> call this constructor using <code>super</code>.
* <p>
* <b>Note: </b>If you want to use the default codec, just use the constructor {@link #AbstractHBDAO(Configuration)}
* </p>
*
* @param configuration Hadoop configuration
* @param codec Your custom codec
* @throws IOException Exceptions thrown by HBase
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
*/
@SuppressWarnings("unchecked")
protected AbstractHBDAO(Configuration configuration) throws IOException {
protected AbstractHBDAO(Configuration configuration, Codec codec) throws IOException {
hbRecordClass = (Class<T>) new TypeToken<T>(getClass()) {
}.getRawType();
rowKeyClass = (Class<R>) new TypeToken<R>(getClass()) {
}.getRawType();
if (hbRecordClass == null || rowKeyClass == null) {
throw new IllegalStateException(String.format("Unable to resolve HBase record/rowkey type (record class is resolving to %s and rowkey class is resolving to %s)", hbRecordClass, rowKeyClass));
}
hbObjectMapper = HBObjectMapperFactory.construct(codec);
hbTable = new WrappedHBTable<>(hbRecordClass);
connection = ConnectionFactory.createConnection(configuration);
table = connection.getTable(hbTable.getName());
fields = hbObjectMapper.getHBFields(hbRecordClass);
}

/**
* Constructs a data access object. Classes extending this class <strong>must</strong> call this constructor using <code>super</code>.
*
* @param configuration Hadoop configuration
* @throws IOException Exceptions thrown by HBase
* @throws IllegalStateException Annotation(s) on base entity may be incorrect
*/
@SuppressWarnings("unchecked")
protected AbstractHBDAO(Configuration configuration) throws IOException {
this(configuration, null);
}
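
As a usage illustration of the two constructors above, a DAO subclass wired with a custom codec might look like the minimal sketch below. `CitizenDAO` mirrors the `Citizen` entity from the README; `MyCustomCodec` is an assumed user-supplied `Codec` implementation, not something shipped with the library.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import com.flipkart.hbaseobjectmapper.AbstractHBDAO;

// Minimal sketch: a DAO for the Citizen entity (row key type String).
// MyCustomCodec is hypothetical; call the single-argument super(configuration)
// instead to keep the default BestSuitCodec.
public class CitizenDAO extends AbstractHBDAO<String, Citizen> {
    public CitizenDAO(Configuration configuration) throws IOException {
        super(configuration, new MyCustomCodec());
    }
}
```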

/**
* Get a specified number of versions of a row from an HBase table by its row key
*
@@ -70,7 +95,7 @@ protected AbstractHBDAO(Configuration configuration) throws IOException {
* @throws IOException When HBase call fails
*/
public T get(R rowKey, int versions) throws IOException {
Result result = this.table.get(new Get(hbObjectMapper.rowKeyToBytes(rowKey)).setMaxVersions(versions));
Result result = this.table.get(new Get(toBytes(rowKey)).setMaxVersions(versions));
return hbObjectMapper.readValue(rowKey, result, hbRecordClass);
}

@@ -97,7 +122,7 @@ public T get(R rowKey) throws IOException {
public T[] get(R[] rowKeys, int versions) throws IOException {
List<Get> gets = new ArrayList<>(rowKeys.length);
for (R rowKey : rowKeys) {
gets.add(new Get(hbObjectMapper.rowKeyToBytes(rowKey)).setMaxVersions(versions));
gets.add(new Get(toBytes(rowKey)).setMaxVersions(versions));
}
Result[] results = this.table.get(gets);
@SuppressWarnings("unchecked") T[] records = (T[]) Array.newInstance(hbRecordClass, rowKeys.length);
@@ -129,7 +154,7 @@ public T[] get(R[] rowKeys) throws IOException {
public List<T> get(List<R> rowKeys, int versions) throws IOException {
List<Get> gets = new ArrayList<>(rowKeys.size());
for (R rowKey : rowKeys) {
gets.add(new Get(hbObjectMapper.rowKeyToBytes(rowKey)).setMaxVersions(versions));
gets.add(new Get(toBytes(rowKey)).setMaxVersions(versions));
}
Result[] results = this.table.get(gets);
List<T> records = new ArrayList<>(rowKeys.size());
@@ -160,7 +185,7 @@ public List<T> get(List<R> rowKeys) throws IOException {
* @throws IOException When HBase call fails
*/
public List<T> get(R startRowKey, R endRowKey, int versions) throws IOException {
Scan scan = new Scan(hbObjectMapper.rowKeyToBytes(startRowKey), hbObjectMapper.rowKeyToBytes(endRowKey)).setMaxVersions(versions);
Scan scan = new Scan(toBytes(startRowKey), toBytes(endRowKey)).setMaxVersions(versions);
ResultScanner scanner = table.getScanner(scan);
List<T> records = new ArrayList<>();
for (Result result : scanner) {
@@ -220,7 +245,7 @@ public List<R> persist(List<? extends HBRecord<R>> records) throws IOException {
* @throws IOException When HBase call fails
*/
public void delete(R rowKey) throws IOException {
Delete delete = new Delete(hbObjectMapper.rowKeyToBytes(rowKey));
Delete delete = new Delete(toBytes(rowKey));
this.table.delete(delete);
}

@@ -243,7 +268,7 @@
public void delete(R[] rowKeys) throws IOException {
List<Delete> deletes = new ArrayList<>(rowKeys.length);
for (R rowKey : rowKeys) {
deletes.add(new Delete(hbObjectMapper.rowKeyToBytes(rowKey)));
deletes.add(new Delete(toBytes(rowKey)));
}
this.table.delete(deletes);
}
@@ -257,7 +282,7 @@ public void delete(R[] rowKeys) throws IOException {
public void delete(List<? extends HBRecord<R>> records) throws IOException {
List<Delete> deletes = new ArrayList<>(records.size());
for (HBRecord<R> record : records) {
deletes.add(new Delete(hbObjectMapper.rowKeyToBytes(record.composeRowKey())));
deletes.add(new Delete(toBytes(record.composeRowKey())));
}
this.table.delete(deletes);
}
@@ -316,7 +341,7 @@ private void populateFieldValuesToMap(Field field, Result result, Map<R, Navigab
List<Cell> cells = result.getColumnCells(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column()));
for (Cell cell : cells) {
Type fieldType = hbObjectMapper.getFieldType(field, hbColumn.isMultiVersioned());
final R rowKey = hbObjectMapper.bytesToRowKey(CellUtil.cloneRow(cell), (Class<T>) field.getDeclaringClass());
final R rowKey = hbObjectMapper.bytesToRowKey(CellUtil.cloneRow(cell), hbTable.getCodecFlags(), (Class<T>) field.getDeclaringClass());
if (!map.containsKey(rowKey)) {
map.put(rowKey, new TreeMap<Long, Object>());
}
@@ -391,7 +416,7 @@ public NavigableMap<R, NavigableMap<Long, Object>> fetchFieldValues(R startRowKe
Field field = getField(fieldName);
WrappedHBColumn hbColumn = new WrappedHBColumn(field);
validateFetchInput(field, hbColumn);
Scan scan = new Scan(hbObjectMapper.rowKeyToBytes(startRowKey), hbObjectMapper.rowKeyToBytes(endRowKey));
Scan scan = new Scan(toBytes(startRowKey), toBytes(endRowKey));
scan.addColumn(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column()));
scan.setMaxVersions(versions);
ResultScanner scanner = table.getScanner(scan);
@@ -430,7 +455,7 @@ public Map<R, NavigableMap<Long, Object>> fetchFieldValues(R[] rowKeys, String f
validateFetchInput(field, hbColumn);
List<Get> gets = new ArrayList<>(rowKeys.length);
for (R rowKey : rowKeys) {
Get get = new Get(hbObjectMapper.rowKeyToBytes(rowKey));
Get get = new Get(toBytes(rowKey));
get.setMaxVersions(versions);
get.addColumn(Bytes.toBytes(hbColumn.family()), Bytes.toBytes(hbColumn.column()));
gets.add(get);
@@ -443,6 +468,10 @@ public Map<R, NavigableMap<Long, Object>> fetchFieldValues(R[] rowKeys, String f
return map;
}

private byte[] toBytes(R rowKey) {
return hbObjectMapper.rowKeyToBytes(rowKey, hbTable.getCodecFlags());
}

private void validateFetchInput(Field field, WrappedHBColumn hbColumn) {
if (!hbColumn.isPresent()) {
throw new FieldNotMappedToHBaseColumnException(hbRecordClass, field.getName());
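
Putting the DAO methods above together, a typical read/write interaction could look like the sketch below. It assumes the hypothetical `CitizenDAO` from the earlier sketch; the row key `"IND#101"` is purely illustrative.

```java
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CitizenDAOExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (CitizenDAO citizenDAO = new CitizenDAO(conf)) {         // AbstractHBDAO implements Closeable
            Citizen citizen = citizenDAO.get("IND#101");              // latest version of a row
            Citizen withHistory = citizenDAO.get("IND#101", 5);       // up to 5 versions of @HBColumnMultiVersion fields
            citizenDAO.persist(Collections.singletonList(citizen));   // write the record back
            citizenDAO.delete("IND#101");                              // delete the row
        }
    }
}
```
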
9 changes: 5 additions & 4 deletions src/main/java/com/flipkart/hbaseobjectmapper/Family.java
@@ -1,15 +1,16 @@
package com.flipkart.hbaseobjectmapper;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* Represents a column family in HBase
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Target(TYPE)
@Retention(RUNTIME)
public @interface Family {
/**
* Column family name
9 changes: 9 additions & 0 deletions src/main/java/com/flipkart/hbaseobjectmapper/FamilyAndColumn.java
@@ -0,0 +1,9 @@
package com.flipkart.hbaseobjectmapper;

import org.apache.hadoop.hbase.util.Pair;

class FamilyAndColumn extends Pair<String, String> {
FamilyAndColumn(String family, String column) {
super(family, column);
}
}
9 changes: 5 additions & 4 deletions src/main/java/com/flipkart/hbaseobjectmapper/Flag.java
@@ -2,18 +2,19 @@

import com.flipkart.hbaseobjectmapper.codec.Codec;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* A flag for {@link Codec Codec} (specify parameter name and value)
* <p>
* This is to be used exclusively for input to {@link HBColumn#codecFlags() codecFlags} parameter of {@link HBColumn} and {@link HBColumnMultiVersion} annotations
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Target(FIELD)
@Retention(RUNTIME)
public @interface Flag {
String name();

9 changes: 5 additions & 4 deletions src/main/java/com/flipkart/hbaseobjectmapper/HBColumn.java
@@ -3,18 +3,19 @@
import com.flipkart.hbaseobjectmapper.codec.Codec;

import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Type;
import java.util.Map;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* Maps an entity field to an HBase column
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Target(FIELD)
@Retention(RUNTIME)
public @interface HBColumn {

/**
src/main/java/com/flipkart/hbaseobjectmapper/HBColumnMultiVersion.java
@@ -3,22 +3,23 @@
import com.flipkart.hbaseobjectmapper.codec.Codec;

import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Type;
import java.util.Map;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* Maps an entity field of type <code>NavigableMap&lt;Long, T&gt;</code> to an HBase column whose data type is represented as data type <code>T</code>.
* <p>
* As the name explains, this annotation is the multi-version variant of {@link HBColumn}.
* <p>
* <b>Please note</b>: <code>T</code> must be {@link Serializable}
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Target(FIELD)
@Retention(RUNTIME)
public @interface HBColumnMultiVersion {

/**
