/celerytask/myapp.py
app = Celery('celerytask') app.config_from_object('celerytask.config')
/celerytask/config.py
broker_url = 'amqp://guest:guest@localhost:5672//' result_backend = 'redis://localhost:6379/0' accept_cOntent= ['json', 'application/text'] task_queues = { 'celery': { 'exchange': 'celery', 'routing_key': 'celery', }, } task_routes = { 'celerytask.task.hello': { 'queue': 'celery', 'routing_key': 'celery' }, } imports = ['celerytask.task',]
/celerytask/task.py
@app.task def hello(msg='hello'): time.sleep(5) return msg.upper()
class CallHandler(RequestHandler): def get(self): msg = self.get_argument('msg', 'default') hello.apply_async(args=(msg,)) self.write('ok') class PushHandler(RequestHandler): def get(self): msg = self.get_argument('msg', 'default') self.application.publisher.publish( exchange='celery', routing_key='celery', body={ 'id': str(uuid.uuid1()), 'args': [msg, ], 'task': 'celerytask.task.hello' } ) self.write('ok')
直接调用(CallHandler)一切正常, 如果是把消息推到队列(PushHandler), celery 就报:
[ WARNING/MainProcess]Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"id": "6c50942e-1045-15ea-96c3-5251a82fa45f", "args": ["fsafifds"], "task": "celerytask.task.hello"}' (96b) {content_type:None content_encoding:None delivery_info:{'consumer_tag': 'None3', 'delivery_tag': 1, 'redelivered': False, 'exchange': 'celery', 'routing_key': 'celery'} headers={}}
这里 task.hello()也并没有执行, 关于这个报错, 网上查了能查到的几个解决方法, 包括 celery 版本降到 3.1, 卸载 librabbitmq 改用 pyamqp, 改 task_protocol=1, 都试了一个遍, 貌似都没用, 不知道该怎么解决?
另外有一点想知道的是直接调用任务函数和推送到消息队列有什么区别吗?
1 aoscici2000 OP 已解决, content_type 没匹配上..... 折腾了两天偌大的提示都没留意到 |