Terry He

How to Use a Table Load Tool to Batch Puts into HBase/MapR Database

October 15, 2020

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019 may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html

Original Post Information:

"authorDisplayName": "Terry He",
"publish": "2015-04-22T07:00:00.000Z",
"tags": "nosql"

Apache HBase is an in-Hadoop database that delivers wide-column schema flexibility with strongly consistent reads and writes. Clients can access HBase data through a native Java API, a Thrift or REST gateway, or, more recently, a C API. MapR Database, another in-Hadoop database, exposes the same HBase APIs but adds enterprise-grade features for production deployments of HBase applications.

Put, Get, and Scan are among the most commonly used programming APIs in HBase applications. In write-heavy workloads, issuing Puts one at a time can limit throughput, because each call can cost a separate round trip to the server; batching Puts so that many are sent together is a common technique for raising overall throughput. The following program is a table load tool that batches Puts into an HBase/MapR Database table. It creates a simple table with a single column in a single column family and inserts 100000 rows with 100 bytes of data each. By default it uses 10 loader threads and a Put batch size of 500; the row count, value size, thread count, and batch size can all be changed with the -rows, -valuesize, -threads, and -batchSize options.
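To make the batching idea concrete without a running cluster, here is a minimal sketch that groups generated row keys into fixed-size batches and hands each batch to a sink in a single call, much as the program below passes a List of Puts to table.put. The names BatchingDemo, BatchSink, and loadInBatches are hypothetical; the sink is an assumed stand-in for an HBase table and only receives lists of row keys.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingDemo {

    /** Hypothetical stand-in for a table client that accepts many writes per call. */
    interface BatchSink {
        void write(List<String> rows);
    }

    /** Sends numRows generated row keys to the sink in batches of at most batchSize. */
    static int loadInBatches(long numRows, int batchSize, BatchSink sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        int calls = 0;
        List<String> batch = new ArrayList<>(batchSize);
        for (long i = 0; i < numRows; i++) {
            batch.add("user" + i);
            if (batch.size() == batchSize) {
                sink.write(batch); // one call for the whole batch
                calls++;
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty()) { // send the final partial batch
            sink.write(batch);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        long[] written = {0};
        int calls = loadInBatches(100000, 500, rows -> written[0] += rows.size());
        System.out.println(calls + " calls, " + written[0] + " rows"); // prints "200 calls, 100000 rows"
    }
}
```

With the article's defaults of 100000 rows and a batch size of 500, the sketch makes 200 sink calls instead of 100000, which is the effect batching aims for on the client side.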

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;

import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadTableMTBatch {

    static long uniqueSeed = System.currentTimeMillis();
    static long[] count;
    static long[] latency;
    static int[] keySizes;
    public static long printPerNum = 10000;
    public static boolean noCRC = false;
    public static long keySize = 8;
    public static long startRow = 0;
    public static int batchSize = 500;
    public static int preSplit = 1; // Used as a demo - not an accurate key distribution
    public static boolean flush = false;
    public static boolean autoFlush = false;
    public static final String KEY_PREFIX = "user";
    public static final long startKey = 0L;
    public static final long endKey = 999999999999999L;
    public static final String HBASE_RESOURCE_NAME = "/opt/mapr/hbase/hbase-0.98.9/conf/hbase-site.xml";
    public static String ZOOKEEPER_NODES = "localhost"; //Default to localhost, only needed for accessing HBase
    public static final Pair ZOOKEEPER_SETTINGS = new Pair(
            "hbase.zookeeper.quorum", ZOOKEEPER_NODES);

    public static void usage(String arg) {
        System.err.println("bad token: " + arg);
        System.err
                .println("loadMT -rows <100000> -valuesize <100 bytes> -debug -path <table path> -threads <10> -batchSize <500> -numCF <1> -numC <1> -preSplit <1> -zookeeperNodes <host list> -AutoFlush -flush -p <10000> -hbase <hbase-site.xml path>");
        System.exit(1);
    }

    public static void main(String[] args) throws java.io.IOException {
        Configuration conf = HBaseConfiguration.create();
        String tableName = null;
        long numRows = 100000;
        long numCF = 1;
        long numC = 1;
        long valueSize = 100;
        int numThreads = 10;
        boolean augment = false;

        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-rows")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numRows = Long.parseLong(args[i]);
            } else if (args[i].equals("-path")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                tableName = args[i];
            } else if (args[i].equals("-debug")) {
                conf.set("fs.mapr.trace", "debug");
            } else if (args[i].equals("-valuesize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                valueSize = Long.parseLong(args[i]);
            } else if (args[i].equals("-threads")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numThreads = Integer.parseInt(args[i]);
            } else if (args[i].equals("-p")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                printPerNum = Long.parseLong(args[i]);
            } else if (args[i].equals("-hbase")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                conf.addResource(new Path(args[i]));
            } else if (args[i].equals("-numCF")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numCF = Integer.parseInt(args[i]);
            } else if (args[i].equals("-numC")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                numC = Integer.parseInt(args[i]);
            } else if (args[i].equals("-batchSize")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                batchSize = Integer.parseInt(args[i]);
            } else if (args[i].equals("-preSplit")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                preSplit = Integer.parseInt(args[i]);
            } else if (args[i].equals("-zookeeperNodes")) {
                i++;
                if (i >= args.length)
                    usage(args[i - 1]);
                ZOOKEEPER_NODES = args[i];
            } else if (args[i].equals("-AutoFlush")) {
                autoFlush = true;
            } else if (args[i].equals("-flush")) {
                flush = true;
            } else {
                usage(args[i]);
            }
        }
        if (tableName == null) {
            System.out.println("Must specify path");
            usage("path");
        }
        LoadTableMTBatch lt = new LoadTableMTBatch();
        try {
            LoadTableMTBatch.init(conf, tableName, numRows, numCF, numC,
                    valueSize, augment);
            lt.loadTable(conf, tableName, numRows, numCF, numC, valueSize,
                    numThreads);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    public void generateKeySizes() {
        Random rand = new Random(uniqueSeed);
        keySizes = new int[10];
        keySizes[0] = rand.nextInt(5) + 5;
        keySizes[1] = rand.nextInt(40) + 10;
        keySizes[2] = rand.nextInt(50) + 50;
        keySizes[3] = rand.nextInt(400) + 100;
        keySizes[4] = rand.nextInt(500) + 500;
        keySizes[5] = rand.nextInt(4000) + 1000;
        keySizes[6] = rand.nextInt(5000) + 5000;
        keySizes[7] = rand.nextInt(10000) + 10000;
        keySizes[8] = rand.nextInt(12000) + 20000;
        keySizes[9] = rand.nextInt(32 * 1024 - 1) + 1;
    }

    public void loadTable(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int numThreads)
            throws Exception {
        Thread[] loadThreads = new Thread[numThreads];
        count = new long[numThreads];
        latency = new long[numThreads];

        if (keySize < 1) {
            generateKeySizes();
        }

        long offset = (endKey - startKey) / numThreads;
        for (int i = 0; i < loadThreads.length; i++) {
            latency[i] = 0;
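            if (preSplit <= 1) {
                loadThreads[i] = new Thread(new LoadTableRunnable(conf, tableName,
                        numRows, numCF, numC, valueSize, i, numThreads, batchSize));
            } else {
                // Each thread writes keys from its own slice of [startKey, endKey]
                loadThreads[i] = new Thread(new LoadTableRunnable(conf, tableName,
                        numRows, numCF, numC, valueSize, i, numThreads, batchSize,
                        startKey + i * offset, startKey + ((i + 1) * offset) - 1));
            }
        }
        for (int i = 0; i < loadThreads.length; i++) {
            loadThreads[i].start();
        }

        long t0 = System.currentTimeMillis() - 1; // -1 keeps the first elapsed time non-zero
        long ta = t0;
        long inserts = 0;
        while (true) {
            // Check liveness before summing so the final report includes every insert
            boolean alive = false;
            for (int i = 0; i < loadThreads.length; i++) {
                if (loadThreads[i].isAlive()) {
                    alive = true;
                }
            }
            long insertsOld = inserts;
            inserts = getSum(count);
            long tb = ta;
            ta = System.currentTimeMillis();
            long rate = (inserts - insertsOld) * 1000 / Math.max(1, ta - tb);
            long elapsedTime = ta - t0;
            long overallRate = inserts * 1000 / elapsedTime;

            // Min, max and average of each thread's most recent batch latency (ns).
            // Sort a copy so the per-thread slots in latency[] are left untouched.
            long[] snapshot = latency.clone();
            Arrays.sort(snapshot);
            long minLatency = snapshot[0];
            long maxLatency = snapshot[snapshot.length - 1];
            long averageLatency = getSum(snapshot) / snapshot.length;

            System.out.println("Elapsed time: " + elapsedTime / 1000 + "s; Inserts: " + inserts
                    + "; current rate " + rate + " inserts/sec; overall rate " + overallRate
                    + " inserts/sec; batchSize " + batchSize
                    + "; batch latency min " + minLatency / 1000000L
                    + "ms; max " + maxLatency / 1000000L
                    + "ms; average " + averageLatency / 1000000L + "ms");
            if (!alive) {
                break;
            }
            // Print interval
            Thread.sleep(1000);
        }
        for (int i = 0; i < loadThreads.length; i++) {
            loadThreads[i].join();
        }
    }

    public static long getSum(long[] array) {
        long sum = 0;
        for (long l : array) {
            sum += l;
        }
        return sum;
    }

    public static void createTable(Configuration conf, String tableName, long numCF)
            throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        System.out.println("Created admin object");
        HTableDescriptor des = new HTableDescriptor(tableName.getBytes());
        for (int i = 0; i < numCF; i++) {
            des.addFamily(new HColumnDescriptor("f" + i));
        }
        try {
            if (preSplit <= 1) {
                admin.createTable(des);
            } else {
                // Pre-split into preSplit regions between the first and last possible key
                byte[] startKeyByte = Bytes.toBytes(KEY_PREFIX + startKey);
                byte[] endKeyByte = Bytes.toBytes(KEY_PREFIX + endKey);
                admin.createTable(des, startKeyByte, endKeyByte, preSplit);
            }
        } catch (TableExistsException te) {
            te.printStackTrace();
        } catch (IOException ie) {
            ie.printStackTrace();
        } finally {
            admin.close();
        }
    }

    public static void init(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, boolean augment) throws Exception {
        if (augment) {
            // Reuse the settings recorded in "homeRow" by an earlier run
            HTable inTable = new HTable(conf, tableName);
            try {
                Result infoRes = inTable.get(new Get("homeRow".getBytes()));
                startRow = inTable.incrementColumnValue("homeRow".getBytes(),
                        "f0".getBytes(), "c0".getBytes(), numRows) - numRows;
                uniqueSeed = Bytes.toLong(infoRes.getValue("f0".getBytes(), "c3".getBytes()));
                keySize = Bytes.toLong(infoRes.getValue("f0".getBytes(), "c4".getBytes()));
            } finally {
                inTable.close();
            }
        } else {
            createTable(conf, tableName, numCF);
            // Record the load parameters in "homeRow" (family f0, columns c0..c4)
            HTable inTable = new HTable(conf, tableName);
            try {
                Put info = new Put("homeRow".getBytes());
                info.add("f0".getBytes(), "c0".getBytes(), Bytes.toBytes(numRows));
                info.add("f0".getBytes(), "c1".getBytes(), Bytes.toBytes(numCF));
                info.add("f0".getBytes(), "c2".getBytes(), Bytes.toBytes(numC));
                info.add("f0".getBytes(), "c3".getBytes(), Bytes.toBytes(uniqueSeed));
                info.add("f0".getBytes(), "c4".getBytes(), Bytes.toBytes(keySize));
                inTable.put(info);
                inTable.flushCommits();
            } finally {
                inTable.close();
            }
        }
    }

    public static void load(Configuration conf, String tableName, long numRows,
            long numCF, long numC, long valueSize, int threadNum, int numThreads,
            int batchSize, long startKey, long endKey) throws IOException {
        System.out.println("Starting load thread " + threadNum);
        System.out.println("threadNum " + threadNum + " start key " + (KEY_PREFIX + startKey)
                + " end key :" + (KEY_PREFIX + endKey));
        long counter = 0;
        HTable table = null;
        Random rand = new Random(uniqueSeed);
        incrementRandom(rand, (int) startRow);
        long endRow = startRow + numRows;
        try {
            table = new HTable(createHBaseConfiguration(), tableName.getBytes());
            // With autoFlush off, Puts are buffered on the client and sent in bulk
            table.setAutoFlush(autoFlush);
            for (long i = startRow; i < endRow; i += batchSize) {
                int n = (int) Math.min(batchSize, endRow - i);
                List<Put> puts = new ArrayList<Put>(n);
                for (int batch = 0; batch < n; batch++) {
                    byte[] rowKey = new byte[(int) keySize];
                    if (keySize < 1) {
                        // Random key length picked from the sizes in generateKeySizes()
                        int randSize = keySizes[rand.nextInt(Integer.MAX_VALUE) % 10];
                        incrementRandom(rand, numThreads - 1);
                        rowKey = new byte[randSize + 1];
                    }
                    if (preSplit <= 1) {
                        StringBuilder keyBuilder = new StringBuilder();
                        keyBuilder.append(i);
                        keyBuilder.append(batch);
                        createKey(rowKey, Long.parseLong(keyBuilder.toString()) ^ uniqueSeed);
                    } else {
                        // Generate a key inside this thread's region
                        rowKey = createKeyForRegion(startKey, endKey);
                    }
                    byte[] value = new byte[(int) valueSize];
                    fillBuffer(valueSize, value, batch);
                    // Spread cells over families f0..f(numCF-1) and columns c0..c(numC-1)
                    String family = "f" + (batch % numCF);
                    String column = "c" + (batch % numC);
                    Put p = new Put(rowKey);
                    p.add(family.getBytes(), column.getBytes(), value);
                    puts.add(p);
                }
                long startTime = System.nanoTime();
                try {
                    table.put(puts); // one call for the whole batch
                    if (flush) {
                        table.flushCommits();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                long endTime = System.nanoTime();
                latency[threadNum] = endTime - startTime;
                counter += n;
                count[threadNum] = counter;
            }
        } finally {
            if (table != null) {
                table.close(); // also flushes any Puts still buffered
            }
        }
    }

    /** Advances rand by num draws. */
    public static void incrementRandom(Random rand, int num) {
        for (int i = 0; i < num; i++) {
            rand.nextInt();
        }
    }

    /** Fills buffer with pseudo-random bytes; the same seed always yields the same key. */
    public static void createKey(byte[] buffer, long seed) {
        Random rand = new Random(seed);
        rand.nextBytes(buffer);
    }

    /** Returns KEY_PREFIX followed by a random number in [startKey, endKey). */
    public static byte[] createKeyForRegion(long startKey, long endKey) {
        long key = startKey + new LongRandom().nextLong(endKey - startKey);
        return Bytes.toBytes(KEY_PREFIX + key);
    }

    public static void fillBufferNoCRC(long size, byte[] buffer, long seed) {
        long newSeed = seed + System.currentTimeMillis();
        Random rand = new Random(newSeed);
        rand.nextBytes(buffer);
    }

    public static long fillBuffer(long size, byte[] buffer, long seed) {
        long newSeed = seed + System.currentTimeMillis();
        Random rand = new Random(newSeed);
        CRC32 chksum = new CRC32();
        rand.nextBytes(buffer);
        chksum.update(buffer);
        return chksum.getValue();
    }

    public static Configuration createHBaseConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path(HBASE_RESOURCE_NAME));
        // Read ZOOKEEPER_NODES directly so -zookeeperNodes takes effect;
        // ZOOKEEPER_SETTINGS only captured the default value at class load time
        conf.set((String) ZOOKEEPER_SETTINGS.getFirst(), ZOOKEEPER_NODES);
        return conf;
    }

    static class LoadTableRunnable implements Runnable {
        private final Configuration conf;
        private final String tableName;
        private final long numRows;
        private final long numCF;
        private final long numC;
        private final long valueSize;
        private final int threadNum;
        private final int numThreads;
        private final int batchSize;
        private final long startKey;
        private final long endKey; // -1 means no key range: use seeded random keys

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize) {
            this(conf, tableName, numRows, numCF, numC, valueSize, threadNum,
                    numThreads, batchSize, -1, -1);
        }

        LoadTableRunnable(Configuration conf, String tableName, long numRows,
                long numCF, long numC, long valueSize, int threadNum,
                int numThreads, int batchSize, long startKey, long endKey) {
            this.conf = conf;
            this.tableName = tableName;
            this.numRows = numRows;
            this.numCF = numCF;
            this.numC = numC;
            this.valueSize = valueSize;
            this.threadNum = threadNum;
            this.numThreads = numThreads;
            this.batchSize = batchSize;
            this.startKey = startKey;
            this.endKey = endKey;
        }

        public void run() {
            try {
                if (endKey == -1) {
                    LoadTableMTBatch.load(conf, tableName, numRows, numCF, numC,
                            valueSize, threadNum, numThreads, batchSize, 0, 0);
                } else {
                    LoadTableMTBatch.load(conf, tableName, numRows, numCF, numC,
                            valueSize, threadNum, numThreads, batchSize, startKey, endKey);
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }
    }

    static class LongRandom extends Random {
        private static final long serialVersionUID = 1L;

        /**
         * Generates a random long in the range 0 <= value < n.
         *
         * @param n exclusive upper bound; must be positive
         * @return a uniformly distributed value in [0, n)
         */
        public long nextLong(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n must be positive");
            }
            // For small n, use nextInt and widen the result
            if (n <= Integer.MAX_VALUE) {
                return (long) nextInt((int) n);
            }
            // For large n, draw non-negative 63-bit values and reject the
            // incomplete final interval so the result stays uniform in [0, n)
            long bits = nextLong() >>> 1;
            long val = bits % n;
            while (bits - val + (n - 1) < 0L) {
                bits = nextLong() >>> 1;
                val = bits % n;
            }
            return val;
        }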
    }

}
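When -preSplit is greater than 1, loadTable divides the key space [startKey, endKey] into numThreads slices using offset = (endKey - startKey) / numThreads, and thread i is handed the range startKey + i * offset through startKey + (i + 1) * offset - 1. The standalone sketch below reproduces only that arithmetic so the slices can be inspected without a cluster; KeyRangeDemo and ranges are hypothetical names, and the bounds are assumed to be the listing's defaults.

```java
public class KeyRangeDemo {

    /** Returns {start, end} (inclusive) for each thread, mirroring loadTable's arithmetic. */
    static long[][] ranges(long startKey, long endKey, int numThreads) {
        long offset = (endKey - startKey) / numThreads; // integer division
        long[][] result = new long[numThreads][2];
        for (int i = 0; i < numThreads; i++) {
            result[i][0] = startKey + i * offset;
            result[i][1] = startKey + ((i + 1) * offset) - 1;
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] r = ranges(0L, 999999999999999L, 10);
        for (int i = 0; i < r.length; i++) {
            System.out.println("thread " + i + ": user" + r[i][0] + " .. user" + r[i][1]);
        }
    }
}
```

With these bounds and 10 threads, offset is 99999999999999, so thread 0 covers user0 through user99999999999998 and thread 9 ends at user999999999999989; integer division leaves the last ten values of the key space outside every slice. Row keys are compared as bytes, so strings such as user5 and user10 sort lexicographically rather than numerically, and these numeric slices do not line up exactly with the pre-split region boundaries. This fits the listing's own description of -preSplit as a demo without an accurate key distribution.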

MapR to HPE Product Naming

Historical MapR Product Name: MapR (Document) Database Enterprise Premier
New Product Name: HPE Ezmeral Data Fabric Document Database
