SockIt
|
00001 /* 00002 * File: TcpClient.cpp 00003 * Author: jtedesco 00004 * 00005 * Created on May 26, 2011, 12:32 PM 00006 */ 00007 00008 #include "TcpClient.h" 00009 00010 TcpClient::TcpClient(const string & host, int port, boost::asio::io_service & io_service) : 00011 Tcp(host, port, io_service), resolver(new tcp::resolver(io_service)), connection(new tcp::socket(io_service)) 00012 { 00013 // Check that the connection and resolver are valid, and fail gracefully if they are not 00014 if (!resolver.get() || !connection.get()) 00015 { 00016 failed = true; 00017 string message("Failed to initialize TCP client, failed to initialize properly"); 00018 Logger::error(message, port, host); 00019 fire_error(message); 00020 return; 00021 } 00022 00023 init(); 00024 } 00025 00026 TcpClient::TcpClient(const string & host, int port, boost::asio::io_service & io_service, map<string, string> options) : 00027 Tcp(host, port, io_service), resolver(new tcp::resolver(io_service)), connection(new tcp::socket(io_service)) 00028 { 00029 // Check that the connection and resolver are valid, and fail gracefully if they are not 00030 if (!resolver.get() || !connection.get()) 00031 { 00032 failed = true; 00033 string message("Failed to initialize TCP client, failed to initialize properly"); 00034 Logger::error(message, port, host); 00035 fire_error(message); 00036 return; 00037 } 00038 00039 parse_args(options); 00040 00041 init(); 00042 } 00043 00044 void TcpClient::init() 00045 { 00046 connected = false; 00047 00048 Logger::info( 00049 "Initializing TCP client to host '" + boost::lexical_cast<string>(host) + "' on port " + boost::lexical_cast<string>(port), 00050 port, host); 00051 log_options(); 00052 Logger::info( 00053 "Trying to resolve DNS information for host " + boost::lexical_cast<string>(host) + "', port " + boost::lexical_cast<string>( 00054 port), port, host); 00055 00056 // Create a query to resolve this host & port 00057 if (using_ipv6 && *using_ipv6) 00058 { 00059 // Asynchronously resolve the remote host, and once the host is resolved, create a connection 00060 tcp::resolver::query query(tcp::v6(), host, boost::lexical_cast<string>(port), 00061 boost::asio::ip::resolver_query_base::numeric_service); 00062 if (resolver.get()) 00063 resolver->async_resolve(query, boost::bind(&TcpClient::resolve_handler, this, _1, _2)); 00064 else 00065 { 00066 failed = true; 00067 string message("TCP client failed to resolve, invalid resolver"); 00068 Logger::error(message, port, host); 00069 fire_error(message); 00070 } 00071 } 00072 else 00073 { 00074 // Asynchronously resolve the remote host, and once the host is resolved, create a connection 00075 tcp::resolver::query query(tcp::v4(), host, boost::lexical_cast<string>(port), 00076 boost::asio::ip::resolver_query_base::numeric_service); 00077 if (resolver.get()) 00078 resolver->async_resolve(query, boost::bind(&TcpClient::resolve_handler, this, _1, _2)); 00079 else 00080 { 00081 failed = true; 00082 string message("TCP client failed to resolve, invalid resolver"); 00083 Logger::error(message, port, host); 00084 fire_error(message); 00085 } 00086 } 00087 } 00088 00089 void TcpClient::init_socket() 00090 { 00091 // Set the socket options for this client's TCP socket 00092 if (do_not_route) 00093 { 00094 boost::asio::socket_base::do_not_route option(*do_not_route); 00095 connection->set_option(option); 00096 } 00097 if (keep_alive) 00098 { 00099 boost::asio::socket_base::keep_alive option(*keep_alive); 00100 connection->set_option(option); 00101 } 00102 if (no_delay) 00103 { 00104 boost::asio::ip::tcp::no_delay option(*no_delay); 00105 connection->set_option(option); 00106 } 00107 if (keep_alive_timeout) 00108 { 00109 // Set the TCP keep-alive timeout - ignores return value 00110 set_tcp_keepalive(connection); 00111 } 00112 } 00113 00114 TcpClient::~TcpClient() 00115 { 00116 close(); 00117 } 00118 00119 void TcpClient::close() 00120 { 00121 waiting_to_shutdown = true; 00122 resolver->cancel(); 00123 00124 // Shutdown the IO service, cancel any transfers on the socket, and close the socket 00125 if (connection->is_open()) 00126 { 00127 try 00128 { 00129 connection->shutdown(connection->shutdown_both); 00130 } 00131 catch (...) 00132 { 00133 Logger::warn("Socket shutdown improperly, proceeding anyway", port, host); 00134 } 00135 00136 connection->close(); 00137 } 00138 else 00139 { 00140 Logger::warn("Failed to cleanly shutdown TCP client connection, continuing anyways", port, host); 00141 } 00142 } 00143 00144 void TcpClient::shutdown() 00145 { 00146 if (!failed) 00147 { 00148 active_jobs_mutex.lock(); 00149 int current_jobs = active_jobs; 00150 active_jobs_mutex.unlock(); 00151 waiting_to_shutdown = true; 00152 if (current_jobs == 0) 00153 { 00154 fire_close(); 00155 close(); 00156 } 00157 } 00158 else 00159 { 00160 // Log & fire an error 00161 Logger::error("Trying to start the server listening, but the server has permanently failed!", port, host); 00162 } 00163 } 00164 00165 void TcpClient::send_bytes(const vector<byte> & bytes) 00166 { 00167 string data; 00168 00169 for (int i = 0; i < bytes.size(); i++) 00170 { 00171 data.push_back((unsigned char) bytes[i]); 00172 } 00173 00174 send(data); 00175 } 00176 00177 void TcpClient::send(const string & data) 00178 { 00179 if (failed) 00180 { 00181 // Log & fire an error 00182 string message("Trying to send data on a TCP client that has permanently failed!"); 00183 Logger::error(message, port, host); 00184 return; 00185 } 00186 00187 // If we're not already connected, then queue this data to be send 00188 connected_mutex.lock(); 00189 bool connected_now = connected; 00190 connected_mutex.unlock(); 00191 00192 if (!connected_now) 00193 { 00194 active_jobs_mutex.lock(); 00195 active_jobs++; 00196 active_jobs_mutex.unlock(); 00197 data_queue_mutex.lock(); 00198 data_queue.push(data); 00199 data_queue_mutex.unlock(); 00200 } 00201 else 00202 { 00203 // Check if the queue needs to be flushed, and if so, flush it 00204 flush(); 00205 00206 // Send the data, and record that we've started a new send job 00207 active_jobs_mutex.lock(); 00208 active_jobs++; 00209 active_jobs_mutex.unlock(); 00210 connection->async_send(boost::asio::buffer(data.data(), data.size()), 00211 boost::bind(&TcpClient::send_handler, this, _1, _2, data, host, port, connection)); 00212 } 00213 } 00214 00215 void TcpClient::resolve_handler(const boost::system::error_code & error_code, tcp::resolver::iterator endpoint_iterator) 00216 { 00217 // If we encountered an error 00218 if (error_code) 00219 { 00220 // Check for disconnection errors 00221 std::set<boost::system::error_code>::iterator find_result = disconnect_errors.find(error_code); 00222 if (find_result != disconnect_errors.end()) 00223 { 00224 if (error_code == boost::asio::error::operation_aborted) 00225 { 00226 Logger::warn("TCP resolve was aborted"); 00227 return; 00228 } 00229 else 00230 { 00231 string message("TCP resolve failed, disconnected: '" + error_code.message() + "'"); 00232 Logger::warn(message, port, host); 00233 fire_disconnect(message); 00234 return; 00235 } 00236 } 00237 00238 // We failed permanently, fail permanently and log it 00239 string message("Failed to resolve host with error: " + error_code.message()); 00240 Logger::error(message, port, host); 00241 fire_error(message); 00242 return; 00243 } 00244 00245 if (connection.get()) 00246 { 00247 // Attempt to connect to the endpoint, using IPv6 if specified 00248 tcp::endpoint receiver_endpoint = *endpoint_iterator; 00249 if (using_ipv6 && *using_ipv6) 00250 { 00251 connection->open(tcp::v6()); 00252 } 00253 else 00254 { 00255 connection->open(tcp::v4()); 00256 } 00257 00258 // Initialize the socket if it's open 00259 if (!connection->is_open()) 00260 init_socket(); 00261 00262 // Log success 00263 Logger::info("Host has been resolved, attempting to connect to host", port, host); 00264 fire_resolve(); 00265 00266 // Try to asynchronously establish a connection to the host 00267 connection->async_connect(receiver_endpoint, boost::bind(&TcpClient::connect_handler, this, _1, endpoint_iterator)); 00268 } 00269 else 00270 { 00271 failed = true; 00272 string message("TCP client failed to resolve, invalid connection"); 00273 Logger::error(message, port, host); 00274 fire_error(message); 00275 } 00276 } 00277 00278 void TcpClient::connect_handler(const boost::system::error_code & error_code, tcp::resolver::iterator endpoint_iterator) 00279 { 00280 // If there was an error to connect, log the error and abort connection 00281 if (error_code) 00282 { 00283 // Check for disconnection errors 00284 std::set<boost::system::error_code>::iterator find_result = disconnect_errors.find(error_code); 00285 if (find_result != disconnect_errors.end()) 00286 { 00287 if (error_code == boost::asio::error::operation_aborted) 00288 { 00289 Logger::warn("TCP connect was aborted"); 00290 return; 00291 } 00292 else 00293 { 00294 string message("TCP connect failed, disconnected: '" + error_code.message() + "'"); 00295 Logger::warn(message, port, host); 00296 fire_disconnect(message); 00297 return; 00298 } 00299 } 00300 00301 // Check if this was not the last possible connection in the iterator 00302 tcp::resolver::iterator end; 00303 if (endpoint_iterator != end && error_code == boost::asio::error::host_not_found) 00304 { 00305 // Create a query to resolve this host & port 00306 tcp::resolver::query query(host, boost::lexical_cast<string>(port)); 00307 00308 // Try the next possible endpoint 00309 resolver->async_resolve(query, boost::bind(&TcpClient::resolve_handler, this, _1, endpoint_iterator++)); 00310 } 00311 else 00312 { 00313 // If we've tried every possible endpoint, fail permanently 00314 string message("Failed to connect to host, with message: '" + error_code.message() + "'"); 00315 Logger::error(message, port, host); 00316 fire_error(message); 00317 return; 00318 } 00319 } 00320 00321 // Log success, and record that we are now connected 00322 Logger::info("Connection established to host", port, host); 00323 fire_connect(); 00324 00325 // Start receiving data on this connection 00326 connection->async_receive(boost::asio::buffer(receive_buffer), 00327 boost::bind(&TcpClient::receive_handler, this, _1, _2, connection, host, port)); 00328 00329 // Flush data from the queue if necessary 00330 flush(); 00331 00332 connected_mutex.lock(); 00333 00334 connected = true; 00335 connected_mutex.unlock(); 00336 } 00337 00338 void TcpClient::flush() 00339 { 00340 data_queue_mutex.lock(); 00341 while (!data_queue.empty()) 00342 { 00343 data_queue_mutex.unlock(); 00344 00345 // Pull the next data chunk to be sent off the queue 00346 data_queue_mutex.lock(); 00347 string data = data_queue.front(); 00348 data_queue.pop(); 00349 data_queue_mutex.unlock(); 00350 00351 // Asynchronously send the data across the connection 00352 connection->async_send(boost::asio::buffer(data.data(), data.size()), 00353 boost::bind(&TcpClient::send_handler, this, _1, _2, data, host, port, connection)); 00354 } 00355 data_queue_mutex.unlock(); 00356 } 00357 00358 int TcpClient::get_port() 00359 { 00360 return port; 00361 } 00362 00363 string TcpClient::get_host() 00364 { 00365 return host; 00366 } 00367 00368 void TcpClient::fire_error_event(const string & message) 00369 { 00370 fire_error(message); 00371 } 00372 00373 void TcpClient::fire_disconnect_event(const string & message) 00374 { 00375 fire_disconnect(message); 00376 } 00377 00378 void TcpClient::fire_data_event(const string data, boost::shared_ptr<tcp::socket> connection) 00379 { 00380 fire_data(boost::make_shared<TcpEvent>(this, connection, data)); 00381 } 00382