2007-05-31 21:23:42 +00:00
|
|
|
\documentclass[10pt]{article}
|
|
|
|
\usepackage{fancyvrb}
|
|
|
|
\usepackage{url}
|
|
|
|
\DefineVerbatimEnvironment{lua}{Verbatim}{fontsize=\small,commandchars=\@\#\%}
|
|
|
|
\DefineVerbatimEnvironment{C}{Verbatim}{fontsize=\small,commandchars=\@\#\%}
|
|
|
|
\DefineVerbatimEnvironment{mime}{Verbatim}{fontsize=\small,commandchars=\$\#\%}
|
|
|
|
\newcommand{\stick}[1]{\vbox{\setlength{\parskip}{0pt}#1}}
|
|
|
|
\newcommand{\bl}{\ensuremath{\mathtt{\backslash}}}
|
2007-10-11 21:16:28 +00:00
|
|
|
\newcommand{\CR}{\texttt{CR}}
|
|
|
|
\newcommand{\LF}{\texttt{LF}}
|
|
|
|
\newcommand{\CRLF}{\texttt{CR~LF}}
|
|
|
|
\newcommand{\nil}{\texttt{nil}}
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
\title{Filters, sources, sinks, and pumps\\
|
|
|
|
{\large or Functional programming for the rest of us}}
|
|
|
|
\author{Diego Nehab}
|
|
|
|
|
|
|
|
\begin{document}
|
|
|
|
|
|
|
|
\maketitle
|
|
|
|
|
|
|
|
\begin{abstract}
|
2007-10-11 21:16:28 +00:00
|
|
|
Certain data processing operations can be implemented in the
|
|
|
|
form of filters. A filter is a function that can process
|
|
|
|
data received in consecutive invocations, returning partial
|
|
|
|
results each time it is called. Examples of operations that
|
|
|
|
can be implemented as filters include the end-of-line
|
|
|
|
normalization for text, Base64 and Quoted-Printable transfer
|
|
|
|
content encodings, the breaking of text into lines, SMTP
|
|
|
|
dot-stuffing, and there are many others. Filters become
|
|
|
|
even more powerful when we allow them to be chained together
|
|
|
|
to create composite filters. In this context, filters can be
|
|
|
|
seen as the internal links in a chain of data transformations.
|
|
|
|
Sources and sinks are the corresponding end points in these
|
|
|
|
chains. A source is a function that produces data, chunk by
|
|
|
|
chunk, and a sink is a function that takes data, chunk by
|
|
|
|
chunk. Finally, pumps are procedures that actively drive
|
|
|
|
data from a source to a sink, and indirectly through all
|
|
|
|
intervening filters. In this article, we describe the design of an
|
|
|
|
elegant interface for filters, sources, sinks, chains, and
|
|
|
|
pumps, and we illustrate each step with concrete examples.
|
2007-05-31 21:23:42 +00:00
|
|
|
\end{abstract}
|
|
|
|
|
|
|
|
\section{Introduction}
|
|
|
|
|
|
|
|
Within the realm of networking applications, we are often
|
2007-10-11 21:16:28 +00:00
|
|
|
required to apply transformations to streams of data. Examples
|
2007-05-31 21:23:42 +00:00
|
|
|
include the end-of-line normalization for text, Base64 and
|
|
|
|
Quoted-Printable transfer content encodings, breaking text
|
|
|
|
into lines with a maximum number of columns, SMTP
|
|
|
|
dot-stuffing, \texttt{gzip} compression, HTTP chunked
|
|
|
|
transfer coding, and the list goes on.
|
|
|
|
|
|
|
|
Many complex tasks require a combination of two or more such
|
|
|
|
transformations, and therefore a general mechanism for
|
|
|
|
promoting reuse is desirable. In the process of designing
|
2007-10-11 21:16:28 +00:00
|
|
|
\texttt{LuaSocket~2.0}, we repeatedly faced this problem.
|
|
|
|
The solution we reached proved to be very general and
|
|
|
|
convenient. It is based on the concepts of filters, sources,
|
|
|
|
sinks, and pumps, which we introduce below.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
\emph{Filters} are functions that can be repeatedly invoked
|
|
|
|
with chunks of input, successively returning processed
|
|
|
|
chunks of output. More importantly, the result of
|
|
|
|
concatenating all the output chunks must be the same as the
|
2007-05-31 22:27:40 +00:00
|
|
|
result of applying the filter to the concatenation of all
|
2007-05-31 21:23:42 +00:00
|
|
|
input chunks. In fancier language, filters \emph{commute}
|
2007-10-11 21:16:28 +00:00
|
|
|
with the concatenation operator. More importantly, filters
|
|
|
|
must handle input data correctly no matter how the stream
|
|
|
|
has been split into chunks.
|
|
|
|
|
|
|
|
A \emph{chain} is a function that transparently combines the
|
|
|
|
effect of one or more filters. The interface of a chain is
|
|
|
|
indistinguishable from the interface of its component
|
|
|
|
filters. This allows a chained filter to be used wherever
|
|
|
|
an atomic filter is accepted. In particular, chains can be
|
2007-05-31 22:27:40 +00:00
|
|
|
themselves chained to create arbitrarily complex operations.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
Filters can be seen as internal nodes in a network through
|
|
|
|
which data will flow, potentially being transformed many
|
2007-10-11 21:16:28 +00:00
|
|
|
times along the way. Chains connect these nodes together.
|
|
|
|
The initial and final nodes of the network are
|
|
|
|
\emph{sources} and \emph{sinks}, respectively. Less
|
|
|
|
abstractly, a source is a function that produces new data
|
|
|
|
every time it is invoked. Conversely, sinks are functions
|
|
|
|
that give a final destination to the data they receive.
|
|
|
|
Naturally, sources and sinks can also be chained with
|
|
|
|
filters to produce filtered sources and sinks.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
Finally, filters, chains, sources, and sinks are all passive
|
|
|
|
entities: they must be repeatedly invoked in order for
|
|
|
|
anything to happen. \emph{Pumps} provide the driving force
|
|
|
|
that pushes data through the network, from a source to a
|
2007-10-11 21:16:28 +00:00
|
|
|
sink, and indirectly through all intervening filters.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
2007-05-31 22:27:40 +00:00
|
|
|
In the following sections, we start with a simplified
|
|
|
|
interface, which we later refine. The evolution we present
|
|
|
|
is not contrived: it recreates the steps we followed
|
|
|
|
ourselves as we consolidated our understanding of these
|
|
|
|
concepts within our application domain.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
2007-05-31 22:27:40 +00:00
|
|
|
\subsection{A simple example}
|
2007-05-31 21:23:42 +00:00
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
The end-of-line normalization of text is a good
|
2007-05-31 21:23:42 +00:00
|
|
|
example to motivate our initial filter interface.
|
|
|
|
Assume we are given text in an unknown end-of-line
|
|
|
|
convention (including possibly mixed conventions) out of the
|
2007-10-11 21:16:28 +00:00
|
|
|
commonly found Unix (\LF), Mac OS (\CR), and
|
|
|
|
DOS (\CRLF) conventions. We would like to be able to
|
|
|
|
use the folowing code to normalize the end-of-line markers:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
2007-10-11 21:16:28 +00:00
|
|
|
local CRLF = "\013\010"
|
|
|
|
local input = source.chain(source.file(io.stdin), normalize(CRLF))
|
|
|
|
local output = sink.file(io.stdout)
|
|
|
|
pump.all(input, output)
|
2007-05-31 21:23:42 +00:00
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
This program should read data from the standard input stream
|
2007-10-11 21:16:28 +00:00
|
|
|
and normalize the end-of-line markers to the canonic
|
|
|
|
\CRLF\ marker, as defined by the MIME standard.
|
|
|
|
Finally, the normalized text should be sent to the standard output
|
2007-05-31 21:23:42 +00:00
|
|
|
stream. We use a \emph{file source} that produces data from
|
|
|
|
standard input, and chain it with a filter that normalizes
|
|
|
|
the data. The pump then repeatedly obtains data from the
|
|
|
|
source, and passes it to the \emph{file sink}, which sends
|
|
|
|
it to the standard output.
|
|
|
|
|
|
|
|
In the code above, the \texttt{normalize} \emph{factory} is a
|
2007-10-11 21:16:28 +00:00
|
|
|
function that creates our normalization filter, which
|
|
|
|
replaces any end-of-line marker with the canonic marker.
|
|
|
|
The initial filter interface is
|
2007-05-31 21:23:42 +00:00
|
|
|
trivial: a filter function receives a chunk of input data,
|
|
|
|
and returns a chunk of processed data. When there are no
|
|
|
|
more input data left, the caller notifies the filter by invoking
|
2007-10-11 21:16:28 +00:00
|
|
|
it with a \nil\ chunk. The filter responds by returning
|
|
|
|
the final chunk of processed data (which could of course be
|
|
|
|
the empty string).
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
Although the interface is extremely simple, the
|
2007-05-31 22:27:40 +00:00
|
|
|
implementation is not so obvious. A normalization filter
|
2007-05-31 21:23:42 +00:00
|
|
|
respecting this interface needs to keep some kind of context
|
2007-05-31 22:27:40 +00:00
|
|
|
between calls. This is because a chunk boundary may lie between
|
2007-10-11 21:16:28 +00:00
|
|
|
the \CR\ and \LF\ characters marking the end of a single line. This
|
2007-05-31 22:27:40 +00:00
|
|
|
need for contextual storage motivates the use of
|
|
|
|
factories: each time the factory is invoked, it returns a
|
2007-05-31 21:23:42 +00:00
|
|
|
filter with its own context so that we can have several
|
|
|
|
independent filters being used at the same time. For
|
|
|
|
efficiency reasons, we must avoid the obvious solution of
|
|
|
|
concatenating all the input into the context before
|
2007-10-11 21:16:28 +00:00
|
|
|
producing any output chunks.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
2007-05-31 22:27:40 +00:00
|
|
|
To that end, we break the implementation into two parts:
|
2007-05-31 21:23:42 +00:00
|
|
|
a low-level filter, and a factory of high-level filters. The
|
2007-05-31 22:27:40 +00:00
|
|
|
low-level filter is implemented in C and does not maintain
|
2007-05-31 21:23:42 +00:00
|
|
|
any context between function calls. The high-level filter
|
2007-05-31 22:27:40 +00:00
|
|
|
factory, implemented in Lua, creates and returns a
|
2007-05-31 21:23:42 +00:00
|
|
|
high-level filter that maintains whatever context the low-level
|
|
|
|
filter needs, but isolates the user from its internal
|
|
|
|
details. That way, we take advantage of C's efficiency to
|
|
|
|
perform the hard work, and take advantage of Lua's
|
|
|
|
simplicity for the bookkeeping.
|
|
|
|
|
|
|
|
\subsection{The Lua part of the filter}
|
|
|
|
|
|
|
|
Below is the complete implementation of the factory of high-level
|
|
|
|
end-of-line normalization filters:
|
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
2007-10-11 21:16:28 +00:00
|
|
|
function filter.cycle(lowlevel, context, extra)
|
2007-05-31 21:23:42 +00:00
|
|
|
return function(chunk)
|
|
|
|
local ret
|
2007-10-11 21:16:28 +00:00
|
|
|
ret, context = lowlevel(context, chunk, extra)
|
2007-05-31 21:23:42 +00:00
|
|
|
return ret
|
|
|
|
end
|
|
|
|
end
|
|
|
|
%
|
|
|
|
|
|
|
|
@stick#
|
|
|
|
function normalize(marker)
|
2007-10-11 21:16:28 +00:00
|
|
|
return filter.cycle(eol, 0, marker)
|
2007-05-31 21:23:42 +00:00
|
|
|
end
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
The \texttt{normalize} factory simply calls a more generic
|
2007-10-11 21:16:28 +00:00
|
|
|
factory, the \texttt{cycle}~factory, passing the low-level
|
|
|
|
filter~\texttt{eol}. The \texttt{cycle}~factory receives a
|
2007-05-31 21:23:42 +00:00
|
|
|
low-level filter, an initial context, and an extra
|
2007-05-31 22:27:40 +00:00
|
|
|
parameter, and returns a new high-level filter. Each time
|
|
|
|
the high-level filer is passed a new chunk, it invokes the
|
|
|
|
low-level filter with the previous context, the new chunk,
|
|
|
|
and the extra argument. It is the low-level filter that
|
|
|
|
does all the work, producing the chunk of processed data and
|
2007-10-11 21:16:28 +00:00
|
|
|
a new context. The high-level filter then replaces its
|
2007-05-31 22:27:40 +00:00
|
|
|
internal context, and returns the processed chunk of data to
|
|
|
|
the user. Notice that we take advantage of Lua's lexical
|
2007-05-31 21:23:42 +00:00
|
|
|
scoping to store the context in a closure between function
|
|
|
|
calls.
|
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
\subsection{The C part of the filter}
|
|
|
|
|
|
|
|
As for the low-level filter, we must first accept
|
2007-05-31 21:23:42 +00:00
|
|
|
that there is no perfect solution to the end-of-line marker
|
2007-05-31 22:27:40 +00:00
|
|
|
normalization problem. The difficulty comes from an
|
|
|
|
inherent ambiguity in the definition of empty lines within
|
2007-05-31 21:23:42 +00:00
|
|
|
mixed input. However, the following solution works well for
|
|
|
|
any consistent input, as well as for non-empty lines in
|
|
|
|
mixed input. It also does a reasonable job with empty lines
|
|
|
|
and serves as a good example of how to implement a low-level
|
|
|
|
filter.
|
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
The idea is to consider both \CR\ and~\LF\ as end-of-line
|
2007-05-31 21:23:42 +00:00
|
|
|
\emph{candidates}. We issue a single break if any candidate
|
2007-10-11 21:16:28 +00:00
|
|
|
is seen alone, or if it is followed by a different
|
|
|
|
candidate. In other words, \CR~\CR~and \LF~\LF\ each issue
|
|
|
|
two end-of-line markers, whereas \CR~\LF~and \LF~\CR\ issue
|
|
|
|
only one marker each. It is easy to see that this method
|
|
|
|
correctly handles the most common end-of-line conventions.
|
|
|
|
|
|
|
|
With this in mind, we divide the low-level filter into two
|
|
|
|
simple functions. The inner function~\texttt{pushchar} performs the
|
|
|
|
normalization itself. It takes each input character in turn,
|
|
|
|
deciding what to output and how to modify the context. The
|
|
|
|
context tells if the last processed character was an
|
|
|
|
end-of-line candidate, and if so, which candidate it was.
|
|
|
|
For efficiency, we use Lua's auxiliary library's buffer
|
|
|
|
interface:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{C}
|
|
|
|
@stick#
|
|
|
|
@#define candidate(c) (c == CR || c == LF)
|
2007-10-11 21:16:28 +00:00
|
|
|
static int pushchar(int c, int last, const char *marker,
|
2007-05-31 21:23:42 +00:00
|
|
|
luaL_Buffer *buffer) {
|
|
|
|
if (candidate(c)) {
|
|
|
|
if (candidate(last)) {
|
2007-10-11 21:16:28 +00:00
|
|
|
if (c == last)
|
|
|
|
luaL_addstring(buffer, marker);
|
2007-05-31 21:23:42 +00:00
|
|
|
return 0;
|
|
|
|
} else {
|
|
|
|
luaL_addstring(buffer, marker);
|
|
|
|
return c;
|
|
|
|
}
|
|
|
|
} else {
|
2007-10-11 21:16:28 +00:00
|
|
|
luaL_pushchar(buffer, c);
|
2007-05-31 21:23:42 +00:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
%
|
|
|
|
\end{C}
|
|
|
|
\end{quote}
|
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
The outer function~\texttt{eol} simply interfaces with Lua.
|
|
|
|
It receives the context and input chunk (as well as an
|
|
|
|
optional custom end-of-line marker), and returns the
|
|
|
|
transformed output chunk and the new context.
|
|
|
|
Notice that if the input chunk is \nil, the operation
|
|
|
|
is considered to be finished. In that case, the loop will
|
|
|
|
not execute a single time and the context is reset to the
|
|
|
|
initial state. This allows the filter to be reused many
|
|
|
|
times:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{C}
|
|
|
|
@stick#
|
|
|
|
static int eol(lua_State *L) {
|
2007-10-11 21:16:28 +00:00
|
|
|
int context = luaL_checkint(L, 1);
|
2007-05-31 21:23:42 +00:00
|
|
|
size_t isize = 0;
|
|
|
|
const char *input = luaL_optlstring(L, 2, NULL, &isize);
|
|
|
|
const char *last = input + isize;
|
|
|
|
const char *marker = luaL_optstring(L, 3, CRLF);
|
|
|
|
luaL_Buffer buffer;
|
|
|
|
luaL_buffinit(L, &buffer);
|
|
|
|
if (!input) {
|
|
|
|
lua_pushnil(L);
|
|
|
|
lua_pushnumber(L, 0);
|
|
|
|
return 2;
|
|
|
|
}
|
|
|
|
while (input < last)
|
2007-10-11 21:16:28 +00:00
|
|
|
context = pushchar(*input++, context, marker, &buffer);
|
2007-05-31 21:23:42 +00:00
|
|
|
luaL_pushresult(&buffer);
|
2007-10-11 21:16:28 +00:00
|
|
|
lua_pushnumber(L, context);
|
2007-05-31 21:23:42 +00:00
|
|
|
return 2;
|
|
|
|
}
|
|
|
|
%
|
|
|
|
\end{C}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
When designing your own filters, the challenging part is to
|
2007-05-31 22:27:40 +00:00
|
|
|
decide what will be in the context. For line breaking, for
|
2007-10-11 21:16:28 +00:00
|
|
|
instance, it could be the number of bytes that still fit in the
|
2007-05-31 21:23:42 +00:00
|
|
|
current line. For Base64 encoding, it could be a string
|
|
|
|
with the bytes that remain after the division of the input
|
2007-05-31 22:27:40 +00:00
|
|
|
into 3-byte atoms. The MIME module in the \texttt{LuaSocket}
|
2007-05-31 21:23:42 +00:00
|
|
|
distribution has many other examples.
|
|
|
|
|
|
|
|
\section{Filter chains}
|
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
Chains greatly increase the power of filters. For example,
|
2007-05-31 22:27:40 +00:00
|
|
|
according to the standard for Quoted-Printable encoding,
|
2007-10-11 21:16:28 +00:00
|
|
|
text should be normalized to a canonic end-of-line marker
|
|
|
|
prior to encoding. After encoding, the resulting text must
|
|
|
|
be broken into lines of no more than 76 characters, with the
|
|
|
|
use of soft line breaks (a line terminated by the \texttt{=}
|
|
|
|
sign). To help specifying complex transformations like
|
|
|
|
this, we define a chain factory that creates a composite
|
|
|
|
filter from one or more filters. A chained filter passes
|
|
|
|
data through all its components, and can be used wherever a
|
|
|
|
primitive filter is accepted.
|
2007-05-31 22:27:40 +00:00
|
|
|
|
|
|
|
The chaining factory is very simple. The auxiliary
|
|
|
|
function~\texttt{chainpair} chains two filters together,
|
|
|
|
taking special care if the chunk is the last. This is
|
2007-10-11 21:16:28 +00:00
|
|
|
because the final \nil\ chunk notification has to be
|
2007-05-31 22:27:40 +00:00
|
|
|
pushed through both filters in turn:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
|
|
|
local function chainpair(f1, f2)
|
|
|
|
return function(chunk)
|
|
|
|
local ret = f2(f1(chunk))
|
|
|
|
if chunk then return ret
|
|
|
|
else return ret .. f2() end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
%
|
|
|
|
|
|
|
|
@stick#
|
|
|
|
function filter.chain(...)
|
2007-10-11 21:16:28 +00:00
|
|
|
local f = select(1, ...)
|
|
|
|
for i = 2, select('@#', ...) do
|
|
|
|
f = chainpair(f, select(i, ...))
|
2007-05-31 21:23:42 +00:00
|
|
|
end
|
|
|
|
return f
|
|
|
|
end
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
Thanks to the chain factory, we can
|
2007-05-31 22:27:40 +00:00
|
|
|
define the Quoted-Printable conversion as such:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
2007-10-11 21:16:28 +00:00
|
|
|
local qp = filter.chain(normalize(CRLF), encode("quoted-printable"),
|
|
|
|
wrap("quoted-printable"))
|
|
|
|
local input = source.chain(source.file(io.stdin), qp)
|
|
|
|
local output = sink.file(io.stdout)
|
|
|
|
pump.all(input, output)
|
2007-05-31 21:23:42 +00:00
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
\section{Sources, sinks, and pumps}
|
|
|
|
|
|
|
|
The filters we introduced so far act as the internal nodes
|
|
|
|
in a network of transformations. Information flows from node
|
|
|
|
to node (or rather from one filter to the next) and is
|
2007-05-31 22:27:40 +00:00
|
|
|
transformed along the way. Chaining filters together is our
|
2007-05-31 21:23:42 +00:00
|
|
|
way to connect nodes in this network. As the starting point
|
|
|
|
for the network, we need a source node that produces the
|
|
|
|
data. In the end of the network, we need a sink node that
|
|
|
|
gives a final destination to the data.
|
|
|
|
|
|
|
|
\subsection{Sources}
|
|
|
|
|
|
|
|
A source returns the next chunk of data each time it is
|
2007-10-11 21:16:28 +00:00
|
|
|
invoked. When there is no more data, it simply returns~\nil.
|
|
|
|
In the event of an error, the source can inform the
|
|
|
|
caller by returning \nil\ followed by the error message.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
Below are two simple source factories. The \texttt{empty} source
|
|
|
|
returns no data, possibly returning an associated error
|
2007-10-11 21:16:28 +00:00
|
|
|
message. The \texttt{file} source yields the contents of a file
|
|
|
|
in a chunk by chunk fashion:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
|
|
|
function source.empty(err)
|
|
|
|
return function()
|
|
|
|
return nil, err
|
|
|
|
end
|
|
|
|
end
|
|
|
|
%
|
|
|
|
|
|
|
|
@stick#
|
|
|
|
function source.file(handle, io_err)
|
|
|
|
if handle then
|
|
|
|
return function()
|
|
|
|
local chunk = handle:read(2048)
|
|
|
|
if not chunk then handle:close() end
|
|
|
|
return chunk
|
|
|
|
end
|
|
|
|
else return source.empty(io_err or "unable to open file") end
|
|
|
|
end
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
\subsection{Filtered sources}
|
|
|
|
|
2007-05-31 22:27:40 +00:00
|
|
|
A filtered source passes its data through the
|
2007-05-31 21:23:42 +00:00
|
|
|
associated filter before returning it to the caller.
|
2007-05-31 22:27:40 +00:00
|
|
|
Filtered sources are useful when working with
|
|
|
|
functions that get their input data from a source (such as
|
2007-10-11 21:16:28 +00:00
|
|
|
the pumps in our examples). By chaining a source with one or
|
2007-05-31 22:27:40 +00:00
|
|
|
more filters, the function can be transparently provided
|
|
|
|
with filtered data, with no need to change its interface.
|
2007-05-31 21:23:42 +00:00
|
|
|
Here is a factory that does the job:
|
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
|
|
|
function source.chain(src, f)
|
2007-10-11 21:16:28 +00:00
|
|
|
return function()
|
|
|
|
if not src then
|
|
|
|
return nil
|
|
|
|
end
|
2007-05-31 21:23:42 +00:00
|
|
|
local chunk, err = src()
|
|
|
|
if not chunk then
|
|
|
|
src = nil
|
|
|
|
return f(nil)
|
2007-10-11 21:16:28 +00:00
|
|
|
else
|
|
|
|
return f(chunk)
|
|
|
|
end
|
|
|
|
end
|
2007-05-31 21:23:42 +00:00
|
|
|
end
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
\subsection{Sinks}
|
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
Just as we defined an interface for source of data,
|
2007-05-31 22:27:40 +00:00
|
|
|
we can also define an interface for a data destination.
|
|
|
|
We call any function respecting this
|
2007-05-31 21:23:42 +00:00
|
|
|
interface a \emph{sink}. In our first example, we used a
|
|
|
|
file sink connected to the standard output.
|
|
|
|
|
|
|
|
Sinks receive consecutive chunks of data, until the end of
|
2007-10-11 21:16:28 +00:00
|
|
|
data is signaled by a \nil\ input chunk. A sink can be
|
2007-05-31 21:23:42 +00:00
|
|
|
notified of an error with an optional extra argument that
|
2007-10-11 21:16:28 +00:00
|
|
|
contains the error message, following a \nil\ chunk.
|
2007-05-31 21:23:42 +00:00
|
|
|
If a sink detects an error itself, and
|
2007-10-11 21:16:28 +00:00
|
|
|
wishes not to be called again, it can return \nil,
|
2007-05-31 21:23:42 +00:00
|
|
|
followed by an error message. A return value that
|
2007-10-11 21:16:28 +00:00
|
|
|
is not \nil\ means the sink will accept more data.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
Below are two useful sink factories.
|
|
|
|
The table factory creates a sink that stores
|
|
|
|
individual chunks into an array. The data can later be
|
|
|
|
efficiently concatenated into a single string with Lua's
|
|
|
|
\texttt{table.concat} library function. The \texttt{null} sink
|
|
|
|
simply discards the chunks it receives:
|
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
|
|
|
function sink.table(t)
|
|
|
|
t = t or {}
|
|
|
|
local f = function(chunk, err)
|
|
|
|
if chunk then table.insert(t, chunk) end
|
|
|
|
return 1
|
|
|
|
end
|
|
|
|
return f, t
|
|
|
|
end
|
|
|
|
%
|
|
|
|
|
|
|
|
@stick#
|
|
|
|
local function null()
|
|
|
|
return 1
|
|
|
|
end
|
|
|
|
|
|
|
|
function sink.null()
|
|
|
|
return null
|
|
|
|
end
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
Naturally, filtered sinks are just as useful as filtered
|
|
|
|
sources. A filtered sink passes each chunk it receives
|
2007-10-11 21:16:28 +00:00
|
|
|
through the associated filter before handing it down to the
|
2007-05-31 21:23:42 +00:00
|
|
|
original sink. In the following example, we use a source
|
|
|
|
that reads from the standard input. The input chunks are
|
|
|
|
sent to a table sink, which has been coupled with a
|
|
|
|
normalization filter. The filtered chunks are then
|
|
|
|
concatenated from the output array, and finally sent to
|
|
|
|
standard out:
|
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
2007-10-11 21:16:28 +00:00
|
|
|
local input = source.file(io.stdin)
|
|
|
|
local output, t = sink.table()
|
|
|
|
output = sink.chain(normalize(CRLF), output)
|
|
|
|
pump.all(input, output)
|
2007-05-31 21:23:42 +00:00
|
|
|
io.write(table.concat(t))
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
\subsection{Pumps}
|
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
Although not on purpose, our interface for sources is
|
|
|
|
compatible with Lua iterators. That is, a source can be
|
|
|
|
neatly used in conjunction with \texttt{for} loops. Using
|
|
|
|
our file source as an iterator, we can write the following
|
|
|
|
code:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
|
|
|
for chunk in source.file(io.stdin) do
|
|
|
|
io.write(chunk)
|
|
|
|
end
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
Loops like this will always be present because everything
|
|
|
|
we designed so far is passive. Sources, sinks, filters: none
|
|
|
|
of them can do anything on their own. The operation of
|
|
|
|
pumping all data a source can provide into a sink is so
|
|
|
|
common that it deserves its own function:
|
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
|
|
|
function pump.step(src, snk)
|
|
|
|
local chunk, src_err = src()
|
|
|
|
local ret, snk_err = snk(chunk, src_err)
|
2007-05-31 22:27:40 +00:00
|
|
|
if chunk and ret then return 1
|
|
|
|
else return nil, src_err or snk_err end
|
2007-05-31 21:23:42 +00:00
|
|
|
end
|
|
|
|
%
|
|
|
|
|
|
|
|
@stick#
|
|
|
|
function pump.all(src, snk, step)
|
2007-05-31 22:27:40 +00:00
|
|
|
step = step or pump.step
|
|
|
|
while true do
|
|
|
|
local ret, err = step(src, snk)
|
|
|
|
if not ret then
|
|
|
|
if err then return nil, err
|
|
|
|
else return 1 end
|
|
|
|
end
|
|
|
|
end
|
2007-05-31 21:23:42 +00:00
|
|
|
end
|
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
The \texttt{pump.step} function moves one chunk of data from
|
|
|
|
the source to the sink. The \texttt{pump.all} function takes
|
|
|
|
an optional \texttt{step} function and uses it to pump all the
|
2007-10-11 21:16:28 +00:00
|
|
|
data from the source to the sink.
|
|
|
|
Here is an example that uses the Base64 and the
|
|
|
|
line wrapping filters from the \texttt{LuaSocket}
|
|
|
|
distribution. The program reads a binary file from
|
2007-05-31 21:23:42 +00:00
|
|
|
disk and stores it in another file, after encoding it to the
|
|
|
|
Base64 transfer content encoding:
|
|
|
|
\begin{quote}
|
|
|
|
\begin{lua}
|
|
|
|
@stick#
|
2007-10-11 21:16:28 +00:00
|
|
|
local input = source.chain(
|
2007-05-31 21:23:42 +00:00
|
|
|
source.file(io.open("input.bin", "rb")),
|
|
|
|
encode("base64"))
|
2007-10-11 21:16:28 +00:00
|
|
|
local output = sink.chain(
|
2007-05-31 21:23:42 +00:00
|
|
|
wrap(76),
|
|
|
|
sink.file(io.open("output.b64", "w")))
|
2007-10-11 21:16:28 +00:00
|
|
|
pump.all(input, output)
|
2007-05-31 21:23:42 +00:00
|
|
|
%
|
|
|
|
\end{lua}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
The way we split the filters here is not intuitive, on
|
|
|
|
purpose. Alternatively, we could have chained the Base64
|
|
|
|
encode filter and the line-wrap filter together, and then
|
|
|
|
chain the resulting filter with either the file source or
|
2007-10-11 21:16:28 +00:00
|
|
|
the file sink. It doesn't really matter.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
\section{Exploding filters}
|
|
|
|
|
2007-10-11 21:16:28 +00:00
|
|
|
Our current filter interface has one serious shortcoming.
|
|
|
|
Consider for example a \texttt{gzip} decompression filter.
|
|
|
|
During decompression, a small input chunk can be exploded
|
|
|
|
into a huge amount of data. To address this problem, we
|
|
|
|
decided to change the filter interface and allow exploding
|
|
|
|
filters to return large quantities of output data in a chunk
|
|
|
|
by chunk manner.
|
2007-05-31 22:27:40 +00:00
|
|
|
|
|
|
|
More specifically, after passing each chunk of input to
|
|
|
|
a filter, and collecting the first chunk of output, the
|
|
|
|
user must now loop to receive other chunks from the filter until no
|
2007-05-31 21:23:42 +00:00
|
|
|
filtered data is left. Within these secondary calls, the
|
|
|
|
caller passes an empty string to the filter. The filter
|
|
|
|
responds with an empty string when it is ready for the next
|
|
|
|
input chunk. In the end, after the user passes a
|
2007-10-11 21:16:28 +00:00
|
|
|
\nil\ chunk notifying the filter that there is no
|
2007-05-31 21:23:42 +00:00
|
|
|
more input data, the filter might still have to produce too
|
|
|
|
much output data to return in a single chunk. The user has
|
2007-10-11 21:16:28 +00:00
|
|
|
to loop again, now passing \nil\ to the filter each time,
|
|
|
|
until the filter itself returns \nil\ to notify the
|
2007-05-31 21:23:42 +00:00
|
|
|
user it is finally done.
|
|
|
|
|
|
|
|
Fortunately, it is very easy to modify a filter to respect
|
|
|
|
the new interface. In fact, the end-of-line translation
|
|
|
|
filter we presented earlier already conforms to it. The
|
|
|
|
complexity is encapsulated within the chaining functions,
|
|
|
|
which must now include a loop. Since these functions only
|
2007-05-31 22:27:40 +00:00
|
|
|
have to be written once, the user is rarely affected.
|
2007-05-31 21:23:42 +00:00
|
|
|
Interestingly, the modifications do not have a measurable
|
2007-05-31 22:27:40 +00:00
|
|
|
negative impact in the performance of filters that do
|
2007-05-31 21:23:42 +00:00
|
|
|
not need the added flexibility. On the other hand, for a
|
|
|
|
small price in complexity, the changes make exploding
|
2007-10-11 21:16:28 +00:00
|
|
|
filters practical.
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
\section{A complex example}
|
|
|
|
|
|
|
|
The LTN12 module in the \texttt{LuaSocket} distribution
|
2007-10-11 21:16:28 +00:00
|
|
|
implements all the ideas we have described. The MIME
|
|
|
|
and SMTP modules are tightly integrated with LTN12,
|
2007-05-31 21:23:42 +00:00
|
|
|
and can be used to showcase the expressive power of filters,
|
|
|
|
sources, sinks, and pumps. Below is an example
|
|
|
|
of how a user would proceed to define and send a
|
2007-05-31 22:27:40 +00:00
|
|
|
multipart message, with attachments, using \texttt{LuaSocket}:
|
2007-05-31 21:23:42 +00:00
|
|
|
\begin{quote}
|
|
|
|
\begin{mime}
|
|
|
|
local smtp = require"socket.smtp"
|
|
|
|
local mime = require"mime"
|
|
|
|
local ltn12 = require"ltn12"
|
|
|
|
|
|
|
|
local message = smtp.message{
|
|
|
|
headers = {
|
|
|
|
from = "Sicrano <sicrano@example.com>",
|
|
|
|
to = "Fulano <fulano@example.com>",
|
|
|
|
subject = "A message with an attachment"},
|
|
|
|
body = {
|
2007-10-11 21:16:28 +00:00
|
|
|
preamble = "Hope you can see the attachment" .. CRLF,
|
2007-05-31 21:23:42 +00:00
|
|
|
[1] = {
|
2007-10-11 21:16:28 +00:00
|
|
|
body = "Here is our logo" .. CRLF},
|
2007-05-31 21:23:42 +00:00
|
|
|
[2] = {
|
|
|
|
headers = {
|
|
|
|
["content-type"] = 'image/png; name="luasocket.png"',
|
|
|
|
["content-disposition"] =
|
|
|
|
'attachment; filename="luasocket.png"',
|
|
|
|
["content-description"] = 'LuaSocket logo',
|
|
|
|
["content-transfer-encoding"] = "BASE64"},
|
|
|
|
body = ltn12.source.chain(
|
|
|
|
ltn12.source.file(io.open("luasocket.png", "rb")),
|
|
|
|
ltn12.filter.chain(
|
|
|
|
mime.encode("base64"),
|
|
|
|
mime.wrap()))}}}
|
|
|
|
|
|
|
|
assert(smtp.send{
|
|
|
|
rcpt = "<fulano@example.com>",
|
|
|
|
from = "<sicrano@example.com>",
|
|
|
|
source = message})
|
|
|
|
\end{mime}
|
|
|
|
\end{quote}
|
|
|
|
|
|
|
|
The \texttt{smtp.message} function receives a table
|
|
|
|
describing the message, and returns a source. The
|
|
|
|
\texttt{smtp.send} function takes this source, chains it with the
|
2007-05-31 22:27:40 +00:00
|
|
|
SMTP dot-stuffing filter, connects a socket sink
|
|
|
|
with the server, and simply pumps the data. The message is never
|
2007-05-31 21:23:42 +00:00
|
|
|
assembled in memory. Everything is produced on demand,
|
|
|
|
transformed in small pieces, and sent to the server in chunks,
|
|
|
|
including the file attachment that is loaded from disk and
|
|
|
|
encoded on the fly. It just works.
|
|
|
|
|
|
|
|
\section{Conclusions}
|
|
|
|
|
2007-05-31 22:27:40 +00:00
|
|
|
In this article, we introduced the concepts of filters,
|
2007-05-31 21:23:42 +00:00
|
|
|
sources, sinks, and pumps to the Lua language. These are
|
2007-05-31 22:27:40 +00:00
|
|
|
useful tools for stream processing in general. Sources provide
|
2007-05-31 21:23:42 +00:00
|
|
|
a simple abstraction for data acquisition. Sinks provide an
|
|
|
|
abstraction for final data destinations. Filters define an
|
|
|
|
interface for data transformations. The chaining of
|
|
|
|
filters, sources and sinks provides an elegant way to create
|
2007-05-31 22:27:40 +00:00
|
|
|
arbitrarily complex data transformations from simpler
|
2007-10-11 21:16:28 +00:00
|
|
|
components. Pumps simply push the data through.
|
|
|
|
|
|
|
|
\section{Acknowledgements}
|
|
|
|
|
|
|
|
The concepts described in this text are the result of long
|
|
|
|
discussions with David Burgess. A version of this text has
|
|
|
|
been released on-line as the Lua Technical Note 012, hence
|
|
|
|
the name of the corresponding LuaSocket module,
|
|
|
|
\texttt{ltn12}. Wim Couwenberg contributed to the
|
|
|
|
implementation of the module, and Adrian Sietsma was the
|
|
|
|
first to notice the correspondence between sources and Lua
|
|
|
|
iterators.
|
|
|
|
|
2007-05-31 21:23:42 +00:00
|
|
|
|
|
|
|
\end{document}
|