[Book][Neylson Crepalde] Big Data on Kubernetes [ENG, 2024]


https://github.com/webmakaka/Bigdata-on-Kubernetes


Part 2: Big Data Stack

04. The Modern Data Stack

05. Big Data Processing with Apache Spark


Working through it:
2025.04.12


$ java --version
openjdk 11.0.26 2025-01-21


$ vi /etc/hosts
127.0.0.1 postgres


$ export PYTHON_VERSION=3.8.12
$ export PROJECT_NAME=big_data
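// These variables suggest a pyenv virtualenv (a later section activates ${PROJECT_NAME}-env).
// A minimal sketch of creating it, assuming pyenv with the pyenv-virtualenv plugin is installed:
$ pyenv install ${PYTHON_VERSION}
$ pyenv virtualenv ${PYTHON_VERSION} ${PROJECT_NAME}-env
$ pyenv activate ${PROJECT_NAME}-env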


$ pip install pyspark
$ spark-submit --version


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.5
      /_/
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.26


$ git clone [email protected]:webmakaka/Bigdata-on-Kubernetes.git




$ cd Bigdata-on-Kubernetes/Chapter05/


$ pip install jupyterlab
$ pip install requests


$ python get_titanic_data.py
$ python get_imdb_data.py


$ jupyter lab


run -> read_titanic_dataset.ipynb
run -> analyzing_imdb_data.ipynb
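For reference, a minimal PySpark sketch of the kind of analysis read_titanic_dataset.ipynb performs (the file name titanic.csv and the Survived column are assumptions; see the notebook for the actual code):

# Minimal PySpark sketch, not the notebook itself: load the downloaded Titanic CSV
# and run a simple aggregation. The file name and column name are assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("titanic").getOrCreate()

df = spark.read.csv("titanic.csv", header=True, inferSchema=True)
df.printSchema()
df.groupBy("Survived").count().show()

spark.stop()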


http://localhost:4040/jobs/


06. Building Pipelines with Apache Airflow

Install the Astro CLI

https://docs.astronomer.io/astro/cli/install-cli


$ curl -sSL install.astronomer.io | sudo bash -s


$ mkdir airflow
$ cd airflow/

$ astro dev init
$ astro dev start


// admin / admin
http://localhost:8080


$ astro dev kill


// Copy the example DAGs from the book repo into the astro project
$ cp Chapter06/dags/* airflow/dags/
$ cd airflow/
$ astro dev start

There are 2 DAGs:

  1. A demo
  2. One that writes data to the postgres database and to AWS (if you have access to it)


On startup the DAGs fail; go to Airflow -> Admin -> Variables and add the required variables (see the DAG code for which ones are needed).
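For reference, a minimal sketch of how a DAG reads an Airflow Variable (the variable name below is a placeholder; check the DAG code in the repo for the real ones):

# Minimal Airflow DAG sketch showing Variable usage (placeholder variable name).
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator


def print_var():
    # Raises KeyError if the variable is not defined in Admin -> Variables
    print(Variable.get("my_required_variable"))


with DAG(
    dag_id="variables_demo",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    PythonOperator(task_id="print_var", python_callable=print_var)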


When airflow starts, it also brings up a postgres database that you can connect to:

// postgres / postgres // localhost
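// To connect from the host (assuming psql is installed and the default port 5432 is published):
$ psql -h localhost -p 5432 -U postgres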


Airflow -> Admin -> Connections


$ astro dev kill


07. Apache Kafka for Real-Time Events and Data Ingestion

$ cd Chapter07/multinode


$ docker-compose up -d


$ docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS     NAMES
9d154c46ff2e   confluentinc/cp-kafka:7.6.0       "/etc/confluent/dock…"   2 minutes ago   Up 2 minutes             multinode-kafka-1-1
eeb748d35702   confluentinc/cp-kafka:7.6.0       "/etc/confluent/dock…"   2 minutes ago   Up 2 minutes             multinode-kafka-3-1
d949e54e0ecc   confluentinc/cp-kafka:7.6.0       "/etc/confluent/dock…"   2 minutes ago   Up 2 minutes             multinode-kafka-2-1
7151f56d6d40   confluentinc/cp-zookeeper:7.6.0   "/etc/confluent/dock…"   2 minutes ago   Up 2 minutes             multinode-zookeeper-2-1
d1ab8513b73d   confluentinc/cp-zookeeper:7.6.0   "/etc/confluent/dock…"   2 minutes ago   Up 2 minutes             multinode-zookeeper-3-1
f2936115277b   confluentinc/cp-zookeeper:7.6.0   "/etc/confluent/dock…"   2 minutes ago   Up 2 minutes             multinode-zookeeper-1-1


$ docker logs multinode-kafka-1-1


$ CONTAINER_NAME=multinode-kafka-1-1
$ docker exec -it $CONTAINER_NAME bash


$ BOOTSTRAP_SERVER=localhost:19092
$ TOPIC=mytopic
$ GROUP=mygroup


$ kafka-topics --create --bootstrap-server $BOOTSTRAP_SERVER --replication-factor 3 --partitions 3 --topic $TOPIC


$ kafka-topics --list --bootstrap-server $BOOTSTRAP_SERVER


$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --describe --topic $TOPIC


Topic: mytopic	TopicId: FDSRMlR1SGaDzIhi5x2fEQ	PartitionCount: 3	ReplicationFactor: 3	Configs:
	Topic: mytopic	Partition: 0	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: mytopic	Partition: 1	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: mytopic	Partition: 2	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1


$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC


+1 Terminal

$ CONTAINER_NAME=multinode-kafka-1-1
$ docker exec -it $CONTAINER_NAME bash


$ BOOTSTRAP_SERVER=localhost:19092
$ TOPIC=mytopic


$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC --from-beginning
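For reference, the same message flow can be driven from Python with kafka-python (pip install kafka-python). The bootstrap address below assumes a broker port is published to the host; adjust it to match docker-compose.yaml:

# Minimal kafka-python producer sketch; "localhost:19092" is an assumption.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:19092")
producer.send("mytopic", b"hello from python")
producer.flush()
producer.close()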


$ docker-compose down


Streaming from a database with Kafka Connect

$ cd Chapter07/connect/kafka-connect-custom-image
$ docker build -t connect-custom:1.0.0 .
$ cd ../


$ vi .env_kafka_connect


$ docker-compose up -d


$ export PROJECT_NAME=big_data
$ source ${PYENV_ROOT}/versions/${PROJECT_NAME}-env/bin/activate


// Add data to the postgres database
$ pip install -r ./simulations/requirements.txt
$ python simulations/make_fake_data.py

// Stop it after it has generated some records
$ ^C


// OK! Data is being inserted
SQL> SELECT * FROM "public"."customers"
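// The same check from the command line (assuming the compose service is named postgres):
$ docker-compose exec postgres psql -U postgres -c 'SELECT count(*) FROM public.customers;'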


// Create the json-customers topic
$ docker-compose exec broker kafka-topics --create --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1 --topic json-customers


// Register the connector; the Connect REST API echoes back the registered config
$ curl -X POST -H "Content-Type: application/json" --data @connectors/connect_jdbc_pg_json.config localhost:8083/connectors | jq


// Skipping AWS for now
// $ curl -X POST -H "Content-Type: application/json" --data @connectors/connect_s3_sink.config localhost:8083/connectors


{
  "name": "pg-connector-json",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "mode": "timestamp",
    "timestamp.column.name": "dt_update",
    "table.whitelist": "public.customers",
    "topic.prefix": "json-",
    "validate.non.null": "false",
    "poll.interval.ms": "500",
    "name": "pg-connector-json"
  },
  "tasks": [],
  "type": "source"
}


$ curl -s localhost:8083/connectors | jq


[
  "pg-connector-json"
]


$ docker logs connect


// Check: you should see the messages in JSON format printed on the screen
$ docker exec -it broker bash
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic json-customers --from-beginning


Nothing showed up!
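// First things to check: the connector/task status via the Kafka Connect REST API and the connect logs:
$ curl -s localhost:8083/connectors/pg-connector-json/status | jq
$ docker logs connect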


Real-time data processing with Kafka and Spark


// The spark-sql-kafka package version should match the installed Spark version (3.5.5 above)
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5 processing/consume_from_kafka.py
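For reference, a minimal Structured Streaming sketch of what a script like consume_from_kafka.py does (the actual script in the repo may differ; the bootstrap address is an assumption):

# Minimal Structured Streaming sketch: read json-customers and print values to the console.
# "localhost:9092" is an assumption; use whatever broker address the compose file exposes.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("kafka-consumer").getOrCreate()

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "json-customers")
    .option("startingOffsets", "earliest")
    .load()
)

query = (
    df.select(col("value").cast("string").alias("value"))
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
)
query.awaitTermination()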


// Open another terminal and run more simulations:
$ python simulations/make_fake_data.py


Nothing worked!


$ docker-compose down