SockIt

TcpClient.cpp

Go to the documentation of this file.
00001 /*
00002  * File:   TcpClient.cpp
00003  * Author: jtedesco
00004  *
00005  * Created on May 26, 2011, 12:32 PM
00006  */
00007 
00008 #include "TcpClient.h"
00009 
00010 TcpClient::TcpClient(const string & host, int port, boost::asio::io_service & io_service) :
00011     Tcp(host, port, io_service), resolver(new tcp::resolver(io_service)), connection(new tcp::socket(io_service))
00012 {
00013     // Check that the connection and resolver are valid, and fail gracefully if they are not
00014     if (!resolver.get() || !connection.get())
00015     {
00016         failed = true;
00017         string message("Failed to initialize TCP client, failed to initialize properly");
00018         Logger::error(message, port, host);
00019         fire_error(message);
00020         return;
00021     }
00022 
00023     init();
00024 }
00025 
00026 TcpClient::TcpClient(const string & host, int port, boost::asio::io_service & io_service, map<string, string> options) :
00027     Tcp(host, port, io_service), resolver(new tcp::resolver(io_service)), connection(new tcp::socket(io_service))
00028 {
00029     // Check that the connection and resolver are valid, and fail gracefully if they are not
00030     if (!resolver.get() || !connection.get())
00031     {
00032         failed = true;
00033         string message("Failed to initialize TCP client, failed to initialize properly");
00034         Logger::error(message, port, host);
00035         fire_error(message);
00036         return;
00037     }
00038 
00039     parse_args(options);
00040 
00041     init();
00042 }
00043 
00044 void TcpClient::init()
00045 {
00046     connected = false;
00047 
00048     Logger::info(
00049             "Initializing TCP client to host '" + boost::lexical_cast<string>(host) + "' on port " + boost::lexical_cast<string>(port),
00050             port, host);
00051     log_options();
00052     Logger::info(
00053             "Trying to resolve DNS information for host " + boost::lexical_cast<string>(host) + "', port " + boost::lexical_cast<string>(
00054                     port), port, host);
00055 
00056     // Create a query to resolve this host & port
00057     if (using_ipv6 && *using_ipv6)
00058     {
00059         // Asynchronously resolve the remote host, and once the host is resolved, create a connection
00060         tcp::resolver::query query(tcp::v6(), host, boost::lexical_cast<string>(port),
00061                 boost::asio::ip::resolver_query_base::numeric_service);
00062         if (resolver.get())
00063             resolver->async_resolve(query, boost::bind(&TcpClient::resolve_handler, this, _1, _2));
00064         else
00065         {
00066             failed = true;
00067             string message("TCP client failed to resolve, invalid resolver");
00068             Logger::error(message, port, host);
00069             fire_error(message);
00070         }
00071     }
00072     else
00073     {
00074         // Asynchronously resolve the remote host, and once the host is resolved, create a connection
00075         tcp::resolver::query query(tcp::v4(), host, boost::lexical_cast<string>(port),
00076                 boost::asio::ip::resolver_query_base::numeric_service);
00077         if (resolver.get())
00078             resolver->async_resolve(query, boost::bind(&TcpClient::resolve_handler, this, _1, _2));
00079         else
00080         {
00081             failed = true;
00082             string message("TCP client failed to resolve, invalid resolver");
00083             Logger::error(message, port, host);
00084             fire_error(message);
00085         }
00086     }
00087 }
00088 
00089 void TcpClient::init_socket()
00090 {
00091     // Set the socket options for this client's TCP socket
00092     if (do_not_route)
00093     {
00094         boost::asio::socket_base::do_not_route option(*do_not_route);
00095         connection->set_option(option);
00096     }
00097     if (keep_alive)
00098     {
00099         boost::asio::socket_base::keep_alive option(*keep_alive);
00100         connection->set_option(option);
00101     }
00102     if (no_delay)
00103     {
00104         boost::asio::ip::tcp::no_delay option(*no_delay);
00105         connection->set_option(option);
00106     }
00107     if (keep_alive_timeout)
00108     {
00109         // Set the TCP keep-alive timeout - ignores return value
00110         set_tcp_keepalive(connection);
00111     }
00112 }
00113 
00114 TcpClient::~TcpClient()
00115 {
00116     close();
00117 }
00118 
00119 void TcpClient::close()
00120 {
00121     waiting_to_shutdown = true;
00122     resolver->cancel();
00123 
00124     // Shutdown the IO service, cancel any transfers on the socket, and close the socket
00125     if (connection->is_open())
00126     {
00127         try
00128         {
00129             connection->shutdown(connection->shutdown_both);
00130         }
00131         catch (...)
00132         {
00133             Logger::warn("Socket shutdown improperly, proceeding anyway", port, host);
00134         }
00135 
00136         connection->close();
00137     }
00138     else
00139     {
00140         Logger::warn("Failed to cleanly shutdown TCP client connection, continuing anyways", port, host);
00141     }
00142 }
00143 
00144 void TcpClient::shutdown()
00145 {
00146     if (!failed)
00147     {
00148         active_jobs_mutex.lock();
00149         int current_jobs = active_jobs;
00150         active_jobs_mutex.unlock();
00151         waiting_to_shutdown = true;
00152         if (current_jobs == 0)
00153         {
00154             fire_close();
00155             close();
00156         }
00157     }
00158     else
00159     {
00160         // Log & fire an error
00161         Logger::error("Trying to start the server listening, but the server has permanently failed!", port, host);
00162     }
00163 }
00164 
00165 void TcpClient::send_bytes(const vector<byte> & bytes)
00166 {
00167     string data;
00168 
00169     for (int i = 0; i < bytes.size(); i++)
00170     {
00171         data.push_back((unsigned char) bytes[i]);
00172     }
00173 
00174     send(data);
00175 }
00176 
00177 void TcpClient::send(const string & data)
00178 {
00179     if (failed)
00180     {
00181         // Log & fire an error
00182         string message("Trying to send data on a TCP client that has permanently failed!");
00183         Logger::error(message, port, host);
00184         return;
00185     }
00186 
00187     // If we're not already connected, then queue this data to be send
00188     connected_mutex.lock();
00189     bool connected_now = connected;
00190     connected_mutex.unlock();
00191 
00192     if (!connected_now)
00193     {
00194         active_jobs_mutex.lock();
00195         active_jobs++;
00196         active_jobs_mutex.unlock();
00197         data_queue_mutex.lock();
00198         data_queue.push(data);
00199         data_queue_mutex.unlock();
00200     }
00201     else
00202     {
00203         // Check if the queue needs to be flushed, and if so, flush it
00204         flush();
00205 
00206         // Send the data, and record that we've started a new send job
00207         active_jobs_mutex.lock();
00208         active_jobs++;
00209         active_jobs_mutex.unlock();
00210         connection->async_send(boost::asio::buffer(data.data(), data.size()),
00211                 boost::bind(&TcpClient::send_handler, this, _1, _2, data, host, port, connection));
00212     }
00213 }
00214 
00215 void TcpClient::resolve_handler(const boost::system::error_code & error_code, tcp::resolver::iterator endpoint_iterator)
00216 {
00217     // If we encountered an error
00218     if (error_code)
00219     {
00220         // Check for disconnection errors
00221         std::set<boost::system::error_code>::iterator find_result = disconnect_errors.find(error_code);
00222         if (find_result != disconnect_errors.end())
00223         {
00224             if (error_code == boost::asio::error::operation_aborted)
00225             {
00226                 Logger::warn("TCP resolve was aborted");
00227                 return;
00228             }
00229             else
00230             {
00231                 string message("TCP resolve failed, disconnected: '" + error_code.message() + "'");
00232                 Logger::warn(message, port, host);
00233                 fire_disconnect(message);
00234                 return;
00235             }
00236         }
00237 
00238         // We failed permanently, fail permanently and log it
00239         string message("Failed to resolve host with error: " + error_code.message());
00240         Logger::error(message, port, host);
00241         fire_error(message);
00242         return;
00243     }
00244 
00245     if (connection.get())
00246     {
00247         // Attempt to connect to the endpoint, using IPv6 if specified
00248         tcp::endpoint receiver_endpoint = *endpoint_iterator;
00249         if (using_ipv6 && *using_ipv6)
00250         {
00251             connection->open(tcp::v6());
00252         }
00253         else
00254         {
00255             connection->open(tcp::v4());
00256         }
00257 
00258         // Initialize the socket if it's open
00259         if (!connection->is_open())
00260             init_socket();
00261 
00262         // Log success
00263         Logger::info("Host has been resolved, attempting to connect to host", port, host);
00264         fire_resolve();
00265 
00266         // Try to asynchronously establish a connection to the host
00267         connection->async_connect(receiver_endpoint, boost::bind(&TcpClient::connect_handler, this, _1, endpoint_iterator));
00268     }
00269     else
00270     {
00271         failed = true;
00272         string message("TCP client failed to resolve, invalid connection");
00273         Logger::error(message, port, host);
00274         fire_error(message);
00275     }
00276 }
00277 
00278 void TcpClient::connect_handler(const boost::system::error_code & error_code, tcp::resolver::iterator endpoint_iterator)
00279 {
00280     // If there was an error to connect, log the error and abort connection
00281     if (error_code)
00282     {
00283         // Check for disconnection errors
00284         std::set<boost::system::error_code>::iterator find_result = disconnect_errors.find(error_code);
00285         if (find_result != disconnect_errors.end())
00286         {
00287             if (error_code == boost::asio::error::operation_aborted)
00288             {
00289                 Logger::warn("TCP connect was aborted");
00290                 return;
00291             }
00292             else
00293             {
00294                 string message("TCP connect failed, disconnected: '" + error_code.message() + "'");
00295                 Logger::warn(message, port, host);
00296                 fire_disconnect(message);
00297                 return;
00298             }
00299         }
00300 
00301         // Check if this was not the last possible connection in the iterator
00302         tcp::resolver::iterator end;
00303         if (endpoint_iterator != end && error_code == boost::asio::error::host_not_found)
00304         {
00305             // Create a query to resolve this host & port
00306             tcp::resolver::query query(host, boost::lexical_cast<string>(port));
00307 
00308             // Try the next possible endpoint
00309             resolver->async_resolve(query, boost::bind(&TcpClient::resolve_handler, this, _1, endpoint_iterator++));
00310         }
00311         else
00312         {
00313             // If we've tried every possible endpoint, fail permanently
00314             string message("Failed to connect to host, with message: '" + error_code.message() + "'");
00315             Logger::error(message, port, host);
00316             fire_error(message);
00317             return;
00318         }
00319     }
00320 
00321     // Log success, and record that we are now connected
00322     Logger::info("Connection established to host", port, host);
00323     fire_connect();
00324 
00325     // Start receiving data on this connection
00326     connection->async_receive(boost::asio::buffer(receive_buffer),
00327             boost::bind(&TcpClient::receive_handler, this, _1, _2, connection, host, port));
00328 
00329     // Flush data from the queue if necessary
00330     flush();
00331 
00332     connected_mutex.lock();
00333 
00334     connected = true;
00335     connected_mutex.unlock();
00336 }
00337 
00338 void TcpClient::flush()
00339 {
00340     data_queue_mutex.lock();
00341     while (!data_queue.empty())
00342     {
00343         data_queue_mutex.unlock();
00344 
00345         // Pull the next data chunk to be sent off the queue
00346         data_queue_mutex.lock();
00347         string data = data_queue.front();
00348         data_queue.pop();
00349         data_queue_mutex.unlock();
00350 
00351         // Asynchronously send the data across the connection
00352         connection->async_send(boost::asio::buffer(data.data(), data.size()),
00353                 boost::bind(&TcpClient::send_handler, this, _1, _2, data, host, port, connection));
00354     }
00355     data_queue_mutex.unlock();
00356 }
00357 
00358 int TcpClient::get_port()
00359 {
00360     return port;
00361 }
00362 
00363 string TcpClient::get_host()
00364 {
00365     return host;
00366 }
00367 
00368 void TcpClient::fire_error_event(const string & message)
00369 {
00370     fire_error(message);
00371 }
00372 
00373 void TcpClient::fire_disconnect_event(const string & message)
00374 {
00375     fire_disconnect(message);
00376 }
00377 
00378 void TcpClient::fire_data_event(const string data, boost::shared_ptr<tcp::socket> connection)
00379 {
00380     fire_data(boost::make_shared<TcpEvent>(this, connection, data));
00381 }
00382 
 All Classes Files Functions Variables Typedefs Friends Defines