\documentclass{vldb}
\usepackage{algorithm}
\usepackage{verbatim}
\usepackage{times}
\usepackage{algorithmic}
\usepackage{url}
\usepackage{array}
\usepackage{multirow}
\usepackage{multicol}
\makeatletter
\newenvironment{sql}%
{\begin{list}{}{%
\setlength{\topsep}{0pt}\setlength{\partopsep}{0pt}\setlength{\parskip}{0pt}%
\setlength{\parsep}{0pt}\setlength{\labelwidth}{0pt}%
\setlength{\rightmargin}{0pt}\setlength{\leftmargin}{0pt}%
\setlength{\labelsep}{0pt}%
\obeylines\@vobeyspaces\normalfont\ttfamily%
\item[]}}
{\end{list}\vskip5pt\noindent}
\makeatother
\newcommand{\ol}[1]{\texttt{\small #1}}
\newcommand\agg{\mathcal{A}}
\newcommand\yG{{\tilde G}}
\newcommand\yQ{{\tilde Q}}
\newcommand\ysigma{{\tilde\sigma}}
\newcommand\ygamma{{\tilde\gamma}}
\newcommand\yf{{\tilde f}}
\newcommand\ym{{\tilde m}}
\newcommand\darg{\,\cdot\,}
\newcommand\gammahat{{\hat\gamma}}
\newcommand\alphahat{{\hat\alpha}}
\newcommand\phat{{\hat p}}
\newcommand\Fhat{{\hat F}}
\newcommand\fhat{{\hat f}}
\newcommand\Ghat{{\hat G}}
\newcommand\Qhat{{\hat Q}}
\newcommand\xA{\mathcal{A}}
\newcommand\xG{\mathcal{G}}
\newcommand\xS{\mathcal{S}}
\newcommand\xX{\mathcal{X}}
\newcommand\alphabar{{\bar\alpha}}
\newcommand\Fbar{{\bar F}}
\newcommand\eps{\epsilon}
\newcommand\SE{\ensuremath{\text{SE}}\relax}
\newcommand\MSRE{\ensuremath{\text{MSRE}}\relax}
\DeclareMathOperator{\var}{Var}
\DeclareMathOperator{\std}{Std}
\DeclareMathOperator{\bias}{Bias}
\DeclareMathOperator*{\minimize}{minimize}
\DeclareMathOperator*{\argmin}{arg\,min}
\newcommand{\sset}[1]{\left\{\,#1\,\right\}}
\newcommand{\ssetl}[1]{\{\,#1\,\}}
\newtheorem{theorem}{Theorem}
\newcounter{wenum}
\newenvironment{widenum}% Makes wide enumeration lists
{\begin{list}{\arabic{wenum}.}%
{\setlength\leftmargin{0pt}%
\setlength\labelsep{1em}%
\setlength\itemindent{1em}%
\setlength\labelwidth{0pt}%
\setlength\itemsep{0pt}%
\setlength\topsep{4pt}%
\setlength\parsep{4pt}%
\usecounter{wenum}}}%
{\end{list}}
\newcommand{\bpar}[1]{\vskip 5pt\noindent\textbf{#1}\hskip 1em}
% algorithms
\renewcommand{\algorithmiccomment}[1]{\hfill \emph{// #1}}
\newcommand{\COMMENTLINE}[1]{\STATE \emph{// #1}}
\newcommand{\func}[1]{\textsc{#1}}
\newcommand{\nikedistributed}[1]{\stackrel{\mathrm{#1}}{\sim}}
% \titlenote hack for Fei---I hope he likes it!
\makeatletter
\def\titlenote{\@ifnextchar[\@xtitlenote{\stepcounter\@mpfn
\global\advance\titlenotecount by 1
\@titlenotetext
}}
\makeatother
% hyphenation
\begin{document}
\title{Online Aggregation for Large MapReduce Jobs}
\numberofauthors{1}
\author{
\alignauthor Niketan Pansare$^1$, Vinayak Borkar$^2$, Tyson Condie$^3$, Chris Jermaine$^1$ \\
\affaddr{$^1$Rice University, $^2$UC Irvine, $^3$Yahoo!} \\
\email{np6@rice.edu, vborky@yahoo.com, tcondie@yahoo-inc.com, cmj4@rice.edu}
}
\maketitle
\begin{abstract}
In online aggregation, a database system processes a user's aggregation query in an online fashion. At all times
during processing, the system gives the
user an estimate of the final query result, with confidence bounds that become tighter and tighter over time.
In this paper, we consider how online aggregation
can be built into a MapReduce system for large-scale data processing. Given the MapReduce paradigm's close
relationship with cloud computing (in that one might expect a large fraction of MapReduce jobs to be run in the cloud),
online aggregation is a very attractive
technology. Since large-scale cloud computations are typically pay-as-you-go, a user
can monitor the accuracy obtained in an online fashion, and then save money
by killing the computation early once sufficient accuracy has been obtained.
\end{abstract}
\section{Introduction}
When running online aggregation (OLA) \cite{OnlineAgg, RippleJoin, DBO}, at all times
during query processing, a database system gives a
user a statistically-valid estimate for the final answer to an aggregate query, along with confidence
bounds of the form: ``with probability $p$, the actual query answer is within the range from $low$
to $high$''. As the computation progresses, the bounds narrow, until (at query completion) the bounds are zero-width,
indicating complete accuracy. The main benefit of OLA is that if an acceptably accurate answer can be arrived at very quickly (perhaps in a tiny fraction
of the time needed to run the entire query), the query can be aborted, saving significant computer and human time.
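To make the behavior of the bounds concrete, the following sketch (our own illustration, not code from any OLA system) processes a finite population of per-block aggregate values in random order and, after each block, reports a running estimate of the total along with an approximate 95\% half-width based on the central limit theorem and the finite-population correction:

```python
import random
import statistics

def running_estimate(values, seed=0):
    """Process blocks in random order; after each block, record an estimate
    of the population total and an approximate 95% confidence half-width."""
    n = len(values)
    order = values[:]
    random.Random(seed).shuffle(order)
    seen, trace = [], []
    for v in order:
        seen.append(v)
        k = len(seen)
        estimate = n * statistics.mean(seen)
        if k > 1:
            # Survey-sampling standard error, with finite-population correction.
            se = n * (statistics.stdev(seen) / k ** 0.5) * ((n - k) / n) ** 0.5
            half_width = 1.96 * se
        else:
            half_width = float("inf")
        trace.append((estimate, half_width))
    return trace

# A population of 100 block aggregates whose true total is 5050.
trace = running_estimate([float(i) for i in range(1, 101)])
```

At query completion ($k = n$) the correction factor reaches zero, so the interval collapses to the exact answer, matching the zero-width bounds described above.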
Though OLA has arguably had quite a bit of scientific impact (stimulating
significant subsequent research), its commercial impact has been limited or even non-existent. In our view, there have been two main reasons
for this lack of adoption:
\begin{enumerate}
\item First, implementing OLA within a database engine would likely require extensive changes to the database kernel. OLA requires
some sort of statistically quantifiable randomness within the database engine. Most OLA algorithms require that the blocks (or tuples)
in a relation be processed in a ``random'' ordering, where ``random'' has a very stringent mathematical definition. Since this would require
significant changes to most kernels and would wreak havoc with techniques widely implemented by database vendors
(such as indexing), vendors and kernel developers have justifiably
viewed OLA with suspicion.
\item Second, the goal of saving human and computer time has never been as
compelling as one might think. A user of an analytic database who writes
a query that goes into a queue and finally makes it out to a big production warehouse for evaluation actually
has little motivation to kill the query early. Ending the query early might save some
CPU cycles or disk bandwidth that can then be used by others, but the user
who killed the query early does not benefit directly. Furthermore, the
database hardware/software/maintenance costs in a self-managed system are
not elastic, and do not decrease appreciably if many users decide to stop their queries early.
\end{enumerate}
Significantly, we feel that these two impediments to widespread adoption of OLA may have become
less important over time. The ``We can't change the kernel'' argument is less important at a time when
people are implementing all sorts of new database or
data-oriented systems from scratch, particularly for large-scale, shared-nothing cluster environments. The
``Why stop early?'' argument is also harder
to make nowadays, given the current move into the cloud. When
someone other than the end-user's organization is managing the compute infrastructure, as a query runs,
dollars are quantifiably flowing from the end-user's organization and into the cloud. Now that there may be a real and observable cost
associated with every CPU cycle consumed and byte transferred, the end-user will likely have to justify those costs to management.
It stands to reason that being able to achieve 99\% of the accuracy in 10\% of the time will become much more attractive under such a
cost model. Thus, we feel that
OLA is an old idea whose time has come.
\vspace{5 pt}
\noindent
\textbf{Online Aggregation for Large-Scale Computing.}
Given the potential for OLA to be newly relevant, and given the current interest in very large-scale,
data-oriented computing, in this paper we consider the problem of providing OLA in a shared-nothing environment, on top of a
distributed file system such as HDFS \cite{HDFS}. While we concentrate on implementing OLA on top of a MapReduce engine \cite{mapreduce-osdi},
many of our most basic research contributions are not specific to MapReduce, and should apply broadly.
Realizing OLA for large-scale, distributed computing is a challenging problem, and a
simple extension to the classic work on OLA will not suffice.
Classic work in OLA assumes that blocks or tuples are processed in a statistically random fashion, so that the
set of data seen at any point in the computation is a random subset of the data in the system---if this is the case, then it is often
easy to estimate the final answer using classic methods from survey (finite population) sampling theory \cite{surveysampling}.
The difference in a large-scale, distributed computing environment is the importance
of elapsed time. In this type of environment, the basic unit of data that is processed is a
\emph{block}, which may contain millions of tuples and be a significant fraction of a gigabyte in size.
When many machines are working in parallel, it is natural that there would be a lot of variation in the time taken to process each
block. Some blocks contain more data than others, and so take longer to process. It is also not unusual for machines to simply die, making it appear as if they have been processing a block forever.
This variation in processing time is of tremendous importance if it is somehow correlated with the aggregate value of the block. Such
correlation is to be expected: after all, blocks
with a lot of data may have greater aggregate values, and take longer to process. In such a scenario, since those nodes that
are processing large blocks with big aggregates are more likely to spend more time on those blocks, the set of blocks that have actually
completed processing
at any particular point are more likely to have small values, leading to biased estimates. This is an example of the
well-known ``inspection paradox'' described by renewal-reward theory \cite{renewaltheory}. Dealing with this in a principled fashion in a
distributed environment is challenging, and requires innovation both in system design and in statistical analysis.
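The inspection paradox is easy to reproduce in simulation. In the following toy sketch (purely illustrative; all numbers are made up), each block's processing time equals its aggregate value, four mappers work in parallel, and a snapshot taken mid-computation sees only the quick, small-valued blocks---so the naive mean over completed blocks is badly biased low:

```python
import random

def snapshot_bias(values, workers=4, t_snap=50.0, seed=1):
    """Process a shuffled stream of blocks on `workers` parallel mappers,
    with time-to-process equal to the block's aggregate value.  Return the
    true mean and the naive mean over blocks finished by time t_snap."""
    stream = values[:]
    random.Random(seed).shuffle(stream)
    clock = [0.0] * workers            # time at which each worker is next free
    done = []
    for v in stream:
        w = min(range(workers), key=clock.__getitem__)
        clock[w] += v                  # processing time correlated with value
        if clock[w] <= t_snap:
            done.append(v)             # this block completed before the snapshot
    true_mean = sum(values) / len(values)
    naive_mean = sum(done) / len(done) if done else 0.0
    return true_mean, naive_mean

# 90 small blocks and 10 large ones: no large block can finish by t_snap,
# so the snapshot never observes a large aggregate value.
true_mean, naive_mean = snapshot_bias([1.0] * 90 + [100.0] * 10)
```

Here the true mean is 10.9, but every completed block has value 1.0; an estimator that ignores processing times would be off by an order of magnitude.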
\vspace{5 pt}
\noindent
\textbf{Our Contributions.}
We make the following contributions:
\begin{itemize}
\item We propose a system model that is appropriate for OLA
over MapReduce in a large-scale, distributed environment.
\item We describe in detail how
we implemented our model in Hyracks \cite{hyracks}.
\item We discuss a Bayesian framework for producing estimates and confidence bounds
within our model.
\item We offer experimental evidence that our model does not introduce inefficiencies and
that it does in fact produce accurate and usable estimates very quickly.
\end{itemize}
\section{Processing Large Data Sets}
\label{sec:mapreduce}
MapReduce is a programming model for performing aggregate computations over
large data sets. The programmer specifies a {\em map} function that processes
input records and produces a list of intermediate key/value pairs, and a {\em
reduce} function that is called once for each distinct map output key and
associated list of intermediate values. Optionally, the programmer can supply
a {\em combiner} function, which is applied to the intermediate results between
the map and reduce steps. The combiner interface is similar to reduce
functions, except that they are not passed {\em all} the values for a given
key: instead, a combiner emits an output value that summarizes the input values
it was passed. Combiners are typically used to perform ``pre-aggregation,''
which can reduce the amount of network traffic when the map and reduce steps
are executed in a distributed environment.
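A minimal, single-process sketch of the map/combine/group/reduce pipeline just described (our own illustration; not the Hadoop or Hyracks API):

```python
from itertools import groupby
from operator import itemgetter

def run_mapreduce(records, map_fn, reduce_fn, combiner=None):
    """Toy single-process MapReduce: map each record, optionally pre-aggregate
    with the combiner (one 'split' here; a real engine runs many in parallel),
    then group by key via sorting and apply the reduce function."""
    # Map phase: each record yields a list of (key, value) pairs.
    pairs = [kv for rec in records for kv in map_fn(rec)]
    if combiner:
        # Combiner phase: summarize this split's values per key.
        pairs = [(k, combiner(k, [v for _, v in grp]))
                 for k, grp in groupby(sorted(pairs), key=itemgetter(0))]
    # Group phase (commonly performed via sorting), then reduce phase.
    pairs.sort(key=itemgetter(0))
    return {k: reduce_fn(k, [v for _, v in grp])
            for k, grp in groupby(pairs, key=itemgetter(0))}

counts = run_mapreduce(
    ["a b", "b c b"],
    map_fn=lambda line: [(w, 1) for w in line.split()],
    reduce_fn=lambda k, vs: sum(vs),
    combiner=lambda k, vs: sum(vs))
```

With a single split the combiner is semantically a no-op, but in a distributed run it would shrink each mapper's output before the shuffle.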
The MapReduce architecture consists of a query processing layer, which is based
on a dataflow of map and reduce operations, and a \emph{Distributed File
System} (DFS), which stores the input to the map function and the output of the
reduce function. The intermediate data is typically stored on the local file
system. The query processing layer consists of a single {\em master} node and
many {\em worker} nodes~\cite{mapreduce-osdi}. The master is responsible for
accepting {\em jobs} that specify the user-defined functions and automatically
parallelizing those functions into units of work called {\em tasks}. Each task
is assigned a portion of the relevant input and is individually scheduled on
the worker nodes. Workers are assigned a fixed number of {\em slots} for
executing tasks (e.g., two maps and two reduces). A heartbeat protocol between
each worker and the master is used to update the master's bookkeeping state of
running tasks, and drive the scheduling of new tasks: if the master identifies
free worker slots, it will schedule further tasks on the worker.
A map task is assigned a portion of the input file called a {\em split}. By
default, a split contains a single DFS block (64MB by default), so typically
the total number of file blocks determines the number of map tasks. The
execution of a map task is divided into two phases. The {\em map} phase reads
the assigned split from DFS, parses it into records (key/value pairs), and
applies the map function to each record. After the map function has been
applied to each input record, the {\em commit} phase executes the combiner
function (if given) and registers the final output with the worker, which then
informs the master that the task has finished executing.
The execution of a reduce task is divided into three phases. In the {\em
shuffle} phase, the reduce task receives its assigned key range from the output
of each map task. This phase typically runs concurrently with the map tasks in
a pipelined fashion. After receiving its partitions from all map tasks, the
reduce task enters the {\em group} phase. This is commonly performed via
sorting techniques. Finally, the {\em reduce} phase invokes the user-defined
reduce function for each distinct key and associated list of values.
%The final
%output of the reduce function is written to the DFS and the master is informed
%on completion.
%In this design, the output of both map and reduce tasks is written to disk
%before it can be consumed. This is particularly expensive for reduce tasks,
%because their output is written to DFS. Output materialization simplifies
%fault tolerance, because it reduces the amount of state that must be restored to
%consistency after a node failure. If any task (either map or reduce) fails, the
%master simply schedules a new task to perform the same work as the failed
%task. Since a task never exports any data other than its final answer, no
%further recovery steps are needed.
The remainder of this paper describes a MapReduce OLA library, implemented on top of Hyracks, for estimating
the following aggregates: \texttt{SUM}, \texttt{COUNT}, \texttt{AVG},
\texttt{VAR}, or \texttt{STD\_DEV}. To obtain an OLA estimate for each distinct key (or group)
present in the data set, the programmer specifies the choice of
aggregate in a job configuration file along with an implementation of the map
and reduce functions---the reduce function is used as the combiner. Our
system provides early estimates of the final result for each of the groups present in the data set
by running the combiner
at regular intervals and passing those results to our OLA library. The
resulting estimates can then be written to HDFS or sent to some arbitrary consumer, such as
a terminal. The programmer can halt the job when the estimation
has reached a satisfactory level of confidence, \`{a} la CONTROL~\cite{control}.
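The supported aggregates can all be derived from three running per-group moments (count, sum, and sum of squares), which is what makes periodic combiner output sufficient. The following sketch (hypothetical code, not our library's actual interface) illustrates the idea:

```python
import math
from collections import defaultdict

class GroupAgg:
    """Running per-group moments from which SUM, COUNT, AVG, VAR, and
    STD_DEV can all be derived as combiner output trickles in."""
    def __init__(self):
        self.n = defaultdict(int)      # tuple counts per group
        self.s1 = defaultdict(float)   # sum of values per group
        self.s2 = defaultdict(float)   # sum of squared values per group

    def update(self, group, values):
        self.n[group] += len(values)
        self.s1[group] += sum(values)
        self.s2[group] += sum(v * v for v in values)

    def report(self, group):
        n, s1, s2 = self.n[group], self.s1[group], self.s2[group]
        avg = s1 / n
        var = s2 / n - avg * avg       # population variance from the moments
        return {"COUNT": n, "SUM": s1, "AVG": avg,
                "VAR": var, "STD_DEV": math.sqrt(max(var, 0.0))}

agg = GroupAgg()
agg.update("g", [1.0, 2.0])   # first combiner output for group "g"
agg.update("g", [3.0])        # a later combiner output
```

Because the moments are additive, combiner outputs arriving at different times can simply be folded in, with no need to revisit earlier data.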
\section{An Operational Model for OLA}
\subsection{Why an Operational Model?}
The first step to getting OLA to work in a distributed, MapReduce environment is to define the abstract, operational model
of the system. This model defines how data are processed in the system, and serves as a contract
between the system implementors and the statistical analysis that underlies the OLA estimation process.
%As long as the system implementors
%respect the model, the statistical analysis and estimation process will correctly analyze the potential for error and bias in the system,
%giving useful and accurate estimates and confidence bounds.
This operational model must meet two requirements:
\begin{enumerate}
\item First, the model must be amenable to statistical analysis. That is, at any point during the computation, it must be possible
to take a snapshot of the system and to use that snapshot to predict the final output of the MapReduce program.
\item Second, the model must be amenable to implementation. It should impose little
or no overhead on the system, so that there is little additional cost associated with running a MapReduce OLA program, compared to
a non-OLA MapReduce program. It should allow the actual implementation the freedom to deal with problems such as dead or slow
machines and fluctuating resource availability, as well as allowing the implementation to take
into account the physical placement of data in the system when assigning data to a CPU for processing.
Furthermore, the model must be easy to implement, requiring little in the way of change or modification
to the system it is built upon.
\end{enumerate}
As discussed in the introduction to the paper, we note that the ``classic'' OLA operational model (where
data are simply processed in random order) is
not directly applicable, because it ignores the ``inspection
paradox.''
\subsection{Our Model for OLA Over MapReduce}
As such, we must define a somewhat more complicated operational model, whose key ideas are
as follows.
We assume that data have been organized into storage units that we refer to as \emph{blocks}. A ``block''
is nothing more than an arbitrary subset of the data in the system;
typically, we would expect a block to contain tens or hundreds of megabytes of data. We allow for the possibility that the
data may have been organized into blocks in an adversarial fashion that
the OLA software cannot control (that is, some blocks
may be very large or very small, or may contain all of the data with the greatest aggregate values).
At the time that the OLA computation begins, all of the blocks are logically ordered in a statistically random fashion, into a list that we term
the \emph{global stream} $S$. We assume the existence of a \texttt{GetNext()}
operation that internally maintains a variable $i$, which is initially set to zero. When \texttt{GetNext()} is called, $i$ is incremented
and $S_i$ (the $i$th block in the
global stream) is returned.
As in other MapReduce implementations, we assume a central \emph{scheduler} (master) whose job is to create mappers and reducers, supply them with
data, and schedule all of them on the physical system hardware.
When the scheduler decides that there are enough system resources to process the next block, it makes a call to \texttt{GetNext()} to obtain the
identifier for a random block. The scheduler must schedule the block given to it by \texttt{GetNext()}; it cannot schedule blocks out of order.
After some (possible) delay, this block is assigned to a mapper, at which time the block is
``processed'' by the mapper. This processing
may include dead time while the block is read from disk and transferred over the network, and it includes all of the necessary processing
of the actual bits and bytes in the block. Once the scheduler has assigned a block to some mapper, it may then
call \texttt{GetNext()} to obtain another unprocessed block to assign to another mapper. We assume that the scheduler only assigns one block
at a time to each mapper, so that the processing times are independent across mappers.
Note that while the scheduler may not schedule blocks out of order, and may not hold more than one block obtained
from \texttt{GetNext()} that has not yet been scheduled, it may wait as long as it wants to call \texttt{GetNext()}, and it may
also wait as long as it wants to schedule a block once it has been obtained from \texttt{GetNext()}.
This flexibility is important because it allows the scheduler to wait for a physical
mapper to become available that is located close to some physical copy of the block.
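The core of the model---a randomly permuted global stream, a \texttt{GetNext()} operation, and the rule that an obtained block must be scheduled before the next one is drawn---can be sketched as follows (an illustration of the contract, not our actual scheduler code):

```python
import random

class GlobalStream:
    """The operational model's global stream S: blocks in one fixed random
    order, handed out strictly one at a time via GetNext()."""
    def __init__(self, block_ids, seed=0):
        self.order = list(block_ids)
        random.Random(seed).shuffle(self.order)
        self.i = 0             # index of the next block to hand out
        self.pending = None    # at most one obtained-but-unscheduled block

    def get_next(self):
        assert self.pending is None, "previous block not yet scheduled"
        if self.i == len(self.order):
            return None        # stream exhausted
        self.pending = self.order[self.i]
        self.i += 1
        return self.pending

    def mark_scheduled(self, block_id):
        assert block_id == self.pending, "blocks must be scheduled in order"
        self.pending = None

stream = GlobalStream(range(5))
b = stream.get_next()          # scheduler may wait arbitrarily long here...
stream.mark_scheduled(b)       # ...but must schedule b before drawing again
```

The assertions encode the two constraints; everything else (when to call \texttt{get\_next}, which mapper receives the block) is left to the scheduler's discretion.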
\subsection{Taking a Snapshot}
When it is time for the statistical analysis software to estimate the final answer to the query, a snapshot must be taken of the system.
This snapshot consists of all of the statistics that will be used by the software to compute its estimate.
These statistics are collected on a per-block basis. For block $i$, they include:
\begin{enumerate}
\item The status of the block. This status is either \texttt{done} if the block has been fully processed, \texttt{processing} if
the block is being processed
by a physical mapper, or \texttt{unassigned} if the block has been obtained by \texttt{GetNext()}
but is waiting to be assigned to a mapper.
\vspace{-3 pt}
\item If the block's status is \texttt{done}, then for each group in the block, the snapshot contains $x_{i,j}$, which is the value obtained by aggregating
all records in the block that fall in group $j$.\footnote{For simplicity and clarity in the rest of the paper, we will drop the $j$ and
assume that all measured quantities, times, and statistics refer to a single group for which we have decided to perform estimation. The extensions to the
multi-group case are straightforward---they require collecting all of the statistics on a per-group basis---and will only serve to complicate our notation.}
\vspace{-3 pt}
\item The snapshot contains $t_{i}^{sch}$, which is the time taken to assign the block to a mapper.
If the block's status is \texttt{unassigned}, then a lower bound on $t_{i}^{sch}$ is given; this
is the time elapsed so far while waiting for assignment.
\vspace{-3 pt}
\item If the block's status is not \texttt{unassigned}, then the snapshot contains the IP address (machine) of the mapper, as well as where
the block was physically obtained from; this takes the value \texttt{local}
(if the block was read from the same machine as the physical mapper),
\texttt{rack} (if it was read from a different machine on the same rack),
or \texttt{dist} (if it was read from a machine on a different rack).
\vspace{-3 pt}
\item If the block's status is not \texttt{unassigned}, the snapshot also contains the time $t_{i}^{proc}$, which is the time required to process the
block. If the block's status is \texttt{processing}, then $t_{i}^{proc}$ is the time taken since the mapper was first given the block by the
scheduler.
\end{enumerate}
\vspace{5 pt}
\noindent
\textbf{Why this model? Why these statistics?}
We end this section by considering some of the intuition behind the operational model.
From an implementor's point of view, the model is compelling because it allows for a lot of freedom. The scheduler is able to process blocks on
whatever machine it chooses. It can wait to schedule a block if no appropriate machine is available.
It can ramp up the computation over time by adding more physical mappers, or ramp it down by simply not
asking for blocks. The only real constraint is that when the scheduler makes a call to \texttt{GetNext()} to obtain a new block, it \emph{must} assign
the block it is given.
From a statistical point of view, the model has been designed with one singular goal in mind: at
the time that a snapshot is taken, we wish to be able to (reasonably) view the
$x_{i}$ values associated with each of the blocks that have been received by some call to \texttt{GetNext()} as a set of
independent, identically distributed (iid) samples from a random variable
with distribution function $f(x_{i})$. As with
any finite population, by randomly permuting the population and then traversing the items in order, we produce a set of (approximately)
iid samples from a distribution where:
\begin{align}
f(x_{i}) = \frac{\sum_j I(x_{i} = x_{j})}{n} \nonumber
\end{align}
\noindent In this expression, $n$ is the size of the population and the function
$I$ returns the value $1$ if the boolean argument is true, and $0$ otherwise.
The word ``approximately'' is necessary only because of the small correlation induced by the fact that the population is finite.
If this were the entire story, then we would essentially be done: we would obtain a set of iid samples from $f(.)$, and hundreds
of years of statistical theory would
tell us exactly how to infer the various characteristics of $f(.)$ and estimate the final query
result.
Unfortunately, an
added complication is that we have the so-called ``inspection paradox'' to deal with. While the aggregate
values associated with blocks that have \emph{been obtained} by \texttt{GetNext()}
can be seen as iid samples from $f(.)$, the aggregate values associated with the blocks that have
\emph{been fully processed} and have observable values cannot. That is, it may be the
case that the time taken to schedule or to process a block is correlated with its contents. Thus, when
we take a snapshot, some non-random set of the blocks returned by \texttt{GetNext()}
may not yet have completed processing.
For example, waiting a long
time to schedule may, in fact, be the result of a block having a high aggregate value---the block may be in a busy part of the cluster and
so is difficult to schedule, but the reason that part of the cluster is busy is that its blocks have more bytes, and hence higher
aggregate values.
To take this into account, we allow for the scheduling and processing times to be correlated with the actual aggregate value, and we assume that the
set of values associated with the blocks returned by \texttt{GetNext()} are actually samples from a three-dimensional
distribution $f(x_{i}, t_{i}^{sch}, t_{i}^{proc})$.
Using this three-dimensional distribution function
allows us to make predictions about the $x_{i}$ values that we have not seen, but for
which we have information about $t_{i}^{sch}$ and $t_{i}^{proc}$; hence we can deal with the inspection paradox
in a principled fashion. For example, if we have a particular block that has been processed for 125 seconds, where
it took 5 seconds to schedule, we \emph{can} correctly view $x_i$ as a random sample from the distribution $f(x_i| t_{i}^{sch} = 5,
t_{i}^{proc} \geq 125)$, thereby neutralizing the inspection paradox.
This is precisely why all of the various timings are collected during the snapshot:
they must be taken into account when estimates and confidence bounds are produced.
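As a simplified illustration of why conditioning on elapsed time helps---using a crude nonparametric stand-in rather than the Bayesian machinery developed in this paper---one can predict the value of an unfinished block by averaging only those completed blocks whose processing times were at least as long as the time already elapsed:

```python
def predict_unfinished(done, elapsed):
    """Crude nonparametric stand-in for E[x | t_proc >= elapsed]: average the
    aggregate values of completed blocks whose processing time was at least
    `elapsed`.  `done` is a list of (x, t_proc) pairs."""
    tail = [x for x, t in done if t >= elapsed]
    if not tail:  # nothing ran that long; fall back to the longest-running block
        tail = [max(done, key=lambda p: p[1])[0]]
    return sum(tail) / len(tail)

# Completed blocks whose values are correlated with their processing times.
done = [(1.0, 10.0), (2.0, 20.0), (8.0, 80.0), (9.0, 90.0)]
```

A block that has already been processing for 50 seconds is predicted to look like the slow, large-valued blocks (estimate 8.5) rather than the overall average (5.0), which is exactly the correction the inspection paradox demands.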
While this is a fairly thorough introduction to the intuition behind the model and the associated statistical considerations,
the actual estimation process will be described in detail
subsequently.
\section{Implementing the Model}
In this section we describe our implementation of the OLA model in
Hyracks~\cite{hyracks}. Hyracks is a new open-source project that supports map
and reduce operations, along with higher-level relational operations such as
filter (selection), projection, and join. The Hyracks architecture is similar
to Hadoop -- it has a single master node for submitting jobs (queries) and
housing the task scheduler, which executes tasks on worker nodes running in the
cluster. Hyracks tasks support read and write operations in HDFS, which we
leverage to store the input to the map tasks and the output of the reduce
tasks. Like Hadoop, when a client submits a MapReduce job, Hyracks assigns a
single map task to a given block in the input data, and creates a configurable
number of reduce tasks that are assigned specific groups using some
partitioning function.
We modified the Hyracks implementation in two ways. First, we created a single
queue containing the blocks in the input data. The order of the blocks in the
queue is uniformly shuffled using the \ol{java.util.Collections.shuffle}
routine from the Java Standard Library~\cite{java-stl}. When Hyracks schedules
a map task, it assigns the map task the current block at the head of the queue.
The map task's execution time includes the time to obtain its assigned block
from HDFS, the execution of the map function on each input record, and the
execution of the combiner on the complete map function output. In this work we
ignore performance issues involving locality, although we do account for block
locality in our model. In future work, we plan to investigate locality-aware
scheduling techniques that use multiple (shuffled) queues---scheduling from
the queue with the best locality placement---as well as Delay
Scheduling~\cite{delay-scheduling}.
\begin{table}
\centering
\begin{tabular}{|l|l|} \hline
\textit{Field} & \textit{Description} \\ \hline\hline
\texttt{block\_id} & The block identifier \\ \hline
\texttt{block\_sch} & The block scheduling time \\ \hline
\texttt{block\_loc} & The block locality \\ \hline
\texttt{mapper\_ip} & Map task IP address \\ \hline
\texttt{mapper\_start} & Map task start time \\ \hline
\texttt{mapper\_end} & Map task finish time \\ \hline
\texttt{est\_start} & The estimation start time \\ \hline
\end{tabular}
\caption{Meta-data associated with the map task output.}
\label{tbl:metadata}
\end{table}
Our second modification involves running the estimator in the reduce task
during the shuffle phase. In the shuffle phase, the reduce task is
continuously receiving the output of completed map tasks. The output of a map
task includes a {\em data} file containing the groups assigned to the reduce
task and a {\em meta-data} file with the fields listed in
Table~\ref{tbl:metadata}. If the map output contains no groups for a given
reduce task then an empty data file is given along with a complete meta-data
file. The meta-data contains the information relevant to a single block being
processed by a map task. Specifically, it identifies the block (\texttt{block\_id}),
the time it took to schedule the block (\texttt{block\_sch}), and the block locality
(\texttt{block\_loc}) relative to the map task execution: machine-local, rack-local, or
distant. Also included is the map task IP address (\texttt{mapper\_ip}), start time
(\texttt{mapper\_start}), and end time (\texttt{mapper\_end}). Finally, the meta-data indicates the
time when the estimator is called on the reduce task (\texttt{est\_start}). The reduce
task executes the estimator when it receives a new map output. The location of
all data and meta-data files received thus far is given to the estimator when
it is called. When the estimator completes, its output can be written to HDFS
or forwarded to a downstream operator.
One issue that caused some heartache for us during the debugging of our
system is that a reasonably synchronized global clock
must be maintained across the system. Since block processing times are typically on the order of minutes, this
synchronization need only be accurate to within a few seconds, but significant drift can cause problems.
The reason is that when a block arrives at the reducer, the total processing time
is computed by subtracting the time that the block is received from the
time that estimation began. Likewise, at estimation
time, the reduce task performing the estimation must subtract the start time
for the block from the current time to obtain a lower bound on the total processing
time for the block. Due to the way we implemented this originally, those lower bounds
were not consistent with the total processing time recorded for each block---the bounds tended to be much too large. Since a
correlation between processing time and aggregate value had been observed, the result
was that at estimation time the system ``guessed'' that the aggregate value for these unfinished blocks was very large, and significant over-estimates
routinely occurred.
\section{Estimation}
In this section, we consider how estimates and confidence bounds for those estimates
can be obtained. As intimated previously, this is a challenging problem, as we
must take into account processing times as well as observed aggregate values in order to circumvent the inspection paradox.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\subsection{Overview}
We will apply a Bayesian approach for estimation \cite{OHaganF04}; for brevity, this section will assume that
the reader has some very basic familiarity with Bayesian statistics. The Bayesian approach
has several obvious benefits for this particular problem. Most significant is the fact
that the inspection paradox ``goes away'' under the Bayesian approach if one
treats the time spent waiting for each block to be processed as observed data.
In standard Bayesian fashion, we will first describe a stochastic, parametric process that
we imagine was used to produce the ``observed'' as well as the ``hidden'' data.
The ``observed data'' will collectively be referred to using the variable $\textbf{X}$. This set includes all of the known
aggregate values and processing times. Our generative process will also produce a set of unobserved variables collectively
referred to as $\Theta$. $\Theta$ includes any data that is unobserved (for example,
the processing time for a block that has not yet finished)---this data is collectively referred to as \textbf{Y}---as well as any unknown parameters required by the generative process (for example, the mean aggregate value per block).
In Bayesian fashion, we will then attempt to infer the distribution $P(\Theta|\textbf{X})$, which is referred to as a \emph{posterior distribution} for $\Theta$.
Then, given $\textbf{X}$ as
well as $P(\Theta|\textbf{X})$, it is possible to obtain a posterior distribution over the actual query result, which can be used to obtain confidence bounds that
are reported to the user.
Note that our development is directly applicable only
to \texttt{SUM} and \texttt{COUNT} queries, which are both evaluated by simply summing $x_i$ values (in the \texttt{SUM} case,
$x_i$ will contain the total aggregate value for the block, and in the \texttt{COUNT} case, $x_i$ will contain the tuple count for the block). Extensions to other
aggregates such as \texttt{VARIANCE} are straightforward; in general they require that we maintain multiple moments for each block, such as the sum-of-squared
tuple values in the case of \texttt{VARIANCE}.
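As a concrete illustration of this moment-based view, the following sketch (the helper names are our own, not part of the system) shows how \texttt{COUNT}, \texttt{SUM}, and \texttt{VARIANCE} can all be recovered from three running moments kept per block:

```python
# Sketch: per-block moments from which SUM, COUNT, and VARIANCE estimates can
# all be recovered. Helper names are illustrative, not part of the system.

def block_moments(values):
    """Return (count, sum, sum_of_squares) for one block's tuple values."""
    n = len(values)
    s = float(sum(values))
    ss = float(sum(v * v for v in values))
    return (n, s, ss)

def combine(moments):
    """Sum per-block moments; the per-block x_i is the SUM (or COUNT) entry."""
    n = sum(m[0] for m in moments)
    s = sum(m[1] for m in moments)
    ss = sum(m[2] for m in moments)
    return n, s, ss

def variance(n, s, ss):
    """Population VARIANCE from the combined moments."""
    mean = s / n
    return ss / n - mean * mean
```

Because the combination is just addition, blocks can be folded in as they arrive at the reducer, in any order.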
\subsection{Generative process}
To obtain the data that we must analyze to produce estimates
and confidence bounds, we imagine that the following steps are repeated, once for each of the $n$ blocks
in the system:
\begin{enumerate}
\item $\textbf{Z}_i \sim \textnormal{Normal}(\mu, \Sigma)$
\item $(\textbf{X}_i, \textbf{Y}_i) \leftarrow \textnormal{PostProcess}(\textbf{Z}_i)$
\end{enumerate}
\noindent ``$\sim$'' should be read as ``is sampled from''.
After this process has been repeated $n$ times (once for each block), our goal is to infer the posterior distribution for $\Theta$ using
\textbf{X}.
This process requires some additional explanation. We begin by describing the vector $\textbf{Z}_i$.
If there are $m$ machines being used to execute a query,
we imagine that associated with the $i$th block is a vector $\textbf{Z}_i$ with $3m + 2$ entries, which contains both
observed and hidden data.
$\textbf{Z}_i$ takes the form:
\begin{align}
\textbf{Z}_i = \langle x_i, t_i^{sch}, &t_{i,1}^{loc}, t_{i,1}^{rack}, t_{i,1}^{dist}, \nonumber \\
&t_{i,2}^{loc}, t_{i,2}^{rack}, t_{i,2}^{dist}, \ldots, \nonumber \\
&t_{i,m}^{loc}, t_{i,m}^{rack}, t_{i,m}^{dist}\rangle \nonumber
\end{align}
\noindent The vector has the following components:
\begin{enumerate}
\item $x_i$ is the value that is obtained when the block is aggregated.
\vspace{-3 pt}
\item $t_i^{sch}$ is the time required to schedule the block, once it has first been selected for scheduling.
\vspace{-3 pt}
\item $t_{i,j}^{loc}$ is the time taken to actually process the block by a mapper on machine $j$, given
that the block is to be read locally from machine $j$.
\vspace{-3 pt}
\item $t_{i,j}^{rack}$ is the time taken to process the block by a mapper on machine $j$, given
that the block is to be read from a machine on the same rack as machine $j$.
\vspace{-3 pt}
\item $t_{i,j}^{dist}$ is the time taken to process the block by a mapper on machine $j$,
given that the block must be read from a machine on a different rack.
\end{enumerate}
Note that $\textbf{Z}_i$ has $(3m + 2)$ dimensions, rather than the three dimensions one might expect after reading Section 3 of the paper.
The reason is that we do not have a single processing time distribution; rather, we have $3m$ such distributions, with three distributions for
each machine, depending on where the actual data comes from. This provides for a very fine-grained model, where processing times can differ
from machine to machine.
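To make the layout of $\textbf{Z}_i$ concrete, here is a small sketch (hypothetical helper names, zero-based indices) that maps each named component to its position in the $(3m+2)$-dimensional vector:

```python
# Sketch: zero-based positions of the named components of Z_i for m machines.
# The helper names and the encoding are illustrative, not from the system.
LOCALITY = {"loc": 0, "rack": 1, "dist": 2}

def z_index(entry, machine=None):
    """entry is 'x', 'sch', or a locality in LOCALITY with a 1-based machine id."""
    if entry == "x":
        return 0
    if entry == "sch":
        return 1
    return 2 + 3 * (machine - 1) + LOCALITY[entry]

def z_length(m):
    """Dimension of Z_i: one x, one scheduling time, three times per machine."""
    return 3 * m + 2
```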
Also note that we assume that $\textbf{Z}_i$ is normally distributed, with mean vector $\mu$ and covariance
matrix $\Sigma$.
At first glance, assuming normality may seem questionable, but in practice this is not a particularly significant assumption
because we are aggregating over many $\textbf{Z}_i$ values---one for each block. Assuming normality here is similar to
appealing to the Central Limit Theorem \cite{MLE} when applying more traditional, non-Bayesian methods.
Finally, note that in
step (2) of the generative process, $\textbf{Z}_i$ is ``post-processed'' to actually produce the observable data $\textbf{X}_i$ that is associated with the $i$th
block. This removes the data from $\textbf{Z}_i$ that could not/should not be observed, and puts this unobservable data
into $\textbf{Y}_i$.
For example, given that each block is processed only once, no one is ever going to observe both $t_{i,1}^{loc}$ and $t_{i,5}^{loc}$ for a given block---we might
imagine that both values exist, but they will never be observed together. Hence, $\textbf{X}_i$ will never contain both of these values, and one or the
other must end up in $\textbf{Y}_i$.
In fact, there are four different ways in which the ``post-processing'' will be performed, depending upon the state of block $i$ at the time that the
estimation is performed:
\begin{description}
\item[$i$ in case 1: (No information)] $\textbf{X}_i = \langle \rangle$
\newline In this case, the block has not been chosen by the scheduler and so no information is available. $\textbf{X}_i$ is empty, and
$\textbf{Y}_i = \textbf{Z}_i$.
\vspace{-3 pt}
\item[$i$ in case 2: (Scheduling)] $\textbf{X}_i = \langle \lfloor t_i^{sch} \rfloor\rangle$
\newline In this case, the block is at the head of the scheduler's queue and is waiting for a
map task to be assigned to it. Thus, we have a lower bound on the scheduling time, denoted by $\lfloor t_i^{sch} \rfloor$.
This is simply the amount of time the block has been waiting to be scheduled. Again in this case, $\textbf{Y}_i = \textbf{Z}_i$.
\vspace{-3 pt}
\item[$i$ in case 3: (Scheduled and processing)] $\textbf{X}_i = \langle t_i^{sch}, \lfloor t_{i,W_i}^{L_i} \rfloor \rangle$
\newline In this case, a map task has been assigned to the block and processing has begun.
Thus, we have access to an exact value for $t_i^{sch}$. We also have
$W_i$, which is the identity of the machine on which the block is being processed, and $L_i$, which is the locality
information for the block ($loc$, $rack$, or $dist$). Finally, we know
$\lfloor t_{i,W_i}^{L_i} \rfloor$, which is a lower bound on the processing time for the block---one can view
$t_{i,W_i}^{L_i}$ as being equivalent to $t_{i}^{proc}$ from Section 3. In case 3,
$\textbf{Y}_i$ contains everything in $\textbf{Z}_i$ except for $t_i^{sch}$.
\vspace{-3 pt}
\item[$i$ in case 4: (Scheduled and processed)] $\textbf{X}_i = \langle x_i, t_i^{sch}, t_{i,W_i}^{L_i} \rangle$
\newline In this case, the map task has finished processing the data and the aggregate value has finally arrived at the
reducer. Hence, in addition to $W_i$ and $L_i$, we know exact values for the scheduling time, the processing time,
and the aggregate value for the block. Here, $\textbf{Y}_i$ contains everything in $\textbf{Z}_i$ except for the
three values in $\textbf{X}_i$.
\end{description}
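The four post-processing cases above can be sketched as follows; the dictionary-based representation, state fields, and helper name are illustrative assumptions rather than our actual implementation:

```python
# Sketch of the "post-processing" step: split a fully sampled z into the
# observed part X_i and the hidden part Y_i, according to the block's state.
# The dict keys, state fields, and lower bounds (elapsed times) are illustrative.

def post_process(z, state):
    """z maps component names to sampled values; state describes block i."""
    case = state["case"]
    if case == 1:                               # not yet chosen by the scheduler
        x, observed = {}, set()
    elif case == 2:                             # waiting at the head of the queue
        x = {"sch_lb": state["elapsed_sch"]}    # lower bound on t_i^sch
        observed = set()                        # t_i^sch itself stays hidden
    elif case == 3:                             # map task assigned, processing
        x = {"sch": z["sch"],
             "proc_lb": state["elapsed_proc"]}  # lower bound on t_{i,W_i}^{L_i}
        observed = {"sch"}
    else:                                       # case 4: finished
        key = (state["machine"], state["locality"])
        x = {"x": z["x"], "sch": z["sch"], "proc": z[key]}
        observed = {"x", "sch", key}
    y = {k: v for k, v in z.items() if k not in observed}
    return x, y
```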
\subsection{Prior Distributions}
To make our model fully Bayesian, we must supply priors on $\mu$ and $\Sigma$. In our implementation, each $\mu_k \sim$ InvGamma$(1, 1)$ (where
$k$ refers to the $k$th dimension in $\textbf{Z}_i$). The inverse Gamma distribution is a standard,
uninformative prior for values that must be non-negative---it makes sense
to have non-negative means for all of the
time values in the $\textbf{Z}_i$ vector. It will also usually make sense to have a non-negative mean for $x_i$; if not, then another suitable, uninformative
prior can be used.
Handling the covariance matrix $\Sigma$ is a bit trickier. The standard prior distribution for a covariance matrix is the Inverse Wishart distribution, but
we do not use it because the Inverse Wishart is a multi-dimensional distribution that is not
easily factorizable (that is, it is not easy to write the marginal distribution for each dimension). This is a bit problematic for our application as we will discuss subsequently.
Thus, we let $\sigma_k \sim$ InvGamma$(1, 1)$, where $\Sigma_{k,k} = \sigma^2_k$. Then, we assume that the following process is used to generate the rest
of $\Sigma$:
\vspace{10 pt}
\noindent \textbf{while} (\texttt{true})
\noindent
\verb"  "\textbf{for} $k_1 = 1$ to $(3m + 2)$ \textbf{do}:
\noindent
\verb"    "\textbf{for} $k_2 = k_1 + 1$ to $(3m + 2)$ \textbf{do}:
\vspace{5 pt}
\noindent
\verb"      "$\rho_{k_1,k_2} \sim \textnormal{GenBeta}(-1, 1, 1, 1)$
\noindent
\verb"      "$\Sigma_{k_1,k_2} = \Sigma_{k_2,k_1} = \rho_{k_1,k_2} \times \sigma_{k_1} \times \sigma_{k_2}$
\vspace{5 pt}
\noindent
\verb"  "\textbf{if} $\Sigma$ is positive-definite, \textbf{break}
\vspace{10 pt}
\noindent Here, $\textnormal{GenBeta}(-1, 1, 1, 1)$ refers to a generalized Beta$(1, 1)$ distribution, stretched to cover
the range from $-1$ to $1$ (rather than the usual $0$ to $1$).
This process essentially samples a correlation $\rho_{k_1,k_2}$ for each pair of variables in $\textbf{Z}_i$,
and then checks whether a valid covariance matrix has been obtained (one that is positive-definite).
If it has not, then the whole process is repeated. The PDF for $\Sigma$ can then be written as:
\begin{align}
P(\Sigma) \propto
\begin{cases}
0 \text{ if $\Sigma$ is not positive-definite}\\
\left (\begin{array}{c}
\prod_k \textnormal{InvGamma}(\sigma_k|1, 1) \\
\times \prod_{k_1,k_2} \textnormal{GenBeta}(\rho_{k_1,k_2}|-1, 1, 1, 1)
\end{array} \right) \text{otherwise}
\end{cases} \nonumber
\end{align}
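The rejection process above can be sketched directly, using the facts that a Beta$(1,1)$ distribution stretched to $[-1,1]$ is simply Uniform$(-1,1)$ and that an InvGamma$(1,1)$ draw is the reciprocal of an Exponential$(1)$ draw; the Cholesky-based positive-definiteness test and all helper names are our own illustrative choices:

```python
# Sketch of the rejection sampler for the prior on Sigma (pure Python).
# Facts used: GenBeta(-1, 1, 1, 1) is Uniform(-1, 1), and an InvGamma(1, 1)
# draw is the reciprocal of an Exponential(1) draw. Names are illustrative.
import random

def is_positive_definite(a):
    """Attempt a Cholesky factorization of symmetric matrix a (list of rows)."""
    n = len(a)
    l = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(i + 1):
            s = sum(l[i][k] * l[j][k] for k in range(j))
            if i == j:
                d = a[i][i] - s
                if d <= 0.0:
                    return False          # a pivot failed: not positive-definite
                l[i][i] = d ** 0.5
            else:
                l[i][j] = (a[i][j] - s) / l[j][j]
    return True

def sample_sigma(dim, rng=random):
    """Fix the per-dimension standard deviations, then resample correlations
    until the assembled covariance matrix is positive-definite."""
    sd = [1.0 / rng.expovariate(1.0) for _ in range(dim)]   # InvGamma(1, 1)
    while True:
        sigma = [[0.0] * dim for _ in range(dim)]
        for i in range(dim):
            sigma[i][i] = sd[i] * sd[i]
            for j in range(i + 1, dim):
                rho = rng.uniform(-1.0, 1.0)     # GenBeta(-1, 1, 1, 1)
                sigma[i][j] = sigma[j][i] = rho * sd[i] * sd[j]
        if is_positive_definite(sigma):
            return sigma
```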
\subsection{Posterior Distribution}
In this subsection, we tackle the problem of obtaining a formula for the desired posterior
distribution, $P(\Theta|\textbf{X})$. Recall that $\textbf{X} = \bigcup_i \{\textbf{X}_i\}$,
and the unobservable data set $\Theta$ contains $\textbf{Y} = \bigcup_i \{\textbf{Y}_i\}$, as well as the normal
parameters $\mu$ and $\Sigma$.
From elementary probability, we know that:
$$P(\Theta|\textbf{X}) = \frac{P(\textbf{X}|\Theta) P(\Theta)}{P(\textbf{X})} $$
\noindent This means that there are three quantities that we must derive expressions for: $P(\textbf{X}|\Theta)$, $P(\Theta)$, and $P(\textbf{X})$.
We deal with $P(\textbf{X}|\Theta)$ first. From the generative process, we know that $P(\textbf{X}|\Theta) = \prod_i P(\textbf{X}_i|\Theta)$. We can
easily write an expression for each $P(\textbf{X}_i|\Theta)$, which will depend upon the case that holds for block $i$:
\begin{description}
\item[$i$ in case 1: (No information)] $P(\textbf{X}_i|\Theta) =$ $P(\langle \rangle|\Theta) = 1$ since $\textbf{X}_i$ is empty.
\vspace{-3 pt}
\item[$i$ in case 2: (Scheduling)] Here, we have only a lower bound on the scheduling time. Thus,
$P(\textbf{X}_i|\Theta) =$ $P(\langle \lfloor t_i^{sch} \rfloor\rangle|\Theta) = 1$ if $t_i^{sch} \geq \lfloor t_i^{sch} \rfloor$, and $0$ otherwise, since
a scheduling time below its observed lower bound is impossible.
\vspace{-3 pt}
\item[$i$ in case 3: (Scheduled and processing)] In this case, $P(\textbf{X}_i|\Theta) =$ $P(\langle t_i^{sch}, \lfloor t_{i,W_i}^{L_i} \rfloor \rangle|\Theta) =$ Normal$(t_i^{sch} | \mu, \Sigma, \textbf{Y}_i)$ if $t_{i,W_i}^{L_i} \geq \lfloor t_{i,W_i}^{L_i} \rfloor$, and
$0$ otherwise, since again a processing time below its observed lower bound is impossible.
\vspace{-3 pt}
\item[$i$ in case 4: (Scheduled and processed)] Here, we evaluate a normal distribution:
$P(\textbf{X}_i|\Theta) =$ $P(\langle x_i, t_i^{sch}, t_{i,W_i}^{L_i} \rangle|\Theta) =$ \\ Normal$(x_i, t_i^{sch}, t_{i,W_i}^{L_i} | \mu, \Sigma, \textbf{Y}_i)$.
\end{description}
\noindent Now, we move onto deriving an expression for $P(\Theta)$. From the last few subsections, we have:
\begin{align}
P(\Theta) &= P(\textbf{Y}| \mu, \Sigma) P(\mu) P(\Sigma) \nonumber \\
&= P(\mu) P(\Sigma) \prod_i P(\textbf{Y}_i| \mu, \Sigma) \nonumber \\
&= P(\Sigma) \prod_j \textnormal{InvGamma}(\mu_j|1, 1) \prod_i \textnormal{Normal}(\textbf{Y}_i| \mu, \Sigma) \nonumber
\end{align}
\noindent where an explicit formula for $P(\Sigma)$ was given previously.
This gives us expressions for $P(\textbf{X}|\Theta)$ and $P(\Theta)$. In standard Bayesian fashion, we ignore $P(\textbf{X})$, which would
be very difficult to compute since it would involve integrating out $\Theta$.
But since $P(\textbf{X})$ does not depend upon
$\Theta$, it is merely a normalizing constant that is necessary for the total mass of $P(\Theta|\textbf{X})$ to be
one, and is not needed to compare the relative merits of candidate $\Theta$ values.
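To illustrate how the case-by-case likelihood factors combine with the hidden-data terms into an unnormalized posterior, here is a deliberately simplified one-dimensional toy (a single time per block, no correlations, priors on the parameters omitted); it sketches the structure only, not the full $(3m+2)$-dimensional model:

```python
# A simplified, one-dimensional toy version of the unnormalized posterior:
# each block has a single time t ~ Normal(mu, var). Finished blocks contribute
# the density of their observed t; unfinished blocks contribute the density of
# an imputed (hidden) t, which must respect the observed lower bound.
# All names are illustrative; the real model is (3m+2)-dimensional.
import math

def log_normal(v, mu, var):
    return -0.5 * (math.log(2.0 * math.pi * var) + (v - mu) ** 2 / var)

def log_posterior_1d(mu, var, hidden, blocks):
    """blocks: list of ('done', t) or ('running', lower_bound) records;
    hidden: imputed times for the 'running' blocks, keyed by block index.
    Returns log P(X|Theta) + log P(Y|mu, var), up to the constant P(X);
    the prior terms on mu and var are omitted in this toy."""
    lp = 0.0
    for i, (kind, value) in enumerate(blocks):
        if kind == "done":            # case 4: exact time observed
            lp += log_normal(value, mu, var)
        else:                         # cases 2/3: only a lower bound observed
            t = hidden[i]
            if t < value:             # hidden time below its bound: impossible
                return float("-inf")
            lp += log_normal(t, mu, var)
    return lp
```

The indicator behavior (a hidden time below its observed lower bound receives zero posterior mass) is exactly what lets the model absorb the waiting times as data and sidestep the inspection paradox.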
\subsection{Putting It All Together}
Since our goal is to produce estimates and confidence bounds for the actual query result, we are not interested
in the posterior distribution $P(\Theta|\textbf{X})$ for its own sake. Rather, we will use $P(\Theta|\textbf{X})$
to produce estimates and confidence bounds for the answer.
To describe how this is done, note that given a possible value for $\Theta$---combined with the visible data \textbf{X}---
we have access to each and every $x_i$ value in the database. Thus, given a particular $\Theta$, it is very easy to compute
the query answer as:
$$\sum_i x_i |_{\textbf{X}_i, \Theta}$$
Then by integrating $P(\Theta|\textbf{X})$ over all possible $\Theta$, we obtain various
statistics describing the eventual query result. For example, the following gives us the expected value of the query result:
$$\int_{\Theta} P(\Theta|\textbf{X}) \sum_i x_i |_{\textbf{X}_i, \Theta} d\Theta$$
\noindent And we can obtain the lower end $l$ for a $95$\% confidence bound on the query result by computing $\Lambda$ and $l$ so that:
\begin{align}
&\int_{\Theta \in \Lambda} P(\Theta|\textbf{X}) d\Theta = 0.025 \textnormal{ where } \nonumber \\
&\max_{\Theta \in \Lambda} \{ \sum_i x_i |_{\textbf{X}_i, \Theta}\} \leq l \textnormal{ and } \min_{\Theta \in \bar{\Lambda}} \{ \sum_i x_i |_{\textbf{X}_i, \Theta}\} \geq l \nonumber
\end{align}
\noindent The upper end could be computed in a similar fashion.
Unfortunately, performing this sort of computation exactly is difficult. The difficulty is often circumvented using
so-called ``Markov Chain Monte Carlo'' (MCMC) methods \cite{Robert}, which sample directly from a distribution such as
$P(\Theta|\textbf{X})$.
In our case, we apply a particular MCMC method called a \emph{Gibbs sampler} to
the problem \cite{Gibbs}.
The samples obtained from a Gibbs sampler are easily used to compute expected value and confidence bounds. For example, we can run the sampler to produce
several hundred candidate $\Theta$ values, and then average the associated query results---this is equivalent to the expected value
computation described above.
Cutting off the top 2.5\% and the bottom 2.5\% of the set of query results, and then taking the highest and lowest remaining results, gives 95\%
confidence bounds on the query answer.
\begin{comment}
\subsection{Simplifying the Model}
We note that the formula for $P(\Theta|\textbf{X})$ can
be simplified significantly by simply ignoring extraneous or useless hidden variables in $\Theta$. This has the effect of greatly increasing the computational efficiency of the associated Gibbs sampler.
Since our goal is only to use the
various $x_i$'s to compute the query result distribution, a variable $y \in \Theta$ can be considered ``extraneous'' if (a) $y$ is not some $x_i$,
and (b) $y$ is easily integrated out to obtain $P(\Theta - \{y\}|\textbf{X}) =$ $\int_y P(\Theta|\textbf{X})dy.$
It turns out that there are three categories of extraneous variables that are easily ignored.
First, for any $i$, we can ignore a variable from $\textbf{Y}_i$ as long as (a) the variable in question is not $x_i$, and (b) the function
$P(\textbf{X}_i|\Theta)$ does not depend on the variable. To describe precisely which variables can be ignored, we consider again the four
possible states of block $i$:
\begin{description}
\item[$i$ in case 1: (No information)] Here, $P(\textbf{X}_i|\Theta) = 1$. Thus, we can ignore everything in $\textbf{Y}_i$ except for $x_i$.
After ``integrating out'' everything that has been removed, $P(\textbf{Y}_i|\mu, \Sigma) =$ \\Normal$(x_i|\mu, \Sigma)$.
\item[$i$ in case 2: (Scheduling)] Finally, $P(\textbf{X}_i|\Theta) = 1$ if $t_i^{sch} \geq \lfloor t_i^{sch} \rfloor$, and $0$ otherwise. Thus, we
can ignore everything in $\textbf{Y}_i$ except for $x_i$ and $t_i^{sch}$, and
$P(\textbf{Y}_i|\mu, \Sigma) =$ Normal$(x_i, t_i^{sch}|\mu, \Sigma)$.
\item[$i$ in case 3: (Scheduled and processing)] In this case, $P(\textbf{X}_i|\Theta) =$ Normal$(t_i^{sch} | \mu, \Sigma, \textbf{Y}_i)$ if $t_{i,W_i}^{L_i}
\geq \lfloor t_{i,W_i}^{L_i} \rfloor$, and
$0$ otherwise. Here we
can ignore everything in $\textbf{Y}_i$ except for $x_i$ and $t_{i,W_i}^{L_i}$, and $P(\textbf{Y}_i|\mu, \Sigma) =$ Normal$(x_i, t_{i,W_i}^{L_i}|\mu, \Sigma)$.
\item[$i$ in case 4: (Scheduled and processed)] Here
$P(\textbf{X}_i|\Theta) =$ \\Normal$(x_i, t_i^{sch}, t_{i,W_i}^{L_i} | \mu, \Sigma, \textbf{Y}_i)$. In this case, we ignore everything in $\textbf{Y}_i$, and
$P(\textbf{Y}_i|\mu, \Sigma) = 1$.
\end{description}
In addition, any $\mu_k$, $\sigma_k$, or $\rho_{k_1, k_2}$ associated with a value that was never observed in any $\textbf{X}_i$ can be ignored.
For example, if machine
$j$ was never used in a computation, then all parameters associated with this machine can be dropped.
Lastly, if two timings never were (or could never be) observed together, then the correlation between them can be ignored. Since $x_i$ and $t_i^{sch}$ are the first two values in
$\textbf{Z}_i$, and any machine-specific processing time that is observed can be observed with only that particular pair of values, we can ignore every correlation not of the form $\rho_{1, k}$ or $\rho_{2, k}$.
These latter two cases are why we chose a trivially factorizable prior for
$\Sigma$---to ``integrate out'' an unused entry, row, or column of the prior on the covariance matrix, we simply ignore the terms associated
with the unwanted variables when computing
$P(\Sigma)$.
\end{comment}
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\begin{figure*} [t]
\centering
\includegraphics[scale = .9, angle = -90]{Fig1}
\vspace{-7 pt}
\caption{Posterior query result distribution for number of Wikipedia page hits over the English language, at various
query completion percentages, using
both a randomized and arbitrary block ordering. The actual query result is a vertical black line.}
\vspace{-8 pt}
\label{fig:1}
\end{figure*}
\begin{figure*} [t]
\centering
\includegraphics[scale = .9, angle = -90]{Fig2}
\vspace{-5 pt}
\caption{Identical to Figure 1, except for the French language.}
\vspace{-8 pt}
\label{fig:2}
\end{figure*}
\begin{figure*} [t]
\centering
\includegraphics[scale = .9, angle = -90]{Fig3}
\vspace{-5 pt}
\caption{Posterior query result distribution for the French language, at various query completion percentages, both taking into account and
ignoring the correlation between aggregate value and processing time.}
\vspace{-8 pt}
\label{fig:3}
\end{figure*}
\section{Experiments}
In this section, we describe a set of experiments over the software we have developed. Our experiments are designed to answer the following questions:
Can the confidence bounds that our system reports be trusted? How important is it to take into account the correlation between processing time and data value in both synthetic and real data? How important is choosing blocks in a statistically randomized order?
In a realistic setting, is the system able to produce accurate results quickly?
\subsection{Experiment One}
\noindent \textbf{Basic Setup.} In the first experiment, we run our version of Hyracks with OLA over six months of
data from the Wikipedia page traffic
data set (available at \url{http://aws.amazon.com/datasets/4182}), with the simple goal of counting the number of Wikipedia page hits on a per-language basis
over those six months. Six months of Wikipedia data take up about 220GB (compressed), and are stored in 3,960 blocks.
We run our software on an eleven node cluster, with one master node and
ten slaves. Each machine has four disks, four cores running at 2.3 GHz, and 12GB of RAM. We use 80 mappers and ten reducers (with one reducer
available for each of the ten languages that are to be counted). The entire MapReduce process takes approximately 46 minutes to run to completion.
To demonstrate the relative importance of the different components of our software, we run three different versions of our OLA software. The first
uses every method described in the paper. In the second version, randomization of blocks is not performed by the scheduler, so blocks are scheduled in
arbitrary (but non-random) order. In the third version, randomization is used, but the (possible) correlation between processing/scheduling time and the aggregate
value is not taken into account by the system, leaving the software vulnerable to the ``inspection paradox'' described earlier.
\vspace{5 pt}
\noindent \textbf{Results.} A subset of the observed results is given in Figures 1, 2, and 3. Figure 1 shows the posterior distribution of possible
query results computed by our system, for the English language, at various times during the MapReduce task (we show results after 10\% of the task is complete,
after 20\% is complete, after 30\%, and so on). Two posterior distributions are plotted: one computed running the first version (all features from the paper),
and a second computed running the second version (no randomization). Each plot also shows the true query result for the English language.
Figure 2 is similar to Figure 1, but it shows the results for the French language.
Figure 3 also shows results for the French language, but it shows the computed posterior distribution for the query result after a much smaller portion of the
task has completed: 1\%, 2\%, 3\%, and so on. This plot also shows both version one of our software, and version three (randomization, but no correlation).
\vspace{5 pt}
\noindent \textbf{Discussion.} It is clear that without randomization, severe bias is possible, and so confidence bounds
obtained without a random scheduling order are useless. With randomization, the confidence bounds from Figures 1 and 2 seem remarkably accurate. It is significant
that the bounds obtained
are quite narrow, very quickly.
Take the English language. After 10\% of the blocks have been processed, the bounds go from approximately $4.1 \times 10^{10}$ to $4.4 \times 10^{10}$, which
represents a possible error of only about $\pm3\%$. For many applications, it may be acceptable to simply terminate the computation at this level of accuracy.
In general, convergence could be made to happen even more quickly by increasing the number of blocks used to store
the same data set, though this could have a negative effect on the overall processing time of the MapReduce job.
Even for this particular data set (where the correlation between processing time and aggregate value is quite weak) there is still a clear
benefit to taking into account the correlation, particularly when only a very small fraction of the data has been processed. Consider the plots corresponding
to finishing 3\% and 4\% of the MapReduce job. Without correlation, the posterior distribution almost totally misses the actual query result. Taking into account
the correlation, the distribution is neatly bisected by the correct result.
\subsection{Experiment Two}
\noindent \textbf{Basic Setup.} This experiment is somewhat similar to the first, except that our goal is to try to determine, in a systematic fashion, how accurate
the computed posterior distribution is when used to compute 95\% confidence bounds. Since testing accuracy requires many, many repetitions of the MapReduce task,
instead of actually
running the task in a real cluster, we use a simulator. In our simulation, a random aggregate value is associated with each block, and random processing times
are associated with each block as well; the correlation between aggregate value and processing time is set to be 0.7. Under the same setup as above (80 mappers, 3,960
blocks), we repeat the MapReduce and
the estimation process 100 times. Every time that the estimation is re-run, we consider several different task-completion percentages (1\% done,
2\% done, 3\% done, and so on). At each task-completion percentage, we use the computed posterior distribution to obtain 95\% confidence bounds by ``chopping off'' the
top 2.5\% and bottom 2.5\% of the posterior distribution. We then determine whether or not the actual query result is within the 95\% confidence bounds, and compute
the fraction of the time that the query result is not within the 95\% confidence bounds over each of the 100 repetitions. If our estimation process worked perfectly,
then the 95\% confidence bounds would cover the actual answer 95\% of the time, with a 5\% error rate. This whole process is repeated twice: once
using the full estimation process, and a second time ignoring the possibility of correlation between processing time and aggregate value.
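The coverage computation described above can be sketched as (illustrative helper, not from our simulator):

```python
# Sketch of the coverage check from Experiment Two: the fraction of repeated
# runs whose 95% interval fails to contain the true answer. Name illustrative.

def miss_rate(true_value, intervals):
    """intervals: one (lower, upper) confidence bound pair per repetition."""
    misses = sum(1 for lo, hi in intervals if not (lo <= true_value <= hi))
    return misses / len(intervals)
```

A perfectly calibrated 95\% interval would give a miss rate of about 0.05.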
\vspace{5 pt}
\noindent \textbf{Results.} The results are given in Table 2, and are mostly self-explanatory. For each of the listed task-completion percentages, the fraction of
``incorrect'' 95\% confidence intervals is given, for both of the software versions.
\begin{table}[t]
\begin{center}
\caption{Fraction of confidence bounds that are ``incorrect''.}
\vspace{10 pt}
\begin{tabular}
{| c || c | c | c | c | c | c | c || } \hline
& \multicolumn{7}{c ||}{Percentage of MapReduce task complete} \\ \hline
& 2\% & 3\% & 4\% & 5\% & 10\% & 20\% & 30\% \\ \hline
w corr & .03 & .01 & .06 & .05 & .05 & .02 & .05 \\
w/o corr & .70 & .63 & .62 & .61 & .37 & .22 & .13 \\ \hline
\end{tabular}
\end{center}
\vspace{-10 pt}
\end{table}
\vspace{5 pt}
\noindent \textbf{Discussion.} As can clearly be seen, the confidence bounds computed when taking into account
correlation are accurate, coming very close to the expected 5\% throughout query execution. On the other hand, the results obtained without
taking into account the correlation are very poor, particularly when only a small fraction of the MapReduce task has been completed. This mirrors
the results shown in Figure 3, but under much more extreme circumstances.
\section{Related Work}
OLA has been studied for some time in the context of classic SQL databases \cite{OnlineAgg, RippleJoin, DBO}.
The only other work to consider OLA in the context of MapReduce was the
Hadoop Online Prototype (HOP) system. HOP added a second execution plan to the
Hadoop architecture that allowed pipelining in the map and reduce Hadoop
operators~\cite{mapreduce-online}. Pipelining operators is a prerequisite to
any OLA query plan, and a pipelining plan was shown to perform better in cases
involving short jobs. HOP supported OLA queries by executing reduce tasks at
data-dependent intervals, e.g., on 10\%, 20\%,\ldots,90\% of the data. The
query estimate assumed a uniform sample of the input data but made no
modifications to enforce this in the Hadoop scheduler. This led to significant
error in their estimates. To compensate, the authors modified the query to
contain extra parameters that indicated how many samples of a particular
aggregate group were present, and scaled the estimate accordingly.
%This is
%deeply flawed in a number of ways. For instance, it requires knowledge of the
%total number of samples in a given aggregate group e.g., the total number
%samples for a given hour in the Wikipedia data set. Furthermore, it places the
%onus on the developer to add this extra information to the query. In contrast,
%our system does not require any a priori knowledge of the data nor does it
%force the programmer to add extra query parameters. Our model enforces a
%uniform sample of the input data in the scheduler and accounts for biases that
%arise in a data center environment.
\bibliographystyle{plain} % (uses file "plain.bst")
\bibliography{main} % expects file "main.bib"
\end{document}