SockIt
|
00001 /* UdpClient.cpp 00002 * 00003 * The UDP client can connect to services over udp. This class was intended to be exposed to the javascript. 00004 * The client currently looks up the host and port everytime a message is sent; the performance hit is currently 00005 * being measured to see if this information should be cached locally. 00006 * 00007 * Javascript API related to the UDP Client: 00008 * 00009 * attach/detachListener (implemented in firebreath) 00010 * send(msg, callback_function) 00011 * close() 00012 */ 00013 00014 #include "UdpClient.h" 00015 00016 UdpClient::UdpClient(const string &host, int port, boost::asio::io_service & ioService) : 00017 Udp(host, port, ioService), resolver(new udp::resolver(io_service)), socket(new udp::socket(io_service)), resolved_endpoint(false) 00018 { 00019 // Check that the connection and resolver are valid, and fail gracefully if they are not 00020 if (!resolver.get() || !socket.get()) 00021 { 00022 failed = true; 00023 string message("Failed to initialize UDP client, failed to initialize properly"); 00024 Logger::error(message, port, host); 00025 fire_error(message); 00026 return; 00027 } 00028 } 00029 00030 UdpClient::UdpClient(const string &host, int port, boost::asio::io_service & ioService, map<string, string> options) : 00031 Udp(host, port, ioService), resolver(new udp::resolver(io_service)), socket(new udp::socket(io_service)), resolved_endpoint(false) 00032 { 00033 // Check that the connection and resolver are valid, and fail gracefully if they are not 00034 if (!resolver.get() || !socket.get()) 00035 { 00036 failed = true; 00037 string message("Failed to initialize UDP client, failed to initialize properly"); 00038 Logger::error(message, port, host); 00039 fire_error(message); 00040 return; 00041 } 00042 00043 parse_args(options); 00044 } 00045 00046 void UdpClient::init_socket() 00047 { 00048 Logger::info( 00049 "Initializing UDP client to host '" + boost::lexical_cast<string>(host) + "' on port " + boost::lexical_cast<string>(port), 00050 port, host); 00051 log_options(); 00052 00053 if (using_ipv6 && *using_ipv6) 00054 socket->open(udp::v6()); 00055 else 00056 socket->open(udp::v4()); 00057 00058 if (!socket->is_open()) 00059 { 00060 string message("Failed to open UDP client socket"); 00061 Logger::error(message, port, host); 00062 fire_error(message); 00063 } 00064 00065 // synchronize the buffer size of the socket with this class's buffer size 00066 boost::asio::socket_base::receive_buffer_size buf_size_option(BUFFER_SIZE); 00067 socket->set_option(buf_size_option); 00068 00069 // set multicast ttl and out going interface 00070 if (multicast && *multicast) 00071 { 00072 if (multicast_ttl) 00073 { 00074 boost::asio::ip::multicast::hops option(*multicast_ttl); 00075 socket->set_option(option); 00076 } 00077 } 00078 00079 if (do_not_route) 00080 { 00081 boost::asio::socket_base::do_not_route option(*do_not_route); 00082 socket->set_option(option); 00083 } 00084 00085 if (reuse_address) 00086 { 00087 boost::asio::socket_base::reuse_address option(*reuse_address); 00088 socket->set_option(option); 00089 } 00090 } 00091 00092 UdpClient::~UdpClient() 00093 { 00094 close(); 00095 } 00096 00097 void UdpClient::close() 00098 { 00099 should_close = true; 00100 00101 if (socket->is_open()) 00102 { 00103 socket->close(); 00104 } 00105 } 00106 00107 void UdpClient::shutdown() 00108 { 00109 if (!failed) 00110 { 00111 should_close = true; 00112 00113 pending_sends_mutex.lock(); 00114 int pending_sends_now = pending_sends; 00115 pending_sends_mutex.unlock(); 00116 00117 if (pending_sends_now == 0) 00118 { 00119 fire_close(); 00120 close(); 00121 } 00122 } 00123 } 00124 00125 void UdpClient::send_bytes(const vector<byte> & bytes) 00126 { 00127 string data; 00128 00129 for (int i = 0; i < bytes.size(); i++) 00130 { 00131 data.push_back((unsigned char) bytes[i]); 00132 } 00133 00134 send(data); 00135 } 00136 00137 void UdpClient::send(const string &msg) 00138 { 00139 if (failed) 00140 { 00141 // Log & fire an error 00142 string message("Trying to send from a UDP client that has permanently failed!"); 00143 Logger::error(message, port, host); 00144 return; 00145 } 00146 00147 if (should_close) 00148 return; 00149 00150 Logger::info("udpclient: sending a msg of size: " + boost::lexical_cast<std::string>(msg.size()) + " which is: " + msg, port, host); 00151 00152 pending_sends_mutex.lock(); 00153 pending_sends++; 00154 pending_sends_mutex.unlock(); 00155 00156 if (!resolved_endpoint) 00157 { 00158 Logger::info("udpclient: resolving " + host + ":" + boost::lexical_cast<string>(port), port, host); 00159 00160 queue_mtx.lock(); 00161 msgs_not_sent.push(msg); 00162 queue_mtx.unlock(); 00163 00164 // Create a query to resolve this host & port 00165 if (using_ipv6 && *using_ipv6) 00166 { 00167 // Asynchronously resolve the remote host, and once the host is resolved, create a connection 00168 udp::resolver::query query(udp::v6(), host, boost::lexical_cast<string>(port), 00169 boost::asio::ip::resolver_query_base::numeric_service); 00170 resolver->async_resolve(query, boost::bind(&UdpClient::resolve_handler, this, _1, _2)); 00171 } 00172 else 00173 { 00174 // Asynchronously resolve the remote host, and once the host is resolved, create a connection 00175 udp::resolver::query query(udp::v4(), host, boost::lexical_cast<string>(port), 00176 boost::asio::ip::resolver_query_base::numeric_service); 00177 resolver->async_resolve(query, boost::bind(&UdpClient::resolve_handler, this, _1, _2)); 00178 } 00179 } 00180 else 00181 { 00182 flush(); // any pending messages? send them. 00183 00184 Logger::info("udpclient: attempting to send " + boost::lexical_cast<string>(msg.size()) + " bytes of data: " + msg, port, host); 00185 00186 // send the message 00187 if (socket->is_open() && remote_endpoint.get()) 00188 { 00189 socket->async_send_to(boost::asio::buffer(msg.data(), msg.size()), *remote_endpoint, 00190 boost::bind(&UdpClient::send_handler, this, _1, _2, msg, host, port)); 00191 00192 Logger::info("udpclient: (async) send called", port, host); 00193 00194 if (!should_close) 00195 listen(); // listen for responses 00196 } 00197 } 00198 } 00199 00200 void UdpClient::resolve_handler(const boost::system::error_code &err, udp::resolver::iterator endpoint_iterator) 00201 { 00202 if (err) 00203 { 00204 udp::resolver::iterator end; 00205 if (endpoint_iterator != end && err == boost::asio::error::host_not_found) 00206 { 00207 // If we haven't tried resolving using all resolvers, try with another 00208 udp::resolver::query query(host, boost::lexical_cast<string>(port)); 00209 resolver->async_resolve(query, boost::bind(&UdpClient::resolve_handler, this, _1, endpoint_iterator++)); 00210 } 00211 else 00212 { // We have tried and cannot recover, fail permanently 00213 string message("Error: resolving host " + host + ":" + boost::lexical_cast<string>(port) + " error was " + err.message()); 00214 00215 Logger::error(message, port, host); 00216 fire_error(message); 00217 00218 return; 00219 } 00220 } 00221 00222 fire_resolve(); 00223 00224 Logger::info("udpclient: resolved, going to send", port, host); 00225 00226 // We succeeded resolving the endpoint, continue 00227 remote_endpoint = boost::make_shared<udp::endpoint>(*endpoint_iterator); 00228 00229 if (!socket->is_open()) 00230 init_socket(); 00231 00232 // This endpoint has now been initialized 00233 resolved_endpoint = true; 00234 00235 flush(); 00236 } 00237 00238 void UdpClient::flush() 00239 { 00240 queue_mtx.lock(); 00241 00242 while (!msgs_not_sent.empty()) 00243 { 00244 string msg = msgs_not_sent.front(); 00245 msgs_not_sent.pop(); 00246 queue_mtx.unlock(); 00247 send(msg); 00248 queue_mtx.lock(); 00249 } 00250 00251 queue_mtx.unlock(); 00252 } 00253 00254 void UdpClient::listen() 00255 { 00256 if (remote_endpoint && remote_endpoint.get()) 00257 socket->async_receive_from(boost::asio::buffer(receive_buffer), *remote_endpoint, 00258 boost::bind(&UdpClient::receive_handler, this, _1, _2, socket, remote_endpoint, host, port)); 00259 else 00260 Logger::warn("remote endpoint is null", port, host); 00261 } 00262 00263 string UdpClient::get_host() 00264 { 00265 return host; 00266 } 00267 00268 int UdpClient::get_port() 00269 { 00270 return port; 00271 } 00272 00273 void UdpClient::fire_error_event(const string & message) 00274 { 00275 if (should_close) 00276 return; 00277 00278 fire_error(message); 00279 } 00280 00281 void UdpClient::fire_data_event(const string data, boost::shared_ptr<udp::socket> socket, boost::shared_ptr<udp::endpoint> endpoint) 00282 { 00283 if (should_close) 00284 return; 00285 00286 fire_data(boost::make_shared<UdpEvent>(this, socket, endpoint, data)); 00287 }