Spark Session là gì

Các nguồn đầu vào phát trực tuyến có cấu trúc của Apache Spark [2/6]

Bắt đầu phát trực tuyến Spark với các nguồn đầu vào Rate, Socket, File và Kafka

Ảnh của Viktor Talashuk trên Unsplash

Trong phần 1 , chúng ta đã thảo luận tổng quan về Apache Spark ️ Structured Streaming với một ví dụ rất cơ bản sử dụng ratenguồn đầu vào. Trong blog này, chúng tôi thảo luận chi tiết về một số nguồn đầu vào khác bằng cách sử dụng các ví dụ.

Bạn cũng có thể quan tâm đến một số bài viết trước đây của tôi trên Apache Spark.

  • Apache Spark Structured Streaming - First Streaming Ví dụ [Phần 1 của loạt bài này]
  • Đi sâu vào các chức năng DateTime của Apache Spark
  • Làm việc với JSON trong Apache Spark
  • Đi sâu vào các chức năng của Apache Spark Window
  • Đi sâu vào các hàm của Apache Spark Array
  • Bắt đầu hành trình của bạn với Apache Spark

Spark Streaming nhập dữ liệu từ các loại nguồn đầu vào khác nhau để xử lý trong thời gian thực.

  • Tỷ lệ [để Thử nghiệm]: Nó sẽ tự động tạo ra dữ liệu bao gồm 2 cột timestampvà value. Điều này thường được sử dụng cho mục đích thử nghiệm. Chúng tôi đã chứng minh điều này trong phần 1 của loạt bài này .
  • Socket [để thử nghiệm]: Nguồn dữ liệu này sẽ lắng nghe socket được chỉ định và nhập bất kỳ dữ liệu nào vào Spark Streaming. Nó cũng chỉ được sử dụng cho mục đích thử nghiệm.
  • Tệp : Điều này sẽ lắng nghe một thư mục cụ thể dưới dạng dữ liệu truyền trực tuyến. Nó hỗ trợ các định dạng tệp như CSV, JSON, ORC và Parquet. Bạn có thể tìm thấy danh sách định dạng tệp được hỗ trợ mới nhất tại đây .
  • Kafka : Điều này sẽ đọc dữ liệu từ Apache Kafka® và tương thích với Kafka broker phiên bản 0.10.0 trở lên

Nguồn đầu vào - Socket

Thiết lập

Nhập thư viện và tạo Phiên Spark

Tạo dữ liệu phát trực tuyến

Tạo Streaming DataFrame bằng cách sử dụng socketnguồn. Ngoài ra, hãy kiểm tra xem DataFrame isStreaming.

Chúng tôi sử dụng socketđịnh dạng để đọc dữ liệu từ một socket [ 127.0.0.1:9999]. Bạn có thể sử dụng bất kỳ cổng nào được phép tùy ý để nghe.

Đầu ra:

Streaming DataFrame : true

Ở đây chúng tôi đếm các từ từ một luồng dữ liệu đến từ ổ cắm này. Ngoài ra, chúng tôi kiểm tra lược đồ của DataFrame phát trực tuyến của chúng tôi.

Đầu ra:

Schema of DataFame wordCount. root |-- words: string [nullable = true] |-- count: long [nullable = false]

In nội dung của DataFrame phát trực tuyến wordCounttrên bảng điều khiển.

Mở cổng 9999vào localhost[127.0.0.1]và gửi một số dữ liệu để đếm. Chúng tôi sử dụng tiện ích netcat để mở cổng. Mở một thiết bị đầu cuối và chạy lệnh bên dưới.

nc -lk 9999

Lưu ý: Đảm bảo rằng bạn đã mở ổ cắm trước khi khởi động ứng dụng phát trực tuyến.

Trên cửa sổ bên phải, chúng tôi gửi dữ liệu đến socket của chúng tôi và ở bên trái ứng dụng Spark Streaming của chúng tôi in ra bảng điều khiển. Chúng tôi nhận được một số từ trên bảng điều khiển đầu ra của dữ liệu được gửi đến socket. Vì chúng tôi đang chạy ứng dụng Spark Stream của mình ở updatechế độ đầu ra, chúng tôi thấy rằng chỉ các bản ghi được cập nhật trong một đợt cụ thể mới được xuất ra bảng điều khiển. Từ dòng đầu tiên London Paris New_york Tokyo, chúng tôi có đầu ra trong Batch:1. Từ dòng thứ hai Mumbai Ohio Delhi London, chúng tôi có kết quả đầu ra Batch:2[Lưu ý rằng chúng tôi chỉ gửi dòng thứ hai khi dòng đầu tiên được xử lý hoàn toàn]. Vì Mumbai Ohio Delhilà các kỷ lục mới và ứng dụng phát trực tuyến của chúng tôi đã xuất hiện Londonlần thứ hai, chúng tôi đã có số lượng như 2cho London và 1phần còn lại của các từ ở dòng thứ hai Mumbai Ohio Delhi. Ngoài ra, các từ từ dòng đầu tiên không xuất hiện ở dòng thứ hai không được in trên đầu ra của Batch:2.Một lần nữa, điều này là do chúng tôi đang làm việc ở updatechế độ đầu ra chỉ in các bản ghi cập nhật.

Hãy thử cả completechế độ đầu ra và xem nó hoạt động như thế nào.

Lưu ý: hủy quá trình đang chạy trên cổng 9999để bắt đầu lại cho completechế độ. Hoặc sử dụng một cổng khác.

// Find the process id using below command on terminal. ps | grep 9999 // kill process using below command. kill -9

Trên cửa sổ bên phải, chúng tôi gửi dữ liệu đến ổ cắm của chúng tôi và bên trái ứng dụng Spark Streaming của chúng tôi sẽ in. Batch:1và Batch:2đại diện cho đầu ra cho dòng 1 và dòng 2 tương ứng. Ở completechế độ này, Batch:2chúng tôi nhận được tất cả các bản ghi mà ứng dụng phát trực tuyến của chúng tôi đã thấy cho đến nay. Ví dụ: Tokyochỉ có ở dòng 1 nhưng không có ở dòng 2. Tuy nhiên, chúng tôi nhận Tokyotrong Batch:2cũng bởi vì chế độ hoàn toàn viết tất cả các hồ sơ đó đã xử lý cho đến nay.

Lưu ý: Đầu ra cho dòng-3 bị cắt ra khỏi ảnh chụp màn hình.

Để dễ dàng tham khảo, bạn có thể tìm thấy mã hoàn chỉnh trên GitHub .

Nguồn đầu vào - Tệp

Với nguồn đầu vào tệp, ứng dụng của chúng tôi sẽ đợi dữ liệu có sẵn trong thư mục được chỉ định. Các ví dụ của chúng tôi sử dụng dữ liệu chứng khoán từ Yahoo ®; bạn cũng có thể tìm thấy dữ liệu mẫu trên GitHub . Dưới đây là tên file ví dụ AAPL_2015.csv, GOOGL_2015.csv, AMZN_2015.csvvv mà chứa cổ phiếu dữ liệu cho năm 2015 Apple, Googlevà Amazonlần lượt.

Thiết lập

Lược đồ

Dữ liệu của chúng tôi chứa các trường Date,Open,High,Low,Close,Adj Close,Volumevà chúng tôi sẽ trích xuất Nametừ tên tệp bằng một hàm tùy chỉnh.

Ở đây chúng tôi xác định lược đồ và viết một hàm tùy chỉnh để trích xuất biểu tượng mã chứng khoán.

Tạo dữ liệu phát trực tuyến

Chúng tôi tạo một DataFrame trực tuyến để đọc csvdữ liệu từ một thư mục được chỉ định data/streamvà áp dụng các điều trên schema. Chúng tôi cũng chỉ định tham số maxFilesPerTrigger = 2, có nghĩa là ứng dụng của chúng tôi sẽ xử lý tối đa 2tệp CSV trong mỗi lô. Cuối cùng, chúng tôi tạo một cột khác được gọi Namebằng chức năng getFileName.Cột đó chứa các ký hiệu mã chứng khoán như GOOGL, AMZN, AAPL, v.v.

Chuyển đổi

Thực hiện tổng hợp cơ bản trên DataFrame trực tuyến của chúng tôi.

Chúng tôi nhóm các dữ liệu dựa trên chứng khoán Name, Yearvà tìm giá trị tối đa của HIGHcột.

Chúng ta cũng có thể thực hiện chuyển đổi trên bằng cách sử dụng truy vấn SQL. Trong mẫu mã này, chúng tôi đăng ký DataFrame trực tuyến dưới dạng một chế độ xem tạm thời và thực thi truy vấn SQL trên đó.

Xuất ra bảng điều khiển

In nội dung truyền trực tuyến DatFrame sang updatechế độ sử dụng bảng điều khiển .

Hãy bắt đầu ứng dụng phát trực tuyến của chúng tôi ngay bây giờ, nó sẽ đợi dữ liệu trong data/streamthư mục. Sao chép các tệp theo trình tự bên dưới từ data/stocksđến data/streamđể mô phỏng quá trình phát trực tuyến.

  1. TSLA_2017.csv
  2. GOOGL_2017.csv
  3. TSLA_2016.csv
  4. AMZN_2017.csv

Để dễ dàng tham khảo, bạn có thể tìm thấy mã hoàn chỉnh trên GitHub .

Nguồn đầu vào - Kafka

Chúng tôi sẽ đọc dữ liệu từ Kafka và hiển thị nó trên bảng điều khiển. Để đọc dữ liệu từ Kafka, trước tiên chúng ta cần thiết lập Kafka và xuất bản tin nhắn tới chủ đề Kafka mà sau đó chúng ta sẽ đọc trong Spark Streaming.

Thiết lập Kafka

Bạn có thể làm theo các bước dưới đây để thiết lập Kafka.

  • Cài đặt Kafka: Chúng tôi có thể tham khảo này bài viết để cài đặt Kafka.
  • Bắt đầu dịch vụ hợp lưu
  • confluent local services start kafka-topics --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic test List Kafka Topic kafka-console-producer --broker-list localhost:9092 --topic test kafka-console-consumer --bootstrap-server localhost:9092 --topic test

Tạo dữ liệu phát trực tuyến

Đọc dữ liệu từ Kafka vào Spark Streaming.

Chúng tôi sử dụng kafkađịnh dạng để đọc dữ liệu từ Kafka. Chúng tôi chỉ định các chi tiết của máy chủ khởi động Kafka được cài đặt cục bộ của chúng tôi và đăng ký vào chủ đề Kafka testđã tạo ở trên. Cuối cùng, chúng tôi chỉ chọn dữ liệu được nhập vào chủ đề Kafka có trong valuecột.

Chuyển đổi

Thực hiện đếm từ đơn giản tương tự như socketví dụ nguồn.

Xuất ra bảng điều khiển

In nội dung của luồng DataFrame ra bảng điều khiển.

Đầu ra trông rất giống với socketví dụ nguồn của chúng tôi nhưng lần này ứng dụng phát trực tuyến của chúng tôi đọc dữ liệu từ Kafka.

Để dễ dàng tham khảo, bạn có thể tìm thấy mã hoàn chỉnh trên GitHub .

Tôi hy vọng bạn thích tìm hiểu về các nguồn đầu vào khác nhau có thể nhập dữ liệu vào Spark Streaming!

Người giới thiệu

  • //spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources

Video liên quan

Chủ Đề