Создаем конвейер потоковой обработки данных. Часть 2

в 14:52, , рубрики: big data, data science, Google Cloud Platform, programming, python, Towards Data Science, Блог компании OTUS. Онлайн-образование, хранение данных

Всем привет. Делимся переводом заключительной части статьи, подготовленной специально для студентов курса «Data Engineer». С первой частью можно ознакомиться тут.

Apache Beam и DataFlow для конвейеров реального времени

Создаем конвейер потоковой обработки данных. Часть 2 - 1

Настройка Google Cloud

Примечание: Для запуска конвейера и публикации данных пользовательского лога я использовал Google Cloud Shell, поскольку у меня возникли проблемы с запуском конвейера на Python 3. Google Cloud Shell использует Python 2, который лучше согласуется с Apache Beam.

Чтобы запустить конвейер, нам нужно немного покопаться в настройках. Тем из вас, кто раньше не пользовался GCP, необходимо выполнить следующие 6 шагов, приведенных на этой странице.

После этого нам нужно будет загрузить наши скрипты в облачное хранилище Google и скопировать их в нашу Google Cloud Shel. Загрузка в облачное хранилище достаточно тривиальна (описание можно найти здесь). Чтобы скопировать наши файлы, мы можем открыть Google Cloud Shel из панели инструментов, щелкнув первый значок слева на рисунке 2 ниже.

Создаем конвейер потоковой обработки данных. Часть 2 - 2
Рисунок 2

Команды, которые нам нужны для копирования файлов и установки необходимых библиотек, перечислены ниже.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Создание нашей базы данных и таблицы

После того, как мы выполнили все шаги, связанные с настройкой, следующее, что нам нужно сделать, это создать набор данных и таблицу в BigQuery. Есть несколько способов сделать это, но самый простой — использовать консоль Google Cloud, сначала создав набор данных. Вы можете выполнить действия, указанные по следующей ссылке, чтобы создать таблицу со схемой. Наша таблица будет иметь 7 столбцов, соответствующих компонентам каждого пользовательского лога. Для удобства мы определим все столбцы как строки (тип string), за исключением переменной timelocal, и назовем их в соответствии с переменными, которые мы сгенерировали ранее. Схема нашей таблицы должна выглядеть как на рисунке 3.

Создаем конвейер потоковой обработки данных. Часть 2 - 3
Рисунок 3. Схема таблицы

Публикация данных пользовательского лога

Pub/Sub является критически важным компонентом нашего конвейера, поскольку позволяет нескольким независимым приложениям взаимодействовать друг с другом. В частности, он работает как посредник, позволяющий нам отправлять и получать сообщения между приложениями. Первое, что нам нужно сделать, это создать тему (topic). Достаточно просто перейти в Pub/Sub в консоли и нажать CREATE TOPIC.

Приведенный ниже код вызывает наш скрипт для генерации данных лога, определенных выше, а затем подключается и отправляет журналы в Pub/Sub. Единственное, что нам нужно сделать, — это создать объект PublisherClient, указать путь к теме с помощью метода topic_path и вызвать функцию publish с topic_path и данными. Обратите внимание, что мы импортируем generate_log_line из нашего скрипта stream_logs, поэтому убедитесь, что эти файлы находятся в одной папке, иначе вы получите ошибку импорта. Затем мы можем запустить это через нашу google-консоль, используя:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Как только файл запустится, мы сможем наблюдать вывод данных лога на консоль, как показано на рисунке ниже. Этот скрипт будет работать до тех пор, пока мы не используем CTRL+C, чтобы завершить его.

Создаем конвейер потоковой обработки данных. Часть 2 - 4
Рисунок 4. Вывод publish_logs.py

Написание кода нашего конвейера

Теперь, когда мы все подготовили, мы можем приступить к самой интересной части — написанию кода нашего конвейера, используя Beam и Python. Чтобы создать Beam-конвейер, нам нужно создать объект конвейера (p). После того как мы создали объект конвейера, мы можем применить несколько функций одну за другой, используя оператор pipe (|). В общем, рабочий процесс выглядит как на рисунке ниже.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

В нашем коде мы создадим две пользовательские функции. Функцию regex_clean, которая сканирует данные и извлекает соответствующую строку на основе списка PATTERNS, используя функцию re.search. Функция возвращает разделенную запятыми строку. Если вы не являетесь экспертом по регулярным выражениям, я рекомендую ознакомится с этим туториалом и попрактиковаться в блокноте, чтобы проверить код. После этого мы определяем пользовательскую ParDo-функцию под названием Split, которая является вариацией Beam-преобразования для параллельной обработки. В Python это делается особым способом — мы должны создать класс, который наследуется от класса DoFn Beam. Функция Split принимает распаршенную строку из предыдущей функции и возвращает список словарей с ключами, соответствующими именам столбцов в нашей таблице BigQuery. Есть кое-что, что следует отметить про эту функцию: мне пришлось импортировать datetime внутри функции, чтобы она работала. Я получал сообщение об ошибке при импорте в начале файла, что было странно. Этот список затем передается в функцию WriteToBigQuery, которая просто добавляет наши данные в таблицу. Код для Batch DataFlow Job и Streaming DataFlow Job приведен ниже. Единственное отличие между пакетным и потоковым кодом заключается в том, что в пакетной обработке мы читаем CSV из src_path, используя функцию ReadFromText из Beam.

Batch DataFlow Job (обработка пакетов)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (обработка потока)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Запуск конвейера

Мы можем запустить конвейер несколькими различными способами. Если бы мы захотели, мы могли бы просто запустить его локально с терминала, удаленно войдя в GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Однако мы собираемся запустить его с помощью DataFlow. Мы можем сделать это с помощью нижеприведенной команды, установив следующие обязательные параметры.

  • project — ID вашего проекта GCP.
  • runner — средство запуска конвейера, которое проанализирует вашу программу и сконструирует ваш конвейер. Для выполнения в облаке вы должны указать DataflowRunner.
  • staging_location — путь к облачному хранилищу Cloud Dataflow для индексировани пакетов кода, необходимых обработчикам, выполняющим работу.
  • temp_location — путь к облачному хранилищу Cloud Dataflow для размещения временных файлов заданий, созданных во время работы конвейера.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Пока эта команда выполняется, мы можем перейти на вкладку DataFlow в google-консоли и просмотреть наш конвейер. Кликнув по конвейеру, мы должны увидеть что-то похожее на рисунок 4. В целях отладки может быть очень полезно перейти в логи, а затем в Stackdriver для просмотра подробных логов. Это помогло мне разрешить проблемы с конвейером в ряде случаев.

Создаем конвейер потоковой обработки данных. Часть 2 - 5
Рисунок 4: Beam-конвейер

Доступ к нашим данным в BigQuery

Итак, у нас уже должен быть запущен конвейер с данными, поступающими в нашу таблицу. Чтобы проверить это, мы можем перейти к BigQuery и просмотреть данные. После использования команды ниже вы должны увидеть первые несколько строк набора данных. Теперь, когда у нас есть данные, хранящиеся в BigQuery, мы можем провести дальнейший анализ, а также поделиться данными с коллегами и начать отвечать на бизнес-вопросы.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Создаем конвейер потоковой обработки данных. Часть 2 - 6
Рисунок 5: BigQuery

Заключение

Надеемся, что этот пост послужит полезным примером создания потокового конвейера данных, а также поиска способов сделать данные более доступными. Хранение данных в таком формате дает нам много преимуществ. Теперь мы можем начать отвечать на важные вопросы, например, сколько людей используют наш продукт? Растет ли со временем база пользователей? С какими аспектами продукта люди взаимодействуют больше всего? И есть ли ошибки, там где их быть не должно? Это те вопросы, которые будут интересны для организации. На основе идей, вытекающих из ответов на эти вопросы, мы сможем усовершенствовать продукт и повысить заинтересованность пользователей.

Beam действительно полезен для такого типа упражнений, а также имеет ряд других интересных случаев использования. Например, вы можете анализировать данные по биржевым тикам в режиме реального времени и совершать сделки на основе анализа, возможно, у вас есть данные датчиков, поступающие с транспортных средств, и вы хотите вычислить расчет уровня трафика. Вы также можете, например, быть игровой компанией, собирающей данные о пользователях и использующей ее для создания информационных панелей для отслеживания ключевых показателей. Ладно, господа, это тема уже для другого поста, спасибо за чтение, а для тех, кто хочет увидеть полный код, ниже ссылка на мой GitHub.

https://github.com/DFoly/User_log_pipeline

На этом все. Читать первую часть.

Автор: MaxRokatansky

Источник


* - обязательные к заполнению поля