Bittorrent с нуля на Go

в 19:27, , рубрики: Go, skillfactory, Блог компании SkillFactory, пиринг, пиринговые сети, поиск, Программирование, протоколы, реализация, Сетевые технологии, спецификации

Bittorrent с нуля на Go - 1

BitTorrent — протокол загрузки и распространения файлов через Интернет. В отличие от традиционных отношений клиент/сервер, когда загрузчики подключаются к центральному серверу (например, для просмотра фильма на Netflix или загрузки веб-страницы), участники сети BitTorrent, называемые одноранговыми узлами, загружают фрагменты файлов друг с друга. Это то, что делает BitTorrent одноранговым протоколом. Исследуем, как он работает, и создадим собственный клиент, который сможет находить одноранговые узлы и обмениваться с ними данными.

диаграмма, показывающая разницу между отношениями клиент/сервер (все клиенты подключаются к одному серверу) и одноранговыми (одноранговые узлы подключаются друг к другу)

Протокол органично развивался в последние 20 лет, отдельные люди и организации добавили расширения для таких функций, как шифрование, приватные торренты и новые способы поиска одноранговых узлов. Чтобы управиться за выходные, реализуем оригинальную спецификацию 2001 года.

Я воспользуюсь Debian ISO, 350 МБ. Это популярный дистрибутив Linux, поэтому у нас будет множество быстрых и совместимых одноранговых узлов, к которым мы сможем подключиться. Получится также избежать юридических и этических проблем, связанных с загрузкой пиратского контента.

Поиск пиров

Вот проблема: мы хотим загрузить файл с помощью BitTorrent, но это одноранговый протокол, и мы понятия не имеем, где найти одноранговые узлы для его загрузки. Это очень похоже на переезд в новый город и попытку завести друзей — может быть, мы зайдём в местный паб или встретимся! Централизованные местоположения — главная идея трекеров, которые представляют собой центральные серверы, знакомящие одноранговые узлы друг с другом. Это просто веб-серверы, работающие по протоколу HTTP. Некоторые трекеры используют UDP — двоичный протокол для экономии полосы пропускания.

иллюстрация настольного компьютера и ноутбука, сидящих в пабе

Конечно, эти центральные серверы могут подвергнуться рейду федералов, если они способствуют обмену незаконным контентом. Возможно, вы читали о том, что такие трекеры, как TorrentSpy, Popcorn Time и KickassTorrents, закрыты. Новые методы устраняют посредников, превращая в распределённый процесс даже обнаружение одноранговых узлов. Мы не будем их внедрять, но если вам интересно, некоторые термины, которые вы можете изучить, — DHT, PEX и магнитные ссылки.

Разбор файла .torrent

Файл .torrent описывает содержимое файла, доступного для торрента, и информацию для подключения к трекеру. Это всё необходимое, чтобы запустить загрузку торрента. Торрент-файл Debian выглядит так:

d8:announce41:http://bttracker.debian.org:6969/announce7:comment35:"Debian CD from cdimage.debian.org"13:creation datei1573903810e9:httpseedsl145:https://cdimage.debian.org/cdimage/release/10.2.0//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-10.2.0-amd64-netinst.iso145:https://cdimage.debian.org/cdimage/archive/10.2.0//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-10.2.0-amd64-netinst.isoe4:infod6:lengthi351272960e4:name31:debian-10.2.0-amd64-netinst.iso12:piece lengthi262144e6:pieces26800:PS^ (binary blob of the hashes of each piece)ee

Этот беспорядок закодирован в формате Bencode (произносится bee-encode), и нам нужно декодировать его.

Bencode может кодировать примерно те же типы структур, что и JSON — строки, целые числа, списки и словари. Закодированные bencode данные не так удобны для чтения / записи человеком, как JSON, но они могут эффективно обрабатывать двоичные данные, и их действительно просто анализировать из потока. Строки поставляются с префиксом длины и выглядят как 4:spam. Целые числа располагаются между маркерами start и end, поэтому 7 кодируется как i7e. Списки и словари работают аналогичным образом: l4:spami7ee представляет ['spam', 7], тогда как d4:spami7ee означает {spam: 7}.

В формате покрасивее .torrent выглядит так:

d
  8:announce
    41:http://bttracker.debian.org:6969/announce
  7:comment
    35:"Debian CD from cdimage.debian.org"
  13:creation date
    i1573903810e
  4:info
    d
      6:length
        i351272960e
      4:name
        31:debian-10.2.0-amd64-netinst.iso
      12:piece length
        i262144e
      6:pieces
        26800:PS^ (binary blob of the hashes of each piece)

По этому файлу можно отследить URL трекера, дату создания (в виде временной метки Unix), имя и размер файла. Кроме этого, в файле есть большой бинарный блоб, в котором содержатся SHA-1 хэши каждой части файла. Эти части равны по размеру в пределах файла для скачивания. У разных торрентов размер части может быть разным, но обычно находится в пределах от 256 КБ до 1 МБ. Таким образом, крупный файл может состоять из тысяч частей. Эти части мы скачаем у пиров, проверим их по хэшам нашего торрент-файла и соберём их воедино. Всё. Файл готов!

изображение файла, разрезаемого ножницами на части, пронумерованные с piece 0

Такой механизм позволяет проверить отдельно целостность каждой части файла в ходе процесса. При этом, BitTorrent устойчив к случайному и намеренному повреждению торрент-файла (torrent poisoning). Если хакеру не удалось взломать SHA-1 при помощи атаки праобраза (preimage attack), мы получим ровно тот контент, который запросили.

Здорово было бы написать свой bencode-парсер, но статья посвящена не парсерам. Парсер на 50 строк кода от Фредерика Лунда кажется мне наиболее интересным. Для этого проекта я пользовался github.com/jackpal/bencode-go:

import (
    "github.com/jackpal/bencode-go"
)

type bencodeInfo struct {
    Pieces      string `bencode:"pieces"`
    PieceLength int    `bencode:"piece length"`
    Length      int    `bencode:"length"`
    Name        string `bencode:"name"`
}

type bencodeTorrent struct {
    Announce string      `bencode:"announce"`
    Info     bencodeInfo `bencode:"info"`
}

// Open parses a torrent file
func Open(r io.Reader) (*bencodeTorrent, error) {
    bto := bencodeTorrent{}
    err := bencode.Unmarshal(r, &bto)
    if err != nil {
        return nil, err
    }
    return &bto, nil
}

смотреть в контексте

Мне нравится делать свои структуры относительно плоскими и отделять структуры приложения от структур сериализации (serialization structs). Поэтому я применил другую, более плоскую структуру TorrentFile и написал несколько вспомогательных функций для их взаимопреобразования.

Обратите внимание, что pieces (изначально, это была строковая переменная) я разбил на хэш-слайсы (slice of hashes) по [20] байт на хэш. Это упросит доступ у отдельным хэшам. Кроме того, я рассчитал общий хэш SHA-1 всего кодированного в bencode словаря info (где содержатся имя, размер и хэш каждой части). Этот хэш известен нам как инфохэш (infohash), уникальный идентификатор файлов для передачи данных трекерам и пирам. Подробнее об этом поговорим позже.

именная табличка 'Hello my name is 86d4c80024a469be4c50bc5a102cf71780310074'

type TorrentFile struct {
    Announce    string
    InfoHash    [20]byte
    PieceHashes [][20]byte
    PieceLength int
    Length      int
    Name        string
}

смотреть в контексте

Получение пиров через трекер

Теперь, имея информацию о файле и его трекере, давайте обратимся к трекеру для анонсирования нашего присутствия среди пиров и для получения списка других пиров. Для этого нужно составить запрос GET на URL announce в файле .torrent с несколькими параметрами запроса:

func (t *TorrentFile) buildTrackerURL(peerID [20]byte, port uint16) (string, error) {
    base, err := url.Parse(t.Announce)
    if err != nil {
        return "", err
    }
    params := url.Values{
        "info_hash":  []string{string(t.InfoHash[:])},
        "peer_id":    []string{string(peerID[:])},
        "port":       []string{strconv.Itoa(int(Port))},
        "uploaded":   []string{"0"},
        "downloaded": []string{"0"},
        "compact":    []string{"1"},
        "left":       []string{strconv.Itoa(t.Length)},
    }
    base.RawQuery = params.Encode()
    return base.String(), nil
}

смотреть в контексте

Важные элементы кода:

  • info_hash: идентифицирует файл, который мы хотим скачать. Это инфохэш (infohash), который мы рассчитывали ранее по кодированному bencode словарю info. Трекеру нужно знать этот хэш, чтобы показать нам именно тех пиров, которые нужны.
  • peer_id: имя на 20 байт, которое идентифицирует нас на трекерах и для пиров. Для этого мы просто берём 20 случайных байтов. Реальные клиенты BitTorrent имеют ID вида -TR2940-k8hj0wgej6ch, где закодированы программное обеспечение и его версия. В данном случае TR2940 означает Transmission client 2.94.

файл с именной табличной 'info_hash' и человечек с именной табличкой 'peer_id'

Парсинг ответа трекера

Мы получаем ответ, закодированный bencode:

d
  8:interval
    i900e
  5:peers
    252:(another long binary blob)
e

interval указывает как часто мы можем делать запрос на сервер для обновления списка пиров. Значение 900 означает, что мы можем переподключаться раз в 15 минут (900 секунд).

peers — большой бинарный блоб, где содержатся IP-адреса каждого пира. Он образован группами по 6 байтов. Первые 4 байта — это IP адрес пира, а каждый байт показывает числовое значение в IP. Последние 2 байта — порт, в кодировке big-endian это uint16. Big-endian или сетевой порядок (network order) предполагает, что группу байтов можно представлять в виде целочисленного значения, двигаясь по порядку слева направо. Например, байты 0x1A, 0xE1 кодируются в 0x1AE1 или 6881 в десятичном формате.*Интерпретация тех же байтов в порядке little-endian дала бы 0xE11A = 57626

схема интерпретации 192, 0, 2, 123, 0x1A, 0xE1 в виде 192.0.1.123:6881

// Peer encodes connection information for a peer
type Peer struct {
    IP   net.IP
    Port uint16
}

// Unmarshal parses peer IP addresses and ports from a buffer
func Unmarshal(peersBin []byte) ([]Peer, error) {
    const peerSize = 6 // 4 for IP, 2 for port
    numPeers := len(peersBin) / peerSize
    if len(peersBin)%peerSize != 0 {
        err := fmt.Errorf("Received malformed peers")
        return nil, err
    }
    peers := make([]Peer, numPeers)
    for i := 0; i < numPeers; i++ {
        offset := i * peerSize
        peers[i].IP = net.IP(peersBin[offset : offset+4])
        peers[i].Port = binary.BigEndian.Uint16(peersBin[offset+4 : offset+6])
    }
    return peers, nil
}

смотреть в контексте

Скачивание у пиров

Теперь, имея список пиров, пора подключиться к ним и скачивать файл частями! Для каждого пира мы хотим сделать следующее:

  1. Начало подключения к пиру по TCP. Это всё равно, что снять трубку и набрать номер.
  2. Двустороннее BitTorrent-рукопожатие. Это всё равно, что сказать «Алло» и услышать «Алло» в ответ.
  3. Обмен сообщениями (messages) для скачивания частей файла. «Мне, пожалуйста, 231-ю часть».

Начало подключения по TCP

conn, err := net.DialTimeout("tcp", peer.String(), 3*time.Second)
if err != nil {
    return nil, err
}

смотреть в контексте

Для соединения я воспользовался таймаутом, чтобы не терять слишком много времени на пирах, которые не дают подключиться. В основном же это довольно стандартное подключении по TCP.

Хэндшейк

Мы только что подключились к пиру. Теперь нужно рукопожатие, чтобы убедиться, что этот пир:

  • может взаимодействовать по протоколу BitTorrent;
  • может понимать наши сообщения и отвечать на них;
  • имеет нужный нам файл, или хотя бы знает, о чём идёт речь.

Два общающихся компьютера. Один справивает: 'do you speak BitTorrent and have this file?', другой отвечает: 'I speak BitTorrent and have that file'

Как некогда говорил мой отец, хорошее рукопожатие должно быть крепким и с контактом глазами, в этом и весь секрет. В BitTorrent же секрет хорошего «рукопожатия» — или хэндшейка — включает 5 аспектов:

  1. Длина идентификатора протокола всегда должна быть равна 19 (0x13 в hex)
  2. Идентификатор протокола, который называется pstr, — это всегда строка BitTorrent protocol
  3. 8 **зарезервированных байтов, выставленных в 0. Некоторые из них переводятся в состояние 1 для указания поддержки отдельных расширений (extensions). Так как этого не произошло, мы оставляем им значение 0.
  4. Инфохэш (infohash), который мы рассчитали ранее для желаемого файла
  5. ID пира (Peer ID), которым мы идентифицировали себя

Собираем это всё вместе и получаем хэндшейк:

x13BitTorrent protocolx00x00x00x00x00x00x00x00x86xd4xc8x00x24xa4x69xbex4cx50xbcx5ax10x2cxf7x17x80x31x00x74-TR2940-k8hj0wgej6ch

Отправив хэндшейк пиру, мы ожидаем аналогичный хэндшейк в том же формате. Инфохэш, который мы получим в ответе, должен совпасть с нашим. Тогда будет понятно, что говорим об одном и том же файле. Если всё идёт по плану, едем дальше. Если нет, мы можем разъединиться. «Алло». «Чже ши шей Нин сян яо шеньме» «Ой, извините, ошибся номером».

Реализуем в нашем коде структуру, представляющую хэндшейк, и записываем несколько методов для их сериализации и чтения:

// A Handshake is a special message that a peer uses to identify itself
type Handshake struct {
    Pstr     string
    InfoHash [20]byte
    PeerID   [20]byte
}

// Serialize serializes the handshake to a buffer
func (h *Handshake) Serialize() []byte {
    buf := make([]byte, len(h.Pstr)+49)
    buf[0] = byte(len(h.Pstr))
    curr := 1
    curr += copy(buf[curr:], h.Pstr)
    curr += copy(buf[curr:], make([]byte, 8)) // 8 reserved bytes
    curr += copy(buf[curr:], h.InfoHash[:])
    curr += copy(buf[curr:], h.PeerID[:])
    return buf
}

// Read parses a handshake from a stream
func Read(r io.Reader) (*Handshake, error) {
    // Do Serialize(), but backwards
    // ...
}

смотреть в контексте

Отправка и получение сообщений

По завершении начального хэндшейка мы можем отправлять и принимать сообщения. Конечно, если другой пир не готов принимать сообщения, мы не сможем это делать, пока он не скажет, что готов. В этом состоянии мы, с точки зрения другого пира, «заглушены» (choked). Они должны отправить нам сообщение unchoke, сообщающее, что мы можем приступить к запросу данных. По умолчанию мы считаем, что всегда «заглушены», пока не убеждаемся в обратном.

Получив сообщение unchoked, мы можем приступать к отправке запросов (requests) на части файла, а они — присылать в ответ сообщения с такими частями.

Рисунок, где один стикмен говорит другому 'hello I would like piece number—', а второй душит его за шею и говорит '00 00 00 01 00 (choke)'

Интерпретация сообщений

У сообщения есть длина, ID и полезная нагрузка (payload). Это выглядит так:

Сообщение имеет указатель длины 4 байта, 1 байт занимает ID, остальное — опциональная полезная нагрузка

Сообщение начинается с указателя длины. Он показывает, сколько байт составит длина сообщения. Это целочисленная переменная в 32 бита, то есть 4 байта в кодировке big-endian. Следующий байт, ID, сообщает, какой тип сообщения мы получили — например, байт 2 означает «interested». Оставшуюся часть сообщения занимает полезная нагрузка.

type messageID uint8

const (
    MsgChoke         messageID = 0
    MsgUnchoke       messageID = 1
    MsgInterested    messageID = 2
    MsgNotInterested messageID = 3
    MsgHave          messageID = 4
    MsgBitfield      messageID = 5
    MsgRequest       messageID = 6
    MsgPiece         messageID = 7
    MsgCancel        messageID = 8
)

// Message stores ID and payload of a message
type Message struct {
    ID      messageID
    Payload []byte
}

// Serialize serializes a message into a buffer of the form
// <length prefix><message ID><payload>
// Interprets `nil` as a keep-alive message
func (m *Message) Serialize() []byte {
    if m == nil {
        return make([]byte, 4)
    }
    length := uint32(len(m.Payload) + 1) // +1 for id
    buf := make([]byte, 4+length)
    binary.BigEndian.PutUint32(buf[0:4], length)
    buf[4] = byte(m.ID)
    copy(buf[5:], m.Payload)
    return buf
}

смотреть в контексте

Чтобы прочитать сообщение из потока (stream), мы просто следуем формату сообщения. Мы считываем 4 байта и интерпретируем их как uint32, чтобы узнать длину сообщения. Затем узнаём число байт, получаем ID (первый байт) и полезную нагрузку — оставшиеся байты.

// Read parses a message from a stream. Returns `nil` on keep-alive message
func Read(r io.Reader) (*Message, error) {
    lengthBuf := make([]byte, 4)
    _, err := io.ReadFull(r, lengthBuf)
    if err != nil {
        return nil, err
    }
    length := binary.BigEndian.Uint32(lengthBuf)

    // keep-alive message
    if length == 0 {
        return nil, nil
    }

    messageBuf := make([]byte, length)
    _, err = io.ReadFull(r, messageBuf)
    if err != nil {
        return nil, err
    }

    m := Message{
        ID:      messageID(messageBuf[0]),
        Payload: messageBuf[1:],
    }

    return &m, nil
}

смотреть в контексте

Битовые поля

Один из интереснейших типов сообщений — битовое поле (bitfield). Это структура данных, с помощью которой пиры эффективно кодируют те фрагменты, которые могут отправить. Битовое поле похоже на байтовый массив (byte array). Чтобы проверить, какие части уже скачались, нужно только посмотреть на положения битов, значение которых равно 1. Можете считать это цифровым эквивалентом карты лояльности какой-нибудь кофейни. Вначале карта пуста, и все биты равны 0. Мы меняем значения на 1, чтобы «проштамповать» все позиции на карте.

карта лояльности кофейни с отметками на 4 первых и предпоследней позиции, что соответствует двоичному коду 11110010

Благодаря работе с битами вместо байтов, эта структура данных намного компактнее. Мы можем закодировать информацию о 8 частях в одном байте — это размер типа bool [уточнить]. Но цена такого подхода — большая сложность. Самый маленький размер для адресации — байт. Поэтому для работы с битами нужны некоторые манипуляции:

// A Bitfield represents the pieces that a peer has
type Bitfield []byte

// HasPiece tells if a bitfield has a particular index set
func (bf Bitfield) HasPiece(index int) bool {
    byteIndex := index / 8
    offset := index % 8
    return bf[byteIndex]>>(7-offset)&1 != 0
}

// SetPiece sets a bit in the bitfield
func (bf Bitfield) SetPiece(index int) {
    byteIndex := index / 8
    offset := index % 8
    bf[byteIndex] |= 1 << (7 - offset)
}

смотреть в контексте

Собираем всё вместе

Теперь у нас есть всё, чтобы начать скачивать файл: список пиров для трекера, связь с ними по TCP, рукопожатие, отправка и получение сообщений. Важнейшие из оставшихся задач — конкурентность при одновременной связи с несколькими пирами и управлении состоянием пиров, с которыми мы взаимодействуем. Обе задачи классические, но сложные.

Управление одновременной работой: каналы в качестве очередей

В Go память память распределяется через коммуникацию. При этом канал Go можно считать малозатратной потокобезопасной очередью.

Мы настроим два канала для синхронизации конкурентных воркеров: один для распределения задач (скачиваемых частей) между пирами, а другой — для сборки скачанных частей. Как только загруженные фрагменты попадают в канал с результатами, их можно копировать в буфер, чтобы можно было начать полную сборку файла.

// Init queues for workers to retrieve work and send results
workQueue := make(chan *pieceWork, len(t.PieceHashes))
results := make(chan *pieceResult)
for index, hash := range t.PieceHashes {
    length := t.calculatePieceSize(index)
    workQueue <- &pieceWork{index, hash, length}
}

// Start workers
for _, peer := range t.Peers {
    go t.startDownloadWorker(peer, workQueue, results)
}

// Collect results into a buffer until full
buf := make([]byte, t.Length)
donePieces := 0
for donePieces < len(t.PieceHashes) {
    res := <-results
    begin, end := t.calculateBoundsForPiece(res.index)
    copy(buf[begin:end], res.buf)
    donePieces++
}
close(workQueue)

смотреть в контексте

Проводим по процессу в горутине каждого полученного на трекере пира. Каждый из них подключится к пиру, выполнит рукопожатие и будет получать задачи из workQueue, пытаясь выполнить скачивание, и отправлять скачанные части обратно по каналу results.

диаграмма стратегии скачивания

func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
    c, err := client.New(peer, t.PeerID, t.InfoHash)
    if err != nil {
        log.Printf("Could not handshake with %s. Disconnectingn", peer.IP)
        return
    }
    defer c.Conn.Close()
    log.Printf("Completed handshake with %sn", peer.IP)

    c.SendUnchoke()
    c.SendInterested()

    for pw := range workQueue {
        if !c.Bitfield.HasPiece(pw.index) {
            workQueue <- pw // Put piece back on the queue
            continue
        }

        // Download the piece
        buf, err := attemptDownloadPiece(c, pw)
        if err != nil {
            log.Println("Exiting", err)
            workQueue <- pw // Put piece back on the queue
            return
        }

        err = checkIntegrity(pw, buf)
        if err != nil {
            log.Printf("Piece #%d failed integrity checkn", pw.index)
            workQueue <- pw // Put piece back on the queue
            continue
        }

        c.SendHave(pw.index)
        results <- &pieceResult{pw.index, buf}
    }
}

смотреть в контексте

Управление состоянием пиров

Мы будем следить за состоянием каждого пира в структуре (struct) и менять его по мере считывания полученных сообщений. В структуре будут храниться данные о том, сколько мы скачали у данного пира, сколько мы у него запросили и «заглушены» ли мы им. Всё это можно масштабировать до уровня конечного автомата, но пока нам достаточно структуры с обычным переключателем.

type pieceProgress struct {
    index      int
    client     *client.Client
    buf        []byte
    downloaded int
    requested  int
    backlog    int
}

func (state *pieceProgress) readMessage() error {
    msg, err := state.client.Read() // this call blocks
    switch msg.ID {
    case message.MsgUnchoke:
        state.client.Choked = false
    case message.MsgChoke:
        state.client.Choked = true
    case message.MsgHave:
        index, err := message.ParseHave(msg)
        state.client.Bitfield.SetPiece(index)
    case message.MsgPiece:
        n, err := message.ParsePiece(state.index, state.buf, msg)
        state.downloaded += n
        state.backlog--
    }
    return nil
}

смотреть в контексте

Пора делать запросы!

Файлами, частями и хэшами частей дело не кончится. Мы можем пойти дальше и разбить части на блоки. Блок — это фрагмент части. Его можно определить по индексу части, в которую он входит, байтовому смещению внутри части и длине. У пиров обычно запрашиваются блоки. Размер блока, как правило, 16 КБ, поэтому для получения части в 256 KB может понадобится 16 запросов.

Пир должен разрывать соединение при получении запроса на блок более 16 КБ. Но, как я знаю по своему опыту, большинство клиентов прекрасно обрабатывает запросы на блоки до 128 КБ. Но у меня прирост скорости при этом был довольно скромным, поэтому лучше, придерживаться требований спецификации.

Конвейерная обработка

Двусторонняя передача по сети (network round-trips) обходится дорого, а поблочные запросы зачастую просто убийственны для скорости скачивания. Поэтому важна конвейерная обработка запросов, при которой бремя незавершённых определённого числа процессов будет поддерживаться постоянным. Это может на порядок повысить пропускную способность нашего соединения.

Две ветки email с симуляцией подключения пиров. Ветка слева показывает запрос, на который трижды дан ответ. В ветке справа отправлены три запроса, и на них быстро дано три ответа

Клиенты BitTorrent традиционно поддерживают очередь из 5 конвейерных запросов (pipelined requests), и я возьму то же значение. Как я увидел, увеличение этого значения может удвоить скорость скачивания. Более современные клиенты используют адаптивный размер очереди (adaptive queue size), что лучше соответствует скоростям и условиям современных сетей. С этим параметром, определённо, стоит поиграть. Это простой и очевидный способ оптимизации производительности программы.

// MaxBlockSize is the largest number of bytes a request can ask for
const MaxBlockSize = 16384

// MaxBacklog is the number of unfulfilled requests a client can have in its pipeline
const MaxBacklog = 5

func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
    state := pieceProgress{
        index:  pw.index,
        client: c,
        buf:    make([]byte, pw.length),
    }

    // Setting a deadline helps get unresponsive peers unstuck.
    // 30 seconds is more than enough time to download a 262 KB piece
    c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
    defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline

    for state.downloaded < pw.length {
        // If unchoked, send requests until we have enough unfulfilled requests
        if !state.client.Choked {
            for state.backlog < MaxBacklog && state.requested < pw.length {
                blockSize := MaxBlockSize
                // Last block might be shorter than the typical block
                if pw.length-state.requested < blockSize {
                    blockSize = pw.length - state.requested
                }

                err := c.SendRequest(pw.index, state.requested, blockSize)
                if err != nil {
                    return nil, err
                }
                state.backlog++
                state.requested += blockSize
            }
        }

        err := state.readMessage()
        if err != nil {
            return nil, err
        }
    }

    return state.buf, nil
}

смотреть в контексте

main.go

Это не займёт много времени. Мы почти закончили.

package main

import (
    "log"
    "os"

    "github.com/veggiedefender/torrent-client/torrentfile"
)

func main() {
    inPath := os.Args[1]
    outPath := os.Args[2]

    tf, err := torrentfile.Open(inPath)
    if err != nil {
        log.Fatal(err)
    }

    err = tf.DownloadToFile(outPath)
    if err != nil {
        log.Fatal(err)
    }
}

смотреть в контексте

Это не всё

Для краткости я включил сюда лишь несколько основных фрагментов кода. Замечу, что весь связующий код (glue code), синтаксический анализ (parsing), юнит-тесты и другие скучные строительные элементы программы я опустил. Полную реализацию смотрите на Github.

Bittorrent с нуля на Go - 14

Автор: Skillfactory School

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js