CoraMQ - Building a Concurrent In-Memory Queue Service in Go

October 5, 2024

Golang Queue Concurrency In-Memory API

Overview

CoraMQ v0.1.0

I recently built a Concurrent In-Memory Queue Service in Go to support multiple clients, priority-based message processing, and a visibility timeout for unacknowledged items. This service is ideal for scenarios involving a producer-consumer pattern where messages are dequeued, processed, and acknowledged by consumers.

The goal was to design a scalable, thread-safe queue system for high-performance environments where multiple clients interact with the same queue.

Key Features

  • Multiple Queue Support: Each queue is identified by a unique name.
  • Priority Queue: Higher priority items are dequeued first.
  • Visibility Timeout: Once dequeued, messages are invisible for a set period (default: 10 seconds) before being available for dequeuing again.
  • In-Flight Tracking: Keeps track of messages currently being processed.
  • Acknowledgment: Messages are removed from the queue upon acknowledgment, preventing reprocessing.
  • Thread-Safety: The queue service is designed to handle concurrent requests safely.

Architecture Overview

The service is built with three core components:

  • QueueItem: Represents a single item in the queue, including its ID, payload, priority, visibility timeout, and acknowledgment status.
  • Queue: Manages a list of QueueItems and uses a priority heap to ensure higher-priority items are dequeued first.
  • QueueServer: Oversees multiple queues, handles creation, enqueuing, dequeuing, and acknowledgment of items. It uses Go channels and mutex locks to ensure thread-safe, concurrent access.

API Overview

The queue service exposes a simple API with endpoints for interacting with queues and items. Here are a few key endpoints:

1. Create Queue

  • Endpoint: /queues
  • Method: POST
  • Request Body:
{
  "queueName": "High-priority"
}
  • Response:
{
  "status": "success",
  "message": "Queue created successfully",
  "queueName": "High-priority"
}
  • Status Code: 201 Created (if the queue is successfully created)

  • Status Code: 400 Bad Request (if the queue already exists)

{
  "status": "error",
  "message": "Queue with the given name already exists",
  "queueName": "High-priority"
}

2. Enqueue Item

  • Endpoint: /queues/{queueName}/enqueue
  • Method: POST
  • Request Body:
{
  "item": {
    "id": "3",
    "payload": "{'name': 'Lorem Ipsum', 'email': 'loremipsum@gmail.com'}",
    "priority": 5
  }
}
  • Response:
{
  "status": "success",
  "message": "Item enqueued successfully",
  "item": {
    "id": "3",
    "payload": "{'name': 'Lorem Ipsum', 'email': 'loremipsum@gmail.com'}",
    "priority": 5
  }
}

3. Dequeue Item

  • Endpoint: /queues/{queueName}/dequeue
  • Method: POST
  • Request Body:
{
  "queueName": "queueName"
}
  • Response:
{
  "status": "success",
  "message": "Item dequeued successfully",
  "item": {
    "id": "3",
    "payload": "{'name': 'Lorem Ipsum', 'email': 'loremipsum@gmail.com'}",
    "priority": 5,
    "visibilityTimeout": "2024-10-07T12:45:00Z"
  }
}

4. Acknowledge Item

  • Endpoint: /queues/{queueName}/acknowledge
  • Method: POST
  • Request Body:
{
  "itemId": "3"
}
  • Response:
{
  "status": "success",
  "message": "Item acknowledged successfully",
  "itemId": "3"
}
  • Status Code: 200 OK (if the item is acknowledged successfully)

  • Status Code: 404 Not Found (if the item was not found in the in-flight queue)

{
  "status": "error",
  "message": "Item not found in the in-flight queue",
  "itemId": "3"
}