Tuesday, January 27, 2015

Reading mongodump bson file from Spark in scala using mongo-hadoop

I couldn't find a complete Scala example that uses mongo-hadoop v1.3.1 to read a mongodump BSON file, so here's one I prepared earlier:

val bsonData = sc.newAPIHadoopFile(
  "file:///your/file.bson",
  classOf[com.mongodb.hadoop.BSONFileInputFormat].asSubclass(
    classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object, org.bson.BSONObject]]),
  classOf[Object],
  classOf[org.bson.BSONObject])


Note that (for v1.3.1) we need to cast com.mongodb.hadoop.BSONFileInputFormat to a FileInputFormat subclass via asSubclass, to avoid this compilation error: "inferred type arguments do not conform to method newAPIHadoopFile's type parameter bounds". This isn't required when reading from MongoDB directly using com.mongodb.hadoop.MongoInputFormat.
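Once loaded, each record in the RDD is a (key, BSONObject) pair, with the BSON document as the value. Here's a minimal sketch of pulling a field out; the field name "name" is just a placeholder, so substitute a field that actually exists in your dump:

```scala
// Each record is an (Object, BSONObject) pair; the document is the value.
// "name" is a hypothetical field -- use one from your own collection.
val names = bsonData.map { case (_, doc) => doc.get("name") }
names.take(10).foreach(println)
```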

Also, you can pass a Configuration object as a final parameter if you need to set any specific conf values.
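For example, a sketch that passes a Configuration as the final argument (the split-size value here is purely illustrative):

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Illustrative setting: cap input split size at 64 MB so large dumps
// are read in more, smaller partitions.
conf.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

val bsonData = sc.newAPIHadoopFile(
  "file:///your/file.bson",
  classOf[com.mongodb.hadoop.BSONFileInputFormat].asSubclass(
    classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object, org.bson.BSONObject]]),
  classOf[Object],
  classOf[org.bson.BSONObject],
  conf)
```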

For more bson examples see here: https://github.com/mongodb/mongo-hadoop/blob/master/BSON_README.md

For Java examples see here: http://crcsmnky.github.io/2014/07/13/mongodb-spark-input/

Tuesday, January 6, 2015

How to access HBase from spark-shell using YARN as the master on CDH 5.3 and Spark 1.2


From terminal:

# export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-server.jar:/etc/hbase/conf

(Classpath entries must be jars or directories, so we add the /etc/hbase/conf directory rather than hbase-site.xml itself.)

# spark-shell --master yarn-client


Now you can access HBase from the Spark shell prompt:

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val tableName = "My_HBase_Table_Name"

val hconf = HBaseConfiguration.create()

hconf.set(TableInputFormat.INPUT_TABLE, tableName)

val admin = new HBaseAdmin(hconf)
if (!admin.isTableAvailable(tableName)) {
  val tableDesc = new HTableDescriptor(tableName)
  // HBase requires at least one column family at table creation time;
  // "cf" is just a placeholder name here
  tableDesc.addFamily(new org.apache.hadoop.hbase.HColumnDescriptor("cf"))
  admin.createTable(tableDesc)
}

val hBaseRDD = sc.newAPIHadoopRDD(
  hconf,
  classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

val result = hBaseRDD.count()
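Beyond counting, each record is an (ImmutableBytesWritable, Result) pair, so you can map over the Results to pull out cell values. A minimal sketch, where the column family "cf" and qualifier "col" are placeholders for your own schema:

```scala
import org.apache.hadoop.hbase.util.Bytes

// "cf" and "col" are hypothetical -- substitute your own family/qualifier.
val values = hBaseRDD.map { case (_, result) =>
  Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
}
values.take(10).foreach(println)
```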


Thanks to these refs for pointers:
http://comments.gmane.org/gmane.comp.java.hadoop.hbase.user/44744
http://apache-spark-user-list.1001560.n3.nabble.com/HBase-and-non-existent-TableInputFormat-td14370.html