Terry He

How to Use a Table Load Tool to Batch Puts into HBase/MapR Database

October 15, 2020

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019 may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html

Original Post Information:

"authorDisplayName": "Terry He",
"publish": "2015-04-22T07:00:00.000Z",
"tags": "nosql"

Apache HBase is an in-Hadoop database that delivers wide-column schema flexibility with strongly consistent reads and writes. Clients can access HBase data through a native Java API, a Thrift or REST gateway, or a C API, so data is easy to reach from most environments. MapR Database, another in-Hadoop database, supports the same HBase APIs but adds enterprise-grade features for production deployments of HBase applications.

Put, Get, and Scan are among the most heavily used programming APIs in HBase applications. For write-heavy workloads, individual Put operations can be slow, so batching Put operations is a commonly used technique to increase the overall throughput of the system. The following program is a table load tool: a utility for batching Puts into an HBase/MapR Database table. It creates a simple table with a single column in a single column family and inserts 100,000 rows, each carrying 100 bytes of data; the batch size for the Puts is set to 500 in this example. A short sketch of the core batching pattern comes first, followed by the full tool.
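
The sketch below is a minimal illustration of that pattern, not part of the original tool: it assumes a table with a column family f0 already exists, the table path /user/mapr/testtable is only a placeholder, and it uses the same HBase 0.98-era client API (HTable, Put, flushCommits) as the program that follows.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumed, pre-existing table with column family "f0"; the path is a placeholder
        HTable table = new HTable(conf, "/user/mapr/testtable");
        table.setAutoFlush(false);      // buffer Puts on the client instead of sending them one at a time
        int batchSize = 500;
        List<Put> puts = new ArrayList<Put>(batchSize);
        for (int i = 0; i < 100000; i++) {
            Put p = new Put(Bytes.toBytes("user" + i));
            p.add(Bytes.toBytes("f0"), Bytes.toBytes("c0"), new byte[100]); // 100-byte value
            puts.add(p);
            if (puts.size() == batchSize) {
                table.put(puts);        // queue the whole batch in the client write buffer
                table.flushCommits();   // send the batch to the servers
                puts.clear();
            }
        }
        if (!puts.isEmpty()) {          // send any remainder
            table.put(puts);
            table.flushCommits();
        }
        table.close();
    }
}

Disabling autoFlush keeps Puts in the client-side write buffer and flushCommits() pushes each accumulated batch to the servers; the full tool exposes the same knobs through its -AutoFlush and -flush options.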

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;

import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadTableMTBatch {

    static long uniqueSeed = System.currentTimeMillis();
    static long[] count;
    static long[] latency;
    static int[] keySizes;
    public static long printPerNum = 10000;
    public static boolean noCRC = false;
    public static long keySize = 8;
    public static long startRow = 0;
    public static int batchSize = 500;
    public static int preSplit = 1; // used as a demo - not an accurate key distribution
    public static boolean flush = false;
    public static boolean autoFlush = false;
    public static final String KEY_PREFIX = "user";
    public static final long startKey = 0L;
    public static final long endKey = 999999999999999L;
    public static final String HBASE_RESOURCE_NAME = "/opt/mapr/hbase/hbase-0.98.9/conf/hbase-site.xml";
    public static String ZOOKEEPER_NODES = "localhost"; // default to localhost; only needed when accessing Apache HBase
    public static final Pair<String, String> ZOOKEEPER_SETTINGS = new Pair<String, String>(
            "hbase.zookeeper.quorum", ZOOKEEPER_NODES);

    public static void usage(String arg) {
        System.err.println("bad token: " + arg);
        System.err
                .println("loadMT -rows <100000> -valuesize <100 bytes="">  -debug -path  -threads <10> -batchSize <500> -numCF <1> -numC <1> -preSplit <1> -zookeeperNodes  -AutoFlush -flush");
        System.exit(1);
    }

    public static void main(String[] args) throws java.io.IOException {
        Configuration conf = HBaseConfiguration.create();
        String tableName = null;
        long numRows = 100000;
        long numCF = 1;
        long numC = 1;
        long valueSize = 100;
        int numThreads = 10;
        boolean augment = false;

        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-rows")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numRows = Long.parseLong(args[i]);
            } else if (args[i].equals("-path")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                tableName = args[i];
            } else if (args[i].equals("-debug")) {
                conf.set("fs.mapr.trace", "debug");
            } else if (args[i].equals("-valuesize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                valueSize = Long.parseLong(args[i]);
            } else if (args[i].equals("-threads")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numThreads = Integer.parseInt(args[i]);
            } else if (args[i].equals("-p")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                printPerNum = Long.parseLong(args[i]);
            } else if (args[i].equals("-hbase")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                conf.addResource(new Path(args[i]));
            } else if (args[i].equals("-numCF")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numCF = Integer.parseInt(args[i]);
            } else if (args[i].equals("-numC")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numC = Integer.parseInt(args[i]);
            } else if (args[i].equals("-batchSize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                batchSize = Integer.parseInt(args[i]);
            } else if (args[i].equals("-preSplit")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                preSplit = Integer.parseInt(args[i]);
            } else if (args[i].equals("-zookeeperNodes")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                ZOOKEEPER_NODES = args[i];
            } else if (args[i].equals("-AutoFlush")) {
                autoFlush = true;
            } else if (args[i].equals("-flush")) {
                flush = true;
            } else {
                usage(args[i]);
            }
        }
        if (tableName == null) {
            System.out.println("Must specify path");
            usage("path");
        }
        LoadTableMTBatch lt = new LoadTableMTBatch();
        try {
            LoadTableMTBatch.init(conf, tableName, numRows, numCF, numC,
                    valueSize, augment);
            lt.loadTable(conf, tableName, numRows, numCF, numC, valueSize,
                    numThreads);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    public void generateKeySizes() {
        Random rand = new Random(uniqueSeed);
        keySizes = new int[10];
        keySizes[0] = rand.nextInt(5) + 5;
        keySizes[1] = rand.nextInt(40) + 10;
        keySizes[2] = rand.nextInt(50) + 50;
        keySizes[3] = rand.nextInt(400) + 100;
        keySizes[4] = rand.nextInt(500) + 500;
        keySizes[5] = rand.nextInt(4000) + 1000;
        keySizes[6] = rand.nextInt(5000) + 5000;
        keySizes[7] = rand.nextInt(10000) + 10000;
        keySizes[8] = rand.nextInt(12000) + 20000;
        keySizes[9] = rand.nextInt(32 * 1024 - 1) + 1;
    }

    public void loadTable(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int numThreads)
            throws Exception {
        Thread[] loadThreads = new Thread[numThreads];
        count = new long[numThreads];
        latency = new long[numThreads];

        if (keySize < 1) {
            generateKeySizes();
        }

        long offset = (endKey - startKey) / numThreads;
        for (int i = 0; i < loadThreads.length; i++) {
            latency[i] = 0;
            if (preSplit <= 1000="" 1)="" {="" loadthreads[i]="new" thread(new="" loadtablerunnable(conf,="" tablename,="" numrows,="" numcf,="" numc,="" valuesize,="" i,="" numthreads,="" batchsize));="" }="" else="" batchsize,="" startkey="" +="" i="" *="" offset,="" ((i="" offset)="" -="" 1));="" for="" (int="" <="" loadthreads.length;="" i++)="" loadthreads[i].start();="" long="" inserts="0," insertsold="0," rate="0," overallrate="0," ta="0," tb="0," t0="0," elapsedtime="0;" averagelatency="0;" minlatency="0;" maxlatency="0;" boolean="" alive="true;" 1;="" while="" (true)="" if="" (loadthreads[i].isalive())="" insertsold)="" (ta="" tb);="" t0);="" elapsedtime;="" min="" max="" average="" latency="" synchronized="" (latency)="" arrays.sort(latency);="" 1];="" latency.length);="" system.out.println("elapsed="" time:="" "="" ";="" inserts:="" current="" sec;="" overall="" batchsize="" 1000000l="" "ms;"="" "ms");="" (!alive)="" break;="" print="" out="" interval="" thread.sleep(1000);="" loadthreads[i].join();="" public="" static="" getsum(long[]="" array)="" sum="0;" (long="" l="" :="" return="" sum;="" void="" createtable(configuration="" conf,="" string="" numcf)="" throws="" exception="" hbaseadmin="" admin="new" hbaseadmin(conf);="" system.out.println("created="" object");="" htabledescriptor="" des="new" htabledescriptor(tablename.getbytes());="" numcf;="" des.addfamily(new="" hcolumndescriptor("f"="" i));="" try="" (presplit="" admin.createtable(des);="" byte[]="" startkeybyte="Bytes.toBytes(KEY_PREFIX+startKey);" endkeybyte="Bytes.toBytes(KEY_PREFIX+endKey);" admin.createtable(des,="" startkeybyte,="" endkeybyte,="" presplit);="" catch="" (tableexistsexception="" te)="" te.printstacktrace();="" (ioexception="" ie)="" ie.printstacktrace();="" init(configuration="" augment)="" ioexception,="" (augment)="" htable="" intable="new" htable(conf,="" tablename);="" result="" infores="inTable.get(new" get("homerow".getbytes()));="" startrow="inTable.incrementColumnValue("homeRow".getBytes()," "f0".getbytes(),="" "c0".getbytes(),="" numrows)="" numrows;="" numcf="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c1".getbytes()));="" numc="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c2".getbytes()));="" uniqueseed="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c3".getbytes()));="" keysize="Bytes.toLong(infoRes.getValue("f0".getBytes()," "c4".getbytes()));="" createtable(conf,="" numcf);="" put="" info="new" put("homerow".getbytes());="" info.add("f0".getbytes(),="" bytes.tobytes(numrows));="" "c1".getbytes(),="" bytes.tobytes(numcf));="" "c2".getbytes(),="" bytes.tobytes(numc));="" "c3".getbytes(),="" bytes.tobytes(uniqueseed));="" "c4".getbytes(),="" bytes.tobytes(keysize));="" intable.put(info);="" intable.flushcommits();="" load(configuration="" int="" threadnum,="" startkey,="" endkey)="" ioexception="" system.out.println("starting="" load="" thread="" threadnum);="" threadnum="" start="" key="" (key_prefix="" startkey)="" end="" :"="" endkey));="" family;="" column;="" p="null;" counter="0;" table="null;" random="" rand="new" random(uniqueseed);="" incrementrandom(rand,="" (int)="" startrow);="" endrow="startRow" htable(createhbaseconfiguration(),="" tablename.getbytes());="" table.setautoflush(autoflush);="" startrow;="" endrow;="" byte[][]="" rowkeys="new" byte[batchsize][];="" families="new" columns="new" values="new" batch="0;" batchsize;="" batch++)="" rowkey="new" byte[(int)="" keysize];="" (keysize="" 0)="" randsize="keySizes[rand.nextInt(Integer.MAX_VALUE)" %="" 10];="" 
numthreads="" 1);="" byte[randsize="" stringbuilder="" keybuilder="new" stringbuilder();="" keybuilder.append(i);="" keybuilder.append(batch);="" createkey(rowkey,="" long.valueof(keybuilder.tostring())="" ^="" uniqueseed);="" rowkeys[batch]="rowKey;" generate="" endkey);="" value="" valuesize];="" fillbuffer(valuesize,="" value,="" batch);="" values[batch]="value;" cf="" c="" family="f" (numcf="" families[batch]="family.getBytes();" column="c" (numc="" columns[batch]="column.getBytes();" list="" puts="new" arraylist();="" starttime="System.nanoTime();" put(rowkeys[batch]);="" p.add(families[batch],="" columns[batch],="" values[batch]);="" puts.add(p);="" table.put(puts);="" (flush)="" table.flushcommits();="" (exception="" e)="" e.printstacktrace();="" endtime="System.nanoTime();" latency[threadnum]="(endTime" starttime);="" count[threadnum]="counter;" finally="" (table="" !="null)" table.close();="" incrementrandom(random="" rand,="" num)="" num;="" rand.nextint();="" createkey(byte[]="" buffer,="" seed)="" random(seed);="" crc32="" chksum="new" crc32();="" rand.nextbytes(buffer);="" chksum.update(buffer);="" return;="" createkeyforregion(byte[]="" longrandom().nextlong(endkey="" startkey);="" buffer="Bytes.toBytes(KEY_PREFIX" key);="" buffer;="" fillbuffernocrc(long="" newseed="seed" system.currenttimemillis();="" random(newseed);="" fillbuffer(long="" chksum.getvalue();="" configuration="" createhbaseconfiguration()="" conf="HBaseConfiguration.create();" conf.addresource(new="" path(hbase_resource_name));="" conf.set((string)zookeeper_settings.getfirst(),="" (string)="" zookeeper_settings.getsecond());="" conf;="" class="" loadtablerunnable="" implements="" runnable="" private="" tablename;="" valuesize;="" threadnum;="" endkey="-1;" loadtablerunnable(configuration="" batchsize)="" this.conf="conf;" this.tablename="tableName;" this.numrows="numRows;" this.numcf="numCF;" this.numc="numC;" this.valuesize="valueSize;" this.threadnum="threadNum;" this.numthreads="numThreads;" this.batchsize="batchSize;" this.startkey="startKey;" this.endkey="endKey;" run()="" (endkey="=" -1)="" loadtablemtbatch.load(conf,="" 0,="" 0);="" system.exit(-1);="" longrandom="" extends="" final="" serialversionuid="1L;" **="" generating="" a="" in="" the="" range="" of="" 0<="value<=n" @param="" n="" @return="" nextlong(long="" n)="" (n="" throw="" new="" illegalargumentexception();="" small="" use="" nextint="" and="" cast="" (long)="" nextint((int)="" n);="" large="" both="" high="" low="" ints="" highlimit="(int)">> 32);
            long high = (long) nextInt(highLimit) << 32;
            long low = ((long) nextInt()) & 0xffffffffL;
            return (high | low);
        }
    }

}
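
Assuming the class is compiled into a jar on a node with the HBase client installed (the jar name loadtool.jar and the table path /user/mapr/testtable below are placeholders, not part of the original post), one way to run it is:

java -cp $(hbase classpath):loadtool.jar LoadTableMTBatch -path /user/mapr/testtable -rows 100000 -valuesize 100 -batchSize 500 -threads 10

The options correspond to the usage message printed by the tool; -path is the only required argument.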

MapR to HPE Product Naming

Historical MapR Product Name                   New Product Name
MapR (Document) Database Enterprise Premier    HPE Ezmeral Data Fabric Document Database
