broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'transaction-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['transactions'])

# Словарь для хранения сумм по пользователям

user_totals = defaultdict(float)

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

transaction = json.loads(msg.value().decode('utf-8'))

# Обновляем сумму для пользователя

user_id = transaction['user_id']

user_totals[user_id] += transaction['amount']

# Вывод текущих сумм

print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает данные из топика `transactions`.

– Для каждого пользователя обновляется сумма его транзакций в словаре `user_totals`.

– Программа выводит текущие суммы по всем пользователям.


Задача 4: Сохранение обработанных данных в файл

Описание:

Топик `sensor_data` содержит данные с датчиков IoT:

– `sensor_id` – идентификатор датчика.

– `temperature` – измеренная температура.

– `timestamp` – время измерения.

Ваша задача: написать программу, которая сохраняет все данные о температуре выше 30°C в файл `high_temp.json`.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'sensor-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['sensor_data'])

# Открываем файл для записи

with open('high_temp.json', 'w') as outfile:

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

sensor_data = json.loads(msg.value().decode('utf-8'))

# Сохраняем данные, если температура выше 30°C

if sensor_data['temperature'] > 30:

json.dump(sensor_data, outfile)

outfile.write('\n') # Новый ряд для каждого объекта

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает данные из топика `sensor_data`.

– Данные с температурой выше 30°C записываются в файл `high_temp.json`.


Задача 5: Обнаружение аномалий в данных

Описание:

В топик `temperature_readings` поступают данные о температуре из различных городов:

– `city` – название города.

– `temperature` – измеренная температура.

– `timestamp` – время измерения.

Ваша задача: написать программу, которая будет находить и выводить аномалии – случаи, когда температура превышает 40°C или опускается ниже -10°C.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'temperature-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['temperature_readings'])

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

reading = json.loads(msg.value().decode('utf-8'))

# Проверяем на аномалии

if reading['temperature'] > 40 or reading['temperature'] < -10:

print(f"Аномалия! Город: {reading['city']}, Температура: {reading['temperature']}°C")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает данные о температуре из топика.

– Если температура выходит за пределы нормального диапазона, программа выводит сообщение об аномалии.


Задача 6: Потоковое объединение данных

Описание:

Есть два топика:

1. `orders` – содержит данные о заказах: `order_id`, `product_id`, `quantity`.