如何在Flask中运行python脚本

问题描述

使用import

  • 将python脚本(例如website_generator.py生成内容包装到函数中。
  • 将其放置在与app.py或相同的目录中flask.py
  • from website_generator import function_name在flask.py
  • 使用运行它 function_name()

你可以使用其他功能,例如subprocess.callet等。尽管他们可能不会给你答复。

使用示例import

  1. from flask import Flask
  2. import your_module # this will be your file name; minus the `.py`
  3. app = Flask(__name__)
  4. @app.route('/')
  5. def dynamic_page():
  6. return your_module.your_function_in_the_module()
  7. if __name__ == '__main__':
  8. app.run(host='0.0.0.0', port='8000', debug=True)

解决方法

我有一个Flask脚本,可以创建网站并动态打印一些数据。-打印的数据应来自另一个python脚本。

我目前面临的问题是,如果我将执行python脚本的行放在执行Flask应用程序的行之前,它将运行Python脚本而不运行Flask;反之亦然。

Python脚本:

  1. import websocket
  2. from bitmex_websocket import Instrument
  3. from bitmex_websocket.constants import InstrumentChannels
  4. from bitmex_websocket.constants import Channels
  5. import json
  6. websocket.enableTrace(True)
  7. sells = 0
  8. buys = 0
  9. channels = [
  10. InstrumentChannels.trade,]
  11. XBTUSD = Instrument(symbol='XBTUSD',channels=channels)
  12. XBTUSD.on('action',lambda msg: test(msg))
  13. def test(msg):
  14. parsed = json.loads(json.dumps(msg))
  15. print(parsed)
  16. XBTUSD.run_forever()

Flask脚本(注意:价格应为其他脚本的“解析”变量):

  1. # Start with a basic flask app webpage.
  2. from flask_socketio import SocketIO,emit
  3. from flask import Flask,render_template,url_for,copy_current_request_context
  4. from random import random
  5. from time import sleep
  6. from threading import Thread,Event
  7. import requests,json
  8. import time
  9. __author__ = 'slynn'
  10. app = Flask(__name__)
  11. app.config['SECRET_KEY'] = 'secret!'
  12. app.config['DEBUG'] = True
  13. #turn the flask app into a socketio app
  14. socketio = SocketIO(app)
  15. #random number Generator Thread
  16. thread = Thread()
  17. thread_stop_event = Event()
  18. class RandomThread(Thread):
  19. def __init__(self):
  20. self.delay = 1
  21. super(RandomThread,self).__init__()
  22. def randomNumberGenerator(self):
  23. while not thread_stop_event.isSet():
  24. socketio.emit('newnumber',{'number': parsed},namespace='/test')
  25. sleep(self.delay)
  26. def run(self):
  27. self.randomNumberGenerator()
  28. @app.route('/')
  29. def index():
  30. #only by sending this page first will the client be connected to the socketio instance
  31. return render_template('index.html')
  32. @socketio.on('connect',namespace='/test')
  33. def test_connect():
  34. # need visibility of the global thread object
  35. global thread
  36. print('Client connected')
  37. #Start the random number generator thread only if the thread has not been started before.
  38. if not thread.isAlive():
  39. print("Starting Thread")
  40. thread = RandomThread()
  41. thread.start()
  42. @socketio.on('disconnect',namespace='/test')
  43. def test_disconnect():
  44. print('Client disconnected')
  45. if __name__ == '__main__':
  46. socketio.run(app)

猜你在找的技术问答相关文章

如何检查配对的蓝牙设备是打印机还是扫描仪(Android)
是否允许实体正文进行HTTP DELETE请求?
如何将ZipInputStream转换为InputStream?
java.util.logging Java 8中的变量
PowerMockito.doReturn返回null
Java中的RESTful调用
Swing / Java:如何正确使用getText和setText字符串
特殊字符和重音字符
Android Studio中的ndk.dir错误
错误“找不到主类”