Skip to content

Interact with simulations via websockets

Download the jupyter notebook

Follow the simulation using websockets💡

In this notebook we'll show you how to open a websocket connection to a simulation to follow its progress. We will also use a second websocket to follow the water level of a node over time.

This notebook uses the `websockets` package. Make sure it is installed if you want to run this notebook locally.

import asyncio
import json
from datetime import datetime
from getpass import getpass

import numpy as np
import ipywidgets as widgets
import websockets
from IPython.display import display
from websockets.http import Headers
from base64 import b64encode
from threedi_api_client.api import ThreediApi
from threedi_api_client.versions import V3Api

# Endpoint and credentials for the 3Di API.
# Personal API keys are managed at https://management.3di.live/personal_api_keys
API_HOST = "https://api.3di.live"
PERSONAL_API_KEY = getpass("Personal API token")

# Settings handed to the client constructor below.
config = dict(
    THREEDI_API_HOST=API_HOST,
    THREEDI_API_PERSONAL_API_TOKEN=PERSONAL_API_KEY,
)

# The annotation records that we work against the V3 API surface.
api_client: V3Api = ThreediApi(config=config)

# Specify the threedi-model and organisation
organisation_uuid = "b08433fa47c1401eb9cbd4156034c679"
matching_models = api_client.threedimodels_list(name__icontains='v2_bergermeer').results
# Fail with a readable message instead of a bare IndexError when the search
# returns nothing (same style as the template check below).
assert matching_models, "No threedimodel found with a name containing 'v2_bergermeer'"
threedimodel = matching_models[0]

# Retrieve first simulation template
simulation_templates = api_client.simulation_templates_list(simulation__threedimodel__id=threedimodel.id)
assert simulation_templates.count > 0, f"No simulation templates found for threedimodel {threedimodel.name}"
simulation_template_id = simulation_templates.results[0].id

# Create a new simulation from the template.
simulation = api_client.simulations_from_template(
    data={
        "template": simulation_template_id,
        "name": "just some simulation",
        "organisation": organisation_uuid,
        # NOTE(review): naive local time; confirm how the API interprets it.
        "start_datetime": datetime.now(),
        "duration": 3600  # in seconds, so we simulate for 1 hour
    }
)

The simulation needs to be initialized before we can start a websocket connection. We'll initialize the simulation but we won't start it yet.

# Request the "initialize" action for the simulation created above;
# the simulation is not started here.
initialize_action = {"name": "initialize"}
api_client.simulations_actions_create(simulation_pk=simulation.id, data=initialize_action)

The websocket authenticates with your personal API token.

# Build an HTTP Basic authorization header from the personal API key.
# b64encode works on bytes, so encode the "user:password" pair first and
# decode the result back to text; otherwise the header would contain the
# literal "b'...'" repr.
basic_auth_creds = b64encode(f"__key__:{PERSONAL_API_KEY}".encode()).decode()
# The Basic scheme is "Basic <credentials>", with no colon after the scheme.
headers = Headers(authorization=f"Basic {basic_auth_creds}")
# Websocket endpoint of the simulation created earlier.
uri = f'wss://api.3di.live/v3/simulations/{simulation.id}/'
First we create the progress bar.

# Progress bar ranging from 0 to the simulation duration.
progress = widgets.IntProgress(
    min=0,
    max=simulation.duration,
    value=0,
)

async def update_progress_bar():
    """Mirror the simulation time onto the progress bar.

    Connects to ``uri`` with the authorization ``headers``, decodes each
    incoming message as JSON, and copies ``data.time`` of every message
    whose ``type`` is ``"time"`` into ``progress.value``. Other message
    types are ignored.
    """
    async with websockets.connect(uri, extra_headers=headers) as websocket:
        print("Connected to the websocket")
        async for raw_message in websocket:
            payload = json.loads(raw_message)
            if payload.get("type") != "time":
                continue
            progress.value = payload['data']['time']
        # Reached once iteration over the websocket has ended.
        print("Websocket connection closed")

# Schedule the listener on the running event loop. Keep a reference to the
# task: the event loop only holds weak references, so an unreferenced task
# may be garbage-collected before it finishes.
progress_task = asyncio.create_task(update_progress_bar())
Next we print the water levels over time.

# Ask the API for a water level graph subscription on node 1. The `url`
# attribute of the response is used as the websocket address further on.
waterlevel_websocket_url = api_client.simulations_visualisations_water_level_graph_create(
    simulation_pk=simulation.id,
    data={"start_time": 0, "subscribe": True, "node_id": 1, "subscribe_rate_limit": 2}
)

async def print_waterlevels():
    """Print each water level update received over the websocket.

    Connects to ``waterlevel_websocket_url.url`` with the authorization
    ``headers`` and reads every incoming message as a buffer of float32
    values. The first value is printed as the time and the second as the
    water level.
    """
    ws_url = waterlevel_websocket_url.url
    async with websockets.connect(ws_url, extra_headers=headers) as websocket:
        print("Connected to the websocket")
        async for frame in websocket:
            values = np.frombuffer(frame, dtype=np.float32)
            timestep, level = int(values[0]), values[1]
            print(f"Time: {timestep:<4} waterlevel: {level}")
        # Reached once iteration over the websocket has ended.
        print("Websocket connection closed")

# Schedule the listener on the running event loop. Keep a reference to the
# task: the event loop only holds weak references, so an unreferenced task
# may be garbage-collected before it finishes.
waterlevel_task = asyncio.create_task(print_waterlevels())

Once we start the simulation we should see updates in the progress bar and the water level.

# Render the progress bar widget in the notebook output.
display(progress)

# Request the "start" action for the simulation.
start_action = {"name": "start"}
api_client.simulations_actions_create(simulation_pk=simulation.id, data=start_action)