Как я пришёл к идее создания системы приложений и разработал поисковик и мессенджер

в 14:16, , рубрики: android, CICD, django, django framework, docker, IoT, javascript, mqtt, nginx, python

Предисловие

Я Михаил — создатель и главный разработчик системы вэб приложений. Второй участник проекта — Владимир — разработчик мобильных версий и ответственный за SEO оптимизацию.

Внутри системы я разработал:

  • Поисковая система, включающая в себя голосовой поиск, поиск по фото и поиск по обычному тексту.

  • Мессенджер с двумя ботами: первый общается с поисковиком, а второй — с микроконтроллером (см. IoT технологии).

Технологии

Этот блок я разделил на 3 части:

  • технологии приложений,

  • серверные технологии,

  • IoT технологии для микроконтроллера.

1. Технологии приложений

Поисковая система и мессенджер представлены в виде микросервисной архитектуры, они взаимодействуют между собой с помощью бота в мессенджере, и работоспособность каждого приложения не зависит от другого.

1.1 Поисковик

Главная страница поисковика

Главная страница поисковика

Бэкенд поисковика написан с помощью фреймворка Django на языке python и использует API запросы (POST и GET). В основе поисковика лежит Yandex API Search, поведение которого дорабатывалось для получения более желаемых результатов, а также Yandex Speech Kit для преобразования голоса в текст.

Django выбран, для того, чтобы упростить общую логику работы приложения. Так, почти весь функционал, за исключением записи голоса, работает на бэке, и через Django можно делать рендер, а переход на ответы из поисковых запросов происходит по гиперссылкам.

В модуле service.py я описал основную логику работы поисковой системы. Здесь есть:

  • функция поиска, раздел «Все»,

  • функция поиска, раздел «Картинки»,

  • функция поиска, раздел «Видео»,

  • функция поиска по фото,

  • функция преобразования формата аудиофайла

  • функция голосового поиска

import os
from dotenv import load_dotenv
import base64
import time
import logging
import requests
import subprocess
from django.http import HttpResponse
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET


load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def search(search_query, page):
    # Функция поиска в браузере
    folderid = os.getenv('FOLDERID')
    api_key = os.getenv('API_KEY')
    print('ПОИСКОВЫЙ ЗАПРОС', search_query)
    print('Страница номер: ', page)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    results = []
    url = 'https://searchapi.api.cloud.yandex.net/v2/web/search'

    headers = {"Authorization": f"Api-Key {api_key}"}

    body = {
        "query": {
          "searchType": "SEARCH_TYPE_COM",
          "queryText": search_query,
          "familyMode": "FAMILY_MODE_NONE",
          "page": page_number,
          "fixTypoMode": "FIX_TYPO_MODE_ON"
        },
        "folderId": folderid,
        "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
        "responseFormat": "FORMAT_XML",
    }
    response = requests.post(url, headers=headers, json=body)
    if response.status_code == 200:
        start_time = time.time()
        encode_response = response.json()["rawData"]
        # декодируем из base64
        decoded_bytes = base64.b64decode(encode_response)
        # преобразуем байты в строку (UTF-8)
        xml_data = decoded_bytes.decode('utf-8')
        # Парсим XML
        root = ET.fromstring(xml_data)

        # Ищем все документы
        for doc in root.findall('.//doc'):
            url_elem = doc.find('url')
            title_elem = doc.find('title')
            domain = doc.find('domain')
            header = ''.join([title for title in title_elem.itertext()])
            if 'Украин' in header or 'украин' in header:
                continue
            else:
                results.append({
                    'url': url_elem.text if url_elem is not None else '',
                    'title': header if header is not None else '',
                    'favicon_url': f'https://{domain.text}/favicon.ico'
                })
        return results
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return HttpResponse("Error occurred", status=response.status_code)


def image(service_request, search_query):
    # Функция поиска фото
    folderid = os.getenv('FOLDERID')
    api_key = os.getenv('API_KEY')
    page = service_request.GET.get('page', 1)
    print('ПОИСКОВЫЙ ЗАПРОС КАРТИНКИ', search_query)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    images = []
    url = 'https://searchapi.api.cloud.yandex.net/v2/image/search'

    headers = {"Authorization": f"Api-Key {api_key}"}

    body = {
        "query": {
            "searchType": "SEARCH_TYPE_COM",
            "queryText": search_query,
            "familyMode": "FAMILY_MODE_NONE",
            "page": page_number,
            "fixTypoMode": "FIX_TYPO_MODE_ON"
        },
        "folderId": folderid,
        "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
    }
    response = requests.post(url, headers=headers, json=body)
    if response.status_code == 200:
        encode_response = response.json()["rawData"]
        # декодируем из base64
        decoded_bytes = base64.b64decode(encode_response)
        # преобразуем байты в строку (UTF-8)
        text_xml = decoded_bytes.decode('utf-8')
        soup = BeautifulSoup(text_xml, 'lxml')  # Парсим HTML-код
        # Извлечение всех изображений
        for item in soup.find_all('doc'):
            img_url = item.find('image-link').text if item.find('image-link') else None
            link_url = item.find('html-link').text if item.find('html-link') else None
            # Проверяем, что ссылку на картинку и сайт с картинкой можно извлечь
            if img_url and link_url:
                images.append({'url': img_url, 'link': link_url})

        return images
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return HttpResponse("Error occurred", status=response.status_code)


def video(service_request):
    # Функция поиска видео
    api_key = os.getenv('OTHER_API_KEY')
    search_engine_id = os.getenv('SEARCH_ENGINE_ID')
    search_query = service_request.GET.get('query', '')
    page = service_request.GET.get('page', 1)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    url = f'https://customsearch.googleapis.com/customsearch/v1/?q=видео {search_query}&page={page_number}&cx={search_engine_id}&key={api_key}'
    headers = {"Authorization": f"Api-Key {api_key}"}
    response = requests.get(url, headers=headers)
    print(page)
    if response.status_code == 200:
        all_data = []
        items = response.json()["items"]
        for item in items:
            try:
                thumbnail = item["pagemap"]["cse_thumbnail"][0]["src"]
            except KeyError:
                thumbnail = None
            all_data.append({'url': item["link"], 'title':item["title"], 'thumbnail': thumbnail})

        return all_data

    else:
        print(f"Error: {response.status_code} - {response.text}")
        return HttpResponse("Error occurred", status=response.status_code)


def search_by_image(service_request, encoded_image):
    # Функция поиска по фото 
    folderid = os.getenv('FOLDERID')
    api_key = os.getenv('API_KEY')
    section = service_request.GET.get('section')
    page = service_request.GET.get('page', 1)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    results = []
    url = 'https://searchapi.api.cloud.yandex.net/v2/image/search_by_image'

    headers = {"Authorization": f"Api-Key {api_key}"}

    body = {
        "folderId": folderid,
        "data": encoded_image,
        "page": page_number
    }
    response = requests.post(url, headers=headers, json=body)
    if response.status_code == 200:
        images = response.json()["images"]
        for image in images:
            if section == None:
                results.append({'link': image['url'], 'title': image['pageTitle'], 'url': image['pageUrl']})
            elif section == 'Похожее':
                results.append({'link': image['url']})
            elif section == 'Сайты':
                results.append({'title': image['pageTitle'], 'url': image['pageUrl']})
        service_request.session['encoded_image'] = encoded_image
        return results
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return HttpResponse("Error occurred", status=response.status_code)


def convert_webm_to_ogg(input_path: str, output_path: str):
     # Преобразование формата аудиофайла webm в ogg
    ffmpeg_path = '/usr/bin/ffmpeg'
    print('Вывести команду')
    command = [
        ffmpeg_path,
        '-y',
        '-i', input_path,
        '-c:a', 'libopus',
        output_path
    ]
    print('Начало конвертации')

    try:
        print('Попытка')
        subprocess.run(command, check=True)
        print(f"Конвертация завершена: {output_path}")
    except subprocess.CalledProcessError as e:
        print(f"Ошибка при конвертации: {e.stderr.decode()}")


def voice_search(file):
    # Голосовой поиск
    folderid = os.getenv('FOLDERID')
    api_key = os.getenv('API_KEY')
    url = f'https://stt.api.cloud.yandex.net/speech/v1/stt:recognize?topic=general&folderId={folderid}'
    headers = {
        "Authorization": f"Api-Key {api_key}",
        "Content-Type": "audio/ogg"
    }
    file.seek(0)
    data = file.read()
    input_webm = 'audio.webm'
    # Запись бинарных данных во входной файл
    with open(input_webm, 'wb') as f_out:
        f_out.write(data)
    convert_webm_to_ogg(input_webm, 'audio.ogg')
    print('Успешно')
    with open('audio.ogg', 'rb') as f:
        ogg_data = f.read()
    response = requests.post(url, headers=headers, data=ogg_data)

    try:
        return response.json()
    except Exception as e:
        print(f"Ошибка при разборе ответа: {e}")
        return None

Подробнее хочу остановиться на голосовом поиске.

Запись голоса происходит на интерфейсе. Пользователь говорит в микрофон, с помощью JavaScript запись голоса начинает работать, и сказанная речь записывается в файл формата “webm” (файл voice_file.js):

document.getElementById('writeVoice').addEventListener('click', () => {
  let mediaRecorder;
  let audioChunks = [];

  navigator.mediaDevices.getUserMedia({ audio: true })
    .then(stream => {
      const options = { mimeType: 'audio/webm;codecs=opus' };
      let recorderOptions = options;

      if (!MediaRecorder.isTypeSupported(options.mimeType)) {
        console.warn(`${options.mimeType} не поддерживается, оставляем без типов`);
        recorderOptions = {};
      } else {
        recorderOptions = { mimeType: options.mimeType };
      }

      // Создаем Recorder с поддерживаемым типом
      mediaRecorder = new MediaRecorder(stream, recorderOptions);

      mediaRecorder.onstart = () => console.log('Запись началась');
      mediaRecorder.ondataavailable = event => {
        audioChunks.push(event.data);
      };
      mediaRecorder.onstop = () => {
        const audioBlob = new Blob(audioChunks, { type: 'audio/webm' });
        audioChunks = [];

        const formData = new FormData();
        formData.append('file', audioBlob, 'audio.webm');
        fetch('/search/api/voice', {
          method: 'POST',
          body: formData,
        }).then(response => {
          return response.json();
          })
          .then(text => {
              const input = document.querySelector('input[name="query"]');
              input.value = text.data.result;
              const searchButton = document.querySelector('button.btn.btn-outline-success[type="submit"]');
              if (input.value != '') {
                searchButton.click()
              } else {
              }

          })
          .catch(console.error);
      };

      console.log('Начинаем запись...');
      mediaRecorder.start();

      const audioContext = new AudioContext();
      const source = audioContext.createMediaStreamSource(stream);
      const analyser = audioContext.createAnalyser();
      analyser.fftSize = 512;
      source.connect(analyser);

      const dataArray = new Uint8Array(analyser.frequencyBinCount);

      let silenceStart = null;      // Время начала тишины
      const silenceThreshold = 30;  // Порог громкости (0-255), ниже которого считаем тишину
      const maxSilenceTime = 1500;  // Максимальное время тишины (мс), после которого остановим запись

      function checkSilence() {
        analyser.getByteFrequencyData(dataArray);
        const volume = dataArray.reduce((a,b) => a + b) / dataArray.length;

        if (volume < silenceThreshold) {
          if (silenceStart === null) {
            silenceStart = Date.now();
          } else {
            let silenceDuration = Date.now() - silenceStart;
            if (silenceDuration > maxSilenceTime) {
              console.log('Обнаружена тишина, останавливаем запись');
              mediaRecorder.stop();
              audioContext.close();
              return; // прекратить вызов таймера
            }
          }
        } else {
          silenceStart = null; // звук появился — сброс таймера тишины
        }

        requestAnimationFrame(checkSilence);
      }

      checkSilence();
    })
    .catch(err => console.error('Ошибка доступа к микрофону:', err));
});

После этого срабатывает API эндпоинт /search/api/voice который преобразует формат “webm” в “ogg”. Сначала с помощью DRF приходит “webm” файл (файл api.py), который активирует функцию voice_search(file)из модуля services.py:

from rest_framework import generics
from rest_framework.response import Response
from .services import voice_search


class VoiceView(generics.GenericAPIView):

    def post(self, request, *args, **kwargs):
        audio = request.data
        file = audio.get('file')
        data = voice_search(file)
        print('Файл записался')
        return Response({"data": data})

Далее записанный файл сохраняется на сервер, и функция convert_webm_to_ogg в service.py, преобразует формат “webm” в “ogg”. Сделано это, потому что следующим шагом нужно извлечь текст из ogg файла через Yandex SpeechKit, а он может работать только с ogg форматом. В конце извлекается текст из файла “ogg”, который потом записывается на поисковую строку, и срабатывает поиск по извлечённому тексту. Так как на стороне клиента я использовал формат “webm”, а Yandex SpeechKit принимает только “ogg” изначально была проблема того, что конвертация не работало, Yandex SpeechKit выдавал ошибку. Я разобрался, и оказалось, что проблема была в пути до ffmpeg, я указал полный путь до ffmpeg на сервере и решил эту проблему.

Функции из модуля services.py используются в модуле views.py для последующего рендера:

import base64
from django.shortcuts import render
from .services import search, image, video, search_by_image


def search_view(request):
    # Функция поиска в браузере
    search_query = request.GET.get('query', '')
    print('ПОИСКОВЫЙ ЗАПРОС', search_query)
    page = request.GET.get('page', 1)  # Текущая страница (по умолчанию 1)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    try:
        results = search(search_query, page)
        len_results = len(results)
        return render(request, 'search/result.html',
                          {'results': results, 'query': search_query, 'page': page_number, 'total_results': len_results})
    except Exception:
        return render(request, 'main/site.html')

def image_view(request):
    # Функция поиска фото
    search_query = request.GET.get('query', '')
    page = request.GET.get('page', 1)
    print('ПОИСКОВЫЙ ЗАПРОС КАРТИНКИ', search_query)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    images = image(request, search_query)
    return render(request, 'search/images.html',
                      {'images': images, 'query': search_query, 'page': page_number})


def video_view(request):
    # Функция поиска видео
    search_query = request.GET.get('query', '')
    page = request.GET.get('page', 1)
    print('ПОИСКОВЫЙ ЗАПРОС ВИДЕО', search_query)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    video_query = 'видео' + search_query
    images = image(request, video_query)
    return render(request, 'search/videos.html',
                  {'images': images, 'query': search_query, 'page': page_number})


def search_by_image_view(request):
    img_file = request.FILES.get('image')
    print('ПОИСК ПО ИЗОБРАЖЕНИЮ: ', img_file)
    if img_file:
        encoded_image = base64.b64encode(img_file.read()).decode('utf-8')
    else:
        encoded_image = request.session.get('encoded_image')
    section = request.GET.get('section')
    page = request.GET.get('page', 1)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    results = search_by_image(request, encoded_image)
    return render(request, 'search/result_image_search.html',
                      {'results': results, 'query': img_file,
                        'page': page_number, 'encoded_image': encoded_image, 'section': section})


def news_view(request):
    # Функция поиска в браузере
    query = request.GET.get('query', '')
    print('ПОИСКОВЫЙ ЗАПРОС', query)
    page = request.GET.get('page', 1)  # Текущая страница (по умолчанию 1)
    try:
        page_number = int(page)
    except ValueError:
        page_number = 1
    news_query = 'Новости' + query
    results = search(news_query, page)
    len_results = len(results)
    return render(request, 'search/result.html',
                      {'results': results, 'query': query, 'page': page_number, 'total_results': len_results})

Также для поисковика создана база данных PostgreSQL, в которую записывается текущая сессия при поиске по фото (функция search_by_image в модуле services.py).

1.2 Мессенджер

Главная страница мессенджера

Главная страница мессенджера

Для бэкенда мессенджера MixСhat был создан API с помощью Django REST Framework. Мессенджер использует разные типы API запросов:

  • POST – для отправки сообщений, создания чатов и пользователей.

  • GET – для получения сообщений, чатов и пользователей.

  • PUT – для редактирования сообщений, изменений данных о пользователей или в групповых чатах.

  • DELETE – для удаления сообщений.

Django REST Framework я использовал для ORM моделей, для возможности работы через админ панель и надёжной системы безопасности.

Структура хранения данных мессенджера представлена в виде ORM модели (файл models.py):

import uuid
from django.db import models
from django.contrib.auth.models import AbstractUser

# Create your models here.


class ChatUser(AbstractUser):
    # Кастомная модель пользователя
    id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False)
    email = models.EmailField("Электронная почта", unique=True)
    phone = models.CharField("Телефон")
    country_code = models.CharField("Код страны")
    code = models.IntegerField("Код подтверждения", default=0)
    photo = models.ImageField(upload_to='photo/profilephoto/', default='photo/profilephoto/default.png', null=False, blank=True, verbose_name="Фотография")
    date_birth = models.DateField(null=True, blank=True)
    bio = models.CharField(null=True, blank=True)
    fcm_token = models.CharField(null=True, blank=True)
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['phone', 'country_code']

    def __str__(self):
        return self.username


class Bot(models.Model):
    id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False)
    name = models.TextField("Имя")
    photo = models.ImageField(upload_to='photo/profilephoto/', default='photo/profilephoto/default.png', null=False,
                              blank=True, verbose_name="Фотография")

    def __str__(self):
        return self.name


class Chat(models.Model):
    name = models.TextField("Название чата")
    photo = models.ImageField(upload_to='photo/profilephoto/', default='photo/profilephoto/default.png', null=True,
                              blank=True, verbose_name="Фотография чата")
    bio = models.TextField("Описание")
    type = models.TextField("Тип")


class ChatMembership(models.Model):
    user_id = models.UUIDField()
    chat = models.ForeignKey(Chat, on_delete=models.CASCADE, related_name='id_chat')
    user_role = models.TextField("Роль пользователя")

    def __str__(self):
        return f"Membership of chat{self.chat}"


class Channel(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.TextField("Название")


class ChannelMembership(models.Model):
    id = models.AutoField(primary_key=True)
    user = models.ForeignKey(ChatUser, on_delete=models.CASCADE, related_name='id_user')
    chanel = models.ForeignKey(Channel, on_delete=models.CASCADE, related_name='chanel_id')


class Message(models.Model):
    id = models.AutoField(primary_key=True)
    sender_user = models.ForeignKey(ChatUser, verbose_name="Пользователь отправитель", null=True, blank=True, on_delete=models.CASCADE, related_name='sent_messages')
    sender_bot = models.ForeignKey(Bot, verbose_name="Бот отправитель", null=True, blank=True, on_delete=models.CASCADE, related_name='sent_messages')
    content = models.TextField("Текст")
    image = models.ImageField("Картинка", upload_to='messages/images/', null=True, blank=True)
    video = models.FileField("Видео", upload_to='messages/videos/', null=True, blank=True)
    audio = models.FileField("Аудио", upload_to='messages/audios/', null=True, blank=True)
    timestamp = models.DateTimeField("Дата", auto_now_add =True)
    chat = models.ForeignKey(Chat, on_delete=models.CASCADE, related_name='messages')
    is_edit = models.BooleanField("Редактировано?", null=True, blank=True)
    delete_at_home = models.BooleanField("Удалено у себя?", null=True, blank=True)
    id_for_answer = models.IntegerField("Ответное сообщение", null=True, blank=True)
    id_for_transmission = models.IntegerField("Пересланное сообщение (ID)", null=True, blank=True)
    is_forwarded = models.BooleanField("Пересылка?", default=False)
    transmission_content = models.TextField("Пересланное сообщение")

    def __str__(self):
        return f"Message from {self.sender_user} to chat {self.chat.id}"

Все данные мессенджера хранятся в БД PostrgreSQL.

Регистрация (создание) новых пользователей осуществляется через ввод почты пользователя, логина и пароля. Эти данные отправляются на сервер в формате JSON, поэтому для них создан файл serialazers.py.

def send_code_to_email(email, user_name, code):
    send_mail(
        f"{user_name}, ваш код подтверждения",
        f"Mixchat, Код подтверждения: {code}. Никому не передавайте этот код!",
        EMAIL_HOST_USER,
        [email]
    )


class UserSerializer(serializers.ModelSerializer):
    password = serializers.CharField(max_length=128, min_length=8, write_only=True)

    class Meta:
        model = ChatUser
        fields = ['email', 'username', 'password']

    def create(self, validated_data):
        email = validated_data["email"]
        validated_data["is_active"] = False
        code = random.randint(1000, 9999)
        validated_data["code"] = code
        user = ChatUser.objects.create_user(**validated_data)
        send_code_to_email(email, user.username, code)
        return user

Пользователь вносит эти данные, а затем сохраняется в базу данных, но становится неактивным.

Как я пришёл к идее создания системы приложений и разработал поисковик и мессенджер - 3

После этого система предложит ввести код, который пришёл пользователю на введённую почту.

Как я пришёл к идее создания системы приложений и разработал поисковик и мессенджер - 4

Далее, если пользователь вводит правильный код, то он становится активным, если неправильный, то продолжает оставаться неактивным, а через некоторое время удаляется из базы данных.  Удаление пользователя представлено в файле views.py в функции auth_in_chat, которая срабатывает при попадании пользователя на страницу авторизации.

def auth_in_chat(request):
    users = ChatUser.objects.filter(is_active=False).all()
    if users:
        for user in users:
            update_date = datetime.now()
            date_joined = user.date_joined + timedelta(0, 10800)
            if update_date - date_joined.replace(tzinfo=None) >= timedelta(0, 3600):
                user.delete()
    return render(request, 'main/account.html')

Если пользователь есть в базе данных, является активным и вводит верные данные при авторизации, то он входит в систему мессенджера, и ему выдаётся JWT токен. Если хотя бы один из этих пунктов не выполняется, то пользователь не сможет войти в систему.

Для фронтенда в мессенджере использовались HTML (создание шаблонов страниц), CSS (для стиля шаблонов) и JavaScript (для работы с API мессенджера).

В мессенджере есть возможность отправки уведомлений. Про них расскажу подробнее. Для отправки уведомлений я использовал сервис Google Firebase. Под web платформу я создал 2 инструмента (SDK) для возможности отправки уведомлений: клиентский SDK и серверный SDK.

Клиентский SDK я использовал для инициализации клиентского приложения в Google Firebase, создания FCM токена для определённого устройства через тот же Firebase и отправки его на сервер.

import { getMessaging, getToken, deleteToken } from "https://www.gstatic.com/firebasejs/12.10.0/firebase-messaging.js";

const messaging = getMessaging();

getToken(messaging, {vapidKey: 'BISnUtSCVp9xdEpjULSIJVAmSSxDpZyjddQdR7NHlko2tAZNX7apnVL5feKslk1iS71cMQ8xJuH5_lx0O_Yx3UM'}).then((currentToken) => {
  if (currentToken) {
    fetch('/api/save_fcm/', {
       method: 'PUT',
       headers: {
           'Content-Type': 'application/json',
           'Authorization': 'Bearer ' + localStorage.getItem('access_token')
       },
       body: JSON.stringify({"token": currentToken})
   })
  } else {
    console.log('No registration token available. Request permission to generate one.');
  }
}).catch((err) => {
  console.log('An error occurred while retrieving token. ', err);
});

Серверный SDK – также для инициализации приложения, но с серверными данными, для получения FCM токена и отправки уведомления на устройство с определённым FCM токеном.

import os
import firebase_admin
from firebase_admin import credentials, messaging
from dotenv import load_dotenv
from pathlib import Path

load_dotenv()
BASE_DIR = Path(__file__).resolve().parent.parent

cred_path = {
            "type": "service_account",
            "project_id": os.getenv('FIREBASE_PROJECT_ID'),
            "private_key": os.getenv('FIREBASE_PRIVATE_KEY'),
            "client_email": os.getenv('FIREBASE_CLIENT_EMAIL'),
            "private_key_id": os.getenv('FIREBASE_PRIVATE_KEY_ID'),
            "client_id": os.getenv('FIREBASE_CLIENT_ID'),
            "auth_uri": os.getenv('FIREBASE_AUTH_URI'),
            "token_uri": os.getenv('FIREBASE_TOKEN_URI'),
            "auth_provider_x509_cert_url": os.getenv('FIREBASE_AUTH_PROVIDER_X509_CERT_URL'),
            "client_x509_cert_url": os.getenv('FIREBASE_CLIENT_X509_CERT_URL'),
            "universe_domain": os.getenv('FIREBASE_UNIVERSE_DOMAIN')
        }

try:
    # Пытаемся инициализировать Firebase
    cred = credentials.Certificate(cred_path)
    firebase_app = firebase_admin.initialize_app(cred)
    app = firebase_admin.get_app()
    print("Firebase Admin SDK успешно инициализирован!")
except Exception as e:
    print(f"Ошибка инициализации Firebase: {e}")
    firebase_app = None


def send_push_notification(fcm_token, title, body, image, data=None):
    # Отправка push-уведомления через FCM
    try:
        # Создаем сообщение
        message = messaging.Message(
            notification=messaging.Notification(
                title=title,
                body=body,
                image=image
            ),
            data=data or {},
            token=fcm_token,
        )
        # Отправляем
        response = messaging.send(message)
        print("Уведомление отправлено")
        return response
    except Exception as e:
        print(f"Ошибка отправки: {e}")
        return None
      

Для отображения уведомлений на экране я создал Service Worker.

const CACHE_VERSION = 'v1.0.1';
console.log(`[SW] Service Worker версии ${CACHE_VERSION} загружен`);

self.addEventListener('install', (event) => {
  console.log('install event');
  self.skipWaiting();
});

self.addEventListener('activate', (event) => {
  console.log('activate event');
  event.waitUntil(clients.claim());
});

self.addEventListener('message', event => {
    if (event.data.action === 'getVersion') {
        event.source.postMessage({ version: CACHE_VERSION });
    }
});

self.addEventListener('push', function(event) {
  console.log('push event получен!');

  let data = {};
  if (event.data) {
    data = event.data.json();
  }
  const notification = data.notification
  const options = {
    body: notification.body,
    icon: notification.image,
    vibrate: [200, 100, 200, 100, 200, 100, 200],
    tag: "notification-" + Date.now(),

  }

  let promise = self.registration.showNotification(notification.title, options);

  event.waitUntil(promise);
});

Кроме того, в мессенджере есть системный бот MixRobot для взаимодействия с поисковой системой. Пользователь пишет в чат с ботом любое слово, а бот выдаёт ему ответы из поисковика.

Взаимодействие поисковой системы и мессенджера

Взаимодействие поисковой системы и мессенджера

Первоначально бот выдавал пустые ответы. Я понял, что проблема была связана с неправильным форматированием ответов в поисковике. В HTML поисковика я добавил в переменную ссылки значение переменной заголовка. Ранее эти переменные выводились отдельно, и было непонятно, какую логику писать для бота. В итоге теперь через бота стало удобнее собирать ссылки и заголовки и выводить на интерфейс только заголовки с возможность перехода по ссылкам.

HTML файл с ответами поисковика (result.html):

<!DOCTYPE html>
<html lang="ru">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MixRech</title>
    {% load static %}
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css">
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@flaticon/font-flaticon@1.0.0/css/font-flaticon.css">
    <link rel="stylesheet" href="{% static 'search/css/static.css' %}">
    <link rel="icon" href="{% static 'main/png/favicon.ico' %}">
</head>
<body>
    <div class="header">
        <div class="header-content">
            <h2><a href="{% url 'home' %}" style="text-decoration: none;">MixRech</a></h2>
            <form method="get" action="{% url 'search' %}" class="search-form">
                <div class="search-input-wrapper">
                    <input type="text" name="query" value="{{ query }}" placeholder="Введите поисковый запрос или URL" aria-label="Поиск">
                </div>
            </form>
            <div class="category-buttons">
                <a href="{% url 'search' %}?query={{ query }}" class="category-btn {% if request.path == '/search' %}active{% endif %}">
                    Все
                </a>
                <a href="{% url 'images' %}?query={{ query }}" class="category-btn {% if request.path == '/images' %}active{% endif %}">
                    Картинки
                </a>
                <a href="{% url 'videos' %}?query={{ query }}" class="category-btn {% if request.path == '/videos' %}active{% endif %}">
                    Видео
                </a>
                <a href="{% url 'news' %}?query={{ query }}" class="category-btn {% if request.path == '/news' %}active{% endif %}">
                    Новости
                </a>
            </div>
        </div>
    </div>
    <div class="results-container">
        {% if results %}
            {% for result in results %}
            <div class="result-card">
                <div class="result-title">
                    {% if result.favicon_url %}
                    <img src="{{ result.favicon_url }}" alt="" onerror="this.style.display='none'">
                    {% endif %}
                    <a href="{{ result.url }}">{{ result.title }}</a>
                </div>
                <div class="result-url">
                    <a href="{{ result.url }}">
                        <svg class="icon" viewBox="0 0 24 24" style="width: 14px; height: 14px;">
                            <circle cx="12" cy="12" r="3" stroke="currentColor" fill="none"/>
                            <path d="M19.4 15a8 8 0 00-14.8 0M5 9a8 8 0 0114 0" stroke="currentColor"/>
                        </svg>
                        {{ result.url|truncatechars:60 }}
                    </a>
                </div>
            </div>
            {% endfor %}
        {% else %}
            <li>Нет результатов.</li>
        {% endif %}
        <div class="pagination">
            {% if page > 1 %}
                <a href="?query={{ query }}&page={{ page|add:-1 }}">
                    <svg class="icon" viewBox="0 0 24 24" style="width: 18px; height: 18px;">
                        <path d="M19 12H5M12 19l-7-7 7-7" stroke="currentColor" stroke-width="2"/>
                    </svg>
                    Предыдущая
                </a>
                {% endif %}

                <span class="page-info">Страница {{ page }}</span>

                <a href="?query={{ query }}&page={{ page|add:1 }}">
                    Следующая
                    <svg class="icon" viewBox="0 0 24 24" style="width: 18px; height: 18px;">
                        <path d="M5 12h14M12 5l7 7-7 7" stroke="currentColor" stroke-width="2"/>
                    </svg>
                </a>
        </div>
    </div>
</body>

</html>

Функция для работы с ботом MixRobot - microservice_functions.py:

import json
import requests
from bs4 import BeautifulSoup


def mixrech(query):
    # Функция для взаимодействия с поисковой системой
    url = f'https://mixrech.com/search/?query={query}'

    response = requests.get(url)
    if response.status_code == 200:
        print("Успешный запрос")
        text_html = response.text
        soup = BeautifulSoup(text_html, 'html.parser')  # Парсим HTML-код
        links = [item.a for item in soup.find_all('div', class_='result-url')]
        titles = [item.text.strip() for item in soup.find_all('div', class_='result-title')]
        hrefs = [item.get('href') for item in links]
        #print(titles)
        #print(hrefs)
        return json.dumps(dict(zip(titles, hrefs)), ensure_ascii=False)

    else:
        print(f"Ошибка: {response.status_code}")
        print(response.text)

2. Серверные технологии

Схема работы приложения через модель "клиент-сервер"

Схема работы приложения через модель "клиент-сервер"

На схеме представлена работа приложений через модель "клиент-сервер". Пользователь в браузере взаимодействует с приложениями через web сервер Nginx. Также каждое приложение взаимодействует с соответствующей с ним базой данных для работы с данными приложений.

Для массового пользователя приложения помещены в облачный сервер (виртуальную машину) от TimeWeb Cloud на базе Ubuntu.

Система приложений работает с помощью Docker-Compose, в Docker файлы помещены оба приложения и Nginx.

Для удобства обращения к приложениям по HTTP и повышения доверия реализована возможность запускать поисковик с домена (mixrech.com), а мессенджер с поддомена (chat.mixrech.com), а также созданы DNS записи для связи доменов с IP адресом сервера.

Чтобы обеспечить защищённое соединение, зашифровать передачу данных и защитить информацию от перехвата на сервер установлен SSL wildcard сертификат для домена и поддомена, и теперь приложения работают по HTTPS.

Nginx настроен на:

  • работу приложений от доменов,

  • SSL соединение и SSL шифрование,

  • раздачу статических и медиа файлов,

Также в Nginx увеличен размер видеофайлов и работает как обратный прокси.

Для работы с микроконтроллером на сервер установлен MQTT брокер mosquitto.

2.1 CI/CD

Кодовые файлы поисковика и мессенджера лежат в GitHub репозитории, а для автоматизации сборки и развёртывания приложения применён CI/CD. При загрузке и/или обновлении файлов на Github, информация автоматически отправляется на Github Actions, где выполняется работа по деплою проекта. Сам процесс CI/CD работает с помощью .github/workflows/action.yml

name: Deploy to TimeWeb Cloud

on:
   push:
      branches:
         - main

jobs:
     deploy:
       runs-on: ubuntu-latest

       steps:
       - name: Checkout code
         uses: actions/checkout@v2

       - name: Debug
         run: echo "SSH_PRIVATE_KEY=${{ secrets.SSH_PRIVATE_KEY }}"

       - name: Set up SSH
         uses: webfactory/ssh-agent@v0.9.0
         with:
           ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }}

       - name: Add server to known_hosts
         run: |
           mkdir -p ~/.ssh
           ssh-keyscan -H ${{ secrets.IP_ADRESS }} >> ~/.ssh/known_hosts

       - name: Add SSH key to agent
         run: echo "${{ secrets.SSH_PRIVATE_KEY }}" | ssh-add -

       - name: Deploy to server
         run: |
           ssh ${{ secrets.USERNAME }}@${{ secrets.IP_ADRESS }} "cd /root/browser && git pull && docker-compose down && docker-compose up --build -d && docker cp browser_mixrech_1:/mixrech/staticfiles ./mixrech && docker cp browser_mixchat_1:/mixchat/staticfiles ./mixchat"
           

     logs:
       runs-on: ubuntu-latest
       needs: deploy
       
       steps:
       - name: Set up SSH
         uses: webfactory/ssh-agent@v0.9.0
         with:
           ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }}

       - name: Add server to known_hosts
         run: |
           mkdir -p ~/.ssh
           ssh-keyscan -H ${{ secrets.IP_ADRESS }} >> ~/.ssh/known_hosts
          
       - name: Fetch logs
         run: |
           ssh ${{ secrets.USERNAME }}@${{ secrets.IP_ADRESS }} "cd /root/browser && git pull && docker-compose logs -f"

Здесь представлены 2 работы:

- Сам деплой проекта.

- Вывод логов.

Изначально после запуска приложения на сервере не работал JavaScript, не загружались картинки, и стили не были применены. Я понял, что во время деплоя не собрались статические файлы, поэтому при написании скриптов в actions.yml я добавил вручную копирование статических файлов из хоста в контейнер, что решило проблему.

Github связан с сервером через SSH ключ. Этапы деплоя:

  • Github пытается связаться с сервером через SSH и подключается к нему.

  • Происходит обновление файлов приложения на сервере через git pull, затем начинаются сборка docker образов и запуск контейнеров поисковика, мессенджера и Nginx.

  • Приложения начинают работать на глобальном сервере.

3. IoT технологии

Схема взаимодействия мессенджера с микроконтроллером

Схема взаимодействия мессенджера с микроконтроллером

Здесь пойдёт речь о том, как я настроил взаимодействие мессенджера с микроконтроллером.

На схеме представлено взаимодействие мессенджера с микроконтроллером ESP32. Взаимодействие происходит через системного бота SmartMix. Связующем узлом между ESP32 и мессенджером выступает MQTT сервер, который размещён на виртуальной машине. На ESP32 есть встроенный светодиод и подключается датчик температуры DS18B20. Эти компоненты управляются через бота в мессенджере по MQTT.

В качестве микроконтроллера я выбрал ESP32, установил для него драйвер и прошивку.

Далее настроил среду разработки PyCharm для MicroPython.

Подключил датчик температуры DS18B20 к ESP32. Написал логику микроконтроллера, где происходит включение/выключение светодиода и сбор данных с датчика температуры. Данные с датчика считываются по протоколу OneWire , а светодиод управляется через GPIO.

После чего я настроил микроконтроллер на автономную работу:

  • настроил прошивку устройства так, чтобы при включении питания не требовалось запускать программу из консоли,

  • реализовал подключение к Wi-Fi сети при включении питания для независимой работы,

  • обеспечил работу от внешнего блока питания (без USB-подключения к ПК).

Далее была интеграция ESP32 с мессенджером. Для управления микроконтроллером существует бот SmartMix. С него отправляются команды на MQTT сервер, откуда ESP32 принимает команды со SmartMix и в зависимости от конкретной команды, происходит включение/выключение светодиода/сбор данных с датчика. Данные с датчика затем обратно поступают на MQTT, откуда мессенджер записывает полученную температуру в базу данных PostgreSQL и выводит её на интерфейс приложения.

Изначально микроконтроллер не мог работать автономно, и при отправке команд с мессенджера ничего не происходило. Я понял и пришёл к выводу, что многие Wi-FI роутеры имеют 2 точки доступа. Я подключился к другой точке доступа с меньшим диапазоном частот. В итоге подключение проходило быстро, микроконтроллер заработал и начал запускать светодиод и датчик.

Таким образом полностью создан функционал макета умного дома и организовано взаимодействие мессенджера с макетом умного дома.

SEO оптимизация

Для поисковика и мессенджера настроена SEO оптимизация в Google Search Console и Яндекс Метрике для продвижения приложений на верхнюю строчку в поисковиках по ключевым словам.

Мобильные версии

Также для поисковика и мессенджера созданы мобильные версии для Android.

Мобильные версии разрабатывались в Android Studio на языке Java.

Для функциональной части оба приложения используют API вэб версий, а дизайн создан кастомный под удобство пользователя, в мессенджере ещё с возможностью изменения цветовых тем и иконки приложения мессенджера. Мобильные приложения имеют тот же функционал, что и их вэб версии, а также эксклюзивно для Android мессенджер имеет возможность записывать видеокружки.

Касаемо уведомлений, то был также использован Google Firebase, но для Android приложения и с другими конфигурациями клиентских и серверных SDK. Принцип работы SDK такой же, как и на web версиях.

Заключение

Таким образом получилась разносторонняя система с хорошим набором функционала и микросервисным взаимодействием как между двумя вэб приложениями, так и между вэб приложением и микроконтроллером.

Проект делается силами двух энтузиастов, которые по мере работы обучаются новым навыкам и разбираются в различных аспектах разработки.

Ссылка на Github репозиторий проекта

Автор: Mikhail-24

Источник


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js