Event Hub for Python
Event Handling
events.py
from collections import defaultdict
class Event:
    """Base class for every event that can be passed to ``handle``."""
class JobCreated(Event):
    """Event dispatched for a job that has just been created.

    Attributes are set directly from the constructor argument:
    ``job`` is the job object the event refers to.
    """

    def __init__(self, job):
        # Keep a reference to the very same object; handlers read it via ``event.job``.
        self.job = job
class JobUpdated(Event):
    """Event dispatched for an already existing job that has been saved again.

    Attributes are set directly from the constructor argument:
    ``job`` is the job object the event refers to.
    """

    def __init__(self, job):
        # Keep a reference to the very same object; handlers read it via ``event.job``.
        self.job = job
def handle(event: Event) -> None:
    """Dispatch *event* to every handler registered for its exact class.

    Handlers are looked up in ``HANDLERS`` under ``type(event)`` and are
    called in list order with the event as their only argument. The lookup
    uses the concrete class only, so handlers registered for a base class
    are not invoked for instances of its subclasses. An event whose class
    has no registered handlers is silently ignored.

    Any exception raised by a handler propagates to the caller and stops
    the remaining handlers from running.
    """
    # ``.get`` instead of ``HANDLERS[...]``: indexing a defaultdict would
    # insert an empty list for every event class that is merely looked up,
    # so dispatching would mutate the registry as a side effect.
    for handler in HANDLERS.get(type(event), ()):
        handler(event)


# Registry mapping an event class to the list of callables invoked for it.
# Register a handler with ``HANDLERS[SomeEvent].append(callback)``.
HANDLERS = defaultdict(list)
Unit Tests
test_events.py
from unittest.mock import Mock, patch
from .events import Event, handle, HANDLERS
class FakeEvent(Event):
    """Minimal ``Event`` subclass used only as a dispatch key in these tests."""
class Test_handle_Function:
    """Unit tests for ``handle``, isolated from the module-level ``HANDLERS``."""

    def test_should_call_registered_handler(self):
        handler = Mock()
        event = FakeEvent()

        # patch.dict restores HANDLERS on exit, so the registration made
        # here cannot leak into other tests.
        with patch.dict(HANDLERS, {FakeEvent: [handler]}):
            handle(event)

        handler.assert_called_once_with(event)

    def test_should_pass_without_error_if_no_registered_handler(self):
        # Dispatch a real Event subclass: ``type(Mock(Event))`` is ``Mock``,
        # so a spec'd mock would be looked up under ``Mock`` rather than
        # under an event class.
        event = FakeEvent()

        # ``clear=True`` guarantees that nothing is registered during the
        # call, and the restore on exit discards any key the lookup may add.
        with patch.dict(HANDLERS, clear=True):
            handle(event)
Integration with Django Signals
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from .events import JobCreated, JobUpdated, handle
from .models import Job
@receiver(post_save, sender=Job)
def job_post_save_handler(sender, instance, raw, created, **_):
    """Translate a ``post_save`` signal for ``Job`` into an event.

    A ``JobCreated`` event is handled when ``created`` is true, otherwise a
    ``JobUpdated`` event; either one wraps the saved ``instance``. Nothing
    is dispatched when ``raw`` is true. Remaining signal keyword arguments
    are accepted and ignored.
    """
    if raw:  # pragma: no cover
        return

    event_class = JobCreated if created else JobUpdated
    handle(event_class(instance))
test_signals.py
from unittest.mock import Mock, patch
import pytest
from .events import JobCreated, JobUpdated
from .models import Job
# Module-level marker list: pytest applies these marks to every test in this
# module, so each test below runs with the ``django_db`` mark.
pytestmark = [pytest.mark.django_db]
@pytest.fixture(name="job")
def given_job():
    """Create a ``Job`` row and provide it to tests as the ``job`` fixture."""
    return Job.objects.create(
        label="Test Job",
        arguments="Test Arguments",
        stage="new",
    )
@patch("schema_discovery.signals.handle")
def test_should_handle_JobCreated_event_when_creating_event(mock_handle: Mock, request):
    """Creating a job must hand exactly one ``JobCreated`` event to ``handle``."""
    # The fixture is resolved inside the test body so that the job is
    # created while ``handle`` is already replaced by the mock.
    created_job = request.getfixturevalue("job")

    mock_handle.assert_called_once()
    (dispatched,) = mock_handle.call_args.args
    assert isinstance(dispatched, JobCreated)
    assert dispatched.job is created_job
@patch("schema_discovery.signals.handle")
def test_should_handle_JobUpdated_event_when_updating_event(mock_handle: Mock, job):
    """Saving an existing job must hand exactly one ``JobUpdated`` event to ``handle``."""
    # ``job`` is requested as a regular fixture argument, so pytest creates it
    # during setup, before the patched test function is invoked. The
    # create-time signal is therefore not recorded by ``mock_handle`` and only
    # the save below is counted.
    job.save()

    mock_handle.assert_called_once()
    (event,) = mock_handle.call_args.args
    assert isinstance(event, JobUpdated)
    assert event.job is job
2023-09-02