SockIt
|
00001 /* 00002 * File: TcpServer.cpp 00003 * Author: jtedesco 00004 * 00005 * Created on May 26, 2011, 12:33 PM 00006 */ 00007 00008 #include "TcpServer.h" 00009 00010 TcpServer::TcpServer(int port, boost::asio::io_service & io_service) : 00011 Tcp("SERVER", port, io_service) 00012 { 00013 init(); 00014 } 00015 00016 TcpServer::TcpServer(int port, boost::asio::io_service & io_service, map<string, string> options) : 00017 Tcp("SERVER", port, io_service) 00018 { 00019 parse_args(options); 00020 init(); 00021 } 00022 00023 TcpServer::~TcpServer() 00024 { 00025 close(); 00026 } 00027 00028 void TcpServer::close() 00029 { 00030 // Shutdown the IO service, cancel any transfers on the socket, and close the socket 00031 waiting_to_shutdown = true; 00032 00033 set<boost::shared_ptr<tcp::socket> >::iterator it; 00034 for(it = connections.begin(); it != connections.end(); it++) 00035 { 00036 try 00037 { 00038 if((*it) && (*it).get() && (*it)->is_open()) 00039 { 00040 (*it)->close(); 00041 } 00042 } 00043 catch(boost::system::error_code &e) 00044 { 00045 Logger::warn("Error in TcpServer deconstructor: " + e.message(), port, host); 00046 } 00047 catch(std::exception &er) 00048 { 00049 Logger::warn("Error in TcpServer deconstructor: " + std::string(er.what()), port, host); 00050 } 00051 catch(...) 00052 { 00053 Logger::warn("Error occured that could not be caught"); 00054 } 00055 } 00056 00057 connections.clear(); 00058 00059 if (acceptor && acceptor->is_open()) 00060 { 00061 acceptor->close(); 00062 } 00063 else 00064 { 00065 // Don't fire an error, otherwise the plugin will crash 00066 string message("Could not cleanly shut down acceptor in TCP server, desctructing anyways"); 00067 Logger::warn(message, port, host); 00068 } 00069 } 00070 00071 void TcpServer::init() 00072 { 00073 // Bind the acceptor to the correct port & set the options on this acceptor 00074 try 00075 { 00076 if (using_ipv6 && *using_ipv6) 00077 { 00078 acceptor = boost::shared_ptr<tcp::acceptor>(new tcp::acceptor(io_service, tcp::endpoint(tcp::v6(), port))); 00079 } 00080 else 00081 { 00082 acceptor = boost::shared_ptr<tcp::acceptor>(new tcp::acceptor(io_service, tcp::endpoint(tcp::v4(), port))); 00083 } 00084 } 00085 catch (boost::system::system_error &e) 00086 { 00087 // Catch this error, and fail gracefully 00088 string message(string("Caught error initializing TCP server: '") + e.what() + "'"); 00089 Logger::error(message, port, host); 00090 00091 // Stop this server from ever doing anything again 00092 failed = true; 00093 } 00094 00095 // Check that acceptor was created successfully 00096 if (!acceptor.get()) 00097 { 00098 // Fail gracefully and stop this server from ever doing anything again 00099 string message("Failed to initialized TCP server acceptor"); 00100 Logger::error(message, port, host); 00101 failed = true; 00102 } 00103 } 00104 00105 void TcpServer::init_socket(boost::shared_ptr<tcp::socket> connection) 00106 { 00107 // Set the socket options for this client's TCP socket 00108 if (do_not_route) 00109 { 00110 boost::asio::socket_base::do_not_route option(*do_not_route); 00111 connection->set_option(option); 00112 } 00113 00114 // Toggle keep alive (enabled/disabled) 00115 if (keep_alive) 00116 { 00117 boost::asio::socket_base::keep_alive option(*keep_alive); 00118 connection->set_option(option); 00119 } 00120 if (no_delay) 00121 { 00122 boost::asio::ip::tcp::no_delay option(*no_delay); 00123 connection->set_option(option); 00124 } 00125 00126 if (keep_alive_timeout) 00127 { 00128 // Set the TCP keep-alive timeout - ignores return value 00129 set_tcp_keepalive(connection); 00130 } 00131 } 00132 00138 void socket_deallocate(tcp::socket * socket) 00139 { 00140 // may already have been deallocated 00141 if(!socket) return; 00142 00143 try 00144 { 00145 if(socket && socket->is_open()) 00146 { 00147 //s->shutdown(s->shutdown_both); 00148 socket->close(); 00149 } 00150 } 00151 catch(boost::system::error_code &e) 00152 { 00153 Logger::error("socket deallocate: " + e.message()); 00154 } 00155 catch(std::exception &er) 00156 { 00157 Logger::error("socket deallocate: " + std::string(er.what())); 00158 } 00159 00160 delete socket; 00161 socket = 0; 00162 } 00163 00164 void TcpServer::start_listening() 00165 { 00166 if(failed) 00167 { 00168 // Log & fire an error 00169 string message("Trying to start the server listening, but the server has permanently failed!"); 00170 Logger::error(message, port, host); 00171 } 00172 00173 // Log listening 00174 Logger::info("TCP server about to start listening for incoming connections on port " 00175 + boost::lexical_cast<string>(port), port, host); 00176 00177 // Prepare to accept a new connection and asynchronously accept new incoming connections 00178 boost::shared_ptr < tcp::socket > connection(new tcp::socket(io_service), socket_deallocate); 00179 connections.insert(connection); 00180 00181 // Try to accept any new connection 00182 if(acceptor.get()) 00183 { 00184 acceptor->async_accept(*connection, boost::bind(&TcpServer::accept_handler, this, _1, connection, host, port)); 00185 } 00186 else 00187 { 00188 // Fail gracefully and stop this server from ever doing anything again 00189 string message("TCP server failed to accept, acceptor invalid"); 00190 Logger::error(message, port, host); 00191 failed = true; 00192 return; 00193 } 00194 00195 // Callback acknowledging that the server has opened 00196 fire_open(); 00197 } 00198 00199 void TcpServer::shutdown() 00200 { 00201 if(!failed) 00202 { 00203 waiting_to_shutdown = true; 00204 if (active_jobs == 0) 00205 { 00206 fire_close(); 00207 close(); 00208 } 00209 } 00210 else 00211 { 00212 // Log & fire an error 00213 string message("Trying to shutdown the TCP server, but the server has permanently failed!"); 00214 Logger::error(message, port, host); 00215 } 00216 } 00217 00218 void TcpServer::accept_handler(const boost::system::error_code & error_code, boost::shared_ptr<tcp::socket> connection, string host, int port) 00219 { 00220 // Log error & return if there is an error 00221 if (error_code) 00222 { 00223 // Check for disconnection errors 00224 std::set<boost::system::error_code>::iterator find_result = disconnect_errors.find(error_code); 00225 if (find_result != disconnect_errors.end()) 00226 { 00227 if (error_code == boost::asio::error::operation_aborted) 00228 { 00229 // If we're waiting to shutdown, this is part of the normal process 00230 if(waiting_to_shutdown) 00231 { 00232 Logger::info("TCP server stopped listening for new connections", port, host); 00233 } 00234 else 00235 { 00236 Logger::warn("TCP accept was aborted", port, host); 00237 } 00238 } 00239 else 00240 { 00241 string message("TCP accept failed, disconnected: '" + error_code.message() + "'"); 00242 Logger::warn(message, port, host); 00243 fire_disconnect(message); 00244 } 00245 return; 00246 } 00247 00248 string message("Error accepting incoming connection: '" + error_code.message() + "'"); 00249 Logger::error(message, port, host); 00250 fire_error(message); 00251 return; 00252 } 00253 00254 // Initialize the socket options before we start using it 00255 init_socket(connection); 00256 00257 // Log that we've successfully accepted a new connection, and fire the 'onconnect' event 00258 string message("TCP server accepted new connection from " + connection->remote_endpoint().address().to_string() + " port " 00259 + boost::lexical_cast<string>(connection->remote_endpoint().port())); 00260 Logger::info(message, port, host); 00261 fire_connect(); 00262 00263 if (connection.get()) 00264 connection->async_receive(boost::asio::buffer(receive_buffer), 00265 boost::bind(&TcpServer::receive_handler, this, _1, _2, connection, host, port)); 00266 00267 // Start listening for new connections, if we're not waiting to close 00268 if (!waiting_to_shutdown) 00269 { 00270 start_listening(); 00271 } 00272 } 00273 00274 00275 void TcpServer::receive_handler(const boost::system::error_code & error_code, std::size_t bytesTransferred, 00276 boost::shared_ptr<tcp::socket> connection, string host, int port) 00277 { 00278 Tcp::receive_handler(error_code, bytesTransferred, connection, host, port); 00279 00280 if(connection && connection.get() && !connection->is_open()) 00281 connections.erase(connection); 00282 } 00283 00284 int TcpServer::get_port() 00285 { 00286 return port; 00287 } 00288 00289 void TcpServer::fire_error_event(const string & message) 00290 { 00291 fire_error(message); 00292 } 00293 00294 void TcpServer::fire_disconnect_event(const string & message) 00295 { 00296 fire_disconnect(message); 00297 } 00298 00299 void TcpServer::fire_data_event(const string data, boost::shared_ptr<tcp::socket> connection) 00300 { 00301 fire_data(boost::make_shared<TcpEvent>(this, connection, data)); 00302 }