Kuyruk Class

class kuyruk.Kuyruk(config: Config | None = None)

Provides task() decorator to convert a function into a Task.

Provides channel() context manager for opening a new channel on the connection. Connection is opened when the first channel is created and kept open.

Parameters:

config – Must be an instance of Config. If None, default config is used. See Config for default values.

channel() Iterator[Channel]

Returns a new channel created on the underlying connection as a context manager.

connection() Iterator[Connection]

Returns the underlying AMQP connection as a context manager. The connection object is locked while in context and cannot be used by other threads. If you need a connection for a longer duration, use the new_connection() method.

new_connection() Iterator[Connection]

Returns new AMQP connection as a context manager. Connection is closed when context manager exits.

task(queue: str = 'kuyruk', **kwargs: Any) Callable

Wrap functions with this decorator to convert them to tasks. After wrapping, calling the function will send a message to a queue instead of running the function.

Parameters:
  • queue – Queue name for the tasks.

  • kwargs – Keyword arguments will be passed to Task constructor.

Returns:

Callable Task object wrapping the original function.
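The following is a minimal, standard-library-only sketch of the idea behind the decorator: the wrapped function becomes an object that enqueues a message when called and only runs the function through apply(). ToyTask, toy_task, FAKE_BROKER and the message layout are hypothetical names invented for illustration; this is not Kuyruk's implementation, which publishes to RabbitMQ.

```python
import json
from typing import Any, Callable, Dict, List

# Hypothetical stand-in for a broker: queue name -> list of JSON messages.
FAKE_BROKER: Dict[str, List[str]] = {}


class ToyTask:
    """Simplified model of a task wrapper (not kuyruk.Task)."""

    def __init__(self, f: Callable, queue: str) -> None:
        self.f = f
        self.queue = queue

    def __call__(self, *args: Any, **kwargs: Any) -> None:
        # Calling the wrapper records a description of the call
        # instead of running the function. The message layout is assumed.
        message = json.dumps(
            {"function": self.f.__name__, "args": list(args), "kwargs": kwargs}
        )
        FAKE_BROKER.setdefault(self.queue, []).append(message)

    def apply(self, *args: Any, **kwargs: Any) -> Any:
        # Runs the wrapped function in the current process.
        return self.f(*args, **kwargs)


def toy_task(queue: str = "kuyruk") -> Callable[[Callable], ToyTask]:
    """Decorator factory with the same shape as task(queue=...)."""
    def decorator(f: Callable) -> ToyTask:
        return ToyTask(f, queue)
    return decorator


@toy_task(queue="math")
def add(a: int, b: int) -> int:
    return a + b


sent = add(1, 2)          # enqueues a message and returns None
result = add.apply(1, 2)  # runs the function locally
```

In this model, add(1, 2) leaves one JSON message in FAKE_BROKER["math"], while add.apply(1, 2) returns the sum directly.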

Task Class

class kuyruk.Task(f: Callable, kuyruk: Kuyruk, queue: str, retry: int = 0, max_run_time: int = None)

Calling a Task object serializes the task to JSON and sends it to the queue.

Parameters:
  • retry – Number of times to retry before giving up. The failed task will be retried in the same worker.

  • max_run_time – Maximum allowed time in seconds for task to complete.

apply(*args: Any, **kwargs: Any) Any

Called by workers to run the wrapped function. You may call it yourself if you want to run the task in the current process without sending it to the queue.

If task has a retry property it will be retried on failure.

If task has a max_run_time property the task will not be allowed to run more than that.
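The retry behaviour can be pictured with a small sketch. It assumes that retry=N means one initial attempt plus up to N further attempts, and that the last error propagates once attempts run out; run_with_retry and flaky are hypothetical helpers, not part of Kuyruk.

```python
from typing import Any, Callable, List


def run_with_retry(f: Callable[..., Any], retry: int, *args: Any, **kwargs: Any) -> Any:
    """Call f, retrying up to `retry` extra times if it raises.

    Assumption: retry=N allows 1 + N attempts in total.
    """
    attempts_left = 1 + retry
    while True:
        attempts_left -= 1
        try:
            return f(*args, **kwargs)
        except Exception:
            if attempts_left <= 0:
                raise  # out of attempts: propagate the last error


calls: List[int] = []


def flaky() -> str:
    # Fails on the first two calls, succeeds on the third.
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("transient failure")
    return "ok"


outcome = run_with_retry(flaky, 2)
```

With retry set to 2, the third attempt succeeds; with retry set to 1, the same function would raise ValueError.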

property name: str

Full path to the task in the form of <module>.<function>. Workers find and import tasks by this path.
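As an illustration of how a dotted <module>.<function> path can be built and later resolved, here is a sketch using only importlib. The helpers task_name and resolve are hypothetical and do not claim to match how Kuyruk workers import tasks internally.

```python
import importlib
import json
from typing import Any, Callable


def task_name(f: Callable) -> str:
    """Build a dotted path in the form <module>.<function>."""
    return f"{f.__module__}.{f.__name__}"


def resolve(name: str) -> Any:
    """Import the module part of the path and look up the function on it."""
    module_name, _, function_name = name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, function_name)


name = task_name(json.dumps)  # a standard-library function used as an example
found = resolve(name)
```

Because the path is resolved by importing the module, a function must be importable under the same module name on the worker side for this kind of lookup to succeed.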

send_to_queue(args: Tuple = (), kwargs: Dict[str, Any] = {}, host: str | None = None, wait_result: int | float | None = None, message_ttl: int | float | None = None) Any

Sends a message to the queue. A worker will run the task’s function when it receives the message.

Parameters:
  • args – Arguments that will be passed to task on execution.

  • kwargs – Keyword arguments that will be passed to task on execution.

  • host – Send this task to specific host. host will be appended to the queue name. If host is “localhost”, hostname of the server will be appended to the queue name.

  • wait_result – Wait for result from worker for wait_result seconds. If timeout occurs, ResultTimeout is raised. If an exception occurs in the worker, RemoteException is raised.

  • message_ttl – If set, message will be destroyed in queue after message_ttl seconds.

Returns:

Result from worker if wait_result is set, else None.
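The host parameter's effect on the queue name can be sketched as follows. The "." separator and the use of socket.gethostname() for "localhost" are assumptions made for illustration; queue_name_for is a hypothetical helper, not a Kuyruk function.

```python
import socket
from typing import Optional


def queue_name_for(queue: str, host: Optional[str] = None) -> str:
    """Return the queue name, with the host appended when one is given."""
    if not host:
        return queue
    if host == "localhost":
        # "localhost" is replaced by this machine's hostname.
        host = socket.gethostname()
    # Assumption: queue and host are joined with a dot.
    return f"{queue}.{host}"


plain = queue_name_for("kuyruk")
targeted = queue_name_for("kuyruk", "worker-1")
local = queue_name_for("kuyruk", "localhost")
```

Under these assumptions, a task sent with host="worker-1" lands on a queue that only workers listening on that host-specific name would consume.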

Config Class

class kuyruk.Config

Kuyruk configuration object. Default values are defined as class attributes. Additional attributes may be added by extensions.

EAGER = False

Run tasks in the process without sending to queue. Useful in tests.

RABBIT_CONNECT_TIMEOUT = 5

TCP connect timeout.

RABBIT_HEARTBEAT = 60

Heartbeat interval value proposed by client.

RABBIT_HOST = 'localhost'

RabbitMQ host.

RABBIT_IDLE_DURATION = 60

RabbitMQ connection is closed if it’s not active for this duration.

RABBIT_PASSWORD = 'guest'

RabbitMQ password.

RABBIT_PORT = 5672

RabbitMQ port.

RABBIT_READ_TIMEOUT = 5

TCP read timeout.

RABBIT_SSL = False

RabbitMQ connection uses SSL if True.

RABBIT_USER = 'guest'

RabbitMQ user.

RABBIT_VIRTUAL_HOST = '/'

RabbitMQ virtual host.

RABBIT_WRITE_TIMEOUT = 5

TCP write timeout.

TCP_USER_TIMEOUT = 60

Socket option to specify max seconds before a TCP connection is aborted.

WORKER_LOGGING_LEVEL = 'INFO'

Logging level of root logger.

WORKER_MAX_LOAD = None

Pause consuming queue when the load goes above this level.

WORKER_MAX_RUN_TIME = None

Gracefully shut down the worker after it has been running for this many seconds.

WORKER_RECONNECT_INTERVAL = 5

Number of seconds to wait after a connection error.

from_dict(d: Dict[str, Any]) None

Load values from a dict.

from_env_vars() None

Load values from environment variables. Keys must start with KUYRUK_.
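A rough sketch of prefix-based loading is shown below. It assumes the KUYRUK_ prefix is stripped to obtain the attribute name and that values are parsed as Python literals where possible, falling back to the raw string; config_from_env is a hypothetical helper and the parsing rule is an assumption, not a statement of what from_env_vars() does.

```python
import ast
from typing import Any, Dict, Mapping


def config_from_env(environ: Mapping[str, str], prefix: str = "KUYRUK_") -> Dict[str, Any]:
    """Collect prefixed variables into a dict keyed by the unprefixed name."""
    values: Dict[str, Any] = {}
    for key, raw in environ.items():
        if not key.startswith(prefix):
            continue  # ignore unrelated environment variables
        name = key[len(prefix):]
        try:
            # Assumption: "5673" -> 5673, "True" -> True, and so on.
            values[name] = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            # Not a Python literal (e.g. a hostname): keep the string.
            values[name] = raw
    return values


example_env = {
    "KUYRUK_RABBIT_HOST": "rabbit.internal",
    "KUYRUK_RABBIT_PORT": "5673",
    "KUYRUK_EAGER": "True",
    "PATH": "/usr/bin",
}
loaded = config_from_env(example_env)
```

Passing a plain mapping instead of os.environ keeps the sketch easy to test; in practice the same function could be called with os.environ.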

from_object(obj: str | Any) None

Load values from an object.

from_pyfile(filename: str) None

Load values from a Python file.

Exceptions

exception kuyruk.exceptions.Discard

The task may raise this if it does not want to process the message. The message will be dropped.

exception kuyruk.exceptions.HeartbeatError

Raised when there is a problem while processing heartbeats during task execution. This class is not derived from Exception, to prevent it from being caught accidentally by application code.

exception kuyruk.exceptions.KuyrukError

Base class for Kuyruk exceptions.

exception kuyruk.exceptions.Reject

The task may raise this if it does not want to process the message. The message will be requeued and delivered to another worker.
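The difference between Discard and Reject can be modelled with a toy in-memory queue. The Discard and Reject classes below are local stand-ins for the Kuyruk exceptions, and process is a hypothetical worker loop; a real worker talks to RabbitMQ, and a requeued message may go to a different worker.

```python
from collections import deque
from typing import Callable, Deque, List, Set


class Discard(Exception):
    """Local stand-in for kuyruk.exceptions.Discard."""


class Reject(Exception):
    """Local stand-in for kuyruk.exceptions.Reject."""


def process(queue: Deque[str], handler: Callable[[str], None],
            max_deliveries: int = 10) -> List[str]:
    """Deliver messages to handler and return a log of what happened."""
    log: List[str] = []
    deliveries = 0
    while queue and deliveries < max_deliveries:
        message = queue.popleft()
        deliveries += 1
        try:
            handler(message)
            log.append(f"done:{message}")
        except Discard:
            # Dropped: the message is not put back.
            log.append(f"dropped:{message}")
        except Reject:
            # Requeued: the message will be delivered again later.
            queue.append(message)
            log.append(f"requeued:{message}")
    return log


seen: Set[str] = set()


def handler(message: str) -> None:
    if message == "spam":
        raise Discard()
    if message == "busy" and message not in seen:
        seen.add(message)
        raise Reject()


log = process(deque(["spam", "busy", "ok"]), handler)
```

In this run, "spam" is dropped for good, while "busy" is rejected once, goes to the back of the queue, and is handled on its second delivery.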

exception kuyruk.exceptions.RemoteException(type_: Type, value: Exception, traceback: TracebackType)

Raised from kuyruk.Task.send_to_queue() if wait_result is set and exception is raised on the worker while running the task.

exception kuyruk.exceptions.ResultTimeout

Raised from kuyruk.Task.send_to_queue() if wait_result is set and reply is not received in wait_result seconds.

exception kuyruk.exceptions.Timeout

Raised if a task exceeds its allowed run time.