如何在调用更新后端状态的函数时从python(fastapi)发送服务器端事件

我有以下问题:给定一个运行fastapi的后端,该后端具有一个流端点,用于更新前端,我想在每次调用更新后端状态的函数时发送这些更新(可以是(通过计划的作业或被命中并导致状态更新的其他端点)。

我要实现的简单版本是:

from fastapi import FastAPI
from starlette.responses import StreamingResponse

class State:
    def __init__(self):
        self.messages = []

    def update(self,new_messages):
        self.messages = new_messages
        # HERE: notify waiting stream endpoint

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for state to be update
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(),media_type="text/event-stream")

我希望它永远运行。每次状态更新时,event_stream都会解除阻止并发送消息。

我看过线程和异步,但是我感觉我缺少一些关于如何在python中执行此操作的简单概念。

Q53558217 回答:如何在调用更新后端状态的函数时从python(fastapi)发送服务器端事件

FastAPI基于Starlette,并且Server-Sent Events插件可用于Starlette

var numbers = binary.trim().split(/\s*,\s*/g).map(x => x/1);
var binstr = String.fromCharCode(...numbers);
var b64str = btoa(binstr);
var src = 'data:image/jpeg;base64,' + b64str;
document.getElementById("image").src = src;
,

我能找到解决此问题的最简单方法是使用threading.Condition

因此它变成了:

import threading

from fastapi import FastAPI
from starlette.responses import StreamingResponse

condition = threading.Condition()

class State:
    def __init__(self):
        self.messages = []

    def update(self,new_messages):
        self.messages = new_messages
        with condition:
            condition.notify()

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            with condition:
                condition.wait()

            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(),media_type="text/event-stream")




本文链接:https://www.f2er.com/3088450.html

大家都在问