Added some code.
authorStanislaw Klekot <dozzie@jarowit.net>
Sat, 18 Apr 2015 23:43:24 +0000 (01:43 +0200)
committerStanislaw Klekot <dozzie@jarowit.net>
Sat, 18 Apr 2015 23:43:24 +0000 (01:43 +0200)
examples/example_html.erl [new file with mode: 0644]
examples/example_inets.erl [new file with mode: 0644]
examples/example_nginx.erl [new file with mode: 0644]
src/gen_sse_server.erl [new file with mode: 0644]
src/mod_sse.app.src [new file with mode: 0644]
src/mod_sse.erl [new file with mode: 0644]
src/mod_sse_app.erl [new file with mode: 0644]
src/mod_sse_sup.erl [new file with mode: 0644]
src/mod_sse_worker.erl [new file with mode: 0644]
src/overview.edoc [new file with mode: 0644]

diff --git a/examples/example_html.erl b/examples/example_html.erl
new file mode 100644 (file)
index 0000000..8fe8582
--- /dev/null
@@ -0,0 +1,8 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   Example HTML to check Server-Sent Events in the browser.
+%%%
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(example_html).
diff --git a/examples/example_inets.erl b/examples/example_inets.erl
new file mode 100644 (file)
index 0000000..3a348eb
--- /dev/null
@@ -0,0 +1,81 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   Example config and startup code for inets.
+%%%
+%%%   == starting inets ==
+%%%
+%%%   ```
+%%%   start_httpd() ->
+%%%     % remember to start `inets' application
+%%%     {ok, Config} = load_config(),
+%%%     {ok, HttpdPid} = inets:start(httpd, Config),
+%%%     {ok, HttpdPid}.
+%%%
+%%%   load_config() ->
+%%%     SSEHandler = sse_handler,
+%%%     DocumentRoot = "/var/www",
+%%%     ServerRoot = "/var/lib/www",
+%%%     Config = [
+%%%       % typical setup for inets/httpd
+%%%       {port, 1080}, {server_name, "localhost"},
+%%%       {document_root, DocumentRoot}, {server_root, ServerRoot},
+%%%       {directory_index, ["index.html"]}, % requires mod_alias
+%%%
+%%%       % httpd needs to load `mod_sse'
+%%%       {modules, [mod_sse, mod_alias, mod_dir, mod_get, mod_log]},
+%%%
+%%%       % mod_sse's config
+%%%       {sse, "/events", SSEHandler},
+%%%
+%%%       % logging
+%%%       {transfer_log, "/var/log/httpd/access.log"},
+%%%       {error_log,    "/var/log/httpd/error.log"},
+%%%       {log_format, combined},
+%%%       {error_log_format, pretty}
+%%%     ],
+%%%     {ok, Config}.
+%%%   '''
+%%%
+%%%   == example handler module ==
+%%%
+%%%   Note: there's no `start_link()' function. You don't need it, as the
+%%%   process will be spawned by {@link mod_sse} as necessary.
+%%%
+%%%   ```
+%%%   -module(sse_handler).
+%%%
+%%%   -behaviour(gen_sse_server).
+%%%
+%%%   -export([init/4, terminate/2]).
+%%%   -export([handle_call/3, handle_cast/2, handle_info/2]).
+%%%   -export([code_change/3]).
+%%%
+%%%   -record(state, {}).
+%%%
+%%%   init(_RootURI, _URI, _Headers, _Args) ->
+%%%     State = #state{},
+%%%     {ok, State}.
+%%%
+%%%   terminate(_Reason, _State) ->
+%%%     ok.
+%%%
+%%%   handle_call(_Request, _From, State) ->
+%%%     Events = nothing,
+%%%     {reply, {error,not_implemented}, Events, State}.
+%%%
+%%%   handle_cast(_Request, State) ->
+%%%     Events = nothing,
+%%%     {noreply, Events, State}.
+%%%
+%%%   handle_info(_Message, State) ->
+%%%     Events = nothing,
+%%%     {noreply, Events, State}.
+%%%
+%%%   code_change(OldVsn, State, Extra) ->
+%%%     {ok, State}.
+%%%   '''
+%%%
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(example_inets).
diff --git a/examples/example_nginx.erl b/examples/example_nginx.erl
new file mode 100644 (file)
index 0000000..e27c5ff
--- /dev/null
@@ -0,0 +1,8 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   Example config for nginx.
+%%%
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(example_nginx).
diff --git a/src/gen_sse_server.erl b/src/gen_sse_server.erl
new file mode 100644 (file)
index 0000000..e04c1af
--- /dev/null
@@ -0,0 +1,55 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   Behaviour for a callback module to use with {@link mod_sse}.
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(gen_sse_server).
+
+%%%---------------------------------------------------------------------------
+
+-type event() :: iolist().
+
+%%%---------------------------------------------------------------------------
+
+-callback init(RootURI :: string(), URI :: string(),
+               Headers :: [{term(), term()}], Args :: term()) ->
+    {ok, term()}
+  | {ok, term(), timeout()}
+  | {ok, term(), hibernate}
+  | {error, term()}.
+
+-callback terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
+                    State :: term()) ->
+  term().
+
+-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+                      State :: term()) ->
+    {reply, Reply :: term(), nothing | [event()], NewState :: term()}
+  | {reply, Reply :: term(), nothing | [event()], NewState :: term(), timeout()}
+  | {reply, Reply :: term(), nothing | [event()], NewState :: term(), hibernate}
+  | {noreply, nothing | [event()], NewState :: term()}
+  | {noreply, nothing | [event()], NewState :: term(), timeout()}
+  | {noreply, nothing | [event()], NewState :: term(), hibernate}
+  | {stop, Reason :: term(), Reply :: term(), nothing | [event()], NewState :: term()}
+  | {stop, Reason :: term(), nothing | [event()], NewState :: term()}.
+
+-callback handle_cast(Request :: term(), State :: term()) ->
+    {noreply, nothing | [event()], NewState :: term()}
+  | {noreply, nothing | [event()], NewState :: term(), timeout()}
+  | {noreply, nothing | [event()], NewState :: term(), hibernate}
+  | {stop, Reason :: term(), nothing | [event()], NewState :: term()}.
+
+-callback handle_info(Message :: term(), State :: term()) ->
+    {noreply, nothing | [event()], NewState :: term()}
+  | {noreply, nothing | [event()], NewState :: term(), timeout()}
+  | {noreply, nothing | [event()], NewState :: term(), hibernate}
+  | {stop, Reason :: term(), nothing | [event()], NewState :: term()}.
+
+-callback code_change(OldVsn :: term() | {down, term()}, State :: term(),
+                      Extra :: term()) ->
+    {ok, NewState :: term()}
+  | {error, term()}.
+
+%%%---------------------------------------------------------------------------
+%%% vim:ft=erlang:foldmethod=marker:nowrap
diff --git a/src/mod_sse.app.src b/src/mod_sse.app.src
new file mode 100644 (file)
index 0000000..ada1a96
--- /dev/null
@@ -0,0 +1,9 @@
+{application, mod_sse, [
+  {description, "Server sent events for inets/httpd"},
+  {vsn, "0.0.0"}, % remember about `overview.edoc'
+  {registered, []},
+  {applications, [kernel, stdlib]},
+  {mod, {mod_sse_app, []}}, % callback module
+  {env, []}
+]}.
+% vim:ft=erlang
diff --git a/src/mod_sse.erl b/src/mod_sse.erl
new file mode 100644 (file)
index 0000000..1653480
--- /dev/null
@@ -0,0 +1,117 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   `inets'/`httpd' request handler module.
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(mod_sse).
+
+%%% `httpd' module API
+-export([do/1]).
+
+%%% enable qlc parse transform
+-include_lib("stdlib/include/qlc.hrl").
+
+%%%---------------------------------------------------------------------------
+
+%%% httpd's data record
+-include_lib("inets/include/httpd.hrl").
+
+-define(CONTENT_TYPE, "text/event-stream").
+
+%%%---------------------------------------------------------------------------
+
+do(_ModData = #mod{config_db = ConfigTable, request_uri = URI,
+                   data = RequestData, socket = Socket}) ->
+  case find_prefix(ConfigTable, URI) of
+    nothing ->
+      % nothing found, pass the request over
+      {proceed, RequestData};
+    {ok, {RootURI, Handler}} ->
+      Code = 200, % HTTP OK
+      Headers = [{content_type, ?CONTENT_TYPE}],
+      Function = fun pass_received_events/5,
+      ReqHeaders = [],
+      Args = [Socket, Handler, RootURI, URI, ReqHeaders],
+      Body = {Function, Args},
+      {break, [{response, {response, [{code, Code} | Headers], Body}}]}
+  end.
+
+%%%---------------------------------------------------------------------------
+
+-spec pass_received_events(gen_tcp:socket(), module(),
+                           string(), string(), [{term(), term()}]) ->
+  sent.
+
+pass_received_events(Socket, Handler, RootURI, URI, ReqHeaders) ->
+  % TODO: SSL sockets
+  case Handler of
+    Mod when is_atom(Mod) -> ModArgs = [];
+    {Mod, ModArgs} when is_atom(Mod), is_list(ModArgs) -> ok
+  end,
+  {ok, RecPid} = mod_sse_sup:start_worker(Mod, ModArgs, self(),
+                                          RootURI, URI, ReqHeaders),
+  Ref = erlang:monitor(process, RecPid),
+  inet:setopts(Socket, [{active, true}]),
+  receive_and_pass(Socket, {Ref, RecPid}),
+  % tell the httpd that all the response was already sent
+  sent.
+
+receive_and_pass(Socket, {Ref, Pid} = Receiver) ->
+  receive
+    {event, Data} ->
+      Lines = binary:split(iolist_to_binary(Data), <<"\n">>),
+      gen_tcp:send(Socket, [[["data: ", L, "\r\n"] || L <- Lines], "\r\n"]),
+      receive_and_pass(Socket, Receiver);
+    %{event, Id, Data}
+    %{event, Id, EventType, Data}
+    {'DOWN', Ref, process, Pid, _Info} ->
+      ok;
+    {tcp_closed, Socket} ->
+      erlang:demonitor(Ref, [flush]),
+      Pid ! {tcp_closed, self()},
+      ok;
+    {tcp_error, Socket, _Reason} ->
+      erlang:demonitor(Ref, [flush]),
+      Pid ! {tcp_closed, self()},
+      ok;
+    _Any ->
+      % ignore
+      % FIXME: how about system messages?
+      receive_and_pass(Socket, Receiver)
+  end.
+
+%%%---------------------------------------------------------------------------
+
+-spec find_prefix(ets:tab(), string()) ->
+  {ok, {string(), module()}}.
+
+find_prefix(ConfigTable, URI) ->
+  Q = qlc:q([
+    {Root, Handler} ||
+    {sse, Root, Handler} <- ets:table(ConfigTable),
+    is_uri_prefix_of(Root, URI)
+  ]),
+  % thanks to the sorting (DESC), nested prefixes should work consistently
+  case qlc:e(qlc:keysort(1, Q, {order, descending})) of
+    [] -> nothing;
+    [{Root, Handler} | _] -> {ok, {Root, Handler}}
+  end.
+
+%%%---------------------------------------------------------------------------
+
+is_uri_prefix_of("" = _Pfx, "" = _String) ->
+  true;
+is_uri_prefix_of("" = _Pfx, "/" ++ _String) ->
+  true;
+is_uri_prefix_of("" = _Pfx, "?" ++ _String) ->
+  true;
+is_uri_prefix_of("/" = _Pfx, "/" ++ _String) ->
+  true;
+is_uri_prefix_of([C | Pfx], [C | String]) ->
+  is_uri_prefix_of(Pfx, String);
+is_uri_prefix_of(_Pfx, _String) ->
+  false.
+
+%%%---------------------------------------------------------------------------
+%%% vim:ft=erlang:foldmethod=marker
diff --git a/src/mod_sse_app.erl b/src/mod_sse_app.erl
new file mode 100644 (file)
index 0000000..e32cca4
--- /dev/null
@@ -0,0 +1,22 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   `mod_sse' application entry point.
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(mod_sse_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+%%%---------------------------------------------------------------------------
+
+start(_StartType, _StartArgs) ->
+  mod_sse_sup:start_link().
+
+stop(_State) ->
+  ok.
+
+%%%---------------------------------------------------------------------------
+%%% vim:ft=erlang:foldmethod=marker
diff --git a/src/mod_sse_sup.erl b/src/mod_sse_sup.erl
new file mode 100644 (file)
index 0000000..69b042b
--- /dev/null
@@ -0,0 +1,41 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   Supervisor for workers that receive events and pass them to appropriate
+%%%   httpd process.
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(mod_sse_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([start_worker/6]).
+-export([init/1]).
+
+%%%---------------------------------------------------------------------------
+
+start_link() ->
+  supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+-spec start_worker(module(), [term()], pid(),
+                   string(), string(), [{term(), term()}]) ->
+  {ok, pid()} | {error, term()}.
+
+start_worker(HandlerModule, HandlerArgs, ClientPid, RootURI, URI, ReqHeaders) ->
+  Args = [HandlerModule, HandlerArgs, ClientPid, RootURI, URI, ReqHeaders],
+  supervisor:start_child(?MODULE, Args).
+
+%%%---------------------------------------------------------------------------
+
+init([] = _Args) ->
+  Strategy = {simple_one_for_one, 5, 10},
+  Children = [
+    {undefined,
+      {mod_sse_worker, start_link, []},
+      permanent, 5000, worker, [mod_sse_worker]}
+  ],
+  {ok, {Strategy, Children}}.
+
+%%%---------------------------------------------------------------------------
+%%% vim:ft=erlang:foldmethod=marker
diff --git a/src/mod_sse_worker.erl b/src/mod_sse_worker.erl
new file mode 100644 (file)
index 0000000..3c93c59
--- /dev/null
@@ -0,0 +1,212 @@
+%%%---------------------------------------------------------------------------
+%%% @doc
+%%%   Process that receives events and passes them to httpd connection
+%%%   handler.
+%%% @end
+%%%---------------------------------------------------------------------------
+
+-module(mod_sse_worker).
+
+-behaviour(gen_server).
+
+-export([start_link/6]).
+
+%%% gen_server callbacks
+-export([init/1, terminate/2]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([code_change/3]).
+
+%%%---------------------------------------------------------------------------
+
+-record(state, {
+  mod :: module(),
+  state :: term(),
+  httpd :: pid()
+}).
+
+%%%---------------------------------------------------------------------------
+
+%% @doc Start the receiver process.
+
+-spec start_link(module(), [term()], pid(),
+                 string(), string(), [{string(), string()}]) ->
+    {ok, pid()} | ignore | {error, term()}.
+
+start_link(Handler, HArgs, HTTPD, RootURI, URI, Headers) ->
+  Args = [Handler, HArgs, HTTPD, RootURI, URI, Headers],
+  gen_server:start_link(?MODULE, Args, []).
+
+%%%---------------------------------------------------------------------------
+%%% gen_server callbacks
+%%%---------------------------------------------------------------------------
+
+%%----------------------------------------------------------
+%% starting and termination {{{
+
+%% @private
+%% @doc Initialize {@link gen_server} state.
+
+init([Handler, HArgs, HTTPD, RootURI, URI, Headers] = _Args) ->
+  % TODO: monitor HTTPD
+  case Handler:init(RootURI, URI, Headers, HArgs) of
+    {ok, HState} ->
+      State = #state{mod = Handler, state = HState, httpd = HTTPD},
+      {ok, State};
+    {ok, HState, infinity = _Timeout} ->
+      State = #state{mod = Handler, state = HState, httpd = HTTPD},
+      {ok, State};
+    {ok, HState, Timeout} when is_integer(Timeout) ->
+      State = #state{mod = Handler, state = HState, httpd = HTTPD},
+      {ok, State, Timeout};
+    {ok, HState, hibernate} ->
+      State = #state{mod = Handler, state = HState, httpd = HTTPD},
+      {ok, State, hibernate};
+    {stop, Reason} ->
+      {stop, Reason}
+  end.
+
+%% @private
+%% @doc Clean up {@link gen_server} state.
+
+terminate(Reason, _State = #state{mod = Handler, state = HState}) ->
+  Handler:terminate(Reason, HState).
+
+%% }}}
+%%----------------------------------------------------------
+%% communication {{{
+
+%% @private
+%% @doc Handle {@link gen_server:call/2}.
+
+handle_call(Request, From, State = #state{mod = Handler, state = HState}) ->
+  case Handler:handle_call(Request, From, HState) of
+    {reply, Reply, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {reply, Reply, NewState};
+    {reply, Reply, Events, NewHState, infinity = _Timeout} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {reply, Reply, NewState, infinity};
+    {reply, Reply, Events, NewHState, Timeout} when is_integer(Timeout) ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {reply, Reply, NewState, Timeout};
+    {reply, Reply, Events, NewHState, hibernate} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {reply, Reply, NewState, hibernate};
+    {noreply, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState};
+    {noreply, Events, NewHState, infinity = _Timeout} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, infinity};
+    {noreply, Events, NewHState, Timeout} when is_integer(Timeout) ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, Timeout};
+    {noreply, Events, NewHState, hibernate} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, hibernate};
+    {stop, Reason, Reply, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {stop, Reason, Reply, NewState};
+    {stop, Reason, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {stop, Reason, NewState}
+  end.
+
+%% @private
+%% @doc Handle {@link gen_server:cast/2}.
+
+handle_cast(Request, State = #state{mod = Handler, state = HState}) ->
+  case Handler:handle_cast(Request, HState) of
+    {noreply, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState};
+    {noreply, Events, NewHState, infinity = _Timeout} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, infinity};
+    {noreply, Events, NewHState, Timeout} when is_integer(Timeout) ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, Timeout};
+    {noreply, Events, NewHState, hibernate} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, hibernate};
+    {stop, Reason, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {stop, Reason, NewState}
+  end.
+
+%% @private
+%% @doc Handle incoming messages.
+
+handle_info({tcp_closed, HTTPD} = _Message, State = #state{httpd = HTTPD}) ->
+  {stop, normal, State};
+
+handle_info(Message, State = #state{mod = Handler, state = HState}) ->
+  case Handler:handle_info(Message, HState) of
+    {noreply, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState};
+    {noreply, Events, NewHState, infinity = _Timeout} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, infinity};
+    {noreply, Events, NewHState, Timeout} when is_integer(Timeout) ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, Timeout};
+    {noreply, Events, NewHState, hibernate} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {noreply, NewState, hibernate};
+    {stop, Reason, Events, NewHState} ->
+      send_events_if_any(Events, State),
+      NewState = State#state{state = NewHState},
+      {stop, Reason, NewState}
+  end.
+
+%% }}}
+%%----------------------------------------------------------
+%% code change {{{
+
+%% @private
+%% @doc Handle code change.
+
+code_change(OldVsn, State = #state{mod = Handler, state = HState}, Extra) ->
+  case Handler:code_change(OldVsn, HState, Extra) of
+    {ok, NewHState} ->
+      NewState = State#state{state = NewHState},
+      {ok, NewState};
+    {error, Reason} ->
+      {error, Reason}
+  end.
+
+%% }}}
+%%----------------------------------------------------------
+
+-spec send_events_if_any([iolist()] | nothing, #state{}) ->
+  ok.
+
+send_events_if_any(nothing = _Events, _State) ->
+  ok;
+
+send_events_if_any(Events, _State = #state{httpd = HTTPD}) ->
+  [HTTPD ! {event, E} || E <- Events],
+  ok.
+
+%%%---------------------------------------------------------------------------
+%%% vim:ft=erlang:foldmethod=marker
diff --git a/src/overview.edoc b/src/overview.edoc
new file mode 100644 (file)
index 0000000..7336159
--- /dev/null
@@ -0,0 +1,24 @@
+@author Stanislaw Klekot <dozzie@jarowit.net>
+% remember about mod_sse.app.src
+@version 0.0.0
+@title Server-Sent Event server implementation for inets/httpd.
+@doc
+
+mod_sse is a module implementing server side of
+<a href="http://www.w3.org/TR/eventsource/">Server-Sent Events protocol</a>
+for inets httpd server.
+
+The module was written as a low-dependency proof of concept for something to
+be run behind nginx 1.2, later extended to be a little more generic
+(i.e. parametrized).
+
+== Architecture ==
+
+<b>TODO</b>
+
+== Example usage ==
+
+For usage examples see documentation of {@link examples_inets},
+{@link examples_nginx}, and {@link examples_html} modules.
+
+<!-- vim:set ft=edoc: -->