Skip to content
Snippets Groups Projects
Commit 125b56e3 authored by RohitMidha23's avatar RohitMidha23
Browse files

init scaffolding

parent df682232
No related branches found
No related tags found
No related merge requests found
FROM ubuntu:jammy
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -yq install python3
COPY simulator.py /simulator/
COPY simulator_test.py /simulator/
WORKDIR /simulator
RUN ./simulator_test.py
COPY messages.mllp /data/
EXPOSE 8440
EXPOSE 8441
CMD /simulator/simulator.py --messages=/data/messages.mllp
\ No newline at end of file
aki.csv 0 → 100644
This diff is collapsed.
history.csv 0 → 100644
Source diff could not be displayed: it is too large. Options to address this: view the blob.
This diff is collapsed.
#!/usr/bin/env python3
import argparse
import socket
import threading
import http.server
VERSION = "0.0.0"
MLLP_BUFFER_SIZE = 1024
MLLP_TIMEOUT_SECONDS = 10
SHUTDOWN_POLL_INTERVAL_SECONDS = 2
def serve_mllp_client(client, source, messages, shutdown_mllp):
i = 0
buffer = b""
while i < len(messages) and not shutdown_mllp.is_set():
try:
mllp = bytes(chr(MLLP_START_OF_BLOCK), "ascii")
mllp += messages[i]
mllp += bytes(chr(MLLP_END_OF_BLOCK) + chr(MLLP_CARRIAGE_RETURN), "ascii")
client.sendall(mllp)
received = []
while len(received) < 1:
r = client.recv(MLLP_BUFFER_SIZE)
if len(r) == 0:
raise Exception("client closed connection")
buffer += r
received, buffer = parse_mllp_messages(buffer, source)
acked, error = verify_ack(received)
if error:
raise Exception(error)
elif acked:
i += 1
else:
print(f"mllp: {source}: message not acknowledged")
except Exception as e:
print(f"mllp: {source}: {e}")
print(f"mllp: {source}: closing connection: error")
break
else:
if i == len(messages):
print(f"mllp: {source}: closing connection: end of messages")
else:
print(f"mllp: {source}: closing connection: mllp shutdown")
client.close()
HL7_MSA_ACK_CODE_FIELD = 1
HL7_MSA_ACK_CODE_ACCEPT = b"AA"
def verify_ack(messages):
if len(messages) != 1:
return False, f"Expected 1 ack message, found {len(messages)}"
segments = messages[0].split(b"\r")
segment_types = [s.split(b"|")[0] for s in segments]
if b"MSH" not in segment_types:
return False, "Expected MSH segment"
if b"MSA" not in segment_types:
return False, "Expected MSA segment"
fields = segments[segment_types.index(b"MSA")].split(b"|")
if len(fields) <= HL7_MSA_ACK_CODE_FIELD:
return False, "Wrong number of fields in MSA segment"
return fields[HL7_MSA_ACK_CODE_FIELD] == HL7_MSA_ACK_CODE_ACCEPT, None
def run_mllp_server(host, port, hl7_messages, shutdown_mllp):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.settimeout(SHUTDOWN_POLL_INTERVAL_SECONDS)
s.listen(1)
print(f"mllp: listening on {host}:{port}")
while not shutdown_mllp.is_set():
try:
client, (host, port) = s.accept()
except TimeoutError:
continue
source = f"{host}:{port}"
print(f"mllp: {source}: accepted connection")
client.settimeout(MLLP_TIMEOUT_SECONDS)
t = threading.Thread(target=serve_mllp_client, args=(client, source, hl7_messages, shutdown_mllp), daemon=True)
t.start()
print("mllp: graceful shutdown")
MLLP_START_OF_BLOCK = 0x0b
MLLP_END_OF_BLOCK = 0x1c
MLLP_CARRIAGE_RETURN = 0x0d
def parse_mllp_messages(buffer, source):
i = 0
messages = []
consumed = 0
expect = MLLP_START_OF_BLOCK
while i < len(buffer):
if expect is not None:
if buffer[i] != expect:
raise Exception(f"{source}: bad MLLP encoding: want {hex(expect)}, found {hex(buffer[i])}")
if expect == MLLP_START_OF_BLOCK:
expect = None
consumed = i
elif expect == MLLP_CARRIAGE_RETURN:
messages.append(buffer[consumed+1:i-1])
expect = MLLP_START_OF_BLOCK
consumed = i + 1
else:
if buffer[i] == MLLP_END_OF_BLOCK:
expect = MLLP_CARRIAGE_RETURN
i += 1
return messages, buffer[consumed:]
def read_hl7_messages(filename):
with open(filename, "rb") as r:
messages, remaining = parse_mllp_messages(r.read(), filename)
if len(remaining) > 0:
print(f"messages: {len(messages)} remaining: {len(remaining)}")
raise Exception(f"{filename}: Unexpected data at end of file")
return messages
class PagerRequestHandler(http.server.BaseHTTPRequestHandler):
def __init__(self, shutdown, *args, **kwargs):
self.shutdown = shutdown
super().__init__(*args, **kwargs)
def do_POST(self):
self.server_version = f"coursework3-simulator/{VERSION}"
if self.path == "/page":
length = 0
try:
length = int(self.headers["Content-Length"])
except Exception:
print("pager: bad request: no Content-Length")
self.send_response(http.HTTPStatus.BAD_REQUEST, "No Content-Length")
self.end_headers()
return
mrn = 0
try:
mrn = int(self.rfile.read(length))
except:
print("pager: bad request: no MRN for /page")
self.send_response(http.HTTPStatus.BAD_REQUEST, "Bad MRN in body")
self.end_headers()
return
print(f"pager: paging for MRN {mrn}")
self.send_response(http.HTTPStatus.OK)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"ok\n")
elif self.path == "/healthy":
self.send_response(http.HTTPStatus.OK)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"ok\n")
elif self.path == "/shutdown":
self.send_response(http.HTTPStatus.OK)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"ok\n")
self.shutdown()
else:
print("pager: bad request: not /page")
self.send_response(http.HTTPStatus.BAD_REQUEST)
self.end_headers()
def do_GET(self):
self.do_POST()
def log_message(*args):
pass # Prevent default logging
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--messages", default="messages.mllp", help="HL7 messages to replay, in MLLP format")
parser.add_argument("--mllp", default=8440, type=int, help="Port on which to replay HL7 messages via MLLP")
parser.add_argument("--pager", default=8441, type=int, help="Post on which to listen for pager requests via HTTP")
flags = parser.parse_args()
hl7_messages = read_hl7_messages(flags.messages)
shutdown_mllp = threading.Event()
t = threading.Thread(target=run_mllp_server, args=("0.0.0.0", flags.mllp, hl7_messages, shutdown_mllp), daemon=True)
t.start()
pager = None
def shutdown():
shutdown_mllp.set()
print("pager: graceful shutdown")
pager.shutdown()
def new_pager_handler(*args, **kwargs):
return PagerRequestHandler(shutdown, *args, **kwargs)
pager = http.server.ThreadingHTTPServer(("0.0.0.0", flags.pager), new_pager_handler)
print(f"pager: listening on 0.0.0.0:{flags.pager}")
pager.serve_forever(poll_interval=SHUTDOWN_POLL_INTERVAL_SECONDS)
t.join()
if __name__ == "__main__":
main()
\ No newline at end of file
#!/usr/bin/env python3
import http
import os
import shutil
import socket
import subprocess
import tempfile
import time
import unittest
import urllib.error
import urllib.request
import simulator
ADT_A01 = [
"MSH|^~\&|SIMULATION|SOUTH RIVERSIDE|||202401201630||ADT^A01|||2.5",
"PID|1||478237423||ELIZABETH HOLMES||19840203|F",
"NK1|1|SUNNY BALWANI|PARTNER"
]
ORU_R01 = [
"MSH|^~\&|SIMULATION|SOUTH RIVERSIDE|||202401201800||ORU^R01|||2.5",
"PID|1||478237423",
"OBR|1||||||202401202243",
"OBX|1|SN|CREATININE||103.4",
]
ADT_A03 = [
"MSH|^~\&|SIMULATION|SOUTH RIVERSIDE|||202401221000||ADT^A03|||2.5",
"PID|1||478237423",
]
ACK = [
"MSH|^~\&|||||20240129093837||ACK|||2.5",
"MSA|AA",
]
def wait_until_healthy(p, http_address):
max_attempts = 20
for _ in range(max_attempts):
if p.poll() is not None:
return False
try:
r = urllib.request.urlopen("http://%s/healthy" % http_address)
if r.status == 200:
return True
except urllib.error.URLError:
pass
time.sleep(0.5)
return False
TEST_MLLP_PORT = 18440
TEST_PAGER_PORT = 18441
def to_mllp(segments):
m = bytes(chr(simulator.MLLP_START_OF_BLOCK), "ascii")
m += bytes("\r".join(segments) + "\r", "ascii")
m += bytes(chr(simulator.MLLP_END_OF_BLOCK) + chr(simulator.MLLP_CARRIAGE_RETURN), "ascii")
return m
def from_mllp(buffer):
return str(buffer[1:-3], "ascii").split("\r") # Strip MLLP framing and final \r
class SimulatorTest(unittest.TestCase):
def setUp(self):
self.directory = tempfile.mkdtemp()
messages_filename = os.path.join(self.directory, "messages.mllp")
with open(messages_filename, "wb") as w:
for m in (ADT_A01, ORU_R01, ADT_A03):
w.write(to_mllp(m))
self.simulator = subprocess.Popen([
"./simulator.py",
f"--mllp={TEST_MLLP_PORT}",
f"--pager={TEST_PAGER_PORT}",
f"--messages={messages_filename}"
])
self.assertTrue(wait_until_healthy(self.simulator, f"localhost:{TEST_PAGER_PORT}"))
def test_read_all_messages(self):
messages = []
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("localhost", TEST_MLLP_PORT))
while True:
buffer = s.recv(1024)
if len(buffer) == 0:
break
messages.append(from_mllp(buffer))
s.sendall(to_mllp(ACK))
self.assertEqual(messages, [ADT_A01, ORU_R01, ADT_A03])
def test_read_all_messages_with_partial_acks(self):
messages = []
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("localhost", TEST_MLLP_PORT))
while True:
buffer = s.recv(1024)
if len(buffer) == 0:
break
messages.append(from_mllp(buffer))
ack = to_mllp(ACK)
s.sendall(ack[0:len(ack)//2])
time.sleep(1) # Wait for TCP buffer to empty
s.sendall(ack[len(ack)//2:])
self.assertEqual(messages, [ADT_A01, ORU_R01, ADT_A03])
def test_repeated_runs_return_the_same_messages(self):
runs = []
for _ in range(0, 3):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
messages = []
runs.append(messages)
s.connect(("localhost", TEST_MLLP_PORT))
while True:
buffer = s.recv(1024)
if len(buffer) == 0:
break
messages.append(from_mllp(buffer))
s.sendall(to_mllp(ACK))
for i in range(1, len(runs)):
self.assertEqual(runs[i], runs[0])
def test_page_with_valid_mrn(self):
mrn = b"1234"
r = urllib.request.urlopen(f"http://localhost:{TEST_PAGER_PORT}/page", data=mrn)
self.assertEqual(r.status, http.HTTPStatus.OK)
def test_page_with_bad_mrn(self):
mrn = b"NHS1234"
try:
urllib.request.urlopen(f"http://localhost:{TEST_PAGER_PORT}/page", data=mrn)
except urllib.error.HTTPError as e:
self.assertEqual(e.status, http.HTTPStatus.BAD_REQUEST)
else:
self.fail("Expected /page to return an error with a bad MRN")
def tearDown(self):
try:
r = urllib.request.urlopen(f"http://localhost:{TEST_PAGER_PORT}/shutdown")
self.assertEqual(r.status, http.HTTPStatus.OK)
self.simulator.wait()
self.assertEqual(self.simulator.returncode, 0)
finally:
if self.simulator.poll() is None:
self.simulator.kill()
shutil.rmtree(self.directory)
if __name__ == "__main__":
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment