Hadoop On Azure (II): Funciones MapReduce y procesamiento de datos de entrada

Debido a las características del proyecto en el que hemos estado trabajando, una de las primeras tareas que se nos planteo fue el procesamiento de datos que nos venían en formato XML dentro de Hadoop. Aunque Hadoop permite consumir datos en formatos de entrada muy diferentes, desde ficheros de texto plano, binarios, XML, bases de datos, etc…, la mayoría de los ejemplos y situaciones están orientados a utilizar ficheros de texto plano, donde cada línea representa un registro de entrada.

Por ello, la utilización de XML como formato de los datos de entrada, implicó en un principio un pequeño desafío a la hora de encontrar la mejor alternativa para tratar este tipo de datos.

Antes de entrar en detalle a comentar las posibles alternativas disponibles para procesar en Hadoop ficheros con formato XML (lo haremos en el siguiente post), quisiera introducir por encima en este, el funcionamiento básico de Hadoop y como realiza el procesamiento de los datos de entrada.

Funciones MapReduce

Lo primero comentar que la base sobre la que trabaja Hadoop son los programas o funciones MapReduce que permiten procesar grandes volúmenes de datos en paralelo.

Los datos de entrada a las funciones MapReduce, se proporcionan en la forma de parejas clave-valor (key, value), que serán procesadas por las funciones de Mapeo y Reducción, para generar y obtener nuevos conjuntos de parejas clave-valor de salida. Es decir, los programas MapReduce transforman listas de elementos de datos de entrada (en la forma de parejas clave-valor), para obtener listas de elementos de datos de salida (también como parejas clave-valor), realizándolo en dos etapas sucesivas, la de mapeo y la de reducción.

En la primera etapa, de mapeo, se proporciona la lista de elementos de datos (parejas clave-valor), uno a uno, a la función denominada “Mapper”, que transforma cada elemento individual en uno, ninguno o varios elemento de datos de salida.

Función de mapeo:

map: (k1, v1) –> lista(k2,v2)

Visualmente cuando hace un mapeo uno a uno, podría verse como:

image

 

En la segunda etapa, de reducción que permite agregar los elementos de datos, se proporciona a la función denominada “Reducer”, un elemento que le permite iterar sobre los valores de entrada procedentes de una lista de entrada asociados con un valor de clave determinada, es decir, se proporciona al reductor todos los valores (lista(v2)) asociados con una determinada clave (k2), para que opere sobre estos valores y obtenga uno o unos pocos valores de salida diferentes.

Función de reducción:

reduce: (k2, lista(v2)) –> lista(k3,v3)

Visualmente para el caso de obtener un único valor a partir de la lista de entrada, podría verse como::

image

Para profundizar más en los conceptos básicos de MapReduce, las siguientes referencias y enlaces proporcionan una idea bastante completa.

Referencia: Tom White, “Hadoop: The Definitive Guide, 2nd Edition”, O’Relly Media / Yahoo Press, 2010 (http://hadoopbook.com)

Referencia: Yahoo Hadoop Tutorial (http://developer.yahoo.com/hadoop/tutorial/module4.html)

Procesamiento de datos de entrada en Hadoop

Una aspecto muy importante de todo este proceso es como realiza Hadoop el procesamiento de los datos de entrada para obtener el conjunto de parejas claves-valor que serán procesadas por los programas MapReduce.

Básicamente, el procesamiento de los datos que se lleva a cabo en un nodo en Hadoop, sigue el siguiente flujo:

 

image

Como veremos más adelante, Hadoop es capaz de procesar datos procedentes de fuentes de datos diversas y con diferentes formatos de entrada, desde ficheros de texto plano, ficheros binarios, ficheros XML, hasta bases de datos y otros orígenes de datos.

Para llevar a cabo este procesamiento, reflejado de forma esquemática en la ilustración anterior, Hadoop utiliza una serie de clases, entres las que se encuentran las clases InputFormat, InputSplit y RecordReader, que son las más interesantes desde el punto de vista del desarrollador.

Actualmente para especificar trabajos de tipo MapReduce, en Hadoop estan conviviendo dos APIs la vieja  representada por el espacio de nombres org.apache.hadoop.mapred y la nueva  representada por org.apache.hadoop.mapreduce. A la hora de definir las clases bases de la API, en la vieja se utilizaban sobre todo interfaces, mientras que en la nueva se utilizan principalmente clases abstractas.

Hoy en día es posible describir un trabajo MapReduce, bien utilizando una API o la otra, pero en la especificación y configuración de un mismo trabajo MapReduce no se deben de mezclar clases de una y otra API.

A continuación pasaremos a describir cada una de las principales clases en Hadoop, responsables de procesar los datos de entrada.

InputFormat

En primer lugar es necesario definir que se va a utilizar como fuente de datos de entrada y como se van a obtener esos datos. Para ello Hadoop utiliza la clase InputFormat (que en la versión antigua de la API es una interfaz y en la versión nueva es una clase abstracta), a través de una clase que derive de ella (bien implementando la interfaz o extendiendo la clase, según el caso). Esta clase además especificará mediante una clase que derive de la clase InputSplit como se van a trocear los datos procedentes de la fuente de datos para asignárselos a un “mapeador”, así como la clase que derive de la RecordReader que se utilizara para ir leyendo y suministrando a la función de mapeo las sucesivas parejas clave-valor.

Hadoop proporciona varias clases que permiten utilizar como fuente de datos: el sistema de ficheros (FileInputFormat, TextInputFormat, KeyValueTextInputFormat, SequenceFileInputFormat, …), una base de datos (DBInputFormat), etc…, pero también admite crear clases personalizadas para por ejemplo tratar datos procedentes de la Wikipedia(WikipediaPageInputFormat) o de otras fuentes de datos (p.e. que lean de un puerto).

La clase que utiliza por defecto salvo que se le indique otra, es TextInputFormat, que trata cada línea de cada fichero de entrada como un registro separado, donde la clave es el offset en bytes desde el principio del fichero que tiene cada comienzo de línea y el valor es el contenido de cada línea.

Para crear una clase de tipo InputFormat, habrá que tener en cuenta que la clase contemple las siguientes funcionalidades:

  • Seleccionar los ficheros u otros objetos que serán usados como entrada
  • Definir los InputSplits que dividen un fichero o fuente de datos de entrada, en trozos para asignarlos a las diferentes tareas de mapeo
  • Proporcionar una factoría de objetos RecordReader que lean del fichero o fuente de datos de entrada y proporcionen las diferentes parejas clave-valor.

Además esta clase deberá de:

O bien implementar la interfaz definida por la vieja API MapReduce, o heredar de una clase que la implemente. La interfaz definida por la vieja API para la clase InputFormat  (org.apache.hadoop.mapred.InputFormat) es:

public interface InputFormat <K,V> {

  /** Logically split the set of input files for the job **/
  InputSplit [ ] getSplits (JobConf job, int numSplits) throws IOException;     

  /** Get the RecordReader for the given InputSplit **/
  RecordReader <K,V> getRecordReader (InputSplit split, JobConf job, Reporter reporter) 
       throws IOException; 

}

O bien extender la clase abstracta definida por la nueva API MapReduce o heredar de una clase que la extienda. La clase abstracta definida por la nueva API para la clase InputFormat (org.apache.hadoop.mapreduce.InputFormat) es:

public abstract class InputFormat<K, V> {

  /** Logically split the set of input files for the job **/
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;

  /** Create a record reader for a given split **/
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}

InputSplit

El siguiente paso consiste en especificar como se van a trocear los datos procedentes de la fuente de datos de entrada para asignárselos a cada tarea de mapeo individual. Para ello Hadoop utiliza una clase de tipo InputSplit, que especifica los datos o la unidad de trabajo que se asignará a cada tarea de mapeo o “mapper” individual, para que se encargue de procesarla.

Es decir, un InputSplit es un trozo de los datos de entrada o “split”, que es procesado por un único “mapper”. Cada “mapper” procesa un único “split”. Cada “split” es dividido en registros, y el mapper procesa cada registro (pareja clave-valor") de uno en uno. Los “split” y registros son conceptos lógicos, no hay nada que obligue a que estén vinculados a ficheros.

En el caso de que sea necesario crear una clase InputSplit personalizada, esta clase deberá de:

O bien implementar la interfaz definida por la vieja API MapReduce, o heredar de una clase que la implemente. La interfaz definida por la vieja API para la clase InputSplit  (org.apache.hadoop.mapred.InputSplit) es:

public interface InputSplit extends Writable {

  /** Get the total number of bytes in the data of the InputSplit **/
  long getLength() throws IOException; 

  /** Get the list of hostnames where the input split is located**/
  String [ ] getLocations() throws IOExcecption;  

}

O bien extender la clase abstracta definida por la nueva API MapReduce o heredar de una clase que la extienda. La clase abstracta definida por la nueva API para la clase InputSplit (org.apache.hadoop.mapreduce.InputSplit) es:

public abstract class InputSplit {

  /** Get the size of the split **/
  public abstract long getLength() throws IOException, InterruptedException;

  /** Get the list of nodes by name where the data for the split would be local **/
  public abstract 
    String[] getLocations() throws IOException, InterruptedException;
}

RecordReader

Por último es necesario irle proporcionando a la tarea de mapeo las parejas clave-valor procedentes de la fuente de datos.

Hadoop realiza esta tarea a través de una clase de tipo RecordReader, que no es más que un iterador sobre los registros, que es utilizado por las tareas de mapeo para generar los registros (parejas clave-valor) que pasará a la función de mapeo. Por tanto el RecordReader se encarga de dividir los datos en parejas clave-valor para dárselos como entrada al Mapper.

En el caso de que sea necesario crear una clase RecordReader personalizada esta clase deberá de:

O bien implementar la interfaz definida por la vieja API MapReduce, o heredar de una clase que la implemente. La interfaz definida por la vieja API para la clase RecordReader (org.apache.hadoop.mapred.RecordReader) es:

public interface RecordReader <K,V> {  

  /** Reads the next key/value pair from the input for processing **/
  boolean next (K key, V value) throws IOException;     

  /** Creates an object of the appropiate type to be used as a key **/
  K createKey();      

  /** Creates an object of the appropiate type to be used as a value **/
  V createValue();    

  /** Returns the current position in the input **/
  long getPos() throws IOException;     

  /** Close this InputSplit to future operations **/
  public void close() throws IOException;     

  /** How much of the input has the RecordReader consumed i.e. has been processed by  **/
  float getProgress() throws IOException;    

}

O bien extender la clase abstracta definida por la nueva API MapReduce o heredar de una clase que la extienda. La clase abstracta definida por la nueva API para la clase RecordReader (org.apache.hadoop.mapreduce.RecordReader) es:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

  /** Called once at initialization. **/
  public abstract void initialize(InputSplit split,
                                  TaskAttemptContext context
                                  ) throws IOException, InterruptedException;

  /** Read the next key, value pair. **/
  public abstract boolean nextKeyValue() throws IOException, InterruptedException;

  /** Get the current key **/
  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
  
  /** Get the current value. **/
  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
  
  /** The current progress of the record reader through its data. **/
  public abstract float getProgress() throws IOException, InterruptedException;
  
  /** Close the record reader.**/
  public abstract void close() throws IOException;

}

En el siguiente post veremos como entre las alternativas disponibles para procesar datos de entrada en formato XML, nos encontramos con clases personalizadas de tipo InputFormat o de tipo RecordReader, que son capaces de tener en cuenta las características de este tipo de datos y de procesarlos convenientemente.

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *