Taskiq & Matrix

Key Components

  • The Matrix Protocol
  • Taskiq
  • Synapse

The Matrix Protocol

  • Decentralized & Federation support

This allows different Matrix servers to communicate with each other, similar to how email works across different servers. It also allows account data to be replicated across servers, so that if one server goes down, any linked server still has access to the replicated data.

  • End-to-End Encryption Support

  • Implementations: Synapse (Python) is currently used. Other options, not used here, include Dendrite (Go), Conduit (Rust), and Maelstrom (Rust).

Taskiq matrix

Taskiq Matrix leverages the Matrix chat protocol so that processes or Python programs can listen for messages and execute tasks. The listening is done by workers, which are devices or computers connected to the server.

Tasks are “kicked” onto a queue (a kick sends a task into a room in Synapse) and are then consumed by a worker.

Three different kinds of tasks

  • Mutex (one worker runs the task): in a pool of devices, only one worker runs the task, which guarantees that the task is run exactly once

  • Broadcast (everyone runs this task once)

  • Device (This device runs the task once)
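The three kinds differ only in which workers end up executing a task. The following is a toy model of that rule, not taskiq-matrix code: the names TaskKind and workers_that_run, and the lock_winner and target parameters, are hypothetical and exist only for illustration.

```python
from enum import Enum


class TaskKind(Enum):
    MUTEX = "mutex"
    BROADCAST = "broadcast"
    DEVICE = "device"


def workers_that_run(kind, workers, lock_winner=None, target=None):
    """Return the workers that execute a task of the given kind.

    workers: names of all workers listening in the room.
    lock_winner: for MUTEX, the worker that acquired the lock (assumed known).
    target: for DEVICE, the worker the task is addressed to.
    """
    if kind is TaskKind.MUTEX:
        # Only the single worker that won the lock runs the task.
        return [w for w in workers if w == lock_winner]
    if kind is TaskKind.BROADCAST:
        # Every worker runs the task once.
        return list(workers)
    if kind is TaskKind.DEVICE:
        # Only the addressed device runs the task.
        return [w for w in workers if w == target]
    raise ValueError(f"unknown task kind: {kind!r}")


pool = ["laptop", "phone", "server"]
print(workers_that_run(TaskKind.MUTEX, pool, lock_winner="phone"))  # ['phone']
print(workers_that_run(TaskKind.BROADCAST, pool))  # ['laptop', 'phone', 'server']
print(workers_that_run(TaskKind.DEVICE, pool, target="server"))  # ['server']
```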

Fundamentals of Taskiq

  • Broker is an interface for sending tasks to and receiving tasks from some external source. In our case that source is Synapse, which is an implementation of Matrix.

  • Kick sends a task into a room in Synapse as a message that a worker/device knows how to read and understand.

  • Listen returns tasks that have been kicked to Synapse rooms. The tasks are then given to workers; in this way the worker consumes from the broker.

  • Result Backend is similar to the broker: it defines an interface that lets us ask about a task and check its status. Was the task successful? Is it still in progress? Is there a result yet?
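The way these four pieces fit together can be sketched with an in-memory stand-in. This is a minimal sketch under stated assumptions, not the taskiq or taskiq-matrix implementation: ToyBroker and ToyResultBackend are hypothetical classes, a room is modelled as an asyncio.Queue rather than a real Synapse room, and the message fields are made up for the example.

```python
import asyncio
import uuid


class ToyBroker:
    """In-memory stand-in for a broker; a real one would talk to Synapse."""

    def __init__(self):
        self.rooms = {}  # room_id -> asyncio.Queue of task messages

    def _room(self, room_id):
        return self.rooms.setdefault(room_id, asyncio.Queue())

    async def kick(self, room_id, task_name, args):
        """Send a task message into a room and return its task id."""
        task_id = uuid.uuid4().hex
        message = {"task_id": task_id, "task_name": task_name, "args": args}
        await self._room(room_id).put(message)
        return task_id

    async def listen(self, room_id):
        """Yield task messages that were kicked into the room."""
        queue = self._room(room_id)
        while True:
            yield await queue.get()


class ToyResultBackend:
    """Stores results by task id and answers status questions."""

    def __init__(self):
        self.results = {}

    def set_result(self, task_id, result):
        self.results[task_id] = result

    def is_result_ready(self, task_id):
        return task_id in self.results

    def get_result(self, task_id):
        return self.results[task_id]


async def demo():
    broker, backend = ToyBroker(), ToyResultBackend()
    tasks = {"add": lambda a, b: a + b}  # functions the worker knows about
    room_id = "!work:homeserver.org"

    task_id = await broker.kick(room_id, "add", (1, 2))

    # The worker consumes from the broker, runs the task, stores the result.
    async for message in broker.listen(room_id):
        result = tasks[message["task_name"]](*message["args"])
        backend.set_result(message["task_id"], result)
        break  # one task is enough for the demo

    return backend.is_result_ready(task_id), backend.get_result(task_id)
```

Running asyncio.run(demo()) kicks one add task, lets a single worker loop consume it, and reads the result back through the result backend.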

Application

Launching a worker

taskiq worker fractal_database_matrix.broker:broker -fsd

This launches a worker that uses the broker defined in fractal_database_matrix.broker. The -fsd flag tells the worker to discover all tasks in the current directory and all child directories.

A task can be any function, as long as it is wrapped with @broker.task(). This registers the task with the given broker. Tasks can optionally be labeled to indicate which queue they should be kicked into.
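The registration pattern behind a decorator such as @broker.task() can be sketched as follows. RegistryBroker is a hypothetical toy class written for this example, not the broker in fractal_database_matrix.broker; the example tasks and the idea that labels are stored next to the function are assumptions made to show the pattern.

```python
class RegistryBroker:
    """Hypothetical registry; shows the decorator pattern only."""

    def __init__(self):
        self.tasks = {}  # task name -> (function, labels)

    def task(self, **labels):
        """Return a decorator that registers a function along with its labels."""

        def decorator(func):
            self.tasks[func.__name__] = (func, labels)
            return func  # the function itself stays callable as usual

        return decorator


broker = RegistryBroker()


@broker.task()
def add(a, b):
    return a + b


@broker.task(queue="mutex")  # label indicating the queue to kick into
def cleanup():
    return "done"


print(sorted(broker.tasks))  # ['add', 'cleanup']
print(broker.tasks["cleanup"][1])  # {'queue': 'mutex'}
```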

Kicking a mutex task

First, import the broker with from fractal_database_matrix.broker import broker, so that the task is kicked with the right broker. The room id is also needed; in this case it is !work:homeserver.org. It is done this way because you can’t really pass labels in kiq.

The add task is kicked as a mutex task by specifying the label queue="mutex".

kiq() takes the same arguments as calling the function normally. It returns a task object that has a wait_result method.

wait_result waits for the result to be returned before continuing.

When you kick a task like this, the taskiq interface creates a task and assigns it a unique task id. After kicking, the task is sent to the room id provided. Because queue="mutex", workers attempt to take a lock on the task id. One worker gets the lock, executes the task, and sets the result.

For example, if the task id is 1234, then after the task is executed the worker sets the result in the Result Backend with result_backend.set_result("1234", result=result). The result of the add task is stored in the Result Backend as taskiq.result.1234. The result message is then sent into the room where the task was kicked, in this case !work:homeserver.org.
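The lock step can be illustrated with a small simulation in which several workers race for the same task id. This is only a sketch: ToyLockTable is hypothetical, and how taskiq-matrix actually implements the lock is not described here, so a shared dictionary stands in for it.

```python
import asyncio


class ToyLockTable:
    """Stand-in for whatever shared lock the workers contend on."""

    def __init__(self):
        self.owners = {}  # task_id -> name of the worker holding the lock

    def try_lock(self, task_id, worker):
        # setdefault only stores the first caller, so exactly one worker wins.
        return self.owners.setdefault(task_id, worker) == worker


async def worker(name, task_id, locks, results, executed_by):
    await asyncio.sleep(0)  # yield once so every worker has started
    if locks.try_lock(task_id, name):
        results[task_id] = 1 + 2  # run the add task and set the result
        executed_by.append(name)
    # Workers that lose the lock simply skip the task.


async def demo():
    locks, results, executed_by = ToyLockTable(), {}, []
    await asyncio.gather(
        *(worker(f"worker-{i}", "1234", locks, results, executed_by) for i in range(3))
    )
    return results, executed_by
```

Calling asyncio.run(demo()) returns the results dictionary and the list of workers that executed the task; the list always has exactly one entry, which is the mutex guarantee.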

result = task.wait_result() asks the result backend whether task 1234 has a result. The result backend then asks the Matrix server whether it has a message named taskiq.result.1234, and the server responds whether the message is there or not. If it is not there yet, the result backend waits for the message from the server and returns the result when it is ready.
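The waiting behaviour can be sketched as a loop that keeps asking a room for the result message. This is an illustration under assumptions, not the real result backend: ToyRoom is a hypothetical in-memory room, the {"result": ...} content shape is invented for the example, and simple polling with a poll_interval is assumed even though a real backend may wait on the server differently.

```python
import asyncio


class ToyRoom:
    """Stand-in for a Matrix room: a list of (message_type, content) pairs."""

    def __init__(self):
        self.messages = []

    def find(self, message_type):
        """Return the content of the first matching message, or None."""
        for kind, content in self.messages:
            if kind == message_type:
                return content
        return None


async def wait_result(room, task_id, poll_interval=0.01):
    """Wait until a taskiq.result.<task_id> message appears, then return it."""
    key = f"taskiq.result.{task_id}"
    while True:
        content = room.find(key)
        if content is not None:
            return content
        await asyncio.sleep(poll_interval)  # not there yet, ask again later


async def demo():
    room = ToyRoom()

    async def worker():
        await asyncio.sleep(0.03)  # pretend the task takes a moment
        room.messages.append(("taskiq.result.1234", {"result": 3}))

    waiter = asyncio.create_task(wait_result(room, "1234"))
    await worker()
    return await waiter
```

In this sketch the waiter starts before the result exists, keeps checking while the worker runs, and returns as soon as the result message lands in the room.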