Kuyruk Class

class kuyruk.Kuyruk(config: kuyruk.config.Config = None) → None

Provides the task() decorator to convert a function into a Task.

Provides the channel() context manager for opening a new channel on the connection. The connection is opened when the first channel is created.

Parameters: config – Must be an instance of Config. If None, the default config is used. See Config for default values.

channel() → typing.Iterator[amqp.channel.Channel]

Returns a new channel from a new connection as a context manager.

connection() → typing.Iterator[amqp.connection.Connection]

Returns a new connection as a context manager.

task(queue: str = 'kuyruk', **kwargs: typing.Any) → typing.Callable

Wrap functions with this decorator to convert them to tasks. After wrapping, calling the function will send a message to a queue instead of running the function.

Parameters:
  • queue – Queue name for the tasks.
  • kwargs – Keyword arguments passed to the Task constructor.

Returns: Callable Task object wrapping the original function.
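
The decorator behaviour described above can be illustrated with a small, library-free sketch. This is not Kuyruk's implementation: `SketchTask`, `sketch_task`, the in-memory `SENT` list, and the message field names are all hypothetical stand-ins for a real broker and message format.

```python
import json
from typing import Any, Callable, List

# Stand-in for a message broker: sent messages are simply collected here.
SENT: List[str] = []


class SketchTask:
    """Hypothetical task wrapper; calling it enqueues instead of running."""

    def __init__(self, func: Callable, queue: str) -> None:
        self.func = func
        self.queue = queue

    def __call__(self, *args: Any, **kwargs: Any) -> None:
        # Describe the call as JSON rather than executing the function.
        message = {
            "queue": self.queue,
            "function": self.func.__name__,
            "args": list(args),
            "kwargs": kwargs,
        }
        SENT.append(json.dumps(message))

    def apply(self, *args: Any, **kwargs: Any) -> Any:
        # Run the wrapped function in the current process.
        return self.func(*args, **kwargs)


def sketch_task(queue: str = "kuyruk") -> Callable[[Callable], SketchTask]:
    """Decorator factory with the same shape as task(queue=...)."""
    def decorator(func: Callable) -> SketchTask:
        return SketchTask(func, queue)
    return decorator


@sketch_task(queue="emails")
def add(a: int, b: int) -> int:
    return a + b


add(1, 2)                  # records a message; the function body does not run
result = add.apply(1, 2)   # runs the function locally
```

The point of the sketch is the split between calling the wrapper, which only records a message, and `apply()`, which executes the original function.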

Task Class

class kuyruk.Task

Calling a Task object serializes the task to JSON and sends it to the queue.

Parameters:
  • retry – Number of times to retry the task before giving up. A failed task is retried in the same worker.
  • max_run_time – Maximum time, in seconds, that the task is allowed to run.

apply(*args: typing.Any, **kwargs: typing.Any) → typing.Any

Called by workers to run the wrapped function. You may call it yourself if you want to run the task in current process without sending to the queue.

If the task has a retry property, it is retried on failure.

If the task has a max_run_time property, it is not allowed to run longer than that many seconds.
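
As a rough illustration of the retry behaviour, the following library-free sketch retries a failing callable in the same process. The helper name `apply_with_retry` is hypothetical, and the reading that `retry=N` means one initial attempt plus up to N further attempts is an assumption, not something the text above confirms.

```python
from typing import Any, Callable, List


def apply_with_retry(func: Callable[..., Any], retry: int,
                     *args: Any, **kwargs: Any) -> Any:
    """Call func; on failure, try again up to `retry` more times."""
    attempts_left = retry + 1  # assumption: one initial attempt plus retries
    while True:
        attempts_left -= 1
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempts_left == 0:
                raise  # out of attempts: let the last error propagate


calls: List[int] = []


def flaky() -> str:
    # Fails on the first two calls, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


outcome = apply_with_retry(flaky, retry=2)
```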


Full path to the task in the form of <module>.<function>. Workers find and import tasks by this path.
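
A dotted path of this form can be built from a function and resolved back to it with the standard library alone. The sketch below shows one way to do that; `task_name` and `import_task` are hypothetical helper names, not part of Kuyruk.

```python
import importlib
import json
from typing import Callable


def task_name(func: Callable) -> str:
    """Build a '<module>.<function>' path for a module-level function."""
    return f"{func.__module__}.{func.__name__}"


def import_task(name: str) -> Callable:
    """Import the module part of the path and look up the function in it."""
    module_name, function_name = name.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, function_name)


name = task_name(json.dumps)
resolved = import_task(name)
```

Because the lookup goes through a normal import, the module that defines the function has to be importable in the process that resolves the path.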

send_to_queue(args: typing.Tuple = (), kwargs: typing.Dict[str, typing.Any] = {}, host: str = None, wait_result: typing.Union[int, float] = None, message_ttl: typing.Union[int, float] = None) → typing.Any

Sends a message to the queue. A worker will run the task’s function when it receives the message.

Parameters:
  • args – Arguments that will be passed to the task on execution.
  • kwargs – Keyword arguments that will be passed to the task on execution.
  • host – Send this task to a specific host. host will be appended to the queue name. If host is “localhost”, the hostname of the server will be appended to the queue name.
  • wait_result – Wait for a result from the worker for wait_result seconds. If a timeout occurs, ResultTimeout is raised. If an exception occurs in the worker, RemoteException is raised.
  • message_ttl – If set, the message will be destroyed in the queue after message_ttl seconds.

Returns: Result from the worker if wait_result is set, else None.
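
The host rule above can be sketched as a small function. This is an illustration only: the helper name `queue_for_host` is hypothetical, and the "." separator between the queue name and the host is an assumption, since the text only says that the host is appended.

```python
import socket
from typing import Optional


def queue_for_host(queue: str, host: Optional[str] = None) -> str:
    """Return the queue name to use for a given target host."""
    if not host:
        return queue  # no host given: use the plain queue name
    if host == "localhost":
        # "localhost" stands for the machine running this code.
        host = socket.gethostname()
    return f"{queue}.{host}"  # separator "." is an assumption


plain = queue_for_host("kuyruk")
remote = queue_for_host("kuyruk", "worker1")
local = queue_for_host("kuyruk", "localhost")
```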

Config Class

class kuyruk.Config

Kuyruk configuration object. Default values are defined as class attributes. Additional attributes may be added by extensions.

EAGER = False

Run tasks in the current process without sending them to the queue. Useful in tests.

RABBIT_HOST = 'localhost'

RabbitMQ host.


RabbitMQ password.


RabbitMQ port.

RABBIT_USER = 'guest'

RabbitMQ user.


RabbitMQ virtual host.


Logging level of root logger.


Pause consuming from the queue when the load goes above this level.


Gracefully shut down the worker after it has been running for this many seconds.

from_dict(d: typing.Dict[str, typing.Any]) → None

Load values from a dict.

from_env_vars() → None

Load values from environment variables. Keys must start with KUYRUK_.
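
To make the prefix rule concrete, here is a library-free sketch of loading prefixed environment variables into a dict. It is not Kuyruk's code: the function name `values_from_env` is hypothetical, and both stripping the `KUYRUK_` prefix from the key and parsing values as Python literals are assumptions made for the illustration.

```python
import ast
from typing import Any, Dict, Mapping

PREFIX = "KUYRUK_"


def values_from_env(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Collect settings from variables whose names start with PREFIX."""
    values: Dict[str, Any] = {}
    for key, raw in environ.items():
        if not key.startswith(PREFIX):
            continue  # unrelated variable: ignore it
        try:
            # Assumption: values such as "True" or "5672" are parsed as literals.
            value: Any = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            value = raw  # not a literal: keep the raw string
        values[key[len(PREFIX):]] = value  # assumption: prefix is stripped
    return values


loaded = values_from_env({
    "KUYRUK_RABBIT_HOST": "mq.example.com",
    "KUYRUK_EAGER": "True",
    "PATH": "/usr/bin",
})
```

In a real process, `os.environ` would be passed in place of the literal dict used here.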

from_object(obj: typing.Any) → None

Load values from an object.

from_pyfile(filename: str) → None

Load values from a Python file.


exception kuyruk.exceptions.Discard

The task may raise this if it does not want to process the message. The message will be dropped.

exception kuyruk.exceptions.HeartbeatError(exc_info: typing.Tuple[typing.Type[BaseException], BaseException, traceback]) → None

Raised when there is a problem while sending a heartbeat during task execution.

exception kuyruk.exceptions.KuyrukError

Base class for Kuyruk exceptions.

exception kuyruk.exceptions.Reject

The task may raise this if it does not want to process the message. The message will be requeued and delivered to another worker.
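
The difference between Discard and Reject can be sketched as the decision a worker makes after running a task. The code below is a simplified, library-free illustration: the two exception classes are local stand-ins for the ones in kuyruk.exceptions, and `handle` and its string outcomes are hypothetical.

```python
from typing import Any, Callable


class Discard(Exception):
    """Local stand-in: the task does not want the message; drop it."""


class Reject(Exception):
    """Local stand-in: the task does not want the message; requeue it."""


def handle(task_func: Callable[[Any], Any], message: Any) -> str:
    """Run a task and report what should happen to its message."""
    try:
        task_func(message)
    except Discard:
        return "drop"      # message is removed and not delivered again
    except Reject:
        return "requeue"   # message goes back to the queue for another worker
    return "ack"           # task finished normally


def picky(message: dict) -> None:
    if message.get("stale"):
        raise Discard
    if message.get("busy"):
        raise Reject


outcomes = [handle(picky, {}), handle(picky, {"stale": True}), handle(picky, {"busy": True})]
```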

exception kuyruk.exceptions.RemoteException(type_: typing.Type, value: Exception, traceback: traceback) → None

Raised from kuyruk.Task.send_to_queue() if wait_result is set and an exception is raised on the worker while running the task.

exception kuyruk.exceptions.ResultTimeout

Raised from kuyruk.Task.send_to_queue() if wait_result is set and no reply is received within wait_result seconds.
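
The timeout behaviour can be pictured as a bounded wait on a reply channel. The sketch below uses a standard-library `queue.Queue` in place of a broker reply queue; the local `ResultTimeout` class and the `wait_for_result` helper are hypothetical stand-ins, not Kuyruk's implementation.

```python
import queue
from typing import Any


class ResultTimeout(Exception):
    """Local stand-in for kuyruk.exceptions.ResultTimeout."""


def wait_for_result(replies: "queue.Queue[Any]", wait_result: float) -> Any:
    """Block for at most wait_result seconds waiting for a reply."""
    try:
        return replies.get(timeout=wait_result)
    except queue.Empty:
        # No reply arrived in time: surface it as a timeout error.
        raise ResultTimeout from None


answered: "queue.Queue[Any]" = queue.Queue()
answered.put(42)
value = wait_for_result(answered, wait_result=1.0)
```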

exception kuyruk.exceptions.Timeout

Raised if a task exceeds its allowed run time.