# Завершаем сессию Spark
spark.stop()
```
Объяснение:
– `groupBy` позволяет сгруппировать данные по столбцу.
– `avg` вычисляет среднее значение для каждой группы.
Задача 5: Сортировка больших данных
Описание: У вас есть файл с информацией о транзакциях. Необходимо отсортировать данные по дате транзакции и сохранить результат в новый файл.
Решение:
```python
from pyspark.sql import SparkSession
# Создаем сессию Spark
spark = SparkSession.builder.appName("SortTransactions").getOrCreate()
# Загрузка данных
df = spark.read.csv('transactions_large.csv', header=True, inferSchema=True)
# Сортировка данных по дате
sorted_df = df.orderBy('transaction_date')
# Сохранение отсортированных данных в новый файл
sorted_df.write.csv('sorted_transactions', header=True, mode='overwrite')
print("Данные отсортированы и сохранены.")
# Завершаем сессию Spark
spark.stop()
```
Объяснение:
– `orderBy` сортирует данные по указанному столбцу.
– `write.csv` сохраняет результат в новом файле.
Эти задачи демонстрируют, как использовать Dask и PySpark для работы с большими объемами данных.
– Dask подходит для локальных задач и интеграции с Python-библиотеками.
– PySpark эффективен для кластерной обработки данных и интеграции с экосистемой Hadoop.
Обе библиотеки упрощают решение задач, которые сложно выполнить традиционными методами из-за ограничений памяти или мощности процессора.
Apache Kafka – это мощная платформа для обработки потоков данных в реальном времени. Она широко используется для обработки и анализа событий, поступающих из различных источников, таких как веб-серверы, базы данных, датчики IoT, системы мониторинга и многое другое. Kafka обеспечивает высокую производительность, надежность и масштабируемость, что делает её одним из лучших инструментов для потоковой обработки данных.
В основе Apache Kafka лежат несколько ключевых компонентов:
1. Брокеры – серверы, которые принимают, хранят и доставляют данные.
2. Топики – логические каналы, через которые данные передаются.
3. Продюсеры – приложения или устройства, которые отправляют данные в Kafka.
4. Консьюмеры – приложения, которые получают данные из Kafka.
Kafka организует поток данных в виде последовательностей сообщений. Сообщения записываются в топики и разделяются на партиции, что позволяет обрабатывать данные параллельно.
Пример потока данных
Представим, что у нас есть система интернет-магазина, где Kafka используется для обработки событий, таких как заказы, клики на странице, добавление товаров в корзину и платежи. Каждое из этих событий записывается в топик Kafka. Например, топик `orders` может содержать события, описывающие новые заказы.
Установка и настройка Apache Kafka
Перед началом работы убедитесь, что Kafka установлена. Для локальной работы используйте официальные сборки Kafka с сайта [Apache Kafka](https://kafka.apache.org/).
1. Установите Kafka и запустите ZooKeeper, необходимый для управления брокерами.
2. Запустите Kafka-брокер.
3. Создайте топик с помощью команды:
```bash
bin/kafka-topics.sh –create –topic orders –bootstrap-server localhost:9092 –partitions 3 –replication-factor 1
```
Отправка данных в Kafka
Теперь создадим простого продюсера на Python, который будет отправлять данные в топик `orders`. Для работы с Kafka на Python используется библиотека `confluent-kafka`. Установите её с помощью команды:
```bash
pip install confluent-kafka
```
Пример кода, который отправляет сообщения в топик:
```python
from confluent_kafka import Producer
import json
import time
# Настройки продюсера
producer_config = {