Kuyruk Class¶
- class kuyruk.Kuyruk(config: Config | None = None)¶
Provides
task()
decorator to convert a function into aTask
.Provides
channel()
context manager for opening a new channel on the connection. Connection is opened when the first channel is created and kept open.- Parameters:
config – Must be an instance of
Config
. IfNone
, default config is used. SeeConfig
for default values.
- channel() Iterator[Channel] ¶
Returns a new channel created on the uderlying connection as a context manager.
- connection() Iterator[Connection] ¶
Returns underlying AMQP connection as a context manager. Connection object is locked while in context and cannot be used by other threads. If you need a connection for a longer duration, use
new_connection
method.
- new_connection() Iterator[Connection] ¶
Returns new AMQP connection as a context manager. Connection is closed when context manager exits.
- task(queue: str = 'kuyruk', **kwargs: Any) Callable ¶
Wrap functions with this decorator to convert them to tasks. After wrapping, calling the function will send a message to a queue instead of running the function.
Task Class¶
- class kuyruk.Task(f: Callable, kuyruk: Kuyruk, queue: str, retry: int = 0, max_run_time: int = None)¶
Calling a
Task
object serializes the task to JSON and sends it to the queue.- Parameters:
retry – Retry this times before give up. The failed task will be retried in the same worker.
max_run_time – Maximum allowed time in seconds for task to complete.
- apply(*args: Any, **kwargs: Any) Any ¶
Called by workers to run the wrapped function. You may call it yourself if you want to run the task in current process without sending to the queue.
If task has a retry property it will be retried on failure.
If task has a max_run_time property the task will not be allowed to run more than that.
- property name: str¶
Full path to the task in the form of <module>.<function>. Workers find and import tasks by this path.
- send_to_queue(args: Tuple = (), kwargs: Dict[str, Any] = {}, host: str | None = None, wait_result: int | float | None = None, message_ttl: int | float | None = None) Any ¶
Sends a message to the queue. A worker will run the task’s function when it receives the message.
- Parameters:
args – Arguments that will be passed to task on execution.
kwargs – Keyword arguments that will be passed to task on execution.
host – Send this task to specific host.
host
will be appended to the queue name. Ifhost
is “localhost”, hostname of the server will be appended to the queue name.wait_result – Wait for result from worker for
wait_result
seconds. If timeout occurs,ResultTimeout
is raised. If excecption occurs in worker,RemoteException
is raised.message_ttl – If set, message will be destroyed in queue after
message_ttl
seconds.
- Returns:
Result from worker if
wait_result
is set, elseNone
.
Config Class¶
- class kuyruk.Config¶
Kuyruk configuration object. Default values are defined as class attributes. Additional attributes may be added by extensions.
- EAGER = False¶
Run tasks in the process without sending to queue. Useful in tests.
- RABBIT_CONNECT_TIMEOUT = 5¶
TCP connect timeout.
- RABBIT_HEARTBEAT = 60¶
Heartbeat interval value proposed by client.
- RABBIT_HOST = 'localhost'¶
RabbitMQ host.
- RABBIT_IDLE_DURATION = 60¶
RabbitMQ connection is closed if it’s not active for this duration.
- RABBIT_PASSWORD = 'guest'¶
RabbitMQ password.
- RABBIT_PORT = 5672¶
RabbitMQ port.
- RABBIT_READ_TIMEOUT = 5¶
TCP read timeout.
- RABBIT_SSL = False¶
RabbitMQ connectin uses SSL if True.
- RABBIT_USER = 'guest'¶
RabbitMQ user.
- RABBIT_VIRTUAL_HOST = '/'¶
RabbitMQ virtual host.
- RABBIT_WRITE_TIMEOUT = 5¶
TCP write timeout.
- TCP_USER_TIMEOUT = 60¶
Socket option to specify max seconds before a TCP connection is aborted.
- WORKER_LOGGING_LEVEL = 'INFO'¶
Logging level of root logger.
- WORKER_MAX_LOAD = None¶
Pause consuming queue when the load goes above this level.
- WORKER_MAX_RUN_TIME = None¶
Gracefully shutdown worker after running this seconds.
- WORKER_RECONNECT_INTERVAL = 5¶
Number of seconds to wait after a connection error.
- from_dict(d: Dict[str, Any]) None ¶
Load values from a dict.
- from_env_vars() None ¶
Load values from environment variables. Keys must start with KUYRUK_.
- from_object(obj: str | Any) None ¶
Load values from an object.
- from_pyfile(filename: str) None ¶
Load values from a Python file.
Exceptions¶
- exception kuyruk.exceptions.Discard¶
The task may raise this if it does not want to process the message. The message will be dropped.
- exception kuyruk.exceptions.HeartbeatError¶
Raised when there is problem while processing heartbeats during task execution. This class is not derived from Exception to prevent being catched accidentally by application code.
- exception kuyruk.exceptions.KuyrukError¶
Base class for Kuyruk exceptions.
- exception kuyruk.exceptions.Reject¶
The task may raise this if it does not want to process the message. The message will be requeued and delivered to another worker.
- exception kuyruk.exceptions.RemoteException(type_: Type, value: Exception, traceback: TracebackType)¶
Raised from
kuyruk.Task.send_to_queue()
ifwait_result
is set and exception is raised on the worker while running the task.
- exception kuyruk.exceptions.ResultTimeout¶
Raised from
kuyruk.Task.send_to_queue()
ifwait_result
is set and reply is not received inwait_result
seconds.
- exception kuyruk.exceptions.Timeout¶
Raised if a task exceeds it’s allowed run time.