如何解决不存在相同名称的列时出现歧义列错误
尝试在模型中调用sqlalchemy
混合列表达式时,出现Document.document_status
列名错误的情况:
class Document(db.Model):
"""A specific Customer Transaction Document.
Each Transaction may contain one or more document.
A document is associated with one or more Services
representing the services performed on the physical
document.
"""
transaction_id = db.Column(db.String,db.ForeignKey('transaction.id'),primary_key=True,nullable=False)
transaction = db.relationship('Transaction',backref=db.backref('documents',lazy=True))
document_id = db.Column(db.Integer,nullable=False,autoincrement=False,default=1)
type_id = db.Column(db.Integer,db.ForeignKey('document_type.id'),nullable=False)
type = db.relationship('DocumentType')
locale_id = db.Column(db.String,db.ForeignKey('locale.id'),nullable=True)
locale = db.relationship('Locale')
country_of_use = db.Column(db.String,nullable=False)
rush_service = db.Column(db.Boolean,default=False)
deadline = db.Column(db.DateTime,nullable=True)
notification_thread = db.Column(db.String,nullable=True)
services = db.relationship("Service",secondary=service_association_table)
@hybrid_property
def tracking_code(self):
return f"{self.transaction_id}.{self.document_id}"
@tracking_code.expression
def tracking_code(cls):
return Columnoperators.concat(Columnoperators.concat(cls.transaction_id,"."),cast(cls.document_id,db.String))
@hybrid_property
def document_status(self):
if hasattr(self,'scans') and len(self.scans) > 0:
return self.scans[0].step.status
else:
return "Pending..."
@document_status.expression
def document_status(cls):
# code = select([Step.status]).where(
# Step.id == select([Scan.step_id])
# .where(and_(cls.document_id == Scan.document_id,# cls.transaction_id == Scan.transaction_id))
# .order_by(Scan.timestamp)
# .limit(1)).label("status")
# print(code.status)
# return code
# return Step.status
q = (select([Scan.step_id]).
where(and_(Scan.transaction_id == cls.transaction_id,Scan.document_id == cls.document_id)).order_by(Scan.timestamp).
limit(1).
label("id"))
return select([Step.status]).where(Step.id == q)
# return select([Step.status]).where(
# Step.id == q).limit(1)
def __repr__(self):
return f'<Document {self.transaction_id}.{self.document_id}>'
def to_dict(self):
data = {
'id': self.document_id,'type': self.type.type,'locale': self.locale.locale,'country_of_use': self.country_of_use,'rush_service': self.rush_service,'deadline': self.deadline,'_links': {
'self': url_for('api.get_document',document_id=self.document_id,transaction_id=self.transaction_id),'transaction': url_for('api.get_transaction',id=self.transaction_id),'type': url_for('api.get_document_type',id=self.type_id),'locale': url_for('api.get_locale',id=self.locale_id)
}
}
return data
我的其他联接表均未包含任何status
属性。如果信息有用,我可以提供其他模型。
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) ambiguous column name: status
[sql: SELECT anon_1.document_transaction_id AS anon_1_document_transaction_id,anon_1.document_document_id AS anon_1_document_document_id,anon_1.document_type_id AS anon_1_document_type_id,anon_1.document_locale_id AS anon_1_document_locale_id,anon_1.document_country_of_use AS anon_1_document_country_of_use,anon_1.document_rush_service AS anon_1_document_rush_service,anon_1.document_deadline AS anon_1_document_deadline,anon_1.document_notification_thread AS anon_1_document_notification_thread,anon_1.status AS anon_1_status,anon_1.step_status AS anon_1_step_status,anon_1.anon_2 AS anon_1_anon_2,anon_1.step_id AS anon_1_step_id,anon_1.step_notification_id AS anon_1_step_notification_id,anon_1.step_service_id AS anon_1_step_service_id,service_1.id AS service_1_id,service_1.name AS service_1_name,service_1.price AS service_1_price,service_1.min_turnaround AS service_1_min_turnaround,service_1.max_turnaround AS service_1_max_turnaround
FROM (SELECT document.transaction_id AS document_transaction_id,document.document_id AS document_document_id,document.type_id AS document_type_id,document.locale_id AS document_locale_id,document.country_of_use AS document_country_of_use,document.rush_service AS document_rush_service,document.deadline AS document_deadline,document.notification_thread AS document_notification_thread,status AS status,step.status AS step_status,step.id = (SELECT scan.step_id
FROM scan
WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
LIMIT ? OFFSET ?) AS anon_2,step.id AS step_id,step.notification_id AS step_notification_id,step.service_id AS step_service_id
FROM document,(SELECT step.status AS status
FROM step
WHERE step.id = (SELECT scan.step_id
FROM scan,document
WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
LIMIT ? OFFSET ?)),step ORDER BY (SELECT step.status
FROM step
WHERE step.id = (SELECT scan.step_id
FROM scan,document
WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
LIMIT ? OFFSET ?)) DESC
LIMIT ? OFFSET ?) AS anon_1 LEFT OUTER JOIN (service_association AS service_association_1 JOIN service AS service_1 ON service_1.id = service_association_1.service_id) ON anon_1.document_document_id = service_association_1.document_id AND anon_1.document_transaction_id = service_association_1.transaction_id ORDER BY (SELECT document.transaction_id AS document_transaction_id,document
WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
LIMIT ? OFFSET ?)) DESC
LIMIT ? OFFSET ?) DESC]
[parameters: (1,1,20,0)]
(Background on this error at: http://sqlalche.me/e/e3q8)
我注意到sqlAlchemy似乎在anon_1上创建了一个status属性。看来这可能是问题的根源,但我不明白为什么。
解决方法
看起来嵌套查询正在创建问题。我最终用第二个select
子句替换了内部where
:
@status.expression
def status(cls):
return = (select([Step.status]).
where(Scan.step_id == Step.id).
where(and_(Scan.transaction_id == cls.transaction_id,Scan.document_id == cls.document_id)).
order_by(Scan.timestamp).
limit(1).
label("abcdefg"))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。