如何解决使用python编译的protobuf pb2作为键和值序列化器
我正在尝试从已经使用 Google 的 protobuf 序列化过的 kafka topic 中读取数据。
现在，我正在尝试使用 Faust 创建流处理器，但是我找不到正确的方法将 pb2 文件用作 key_serializer
和value_serializer
。
这是我尝试过的:
import faust
from proto.topic_pb2 import topic  # generated protobuf message class


app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

# NOTE(review): the original paste had the closing parenthesis of
# faust.Schema(...) inside the trailing "##" comment, which made the snippet
# a syntax error. The arguments stay commented out because passing the pb2
# classes directly is exactly what does not work (see the answers below).
schema = faust.Schema(
    # key_type=topic.PK,
    # value_type=topic,
    # key_serializer=topic.PK,
    # value_serializer=topic,
)

# NOTE(review): rebinding `topic` here shadows the imported protobuf class.
topic = app.topic(
    'topic',
    schema=schema,
)


@app.agent(topic)
async def consume(topic):
    # Print every raw event received on the topic.
    async for event in topic:
        print(event)


if __name__ == "__main__":
    app.main()
有人知道如何在序列化器中使用 pb2 吗？
解决方法
伙计,过去一周我一直在尝试这样做。经过努力,我终于找到了可行的方法-不是最好的方法-但效果很好。
因此,最初我使用以下python编译器:https://github.com/danielgtaylor/python-betterproto来生成具有数据类/类型提示的*.py
文件。
然后,我能够使用助手来动态创建Faust.Record
类:
import abc
import inspect
from typing import Type
import betterproto
import faust
# Suffix appended to dynamically generated Faust record class names so they
# are distinguishable from the original betterproto classes.
GENERATED_SUFFIX = "__FaustRecord_Auto"
def _import_relative_class(module: str,klass_name: str):
resolved_import = __import__(module,fromlist=[klass_name])
klass = getattr(resolved_import,klass_name)
return klass
def _is_record(attype: Type):
    """Return True when *attype* is a class that should become a nested
    Faust record: a generated betterproto message or an ABC-metaclassed class.
    """
    if not inspect.isclass(attype):
        return False
    # Bug fix: the original used isinstance(attype, betterproto.Message),
    # which is always False for a *class* object — issubclass is the intended
    # check for "is this a generated betterproto message class".
    return issubclass(attype, betterproto.Message) or isinstance(attype, abc.ABCMeta)
def _build_record_annotations(klass: Type):
    """Map each annotated field of *klass* to a Faust-compatible type.

    Nested message classes are converted to generated Faust records; string
    annotations (forward references) are resolved against the class's own
    module before conversion; everything else passes through unchanged.
    """
    resolved = {}
    for field_name, hint in klass.__annotations__.items():
        if _is_record(hint):
            # Nested message: recurse into a generated Faust record.
            resolved[field_name] = make_faust_record(hint)
        elif isinstance(hint, str):
            # Forward reference by name — resolve it, then convert.
            target = _import_relative_class(klass.__module__, hint)
            resolved[field_name] = make_faust_record(target)
        else:
            resolved[field_name] = hint
    return resolved
def make_faust_record(klass: Type):
    """Dynamically derive a faust.Record subclass from a betterproto class."""
    generated_name = f"{klass.__name__}{GENERATED_SUFFIX}"
    record = type(generated_name, (faust.Record, klass), {})
    # Rewrite the annotations so nested messages also become Faust records,
    # then let Faust re-run its subclass initialisation on the new type.
    record.__annotations__ = _build_record_annotations(klass)
    record._init_subclass()
    return record
现在您可以像这样使用它:
import faust
from proto.your_models import YourModel # Import your generated proto here
from faust_converter import make_faust_record
app = faust.App(
'faust-consumer',broker='kafka://',store="memory://",cache="memory://",)
model_record = make_faust_record(YourModel)
topic = app.topic(
'topic',value_type=model_record
)
@app.agent(topic)
async def consume(topic):
async for event in topic:
print(event)
if __name__ == "__main__":
app.main()
另一种解决方法：
我能够通过这样创建一个Serializer类来做到这一点:
import logging
from abc import ABCMeta, abstractmethod
from importlib import import_module

import faust
from faust.serializers.codecs import Codec
from google.protobuf.json_format import MessageToDict
def get_proto(topic_name, only_pk=False):
    """Resolve (and cache) the generated pb2 message class for *topic_name*.

    Imports ``protodef.<topic_name>_pb2`` and returns the message class named
    after the last dotted component of *topic_name*.  With ``only_pk=True``
    the nested ``PK`` (primary-key) message class is returned instead.
    """
    # Lazily create the per-function cache on first call.
    if not hasattr(get_proto, "topics"):
        get_proto.topics = {}
    # Bug fix: the original re-imported the module and overwrote the cache
    # entry on every call, which made the cache useless. Import on miss only.
    if topic_name not in get_proto.topics:
        module = import_module("protodef.{}_pb2".format(topic_name))
        get_proto.topics[topic_name] = getattr(module, topic_name.split(".")[-1])
    proto = get_proto.topics[topic_name]
    return proto.PK if only_pk else proto
class ProtoSerializer(Codec, metaclass=ABCMeta):
    """Abstract Faust codec backed by a generated protobuf class.

    Subclasses decide via :meth:`only_key` whether the codec handles the
    topic's key (the nested ``PK`` message) or the full value message.
    """

    @abstractmethod
    def only_key(self):
        """Return True to bind to the primary-key (PK) message class."""
        ...

    def as_proto(self, topic_name):
        """Bind this codec to the proto generated for *topic_name* (fluent)."""
        self._proto = get_proto(topic_name, self.only_key())
        return self

    def _loads(self, b):
        """Deserialize protobuf bytes into a plain dict for Faust."""
        data = MessageToDict(
            self._proto.FromString(b),
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )
        # remove the key object from the unserialized message
        data.pop("key", None)
        return data

    def _dumps(self, o):
        """Serialize dict *o* to protobuf bytes; empty input -> None (delete)."""
        # for deletes (tombstones) there is nothing to encode
        if not o:
            return None
        obj = self._proto()
        # add the key object to the message before serializing
        if hasattr(obj, "PK"):
            for k in obj.PK.DESCRIPTOR.fields_by_name.keys():
                if k not in o:
                    raise Exception(
                        "Invalid object `{}` for proto `{}`".format(o, self._proto)
                    )
                setattr(obj.key, k, o[k])
        for k, v in o.items():
            if hasattr(obj, k):
                # Bug fix: the original called setattr(obj, v) — a TypeError
                # (setattr requires object, name, value).
                setattr(obj, k, v)
            else:
                # Bug fix: `ghost.debug` was an undefined name and the format
                # string expected two arguments but received one.
                logging.getLogger(__name__).debug(
                    "Invalid value-attribute `%s` for proto `%s`", k, self._proto
                )
        return obj.SerializeToString()
class ProtoValue(ProtoSerializer):
    """Codec for topic values: binds to the full message class."""

    def only_key(self) -> bool:
        # Full message, not just the primary key.
        return False
class ProtoKey(ProtoSerializer):
    """Codec for topic keys: binds to the nested PK message class."""

    def only_key(self) -> bool:
        # Keys carry only the primary-key sub-message.
        return True
然后按如下所示使用它：
import faust
from utils.serializer import ProtoKey, ProtoValue

# Application configured with defaults; only the consumer name is set.
app = faust.App('faust-consumer')

# Attach the protobuf-backed codecs: the key codec decodes the nested PK
# message, the value codec decodes the full message for this topic.
topic = app.topic(
    'topic',
    key_serializer=ProtoKey().as_proto('topic'),
    value_serializer=ProtoValue().as_proto('topic'),
)


@app.agent(topic)
async def consume(stream):
    # Print every decoded event from the stream.
    async for event in stream:
        print(event)


if __name__ == "__main__":
    app.main()
另一种解决方法：
我也在尝试将Protobuf与Faust一起使用。
下面提到的是使用Faust Serialiser编解码器的解决方案。 faust-protobuf https://github.com/hemantkashniyal/faust-protobuf
proto_serializer.py
from faust.serializers import codecs
from typing import Any
from google.protobuf import json_format
from google.protobuf.json_format import MessageToJson
from google.protobuf.json_format import MessageToDict
from google.protobuf import text_format
from google.protobuf.text_format import MessageToString
from google.protobuf.text_format import MessageToBytes
class ProtobufSerializer(codecs.Codec):
    """Faust codec that (de)serializes one fixed protobuf message type."""

    def __init__(self, pb_type: Any):
        # The concrete generated pb2 message class this codec handles.
        self.pb_type = pb_type
        # Bug fix: super(self.__class__, self) recurses infinitely if this
        # class is ever subclassed; use the zero-argument form instead.
        super().__init__()

    def _dumps(self, pb: Any) -> bytes:
        """Serialize a protobuf message to its wire-format bytes."""
        return pb.SerializeToString()

    def _loads(self, s: bytes) -> Any:
        """Parse wire-format bytes back into a ``pb_type`` instance."""
        pb = self.pb_type()
        pb.ParseFromString(s)
        return pb
app.py
import faust
from google.protobuf.json_format import MessageToJson
from .proto.greetings_pb2 import Greeting
from .proto_serializer import ProtobufSerializer

# TODO: update kafka endpoint
app = faust.App(
    'faust-consumer',
    store="memory://",
)

# Both key and value on this topic carry a Greeting message.
greetings_schema = faust.Schema(
    key_serializer=ProtobufSerializer(pb_type=Greeting),
    value_serializer=ProtobufSerializer(pb_type=Greeting),
)

topic = app.topic('greetings', schema=greetings_schema)


@app.agent(topic)
async def consume(stream):
    # Render each decoded protobuf event as JSON for readability.
    async for event in stream:
        print(MessageToJson(event))


@app.timer(5)
async def produce():
    # Demo producer: emit ten greetings on every timer tick.
    for i in range(10):
        data = Greeting(hello="world", message=i)
        await consume.send(value=data)


if __name__ == "__main__":
    app.main()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。