October 31, 2014

The How To Of HBase Coprocessors

What is HBase™? HBase is a column oriented non-relational big data database. HBase is a distributed, scalable, reliable, and versioned storage system capable of providing random read/write access in real-time. It was modeled after Google’s Bigtable and just as Bigtable is built on top of Google File System, in the same manner HBase is built on HDFS (Hadoop Distributed File System). This means it is tightly integrated with Hadoop and provides base classes for writing MapReduce jobs for HBase.

HBase is capable of hosting large tables with billions of rows and millions of columns with generally millisecond SLA (Service Level Agreement). One of the key features of HBase is Coprocessor, the premise of this blog post. I am assuming readers would be familiar with HBase and also with the various terms like Column Family, HFile, Region, and RegionServer. If not, an elaboration of each is provided at the conclusion of this blog.

What is Coprocessor? Simply stated, Coprocessor is a framework that provides an easy way to run your custom code on Region Server.

When working with any data store (like RDBMS or HBase) you fetch the data (in case of RDBMS you may use query and in case of HBase you use either Get or Scan). To fetch only relevant data you filter it (for RDBMS you put conditions in ‘WHERE’ clause and in HBase you use Filters). After fetching the desired data, you can perform your business computation on the data.

This scenario is close to ideal for “small data,” like a few thousand rows with a bunch of columns. Now imagine a scenario where there are billions of rows and millions of columns and you want to perform some computation which requires all the data, like calculating average or sum. Even if you are interested in just a few columns, you still have to fetch all the rows. There are a few drawbacks in this approach as described below:

  1. In this approach the data transfer (from data store to client side) will become the bottleneck, and the time required to complete the operation is limited by the rate at which data transfer is taking place.
  2. It’s not always possible to hold this much data in memory and perform computation.
  3. Bandwidth is one of the most precious resources in any data center. Operations like this will severely impact the performance of your cluster.
  4. Your client code is becoming thick as you are maintaining the code for calculating average or summation on client side. Not a major drawback when talking of severe issues like performance/bandwidth but still worth giving consideration.
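The first drawback can be made concrete with a tiny simulation. This is a plain-Java sketch, not HBase code: the three lists are hypothetical stand-ins for regions, and "values moved" simply counts how many numbers would have to travel to the client in each approach.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation; no HBase classes are used and all names are hypothetical.
final class DataMovementSketch {

    // Each inner list stands in for the values held by one region.
    static List<List<Long>> sampleRegions() {
        return Arrays.asList(
                Arrays.asList(100L, 200L, 300L),
                Arrays.asList(400L, 500L),
                Arrays.asList(600L));
    }

    // Client-side approach: every value is shipped to the client, then summed there.
    // Returns {sum, valuesMoved}.
    static long[] clientSideSum(List<List<Long>> regions) {
        long sum = 0;
        long valuesMoved = 0;
        for (List<Long> region : regions) {
            for (long value : region) {
                sum += value;
                valuesMoved++;
            }
        }
        return new long[] {sum, valuesMoved};
    }

    // Server-side approach: each region sums locally and ships a single number.
    // Returns {sum, valuesMoved}.
    static long[] serverSideSum(List<List<Long>> regions) {
        long sum = 0;
        long valuesMoved = 0;
        for (List<Long> region : regions) {
            long partial = 0;
            for (long value : region) {
                partial += value;
            }
            sum += partial;
            valuesMoved++; // only the partial sum leaves the region
        }
        return new long[] {sum, valuesMoved};
    }

    public static void main(String[] args) {
        long[] client = clientSideSum(sampleRegions());
        long[] server = serverSideSum(sampleRegions());
        System.out.println("client-side: sum=" + client[0] + ", values moved=" + client[1]);
        System.out.println("server-side: sum=" + server[0] + ", values moved=" + server[1]);
    }
}
```

Both approaches produce the same sum, but the second one moves one value per region instead of one value per row, and that gap widens as the table grows.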

In a scenario like this it’s better to move the computation to the data itself; just like stored procedure (a better analogy is MapReduce model). Coprocessor helps you achieve this but you can do more than that. To give an idea of Coprocessor’s capabilities, different people give different analogies. The three most famous analogies for Coprocessor present in the industry are:

  1. Triggers and Stored Procedure: This is the most common analogy that you will find for Coprocessor (the official document uses this analogy). Observer Coprocessor (discussed below) is compared to triggers because, like triggers, it executes your custom code when a certain event occurs (like Get or Put). Similarly, Endpoint Coprocessor (discussed below) is compared to stored procedures because it lets you perform custom computation on data directly inside the region server.
  2. MapReduce: Just as MapReduce moves the computation to the data, Coprocessor executes your custom computation directly on Region Servers, i.e. where the data resides. That’s why some people compare Coprocessor to small MapReduce jobs.
  3. AOP: Some people compare it to Aspect Oriented Programming (AOP). As in AOP, you apply advice by intercepting the request then running some custom code (probably cross-cutting) and then forwarding the request on its path as if nothing happened (or even return it back). Similarly in Coprocessor you have this facility of intercepting the request and running custom code and then forwarding it on its path (or returning it).

Actually the Coprocessor derives its roots from Google™ Bigtable™ but it deviates from it largely in its design. Currently there are efforts going in the HBase community to bridge this gap. For more information see JIRA ticket HBASE-4047.

In HBase, to implement a Coprocessor certain steps must be followed as described below:

  1. Either your class should extend one of the Coprocessor classes (like BaseRegionObserver) or it should implement Coprocessor interfaces (like Coprocessor, CoprocessorService).
  2. Load the Coprocessor: Currently there are two ways to load the Coprocessor. One is static (i.e. loading from configuration) and the other is dynamic (i.e. loading from table descriptor either through Java code or through ‘hbase shell’). Both are discussed below in detail.
  3. Finally your client-side code to call the Coprocessor. This is the easiest step, as HBase handles the Coprocessor transparently and you don’t have to do much to call the Coprocessor.

Coprocessors are not designed to be used by end users but by developers. Coprocessors are executed directly on the region server; therefore faulty or malicious code can bring your region server down. Currently there is no mechanism to prevent this, but there are efforts going on for this. For more, see JIRA ticket HBASE-4047.

Coprocessor can be broadly divided into two categories – Observer and Endpoint – and each one is discussed separately:

1. Observer Coprocessor: As stated above, these are just like database triggers, i.e. they execute your custom code on the occurrence of certain events. If you prefer (or if you are from a Java background) you can also think of them as AOP advice (before and after only). Coprocessors allow you to hook your custom code in two places during the lifecycle of the event. One is just before the occurrence of the event (just like before advice in AOP). For example, it will allow your custom code to run just before the ‘Put’ operation. All methods providing this feature start with the prefix ‘pre’. For example, if you want your code to be executed before the put operation then you should override the following method of the RegionObserver class. We will walk through a working example after this boring introduction. ☺

public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability) throws IOException {
}

Similarly, the Observer Coprocessor also provides hooks for your code to get executed just after the occurrence of the event (similar to after advice in AOP terminology). These methods start with the prefix ‘post’. For example, if you want your code to be executed after the ‘Put’ operation, you should override the following method:

public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability) throws IOException {
}

In short, the following conventions are followed:

Override preXXX() if you want your code to be executed before the occurrence of the event.

Override postXXX() if you want your code to be executed after the occurrence of the event.
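To see the convention in action without a cluster, here is a minimal plain-Java sketch. Everything in it is hypothetical (the PutObserver interface, the Table class, the AdminGuard observer) and only mimics the naming convention. In particular, the sketch signals "skip the default operation" through a boolean return value, whereas in HBase that is done through the ObserverContext, as you will see in the working example later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the preXXX/postXXX convention; not the HBase API.
final class HookSketch {

    /** Hypothetical observer with one hook before and one hook after a put. */
    interface PutObserver {
        /** Runs before the put; returning false skips the put entirely. */
        boolean prePut(String row, String value);

        /** Runs only after the put has been applied. */
        void postPut(String row, String value);
    }

    /** Hypothetical in-memory table that calls the hooks around each put. */
    static final class Table {
        final Map<String, String> rows = new HashMap<>();
        private final PutObserver observer;

        Table(PutObserver observer) {
            this.observer = observer;
        }

        void put(String row, String value) {
            if (!observer.prePut(row, value)) {
                return; // the observer vetoed the operation
            }
            rows.put(row, value);
            observer.postPut(row, value);
        }
    }

    /** Example observer: records every hook call and rejects writes to 'admin'. */
    static final class AdminGuard implements PutObserver {
        final List<String> events = new ArrayList<>();

        @Override
        public boolean prePut(String row, String value) {
            events.add("pre:" + row);
            return !"admin".equals(row);
        }

        @Override
        public void postPut(String row, String value) {
            events.add("post:" + row);
        }
    }

    static String demo() {
        AdminGuard guard = new AdminGuard();
        Table table = new Table(guard);
        table.put("raj", "developer");
        table.put("admin", "root");
        return guard.events + " stored=" + table.rows.keySet();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the ‘post’ hook never fires for the rejected row, because the operation itself never happened.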

A few use cases of the Observer Coprocessor are:

  1. Security: Before performing any operation (like ‘Get’, ‘Put’) you can check for permission in the ‘preXXX’ methods.
  2. Referential Integrity: Unlike a traditional RDBMS, HBase doesn’t have the concept of referential integrity (foreign key). Suppose for example you have a requirement that whenever you insert a record in the ‘users’ table, a corresponding entry should also be created in the ‘user_daily_attendance’ table. One way you could solve this is by using two ‘Put’ operations, one for each table, but this way you are throwing the responsibility (of the referential integrity) to the user. A better way is to use a Coprocessor and override the ‘postPut’ method, in which you write the code to insert the record in the ‘user_daily_attendance’ table. This way the client code is leaner and cleaner.
  3. Secondary Index: Coprocessor can be used to maintain secondary indexes. For more information please see SecondaryIndexing.

Observer Coprocessor has the following flavors:

  1. RegionObserver: This Coprocessor provides the facility to hook your code when events on a region are triggered. The most common examples include ‘preGet’ and ‘postGet’ for the ‘Get’ operation and ‘prePut’ and ‘postPut’ for the ‘Put’ operation.
  2. Region Server Observer: Provides hooks for events related to the RegionServer, such as stopping the RegionServer and performing operations before or after merges, commits, or rollbacks.
  3. WAL Observer: Provides hooks for WAL (Write-Ahead-Log) related operations. It has only two methods, ‘preWALWrite()’ and ‘postWALWrite()’.
  4. Master Observer: This observer provides hooks for DDL-like operations, such as create, delete, and modify table.

Example of Observer Coprocessor:

Table 1: ‘users’ table


Consider a hypothetical example having the ‘users’ table as shown above. In the above example, the client can query the information about the employee. For the purpose of demonstrating Coprocessor, we are assuming that ‘admin’ is a special person and his details shouldn’t be visible to any client querying the table. To achieve this we will take the help of Coprocessor.

Following are the steps:

  1. Write a class that extends the BaseRegionObserver class.
  2. Override the ‘preGetOp()’ method (note that the ‘preGet()’ method is now deprecated). You should use the ‘preGetOp’ method here because it lets you first check whether the queried rowkey is ‘admin’. If it is ‘admin’, then return the call without allowing the system to perform the get operation, thus saving on performance.
  3. Export your code in a jar file.
  4. Place the jar in HDFS where HBase can locate it.
  5. Load the Coprocessor.
  6. Write a simple program to test it.

Let’s see each step in detail:

Step 1 and Step 2: Below is a class that extends one of the Coprocessor classes (BaseRegionObserver) and overrides the ‘preGetOp’ method.

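import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionObserverExample extends BaseRegionObserver {

	private static final byte[] ADMIN = Bytes.toBytes("admin");
	private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
	private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
	private static final byte[] VALUE = Bytes.toBytes("You can’t see Admin details");

	@Override
	public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results) throws IOException {

		if (Bytes.equals(get.getRow(), ADMIN)) {
			// Return a placeholder cell instead of the real data; (byte) 4 is the type code for a Put
			Cell c = CellUtil.createCell(get.getRow(), COLUMN_FAMILY, COLUMN, System.currentTimeMillis(), (byte) 4, VALUE);
			results.add(c);
			// Skip the default 'Get' processing for this request
			e.bypass();
		}

		// Keep legacy observers working by delegating to the deprecated preGet()
		List<KeyValue> kvs = new ArrayList<KeyValue>(results.size());
		for (Cell c : results) {
			kvs.add(KeyValueUtil.ensureKeyValue(c));
		}
		preGet(e, get, kvs);
		results.clear();
		results.addAll(kvs);
	}
}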

Overriding the ‘preGetOp()’ will only work for ‘Get’ operation. For ‘Scan’ operation it won’t help you. To deal with it you have to override another method called ‘preScannerOpen()’ method, and add a Filter explicitly for admin as shown below:

@Override
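public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan, final RegionScanner s) throws IOException {
	// Exclude the 'admin' row from every scan; setFilter() replaces any filter already set on the scan
	Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
	scan.setFilter(filter);
	return s;
}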

This method works but there is a side effect. If the client has used any Filter in his scan, then that Filter won’t have any effect because our filter has replaced it.
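The difference between replacing a filter and combining it with the client’s filter can be sketched in plain Java, with java.util.function.Predicate standing in for HBase filters. This is only an illustration with hypothetical names and row keys. In HBase itself, the analogous tool, as far as I know, is a FilterList with the MUST_PASS_ALL operator that wraps both the client’s filter (from scan.getFilter()) and the admin filter; I have not tested that variant here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java sketch; Predicate<String> stands in for an HBase Filter on row keys.
final class FilterCompositionSketch {

    // The Coprocessor's own rule: never return the 'admin' row.
    static final Predicate<String> NOT_ADMIN = rowKey -> !rowKey.equals("admin");

    // Stand-in for a scan: returns the row keys accepted by the filter.
    static List<String> scan(List<String> rowKeys, Predicate<String> filter) {
        List<String> accepted = new ArrayList<>();
        for (String rowKey : rowKeys) {
            if (filter.test(rowKey)) {
                accepted.add(rowKey);
            }
        }
        return accepted;
    }

    // What the hook above effectively does: the client's filter is discarded.
    static Predicate<String> replace(Predicate<String> clientFilter) {
        return NOT_ADMIN;
    }

    // Alternative: keep the client's filter and require both filters to pass.
    static Predicate<String> combine(Predicate<String> clientFilter) {
        return clientFilter == null ? NOT_ADMIN : clientFilter.and(NOT_ADMIN);
    }

    static String demo() {
        List<String> rows = Arrays.asList("admin", "alice", "bob");
        Predicate<String> clientFilter = rowKey -> rowKey.startsWith("a");
        return scan(rows, replace(clientFilter)) + " vs " + scan(rows, combine(clientFilter));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With replacement, ‘bob’ leaks through even though the client asked only for keys starting with ‘a’; with combination, both conditions hold.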

Another option you can try is to deliberately remove the admin from result. This approach is shown below:

@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore) throws IOException {
	Iterator<Result> iterator = results.iterator();
	while (iterator.hasNext()) {
		Result result = iterator.next();
		// Drop the 'admin' row from the batch that is about to be returned to the client
		if (Bytes.equals(result.getRow(), ADMIN)) {
			iterator.remove();
			break;
		}
	}
	return hasMore;
}

Step 3: It’s pretty convenient to export the above program in a jar file. Let’s assume that we exported it in a file called ‘coprocessor.jar’.

Step 4: Copy the jar to HDFS. I have used the Hadoop copy command:

hadoop fs -copyFromLocal coprocessor.jar coprocessor.jar

Step 5: See Loading of Coprocessor. For an observer you can use whichever way you prefer.

Step 6: Run the following program to test. The first part is testing ‘Get’ and second ‘Scan’.

Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
HTableInterface table = connection.getTable("users");

// Part 1: 'Get' on the admin row. The Coprocessor answers with a String value, so decode it as a String.
Get get = new Get(Bytes.toBytes("admin"));
Result result = table.get(get);
for (Cell c : result.rawCells()) {
	System.out.println(Bytes.toString(CellUtil.cloneRow(c))
		+ " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
		+ " {" + Bytes.toString(CellUtil.cloneQualifier(c))
		+ ":" + Bytes.toString(CellUtil.cloneValue(c)) + "}");
}

// Part 2: 'Scan' over the whole table. Bytes.toLong() assumes the values were written as 8-byte longs.
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
for (Result res : scanner) {
	for (Cell c : res.rawCells()) {
		System.out.println(Bytes.toString(CellUtil.cloneRow(c))
			+ " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
			+ " {" + Bytes.toString(CellUtil.cloneQualifier(c))
			+ ":" + Bytes.toLong(CellUtil.cloneValue(c))
			+ "}");
	}
}
scanner.close();
table.close();
connection.close();

2. Endpoint Coprocessor: This kind of Coprocessor can be compared to the stored procedures found in RDBMS. They help in performing computation which is not possible either through Observer Coprocessor or otherwise. For example, calculating average or summation over the entire table that spans across multiple regions. They do so by providing a hook for your custom code and then running it across all regions. With Endpoint Coprocessor you can create your own dynamic RPC protocol and thus can provide communication between client and region server, thus enabling you to run your custom code on the region server (on each region of a table). Unlike Observer Coprocessor (where your custom code is executed transparently when events like the ‘Get’ operation occur), in Endpoint Coprocessor you have to explicitly invoke the Coprocessor by using the ‘coprocessorService()’ method of the ‘HTableInterface’ (or HTable). A working example is given below.

From version 0.96, implementing Endpoint Coprocessor is not straightforward. Now it is done with the help of Google’s Protocol Buffer. For more details on Protocol Buffer, refer to this excellent Protocol Buffer Guide. To migrate Endpoints of version 0.94 or before to 0.96 or later, you have to upgrade your Endpoint Coprocessor. For more details, see JIRA ticket HBASE-5448. For writing Endpoint Coprocessor, one should:

  1. Create a ‘.proto’ file defining your service.
  2. Execute the ‘protoc’ command to generate the Java code from the above ‘.proto’ file.
  3. Write a class that should:
    1. Extend the above generated service class.
    2. Implement two interfaces: Coprocessor and CoprocessorService.
    3. Override the service method.
  4. Load the Coprocessor.
  5. Write a client code to call Coprocessor.

Example of Endpoint Coprocessor: We are following the same example as described above. Just to recap:


In our hypothetical example (See Table 1), to demonstrate the Endpoint Coprocessor we see a trivial use case and will try to calculate the total (Sum) of gross salary of all employees. We will go step by step:

Step 1: Create a ‘proto’ file to define your service, request and response. Let’s call this file “sum.proto”. Below is the content of the ‘sum.proto’ file

option java_package = "org.myname.hbase.Coprocessor.autogenerated";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequest {
	required string family = 1;
	required string column = 2;
}

message SumResponse {
  required int64 sum = 1 [default = 0];
}

service SumService {
  rpc getSum(SumRequest)
    returns (SumResponse);
}

Step 2: Compile the proto file using proto compiler (for detailed instructions see this excellent official documentation).

$ protoc --java_out=src ./sum.proto

(Note: You must create the ‘src’ folder before running the command.)
This will generate a class called “Sum.java”.

Step 3: Write your Endpoint Coprocessor: Firstly your class should extend the service just defined above (i.e. Sum.SumService). Second it should implement Coprocessor and CoprocessorService interfaces. Third, override the ‘getService()’, ‘start()’, ‘stop()’ and ‘getSum()’ methods. Below is the full code:

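import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

// Classes generated by protoc from sum.proto
import org.myname.hbase.Coprocessor.autogenerated.Sum.SumRequest;
import org.myname.hbase.Coprocessor.autogenerated.Sum.SumResponse;
import org.myname.hbase.Coprocessor.autogenerated.Sum.SumService;

public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService {

	private RegionCoprocessorEnvironment env;

	@Override
	public Service getService() {
		return this;
	}

	@Override
	public void start(CoprocessorEnvironment env) throws IOException {
		// An endpoint only makes sense when it is loaded on a region
		if (env instanceof RegionCoprocessorEnvironment) {
			this.env = (RegionCoprocessorEnvironment) env;
		} else {
			throw new CoprocessorException("Must be loaded on a table region!");
		}
	}

	@Override
	public void stop(CoprocessorEnvironment env) throws IOException {
		// do nothing
	}

	@Override
	public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
		Scan scan = new Scan();
		scan.addFamily(Bytes.toBytes(request.getFamily()));
		scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
		SumResponse response = null;
		InternalScanner scanner = null;
		try {
			// Scan only the region this Coprocessor instance is attached to
			scanner = env.getRegion().getScanner(scan);
			List<Cell> results = new ArrayList<Cell>();
			boolean hasMore = false;
			long sum = 0L;
			do {
				hasMore = scanner.next(results);
				for (Cell cell : results) {
					// Values are expected to be stored as 8-byte longs
					sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
				}
				results.clear();
			} while (hasMore);

			response = SumResponse.newBuilder().setSum(sum).build();

		} catch (IOException ioe) {
			ResponseConverter.setControllerException(controller, ioe);
		} finally {
			if (scanner != null) {
				try {
					scanner.close();
				} catch (IOException ignored) {}
			}
		}
		// Hand the per-region result (or null on failure) back to the caller
		done.run(response);
	}
}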
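One detail worth calling out in ‘getSum()’ is that it decodes every cell value as a long, so the column must have been written as a long in the first place. The sketch below uses only java.nio to show the layout I am assuming here: 8 bytes, most significant byte first, which to my knowledge is what Bytes.toBytes(long) produces. The helper names are mine, and this sketch rejects any value that is not exactly 8 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of the assumed 8-byte big-endian long encoding; helper names are hypothetical.
final class LongEncodingSketch {

    // Encodes a long as 8 bytes, most significant byte first.
    static byte[] toBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes an 8-byte value back into a long; any other length is rejected.
    static long toLong(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes but got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] asLong = toBytes(5000L);
        byte[] asText = "5000".getBytes(StandardCharsets.UTF_8);
        System.out.println("long cell: " + asLong.length + " bytes -> " + toLong(asLong));
        System.out.println("text cell: " + asText.length + " bytes, cannot be summed as a long");
    }
}
```

So if the ‘gross’ column was written as the text "5000" rather than the number 5000L, the endpoint would not be able to add it up correctly.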

Step 4: Load the Coprocessor. See loading of Coprocessor. I recommend using static approach for Endpoint Coprocessor.
Step 5: Now we have to write the client code to test it. To do so in your main method, write the following code as shown below:

Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
HTableInterface table = connection.getTable("users");
final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
try {
	// null start and end keys mean the service is called on every region of the table
	Map<byte[], Long> results = table.coprocessorService(SumService.class, null, null,
		new Batch.Call<SumService, Long>() {
			@Override
			public Long call(SumService aggregate) throws IOException {
				BlockingRpcCallback<SumResponse> rpcCallback = new BlockingRpcCallback<SumResponse>();
				aggregate.getSum(null, request, rpcCallback);
				SumResponse response = rpcCallback.get();
				return response.hasSum() ? response.getSum() : 0L;
			}
		});
	// One entry per region
	for (Long sum : results.values()) {
		System.out.println("Sum = " + sum);
	}
} catch (ServiceException e) {
	e.printStackTrace();
} catch (Throwable e) {
	e.printStackTrace();
}
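Keep in mind that the client code above prints one sum per region, because each region runs the endpoint on its own rows. To get the total for the whole table you still have to add up the partial results on the client. Here is a minimal plain-Java sketch of that last step. The region names and values are made up, and I use String keys for readability where the real result map is keyed by byte[].

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of merging per-region partial sums; names and values are hypothetical.
final class MergeRegionSumsSketch {

    // Adds up the partial sums, treating a missing (null) result as zero.
    static long total(Map<String, Long> perRegion) {
        long total = 0L;
        for (Long partial : perRegion.values()) {
            if (partial != null) {
                total += partial;
            }
        }
        return total;
    }

    static long demo() {
        Map<String, Long> perRegion = new LinkedHashMap<>();
        perRegion.put("region-1", 1500L);
        perRegion.put("region-2", 2500L);
        perRegion.put("region-3", 0L); // a region with no matching rows
        return total(perRegion);
    }

    public static void main(String[] args) {
        System.out.println("Total = " + demo());
    }
}
```

This final merge is cheap because only one number per region ever reaches the client.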

Loading of Coprocessor:

Coprocessor can be loaded broadly in two ways. One is static (loading through configuration files) and the other one is dynamic loading.

Dynamic loading: Dynamic loading means loading Coprocessor without restarting HBase. Dynamic loading can be done in three ways:

A. Using Shell: You can load the Coprocessor using the HBase shell as follows:

1. Disable the table so that you can load Coprocessor

hbase(main):001:0> disable 'users'

2. Load the Coprocessor (i.e. coprocessor.jar) that you copied to HDFS by using the following command:

hbase(main):002:0> alter 'users', METHOD => 'table_att', 'coprocessor' => 'hdfs://localhost/user/gbhardwaj/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|'

where “hdfs://localhost/user/gbhardwaj/coprocessor.jar” is the full path of the ‘coprocessor.jar’ in your HDFS and “org.myname.hbase.Coprocessor.RegionObserverExample” is the full name of your class (including package name).
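The attribute value in the command above is simply four fields separated by ‘|’: jar path, class name, priority, and optional arguments (empty here). The sketch below builds and splits such a string in plain Java. The helper names are mine, not HBase’s, and the constant is my assumption that 1073741823 corresponds to Coprocessor.PRIORITY_USER (it equals Integer.MAX_VALUE / 2).

```java
// Plain-Java sketch of the 'path|class|priority|args' attribute format; helper names are hypothetical.
final class CoprocessorSpecSketch {

    // Assumed to match Coprocessor.PRIORITY_USER; evaluates to 1073741823.
    static final int PRIORITY_USER = Integer.MAX_VALUE / 2;

    // Joins the four fields with '|'; a null args value becomes an empty last field.
    static String build(String jarPath, String className, int priority, String args) {
        return jarPath + "|" + className + "|" + priority + "|" + (args == null ? "" : args);
    }

    // Splits a spec into its four fields and trims stray spaces around each one.
    static String[] parse(String spec) {
        String[] parts = spec.split("\\|", -1); // -1 keeps the trailing empty args field
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields but got " + parts.length);
        }
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    public static void main(String[] args) {
        String spec = build("hdfs://localhost/user/gbhardwaj/coprocessor.jar",
                "org.myname.hbase.Coprocessor.RegionObserverExample", PRIORITY_USER, null);
        System.out.println(spec);
        for (String field : parse(spec)) {
            System.out.println("field: '" + field + "'");
        }
    }
}
```

Seeing the value as four separate fields makes it easier to spot a typo in the shell command before you enable the table again.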

3. Enable the table:

hbase(main):003:0> enable 'users'

4. Verify that the Coprocessor is loaded by typing the following command:

hbase(main):004:0> describe 'users'

You should see output like this:

DESCRIPTION ENABLED

'users', {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://localhost/user/gbhardwaj/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|'}, {NAME => 'personalDet' ………………… true

B. Using setValue() method of HTableDescriptor: This is done entirely in Java as follows:

String tableName = "users";
String path = "hdfs://localhost/user/gbhardwaj/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path +
			      "|" + RegionObserverExample.class.getCanonicalName() + 
			      "|" + Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

C. Using addCoprocessor() method of HTableDescriptor: This method is available from 0.96 version onwards. Personally I prefer this way only:

String tableName = "users";
String path = "hdfs://localhost/user/gbhardwaj/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
// addCoprocessor() expects an org.apache.hadoop.fs.Path, not a String
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), new Path(path), Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

 

Static loading: Static loading means that your Coprocessor will take effect only when you restart HBase, and there is a reason for it. In this approach you make changes to ‘hbase-site.xml’ and therefore have to restart HBase for your changes to take effect.

Create the following entry in the ‘hbase-site.xml’ file located in the ‘conf’ directory:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.myname.hbase.Coprocessor.endpoint.SumEndPoint</value>
</property>

Make your code available to HBase. I used the following simple steps: first, export your endpoint (SumEndPoint.java), service (Sum.java) and other relevant Protocol Buffer classes (i.e. all classes found under the ‘<Protocol Buffer Directory>/java/src/main/java/com/google/protobuf’ directory) in a jar file. Second, put this jar in the ‘lib’ folder of HBase, and finally restart HBase.

Note: The documentation clearly states:

“Add a table Coprocessor to this table. The Coprocessor type must be RegionObserver or Endpoint. It won’t check if the class can be loaded or not. Whether a Coprocessor is loadable or not will be determined when a region is opened.”

This suggests that you can load both Observer and Endpoint Coprocessors dynamically using the following method of HTableDescriptor:

addCoprocessor(String className, org.apache.hadoop.fs.Path jarFilePath, int priority, Map<String,String> kvs) throws IOException

In my case, the above method worked fine for Observer Coprocessor but didn’t work for Endpoint Coprocessor, causing the table to become unavailable and finally I had to restart my HBase. The same Endpoint Coprocessor worked fine when loaded statically. Use the above method for Endpoint Coprocessor with caution.

As I wrap up, here are a few terms I have used in the above blog:

Column Family: It is generally said that HBase is a column-oriented database, which means it has columns, and columns are grouped in a column family. Columns are dynamic (i.e. there is no fixed number of columns in a row, as opposed to an RDBMS where the number of columns is fixed all the time) and do not need to be declared at the time of table creation. Column families, on the other hand, are fixed and must be defined at the time of table creation. They must remain more or less fixed throughout the life of the table. You can add column families after table creation but it is an expensive operation and is generally avoided.

HFile: Column Family is stored in HFile. A column family can span across multiple HFiles (until major compaction) but the converse is not true, i.e. an HFile cannot have multiple column families; it will always host a single column family.

Region: Although HBase tables host billions of rows, it is not possible to store them all together. Therefore, a table is divided into chunks. A Region is responsible for holding a set of rows. When you keep adding rows and a Region’s size grows beyond a threshold value, it automatically splits into two Regions. A Region hosts a contiguous set of rows.

RegionServer: Regions are hosted by a Region Server. A Region Server can host many Regions but a Region is always hosted by one and only one Region Server.
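The relationship between row keys, Regions and Region Servers can be pictured with a sorted map. The sketch below is plain Java with made-up start keys and server names. It only illustrates the idea that each Region covers a contiguous key range and lives on exactly one Region Server; it is not how HBase actually locates regions internally.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of contiguous key ranges; start keys and server names are hypothetical.
final class RegionLookupSketch {

    // Region start key (inclusive) -> name of the Region Server hosting that Region.
    private final TreeMap<String, String> serverByStartKey = new TreeMap<>();

    void addRegion(String startKey, String server) {
        serverByStartKey.put(startKey, server);
    }

    // Finds the Region whose start key is the greatest one not larger than the row key.
    String serverFor(String rowKey) {
        Map.Entry<String, String> region = serverByStartKey.floorEntry(rowKey);
        if (region == null) {
            throw new IllegalStateException("no region covers row key " + rowKey);
        }
        return region.getValue();
    }

    static RegionLookupSketch demo() {
        RegionLookupSketch lookup = new RegionLookupSketch();
        lookup.addRegion("", "rs1");  // first Region: everything before "h"
        lookup.addRegion("h", "rs2"); // second Region: "h" up to, but not including, "p"
        lookup.addRegion("p", "rs1"); // third Region: "p" onwards, hosted by rs1 again
        return lookup;
    }

    public static void main(String[] args) {
        RegionLookupSketch lookup = demo();
        for (String rowKey : new String[] {"admin", "jane", "zoe"}) {
            System.out.println(rowKey + " -> " + lookup.serverFor(rowKey));
        }
    }
}
```

Here ‘rs1’ hosts two Regions, while every row key still resolves to exactly one Region and therefore one Region Server.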