Doctests

Queue API

senaite.queue comes with an api module that facilitates the interaction with the queue.

Running this test from the buildout directory:

bin/test test_textual_doctests -t API

Test Setup

Needed imports:

>>> import transaction
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from senaite.queue import api
>>> from senaite.queue.queue import QueueTask
>>> from senaite.queue.tests import utils as test_utils
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from zope import globalrequest

Functional Helpers:

>>> def new_sample():
...     return test_utils.create_sample([Cu], client, contact,
...                                     sampletype, receive=False)

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Retrieve the Queue Utility

The queue utility is the engine from senaite.queue that is responsible for providing access to the queue storage. Unless the current zeo client is configured to act as the queue’s server, api.get_queue() always returns the client queue utility:

>>> api.get_queue()
<senaite.queue.client.utility.ClientQueueUtility object at...

If we configure the current zeo client as the server, we get the server queue utility instead:

>>> api.is_queue_server()
False
>>> api.get_server_url()
'http://localhost:8080/senaite'
>>> key = "senaite.queue.server"
>>> plone_api.portal.set_registry_record(key, u'http://nohost/plone')
>>> transaction.commit()
>>> api.get_queue()
<senaite.queue.server.utility.ServerQueueUtility object at...
>>> api.is_queue_server()
True
>>> api.get_server_url()
'http://nohost/plone'

Both queue utilities provide the same interface and the same behavior is expected, regardless of the type of QueueUtility. See the ClientQueueUtility.rst and ServerQueueUtility.rst doctests for additional information.

Queue status

We can check the queue status:

>>> api.get_queue_status()
'ready'

We can even use the helper is_queue_ready:

>>> api.is_queue_ready()
True

The queue might be enabled, but not ready:

>>> api.is_queue_enabled()
True

Enable/Disable queue

The queue can be disabled and enabled from Site Setup > Queue Settings:

>>> key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(key, 0)
>>> api.is_queue_enabled()
False
>>> api.is_queue_ready()
False
>>> api.get_queue_status()
'disabled'

We can re-enable the queue by setting the default chunk size back to a value greater than zero:

>>> plone_api.portal.set_registry_record(key, 10)
>>> api.is_queue_enabled()
True
>>> api.is_queue_ready()
True
>>> api.get_queue_status()
'ready'

Add a task

We can add a task without having to retrieve the queue utility or create a QueueTask object ourselves:

>>> sample = new_sample()
>>> kwargs = {"action": "receive"}
>>> task = api.add_task("task_action_receive", sample, **kwargs)
>>> isinstance(task, QueueTask)
True
>>> api.get_queue().get_tasks()
[{...}]
>>> len(api.get_queue())
1

Add an action task

Tasks for workflow actions are quite common. Therefore, a specific function for actions is also available:

>>> task = api.add_action_task(sample, "submit")
>>> isinstance(task, QueueTask)
True
>>> len(api.get_queue())
2

Add assign action task

The action “assign” (for analyses) requires not only the worksheet, but also the list of analyses to be assigned and their slot positions. Therefore, a helper function is available to make this easier:

>>> worksheet = _api.create(portal.worksheets, "Worksheet")
>>> analyses = sample.getAnalyses(full_objects=True)
>>> task = api.add_assign_task(worksheet, analyses)
>>> isinstance(task, QueueTask)
True
>>> len(api.get_queue())
3

Check if an object is queued

>>> fresh_sample = new_sample()
>>> api.is_queued(fresh_sample)
False
>>> api.is_queued(sample)
True
>>> api.is_queued(worksheet)
True

Flush the queue

Flush the queue to make room for other tests:

>>> test_utils.flush_queue(browser, self.request)

Server’s Queue utility

The IServerQueueUtility is a utility that acts as a singleton and is used to store and keep track of the tasks added by queue clients, as well as to deliver tasks to consumers.

This utility is only used by the zeo instance that acts as the Queue Server. The rest (consumers and queue clients) use IClientQueueUtility instead.

Running this test from the buildout directory:

bin/test test_textual_doctests -t ServerQueueUtility

Test Setup

Needed imports:

>>> import binascii
>>> import os
>>> import time
>>> from bika.lims import api as _api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from senaite.queue.interfaces import IQueueUtility
>>> from senaite.queue.interfaces import IServerQueueUtility
>>> from senaite.queue.queue import new_task
>>> from senaite.queue.tests import utils as test_utils
>>> from zope.component import getUtility

Functional Helpers:

>>> def new_sample():
...     return test_utils.create_sample([Cu], client, contact,
...                                     sampletype, receive=False)

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Retrieve the server’s queue utility

The server queue utility provides all the functionalities required to manage the queue and the tasks from the server side. It is a global utility that acts like a singleton and guarantees there are no conflicts when multiple threads from the same instance (zeo client) operate against it. To retrieve the server utility:

>>> getUtility(IServerQueueUtility)
<senaite.queue.server.utility.ServerQueueUtility object at...

The server utility implements the IQueueUtility interface too:

>>> utility = getUtility(IServerQueueUtility)
>>> IQueueUtility.providedBy(utility)
True

Add a task

The queue server does not have any tasks awaiting yet:

>>> utility.is_empty()
True

Create a new object and the task first:

>>> sample = new_sample()
>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)

Add the new task to the utility:

>>> added_task = utility.add(task)
>>> added_task == task
True
>>> utility.is_empty()
False
>>> len(utility)
1

Only tasks of type QueueTask are supported:

>>> utility.add("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Adding an existing task has no effect:

>>> dummy = utility.add(task)
>>> dummy is None
True
>>> len(utility)
1

However, we can add another task for the same context and with the same name:

>>> kwargs = {"action": "receive"}
>>> copy_task = new_task("task_action_receive", sample, **kwargs)
>>> utility.add(copy_task) == copy_task
True
>>> len(utility)
2

But it is not possible to add a new task for the same context and task name when the unique wildcard is used:

>>> kwargs = {"action": "receive", "unique": True}
>>> unique_task = new_task("task_action_receive", sample, **kwargs)
>>> utility.add(unique_task) is None
True
>>> len(utility)
2

Delete a task

We can delete a task directly:

>>> utility.delete(copy_task)
>>> len(utility)
1

Or by using its task uid:

>>> added = utility.add(copy_task)
>>> len(utility)
2
>>> utility.delete(copy_task.task_uid)
>>> len(utility)
1

Get a task

We can retrieve the task we added before by its uid:

>>> retrieved_task = utility.get_task(task.task_uid)
>>> retrieved_task == task
True

If we ask for a task that does not exist, None is returned:

>>> dummy_uid = binascii.hexlify(os.urandom(16))
>>> utility.get_task(dummy_uid) is None
True

If we ask for something that is not a valid uid, we get an exception:

>>> utility.get_task("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Get tasks

Or we can get all the tasks the utility contains:

>>> tasks = utility.get_tasks()
>>> tasks
[{...}]
>>> task in tasks
True
>>> len(tasks)
1

Get tasks by status

We can even get the tasks filtered by their status:

>>> utility.get_tasks(status=["queued", "running"])
[{...}]
>>> utility.get_tasks(status="queued")
[{...}]
>>> utility.get_tasks(status="running")
[]

Get tasks by context

Or we can get the tasks by context:

>>> utility.get_tasks_for(sample)
[{...}]
>>> utility.get_tasks_for(_api.get_uid(sample))
[{...}]
>>> utility.get_tasks_for(task.task_uid)
[]
>>> utility.get_tasks_for("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Get tasks by context and task name

>>> utility.get_tasks_for(sample, name="task_action_receive")
[{...}]
>>> utility.get_tasks_for(sample, name="dummy")
[]

Get objects uids from tasks

We can also ask for all the uids from objects the utility contains:

>>> uids = utility.get_uids()
>>> len(uids)
1
>>> _api.get_uid(sample) in uids
True
>>> task.task_uid in uids
False

Ask if a task exists

>>> utility.has_task(task)
True
>>> utility.has_task(task.task_uid)
True
>>> utility.has_task(_api.get_uid(sample))
False
>>> utility.has_task("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Ask if a task for a context exists

>>> utility.has_tasks_for(sample)
True
>>> utility.has_tasks_for(_api.get_uid(sample))
True
>>> utility.has_tasks_for(task.task_uid)
False
>>> utility.has_tasks_for("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Ask if a task for a context and name exists

>>> utility.has_tasks_for(sample, name="task_action_receive")
True
>>> utility.has_tasks_for(sample, name="dummy")
False

Pop a task

When a task is popped, the utility changes the status of the task to “running”, because it expects the task has been popped for consumption:

>>> consumer_id = u'http://nohost'
>>> popped = utility.pop(consumer_id)
>>> popped.status
'running'

We can still add new tasks at the same time, even if they are for the same context and with the same name:

>>> kwargs = {"action": "receive"}
>>> copy_task = new_task("task_action_receive", sample, **kwargs)
>>> utility.add(copy_task) == copy_task
True

However, the server does not allow the consumer to pop more tasks until it receives an acknowledgment that the previously popped task is done:

>>> utility.pop(consumer_id) is None
True

Even if we ask again:

>>> utility.pop(consumer_id) is None
True

Unless we wait 10 seconds, after which the server assumes the consumer failed while processing the task. Consumers always check that there is no thread running on their side before doing a pop(). Also, a consumer (which is, in fact, a zeo client) might be stopped at some point. Therefore, this timeout mechanism is used as a safety fallback to prevent the queue from entering a dead-lock:

>>> time.sleep(11)
>>> utility.pop(consumer_id) is None
True

The previous task is now re-queued by the server:

>>> popped = utility.get_task(popped.task_uid)
>>> popped.status
'queued'
>>> popped.get("error_message")
'Purged on pop (http://nohost)'
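The purge mechanism described above can be sketched as follows. This is a simplified illustration and not the actual implementation: the `started` bookkeeping field and the `purge_stale` helper are assumptions made for the sake of the example.

```python
TASK_TIMEOUT = 10  # seconds the server waits before assuming the consumer died


def purge_stale(tasks, now):
    """Re-queue running tasks whose consumer has not reported back in time.

    `tasks` is a list of dicts with `status` and `started` keys, standing
    in for the real server-side storage.
    """
    for task in tasks:
        if task["status"] == "running" and now - task["started"] > TASK_TIMEOUT:
            task["status"] = "queued"
            task["error_message"] = "Purged on pop"
    return tasks
```

A purged task goes back to the queue with an error message attached, exactly as the doctest above shows for the real utility.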

And a pop returns now the next task:

>>> next_task = utility.pop(consumer_id)
>>> next_task.status
'running'
>>> next_task.task_uid != popped.task_uid
True

Delete the newly added task, so we keep only one task in the queue for testing:

>>> utility.delete(next_task)
>>> len(utility)
1

If we try to pop again now, the task the queue server considered timed out won’t be popped, because the server adds a delay of 5 seconds before the task can be popped again. This mechanism prevents the queue from being jeopardized by recurrently failing tasks and makes room for other tasks to be processed:

>>> popped.get("delay")
5
>>> utility.pop(consumer_id) is None
True
>>> time.sleep(5)
>>> delayed = utility.pop(consumer_id)
>>> delayed.task_uid == popped.task_uid
True
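The 5-second delay can be pictured as a simple eligibility check on pop. The `delayed_until` field and both helpers below are assumed bookkeeping for illustration only, not the real attributes of a QueueTask:

```python
POP_DELAY = 5  # seconds a purged task is held back before it can be popped again


def delay_task(task, now):
    """Hold a task back for POP_DELAY seconds after a purge."""
    task["delay"] = POP_DELAY
    task["delayed_until"] = now + POP_DELAY
    return task


def is_poppable(task, now):
    """A delayed task only becomes eligible again once its delay has elapsed."""
    return task.get("delayed_until", 0) <= now
```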

Flush the queue:

>>> utility.delete(delayed)
>>> len(utility)
0

Task timeout

Create a new task:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)
>>> task = utility.add(task)

When a consumer thread in charge of processing a given task times out, it notifies the queue accordingly so the task is re-queued:

>>> running = utility.pop(consumer_id)
>>> running.status
'running'
>>> utility.timeout(running)
>>> queued = utility.get_task(running.task_uid)
>>> queued.task_uid == running.task_uid
True
>>> queued.status
'queued'
>>> queued.get("error_message")
'Timeout'

Each time a task is timed out, the number of seconds the system will wait for the thread in charge of processing the task to complete increases. This mechanism is used as a fall-back for when the processing of a task takes longer than initially expected:

>>> queued.get("max_seconds") > running.get("max_seconds")
True
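The growth of `max_seconds` on timeout can be sketched like this. The growth factor of 1.5 is an assumption for illustration; the real implementation may grow the value differently:

```python
def requeue_on_timeout(task, factor=1.5):
    """Re-queue a timed-out task with more processing time for the next run."""
    task["status"] = "queued"
    task["error_message"] = "Timeout"
    # grant the task more time than it had on the previous attempt
    task["max_seconds"] = int(task.get("max_seconds", 60) * factor)
    return task
```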

Flush the queue:

>>> utility.delete(queued)
>>> len(utility)
0

Task failure

Create a new task:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)
>>> task = utility.add(task)

If an error arises while a task is being processed, the consumer tells the server to mark the task as failed. By default, the queue server re-queues the task up to a pre-defined number of times before finally considering it failed. The most common reason for a task to fail is a commit conflict with a transaction that took place in userland. This retry mechanism is a safeguard for when the workload is high and tasks keep failing for this reason.
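The retry bookkeeping can be sketched as follows. `fail_task` is a hypothetical helper, a simplified stand-in for the server-side logic:

```python
def fail_task(task):
    """Re-queue a failed task until its retries are exhausted.

    Retries decrease by one on each failure; the task only becomes
    `failed` once no retries are left.
    """
    if task.get("retries", 0) > 0:
        task["retries"] -= 1
        task["status"] = "queued"
    else:
        task["status"] = "failed"
    return task
```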

Pop a task first:

>>> running = utility.pop(consumer_id)
>>> task_uid = running.task_uid
>>> running.status
'running'
>>> running.retries
3

Flag it as failed and the number of retries decreases by one:

>>> utility.fail(running)
>>> failed = utility.get_task(running.task_uid)
>>> failed.task_uid == running.task_uid
True
>>> failed.retries
2
>>> failed.status
'queued'

When the number of retries reaches 0, the server eventually considers the task as failed; its status becomes failed and the task cannot be popped anymore:

>>> time.sleep(5)
>>> failed = utility.pop(consumer_id)
>>> utility.fail(failed)
>>> failed = utility.get_task(failed.task_uid)
>>> failed.status
'queued'
>>> failed.retries
1
>>> time.sleep(5)
>>> failed = utility.pop(consumer_id)
>>> utility.fail(failed)
>>> failed = utility.get_task(failed.task_uid)
>>> failed.status
'queued'
>>> failed.retries
0
>>> time.sleep(5)
>>> failed = utility.pop(consumer_id)
>>> utility.fail(failed)
>>> failed = utility.get_task(failed.task_uid)
>>> failed.status
'failed'
>>> failed.retries
0
>>> time.sleep(5)
>>> utility.pop(consumer_id) is None
True

Flush the queue:

>>> utility.delete(failed)
>>> len(utility)
0

Task done

When the consumer notifies the server queue that a task is done, the task is removed from the queue:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)
>>> task = utility.add(task)
>>> utility.has_task(task)
True
>>> running = utility.pop(consumer_id)
>>> utility.has_task(running)
True
>>> utility.done(running)
>>> utility.has_task(running)
False

Flush the queue

Flush the queue to make room for other tests:

>>> deleted = map(utility.delete, utility.get_tasks())

Client’s Queue utility

The IClientQueueUtility is a utility that acts as a singleton and is used as an interface to interact with the server’s queue. It provides functions to add tasks to the queue and retrieve them.

This utility is used by the instances that act either as queue clients or as consumers. The zeo instance that acts as the queue server uses IServerQueueUtility instead. IClientQueueUtility holds a local cache of tasks that it keeps up-to-date with the server’s queue through POST calls.

However, both utilities provide the same interface, so the developer does not need to worry about which utility is actually in use: except for some particular cases involving failed and ghost tasks, their expected behavior is exactly the same.
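The relationship between the client’s local pool and the server can be sketched as follows. `ClientPoolSketch` and `fetch_tasks` are assumptions for illustration: `fetch_tasks` stands in for the POST call the real utility makes against the server.

```python
class ClientPoolSketch(object):
    """Minimal sketch of a client-side pool kept in sync with the server."""

    def __init__(self, fetch_tasks):
        self._fetch = fetch_tasks  # callable returning the server's tasks
        self._tasks = {}           # local pool, keyed by task uid

    def sync(self):
        """Replace the local pool with the server's current tasks."""
        self._tasks = dict((t["task_uid"], t) for t in self._fetch())

    def get_task(self, task_uid):
        """Look up locally first; fall back to the server on a miss."""
        task = self._tasks.get(task_uid)
        if task is None:
            for remote in self._fetch():
                if remote["task_uid"] == task_uid:
                    return remote
        return task
```

This mirrors the behavior exercised later in this document: tasks added directly to the server are found through the fallback search even before a sync refreshes the local pool.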

Running this test from the buildout directory:

bin/test test_textual_doctests -t ClientQueueUtility

Test Setup

Needed imports:

>>> import binascii
>>> import os
>>> import time
>>> import transaction
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from senaite.queue.interfaces import IQueueUtility
>>> from senaite.queue.interfaces import IClientQueueUtility
>>> from senaite.queue.interfaces import IServerQueueUtility
>>> from senaite.queue.queue import new_task
>>> from senaite.queue.tests import utils as test_utils
>>> from senaite.queue.tests.utils import RequestTestHandler
>>> from zope import globalrequest
>>> from zope.component import getUtility

Functional Helpers:

>>> def new_sample():
...     return test_utils.create_sample([Cu], client, contact,
...                                     sampletype, receive=False)

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)
>>> sample = new_sample()

Setup the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()

Retrieve the client’s queue utility

The client queue utility provides all the functionalities required to manage the queue from the client side. This utility interacts internally with the queue server via JSON API calls, but provides the same interface. Therefore, the user should expect the same behavior regardless of whether the client’s queue or the server’s queue is being used.

>>> getUtility(IClientQueueUtility)
<senaite.queue.client.utility.ClientQueueUtility object at...

Client utility implements IQueueUtility interface too:

>>> utility = getUtility(IClientQueueUtility)
>>> IQueueUtility.providedBy(utility)
True

The utility makes use of the requests module to ask the queue server. We override the requests handler here for the doctests to mimic its behavior, but using plone.testing.z2.Browser instead:

>>> utility._req = RequestTestHandler(browser, self.request)

We will also use the server’s queue utility to validate integrity:

>>> s_utility = getUtility(IServerQueueUtility)

Add a task

The queue client does not have any tasks awaiting yet:

>>> utility.is_empty()
True

Add a task for a sample:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)

Add the new task:

>>> utility.add(task) == task
True
>>> utility.is_empty()
False
>>> len(utility)
1

The server queue contains the task as well:

>>> len(s_utility)
1
>>> s_utility.has_task(task)
True

Only tasks of type QueueTask are supported:

>>> utility.add("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Adding an existing task has no effect:

>>> utility.add(task) is None
True
>>> len(utility)
1
>>> len(s_utility)
1

However, we can add another task for the same context and with the same name:

>>> kwargs = {"action": "receive", "test": "test"}
>>> copy_task = new_task("task_action_receive", sample, **kwargs)
>>> utility.add(copy_task) == copy_task
True
>>> len(utility)
2
>>> len(s_utility)
2

But it is not possible to add a new task for the same context and task name when the unique wildcard is used:

>>> kwargs = {"action": "receive", "unique": True}
>>> unique_task = new_task("task_action_receive", sample, **kwargs)
>>> utility.add(unique_task) is None
True
>>> len(utility)
2

The server queue contains the two tasks as well:

>>> len(s_utility)
2
>>> all(map(s_utility.has_task, utility.get_tasks()))
True

Delete a task

We can delete a task directly:

>>> utility.delete(copy_task)
>>> len(utility)
1

And the task gets removed from the server’s queue as well:

>>> len(s_utility)
1

We can also delete a task by using the task uid:

>>> added = utility.add(copy_task)
>>> len(utility)
2
>>> len(s_utility)
2
>>> utility.delete(copy_task.task_uid)
>>> len(utility)
1
>>> len(s_utility)
1

Get a task

We can retrieve the task we added before by its uid:

>>> retrieved_task = utility.get_task(task.task_uid)
>>> retrieved_task == task
True

If we ask for a task that does not exist, None is returned:

>>> dummy_uid = binascii.hexlify(os.urandom(16))
>>> utility.get_task(dummy_uid) is None
True

If we ask for something that is not a valid uid, we get an exception:

>>> utility.get_task("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Get tasks

Or we can get all the tasks the utility contains:

>>> tasks = utility.get_tasks()
>>> tasks
[{...}]
>>> task in tasks
True
>>> len(tasks)
1

Get tasks by status

We can even get the tasks filtered by their status:

>>> utility.get_tasks(status=["queued", "running"])
[{...}]
>>> utility.get_tasks(status="queued")
[{...}]
>>> utility.get_tasks(status="running")
[]

Get tasks by context

Or we can get the tasks by context:

>>> utility.get_tasks_for(sample)
[{...}]
>>> utility.get_tasks_for(_api.get_uid(sample))
[{...}]
>>> utility.get_tasks_for(task.task_uid)
[]
>>> utility.get_tasks_for("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Get tasks by context and task name

>>> utility.get_tasks_for(sample, name="task_action_receive")
[{...}]
>>> utility.get_tasks_for(sample, name="dummy")
[]

Get objects uids from tasks

We can also ask for all the uids from objects the utility contains:

>>> uids = utility.get_uids()
>>> len(uids)
1
>>> _api.get_uid(sample) in uids
True
>>> task.task_uid in uids
False

Ask if a task exists

>>> utility.has_task(task)
True
>>> utility.has_task(task.task_uid)
True
>>> utility.has_task(_api.get_uid(sample))
False
>>> utility.has_task("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Ask if a task for a context exists

>>> utility.has_tasks_for(sample)
True
>>> utility.has_tasks_for(_api.get_uid(sample))
True
>>> utility.has_tasks_for(task.task_uid)
False
>>> utility.has_tasks_for("dummy")
Traceback (most recent call last):
[...]
ValueError: 'dummy' is not supported

Ask if a task for a context and name exists

>>> utility.has_tasks_for(sample, name="task_action_receive")
True
>>> utility.has_tasks_for(sample, name="dummy")
False

Synchronize with queue server

If we add a task directly to the server’s queue:

>>> kwargs = {"action": "receive"}
>>> server_task = new_task("task_action_receive", sample, **kwargs)
>>> s_utility.add(server_task) == server_task
True
>>> s_utility.has_task(server_task)
True
>>> len(s_utility)
2

The task is not in client’s queue local pool:

>>> server_task in utility.get_tasks()
False

However, the client queue falls back to a search against the server’s queue when asked for a specific task that does not exist in the local pool:

>>> utility.has_task(server_task)
True
>>> utility.get_task(server_task.task_uid)
{...}

The client queue’s local pool of tasks can easily be synchronized with the tasks from the server’s queue:

>>> len(utility)
1
>>> utility.sync()
>>> len(utility)
2
>>> server_task in utility.get_tasks()
True
>>> all(map(s_utility.has_task, utility.get_tasks()))
True

When the task status in the server is “running”, the corresponding task of the local pool is always updated on synchronization:

>>> consumer_id = u'http://nohost'
>>> running = s_utility.pop(consumer_id)
>>> running.status
'running'
>>> local_task = utility.get_task(running.task_uid)
>>> local_task.status
'queued'
>>> utility.sync()
>>> local_task = utility.get_task(running.task_uid)
>>> local_task.status
'running'

Flush the queue:

>>> deleted = map(utility.delete, utility.get_tasks())
>>> len(utility)
0
>>> len(s_utility)
0

Pop a task

Add a new task to the queue:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)
>>> utility.add(task) == task
True

When a task is popped, the utility changes the status of the task to “running”, because it expects that the task has been popped for consumption:

>>> consumer_id = u'http://nohost'
>>> popped = utility.pop(consumer_id)
>>> popped.status
'running'

We can still add new tasks at the same time, even if they are for same context and with same name:

>>> kwargs = {"action": "receive"}
>>> copy_task = new_task("task_action_receive", sample, **kwargs)
>>> utility.add(copy_task) == copy_task
True

However, the consumer is not allowed to pop more tasks until the queue server receives an acknowledgment that the previously popped task is done:

>>> utility.pop(consumer_id) is None
True

Even if we ask again:

>>> utility.pop(consumer_id) is None
True

Unless we wait 10 seconds, after which the server assumes the consumer failed while processing the task. Consumers always check that there is no thread running on their side before doing a pop(). Also, a consumer (which is, in fact, a zeo client) might be stopped at some point. Therefore, this timeout mechanism is used as a safety fallback to prevent the queue from entering a dead-lock:

>>> time.sleep(11)
>>> utility.pop(consumer_id) is None
True

The previous task is now re-queued:

>>> popped = utility.get_task(popped.task_uid)
>>> popped.status
'queued'
>>> popped.get("error_message")
'Purged on pop (http://nohost)'

And a pop returns now the next task:

>>> next_task = utility.pop(consumer_id)
>>> next_task.status
'running'
>>> next_task.task_uid != popped.task_uid
True

Delete the newly added task, so we keep only one task in the queue for testing:

>>> utility.delete(next_task)
>>> len(utility)
1

If we try to pop again now, the task the queue server considered timed out won’t be popped, because the server adds a delay of 5 seconds before the task can be popped again. This mechanism prevents the queue from being jeopardized by recurrently failing tasks and makes room for other tasks to be processed:

>>> popped.get("delay")
5
>>> utility.pop(consumer_id) is None
True
>>> time.sleep(5)
>>> delayed = utility.pop(consumer_id)
>>> delayed.task_uid == popped.task_uid
True

Flush the queue:

>>> utility.delete(delayed)
>>> len(utility)
0

Task timeout

Create a new task:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)
>>> task = utility.add(task)

When a consumer thread in charge of processing a given task times out, it notifies the queue accordingly so the task is re-queued:

>>> running = utility.pop(consumer_id)
>>> running.status
'running'
>>> utility.timeout(running)
>>> queued = utility.get_task(running.task_uid)
>>> queued.task_uid == running.task_uid
True
>>> queued.status
'queued'
>>> queued.get("error_message")
'Timeout'

Each time a task is timed out, the number of seconds the system will wait for the thread in charge of processing the task to complete increases. This mechanism is used as a fall-back for when the processing of a task takes longer than initially expected:

>>> queued.get("max_seconds") > running.get("max_seconds")
True

Flush the queue:

>>> utility.delete(queued)
>>> len(utility)
0

Task failure

Create a new task:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)
>>> task = utility.add(task)

If an error arises while a task is being processed, the client queue tells the server to mark the task as failed. By default, the queue server re-queues the task up to a pre-defined number of times before finally considering it failed. The most common reason for a task to fail is a commit conflict with a transaction that took place in userland. This retry mechanism is a safeguard for when the workload is high and tasks keep failing for this reason.

Pop a task first:

>>> running = utility.pop(consumer_id)
>>> task_uid = running.task_uid
>>> running.status
'running'
>>> running.retries
3

Flag it as failed and the number of retries decreases by one:

>>> utility.fail(running)
>>> failed = utility.get_task(running.task_uid)
>>> failed.task_uid == running.task_uid
True
>>> failed.retries
2
>>> failed.status
'queued'

When the number of retries reaches 0, the server eventually considers the task as failed; its status becomes failed and the task cannot be popped anymore:

>>> time.sleep(5)
>>> failed = utility.pop(consumer_id)
>>> utility.fail(failed)
>>> failed = utility.get_task(failed.task_uid)
>>> failed.status
'queued'
>>> failed.retries
1
>>> time.sleep(5)
>>> failed = utility.pop(consumer_id)
>>> utility.fail(failed)
>>> failed = utility.get_task(failed.task_uid)
>>> failed.status
'queued'
>>> failed.retries
0
>>> time.sleep(5)
>>> failed = utility.pop(consumer_id)
>>> utility.fail(failed)
>>> failed = utility.get_task(failed.task_uid)
>>> failed.status
'failed'
>>> failed.retries
0
>>> time.sleep(5)
>>> utility.pop(consumer_id) is None
True

Flush the queue:

>>> utility.delete(failed)
>>> len(utility)
0

Task done

When the client notifies the server queue that a task is done, the task is removed from the queue:

>>> kwargs = {"action": "receive"}
>>> task = new_task("task_action_receive", sample, **kwargs)
>>> task = utility.add(task)
>>> utility.has_task(task)
True
>>> running = utility.pop(consumer_id)
>>> utility.has_task(running)
True
>>> utility.done(running)
>>> utility.has_task(running)
False

Flush the queue

Flush the queue to make room for other tests:

>>> test_utils.flush_queue(browser, self.request)

Assignment of analyses

SENAITE Queue supports the assign transition for analyses, both when analyses are assigned manually (via the Add analyses view of a Worksheet) and when a Worksheet Template is used.

Running this test from the buildout directory:

bin/test test_textual_doctests -t WorksheetAnalysesAssign

Test Setup

Needed imports:

>>> import time
>>> import transaction
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from plone.app.testing import TEST_USER_PASSWORD
>>> from senaite.queue import api
>>> from senaite.queue.tests import utils as test_utils
>>> from zope import globalrequest

Functional Helpers:

>>> def new_samples(num_analyses):
...     samples = []
...     for num in range(num_analyses):
...         sample = test_utils.create_sample([Cu], client, contact,
...                                           sampletype, receive=True)
...         samples.append(sample)
...     transaction.commit()
...     return samples
>>> def get_analyses_from(samples):
...     analyses = []
...     for sample in samples:
...         analyses.extend(sample.getAnalyses(full_objects=True))
...     return analyses

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Setup the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()

Manual assignment of analyses to a Worksheet

Set the default number of objects to process per task to 5:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Create 15 Samples with 1 analysis each:

>>> samples = new_samples(15)
>>> analyses = get_analyses_from(samples)

Create an empty worksheet and add all analyses:

>>> worksheet = _api.create(portal.worksheets, "Worksheet")
>>> worksheet.addAnalyses(analyses)
>>> transaction.commit()

The worksheet is queued now:

>>> api.is_queued(worksheet)
True

And the analyses as well:

>>> queued = map(api.is_queued, analyses)
>>> all(queued)
True

None of the analyses have been transitioned:

>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
0

The queue contains one task:

>>> queue = api.get_queue()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(non_transitioned)
10
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

And the worksheet is still queued:

>>> api.is_queued(worksheet)
True

Change the number of items to process per task to 2:

>>> plone_api.portal.set_registry_record(chunk_key, 2)
>>> transaction.commit()

Pop a task and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Only 2 analyses are transitioned now:

>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
7
>>> non_transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(non_transitioned)
8
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True
>>> api.is_queued(worksheet)
True

We can disable the queue. Set the number of items to process per task to 0:

>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Because the queue still contains tasks that have not been processed yet, the queue remains enabled, although it is not ready:

>>> api.is_queue_enabled()
True
>>> api.is_queue_ready()
False
>>> api.get_queue_status()
'resuming'

The queue does not allow the addition of new tasks, but the remaining tasks are processed as usual. Since the number of items to process per task is now 0, all remaining analyses are transitioned at once:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'
>>> queue.is_empty()
True
>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(non_transitioned)
0
>>> any(map(api.is_queued, transitioned))
False
>>> api.is_queued(worksheet)
False

Since all analyses have been processed, the worksheet is no longer queued and the queue is now disabled:

>>> api.is_queued(worksheet)
False
>>> api.is_queue_enabled()
False
>>> api.is_queue_ready()
False
>>> api.get_queue_status()
'disabled'

Assignment of analyses through Worksheet Template

Analyses can be assigned to a worksheet by making use of a Worksheet Template. In that case, the system behaves exactly as before.

Set the number of analyses to be transitioned in a single process:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Create 15 Samples with 1 analysis each:

>>> samples = new_samples(15)
>>> analyses = get_analyses_from(samples)

Create a Worksheet Template with 15 slots reserved for Cu analysis:

>>> template = _api.create(setup.bika_worksheettemplates, "WorksheetTemplate")
>>> template.setService([Cu])
>>> layout = map(lambda idx: {"pos": idx + 1, "type": "a"}, range(15))
>>> template.setLayout(layout)
>>> transaction.commit()

Use the template for Worksheet creation:

>>> worksheet = _api.create(portal.worksheets, "Worksheet")
>>> worksheet.applyWorksheetTemplate(template)
>>> transaction.commit()

The worksheet is now queued:

>>> api.is_queued(worksheet)
True

And the analyses as well:

>>> queued = map(api.is_queued, analyses)
>>> all(queued)
True

None of the analyses have been transitioned:

>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
0

And the queue contains one task:

>>> queue = api.get_queue()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1

Wait for the task delay. This is a mechanism that prevents consumers from starting to process a task before the life-cycle of the current request has finished:

>>> task = queue.get_tasks_for(worksheet)[0]
>>> time.sleep(task.get("delay", 0))

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(non_transitioned)
10
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

And the worksheet is still queued:

>>> api.is_queued(worksheet)
True

As the queue confirms:

>>> queue.is_empty()
False
>>> len(queue)
1
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(non_transitioned)
5
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

Since there are still 5 analyses remaining, the Worksheet is still queued:

>>> api.is_queued(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Last chunk of analyses is processed:

>>> transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(non_transitioned)
0
>>> any(map(api.is_queued, transitioned))
False

The queue is now empty:

>>> queue.is_empty()
True

And the worksheet is no longer queued:

>>> api.is_queued(worksheet)
False

Unassign transition

SENAITE Queue comes with an adapter for generic actions (e.g. submit, unassign). Generic actions don’t require any logic other than the transition itself, which is handled by DC workflow. Thus, the adapter for generic actions provided by senaite.queue only deals with the number of objects to process per task (the chunk size), with no additional logic. Most transitions from senaite.core meet these requirements.
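The chunking behaviour described above can be sketched like this. The helper `chunk` is a hypothetical illustration of the idea, not the actual senaite.queue implementation:

```python
# Illustrative sketch of per-task chunking for generic actions.
# `chunk` is a hypothetical helper, not the real senaite.queue code.
def chunk(items, chunk_size):
    """Split items into (process_now, requeue_later) given the chunk size.

    A chunk_size of 0 means the queue is disabled, so everything is
    processed at once and nothing is re-queued.
    """
    if chunk_size <= 0:
        return items, []
    return items[:chunk_size], items[chunk_size:]

# 15 queued analyses with a chunk size of 5: transition the first 5 now,
# the remaining 10 stay queued for the next pop.
uids = ["uid%d" % i for i in range(15)]
now, later = chunk(uids, 5)
```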

Running this test from the buildout directory:

bin/test test_textual_doctests -t WorksheetAnalysesUnassign

Test Setup

Needed imports:

>>> import transaction
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from plone.app.testing import TEST_USER_PASSWORD
>>> from senaite.queue import api
>>> from senaite.queue.tests import utils as test_utils
>>> from zope import globalrequest

Functional Helpers:

>>> def new_sample(services):
...     return test_utils.create_sample(services, client, contact,
...                                     sampletype, receive=True)
>>> def new_worksheet(num_analyses):
...     analyses = []
...     for num in range(num_analyses):
...         sample = new_sample([Cu])
...         analyses.extend(sample.getAnalyses(full_objects=True))
...     worksheet = _api.create(portal.worksheets, "Worksheet")
...     worksheet.addAnalyses(analyses)
...     transaction.commit()
...     return worksheet

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Setup the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()
>>> api.get_queue()
<senaite.queue.server.utility.ServerQueueUtility object at...

Unassign transition

Disable the queue first, so the assign transition is performed non-async:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()

Enable the queue so we can trap the unassign transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Unassign analyses:

>>> test_utils.handle_action(worksheet, analyses, "unassign")

The worksheet is queued and the analyses as well:

>>> api.is_queued(worksheet)
True
>>> len(test_utils.filter_by_state(analyses, "unassigned"))
0
>>> all(map(api.is_queued, analyses))
True

And the queue contains one task:

>>> queue = api.get_queue()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
10
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

And the worksheet is still queued:

>>> api.is_queued(worksheet)
True

As the queue confirms:

>>> queue.is_empty()
False
>>> len(queue)
1
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
5
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

Since there are still 5 analyses remaining, the Worksheet is still queued:

>>> api.is_queued(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Last chunk of analyses is processed:

>>> transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
0
>>> any(map(api.is_queued, transitioned))
False

The queue is now empty:

>>> queue.is_empty()
True

And the worksheet is no longer queued:

>>> api.is_queued(worksheet)
False

Unassign transition (with ClientQueue)

Perform the same test as before, but now using the ClientQueueUtility:

>>> queue = test_utils.get_client_queue(browser, self.request)

Disable the queue first, so submit and assign transitions are performed non-async:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()

Enable the queue so we can trap the unassign transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Unassign the analyses:

>>> test_utils.handle_action(worksheet, analyses, "unassign")

The queue contains one task:

>>> queue.sync()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1
>>> all(filter(queue.get_tasks_for, analyses))
True

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
10
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
5
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Last chunk of analyses is processed:

>>> transitioned = test_utils.filter_by_state(analyses, "unassigned")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
0
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> queue.is_empty()
True
>>> queue.has_tasks_for(worksheet)
False

Submit transition

SENAITE Queue comes with an adapter for generic actions (e.g. submit, unassign). Generic actions don’t require any logic other than the transition itself, which is handled by DC workflow. Thus, the adapter for generic actions provided by senaite.queue only deals with the number of objects to process per task (the chunk size), with no additional logic. Most transitions from senaite.core meet these requirements.

Running this test from the buildout directory:

bin/test test_textual_doctests -t WorksheetAnalysesSubmit

Test Setup

Needed imports:

>>> import transaction
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from plone.app.testing import TEST_USER_PASSWORD
>>> from senaite.queue import api
>>> from senaite.queue.tests import utils as test_utils
>>> from zope import globalrequest

Functional Helpers:

>>> def new_sample(services):
...     return test_utils.create_sample(services, client, contact,
...                                     sampletype, receive=True)
>>> def new_worksheet(num_analyses):
...     analyses = []
...     for num in range(num_analyses):
...         sample = new_sample([Cu])
...         analyses.extend(sample.getAnalyses(full_objects=True))
...     worksheet = _api.create(portal.worksheets, "Worksheet")
...     worksheet.addAnalyses(analyses)
...     transaction.commit()
...     return worksheet
>>> def set_analyses_results(worksheet):
...     for analysis in worksheet.getAnalyses():
...         analysis.setResult(13)
...     transaction.commit()

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Setup the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()
>>> api.get_queue()
<senaite.queue.server.utility.ServerQueueUtility object at...

Submit transition

Disable the queue first, so the assign transition is performed non-async:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses and set results:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)

Enable the queue so we can trap the submit transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Submit the analyses:

>>> test_utils.handle_action(worksheet, analyses, "submit")

The worksheet is queued and the analyses as well:

>>> api.is_queued(worksheet)
True
>>> len(test_utils.filter_by_state(analyses, "to_be_verified"))
0
>>> all(map(api.is_queued, analyses))
True

And the queue contains one task:

>>> queue = api.get_queue()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
10
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

And the worksheet is still queued:

>>> api.is_queued(worksheet)
True

As the queue confirms:

>>> queue.is_empty()
False
>>> len(queue)
1
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
5
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

Since there are still 5 analyses remaining, the Worksheet is still queued:

>>> api.is_queued(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Last chunk of analyses is processed:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
0
>>> any(map(api.is_queued, transitioned))
False

The queue is now empty:

>>> queue.is_empty()
True

And the worksheet is no longer queued:

>>> api.is_queued(worksheet)
False

Submit transition (with ClientQueue)

Perform the same test as before, but now using the ClientQueueUtility:

>>> queue = test_utils.get_client_queue(browser, self.request)

Disable the queue first, so the assign transition is performed non-async:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses and set results:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)

Enable the queue so we can trap the submit transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Submit the analyses:

>>> test_utils.handle_action(worksheet, analyses, "submit")

The queue contains one task:

>>> queue.sync()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1
>>> all(filter(queue.get_tasks_for, analyses))
True

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
10
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
5
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Last chunk of analyses is processed:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> len(non_transitioned)
0
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> queue.is_empty()
True
>>> queue.has_tasks_for(worksheet)
False

Reject transition

SENAITE Queue comes with an adapter for generic actions (e.g. submit, unassign). Generic actions don’t require any logic other than the transition itself, which is handled by DC workflow. Thus, the adapter for generic actions provided by senaite.queue only deals with the number of objects to process per task (the chunk size), with no additional logic. Most transitions from senaite.core meet these requirements.

Running this test from the buildout directory:

bin/test test_textual_doctests -t WorksheetAnalysesReject

Test Setup

Needed imports:

>>> import transaction
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from plone.app.testing import TEST_USER_PASSWORD
>>> from senaite.queue import api
>>> from senaite.queue.tests import utils as test_utils
>>> from zope import globalrequest

Functional Helpers:

>>> def new_sample(services):
...     return test_utils.create_sample(services, client, contact,
...                                     sampletype, receive=True)
>>> def new_worksheet(num_analyses):
...     analyses = []
...     for num in range(num_analyses):
...         sample = new_sample([Cu])
...         analyses.extend(sample.getAnalyses(full_objects=True))
...     worksheet = _api.create(portal.worksheets, "Worksheet")
...     worksheet.addAnalyses(analyses)
...     transaction.commit()
...     return worksheet
>>> def set_analyses_results(worksheet):
...     for analysis in worksheet.getAnalyses():
...         analysis.setResult(13)
...     transaction.commit()

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Setup the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()

Reject transition

Disable the queue first, so submit and assign transitions are performed non-async:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses, set results and submit them all:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)
>>> test_utils.handle_action(worksheet, analyses, "submit")

Enable the queue so we can trap the reject transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Reject the results:

>>> test_utils.handle_action(worksheet, analyses, "reject")

The worksheet is queued and the analyses as well:

>>> api.is_queued(worksheet)
True
>>> len(test_utils.filter_by_state(analyses, "rejected"))
0
>>> all(map(api.is_queued, analyses))
True

And the queue contains one task:

>>> queue = api.get_queue()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "rejected")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
10
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

And the worksheet is still queued:

>>> api.is_queued(worksheet)
True

As the queue confirms:

>>> queue.is_empty()
False
>>> len(queue)
1
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "rejected")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
5
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

Since there are still 5 analyses remaining, the Worksheet is still queued:

>>> api.is_queued(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Last chunk of analyses is processed:

>>> transitioned = test_utils.filter_by_state(analyses, "rejected")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
0
>>> any(map(api.is_queued, transitioned))
False

The queue is now empty:

>>> queue.is_empty()
True

And the worksheet is no longer queued:

>>> api.is_queued(worksheet)
False

Reject transition (with ClientQueue)

Perform the same test as before, but now using the ClientQueueUtility:

>>> queue = test_utils.get_client_queue(browser, self.request)

Disable the queue first, so submit and assign transitions are performed non-async:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses, set results and submit them all:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)
>>> test_utils.handle_action(worksheet, analyses, "submit")

Enable the queue so we can trap the reject transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Reject the results:

>>> test_utils.handle_action(worksheet, analyses, "reject")

The queue contains one task:

>>> queue.sync()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1
>>> all(map(queue.has_tasks_for, analyses))
True

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "rejected")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
10
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "rejected")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
5
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The last chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "rejected")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
0
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> queue.is_empty()
True
>>> queue.has_tasks_for(worksheet)
False

Retract transition

SENAITE Queue comes with an adapter for generic actions (e.g. submit, unassign). Generic actions don’t require any logic other than the transition itself, which is handled by DC workflow. Thus, the adapter for generic actions provided by senaite.queue only deals with the number of chunks to process per task, with no additional logic. Most transitions from senaite.core meet these requirements.
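
The chunked processing the adapter relies on can be sketched in plain Python. This is an illustrative sketch, not the real senaite.queue internals: `split_chunks` and the names below are hypothetical, but the behaviour mirrors the doctests, where a chunk size of 5 transitions five analyses per processed task and a chunk size of 0 disables queuing:

```python
def split_chunks(items, chunk_size):
    """Return the chunk to process now and the remainder to re-queue."""
    if chunk_size <= 0:
        # A chunk size of 0 disables queuing: process everything at once
        return items, []
    return items[:chunk_size], items[chunk_size:]


analyses = ["AN-%02d" % num for num in range(15)]
process_now, requeue = split_chunks(analyses, 5)
len(process_now)  # 5 objects transitioned by this task
len(requeue)      # 10 objects put back in the queue for the next pop
```

This is why the 15-analysis worksheets in these doctests need exactly three pop/process rounds with the registry record set to 5.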

Running this test from the buildout directory:

bin/test test_textual_doctests -t WorksheetAnalysesRetract

Test Setup

Needed imports:

>>> import transaction
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from plone.app.testing import TEST_USER_PASSWORD
>>> from senaite.queue import api
>>> from senaite.queue.tests import utils as test_utils
>>> from zope import globalrequest

Functional Helpers:

>>> def new_sample(services):
...     return test_utils.create_sample(services, client, contact,
...                                     sampletype, receive=True)
>>> def new_worksheet(num_analyses):
...     analyses = []
...     for num in range(num_analyses):
...         sample = new_sample([Cu])
...         analyses.extend(sample.getAnalyses(full_objects=True))
...     worksheet = _api.create(portal.worksheets, "Worksheet")
...     worksheet.addAnalyses(analyses)
...     transaction.commit()
...     return worksheet
>>> def set_analyses_results(worksheet):
...     for analysis in worksheet.getAnalyses():
...         analysis.setResult(13)
...     transaction.commit()

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Set up the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()
>>> api.get_queue()
<senaite.queue.server.utility.ServerQueueUtility object at...

Retract transition

Disable the queue first, so the submit and assign transitions are performed synchronously:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses, set results and submit them all:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)
>>> test_utils.handle_action(worksheet, analyses, "submit")

Enable the queue so we can trap the retract transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Retract the results:

>>> test_utils.handle_action(worksheet, analyses, "retract")

The worksheet is queued and the analyses as well:

>>> api.is_queued(worksheet)
True
>>> len(test_utils.filter_by_state(analyses, "retracted"))
0
>>> all(map(api.is_queued, analyses))
True

And the queue contains one task:

>>> queue = api.get_queue()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "retracted")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
10
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

And the worksheet is still queued:

>>> api.is_queued(worksheet)
True

As the queue confirms:

>>> queue.is_empty()
False
>>> len(queue)
1
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "retracted")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
5
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

Since there are still 5 analyses remaining, the Worksheet is still queued:

>>> api.is_queued(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The last chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "retracted")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
0
>>> any(map(api.is_queued, transitioned))
False

The queue is now empty:

>>> queue.is_empty()
True

And the worksheet is no longer queued:

>>> api.is_queued(worksheet)
False

Retract transition (with ClientQueue)

Perform the same test as before, but now using the ClientQueueUtility:

>>> queue = test_utils.get_client_queue(browser, self.request)

Disable the queue first, so the submit and assign transitions are performed synchronously:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses, set results and submit them all:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)
>>> test_utils.handle_action(worksheet, analyses, "submit")

Enable the queue so we can trap the retract transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Retract the results:

>>> test_utils.handle_action(worksheet, analyses, "retract")

The queue contains one task:

>>> queue.sync()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1
>>> all(map(queue.has_tasks_for, analyses))
True

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "retracted")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
10
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "retracted")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
5
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The last chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "retracted")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
0
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> queue.is_empty()
True
>>> queue.has_tasks_for(worksheet)
False

Verify transition

SENAITE Queue comes with an adapter for generic actions (e.g. submit, unassign). Generic actions don’t require any logic other than the transition itself, which is handled by DC workflow. Thus, the adapter for generic actions provided by senaite.queue only deals with the number of chunks to process per task, with no additional logic. Most transitions from senaite.core meet these requirements.

Running this test from the buildout directory:

bin/test test_textual_doctests -t WorksheetAnalysesVerify

Test Setup

Needed imports:

>>> import transaction
>>> from bika.lims import api as _api
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from plone.app.testing import TEST_USER_PASSWORD
>>> from senaite.queue import api
>>> from senaite.queue.tests import utils as test_utils
>>> from zope import globalrequest

Functional Helpers:

>>> def new_sample(services):
...     return test_utils.create_sample(services, client, contact,
...                                     sampletype, receive=True)
>>> def new_worksheet(num_analyses):
...     analyses = []
...     for num in range(num_analyses):
...         sample = new_sample([Cu])
...         analyses.extend(sample.getAnalyses(full_objects=True))
...     worksheet = _api.create(portal.worksheets, "Worksheet")
...     worksheet.addAnalyses(analyses)
...     transaction.commit()
...     return worksheet
>>> def set_analyses_results(worksheet):
...     for analysis in worksheet.getAnalyses():
...         analysis.setResult(13)
...     transaction.commit()

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Enable self-verification of results:

>>> setup.setSelfVerificationEnabled(True)
>>> setup.getSelfVerificationEnabled()
True

Set up the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()
>>> api.get_queue()
<senaite.queue.server.utility.ServerQueueUtility object at...

Verify transition

Disable the queue first, so the submit and assign transitions are performed synchronously:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses, set results and submit them all:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)
>>> test_utils.handle_action(worksheet, analyses, "submit")

Enable the queue so we can trap the verify transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Verify the results:

>>> test_utils.handle_action(worksheet, analyses, "verify")

The worksheet is queued and the analyses as well:

>>> api.is_queued(worksheet)
True
>>> len(test_utils.filter_by_state(analyses, "verified"))
0
>>> all(map(api.is_queued, analyses))
True

And the queue contains one task:

>>> queue = api.get_queue()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "verified")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
10
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

And the worksheet is still queued:

>>> api.is_queued(worksheet)
True

As the queue confirms:

>>> queue.is_empty()
False
>>> len(queue)
1
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "verified")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
5
>>> any(map(api.is_queued, transitioned))
False
>>> all(map(api.is_queued, non_transitioned))
True

Since there are still 5 analyses remaining, the Worksheet is still queued:

>>> api.is_queued(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The last chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "verified")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
0
>>> any(map(api.is_queued, transitioned))
False

The queue is now empty:

>>> queue.is_empty()
True

And the worksheet is no longer queued:

>>> api.is_queued(worksheet)
False

Verify transition (with ClientQueue)

Perform the same test as before, but now using the ClientQueueUtility:

>>> queue = test_utils.get_client_queue(browser, self.request)

Disable the queue first, so the submit and assign transitions are performed synchronously:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses, set results and submit them all:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)
>>> test_utils.handle_action(worksheet, analyses, "submit")

Enable the queue so we can trap the verify transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Verify the results:

>>> test_utils.handle_action(worksheet, analyses, "verify")

The queue contains one task:

>>> queue.sync()
>>> queue.is_empty()
False
>>> len(queue)
1
>>> len(queue.get_tasks_for(worksheet))
1
>>> all(map(queue.has_tasks_for, analyses))
True

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The first chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "verified")
>>> len(transitioned)
5
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
10
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The next chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "verified")
>>> len(transitioned)
10
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
5
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> all(map(queue.has_tasks_for, non_transitioned))
True
>>> queue.has_tasks_for(worksheet)
True

Pop and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The last chunk of analyses has been processed:

>>> transitioned = test_utils.filter_by_state(analyses, "verified")
>>> len(transitioned)
15
>>> non_transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> len(non_transitioned)
0
>>> queue.sync()
>>> any(map(queue.has_tasks_for, transitioned))
False
>>> queue.is_empty()
True
>>> queue.has_tasks_for(worksheet)
False

Sample with queued analyses

Samples that contain queued analyses cannot be transitioned until all the analyses they contain have been successfully processed.
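
This guard behaviour can be sketched in a few lines. The function name below is illustrative (the real check is wired into the workflow guards of senaite.queue): a sample transition is vetoed while any of its analyses is still queued.

```python
def can_transition_sample(sample_analyses, is_queued):
    """Allow sample transitions only when no contained analysis is queued."""
    return not any(is_queued(analysis) for analysis in sample_analyses)


queued = {"Cu"}  # pretend the Cu analysis is still in the queue
can_transition_sample(["Cu", "Fe"], lambda an: an in queued)  # vetoed
can_transition_sample(["Fe"], lambda an: an in queued)        # allowed
```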

Running this test from the buildout directory:

bin/test test_textual_doctests -t SampleWithQueuedAnalyses

Test Setup

Needed imports:

>>> import transaction
>>> from bika.lims import api as _api
>>> from bika.lims.workflow import getAllowedTransitions
>>> from plone import api as plone_api
>>> from plone.app.testing import setRoles
>>> from plone.app.testing import TEST_USER_ID
>>> from plone.app.testing import TEST_USER_PASSWORD
>>> from senaite.queue import api
>>> from senaite.queue.tests import utils as test_utils
>>> from zope import globalrequest

Functional Helpers:

>>> def new_sample(services):
...     return test_utils.create_sample(services, client, contact,
...                                     sampletype, receive=True)
>>> def new_worksheet(num_analyses):
...     analyses = []
...     for num in range(num_analyses):
...         sample = new_sample([Cu])
...         analyses.extend(sample.getAnalyses(full_objects=True))
...     worksheet = _api.create(portal.worksheets, "Worksheet")
...     worksheet.addAnalyses(analyses)
...     transaction.commit()
...     return worksheet
>>> def set_analyses_results(worksheet):
...     for analysis in worksheet.getAnalyses():
...         analysis.setResult(13)
...     transaction.commit()
>>> def samples_transitions_allowed(analyses):
...     samples = map(lambda an: an.getRequest(), analyses)
...     transitions = map(lambda samp: getAllowedTransitions(samp), samples)
...     transitions = map(lambda trans: any(trans), transitions)
...     return all(transitions)

Variables:

>>> portal = self.portal
>>> request = self.request
>>> setup = _api.get_setup()
>>> browser = self.getBrowser()
>>> globalrequest.setRequest(request)
>>> setRoles(portal, TEST_USER_ID, ["LabManager", "Manager"])
>>> transaction.commit()

Create some basic objects for the test:

>>> setRoles(portal, TEST_USER_ID, ['Manager',])
>>> client = _api.create(portal.clients, "Client", Name="Happy Hills", ClientID="HH", MemberDiscountApplies=True)
>>> contact = _api.create(client, "Contact", Firstname="Rita", Lastname="Mohale")
>>> sampletype = _api.create(setup.bika_sampletypes, "SampleType", title="Water", Prefix="W")
>>> labcontact = _api.create(setup.bika_labcontacts, "LabContact", Firstname="Lab", Lastname="Manager")
>>> department = _api.create(setup.bika_departments, "Department", title="Chemistry", Manager=labcontact)
>>> category = _api.create(setup.bika_analysiscategories, "AnalysisCategory", title="Metals", Department=department)
>>> Cu = _api.create(setup.bika_analysisservices, "AnalysisService", title="Copper", Keyword="Cu", Price="15", Category=category.UID(), Accredited=True)

Set up the current instance as the queue server too:

>>> key = "senaite.queue.server"
>>> host = u'http://nohost/plone'
>>> plone_api.portal.set_registry_record(key, host)
>>> transaction.commit()
>>> api.get_queue()
<senaite.queue.server.utility.ServerQueueUtility object at...

Queued analyses

Disable the queue first, so the assign transition is performed synchronously:

>>> chunk_key = "senaite.queue.default"
>>> plone_api.portal.set_registry_record(chunk_key, 0)
>>> transaction.commit()

Create a worksheet with some analyses and set results:

>>> worksheet = new_worksheet(15)
>>> analyses = worksheet.getAnalyses()
>>> set_analyses_results(worksheet)

Enable the queue so we can trap the submit transition:

>>> plone_api.portal.set_registry_record(chunk_key, 5)
>>> transaction.commit()

Submit the analyses:

>>> test_utils.handle_action(worksheet, analyses, "submit")

No analyses have been transitioned; all of them have been queued:

>>> test_utils.filter_by_state(analyses, "to_be_verified")
[]

Pop a task and process:

>>> queue = api.get_queue()
>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

Only the first chunk has been transitioned, and the samples those analyses belong to can be transitioned as well:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> samples_transitions_allowed(transitioned)
True

The rest of the analyses are still queued, so their samples cannot be transitioned:

>>> samples_transitions_allowed(analyses)
False
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> samples_transitions_allowed(non_transitioned)
False

Pop a task and process again:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

The next chunk of analyses has been processed and again, only the samples of the transitioned analyses can be transitioned too:

>>> transitioned = test_utils.filter_by_state(analyses, "to_be_verified")
>>> samples_transitions_allowed(transitioned)
True

While the rest of the samples (5) cannot be transitioned yet:

>>> samples_transitions_allowed(analyses)
False
>>> non_transitioned = test_utils.filter_by_state(analyses, "assigned")
>>> samples_transitions_allowed(non_transitioned)
False

Pop a task and process:

>>> popped = queue.pop("http://nohost")
>>> test_utils.process(browser, popped.task_uid)
'{...Processed...}'

All analyses have been processed at this point, so all samples can be transitioned now:

>>> samples_transitions_allowed(analyses)
True