Overview
CoraMQ v0.1.0
I recently built a concurrent in-memory queue service in Go that supports multiple clients, priority-based message processing, and a visibility timeout for unacknowledged items. It suits producer-consumer workloads in which consumers dequeue, process, and acknowledge messages.
The goal was to design a scalable, thread-safe queue system for high-performance environments where multiple clients interact with the same queue.
Key Features
- Multiple Queue Support: Each queue is identified by a unique name.
- Priority Queue: Higher priority items are dequeued first.
- Visibility Timeout: Once dequeued, a message is hidden from other consumers for a set period (default: 10 seconds). If it is not acknowledged within that period, it becomes available for dequeuing again.
- In-Flight Tracking: Keeps track of messages currently being processed.
- Acknowledgment: Messages are removed from the queue upon acknowledgment, preventing reprocessing.
- Thread-Safety: The queue service is designed to handle concurrent requests safely.
Architecture Overview
The service is built with three core components:
- QueueItem: Represents a single item in the queue, including its ID, payload, priority, visibility timeout, and acknowledgment status.
- Queue: Manages a list of QueueItems and uses a priority heap to ensure higher-priority items are dequeued first.
- QueueServer: Oversees multiple queues and handles creation, enqueuing, dequeuing, and acknowledgment of items. It uses Go channels and mutex locks to ensure thread-safe, concurrent access.
API Overview
The queue service exposes a simple API with endpoints for interacting with queues and items. Here are a few key endpoints:
1. Create Queue
- Endpoint: /queues
- Method: POST
- Request Body:
  {
    "queueName": "High-priority"
  }
- Response:
  {
    "status": "success",
    "message": "Queue created successfully",
    "queueName": "High-priority"
  }
- Status Code: 201 Created (if the queue is successfully created)
- Status Code: 400 Bad Request (if the queue already exists)
  {
    "status": "error",
    "message": "Queue with the given name already exists",
    "queueName": "High-priority"
  }
2. Enqueue Item
- Endpoint: /queues/{queueName}/enqueue
- Method: POST
- Request Body:
  {
    "item": {
      "id": "3",
      "payload": "{'name': 'Lorem Ipsum', 'email': 'loremipsum@gmail.com'}",
      "priority": 5
    }
  }
- Response:
  {
    "status": "success",
    "message": "Item enqueued successfully",
    "item": {
      "id": "3",
      "payload": "{'name': 'Lorem Ipsum', 'email': 'loremipsum@gmail.com'}",
      "priority": 5
    }
  }
3. Dequeue Item
- Endpoint: /queues/{queueName}/dequeue
- Method: POST
- Request Body:
  {
    "queueName": "queueName"
  }
- Response:
  {
    "status": "success",
    "message": "Item dequeued successfully",
    "item": {
      "id": "3",
      "payload": "{'name': 'Lorem Ipsum', 'email': 'loremipsum@gmail.com'}",
      "priority": 5,
      "visibilityTimeout": "2024-10-07T12:45:00Z"
    }
  }
4. Acknowledge Item
- Endpoint: /queues/{queueName}/acknowledge
- Method: POST
- Request Body:
  {
    "itemId": "3"
  }
- Response:
  {
    "status": "success",
    "message": "Item acknowledged successfully",
    "itemId": "3"
  }
- Status Code: 200 OK (if the item is acknowledged successfully)
- Status Code: 404 Not Found (if the item was not found in the in-flight queue)
  {
    "status": "error",
    "message": "Item not found in the in-flight queue",
    "itemId": "3"
  }