|
- from celery import Celery, current_task
- import requests
- import json
- import time
- import os
- import signal
- import multiprocessing
- import redis
-
# Celery application: Redis db 0 is the message broker, Redis db 1 stores results.
# NOTE(review): the Redis password is embedded in both URLs — consider loading
# the connection settings from the environment or a config file instead.
app = Celery(
    broker="redis://:pcl1305@192.168.202.90:6379/0",
    backend="redis://:pcl1305@192.168.202.90:6379/1",
)
# Same visibility timeout for the result backend and the broker transport
# (18000 — presumably seconds, i.e. 5 hours; confirm against the Celery docs).
app.conf.update(
    result_backend_transport_options={'visibility_timeout': 18000},
    broker_transport_options={'visibility_timeout': 18000},
)
# Task status codes, numbered 0..5 in declaration order.
(
    STATUS_FINISHED,
    STATUS_IN_PROGRESS,
    STATUS_FAILED,
    STATUS_NOT_START,
    STATUS_STOPPING,
    STATUS_STOPPED,
) = range(6)

# (code, display label) pairs; the labels are the user-facing Chinese strings.
STATUS_ITEMS = (
    (STATUS_FINISHED, '处理完毕'),     # finished
    (STATUS_IN_PROGRESS, '正在处理'),  # in progress
    (STATUS_FAILED, '异常'),           # failed / error
    (STATUS_NOT_START, '未处理'),      # not started
    (STATUS_STOPPING, '正在停止'),     # stopping
    (STATUS_STOPPED, '已停止'),        # stopped
)
-
-
def fake_algorithm(input_params, result_callback: str, progress_callback=None, extra_params=None):
    """Placeholder algorithm: wait one minute, then report a fixed result.

    Prints a counter once per second for 60 seconds, then PATCHes a JSON body
    ``{"status": STATUS_FINISHED, "result": "<json-encoded result>"}`` to
    ``result_callback``.

    Args:
        input_params: Unused by this placeholder.
        result_callback: URL that receives the final result via HTTP PATCH.
        progress_callback: Unused here; see the note below on how a real
            algorithm could POST intermediate results to it.
        extra_params: Unused by this placeholder.
    """
    # Simulate an algorithm that needs one minute of processing time.
    for elapsed_seconds in range(60):
        print(elapsed_seconds)
        time.sleep(1)

    result = {
        "video_url": "http://192.168.202.90:9000/truck-video/3.mp4",
        "counts_url": "http://192.168.202.90:9000/trafficalg-bucket/video_19_10mins_counts.json"
    }
    json_headers = {'content-type': 'application/json'}

    # If there are intermediate results, POST them to progress_callback:
    # progress_dict = {
    #     # The dictionary contents must be agreed on with the web application.
    #     "task": int(result_callback.split('/')[-2]),
    #     "result": json.dumps(result, ensure_ascii=False)
    # }
    # requests.post(url=progress_callback,
    #               data=json.dumps(progress_dict, ensure_ascii=False),
    #               headers={'content-type': 'application/json'})

    # On success, PATCH the result to result_callback. The result itself is
    # JSON-encoded into a string field of the JSON body.
    body = json.dumps(
        {
            "status": STATUS_FINISHED,
            "result": json.dumps(result, ensure_ascii=False)
        },
        ensure_ascii=False,
    )
    requests.patch(url=result_callback, data=body, headers=json_headers)

    # On failure, send the same body with "status" set to STATUS_FAILED instead.
-
-
def stop_task(main_task_id):
    """Kill the processes recorded for a task and delete the record file.

    Reads ``<main_task_id>-subprocess.json`` (a JSON list of PIDs, as written
    by ``start_task``), sends ``SIGKILL`` to every PID in it and then removes
    the file. The path is built directly from ``main_task_id``, so it is
    resolved relative to the current working directory.

    Does nothing when the record file does not exist. A PID whose process has
    already exited is skipped, so the record file is still cleaned up when
    the algorithm finished before the stop request arrived.

    Args:
        main_task_id: Id of the task whose processes should be killed.

    Raises:
        json.JSONDecodeError: If the record file is not valid JSON.
        PermissionError: If the caller may not signal one of the PIDs.
    """
    subprocess_info_path = '{0}-subprocess.json'.format(main_task_id)

    # EAFP: opening directly avoids a race between an existence check and open.
    try:
        with open(subprocess_info_path, 'r') as f:
            process_list = json.load(f)
    except FileNotFoundError:
        return

    for process in process_list:
        try:
            os.kill(process, signal.SIGKILL)
        except ProcessLookupError:
            # The process is already gone; there is nothing left to kill.
            pass
    print('Terminate process completed.')
    os.remove(subprocess_info_path)
-
-
def listen_command(main_task_id):
    """Wait for a "STOP" message on the task's Redis channel, then stop the task.

    Subscribes to the pub/sub channel named ``main_task_id`` on Redis db 3 and
    blocks, printing every event it receives. When a published message decodes
    (UTF-8) to exactly ``"STOP"``, the loop ends and ``stop_task`` is called
    for the same task id.

    Args:
        main_task_id: Task id; used both as the channel name and as the
            argument passed to ``stop_task``.
    """
    # NOTE(review): host and password are hard-coded here — consider moving
    # them to configuration.
    client = redis.StrictRedis(host='192.168.202.90',
                               port='6379',
                               db=3,
                               password='pcl1305')
    subscription = client.pubsub()
    subscription.subscribe(main_task_id)  # the channel is named after the task id

    # listen() yields every pub/sub event, not only published messages.
    for event in subscription.listen():
        print('收到消息')
        if event['type'] != 'message':
            # Only published messages carry a command payload.
            continue
        print(event['channel'])
        print(event['data'])
        command = str(event['data'], encoding='utf-8')
        if command == "STOP":
            break

    stop_task(main_task_id=main_task_id)
-
-
@app.task
def start_task(input_params, result_callback, progress_callback=None, extra_params=None):
    """Celery task: launch the algorithm and a stop-command listener.

    Starts two child processes, a listener running ``listen_command`` for this
    task's id and a worker running ``fake_algorithm``. It then writes the
    worker's PID as a one-element JSON list to ``<task_id>-subprocess.json``
    so that ``stop_task`` can find and kill it. Returns without waiting for
    either process.

    Args:
        input_params: Passed through to ``fake_algorithm``.
        result_callback: Passed through to ``fake_algorithm``.
        progress_callback: Passed through to ``fake_algorithm``.
        extra_params: Passed through to ``fake_algorithm``.
    """
    task_id = current_task.request.id
    print('task_id: {current_task_id}'.format(current_task_id=task_id))

    # NOTE(review): the listener is never joined or terminated here; it looks
    # like it keeps blocking on Redis if no STOP message is ever published.
    listener = multiprocessing.Process(target=listen_command, args=(task_id,))
    listener.start()

    worker = multiprocessing.Process(
        target=fake_algorithm,
        args=(input_params, result_callback, progress_callback, extra_params),
    )
    worker.start()

    # Only the algorithm process is recorded; it is the one stop_task kills.
    pid_record_path = '{0}-subprocess.json'.format(task_id)
    with open(pid_record_path, 'w') as f:
        json.dump([worker.pid], f)
|