Posts in category ‘Comet’.
Server-Sent Events in CppCMS Trunk
A simple class has been added to the contrib section in trunk:
https://cppcms.svn.sourceforge.net/svnroot/cppcms/framework/trunk/contrib/server_side/sse/
Server-Sent Events support is provided in the form of a generic class, sse::event_source, that the user is expected to derive from.
There are also two simple event source objects with predefined policies:
- A queue bounded up to a certain capacity: sse::bounded_event_queue
- Real-time state updates that keep clients updated with the latest events rather than sending all the events from the beginning: sse::state_stream
Examples of a simple chat and a stock ticker are provided.
Notes:
- sse::event_source supports falling back to long polling using XHR: the client sends a special header notifying the server that the connection should be closed rather than kept open.
- Although the code resides within trunk, it is fully usable with the current stable CppCMS release, 1.0.2.
- This code does nothing exceptional; you don't really have to use it to create SSE. It just makes it easy to handle them without knowing too much about server-side event-driven programming.
Easy Comet: Server-Sent Events
HTML5 Comet Technologies
Today there are two major technologies for efficient implementation of Comet applications using HTML5: WebSockets (WS) and Server-Sent Events (SSE).
The first, WS, provides full-duplex communication, so both client and server can send each other events without creating a new connection. The second, SSE, provides real-time notification in a single direction, server to client.
It seems that WS is much more popular. Indeed, WS is much more powerful, and its support is scheduled for implementation in CppCMS. On the other hand, WS has a very significant limitation: the WS protocol is not HTTP. It may look like HTTP, and its handshake is done at the HTTP level, but afterwards the communication uses a different protocol that is not compatible with HTTP.
As a result, there is no way to implement WS over existing web server APIs such as FastCGI or SCGI. Moreover, even if the application uses HTTP directly, not every web server is able to proxy WebSocket communication.
So, although WS is a very promising technology, it is very hard to deploy it today in a real production environment.
SSE, by contrast, is implemented over plain HTTP without any changes. The protocol is very simple and is supported by most browsers (Firefox, Opera, Safari, Chrome), and it is expected to be supported by IE10.
No special changes are required to the underlying web server APIs, so FastCGI or SCGI work perfectly well. For example, SSE can be easily implemented using the stable version of CppCMS without any special tools.
Additionally, SSE supports stream synchronization in case of a disconnect, and a fallback to long polling using XHR can be easily implemented.
An Example
Let's implement a simple page that receives stock price updates.
First, we create an EventSource object and then attach a handler that updates the appropriate HTML field upon notification from the server side:
function read_data() {
    var stream = new EventSource('/ticker');
    stream.onmessage = function(e){
        document.getElementById('price').innerHTML=e.data;
    };
    stream.onerror = function(e){
        console.log(e);
    };
}
read_data();
The code is very simple; disconnects are handled automatically and transparently. Now let's take a look at the server side, which is a little more challenging:
Upon request we prepare our content type as required and fetch the ID of the last known price that was sent to the client.
void main(std::string /*url*/)
{
    response().set_content_header("text/event-stream");
    response().set_header("Cache-Control", "no-cache");
    auto last_id = atoi(request().cgetenv("HTTP_LAST_EVENT_ID"));
After that we detach the HTTP context object from the application, so that we are able to handle multiple connections simultaneously.
    auto context=release_context();
The idle connections are stored in a special waiters_ set. We add a callback that allows us to clean up clients that have disconnected:
    context->async_on_peer_reset([=](){
        this->waiters_.erase(context);
    });
Note that we use C++11 lambda expressions, which make the code much simpler and clearer.
Then we check the last price id known to the client: if it differs from the current one, we send the client an update asynchronously; otherwise we add the client to the waiting list:
    if(last_id != counter_) {
        async_send(context);
    }
    else
        waiters_.insert(context);
}
The code that sends the update to the client is quite simple: we send the last price id, a simple counter that serves for synchronization in case of a disconnect, followed by the actual data.
void async_send(booster::shared_ptr<cppcms::http::context> waiter)
{
    waiter->response().out() <<
        "id:" << counter_ <<"\n"
        "data:" << price_ << "\n"
        "\n";
Then we remember the id we have just sent and set up a completion callback. If the operation fails (the client has disconnected), we just exit, and the context is automatically destroyed.
    auto sent_id = counter_; // id that was just written to this client
    waiter->async_flush_output([=](cppcms::http::context::completion_type status){
        if(status!=0)
            return;
Otherwise we check whether there were any updates since the data was sent; if so, we send the latest price once again, otherwise we add the client to the waiting list.
        if(sent_id != this->counter_) {
            this->async_send(waiter);
        }
        else {
            this->waiters_.insert(waiter);
        }
    });
This completes our function.
}
Note that our lambda expression captures the waiter variable and keeps it alive until the handler is executed.
Now the last and simplest thing: updating the price. Upon a price update, we increase the identification counter and notify all objects in the waiting list.
void update_price(double new_one)
{
    counter_++;
    price_ = new_one;
    for(auto waiter : waiters_) {
        async_send(waiter);
    }
Afterwards we clear the list - now the callback object owns the context and would destroy it in case of an error.
    waiters_.clear();
}
The full code of the sample, including simple timer-based random price generation, can be found there.
Falling Back to Long Polling
Falling back is very simple. We use the same protocol, but when the event is ready we close the connection. To let the server distinguish between EventSource and XHR long polling, we add a special header such as X-Event-Source-Simulate: Long-Polling.
Then we change our async_send function a little by adding the following lines:
    if(waiter->request().getenv("HTTP_X_EVENT_SOURCE_SIMULATE")=="Long-Polling") {
        waiter->async_complete_response();
        return;
    }
So that our function looks like:
    waiter->response().out() <<
        "id:" << counter_ <<"\n"
        "data:" << price_ << "\n"
        "\n";
    if(waiter->request().getenv("HTTP_X_EVENT_SOURCE_SIMULATE")=="Long-Polling") {
        waiter->async_complete_response();
        return;
    }
    waiter->async_flush_output(...)
    ...
Of course, the XHR simulation has to send and manage the Last-Event-ID header and parse the response, but the server side looks almost identical.
Connecting to the web server
When configuring a web server, make sure that it does not buffer outgoing responses and sends them immediately to the client:
- Lighttpd does this by default for FastCGI, SCGI and HTTP protocols.
- Apache does this by default for SCGI and HTTP, but for FastCGI (mod_fastcgi) it requires the -flush option to work correctly.
Now once again Nginx shows us its problems:
- SCGI requires the option scgi_buffering off
- HTTP requires the option proxy_buffering off
However, FastCGI does not implement such an option! See this ticket.
So don't even try to use Nginx with FastCGI for Server-Sent Events.
Once again, do yourself a favor: use lighttpd.
Thanks
Special thanks to Barbu Paul - Gheorghe, who brought such a powerful and useful HTML5 feature to my attention.
First beta version of CppCMS 1.x.x is officially out!
Hello all CppCMS users.
The first beta version of CppCMS 1.x.x is available for download from SourceForge. The build instructions can be found here.
This version is very different from the CppCMS 0.0.x branch: it fixes many design flaws of the previous version and is an almost 90% rewrite of the original code according to the new design.
It also includes many important features. The most significant ones:
Full CppCMS core rewrite that introduced:
- Asynchronous programming support
- Removal of 3rd-party libraries from the core API.
- Stable API and ABI through all major releases.
- Improved Ajax support with introduction of JSON-RPC
- Powerful i18n and l10n
- Native Windows support including support of MSVC.
- And much more...
So now CppCMS beta is ready.
Hopefully the first release candidate will be ready within about 3 months, and the first stable release is expected at the end of 2010 or the beginning of 2011.
CppCMS 1.x.x updates
After big internal changes, I explain some critical architectural updates made in CppCMS 1.x.x.
In a few words:
- Removed the cppcms_boost library. Only several essential tools, not visible in the API, are still taken from cppcms_boost, which is compiled statically into the cppcms library.
- The Booster library was introduced. It is a library with Boost-like interfaces that does not depend on actual Boost. Some of the code is taken from the Boost library, some parts are wrappers of good C libraries, and some is totally new code I wrote with ABI compatibility in mind.
Following my previous post, feel free to update your working copies.
Rationale
CppCMS 1.x.x is to provide a backward-compatible API and ABI, and thus it can't rely on the Boost library in its public API. The only possible way to provide a Boost API to users is to wrap it.
CppCMS 1.x.x introduces an asynchronous event loop to web development, something that is critical for Comet programming. This loop was based on Boost.Asio. Unfortunately it had very big limitations, and writing a good wrapper for Boost.Asio was unfeasible.
So a small "Booster" library replaced the required functionality from Boost, partially reimplementing it, partially wrapping C libraries, and partially borrowing code from Boost itself.
Booster has the following components:
- Smart pointers: shared_ptr, intrusive_ptr from Boost, and small pointers for pimpl design: hold_ptr, copy_ptr and clone_ptr (my own).
- Function: similar to std::tr1::function and boost::function, a small class important for callback implementation (my own implementation).
- Regular expressions: a wrapper of the PCRE library with a Boost.Regex-like API. It provides only regex, match_result, regex_search and regex_match.
- System: error_code, error_category and system_error (my own).
- Thread: a Boost-like pthreads wrapper.
- Locale: the full Boost.Locale library (my own).
- And the most important one, the AIO library: a library inspired by Asio.
The AIO library is the central CppCMS event loop and has an Asio-like API: proactor design, callback interface, etc.
However, unlike Asio, it uses very few template classes and is prefork-friendly.
The Booster.Aio interface allows a Comet application to receive various asynchronous notifications from any source that can deliver such notifications over sockets.
CppCMS meets Comet
One of the major requirements for the framework refactoring was support of Comet. Now, with the introduction of asynchronous request handling and persistent application servers, it becomes reality.
Client Side
There is an HTML source of a simple chat client that uses the Dojo toolkit. It does the following:
Submits new messages to the server application by posting a form using XHR:
function send_data() {
    var kw = {
        url : "/chat/post",
        form : "theform"
    };
    dojo.xhrPost(kw);
    dojo.byId("message").value="";
    return false;
}
Receives new messages from the server using long polling via XHR:
var message_count = 0;
function read_data() {
    dojo.xhrGet( {
        url: "/chat/get/" + message_count,
        timeout: 120000,
        handleAs: "text",
        load: function(response, ioArgs) {
            dojo.byId("messages").innerHTML = response + '<br/>' + dojo.byId("messages").innerHTML;
            message_count++;
            read_data();
            return response;
        },
        error: function(response,ioArgs) {
            read_data();
            return response;
        }
    });
}
dojo.addOnLoad(read_data);
So, the client side is quite simple (although error handling should be much better).
Server Side
First we create our long-running asynchronous application, which receives two kinds of requests: "/post", with new data, and "/get/NN", to receive message number NN. We assign these calls to two member functions, post and get.
class chat : public cppcms::application {
public:
    chat(cppcms::service &srv) : cppcms::application(srv)
    {
        dispatcher().assign("^/post$",&chat::post,this);
        dispatcher().assign("^/get/(\\d+)$",&chat::get,this,1);
    }
Now, this class includes two data members:
private:
    std::vector<std::string> messages_;
    std::vector<cppcms::intrusive_ptr<cppcms::http::context> > waiters_;
These are the history of all chat messages, messages_, and all pending get requests that can't be satisfied because the message does not exist yet, waiters_.
Each "waiter" is actually a pointer to a request/response context that can be used for message transport.
Now, when a new message arrives, the post member function is called:
    void post()
    {
        if(request().request_method()=="POST") {
            if(request().post().find("message")!=request().post().end()) {
                messages_.push_back(request().post().find("message")->second);
                broadcast();
            }
        }
        release_context()->async_complete_response();
    }
If the message was found in the request, it is added to the messages_ list and all waiters are notified using the broadcast() member function.
At the end, the current request context is released and completed.
The broadcasting is done as follows:
    void broadcast()
    {
        for(unsigned i=0;i<waiters_.size();i++) {
            waiters_[i]->response().set_plain_text_header();
            waiters_[i]->response().out() << messages_.back();
            waiters_[i]->async_complete_response();
            waiters_[i]=0;
        }
        waiters_.clear();
    }
For each pending request the last message is written and the request is closed. After that, all pending requests are cleared.
When a get request arrives, it is handled by the get(std::string no) member function. First of all we check whether the requested message exists; if so, we just return it to the user.
        unsigned pos=atoi(no.c_str());
        if(pos < messages_.size()) {
            response().set_plain_text_header();
            response().out()<<messages_[pos];
            release_context()->async_complete_response();
        }
Otherwise, if the requested message is the next one, which does not exist yet, we add the request context to the pending list waiters_:
        else if(pos == messages_.size()) {
            waiters_.push_back(release_context());
        }
If the requested message is too far ahead (probably a client error), we just set the status to "404 Not Found" and return the response.
        else {
            response().status(404);
            release_context()->async_complete_response();
        }
Now, all we need to do is add the application to the main running loop under the script name "/chat" and start the service.
cppcms::service service(argc,argv);
cppcms::intrusive_ptr<chat> app=new chat(service);
service.applications_pool().mount(app,"/chat");
service.run();
Summary
So, the simple chat service was written with about 50 lines of C++ code and about the same amount of JavaScript code.
I must admit that it is too simplistic and not efficient. For example, if a new client connects it receives all messages one by one and not in bulk (this can be easily fixed), and I do not handle timeouts and disconnects. But the general idea is quite clear:
- An asynchronous long-running application that handles all requests is created.
- It manages all outstanding requests and uses them for server-side push.
This is actually a basis for future development of tools like XML-RPC and JSON-RPC that allow clients to call server-side objects asynchronously; it can also be used to implement any other Comet protocol.