Sunday, 15 May 2011

scala - Testing multiple outputs with MRUnit 1.1.0

There is a related question about testing multiple outputs with MRUnit, but the reply there is not applicable to the newer version 1.1.0.

The question is how to set up multiple named outputs so that the underlying mock implementations recognize the named paths. I am trying to write the same reducer record to two paths. I can do the same thing in a regular MR job by calling MultipleOutputs.addNamedOutput(job, "mos", ...).
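For reference, a minimal sketch of how that registration looks in a regular job driver (the job name and the choice of SequenceFileOutputFormat here are illustrative, not from the original post):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, MapWritable}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{MultipleOutputs, SequenceFileOutputFormat}

// Register the named output "mos" on the Job so that
// MultipleOutputs.write("mos", ...) in the reducer resolves to a known name.
val job = Job.getInstance(new Configuration(), "multiple-outputs-job")
MultipleOutputs.addNamedOutput(job, "mos",
  classOf[SequenceFileOutputFormat[LongWritable, MapWritable]],
  classOf[LongWritable], classOf[MapWritable])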

When I try to run this through MRUnit, I get the following exception:

Named output 'mos' not defined
java.lang.IllegalArgumentException: Named output 'mos' not defined
    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:256)
    at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:426)
    at TestMultipleOutputsAction$TestReducer$$anonfun$reduce$1.apply(TestMultipleOutputs.scala:48)
    at TestMultipleOutputsAction$TestReducer$$anonfun$reduce$1.apply(TestMultipleOutputs.scala:47)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at TestMultipleOutputsAction$TestReducer.reduce(TestMultipleOutputs.scala:47)
    at TestMultipleOutputsAction$TestReducer.reduce(TestMultipleOutputs.scala:35)

The Scala code is posted below. I apologize for the length of the code; I tried to pull in all the pieces so that it is easy to run standalone.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.{Counters, TaskInputOutputContext, Reducer, Mapper}
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import org.scalatest.FunSuite
import org.apache.hadoop.io.SequenceFile.{Writer, Reader}
import java.nio.file.{Path, Paths, Files}
import com.typesafe.scalalogging.slf4j.Logging
import org.apache.hadoop.fs.{Path => HadoopFsPath}

object TestMultipleOutputsAction {

  class TestMapper extends Mapper[LongWritable, MapWritable, LongWritable, MapWritable] with Logging {
    override def setup(context: Mapper[LongWritable, MapWritable, LongWritable, MapWritable]#Context) {
    }

    override def cleanup(context: Mapper[LongWritable, MapWritable, LongWritable, MapWritable]#Context) {
    }

    override def map(key: LongWritable, value: MapWritable,
                     context: Mapper[LongWritable, MapWritable, LongWritable, MapWritable]#Context) {
      context.write(key, value)
    }
  }

  class TestReducer extends Reducer[LongWritable, MapWritable, LongWritable, MapWritable] with Logging {
    var multipleOutputs: MultipleOutputs[LongWritable, MapWritable] = null

    override def setup(context: Reducer[LongWritable, MapWritable, LongWritable, MapWritable]#Context) {
      multipleOutputs = new MultipleOutputs[LongWritable, MapWritable](
        context.asInstanceOf[TaskInputOutputContext[_, _, LongWritable, MapWritable]])
      super.setup(context)
    }

    override def cleanup(context: Reducer[LongWritable, MapWritable, LongWritable, MapWritable]#Context) {
    }

    override def reduce(key: LongWritable, values: java.lang.Iterable[MapWritable],
                        context: Reducer[LongWritable, MapWritable, LongWritable, MapWritable]#Context) {
      values.foreach(value => {
        multipleOutputs.write("mos", key, value, "outputPath1")
        multipleOutputs.write("mos", key, value, "outputPath2")
      })
    }
  }
}

object TestHelper extends Logging {

  def generateInput(conf: Configuration, deleteOnExit: Boolean): String = {
    val dirPath = Files.createTempDirectory(Paths.get("/tmp"), "multiple_outputs")
    val filePath = Files.createTempFile(dirPath, "part-m-", ".0001")
    if (deleteOnExit) {
      filePath.toFile.deleteOnExit()
    }

    logger.info(s"Writing path [${filePath.toFile.getAbsolutePath}] ...")
    val seqFilePath = new HadoopFsPath(filePath.toFile.getAbsolutePath)
    val writer = SequenceFile.createWriter(conf, Writer.file(seqFilePath),
      Writer.keyClass(classOf[LongWritable]), Writer.valueClass(classOf[MapWritable]))
    for (i <- 1 to 10) {
      val mapWritable = new MapWritable()
      mapWritable.put(new Text("mod2"), new LongWritable(i % 2))
      writer.append(new LongWritable(i), mapWritable)
    }
    writer.close()
    logger.info(s"Writing path [${filePath.toFile.getAbsolutePath}] completed")

    dirPath.toFile.getAbsolutePath
  }

  def readInput(conf: Configuration, path: String,
                mapReduceDriver: MapReduceDriver[LongWritable, MapWritable, LongWritable, MapWritable, LongWritable, MapWritable]) {
    val entries = Files.newDirectoryStream(Paths.get(path), "part-m-*")
    var numRecords = 0
    entries.asScala.foreach(entry => {
      val entryName = entry.toFile.getName
      val absolutePath = entry.toFile.getAbsolutePath
      logger.debug(s"Entry name : [${entryName}], absolute path : [${absolutePath}]")
      val validEntry = entryName.startsWith("part-m-")
      if (validEntry) {
        logger.debug(s"Adding inputs path : [${absolutePath}] ...")
        val hadoopPath = new HadoopFsPath(absolutePath)
        val reader = new SequenceFile.Reader(conf, Reader.file(hadoopPath))
        var key = new LongWritable()
        var mapWritable = new MapWritable()
        var numFileRecords = 0
        while (reader.next(key, mapWritable)) {
          logger.debug(key + "\t" + mapWritable)
          mapReduceDriver.addInput(key, mapWritable)
          numFileRecords = numFileRecords + 1
          numRecords = numRecords + 1
        }
        logger.debug(s"Adding inputs path : [${absolutePath}] completed. Num file records : [${numFileRecords}]")
      }
    })
    logger.debug(s"Adding inputs path : [${path}] completed. Num records : [${numRecords}]")
  }

  def writeOutput(conf: Configuration, dirPath: Path,
                  outputPairs: java.util.List[org.apache.hadoop.mrunit.types.Pair[LongWritable, MapWritable]],
                  deleteOnExit: Boolean): Unit = {
    val filePath = Files.createTempFile(dirPath, "part-m-", ".0001")
    if (deleteOnExit) {
      filePath.toFile.deleteOnExit()
    }

    logger.info(s"Writing path [${filePath.toFile.getAbsolutePath}] ...")
    val seqFilePath = new HadoopFsPath(filePath.toFile.getAbsolutePath)
    val writer = SequenceFile.createWriter(conf, Writer.file(seqFilePath),
      Writer.keyClass(classOf[LongWritable]), Writer.valueClass(classOf[MapWritable]))
    outputPairs.asScala.toSeq.foreach(outputPair => {
      logger.debug(s"Key : [${outputPair.getFirst}], value : [${outputPair.getSecond}]")
      writer.append(outputPair.getFirst, outputPair.getSecond)
    })
    writer.close()
    logger.info(s"Writing path [${filePath.toFile.getAbsolutePath}] completed")
  }

  def checkCounters(counters: Counters): Unit = {
    counters.getGroupNames.asScala.foreach(groupName => {
      counters.getGroup(groupName).iterator().asScala.foreach(counter => {
        logger.debug(s"GroupName: [${groupName}], counterName: [${counter.getName}], counterValue : [${counter.getValue}]")
      })
    })
  }
}

object TestMultipleOutputs extends FunSuite with Logging {

  def testMultipleOutputs(conf: Configuration, inputPath: String, deleteOnExit: Boolean) {
    logger.info(s"TestInput : input path : [${inputPath}] ...")
    val mapReduceDriver =
      new MapReduceDriver[LongWritable, MapWritable, LongWritable, MapWritable, LongWritable, MapWritable]()
        .withMapper(new TestMultipleOutputsAction.TestMapper)
        .withReducer(new TestMultipleOutputsAction.TestReducer)
    mapReduceDriver.addMultiOutput("mos", classOf[LongWritable], classOf[MapWritable])

    val parentOutputPath = Files.createTempDirectory(Paths.get("/tmp"), "pr_output")
    if (deleteOnExit) {
      parentOutputPath.toFile.deleteOnExit
    }

    TestHelper.readInput(conf, inputPath, mapReduceDriver)
    val outputPairs = mapReduceDriver.run()
    TestHelper.writeOutput(conf, parentOutputPath, outputPairs, deleteOnExit)
    TestHelper.checkCounters(mapReduceDriver.getCounters())
    logger.info(s"TestInput : input path : [${inputPath}] completed")
  }
}

class TestMultipleOutputs extends FunSuite with Logging {
  test("multiple outputs action") {
    val deleteOnExit = true
    val conf = new Configuration()
    val inputPath = TestHelper.generateInput(conf, deleteOnExit)
    TestMultipleOutputs.testMultipleOutputs(conf, inputPath, deleteOnExit)
  }
}

I had the same problem in Java, and annotating the unit test with

@RunWith(PowerMockRunner.class)
@PrepareForTest(PricePerPlacementReducer.class)

after adding the right imports (basically, PowerMock version 1.5.1 and the JUnit binder) solved it for me.
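Here is a minimal sketch of what that approach might look like when applied to the Scala test above (the test class and method names are illustrative; PricePerPlacementReducer above is the answerer's own reducer):

import org.junit.Test
import org.junit.runner.RunWith
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner

// Run the test with PowerMock and prepare the reducer class under test so
// that the MultipleOutputs interaction inside it can be instrumented.
@RunWith(classOf[PowerMockRunner])
@PrepareForTest(Array(classOf[TestMultipleOutputsAction.TestReducer]))
class PowerMockedMultipleOutputsTest {

  @Test
  def reducerWritesToNamedOutputs(): Unit = {
    // Build and run the MapReduceDriver exactly as in
    // TestMultipleOutputs.testMultipleOutputs above.
  }
}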

scala hadoop mrunit multipleoutputs
