SockIt

UdpClient.cpp

Go to the documentation of this file.
00001 /* UdpClient.cpp
00002  *
00003  * The UDP client can connect to services over udp. This class was intended to be exposed to the javascript.
00004  * The client currently looks up the host and port everytime a message is sent; the performance hit is currently
00005  * being measured to see if this information should be cached locally.
00006  *
00007  * Javascript API related to the UDP Client:
00008  *
00009  * attach/detachListener (implemented in firebreath)
00010  * send(msg, callback_function)
00011  * close()
00012  */
00013 
00014 #include "UdpClient.h"
00015 
00016 UdpClient::UdpClient(const string &host, int port, boost::asio::io_service & ioService) :
00017     Udp(host, port, ioService), resolver(new udp::resolver(io_service)), socket(new udp::socket(io_service)), resolved_endpoint(false)
00018 {
00019     // Check that the connection and resolver are valid, and fail gracefully if they are not
00020     if (!resolver.get() || !socket.get())
00021     {
00022         failed = true;
00023         string message("Failed to initialize UDP client, failed to initialize properly");
00024         Logger::error(message, port, host);
00025         fire_error(message);
00026         return;
00027     }
00028 }
00029 
00030 UdpClient::UdpClient(const string &host, int port, boost::asio::io_service & ioService, map<string, string> options) :
00031     Udp(host, port, ioService), resolver(new udp::resolver(io_service)), socket(new udp::socket(io_service)), resolved_endpoint(false)
00032 {
00033     // Check that the connection and resolver are valid, and fail gracefully if they are not
00034     if (!resolver.get() || !socket.get())
00035     {
00036         failed = true;
00037         string message("Failed to initialize UDP client, failed to initialize properly");
00038         Logger::error(message, port, host);
00039         fire_error(message);
00040         return;
00041     }
00042 
00043     parse_args(options);
00044 }
00045 
00046 void UdpClient::init_socket()
00047 {
00048     Logger::info(
00049             "Initializing UDP client to host '" + boost::lexical_cast<string>(host) + "' on port " + boost::lexical_cast<string>(port),
00050             port, host);
00051     log_options();
00052 
00053     if (using_ipv6 && *using_ipv6)
00054         socket->open(udp::v6());
00055     else
00056         socket->open(udp::v4());
00057 
00058     if (!socket->is_open())
00059     {
00060         string message("Failed to open UDP client socket");
00061         Logger::error(message, port, host);
00062         fire_error(message);
00063     }
00064 
00065     // synchronize the buffer size of the socket with this class's buffer size
00066     boost::asio::socket_base::receive_buffer_size buf_size_option(BUFFER_SIZE);
00067     socket->set_option(buf_size_option);
00068 
00069     // set multicast ttl and out going interface
00070     if (multicast && *multicast)
00071     {
00072         if (multicast_ttl)
00073         {
00074             boost::asio::ip::multicast::hops option(*multicast_ttl);
00075             socket->set_option(option);
00076         }
00077     }
00078 
00079     if (do_not_route)
00080     {
00081         boost::asio::socket_base::do_not_route option(*do_not_route);
00082         socket->set_option(option);
00083     }
00084 
00085     if (reuse_address)
00086     {
00087         boost::asio::socket_base::reuse_address option(*reuse_address);
00088         socket->set_option(option);
00089     }
00090 }
00091 
00092 UdpClient::~UdpClient()
00093 {
00094     close();
00095 }
00096 
00097 void UdpClient::close()
00098 {
00099     should_close = true;
00100 
00101     if (socket->is_open())
00102     {
00103         socket->close();
00104     }
00105 }
00106 
00107 void UdpClient::shutdown()
00108 {
00109     if (!failed)
00110     {
00111         should_close = true;
00112 
00113         pending_sends_mutex.lock();
00114         int pending_sends_now = pending_sends;
00115         pending_sends_mutex.unlock();
00116 
00117         if (pending_sends_now == 0)
00118         {
00119             fire_close();
00120             close();
00121         }
00122     }
00123 }
00124 
00125 void UdpClient::send_bytes(const vector<byte> & bytes)
00126 {
00127     string data;
00128 
00129     for (int i = 0; i < bytes.size(); i++)
00130     {
00131         data.push_back((unsigned char) bytes[i]);
00132     }
00133 
00134     send(data);
00135 }
00136 
00137 void UdpClient::send(const string &msg)
00138 {
00139     if (failed)
00140     {
00141         // Log & fire an error
00142         string message("Trying to send from a UDP client that has permanently failed!");
00143         Logger::error(message, port, host);
00144         return;
00145     }
00146 
00147     if (should_close)
00148         return;
00149 
00150     Logger::info("udpclient: sending a msg of size: " + boost::lexical_cast<std::string>(msg.size()) + " which is: " + msg, port, host);
00151 
00152     pending_sends_mutex.lock();
00153     pending_sends++;
00154     pending_sends_mutex.unlock();
00155 
00156     if (!resolved_endpoint)
00157     {
00158         Logger::info("udpclient: resolving " + host + ":" + boost::lexical_cast<string>(port), port, host);
00159 
00160         queue_mtx.lock();
00161         msgs_not_sent.push(msg);
00162         queue_mtx.unlock();
00163 
00164         // Create a query to resolve this host & port
00165         if (using_ipv6 && *using_ipv6)
00166         {
00167             // Asynchronously resolve the remote host, and once the host is resolved, create a connection
00168             udp::resolver::query query(udp::v6(), host, boost::lexical_cast<string>(port),
00169                     boost::asio::ip::resolver_query_base::numeric_service);
00170             resolver->async_resolve(query, boost::bind(&UdpClient::resolve_handler, this, _1, _2));
00171         }
00172         else
00173         {
00174             // Asynchronously resolve the remote host, and once the host is resolved, create a connection
00175             udp::resolver::query query(udp::v4(), host, boost::lexical_cast<string>(port),
00176                     boost::asio::ip::resolver_query_base::numeric_service);
00177             resolver->async_resolve(query, boost::bind(&UdpClient::resolve_handler, this, _1, _2));
00178         }
00179     }
00180     else
00181     {
00182         flush(); // any pending messages? send them.
00183 
00184         Logger::info("udpclient: attempting to send " + boost::lexical_cast<string>(msg.size()) + " bytes of data: " + msg, port, host);
00185 
00186         // send the message
00187         if (socket->is_open() && remote_endpoint.get())
00188         {
00189             socket->async_send_to(boost::asio::buffer(msg.data(), msg.size()), *remote_endpoint,
00190                     boost::bind(&UdpClient::send_handler, this, _1, _2, msg, host, port));
00191 
00192             Logger::info("udpclient: (async) send called", port, host);
00193 
00194             if (!should_close)
00195                 listen(); // listen for responses
00196         }
00197     }
00198 }
00199 
00200 void UdpClient::resolve_handler(const boost::system::error_code &err, udp::resolver::iterator endpoint_iterator)
00201 {
00202     if (err)
00203     {
00204         udp::resolver::iterator end;
00205         if (endpoint_iterator != end && err == boost::asio::error::host_not_found)
00206         {
00207             // If we haven't tried resolving using all resolvers, try with another
00208             udp::resolver::query query(host, boost::lexical_cast<string>(port));
00209             resolver->async_resolve(query, boost::bind(&UdpClient::resolve_handler, this, _1, endpoint_iterator++));
00210         }
00211         else
00212         { // We have tried and cannot recover, fail permanently
00213             string message("Error: resolving host " + host + ":" + boost::lexical_cast<string>(port) + " error was " + err.message());
00214 
00215             Logger::error(message, port, host);
00216             fire_error(message);
00217 
00218             return;
00219         }
00220     }
00221 
00222     fire_resolve();
00223 
00224     Logger::info("udpclient: resolved, going to send", port, host);
00225 
00226     // We succeeded resolving the endpoint, continue
00227     remote_endpoint = boost::make_shared<udp::endpoint>(*endpoint_iterator);
00228 
00229     if (!socket->is_open())
00230         init_socket();
00231 
00232     // This endpoint has now been initialized
00233     resolved_endpoint = true;
00234 
00235     flush();
00236 }
00237 
00238 void UdpClient::flush()
00239 {
00240     queue_mtx.lock();
00241 
00242     while (!msgs_not_sent.empty())
00243     {
00244         string msg = msgs_not_sent.front();
00245         msgs_not_sent.pop();
00246         queue_mtx.unlock();
00247         send(msg);
00248         queue_mtx.lock();
00249     }
00250 
00251     queue_mtx.unlock();
00252 }
00253 
00254 void UdpClient::listen()
00255 {
00256     if (remote_endpoint && remote_endpoint.get())
00257         socket->async_receive_from(boost::asio::buffer(receive_buffer), *remote_endpoint,
00258                 boost::bind(&UdpClient::receive_handler, this, _1, _2, socket, remote_endpoint, host, port));
00259     else
00260         Logger::warn("remote endpoint is null", port, host);
00261 }
00262 
00263 string UdpClient::get_host()
00264 {
00265     return host;
00266 }
00267 
00268 int UdpClient::get_port()
00269 {
00270     return port;
00271 }
00272 
00273 void UdpClient::fire_error_event(const string & message)
00274 {
00275     if (should_close)
00276         return;
00277 
00278     fire_error(message);
00279 }
00280 
00281 void UdpClient::fire_data_event(const string data, boost::shared_ptr<udp::socket> socket, boost::shared_ptr<udp::endpoint> endpoint)
00282 {
00283     if (should_close)
00284         return;
00285 
00286     fire_data(boost::make_shared<UdpEvent>(this, socket, endpoint, data));
00287 }
 All Classes Files Functions Variables Typedefs Friends Defines