# Завершаем сессию Spark

spark.stop()

```

Объяснение:

– `groupBy` позволяет сгруппировать данные по столбцу.

– `avg` вычисляет среднее значение для каждой группы.


Задача 5: Сортировка больших данных

Описание: У вас есть файл с информацией о транзакциях. Необходимо отсортировать данные по дате транзакции и сохранить результат в новый файл.

Решение:

```python

from pyspark.sql import SparkSession

# Создаем сессию Spark

spark = SparkSession.builder.appName("SortTransactions").getOrCreate()

# Загрузка данных

df = spark.read.csv('transactions_large.csv', header=True, inferSchema=True)

# Сортировка данных по дате

sorted_df = df.orderBy('transaction_date')

# Сохранение отсортированных данных в новый файл

sorted_df.write.csv('sorted_transactions', header=True, mode='overwrite')

print("Данные отсортированы и сохранены.")

# Завершаем сессию Spark

spark.stop()

```

Объяснение:

– `orderBy` сортирует данные по указанному столбцу.

– `write.csv` сохраняет результат в новом файле.

Эти задачи демонстрируют, как использовать Dask и PySpark для работы с большими объемами данных.

– Dask подходит для локальных задач и интеграции с Python-библиотеками.

– PySpark эффективен для кластерной обработки данных и интеграции с экосистемой Hadoop.

Обе библиотеки упрощают решение задач, которые сложно выполнить традиционными методами из-за ограничений памяти или мощности процессора.


1.2 Потоковая обработка данных с Apache Kafka

Apache Kafka – это мощная платформа для обработки потоков данных в реальном времени. Она широко используется для обработки и анализа событий, поступающих из различных источников, таких как веб-серверы, базы данных, датчики IoT, системы мониторинга и многое другое. Kafka обеспечивает высокую производительность, надежность и масштабируемость, что делает её одним из лучших инструментов для потоковой обработки данных.

В основе Apache Kafka лежат несколько ключевых компонентов:

1. Брокеры – серверы, которые принимают, хранят и доставляют данные.

2. Топики – логические каналы, через которые данные передаются.

3. Продюсеры – приложения или устройства, которые отправляют данные в Kafka.

4. Консьюмеры – приложения, которые получают данные из Kafka.

Kafka организует поток данных в виде последовательностей сообщений. Сообщения записываются в топики и разделяются на партиции, что позволяет обрабатывать данные параллельно.

Пример потока данных

Представим, что у нас есть система интернет-магазина, где Kafka используется для обработки событий, таких как заказы, клики на странице, добавление товаров в корзину и платежи. Каждое из этих событий записывается в топик Kafka. Например, топик `orders` может содержать события, описывающие новые заказы.


Установка и настройка Apache Kafka

Перед началом работы убедитесь, что Kafka установлена. Для локальной работы используйте официальные сборки Kafka с сайта [Apache Kafka](https://kafka.apache.org/).

1. Установите Kafka и запустите ZooKeeper, необходимый для управления брокерами.

2. Запустите Kafka-брокер.

3. Создайте топик с помощью команды:

```bash

bin/kafka-topics.sh –create –topic orders –bootstrap-server localhost:9092 –partitions 3 –replication-factor 1

```


Отправка данных в Kafka

Теперь создадим простого продюсера на Python, который будет отправлять данные в топик `orders`. Для работы с Kafka на Python используется библиотека `confluent-kafka`. Установите её с помощью команды:

```bash

pip install confluent-kafka

```

Пример кода, который отправляет сообщения в топик:

```python

from confluent_kafka import Producer

import json

import time

# Настройки продюсера

producer_config = {