Решение:

```python

from confluent_kafka import Consumer, Producer

import json

from datetime import datetime, timedelta

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'user-actions-group',

'auto.offset.reset': 'earliest'

})

producer = Producer({'bootstrap.servers': broker})

consumer.subscribe(['user_actions'])

# Словарь для отслеживания пользователей

user_login_time = {}

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

action = json.loads(msg.value().decode('utf-8'))

user_id = action['user_id']

action_type = action['action']

timestamp = datetime.fromisoformat(action['timestamp'])

if action_type == 'login':

user_login_time[user_id] = timestamp

elif action_type == 'purchase' and user_id in user_login_time:

del user_login_time[user_id]

# Проверяем, прошло ли 10 минут

current_time = datetime.now()

for user, login_time in list(user_login_time.items()):

if current_time – login_time > timedelta(minutes=10):

notification = {'user_id': user, 'message': 'Сделайте покупку!'}

producer.produce('notifications', value=json.dumps(notification))

print(f"Уведомление отправлено для пользователя {user}")

del user_login_time[user]

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Время входа пользователей сохраняется в словаре.

– Если с момента входа прошло более 10 минут и покупка не совершена, генерируется уведомление.

Эти задачи показывают, как использовать Apache Kafka для решения реальных задач, таких как фильтрация событий, подсчет статистики, агрегация данных и сохранение обработанной информации. Эти примеры помогут вам освоить основные подходы к работе с потоками данных в реальном времени.


1.3 Работа с базами данных: SQLAlchemy и интеграция с Pandas

SQLAlchemy – это мощная библиотека для работы с базами данных в Python. Она предоставляет инструменты для удобного взаимодействия с реляционными базами данных через ORM (Object Relational Mapping) или с использованием чистого SQL.

Pandas же идеально подходит для анализа данных, но иногда данные, которые мы хотим обработать, хранятся в базах данных. Для этого SQLAlchemy и Pandas можно эффективно интегрировать, чтобы выгружать данные из базы, обрабатывать их в Pandas и сохранять обратно.


Установка и подключение

Для начала работы установите библиотеку SQLAlchemy:

```bash

pip install sqlalchemy

```

Если вы используете SQLite, дополнительных действий не требуется. Для других баз данных, таких как PostgreSQL или MySQL, также потребуется установить драйверы, например:

```bash

pip install psycopg2 # Для PostgreSQL

pip install pymysql # Для MySQL

```

Создайте подключение к базе данных с помощью SQLAlchemy. Например, для SQLite это будет выглядеть так:

```python

from sqlalchemy import create_engine

# Создаем подключение к базе данных SQLite

engine = create_engine('sqlite:///example.db', echo=True)

```

Здесь `echo=True` означает, что в консоль будут выводиться SQL-запросы, выполняемые через SQLAlchemy, что полезно для отладки.


Создание таблиц и работа с ORM

SQLAlchemy поддерживает два основных подхода: работа через ORM и использование SQL-запросов напрямую. Рассмотрим оба.

Создадим таблицу для хранения информации о пользователях:

```python

from sqlalchemy import Table, Column, Integer, String, MetaData

# Создаем метаданные

metadata = MetaData()

# Определяем таблицу

users = Table(

'users', metadata,

Column('id', Integer, primary_key=True),

Column('name', String),

Column('age', Integer),