SockIt

Tcp.cpp

Go to the documentation of this file.
00001 /*
00002  * Tcp.cpp
00003  *
00004  *  Created on: Jun 8, 2011
00005  *      Author: jtedesco
00006  */
00007 
00008 #include "Tcp.h"
00009 
00010 Tcp::Tcp(string host, int port, boost::asio::io_service & ioService) :
00011     host(host), port(port), waiting_to_shutdown(false), active_jobs(0), io_service(ioService), using_ipv6(false), failed(false)
00012 {
00013     // Collect the set of errors classified as 'disconnect' type errors
00014     disconnect_errors.insert(boost::asio::error::connection_reset);
00015     disconnect_errors.insert(boost::asio::error::eof);
00016     disconnect_errors.insert(boost::asio::error::connection_aborted);
00017     disconnect_errors.insert(boost::asio::error::operation_aborted);
00018 }
00019 
00020 void Tcp::send_handler(const boost::system::error_code & error_code, std::size_t bytes_transferred, string data, string host, int port,
00021         boost::shared_ptr<tcp::socket> connection)
00022 {
00023     // Check the error code
00024     if (error_code)
00025     {
00026         // Check for disconnection errors
00027         std::set<boost::system::error_code>::iterator find_result = disconnect_errors.find(error_code);
00028         if (find_result != disconnect_errors.end())
00029         {
00030             if (error_code == boost::asio::error::operation_aborted)
00031             {
00032                 Logger::info("TCP send failed, aborted", port, host);
00033             }
00034             else
00035             {
00036                 string message("TCP send failed, disconnected, error message: '" + error_code.message() + "'");
00037                 Logger::info(message, port, host);
00038                 fire_disconnect_event(message);
00039             }
00040             return;
00041         }
00042 
00043         string message(
00044                 "TCP send failed, error message '" + error_code.message() + "', error code '" + boost::lexical_cast<string>(
00045                         error_code.value()) + "' was encountered");
00046         Logger::error(message, port, host);
00047         fire_error_event(message);
00048         return;
00049     }
00050 
00051     if (bytes_transferred != data.size())
00052     {
00053         // Get the rest of the message, and try resending it
00054         string rest_of_buffer(((char*) data.data()) + bytes_transferred, data.size() - bytes_transferred);
00055 
00056         active_jobs_mutex.lock();
00057         active_jobs++;
00058         active_jobs_mutex.unlock();
00059 
00060         connection->async_send(boost::asio::buffer(rest_of_buffer.data(), rest_of_buffer.size()),
00061                 boost::bind(&Tcp::send_handler, this, _1, _2, rest_of_buffer, host, port, connection));
00062 
00063         // Log that we had to do this
00064         // We could not send all of the message, fire an error and log it
00065         string message(
00066                 string("TCP send failed, only ") + boost::lexical_cast<string>(bytes_transferred) + " of " + boost::lexical_cast<string>(
00067                         data.size()) + " total bytes were sent");
00068         Logger::warn(message, port, host);
00069         return;
00070     }
00071     else
00072     {
00073         string message("TCP send succeeded, sent " + boost::lexical_cast<string>(bytes_transferred) + " bytes, data is: " + data);
00074         Logger::info(message, port, host);
00075     }
00076 
00077     // Check to see if this is for the last send to complete, and if we're waiting to shutdown
00078     active_jobs_mutex.lock();
00079     active_jobs--;
00080     active_jobs_mutex.unlock();
00081     int current_jobs = active_jobs;
00082     if (waiting_to_shutdown && current_jobs == 0)
00083     {
00084         close();
00085     }
00086 }
00087 
00088 void Tcp::receive_handler(const boost::system::error_code & error_code, std::size_t bytes_transferred,
00089         boost::shared_ptr<tcp::socket> connection, string host, int port)
00090 {
00091     // Check for errors
00092     if (error_code)
00093     {
00094         // Check for disconnection errors
00095         if (disconnect_errors.find(error_code) != disconnect_errors.end())
00096         {
00097             if (error_code == boost::asio::error::operation_aborted)
00098             {
00099                 Logger::info("TCP send failed, aborted", port, host);
00100                 return;
00101             }
00102             else
00103             {
00104                 string message("TCP receive failed, disconnected, error message: '" + error_code.message() + "'");
00105                 Logger::info(message, port, host);
00106                 fire_disconnect_event(message);
00107                 return;
00108             }
00109         }
00110 
00111         // Otherwise, this is a more serious error, fail and log the error
00112         string message(
00113                 "TCP receive failed, error message: '" + error_code.message() + "', value '" + boost::lexical_cast<string>(
00114                         error_code.value()) + "'");
00115         Logger::error(message, port, host);
00116         fire_error_event(message);
00117 
00118         // Shutdown the socket we're receiving from
00119         if (connection.get() && connection->is_open())
00120         {
00121             try
00122             {
00123                 connection->shutdown(connection->shutdown_receive);
00124             }
00125             catch (...)
00126             {
00127                 Logger::warn("TCP receive failed, shutdown improperly, proceeding anyway", port, host);
00128             }
00129         }
00130 
00131         return;
00132     }
00133 
00134     // Pull out the data we received, and fire a data received event
00135     string data = string(receive_buffer.c_array(), bytes_transferred);
00136 
00137     // Log success
00138     string message(
00139             "Successfully received all " + boost::lexical_cast<string>(bytes_transferred) + " bytes of " + boost::lexical_cast<string>(
00140                     bytes_transferred) + " total bytes. Data: '" + data + "'");
00141 
00142     Logger::info(message, port, host);
00143 
00144     try
00145     {
00146         fire_data_event(data, connection);
00147     }
00148     catch (const boost::bad_weak_ptr &p)
00149     {
00150         Logger::error("Event is going out of scope", port, host);
00151     }
00152 
00153     // Try to receive more data
00154     if (connection.get())
00155         connection->async_receive(boost::asio::buffer(receive_buffer),
00156                 boost::bind(&Tcp::receive_handler, this, _1, _2, connection, host, port));
00157 }
00158 
00159 inline string Tcp::bool_option_to_string(optional<bool> &arg, string iftrue, string iffalse)
00160 {
00161     if (arg && *arg)
00162         return iftrue;
00163     return iffalse;
00164 }
00165 
00166 template<class T>
00167 inline string Tcp::option_to_string(optional<T> &arg)
00168 {
00169     if (arg)
00170         return boost::lexical_cast<string>(*arg);
00171     return string("unset");
00172 }
00173 
00174 void inline Tcp::parse_string_bool_arg(map<string, string> &options, string arg, optional<bool> &arg_value)
00175 {
00176     map<string, string>::iterator it;
00177 
00178     if ((it = options.find(arg)) != options.end())
00179     {
00180         if (it->second == std::string("true"))
00181             arg_value.reset(true);
00182         if (it->second == std::string("false"))
00183             arg_value.reset(false);
00184     }
00185 }
00186 
00187 void inline Tcp::parse_string_int_arg(map<string, string> &options, string arg, optional<int> &arg_value)
00188 {
00189     map<string, string>::iterator it;
00190 
00191     if ((it = options.find(arg)) != options.end())
00192     {
00193         arg_value.reset(boost::lexical_cast<int>(it->second));
00194     }
00195 }
00196 
00197 void Tcp::parse_args(map<string, string> options)
00198 {
00199     map<string, string>::iterator it;
00200     map<string, string> transformed_options;
00201 
00202     string t("true");
00203     string f("false");
00204 
00205     // transform the entire map to lower case
00206     for (it = options.begin(); it != options.end(); it++)
00207     {
00208         string k = it->first;
00209         string v = it->second;
00210 
00211         std::transform(k.begin(), k.end(), k.begin(), ::tolower);
00212         std::transform(v.begin(), v.end(), v.begin(), ::tolower);
00213 
00214         k.erase(std::remove_if(k.begin(), k.end(), ::isspace), k.end());
00215         v.erase(std::remove_if(v.begin(), v.end(), ::isspace), v.end());
00216 
00217         transformed_options.insert(std::pair<string, string>(k, v));
00218     }
00219 
00220     parse_string_bool_arg(transformed_options, "ipv6", using_ipv6);
00221     parse_string_bool_arg(transformed_options, "donotroute", do_not_route);
00222     parse_string_bool_arg(transformed_options, "keepalive", keep_alive);
00223     parse_string_bool_arg(transformed_options, "nodelay", no_delay);
00224     parse_string_int_arg(transformed_options, "keepalivetimeout", keep_alive_timeout);
00225 
00226     log_options();
00227 }
00228 
00229 void Tcp::log_options()
00230 {
00231     string options("These arguments were passed in: ");
00232 
00233     options.append(bool_option_to_string(using_ipv6, "ipv6, ", "ipv4, "));
00234     options.append(bool_option_to_string(do_not_route, "no routing, ", "use routing, "));
00235     options.append(bool_option_to_string(no_delay, "no delay, ", "allow delay, "));
00236     options.append(bool_option_to_string(keep_alive, "keep alive", "don't keep alive"));
00237     options.append(", keep alive timeout is ");
00238     options.append(option_to_string<int> (keep_alive_timeout));
00239 
00240     Logger::info(options, port, host);
00241 }
00242 
00243 bool Tcp::set_tcp_keepalive(boost::shared_ptr<tcp::socket> socket)
00244 {
00245 #ifdef __UNIX__
00246 
00247     // For *n*x systems
00248     int native_fd = socket->native();
00249     int timeout = *keep_alive_timeout;
00250     int intvl = 1;
00251     int probes = 10;
00252     int on = 1;
00253 
00254     int ret_keepalive = setsockopt(native_fd, SOL_SOCKET, SO_KEEPALIVE, (void*) &on, sizeof(int));
00255     int ret_keepidle = setsockopt(native_fd, SOL_TCP, TCP_KEEPIDLE, (void*) &timeout, sizeof(int));
00256     int ret_keepintvl = setsockopt(native_fd, SOL_TCP, TCP_KEEPINTVL, (void*) &intvl, sizeof(int));
00257     int ret_keepinit = setsockopt(native_fd, SOL_TCP, TCP_KEEPCNT, (void*) &probes, sizeof(int));
00258 
00259     if(ret_keepalive || ret_keepidle || ret_keepintvl || ret_keepinit)
00260     {
00261         string message("Failed to enable keep alive on TCP client socket!");
00262         Logger::error(message, port, host);
00263         return false;
00264     }
00265 
00266 #elif defined(__OSX__)
00267 
00268     int native_fd = socket->native();
00269     int timeout = *keep_alive_timeout;
00270     int intvl = 1;
00271     int on = 1;
00272 
00273     // Set the timeout before the first keep alive message
00274     int ret_sokeepalive = setsockopt(native_fd, SOL_SOCKET, SO_KEEPALIVE, (void*) &on, sizeof(int));
00275     int ret_tcpkeepalive = setsockopt(native_fd, IPPROTO_TCP, TCP_KEEPALIVE, (void*) &timeout, sizeof(int));
00276     int ret_tcpkeepintvl = setsockopt(native_fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (void*) &intvl, sizeof(int));
00277 
00278     if(ret_sokeepalive || ret_tcpkeepalive || ret_tcpkeepintvl)
00279     {
00280         string message("Failed to enable keep alive on TCP client socket!");
00281         Logger::error(message, port, host);
00282         return false;
00283     }
00284 
00285 #else
00286     // Partially supported on windows
00287     struct tcp_keepalive keepalive_options;
00288     keepalive_options.onoff = 1;
00289     keepalive_options.keepalivetime = *keep_alive_timeout * 1000;
00290     keepalive_options.keepaliveinterval = 2000;
00291 
00292     BOOL keepalive_val = true;
00293     SOCKET native = socket->native();
00294     DWORD bytes_returned;
00295 
00296     int ret_keepalive = setsockopt(native, SOL_SOCKET, SO_KEEPALIVE, (const char *) &keepalive_val, sizeof(keepalive_val));
00297     int ret_iotcl = WSAIoctl(native, SIO_KEEPALIVE_VALS, (LPVOID) & keepalive_options, (DWORD) sizeof(keepalive_options), NULL, 0,
00298             (LPDWORD) & bytes_returned, NULL, NULL);
00299 
00300     if (ret_keepalive || ret_iotcl)
00301     {
00302         string message("Failed to set keep alive timeout on TCP client socket!");
00303         Logger::error(message, port, host);
00304         return false;
00305     }
00306 #endif
00307     return true;
00308 }
 All Classes Files Functions Variables Typedefs Friends Defines