- PVSM.RU - https://www.pvsm.ru -
Всем привет!
Продолжаю перевод книги John Torjo «Boost.Asio C++ Network Programming».
Содержание:
В этой главе мы рассмотрим некоторые из не очень известных особенностей Boost.Asio. Объекты std streams и streambuf иногда немного сложнее в использовании, но, как вы сами убедитесь, у них есть свои преимущества. Наконец, вы увидите довольно позднее добавление в Boost.Asio — co-routines, которое позволит вам иметь асинхронный код, но легко читаемый (как буд-то бы он синхронный). Это довольно удивительная особенность.
Вы должны быть знакомы с такими объектами как STL streams и STL streambuf для того, чтобы понимать вещи, написанные в этом разделе.
В Boost.Asio есть два типа буферов для работы с вводом/выводом:
На протяжении всей книги вы в основном видели примерно следующее:
size_t read_complete(boost::system::error_code, size_t bytes){ ... }
char buff[1024];
read(sock, buffer(buff), read_complete);
write(sock, buffer("echon"));
Обычно вам будет этого достаточно. Но, если вы хотите большей гибкости, то можете использовать streambuf
. Вот самое простое и худшее, что вы можете сделать с объектом streambuf
:
streambuf buf;
read(sock, buf);
Это чтение будет идти до тех пор, пока не заполнится объект streambuf
, а так как объект streambuf может перераспределить себя, чтобы вместить в себя больше места, то, в основном, чтение будет идти до тех пор, пока соединение не будет закрыто. Вы можете использовать функцию read_until
, чтобы прочитать до последнего знака:
streambuf buf;
read_until(sock, buf, "n");
Здесь чтение будет идти до символа ‘n’, затем в буфер добавится то, что прочтено и выйдет из функции чтения. Чтобы написать что-то в объект streambuf
вы будете делать что-ото похожее на следующее:
streambuf buf;
std::ostream out(&buf);
out << "echo" << std::endl;
write(sock, buf);
Это довольно просто, вам надо создать поток STL, поместить туда объект streambuf при конструировании, записать в него сообщение, которое вы хотите отправить, а затем использовать функцию write
для отправки содержимого буфера.
C Boost.Asio проделали большую работу по интеграции STL потоков и сетей. А именно, если вы уже широко используете STL, то у вас уже должно быть много классов с перегруженными операторами >> и <<. Чтение и запись в сокеты понравиться вам больше, чем прогулка по парку.
Скажем, у вас есть следующий фрагмент кода:
struct person
{
std::string first_name, last_name;
int age;
};
std::ostream& operator<<(std::ostream & out, const person & p)
{
return out << p.first_name << " " << p.last_name << " " << p.age;
}
std::istream& operator>>(std::istream & in, person & p)
{
return in >> p.first_name >> p.last_name >> p.age;
}
Отправить данные человека по сети так же просто, как показано ниже:
streambuf buf;
std::ostream out(&buf);
person p;
// ... initialize p
out << p << std::endl;
write(sock, buf);
Другая сторона может так же просто это прочитать:
read_until(sock, buf, "n");
std::istream in(&buf);
person p;
in >> p;
Действительно хорошая сторона использования объектов streambuf
и, конечно, соответствующих std::ostream
для записи или std::istream
для чтения, заключается в том, что в конечном итоге вы напишите код, который будет считаться нормальным:
Наконец, известен довольно крутой трюк, чтобы сбросить содержимое объекта streambuf
в консоли, используйте следующий код:
streambuf buf;
...
std::cout << &buf << std::endl; // dumps all content to the console
Аналогичным образом, для преобразования его содержимого в строку, используйте следующий фрагмент кода:
std::string to_string(streambuf &buf)
{
std::ostringstream out;
out << &buf;
return out.str();
}
Как я уже говорил, streambuf
произведен от std::streambuf.
Как и std::streambuf у него нет конструктора копирования.
Кроме того, у него есть несколько дополнительных функций, таких как:
streambuf ([max_size,] [allocator])
: эта функция создает объект streambuf. При необходимости, вы можете опционально задать максимальный размер буфера и аллокатор, который будет использоваться для выделения/освобождения памяти.prepare(n)
: эта функция возвращает под-буфер, используемый для размещения непрерывной последовательности из n
символов. Он может быть использован для чтения или записи. Результат работы этой функции может быть использован с любой независимой функцией из Boost.Asio производящей чтение/запись, а не только с теми, которые работают с объектами streambuf
.data()
: эта функция возвращает весь буфер в виде непрерывной последовательности символов и используется для записи. Результат работы этой функции может быть использован с любой независимой функцией из Boost.Asio, производящей запись, а не только с теми, которые работают с объектами streambuf
.consume(n)
: в этой функции данные удаляются из входной последовательности (из операции чтения). commit(n)
: в этой функции данные удаляются из выходной последовательности (из операции записи) и добавляются к входной последовательности (в операции чтения).size()
: эта функция возвращает размер в символах всего объекта streambuf
.max_size()
: эта функция возвращает максимальное количество символов, которое может содержаться в объекте streambuf
.
За исключением двух последних функций, остальные не так легко понять. Прежде всего, в большинстве случаев, вы будете посылать экземпляр streambuf
в качестве аргумента для чтения/записи независимой функции, как показано ниже:
read_until(sock, buf, "n"); // reads into buf
write(sock, buf); // writes from buf
Если вы посылаете весь буфер независимой функции, как показано в предыдущем фрагменте, то функция сначала убедиться надо ли ей будет увеличивать размер буфера, искать входные и выходные указатели. Другими словами, если есть данные для чтения, то вы сможете их прочитать.
Например:
read_until(sock, buf, 'n');
std::cout << &buf << std::endl;
В предыдущем фрагменте сбросится то, что вы только что прочитали из сокета. Следующий пример не будет дампом чего-то:
read(sock, buf.prepare(16), transfer_exactly(16) );
std::cout << &buf << std::endl;
Байты считываются, но указатель не перемещается. Вы должны двигать его самостоятельно, как показано ниже:
read(sock, buf.prepare(16), transfer_exactly(16) );
buf.commit(16);
std::cout << &buf << std::endl;
Аналогично, если вы хотите записать в объект streambuf
и если вы используете независимую функцию записи, то используйте следующий фрагмент кода:
streambuf buf;
std::ostream out(&buf);
out << "hi there" << std::endl;
write(sock, buf);
Следующий код пошлет hi there
три раза:
streambuf buf;
std::ostream out(&buf);
out << "hi there" << std::endl;
for ( int i = 0; i < 3; ++i)
write(sock, buf.data());
Это происходит, потому что буфер никогда не уничтожается и данные остаются там. Если вы хотите, чтобы данные уничтожались, то посмотрите как это реализуется:
streambuf buf;
std::ostream out(&buf);
out << "hi there" << std::endl;
write(sock, buf.data());
buf.consume(9);
В заключении, вы должны предпочесть иметь дело с целым экземпляром streambuf
. Используйте предыдущие функции, если хотите тонкой настройки.
Даже если вы можете использовать один и тот же экземпляр streambuf для чтения и записи, то я все равно рекомендую вам два отдельных экземпляра, один для чтения, другой для записи. Это воспринимается проще и яснее, и вы избежите многих возможных ошибок.
streambuf
В следующем списке показаны независимые функции из Boost.Asio, которые работают с объектами streambuf
:
read (sock, buf [, completion_function])
: эта функция читает из сокета в объект streambuf
. Завершающая функция является не обязательной. Если же она есть, то она вызывается после каждой успешной операции чтения и сообщает Boost.Asio, если операция завершена (если нет, то продолжает читать). Ее сигнатура выглядит следующим образом: size_t completion(const boost::system::error_code & err, size_t bytes_transfered)
;. При завершении функция возвращает 0, имеется в виду, если операция чтения завершилась полностью; если они возвращает ненулевое значение, то это означает, что вернулось максимальное количество байт для следующего вызова потоковой функции read_some
. read_at(radom_stream, offset, buf [, completion_function])
: эта функция читает из случайного потока. Обратите внимание, что это не относится к сокетам (так как они не моделируют концепцию случайного потока).read_until(sock, buf, char | string | regex | match_condition)
: эта функция читает пока выполняется данное условие. Либо должен быть прочитан определенный символ, либо какая-либо строка или регулярное выражение совпадет с одной из прочитанных строк, либо функция match_condition
скажет нам, что надо выйти из функции. Сигнатура функции match_condition
следующая: match_conditionis pair<iterator,bool>match(iterator begin, iterator end)
; где главный итератор это buffers_iterator <streambuf::const_buffers_type>
. Если совпадение найдено, то вернется пара (passed-end-of-match
установиться в true
), если же совпадений не выявлено, то вернется другая пара (begin
установится в false
). write(sock, buf [, completion_function])
: эта функция записывает все содержимое в объект streambuf
. Завершающая функция является необязательной и ее поведение похоже на завершающую функцию read()
: возвращается 0, когда операция записи завершена или ненулевое значение, когда указывается количество байт, которое будет записано при следующем вызове потоковой функции write_some
.write_at(random_stream,offset, buf [, completion_function])
: эта функция записывает в случайный поток. Опять же не относится к сокетам.async_read(sock, buf [, competion_function], handler)
: эта асинхронный двойник функции read()
. Сигнатура обработчика следующая: void handler(const boost::system::error_code, size_t bytes)
.async_read_at(radom_stream, offset, buf [, completion_function] ,handler)
: это асинхронный двойник функции read_at()
.async_read_until (sock, buf, char | string | regex | match_condition, handler)
: это асинхронный двойник функции read_until()
.async_write(sock, buf [, completion_function] , handler)
: это асинхронный двойник функции write()
.async_write_at(random_stream,offset, buf [, completion_function], handler)
: это асинхронный двойник функции write_at()
.Допустим, вы хотите читать до гласной буквы:
streambuf buf;
bool is_vowel(char c)
{
return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u';
}
size_t read_complete(boost::system::error_code, size_t bytes)
{
const char * begin = buffer_cast<const char*>( buf.data());
if ( bytes == 0)
return 1;
while ( bytes > 0)
{
if ( is_vowel(*begin++))
return 0;
else
--bytes;
}
return 1;
}
...
read(sock, buf, read_complete);
Если вы, например, хотите использовать регулярные выражения, то это очень просто:
read_until(sock, buf, boost::regex("^[aeiou]+") );
Или позвольте немного модифицировать пример, и вы сможете размещать функцию match_condition
для работы:
streambuf buf;
bool is_vowel(char c)
{
return c == 'a' || c == 'e' || c == 'i' || c == 'o' || c == 'u';
}
typedef buffers_iterator<streambuf::const_buffers_type> iterator;
std::pair<iterator,bool> match_vowel(iterator b, iterator e)
{
while ( b != e)
{
if ( is_vowel(*b++))
return std::make_pair(b, true);
}
return std::make_pair(e, false);
}
...
size_t bytes = read_until(sock, buf, match_vowel);
Авторы Boost.Asio, около 2009-2010 годов, реализовали очень классную идею сопрограмм, которые помогут вам создавать асинхронные приложения еще проще.
Они позволяют вам облегчить две вещи, то есть легко написать асинхронное приложение и так же легко следить за потоком управления, почти как если бы приложение было написано последовательно.
В первом случае отображается обычный подход. Используя сопрограммы, вы максимально приблизитесь ко второму случаю.
Проще говоря, сопрограмма позволяет использовать множественные точки входа для приостановки и возобновления выполнения в определенных местах в пределах функции.
Если вы собираетесь использовать сопрограммы, то вам надо будет подключить два заголовочных файла, которые вы можете найти только в boost/libs/asio/example/http/server4: yield.hpp
и coroutine.hpp
. Здесь в Boost.Asio определены два макроса и класс:
coroutine
: этот класс есть производная от вашего или используемый вами connection
класс в целях реализации сопрограмм. reenter(entry)
: это тело сопрограммы. Входящий аргумент это указатель на подпрограмму, например, для использования в качестве блока внутри целой функции.
Чтобы лучше разобраться, рассмотрим несколько примеров. Мы будем повторно реализовывать приложение из 4 главы, которое представляет собой простой клиент, который входит в систему, пингуется и может сказать вам, какие другие клиенты занесены в журнал.
Основной код похож на:
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>
, public coroutine, boost::noncopyable
{
...
void step(const error_code & err = error_code(), size_t bytes = 0)
{
reenter(this)
{
for (;;)
{
yield async_write(sock_, write_buffer_,
MEM_FN2(step,_1,_2) );
yield async_read_until( sock_, read_buffer_,"n", MEM_
FN2(step,_1,_2));
yield service.post( MEM_FN(on_answer_from_server));
}
}
}
};
Первое, что изменилось — это пропало большое число функций-членов, таких как connect(), on_connect(), on_read(),do_read(), on_write(), do_write()
и так далее, теперь у нас есть одна вызываемая функция step()
.
Тело функции находится внутри reenter(this) { for (;;) { }}
. Вы можете думать о reenter(this)
как о коде, который мы выполняли в последний раз, так что мы можем сейчас вызвать следующий код.
Внутри блока reenter
вы можете увидеть несколько текущих вызовов. Первый раз при входе функцию запускается на выполнение функция async_write
, при втором входе – функция async_read_until
, при третьем – функция service.post
, при четвертом – опять async_write
и так далее.
Вы никогда не должны забывать про экземпляр for(;;) {}.
Посмотрим на следующий код:
void step(const error_code & err = error_code(), size_t bytes = 0)
{
reenter(this)
{
yield async_write(sock_, write_buffer_, MEM_FN2(step,_1,_2) );
yield async_read_until( sock_, read_buffer_, "n",MEM_FN2(step,_1,_2));
yield service.post( MEM_FN(on_answer_from_server));
}
}
Если бы мы использовали предыдущий фрагмент кода в третий раз, мы бы вошли в функцию и выполнили service.post
. В четвертый раз мы бы прошли мимо service.post
и ничего не выполнили. То же самое произойдет и на пятый раз и на все последующие:
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>
, public coroutine, boost::noncopyable
{
talk_to_svr(const std::string & username) : ... {}
void start(ip::tcp::endpoint ep)
{
sock_.async_connect(ep, MEM_FN2(step,_1,0) );
}
static ptr start(ip::tcp::endpoint ep, const std::string & username)
{
ptr new_(new talk_to_svr(username));
new_->start(ep);
return new_;
}
void step(const error_code & err = error_code(), size_t bytes = 0)
{
reenter(this)
{
for (;;)
{
if ( !started_)
{
started_ = true;
std::ostream out(&write_buf_);
out << "login " << username_ << "n";
}
yield async_write(sock_, write_buf_, MEM_FN2(step,_1,_2) );
yield async_read_until( sock_,read_buf_,"n", MEM_FN2(step,_1,_2));
yield service.post( MEM_FN(on_answer_from_server));
}
}
}
void on_answer_from_server()
{
std::istream in(&read_buf_);
std::string word;
in >> word;
if ( word == "login")
on_login();
else if ( word == "ping")
on_ping();
else if ( word == "clients")
on_clients();
read_buf_.consume( read_buf_.size());
if (write_buf_.size() > 0)
service.post( MEM_FN2(step,error_code(),0));
}
...
private:
ip::tcp::socket sock_;
streambuf read_buf_, write_buf_;
bool started_;
std::string username_;
deadline_timer timer_;
};
Когда мы начинаем подключение, вызывается функция start()
, которая асинхронно подключается к серверу. Когда соединение установлено, мы входим в step()
в первый раз. Это когда мы отправляем сообщение с нашим логином.
После этот мы используем async_write
, затем async_read_until
и обрабатываем сообщение (on_answer_from_server
).
В функции on_answer_from_server
мы обрабатываем входящие сообщения; мы читаем первое слово и направляем в соответствующую функцию, а остальную часть сообщения мы игнорируем (в любом случае):
class talk_to_svr : ...
{
...
void on_login()
{
do_ask_clients();
}
void on_ping()
{
std::istream in(&read_buf_);
std::string answer; in >> answer;
if ( answer == "client_list_changed")
do_ask_clients();
else
postpone_ping();
}
void on_clients()
{
std::ostringstream clients;
clients << &read_buf_;
std::cout << username_ << ", new client list:" << clients.
str();
postpone_ping();
}
void do_ping()
{
std::ostream out(&write_buf_);
out << "pingn";
service.post( MEM_FN2(step,error_code(),0));
}
void postpone_ping()
{
timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
timer_.async_wait( MEM_FN(do_ping));
}
void do_ask_clients()
{
std::ostream out(&write_buf_);
out << "ask_clientsn";
}
};
Пример немного более сложный, так как мы должны проверять связь с сервером в случайный момент времени. Чтобы сделать это, мы откладываем операцию пинговки после того, как успешно запросили список клиентов в первый раз. Затем на каждый ответный пинг от сервера мы откладываем другую операцию пинговки.
Чтобы запустить все это, используйте следующий фрагмент кода:
int main(int argc, char* argv[])
{
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
talk_to_svr::start(ep, "John");
service.run();
}
Используя сопрограммы мы сократили код на 15 строк, а так же он стал гораздо более читабельнее. Здесь мы едва коснулись темы сопрограмм. Если вы хотите получить больше информации по данному вопросу, то можете посетить эту страницу [7].
Мы увидели, как легко Boost.Asio работает с потоками STL и объектами streambuf. Так же мы посмотрели, как сопрограммы делают наш код более компактным и облегчает его понимание.
В следующей главе мы рассмотрим такие темы как Asio против Boost.Asio, прогрессивная отладка, SSL, а так же некоторые дуругие особенности, зависящие от платформы.
Ресурсы к этой статье: ссылка [8]
Всем большое спасибо за внимание, до новых встреч!
Автор: Vasilui
Источник [9]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/programmirovanie/45258
Ссылки в тексте:
[1] Глава 1: Приступая к работе с Boost.Asio: http://habrahabr.ru/post/192284/
[2] Часть 1: Основы Boost.Asio: http://habrahabr.ru/post/193038/
[3] Часть 2: Асинхронное программирование: http://habrahabr.ru/post/195006/
[4] Глава 3: Echo Сервер/Клиент: http://habrahabr.ru/post/195386/
[5] Глава 4: Клиент и Сервер: http://habrahabr.ru/post/195794/
[6] Глава 5: Синхронное против асинхронного: http://habrahabr.ru/post/196354/
[7] страницу: http://blog.think-async.com/2010_03_01_archive.html
[8] ссылка: https://github.com/Vasilui/habrahabr/tree/master/Chapter_6
[9] Источник: http://habrahabr.ru/post/196888/
Нажмите здесь для печати.