[Book][Neylson Crepalde] Big Data on Kubernetes [ENG, 2024]
https://github.com/webmakaka/Bigdata-on-Kubernetes
Part 2: Big Data Stack
04. The Modern Data Stack
05. Big Data Processing with Apache Spark
Working through:
2025.04.12
$ java --version
openjdk 11.0.26 2025-01-21
$ vi /etc/hosts
127.0.0.1 postgres
$ export PYTHON_VERSION=3.8.12
$ export PROJECT_NAME=big_data
$ pip install pyspark
$ spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.5
/_/
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.26
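A quick sanity check that the local PySpark install works (a minimal sketch, run inside the big_data virtualenv):

from pyspark.sql import SparkSession

# Start a local session and confirm the DataFrame API responds.
spark = SparkSession.builder.master("local[*]").appName("sanity-check").getOrCreate()
df = spark.createDataFrame([(1, "ok"), (2, "still ok")], ["id", "status"])
df.show()
print(spark.version)  # should print 3.5.5
spark.stop()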
$ git clone [email protected]:webmakaka/Bigdata-on-Kubernetes.git
$ cd Bigdata-on-Kubernetes/Chapter05/
$ pip install jupyterlab
$ pip install requests
$ python get_titanic_data.py
$ python get_imdb_data.py
$ jupyter lab
run -> read_titanic_dataset.ipynb
run -> analyzing_imdb_data.ipynb
http://localhost:4040/jobs/
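The notebooks do the actual analysis; below is a minimal standalone sketch of the same idea (the CSV path and the standard titanic column names are assumptions, point it at whatever get_titanic_data.py produced). While the session is running, its jobs show up at http://localhost:4040/jobs/.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").appName("titanic").getOrCreate()

# Hypothetical path -- use the file written by get_titanic_data.py.
df = spark.read.csv("data/titanic.csv", header=True, inferSchema=True)
df.printSchema()

# Survival rate per passenger class (columns from the standard titanic schema).
df.groupBy("Pclass").agg(F.avg("Survived").alias("survival_rate")).orderBy("Pclass").show()

spark.stop()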
06. Building Pipelines with Apache Airflow
Install the Astro CLI
https://docs.astronomer.io/astro/cli/install-cli
$ curl -sSL install.astronomer.io | sudo bash -s
$ mkdir airflow
$ cd airflow/
$ astro dev init
$ astro dev start
// admin / admin
http://localhost:8080
$ astro dev kill
$ cd Chapter06/dags
$ cp ./* airflow/dags/
$ astro dev start
Has two DAGs:
- A demo
- One that writes data to the postgres database and to AWS (if you have that available)
On startup the DAGs fail with an error; go to Airflow -> Admin -> Variables and add the variables the DAG code expects (check the code for the exact keys, a sketch of the pattern is below).
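A sketch of how a DAG typically reads those Variables (the keys here are placeholders, take the real ones from the DAG code):

from airflow.models import Variable

# A plain Variable.get() fails at DAG parse time if the key is missing --
# which is exactly the error seen on startup.
bucket = Variable.get("s3_bucket_name")                    # placeholder key
api_key = Variable.get("some_api_key", default_var=None)   # a default avoids the parse error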
When Airflow starts it also brings up a postgres database you can connect to:
// postgres / postgres // localhost
Airflow -> Admin -> Connections
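Connections work the same way: DAG tasks reference a conn id defined under Admin -> Connections. A minimal sketch, assuming the postgres provider is installed; the conn id below is a placeholder:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# "postgres_default" is a placeholder -- use the conn id the DAG code expects.
hook = PostgresHook(postgres_conn_id="postgres_default")
print(hook.get_records("SELECT 1"))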
$ astro dev kill
07. Apache Kafka for Real-Time Events and Data Ingestion
$ cd Chapter07/multinode
$ docker-compose up -d
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
9d154c46ff2e confluentinc/cp-kafka:7.6.0 "/etc/confluent/dock…" 2 minutes ago Up 2 minutes multinode-kafka-1-1
eeb748d35702 confluentinc/cp-kafka:7.6.0 "/etc/confluent/dock…" 2 minutes ago Up 2 minutes multinode-kafka-3-1
d949e54e0ecc confluentinc/cp-kafka:7.6.0 "/etc/confluent/dock…" 2 minutes ago Up 2 minutes multinode-kafka-2-1
7151f56d6d40 confluentinc/cp-zookeeper:7.6.0 "/etc/confluent/dock…" 2 minutes ago Up 2 minutes multinode-zookeeper-2-1
d1ab8513b73d confluentinc/cp-zookeeper:7.6.0 "/etc/confluent/dock…" 2 minutes ago Up 2 minutes multinode-zookeeper-3-1
f2936115277b confluentinc/cp-zookeeper:7.6.0 "/etc/confluent/dock…" 2 minutes ago Up 2 minutes multinode-zookeeper-1-1
$ docker logs multinode-kafka-1-1
$ CONTAINER_NAME=multinode-kafka-1-1
$ docker exec -it $CONTAINER_NAME bash
$ BOOTSTRAP_SERVER=localhost:19092
$ TOPIC=mytopic
$ GROUP=mygroup
$ kafka-topics --create --bootstrap-server $BOOTSTRAP_SERVER --replication-factor 3 --partitions 3 --topic $TOPIC
$ kafka-topics --list --bootstrap-server $BOOTSTRAP_SERVER
$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --describe --topic $TOPIC
Topic: mytopic TopicId: FDSRMlR1SGaDzIhi5x2fEQ PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: mytopic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: mytopic Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: mytopic Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC
// In a second terminal
$ CONTAINER_NAME=multinode-kafka-1-1
$ docker exec -it $CONTAINER_NAME bash
$ BOOTSTRAP_SERVER=localhost:19092
$ TOPIC=mytopic
$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC --from-beginning
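The same round trip from Python, as a sketch with kafka-python (assumes the compose file also publishes broker 1 on localhost:19092 to the host and that kafka-python is pip-installed; check docker-compose.yaml for the real port):

from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP = "localhost:19092"  # assumption: host-mapped port of kafka-1

producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send("mytopic", b"hello from python")
producer.flush()

consumer = KafkaConsumer(
    "mytopic",
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating after 5s of silence
)
for msg in consumer:
    print(msg.partition, msg.offset, msg.value.decode())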
$ docker-compose down
Streaming from a database with Kafka Connect
$ cd Chapter07/connect/kafka-connect-custom-image
$ docker build -t connect-custom:1.0.0 .
$ cd ../
$ vi .env_kafka_connect
$ docker-compose up -d
$ export PROJECT_NAME=big_data
$ source ${PYENV_ROOT}/versions/${PROJECT_NAME}-env/bin/activate
// Add data to the postgres database (run from Chapter07/connect/)
$ pip install -r ./simulations/requirements.txt
$ python simulations/make_fake_data.py
// Stop it after it has generated some records
$ ^C
// OK! Data is being inserted
SQL> SELECT * FROM "public"."customers"
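The same check from Python, if a SQL client is not at hand (a sketch; assumes psycopg2-binary is installed and the compose file publishes postgres on localhost:5432 with postgres/postgres):

import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, dbname="postgres",
                        user="postgres", password="postgres")
with conn, conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM public.customers")
    print("rows in customers:", cur.fetchone()[0])
conn.close()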
// Create the json-customers topic
$ docker-compose exec broker kafka-topics --create --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1 --topic json-customers
// Register the JDBC source connector
$ curl -X POST -H "Content-Type: application/json" --data @connectors/connect_jdbc_pg_json.config localhost:8083/connectors | jq
// Skipping the AWS part (S3 sink) for now
// $ curl -X POST -H "Content-Type: application/json" --data @connectors/connect_s3_sink.config localhost:8083/connectors
{
  "name": "pg-connector-json",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "mode": "timestamp",
    "timestamp.column.name": "dt_update",
    "table.whitelist": "public.customers",
    "topic.prefix": "json-",
    "validate.non.null": "false",
    "poll.interval.ms": "500",
    "name": "pg-connector-json"
  },
  "tasks": [],
  "type": "source"
}
$ curl -s localhost:8083/connectors | jq
[
  "pg-connector-json"
]
$ docker logs connect
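Besides the logs, the connector state can be checked over the Kafka Connect REST API, e.g. with the requests package installed earlier:

import requests

# /connectors/<name>/status is a standard Kafka Connect REST endpoint.
r = requests.get("http://localhost:8083/connectors/pg-connector-json/status")
r.raise_for_status()
status = r.json()
print(status["connector"]["state"])             # expect RUNNING
print([t["state"] for t in status["tasks"]])    # task states, e.g. ["RUNNING"]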
// Check: you should see the messages in JSON format printed on the screen
$ docker exec -it broker bash
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic json-customers --from-beginning
Nothing showed up!
Real-time data processing with Kafka and Spark
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 processing/consume_from_kafka.py
// In another terminal, generate more data with the simulation script
$ python simulations/make_fake_data.py
Nothing worked! (Possibly because the --packages version 3.1.2 does not match the installed Spark 3.5.5; matching the versions may help, see the sketch below.)
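For reference, a minimal Structured Streaming consumer matched to Spark 3.5.5 (a sketch, not the repo's consume_from_kafka.py; localhost:9092 assumes the broker port published by docker-compose.yaml):

# Run with: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5 <script>.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-stream-sketch").getOrCreate()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")  # assumption: host-mapped broker port
          .option("subscribe", "json-customers")
          .option("startingOffsets", "earliest")
          .load())

# Kafka values arrive as bytes; dump them to the console to confirm data flows.
query = (stream.selectExpr("CAST(value AS STRING) AS value")
         .writeStream
         .format("console")
         .option("truncate", "false")
         .start())

query.awaitTermination()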
$ docker-compose down