+2

Amazon Simple Queue Service - Tổng quan và cách sử dụng

Tổng quan về SQS

Amazon Simple Queue Service (SQS) là gì ?

SQS là dịch vụ hàng đợi thông điệp phân tán, được giới thiệu bởi Amazon Web Service năm 2004. Là dịch vụ hỗ trợ gửi và nhận thông điệp thông qua môi trường internet HTTP API. SQS được thiết kế để nhắm đến các ứng dụng cần khả năng mở rộng cao. Bởi vì cung cấp HTTP API nên các thành phần khác nhau của hệ thống có thể gửi nhận thông điệp một cách dễ dàng.

Một vài đặc điểm nổi bật của SQS

Tính mở rộng cao

SQS mở rộng bởi dung lượng đọc/ghi của thông điệp, nên chúng ta không cần quan tâm đến việc mở rộng hàng đợi, bởi việc này đã có AWS lo.

Trả tiền trên số lần xử lý

Chúng ta phải trả tiền cho mỗi lượt đọc/ghi thông điệp, không có khoản phí định kỳ nào.

Dễ dàng thiết lập

Khi lựa chọn SQS, không cần phải setup bất kỳ hạ tầng nào để có thể sử dụng nó. Chỉ đơn giản là gọi API để đọc/ghi thông điệp.

Có hai lựa chọn là Standard và FIFO queues

Có hai lựa chọn để thiết lập queue là tiêu chuẩn hay FIFO, tùy theo nhu cầu sử dụng mà lựa chọn cho phù hợp.

Tự động xử lý thông điệp trùng lặp ở FIFO queues

Xử lý lặp thông điệp khá quan trọng trong queue, SQS sẽ xóa những thông điệp trùng lặp. Điều này làm FIFO queue phù hợp cho các tác vụ mà mỗi task chỉ chạy một lần.

Xử lý lỗi qua Dead letter queue

Chức năng quan trọng để debug lỗi. Tất cả các thông điệp mà không thể xử lý sẽ được lưu vào Dead letter queue, nó như là một hàng đợi bình thường và chúng ta có thế kiểm tra hoặc xử lý logic cho những thông điệp lỗi trên Dead letter queue này.

SQS hoạt động như thế nào ?

Behind the scenes:

Hàng đợi (queue) hoạt động như thế nào ?

Điển hình thì xử lý hàng đợi sẽ như sau. Hệ thống có các Producer với vai trò là đấy thông điệp vào hàng đợi, các Consumer sẽ nhận thông điệp và xử lý chúng.

Hệ thống đánh dấu là tin nhắn đang pending hoặc là in-flight và gửi nó đến các consumer - các background job management ở consumer client sẽ chạy và xử lý các thông điệp này.

Khi mà consumer hoàn thành xử lý, nó sẽ thông báo cho queue là đã xử lý thành công bằng cách gửi một thông báo acknowledge - xác nhận xử lý thành công. Sau đó thì thông điệp được đánh dấu từ pending thành processed.

Nếu quá trình này xảy ra break ở đâu đó, ví dụ thiếu thông báo acknowledge trong một khoảng thời gian nhất định chẳng hạn, thì có nghĩa là quá trình xử lý thông điệp không thành công, và thông điệp không còn được coi là pending nữa. Như vậy sẽ không có thông điệp nào bị kẹt ở trạng thái pending mãi mãi.

Khi mà xử lý fail một số lần nhất định, queue sẽ coi là các thông điệp này có vấn đề và ngừng gửi nó đến các consumer. Hoặc cũng có thể đưa nó lưu vào Dead letter queue (DLQ).

Với SQS

SQS cung cấp một api endpoint để các producer gửi thông điệp vào queue, các consumer sẽ đọc thông điệp và xử lý thông điệp đó.

  • Thông điệp gửi đến SQS có thể là chuỗi string bất kỳ, XML hay là JSON.
  • SQS không giới hạn số lượng thông điệp được gửi đến và được xử lý.

Thông điệp

Một thông điệp ở SQS sẽ trông như này.

{
  "message_id": "b1438d18-8095-4bed-ad26-ee1b8768e537",
  "receipt_handle": "AQEBLUaoBHhmJXetoonDkqfDDHUSlkGsIXLtHNATnSCeDd2nzk6V45nTnOt2PSCjqt5bbm2uCBUdoablpUJ59q4qS/kWOoT6h/R6y+kax+H6GNasKX6NU7IzGg/kr2E+QiI/OssES0niKE7J45fAge7UnrafrBWbpo1vJCgJzEzot3Z00AvwhsYmqxIiItQV7TRz165gdf9MTD+uILt6WNboJqglKpjba/4SN1MbQtuI7fEO0TFbp673YmNG8haNi0epfofzMKa+90+pjCDHBjUmSxkCrbx42Q3JIgA4LWPQVoBUo6mwttLyfkcSzoDfHVbGl0/Aph6uMgMymDkfN++79Il/LCrwOHSHj9Mpv1C1AuXj+TGO+6Vu1qvFvWdmoIvHNR+/AvALxWLKkhUH+ky9yg==",
  "md5_of_body": "c9f0f895fb98ab9159f51fd0297e236d",
  "body": "8",
  "attributes": {
    "SenderId": "596865106215",
    "ApproximateFirstReceiveTimestamp": "1605893094056",
    "ApproximateReceiveCount": "1",
    "SentTimestamp": "1605892972066"
  },
  "md5_of_message_attributes": null,
  "message_attributes": {}
}

Mỗi thông điệp sẽ được gán một message_id, receipt_handle cũng như là body, message_attributes. receipt_handle sẽ dùng cho trường hợp muốn xóa thông điệp ở queue

Visible timeout

  • Visible timeout là thời gian chờ tối đa mà thông điệp được xử lý ở consumer

  • Với một queue có visible timeout 60 phút thì khi một message được consumer đọc, và xử lý thì timer bắt đầu chạy, thông điệp đó sẽ coi như bị ẩn đi 60 phút - ở trạng thái in-flight, các consumer khác không đọc được thông điệp đó trong một khoảng thời gian.

  • Nếu đang xử lý mà consumer client bị ngỏm, mất mạng hay vì lý do gì đó mà không thể xử lý, thì sẽ không thể tiếp tục thông báo tới queue về tình trạng xử lý. Vậy nên quá thời gian này thì thông điệp đó sẽ tiếp tục xuất hiện để consumers kéo về xử lý tiếp.

SQS tích hợp Ruby SDK

Quen thuộc với các dịch vụ khác của AWS, với SQS thì AWS cũng cung cấp đầy đủ api trong sdk để sử dụng. Sau đây sẽ là một vài ví dụ đơn giản:

Để sử dụng SQS sdk ruby thì chúng ta cần cài gem aws-sdk-sqs, và cấu hình đầy đủ AWS với key id và acess key id.

Khởi tạo SQS với queue_name

SQS = Aws::SQS::Client.new(region: "us-east-1")
queue = SQS.create_queue(queue_name: "my_queue")

Tạo một message mới

SQS.send_message({queue_url: queue_url, message_body: "process_record_id", message_attributes: {"Id" => {string_value: "1", data_type: "String"}}})

Với poller thì chúng ta có thể lắng nghe queue liên tục để bắt message ở queue

QUEUE_URL = SQS.get_queue_url(queue_name: "my_queue").queue_url
POLLER = Aws::SQS::QueuePoller.new(QUEUE_URL)
poller_stats = POLLER.poll({max_number_of_messages: 10 }) do |messages|
    messages.each do |message|
        puts "Message body: #{message.body}"
        # Do any logic process message here
    end
end

Hoặc chủ động lấy message:

SQS.receive_message({
  queue_url: QUEUE_URL,
  max_number_of_messages: 5,
  visibility_timeout: 60
})

Với mỗi message thì sau khi handle xong, chúng ta cần xóa nó ở queue để hoàn thành job, params cần có receipt_handle là value lúc chúng ta get message về

resp = SQS.delete_message({
  queue_url: QUEUE_URL,
  receipt_handle: "receipt_handle value",
})

Trường hợp trong lúc xử lý message, máy bị mất kết nối hoặc bị ngỏm, sau một số lần nhất định thì message sẽ được lưu vào Dead letter queue, việc chúng ta cần làm là tiếp tục handle những message lỗi này nếu cần.

Chúng ta cần phải tự tạo một queue để sử dụng làm Dead letter queue này, qua SDK hoặc qua AWS Console UI

  • Sử dụng SDK:
SQS.create_queue({
  queue_name: "my_dead_letter_queue"
})
dead_letter_queue_url = SQS.get_queue_url(queue_name: "my_dead_letter_queue").queue_url
dead_letter_queue_arn = SQS.get_queue_attributes({
  queue_url: dead_letter_queue_url,
  attribute_names: ["QueueArn"]
})
.attributes["QueueArn"]

redrive_policy = {
    "maxReceiveCount" => "5", # After the queue receives the same message 5 times, send that message to the dead letter queue.
    "deadLetterTargetArn" => dead_letter_queue_arn
}.to_json

# Sau đó set queue attributes
SQS.set_queue_attributes({
  queue_url: QUEUE_URL,
  attributes: {
    "RedrivePolicy" => redrive_policy
  }
})
  • Với AWS console UI, chúng ta tạo một queue mới là dead_letter_queue chẳng hạn, sau đó vào edit queue chính, chọn enable dead letter và select queue dead_letter_queue là xong (easy).

  • Để xóa queue:
SQS.delete_queue(queue_url: QUEUE_URL)

SQS tích hợp gem shoryuken

Với gem shoryuken, việc tích hợp background job sử dụng SQS đơn giản và dễ dàng hơn bao giờ hết.

https://github.com/phstc/shoryuken

Chúng ta cần cài 2 gem này vào nhé.

gem "shoryuken"
gem "aws-sdk-sqs"

Với những ai đã quen thuộc với bộ đôi Sidekiq + Redis thì combo shoryuken + SQS cũng khá là tương đồng, cơ chế cũng như cách sử dụng gần như là không khác gì nhau cả.

Lan man một tý:

Về combo Sidekiq + Redis thì chúng ta cần mua gói PRO (950 $ / năm) để có thể handle job failed như Dead letter bên SQS.

Còn với SQS thì job chạy sẽ không được realtime cho lắm vì thông qua HTTP API, sẽ không nhanh bằng combo Sidekiq + Redis, phù hợp cho những Job không cần realtime

Những Job như bắn thông báo, gửi notification hay cần ưu tiên xử lý nhanh thì nên dùng Redis vì nó bắt rất nhanh - ngược lại thì có thể sử dụng SQS.

  • Khai báo worker
class HelloWorker
  include Shoryuken::Worker

  shoryuken_options queue: "my_queue", auto_delete: true

  def perform(sqs_msg, data)
    puts "Hello, #{data}"
  end
end
  • Start shoryuken
bundle exec shoryuken -q my_queue -R
  • Và enqueue một job
HelloWorker.perform_async({message: "1"})

Như vậy là chúng ta vừa tìm hiểu về Amazon Simple Queue Service (SQS) qua một vài khái niệm, các đặc điểm nổi bật, cách sử dụng SQS tích hợp với SDK ruby cũng như sài gem. Hy vọng bài viết sẽ có ích cho những ai đang tìm hiểu về SQS

Cảm ơn mọi người đã theo dõi bài viết!


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.