#include "../options.hpp"
#include "../broker.hpp"
#include <iostream>
#ifdef WIN32
#include "proton/acceptor.hpp"
#include "proton/container.hpp"
#include "proton/value.hpp"
class broker {
public:
broker(
const proton::url& url) : handler_(url, queues_) {}
private:
class my_handler : public broker_handler {
public:
my_handler(
const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
std::cout << "broker listening on " << url_ << std::endl;
}
private:
};
private:
queues queues_;
my_handler handler_;
};
int main(int argc, char **argv) {
options opts(argc, argv);
opts.add_value(url, 'a', "address", "listen on URL", "URL");
try {
opts.parse();
broker b(url);
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
#else // WIN32
#include <proton/io.hpp>
#include <sys/select.h>
#include <set>
template <class T> T check(T result, const std::string& msg="io_error: ") {
if (result < 0)
return result;
}
void fd_set_if(bool on, int fd, fd_set *fds);
class broker {
typedef std::set<proton::io::socket_engine*> engines;
queues queues_;
broker_handler handler_;
proton::connection_engine::container container_;
engines engines_;
fd_set reading_, writing_;
public:
broker() : handler_(queues_) {
FD_ZERO(&reading_);
FD_ZERO(&writing_);
}
~broker() {
for (engines::iterator i = engines_.begin(); i != engines_.end(); ++i)
delete *i;
}
std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
FD_SET(listener.socket(), &reading_);
while(true) {
fd_set readable_set = reading_;
fd_set writable_set = writing_;
check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
if (FD_ISSET(listener.socket(), &readable_set)) {
std::string client_host, client_port;
int fd = listener.accept(client_host, client_port);
std::cout << "accepted " << client_host << ":" << client_port
<< " fd=" << fd << std::endl;
FD_SET(fd, &reading_);
FD_SET(fd, &writing_);
}
for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
engines::iterator j = i++;
int flags = 0;
if (FD_ISSET(eng->
socket(), &readable_set))
flags |= proton::io::socket_engine::READ;
if (FD_ISSET(eng->
socket(), &writable_set))
flags |= proton::io::socket_engine::WRITE;
std::cout <<
"closed fd=" << eng->
socket() <<
" "
engines_.erase(j);
delete eng;
}
}
}
}
};
void fd_set_if(bool on, int fd, fd_set *fds) {
if (on)
FD_SET(fd, fds);
else
FD_CLR(fd, fds);
}
int main(int argc, char **argv) {
std::string address("0.0.0.0");
options opts(argc, argv);
opts.add_value(address, 'a', "address", "listen on URL", "URL");
try {
opts.parse();
broker().run(address);
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
#endif