Home  /  RSS  /  RSS Comments  /  RSS for Comet  /  Enter

Posts in category ‘Comet’.

Server-Sent Events in CppCMS Trunk

Tuesday, August 28, 2012, by artyom ; Posted in: Comet; one comment

There is a simple class added to the contrib section in trunk.

https://cppcms.svn.sourceforge.net/svnroot/cppcms/framework/trunk/contrib/server_side/sse/

The Server-Sent Events support is provide in form of a generic class sse::event_source that the user expected to derive from.

Also two simple event source objects with a predefined policy:

Examples of a simple chat and a stock ticker are provided.

Notes:

Easy Comet: Server-Sent Events

Sunday, August 12, 2012, by artyom ; Posted in: FastCGI, Comet; 4 comments

HTML5 Comet Technologies

Today there are two major technologies for efficient implementation of Comet applications using HTML5:

The first ones - WS - provide full-duplex communication such that both client and server can send each other events without creating a new connection. The second ones - SSE - provide real time notification in a single direction, server to client.

It seems that WS is much more popular. Indeed, WS are much more powerful and their support is scheduled for an implementation in CppCMS. On the other hand, WS have very significant limitation: WS protocol is not HTTP. It may look like HTTP and its hand-shake is done at HTTP level, but afterwards, the communication is done using different protocol that is not compatible with HTTP.

As a result, there is no way to implement WS over existing web server API as FastCGI or SCGI. More than that, even if the application uses HTTP directly, not every web server would be able to proxy web sockets communication.

So, despite that WS is very promising technology, it is very hard to deploy it today in the real production environment.

On the other hand, SSE are implemented over plain HTTP without any changes. The protocol is very simple and supported by most browsers, Firefox, Opera, Safari, Chrome and expected to be supported by IE10.

There is no special changes required to underlying web server APIs, thus FastCGI or SCGI would work perfectly well. For example, SSE can be easily implemented using stable version of CppCMS without any special tools.

Additionally SSE support stream synchronization in case of disconnect. The fall-back to long-polling using XHR can be easily implemented.

An Example

‎Lets implement a simple page that receives stock price updates.

First, we create an EventSource object and then we attach a handler that would update the appropriate html filed upon notification from the server side:

function read_data() {
    var stream = new EventSource('/ticker');
    stream.onmessage = function(e){
        document.getElementById('price').innerHTML=e.data;
    };

    stream.onerror = function(e){
        console.log(e);
    };
}

read_data();

The code is very simple and trivial, disconnects are handled automatically and transparently. Now lets take a look on the server side that is little bit more challenging:

Upon request we prepare our content type as required and fetch the ID of the last known price that was sent to the client.

void main(std::string /*url*/)
{
    response().set_content_header("text/event-stream");
    response().set_header("Cache-Control", "no-cache");

    auto last_id = atoi(request().cgetenv("HTTP_LAST_EVENT_ID"));

After that we detach the HTTP context object from the application, such that we will be able to handle multiple connections simultaneously.

    auto context=release_context();

The idle connections would be stored in a special waiters_ set. We add a special callback that allows us to cleanup the clients that had disconnected:

    context->async_on_peer_reset([=](){
        this->waiters_.erase(context);
    });

Note, we use C++11 lambda expressions that make the code much more simple and clear.

Then we check if the last price id that is known to the client and if it differs we send the client an update asynchronously, otherwise we add the client to the waiting list:

    if(last_id != counter_) {
        async_send(context);
    }
    else
        waiters_.insert(context);
} 

The code that sends the request to the client is quite simple, we send the last price id - simple counter that would use us for synchronization in case of disconnect and send an actual data.

void async_send(booster::shared_ptr<cppcms::http::context> waiter)
{
    waiter->response().out() <<
        "id:" <<  counter_ <<"\n"
        "data:" << price_ << "\n"
        "\n";

Then we setup a completion callback, if the operation fails (client had disconnected), we just exit, and the context would be automatically destroyed.

    waiter->async_flush_output([=,counter_](cppcms::http::context::completion_type status){
        if(status!=0)
            return;

Otherwise we check if there were an updates since the data was sent, and if yes we send the latest price once again, otherwise we add the client to the waiting list.

        if(counter_ != this->counter_) {
            this->async_send(waiter);
        }
        else {
            this->waiters_.insert(waiter);
        }
    });

This completes our function.

}

Note: that our lambda expression captures the waiter variable and keeps it alive till the handler is executed.

Now the last and the simplest thing - updating the price. Upon price update, we increase the identification counter and notify all objects in the waiting list.

void update_price(double new_one)
{
    counter_++;
    price_ = new_one;
    for(auto waiter : waiters_) {
        async_send(waiter);
    }

Afterwards we clear the list - now the callback object owns the context and would destroy it in case of an error.

    waiters_.clear();
}

The full code of the sample including simple timer based random price generation can be found there.

Falling Back to Long Polling

Is very simple. We would use the same protocol, but when the event is ready we would close the connection. In order to let the server to distinguish between EventSource and XHR long polling we would add a special header like X-Event-Source-Simulate: Long-Polling

Then we would change a little our async_send function by adding the following lines:

    if(waiter->request().getenv("HTTP_X_EVENT_SOURCE_SIMULATE")=="Long-Polling") {
       waiter->async_complete_response();
      return;
    }

Such that our function would look like:

    waiter->response().out() <<
        "id:" <<  counter_ <<"\n"
        "data:" << price_ << "\n"
        "\n";
    if(waiter->request().getenv("HTTP_X_EVENT_SOURCE_SIMULATE")=="Long-Polling") {
       waiter->async_complete_response();
       return;
    }
    waiter->async_flush_output(...)
    ...

Of course XHR simulation would have to send and manage Last-Event-Id header and parse the response, but the server side would look almost identically.

Connecting to the web server

When it is coming to configuring a web server you should make sure that it does not buffer out-coming request and sends them immediately to the client:

Now once again Nginx shows us its problems:

However, FastCGI does not implement such an option! See this ticket

So don't even try to use Nginx with FastCGI for Server-Sent Events.

Once again, give yourself a favor, use lighttpd

Thanks

Special thanks to Barbu Paul - Gheorghe,that had brought to my attention such a powerful and useful HTML5 feature.

First beta version of CppCMS 1.x.x is officially out!

Thursday, June 24, 2010, by artyom ; Posted in: Progress, Framework, Comet, Unicode and Localization; 10 comments

Hello all CppCMS users.

The first beta version of CppCMS 1.x.x is available for download from the Sourceforge. The build instructions can be found here.

This version very different from CppCMS 0.0.x branch - it fixes many design flaws that had been done the previous version, it is almost 90% rewrite of the original code according to new design.

It also includes many important features

Most significant ones:

So now CppCMS beta is ready.

Hopefully first release candidate version will be ready withing about a 3 month. And the first stable release is expected at the end of 2010 beginning of 2011.

CppCMS 1.x.x updates

Saturday, May 15, 2010, by artyom ; Posted in: Progress, Framework, Comet; 0 comments

After big internal changes I explain some critical architectural updates that CppCMS 1.x.x did.

In few words:

  1. Removed cppcms_boost library. Only several essential tools, not visible to API are still taken from cppcms_boost which compiled statically into cppcms library.
  2. Booster library was introduced. It is a library with boost-like interfaces that do not depend on actual boost. Some of the code is taken from boost library, some are wrappers of good C libraries and some totally new code I had written with ABI compatibility in mind.

Following my previous post feel free to update your working copies.

Rationale

CppCMS 1.x.x would provide backward compatible API and ABI and thus it can't relate on Boost library in it's public API. Only possible way to provide Boost API to users is actually wrapping it.

CppCMS 1.x.x introduces asynchronous even loop to web development - something that is very critical for Comet programming. This loop was based on Boost.Asio. But unfortunately it had very big limitations and writing a good wrapper Boost.Asio was unfeasible.

So a small "Boost_er_" library replaced required functionality from boost partially reimplementing, partially wrapping C libraries and partially borrowing the code from Boost itself.

Booster has following components:

The AIO library it is central CppCMS event loop that has Asio like API - proactor design, callback interface, etc.

However unlike ASIO it uses very few template classes, it is prefork-friendly (unlike ASIO)

Booster.Aio interface would allow Comet application to receive various asynchronous notifications from any source that can deliver such notifications over sockets.

CppCMS meets Comet

Thursday, August 27, 2009, by artyom ; Posted in: Progress, Framework, Comet; 8 comments

One of the major requirements for framework refactoring was support of Comet. Now, with introduction of asynchronous request handling and persistent application servers it becomes reality.

Client Side

There is a HTML source of simple chat client, that uses Dojo toolkit. It does following:

  1. Submits new messages to the server application by posting form using XHR:

     function send_data() {
             var kw = {
                     url : "/chat/post",
                     form : "theform"
             };
             dojo.xhrPost(kw);
             dojo.byId("message").value="";
             return false;
     }
    
  2. Receives new messages from the server using long poll via XHR:

     var message_count = 0;
     function read_data() {
             dojo.xhrGet( {
                     url: "/chat/get/" + message_count,
                     timeout: 120000,
                     handleAs: "text",
                     load: function(response, ioArgs) {
                             dojo.byId("messages").innerHTML =
                                     response
                                     + '<br/>'
                                     + dojo.byId("messages").innerHTML;
                             message_count++;
                             read_data();
                             return response;
                     },
                     error: function(response,ioArgs) {
                             read_data();
                             return response;
                     }
    
             });
     }
     dojo.addOnLoad(read_data);
    

So, the client side is quite simple (however error handling should be quite better).

Server Side

First we create our long running asynchronous application, that receives two kinds for requests: "/post" -- with new data, and "/get/NN" -- receive message nuber NN, we assign these calls to two member functions post and get.

class chat : public cppcms::application {
public:
    chat(cppcms::service &srv) : cppcms::application(srv)
    {
        dispatcher().assign("^/post$",&chat::post,this);
        dispatcher().assign("^/get/(\\d+)$",&chat::get,this,1);
    }

Now, this class includes two data members:

private:
    std::vector<std::string> messages_;
    std::vector<cppcms::intrusive_ptr<cppcms::http::context> > waiters_;

The history of all chat messages -- messages_ and all pending get requests that can't be satisfied, because the message still not exists -- waiters_

Each, "waiter" is actually pointer to request/response context that can be used for message transport.

Now, when new message arrives, post member function is called:

void post()
{
    if(request().request_method()=="POST") {
        if(request().post().find("message")!=request().post().end()) {
            messages_.push_back(request().post().find("message")->second);
            broadcast();
        }
    }
    release_context()->async_complete_response();
}

If the requested message was found, it is added to messages_ list and all waiters are notified using broadcast() member function.

At the end, the current request context is released and completed.

The broadcasting is done as following:

void broadcast()
{
    for(unsigned i=0;i<waiters_.size();i++) {
        waiters_[i]->response().set_plain_text_header();
        waiters_[i]->response().out() << messages_.back();
        waiters_[i]->async_complete_response();
        waiters_[i]=0;
    }
    waiters_.clear();
}

For each pending request the last message is written and the request closed. After that, all pending request are cleaned.

When get request arrives, it is handled by get(std::string no) member function, first of all we check if requested message exists, if so we just return it to user.

unsigned pos=atoi(no.c_str());
if(pos < messages_.size()) {
    response().set_plain_text_header();
    response().out()<<messages_[pos];
    release_context()->async_complete_response();
}

Otherwise, if the requested message is the last one, that does not exists, we add the request context to pending list waiters

else if(pos == messages_.size()) {
    waiters_.push_back(release_context());
}

If requested message it too late -- probably client error, we just set status to "404 Not Found" and return the response.

else {
    response().status(404);
    release_context()->async_complete_response();
}

No, all we need to do is to add application to the main running loop under script name "/char" and start the service.

cppcms::service service(argc,argv);
cppcms::intrusive_ptr<chat> app=new chat(service);
service.applications_pool().mount(app,"/chat");
service.run();

Summary

So, the simple chat service was written with about 50 lines of C++ code and about same amount of JavaScript code.

I must admit, that it is too simplistic and not efficient, for example: if new client connects it receives all messages one by one and not as bulk (can be easily fixed), I do not handle timeouts and disconnects. But the general idea is quite clear:

This is actually a base for future development of tools like XML-RPC and JSON-RPC that allow client to call asynchronously server side objects, it can be used for implementation of any other Comet protocols.

next page

Pages

Categories