Voting System
Introduction
The Social Choice Voting System is a modular and extensible backend framework designed to orchestrate, evaluate, and manage social choice-based voting workflows. It enables the submission, validation, and evaluation of structured votes associated with collaborative decision-making tasks, where voting rules and evaluation logic are defined dynamically via domain-specific DSLs (Domain-Specific Languages).
The system is designed for flexibility, allowing organizations or agents to configure:
- Access control (public vs. private voting)
- Voting specifications and metadata
- Voting pre-qualification rules
- Choice evaluation and tie-breaking logic
- Post-awarding workflows and notification mechanisms
Core Capabilities
- Task Lifecycle Management: Create, store, and manage social choice tasks.
- Vote Submission: Accept votes via REST API or NATS messaging system.
- DSL-Driven Evaluation: Use configurable DSL workflows to compute winners, handle tie-breaking, and run post-award logic.
- Reporting and Auditing: Automatically generate and persist a comprehensive voting report after task completion.
- Live Streaming Interface: Provide real-time task state updates via a WebSocket interface.
- Query API: Enable consumers and dashboards to fetch task results, metadata, and live task bundles on demand.
Architecture Components
- REST API Server: Supports structured queries and task metadata access.
- WebSocket Server: Streams task state updates if live streaming is enabled.
- Vote Acceptor: Handles validation, PQT DSL checks, and DB storage of votes.
- DSL Evaluator: Executes evaluation DSLs to determine voting results.
- Kubernetes Evaluation Job: Triggered after vote threshold is met; executes DSLs and generates final outcomes.
- Post-Processing Layer: Generates reports, stores final results, and notifies winners via NATS.
This system is designed to integrate seamlessly into larger decision-making and agent-driven systems, providing robust task orchestration, traceability, and pluggable evaluation logic.
Architecture Overview
The Social Choice Voting System is designed as a modular, event-driven backend framework that handles the complete lifecycle of structured voting tasks. The system is composed of multiple coordinated subsystems that manage task creation, vote intake, validation, DSL-based evaluation, result publication, and query access.
The architecture is divided into the following logical domains:
1. Task Creation and Initialization
- Submission Parser and Validator: Parses a high-level task definition into structured components: core task data, voting specification, and evaluation inputs.
- DB Write API: Writes all validated components into MongoDB collections.
- Scheduler Registration: Registers the task with a cron scheduler for timed initiation.
- DSL DB Client: Provides schema templates to validate incoming specifications if needed.
2. Vote Intake and Pre-Qualification
- Vote Submission REST API / NATS Listener: Accepts votes either via HTTP or messaging.
- Initial Conditions Checker: Ensures task is active, vote is not duplicated, and access rules are met.
- Voting PQT DSL Executor: Executes pre-qualification DSL logic to accept or reject a vote.
- DB Updater: Writes qualified or disqualified votes to the
votes
collection and updates status. - Human Intervention System (optional): Triggered when DSL specifies interactive validation logic.
3. Voting Task Initiation
- Cron Scheduler: Polls the DB to find scheduled tasks and moves them into an active state.
- Voting Task Initializer: Triggers task activation and initiates voting message generation.
- Voting Message DSL Executor: Executes the DSL responsible for constructing NATS voting invite payloads.
- Notification Pusher: Publishes invite messages to voter-specific NATS subjects.
4. Evaluation and Post-Awarding
- Final Evaluation Initializer: Launched once all required votes are submitted (private) or threshold met (public).
-
DSL Executor Sequence:
-
Choice Evaluation DSL: Computes the winner(s)
- Tie Breaker DSL (if needed): Resolves multiple top votes
- Post-Award DSL: Constructs final result metadata and actions
- DB Status Updater: Marks the task as
"complete"
and stores the full report JSON. - Completion Notifier: Pushes result messages via NATS or triggers external systems.
5. Query and Live Monitoring Subsystem
- Query API Controller: Exposes task status, report access, and filtered data queries over REST.
- Live Streaming Checker: Determines if a task supports real-time streaming.
- WebSocket Server: Periodically streams full task bundles to authorized clients.
- Detailed JSON Report Generator: Packages complete task, vote, and DSL evaluation outputs as a report.
6. Storage and Messaging Backend
- Voting DB (MongoDB): Stores all structured entities including tasks, specs, evaluation logic, and votes.
- NATS Server: Handles message-based interactions including voting invites and result notifications.
- Kubernetes Cluster: Executes the votes evaluation job in an isolated job container.
Integration and Modularity
The system is fully extensible via DSL workflows and cleanly separated service layers. It supports pluggable:
- Evaluation logic
- Message brokers
- Task types
- Result dissemination protocols
Schema
The voting system uses the following core data structures to model tasks, voting options, evaluation logic, and individual votes. Each structure is represented as a Python @dataclass
, stored in MongoDB, and used throughout the submission and evaluation workflow.
1. SocialTaskCoreData
@dataclass
class SocialTaskCoreData:
social_task_id: str
created_by_subject_id: str
created_by_subject_data: Dict
org_ids: List[str]
social_task_access_type: str # private / public
goal_data: Dict
social_tasks_topics: List[str]
social_task_properties: Dict
creation_time: datetime
scheduled_time: Optional[datetime] = None
duration: Optional[int] = None
status: Optional[str] = None
report_json: Optional[Dict] = field(default_factory=dict)
job_id: Optional[str] = None
enable_live_streaming: Optional[bool] = False
Field | Type | Description |
---|---|---|
social_task_id |
str | Unique identifier for the voting task |
created_by_subject_id |
str | ID of the subject who created the task |
created_by_subject_data |
dict | Metadata about the creator (name, profile, etc.) |
org_ids |
list | List of subject IDs allowed to vote (if task is private) |
social_task_access_type |
str | "private" or "public" access type |
goal_data |
dict | Describes the objective or goal of the task |
social_tasks_topics |
list | Topics or tags associated with the task |
social_task_properties |
dict | Additional properties (urgency, category, etc.) |
creation_time |
datetime | Time the task was created |
scheduled_time |
datetime | Time the voting is scheduled to start |
duration |
int | Duration or threshold (used for vote counts in public tasks) |
status |
str | Current status: scheduled , started , complete |
report_json |
dict | Final report generated after evaluation |
job_id |
str | Kubernetes job ID for evaluation |
enable_live_streaming |
bool | Whether live WebSocket updates are allowed |
2. SocialChoiceSubjectSpecInput
@dataclass
class SocialChoiceSubjectSpecInput:
social_task_id: str
topic_title: str
topic_description: str
voting_options_map: Dict
voting_option_metadata_map: Dict
supported_protocols: List[str]
voting_message_request_creator_dsl: str
Field | Type | Description |
---|---|---|
social_task_id |
str | ID of the task this spec belongs to |
topic_title |
str | Title shown to voters |
topic_description |
str | Description of what is being voted on |
voting_options_map |
dict | Maps option IDs (e.g., "A" ) to labels (e.g., "Solar Energy" ) |
voting_option_metadata_map |
dict | Additional metadata per voting option |
supported_protocols |
list | Communication protocols (e.g., http , ipfs ) supported for voting |
voting_message_request_creator_dsl |
str | DSL workflow ID used to generate voting invite messages |
3. SocialChoiceEvaluationInput
@dataclass
class SocialChoiceEvaluationInput:
social_task_id: str
constraints_entry_id: str
constraints_entry_id_1: str
voting_pqt_dsl: str
choice_evaluation_dsl: str
tie_breaker_dsl: str
post_awarding_dsl: str
Field | Type | Description |
---|---|---|
social_task_id |
str | ID of the task being evaluated |
constraints_entry_id |
str | Reference to constraint rules (optional use) |
constraints_entry_id_1 |
str | Secondary constraint reference (optional use) |
voting_pqt_dsl |
str | DSL ID used to determine if a vote qualifies before submission |
choice_evaluation_dsl |
str | DSL ID that computes the winner or list of tied winners |
tie_breaker_dsl |
str | DSL ID to resolve ties, used if multiple winners are returned |
post_awarding_dsl |
str | DSL ID that performs post-winner logic (notifications, state changes, etc.) |
4. Votes
@dataclass
class Votes:
vote_id: str
social_task_id: str
submitter_subject_id: str
vote_data: Dict
submission_time: datetime
qualified: bool
Field | Type | Description |
---|---|---|
vote_id |
str | Unique identifier of the vote |
social_task_id |
str | ID of the task this vote belongs to |
submitter_subject_id |
str | Subject who submitted the vote |
vote_data |
dict | Raw vote content (e.g., chosen option) |
submission_time |
datetime | Timestamp of when the vote was received |
qualified |
bool | Whether the vote passed the PQT DSL and is valid for evaluation |
REST API Documentation
The system exposes several REST endpoints for:
- Task submission
- Vote submission
- Task querying and report retrieval
- Ad hoc parsing for validation
1. POST /internal/process-task
Description: Submits a new social voting task by parsing the input and storing structured data in the database.
Request Body (JSON)
{
"core_data": {
"created_by_subject_id": "subject_123",
"created_by_subject_data": { "name": "Alice" },
"org_ids": ["org1", "org2"],
"social_task_access_type": "private",
"goal_data": { "objective": "Select best renewable strategy" },
"social_tasks_topics": ["environment", "energy"],
"social_task_properties": { "urgency": "high" },
"scheduled_time": "2025-06-12T15:00:00",
"duration": 3,
"enable_live_streaming": true
},
"subject_spec": {
"topic_title": "Energy Policy Choice",
"topic_description": "Choose between wind, solar, and hydro energy options",
"voting_options_map": { "A": "Wind", "B": "Solar", "C": "Hydro" },
"voting_option_metadata_map": {},
"supported_protocols": ["http"],
"voting_message_request_creator_dsl": "invite_dsl_1"
},
"evaluation_spec": {
"voting_pqt_dsl": "pqt_qualifier_v1",
"choice_evaluation_dsl": "evaluate_v1",
"tie_breaker_dsl": "resolve_ties",
"post_awarding_dsl": "notify_winners"
}
}
Response
{
"social_task_id": "task_xyz123"
}
2. POST /internal/adhoc-parse
Description: Parses and validates task input format without committing to the database.
Request Body
Same format as /internal/process-task
.
Response
{
"core_data": { ...parsed SocialTaskCoreData... },
"subject_spec": { ...parsed SocialChoiceSubjectSpecInput... },
"evaluation_spec": { ...parsed SocialChoiceEvaluationInput... }
}
3. POST /submit-vote
Description: Submits a vote and validates it using PQT DSL.
Request Body
{
"vote_id": "vote_789",
"social_task_id": "task_xyz123",
"submitter_subject_id": "subject_456",
"vote_data": { "selected_option": "B" }
}
Response (Success)
{
"message": "Vote recorded",
"qualified": true
}
Response (Failure)
{
"error": "Subject has already voted"
}
4. GET /get-report/<social_task_id>
Description: Returns the final report JSON for a given task.
Response
{
"social_task_id": "task_xyz123",
"report_json": { ... }
}
5. POST /generic-query/<collection_name>
Description: Performs a filtered query on any collection.
Request Body
{
"social_task_id": "task_xyz123"
}
Response
[
{ ... },
{ ... }
]
6. GET /get-status/<social_task_id>
Description: Returns the current status of the voting task.
Response
{
"status": "started"
}
7. GET /get-full-task-bundle/<social_task_id>
Description: Returns the entire task context including specs and all votes.
Response
{
"core_data": { ... },
"subject_spec": { ... },
"evaluation_spec": { ... },
"votes": [ ... ]
}
8. GET /is-live-streaming-enabled/<social_task_id>
Description: Checks if the task supports WebSocket-based live streaming.
Response
{
"enabled": true
}
WebSocket API Documentation
The WebSocket server provides real-time streaming of voting task data, primarily useful for dashboards or observers who wish to monitor voting task progress as it evolves.
Endpoint
ws://<host>:8765/live-stream
1. Connection Initialization
The client must send a JSON message immediately after connecting to initiate streaming.
Request Message Format
{
"social_task_id": "task_xyz123",
"refresh_interval_sec": 5
}
Field | Type | Description |
---|---|---|
social_task_id |
string | The task ID whose state is to be streamed |
refresh_interval_sec |
int | Polling interval in seconds (e.g., 2, 5, 10) |
2. Server Behavior
Upon receiving a valid request:
- The server checks if live streaming is enabled for the given
social_task_id
. -
If enabled, the server:
-
Spawns a background thread for the connection.
- Periodically calls
get_full_task_bundle(social_task_id)
from theQueriesManager
. - Sends the current snapshot of task state to the client over WebSocket.
- If not enabled, the server sends an error response and terminates the connection.
3. Streaming Payload Format
The server sends task bundle snapshots as JSON, every refresh_interval_sec
seconds:
{
"type": "task_update",
"data": {
"core_data": { ... },
"subject_spec": { ... },
"evaluation_spec": { ... },
"votes": [ ... ]
}
}
4. Error Response Format
If the task does not support streaming, or the request is malformed:
{
"type": "error",
"message": "Live streaming is not enabled for this task"
}
5. Termination Handling
- The background thread is automatically terminated when the client disconnects.
- If an internal error occurs (e.g., DB failure), the connection is closed with an error message.
Example Usage (Client-Side)
import asyncio
import websockets
import json
async def consume():
uri = "ws://localhost:8765/live-stream"
async with websockets.connect(uri) as websocket:
await websocket.send(json.dumps({
"social_task_id": "task_xyz123",
"refresh_interval_sec": 5
}))
while True:
response = await websocket.recv()
print("Update received:", response)
asyncio.run(consume())
Event Subscription via NATS
The system uses NATS (a lightweight, high-performance messaging system) to send events such as:
- Voting invitations sent to eligible subjects when a task starts
- Winner announcements after evaluation is complete
External systems, bots, or agents can subscribe to these messages to perform automated actions or display updates.
NATS Configuration
- The NATS server URL is expected to be available via the environment variable:
ORG_NATS_URL
- Topics are published per-subject using their
subject_id
as the NATS topic name
Event Message Format
Each event is a JSON object with the following structure:
{
"event_type": "voting_invite" | "winner_announcement",
"sender_subject_id": "subject_123",
"event_data": {
"voting_info": { ... },
"dsl_output": { ... } // Present if applicable
}
}
Field | Description |
---|---|
event_type |
Type of event: voting_invite or winner_announcement |
sender_subject_id |
Subject who initiated the event (usually the task creator) |
event_data |
DSL-generated output and context metadata |
Subscribing to NATS Topics (Python Example)
You can use the asyncio-nats-client
library to subscribe to messages.
Installation
pip install nats-py
Example: Subscriber Script
import asyncio
import json
from nats.aio.client import Client as NATS
ORG_NATS_URL = "nats://localhost:4222"
SUBJECT_ID = "subject_456" # Replace with your ID to subscribe
async def message_handler(msg):
data = json.loads(msg.data.decode())
print(f"Received event on [{msg.subject}]:")
print(json.dumps(data, indent=2))
async def main():
nc = NATS()
await nc.connect(ORG_NATS_URL)
await nc.subscribe(SUBJECT_ID, cb=message_handler)
print(f"Subscribed to NATS subject '{SUBJECT_ID}'")
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
Use-Cases for Subscribers
- Agents: Can initiate their workflows based on invitations.
- Dashboards: Display real-time status of task invitations and winners.
- Auditing Systems: Log voting lifecycle events for analysis or compliance.
DSL Execution Flow
The voting system supports dynamic decision logic using pluggable DSL (Domain-Specific Language) workflows, executed using the dsl_executor
module. These DSLs define the rules for:
- Generating voting invitations
- Pre-qualifying votes (PQT)
- Evaluating the winning choice
- Resolving ties
- Running post-awarding logic
DSL Infrastructure
All DSL workflows are registered in a Workflow Registry and are executed via the new_dsl_workflow_executor
interface.
DSL Runner API
from dsl_executor import new_dsl_workflow_executor, parse_dsl_output
Step-by-Step DSL Execution
1. Voting Invitation Generation DSL
When it runs: Before voting starts
Where: VotingTaskInitiator
Workflow ID: voting_message_request_creator_dsl
Purpose: Constructs the voting invitation message to be sent via NATS.
Input:
{
"core_data": ...,
"subject_spec": ...,
"evaluation_spec": ...,
"addons": {
"initiator_subject_id": <subject_id>
}
}
Output:
DSL-defined voting message → used in the "event_data"
field of voting_invite
event.
2. Voting PQT (Pre-Qualification Test) DSL
When it runs: On vote submission
Where: VoteAcceptor
Workflow ID: voting_pqt_dsl
Purpose: Validates a vote before accepting it.
Input:
{
"core_data": ...,
"subject_spec": ...,
"evaluation_spec": ...,
"vote": <submitted_vote_dict>
}
Output:
Boolean or structured value used to set qualified = true/false
in DB.
3. Choice Evaluation DSL
When it runs: After all votes are collected
Where: DSLEvaluator.evaluate()
Workflow ID: choice_evaluation_dsl
Purpose: Determines the winner(s).
Input:
{
"core_data": ...,
"subject_spec": ...,
"evaluation_spec": ...,
"votes": [ ...qualified votes... ]
}
Output:
- Single subject ID or
- List of subject IDs (in case of tie)
4. Tie Breaker DSL
When it runs: Only if choice_evaluation_dsl
returns multiple winners
Where: DSLEvaluator.evaluate()
Workflow ID: tie_breaker_dsl
Purpose: Resolves tie and selects final winner(s)
Input:
Same as choice evaluation, with added winners
list.
Output:
Final winner(s) list.
5. Post-Awarding DSL
When it runs: After final winner(s) determined
Where: DSLEvaluator.evaluate()
Workflow ID: post_awarding_dsl
Purpose: Triggers downstream logic (e.g., assignment, notifications)
Input:
{
"core_data": ...,
"subject_spec": ...,
"evaluation_spec": ...,
"votes": [ ... ],
"winners": [ ... ]
}
Output:
Custom payload used for:
- NATS winner announcements
- Audit trail
- Post-processing
Output Structure
The evaluate()
method of DSLEvaluator
returns:
{
"winners": [...],
"post_award_payload": {...},
"dsl_outputs": {
"choice_evaluation_output": {...},
"tie_breaker_output": {...},
"post_awarding_output": {...}
}
}
Vote Submission Lifecycle
This section describes how a vote is submitted, validated, processed, and stored in the system.
Step-by-Step Lifecycle
1. Vote Submission
Votes can be submitted via:
- REST API (
POST /submit-vote
) - NATS Topic (topic =
subject_id
)
Each vote submission includes:
{
"vote_id": "vote_123",
"social_task_id": "task_xyz123",
"submitter_subject_id": "subject_456",
"vote_data": {
"selected_option": "A"
}
}
2. Initial Validation (VotingInitialChecker
)
The vote is checked against the following conditions:
Check | Condition |
---|---|
Repeat submission | Submitter must not have voted on this task before |
Access control (private) | Submitter must be in the task's allowed org list |
Task status | Task must be in "started" state |
If any check fails, the system rejects the vote with an appropriate error message.
3. PQT DSL Execution (VoteAcceptor
)
If validation passes, the system executes the voting_pqt_dsl
.
Input includes the vote and all 3 data-classes (core, subject spec, evaluation spec).
If PQT returns true
, the vote is saved with qualified = true
; else, qualified = false
.
4. Database Write
Vote is inserted into the votes
collection, e.g.:
{
"vote_id": "vote_123",
"social_task_id": "task_xyz123",
"submitter_subject_id": "subject_456",
"vote_data": { "selected_option": "A" },
"submission_time": "2025-06-11T12:00:00",
"qualified": true
}
5. Trigger Evaluation Check
After storing the vote, the system invokes VotingEvaluationInitiator
to check if evaluation conditions are met.
Evaluation Job Execution Flow
This section outlines how the system triggers and processes evaluation once voting is complete.
Triggering Conditions
Triggered from VotingEvaluationInitiator
under two conditions:
Task Type | Trigger Condition |
---|---|
private |
All expected voters have submitted votes |
public |
Vote count reaches threshold N |
Job Launch: Kubernetes
A Kubernetes job is created dynamically using the Python kubernetes
client.
Job Configuration
Setting | Value |
---|---|
Namespace | votes-evalaution |
Job Name | Equal to social_task_id |
Container Image | agentspacev1/votes-evaluator:v1 |
Env Variables | SOCIAL_TASK_ID , MONGO_URL , ORG_NATS_URL |
The system ensures the namespace exists before launching the job.
Job Logic: Evaluator Container
Inside the container, the following classes run sequentially:
1. Initiator
Fetches from DB:
SocialTaskCoreData
SocialChoiceSubjectSpecInput
SocialChoiceEvaluationInput
- All qualified votes
2. DSLEvaluator
Executes in order:
choice_evaluation_dsl
- (optional)
tie_breaker_dsl
post_awarding_dsl
Returns:
winners
post_award_payload
- Intermediate DSL outputs
3. PostProcessor
- Builds final
report_json
including task data, votes, DSL results, and winners - Saves report and updates task status to
"complete"
in DB
4. NotificationSender
- Publishes NATS event of type
winner_announcement
-
Includes:
-
winners
post_award_payload
- Full context metadata
Report Structure
Once a voting task completes, a comprehensive report JSON is generated and stored in the report_json
field of the corresponding SocialTaskCoreData
entry. This report captures the full evaluation context and can be downloaded or streamed via APIs.
Accessing the Report
- REST API:
GET /get-report/<social_task_id>
- WebSocket Streaming: Returned in live updates (if enabled)
Report JSON Format
{
"social_task_id": "task_xyz123",
"core_data": { ... },
"subject_spec": { ... },
"evaluation_spec": { ... },
"votes": [ ... ],
"winners": [ "subject_456" ],
"post_award_payload": { ... },
"dsl_outputs": {
"choice_evaluation_output": { ... },
"tie_breaker_output": { ... },
"post_awarding_output": { ... }
}
}
Field-by-Field Explanation
Field | Type | Description |
---|---|---|
social_task_id |
string |
The unique ID of the social voting task |
core_data |
object |
Full contents of SocialTaskCoreData as submitted |
subject_spec |
object |
Full contents of SocialChoiceSubjectSpecInput |
evaluation_spec |
object |
Full contents of SocialChoiceEvaluationInput |
votes |
list |
List of all submitted and qualified vote records |
winners |
list |
Final winner(s) selected post evaluation and (if needed) tie-breaking |
post_award_payload |
object |
Output of post_awarding_dsl , typically used for notifications |
dsl_outputs |
object |
Intermediate DSL outputs for transparency and debugging |
Notes
- All DSL outputs are optional but recommended for audit and transparency.
- The structure is fixed and designed to support machine readability for downstream systems.