SockIt
|
00001 /* 00002 * Tcp.cpp 00003 * 00004 * Created on: Jun 8, 2011 00005 * Author: jtedesco 00006 */ 00007 00008 #include "Tcp.h" 00009 00010 Tcp::Tcp(string host, int port, boost::asio::io_service & ioService) : 00011 host(host), port(port), waiting_to_shutdown(false), active_jobs(0), io_service(ioService), using_ipv6(false), failed(false) 00012 { 00013 // Collect the set of errors classified as 'disconnect' type errors 00014 disconnect_errors.insert(boost::asio::error::connection_reset); 00015 disconnect_errors.insert(boost::asio::error::eof); 00016 disconnect_errors.insert(boost::asio::error::connection_aborted); 00017 disconnect_errors.insert(boost::asio::error::operation_aborted); 00018 } 00019 00020 void Tcp::send_handler(const boost::system::error_code & error_code, std::size_t bytes_transferred, string data, string host, int port, 00021 boost::shared_ptr<tcp::socket> connection) 00022 { 00023 // Check the error code 00024 if (error_code) 00025 { 00026 // Check for disconnection errors 00027 std::set<boost::system::error_code>::iterator find_result = disconnect_errors.find(error_code); 00028 if (find_result != disconnect_errors.end()) 00029 { 00030 if (error_code == boost::asio::error::operation_aborted) 00031 { 00032 Logger::info("TCP send failed, aborted", port, host); 00033 } 00034 else 00035 { 00036 string message("TCP send failed, disconnected, error message: '" + error_code.message() + "'"); 00037 Logger::info(message, port, host); 00038 fire_disconnect_event(message); 00039 } 00040 return; 00041 } 00042 00043 string message( 00044 "TCP send failed, error message '" + error_code.message() + "', error code '" + boost::lexical_cast<string>( 00045 error_code.value()) + "' was encountered"); 00046 Logger::error(message, port, host); 00047 fire_error_event(message); 00048 return; 00049 } 00050 00051 if (bytes_transferred != data.size()) 00052 { 00053 // Get the rest of the message, and try resending it 00054 string rest_of_buffer(((char*) data.data()) + bytes_transferred, data.size() - bytes_transferred); 00055 00056 active_jobs_mutex.lock(); 00057 active_jobs++; 00058 active_jobs_mutex.unlock(); 00059 00060 connection->async_send(boost::asio::buffer(rest_of_buffer.data(), rest_of_buffer.size()), 00061 boost::bind(&Tcp::send_handler, this, _1, _2, rest_of_buffer, host, port, connection)); 00062 00063 // Log that we had to do this 00064 // We could not send all of the message, fire an error and log it 00065 string message( 00066 string("TCP send failed, only ") + boost::lexical_cast<string>(bytes_transferred) + " of " + boost::lexical_cast<string>( 00067 data.size()) + " total bytes were sent"); 00068 Logger::warn(message, port, host); 00069 return; 00070 } 00071 else 00072 { 00073 string message("TCP send succeeded, sent " + boost::lexical_cast<string>(bytes_transferred) + " bytes, data is: " + data); 00074 Logger::info(message, port, host); 00075 } 00076 00077 // Check to see if this is for the last send to complete, and if we're waiting to shutdown 00078 active_jobs_mutex.lock(); 00079 active_jobs--; 00080 active_jobs_mutex.unlock(); 00081 int current_jobs = active_jobs; 00082 if (waiting_to_shutdown && current_jobs == 0) 00083 { 00084 close(); 00085 } 00086 } 00087 00088 void Tcp::receive_handler(const boost::system::error_code & error_code, std::size_t bytes_transferred, 00089 boost::shared_ptr<tcp::socket> connection, string host, int port) 00090 { 00091 // Check for errors 00092 if (error_code) 00093 { 00094 // Check for disconnection errors 00095 if (disconnect_errors.find(error_code) != disconnect_errors.end()) 00096 { 00097 if (error_code == boost::asio::error::operation_aborted) 00098 { 00099 Logger::info("TCP send failed, aborted", port, host); 00100 return; 00101 } 00102 else 00103 { 00104 string message("TCP receive failed, disconnected, error message: '" + error_code.message() + "'"); 00105 Logger::info(message, port, host); 00106 fire_disconnect_event(message); 00107 return; 00108 } 00109 } 00110 00111 // Otherwise, this is a more serious error, fail and log the error 00112 string message( 00113 "TCP receive failed, error message: '" + error_code.message() + "', value '" + boost::lexical_cast<string>( 00114 error_code.value()) + "'"); 00115 Logger::error(message, port, host); 00116 fire_error_event(message); 00117 00118 // Shutdown the socket we're receiving from 00119 if (connection.get() && connection->is_open()) 00120 { 00121 try 00122 { 00123 connection->shutdown(connection->shutdown_receive); 00124 } 00125 catch (...) 00126 { 00127 Logger::warn("TCP receive failed, shutdown improperly, proceeding anyway", port, host); 00128 } 00129 } 00130 00131 return; 00132 } 00133 00134 // Pull out the data we received, and fire a data received event 00135 string data = string(receive_buffer.c_array(), bytes_transferred); 00136 00137 // Log success 00138 string message( 00139 "Successfully received all " + boost::lexical_cast<string>(bytes_transferred) + " bytes of " + boost::lexical_cast<string>( 00140 bytes_transferred) + " total bytes. Data: '" + data + "'"); 00141 00142 Logger::info(message, port, host); 00143 00144 try 00145 { 00146 fire_data_event(data, connection); 00147 } 00148 catch (const boost::bad_weak_ptr &p) 00149 { 00150 Logger::error("Event is going out of scope", port, host); 00151 } 00152 00153 // Try to receive more data 00154 if (connection.get()) 00155 connection->async_receive(boost::asio::buffer(receive_buffer), 00156 boost::bind(&Tcp::receive_handler, this, _1, _2, connection, host, port)); 00157 } 00158 00159 inline string Tcp::bool_option_to_string(optional<bool> &arg, string iftrue, string iffalse) 00160 { 00161 if (arg && *arg) 00162 return iftrue; 00163 return iffalse; 00164 } 00165 00166 template<class T> 00167 inline string Tcp::option_to_string(optional<T> &arg) 00168 { 00169 if (arg) 00170 return boost::lexical_cast<string>(*arg); 00171 return string("unset"); 00172 } 00173 00174 void inline Tcp::parse_string_bool_arg(map<string, string> &options, string arg, optional<bool> &arg_value) 00175 { 00176 map<string, string>::iterator it; 00177 00178 if ((it = options.find(arg)) != options.end()) 00179 { 00180 if (it->second == std::string("true")) 00181 arg_value.reset(true); 00182 if (it->second == std::string("false")) 00183 arg_value.reset(false); 00184 } 00185 } 00186 00187 void inline Tcp::parse_string_int_arg(map<string, string> &options, string arg, optional<int> &arg_value) 00188 { 00189 map<string, string>::iterator it; 00190 00191 if ((it = options.find(arg)) != options.end()) 00192 { 00193 arg_value.reset(boost::lexical_cast<int>(it->second)); 00194 } 00195 } 00196 00197 void Tcp::parse_args(map<string, string> options) 00198 { 00199 map<string, string>::iterator it; 00200 map<string, string> transformed_options; 00201 00202 string t("true"); 00203 string f("false"); 00204 00205 // transform the entire map to lower case 00206 for (it = options.begin(); it != options.end(); it++) 00207 { 00208 string k = it->first; 00209 string v = it->second; 00210 00211 std::transform(k.begin(), k.end(), k.begin(), ::tolower); 00212 std::transform(v.begin(), v.end(), v.begin(), ::tolower); 00213 00214 k.erase(std::remove_if(k.begin(), k.end(), ::isspace), k.end()); 00215 v.erase(std::remove_if(v.begin(), v.end(), ::isspace), v.end()); 00216 00217 transformed_options.insert(std::pair<string, string>(k, v)); 00218 } 00219 00220 parse_string_bool_arg(transformed_options, "ipv6", using_ipv6); 00221 parse_string_bool_arg(transformed_options, "donotroute", do_not_route); 00222 parse_string_bool_arg(transformed_options, "keepalive", keep_alive); 00223 parse_string_bool_arg(transformed_options, "nodelay", no_delay); 00224 parse_string_int_arg(transformed_options, "keepalivetimeout", keep_alive_timeout); 00225 00226 log_options(); 00227 } 00228 00229 void Tcp::log_options() 00230 { 00231 string options("These arguments were passed in: "); 00232 00233 options.append(bool_option_to_string(using_ipv6, "ipv6, ", "ipv4, ")); 00234 options.append(bool_option_to_string(do_not_route, "no routing, ", "use routing, ")); 00235 options.append(bool_option_to_string(no_delay, "no delay, ", "allow delay, ")); 00236 options.append(bool_option_to_string(keep_alive, "keep alive", "don't keep alive")); 00237 options.append(", keep alive timeout is "); 00238 options.append(option_to_string<int> (keep_alive_timeout)); 00239 00240 Logger::info(options, port, host); 00241 } 00242 00243 bool Tcp::set_tcp_keepalive(boost::shared_ptr<tcp::socket> socket) 00244 { 00245 #ifdef __UNIX__ 00246 00247 // For *n*x systems 00248 int native_fd = socket->native(); 00249 int timeout = *keep_alive_timeout; 00250 int intvl = 1; 00251 int probes = 10; 00252 int on = 1; 00253 00254 int ret_keepalive = setsockopt(native_fd, SOL_SOCKET, SO_KEEPALIVE, (void*) &on, sizeof(int)); 00255 int ret_keepidle = setsockopt(native_fd, SOL_TCP, TCP_KEEPIDLE, (void*) &timeout, sizeof(int)); 00256 int ret_keepintvl = setsockopt(native_fd, SOL_TCP, TCP_KEEPINTVL, (void*) &intvl, sizeof(int)); 00257 int ret_keepinit = setsockopt(native_fd, SOL_TCP, TCP_KEEPCNT, (void*) &probes, sizeof(int)); 00258 00259 if(ret_keepalive || ret_keepidle || ret_keepintvl || ret_keepinit) 00260 { 00261 string message("Failed to enable keep alive on TCP client socket!"); 00262 Logger::error(message, port, host); 00263 return false; 00264 } 00265 00266 #elif defined(__OSX__) 00267 00268 int native_fd = socket->native(); 00269 int timeout = *keep_alive_timeout; 00270 int intvl = 1; 00271 int on = 1; 00272 00273 // Set the timeout before the first keep alive message 00274 int ret_sokeepalive = setsockopt(native_fd, SOL_SOCKET, SO_KEEPALIVE, (void*) &on, sizeof(int)); 00275 int ret_tcpkeepalive = setsockopt(native_fd, IPPROTO_TCP, TCP_KEEPALIVE, (void*) &timeout, sizeof(int)); 00276 int ret_tcpkeepintvl = setsockopt(native_fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (void*) &intvl, sizeof(int)); 00277 00278 if(ret_sokeepalive || ret_tcpkeepalive || ret_tcpkeepintvl) 00279 { 00280 string message("Failed to enable keep alive on TCP client socket!"); 00281 Logger::error(message, port, host); 00282 return false; 00283 } 00284 00285 #else 00286 // Partially supported on windows 00287 struct tcp_keepalive keepalive_options; 00288 keepalive_options.onoff = 1; 00289 keepalive_options.keepalivetime = *keep_alive_timeout * 1000; 00290 keepalive_options.keepaliveinterval = 2000; 00291 00292 BOOL keepalive_val = true; 00293 SOCKET native = socket->native(); 00294 DWORD bytes_returned; 00295 00296 int ret_keepalive = setsockopt(native, SOL_SOCKET, SO_KEEPALIVE, (const char *) &keepalive_val, sizeof(keepalive_val)); 00297 int ret_iotcl = WSAIoctl(native, SIO_KEEPALIVE_VALS, (LPVOID) & keepalive_options, (DWORD) sizeof(keepalive_options), NULL, 0, 00298 (LPDWORD) & bytes_returned, NULL, NULL); 00299 00300 if (ret_keepalive || ret_iotcl) 00301 { 00302 string message("Failed to set keep alive timeout on TCP client socket!"); 00303 Logger::error(message, port, host); 00304 return false; 00305 } 00306 #endif 00307 return true; 00308 }