微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Apache Arrow Flight:多次调用 FlightServer

如何解决Apache Arrow Flight:多次调用 FlightServer

我一直在关注有关如何设置和使用 Apache Arrow Flight 的 this 教程。

从例子中,server.py:

import pyarrow as pa
import pyarrow.flight as fl

def create_table_int():
    data = [
        pa.array([1,2,3]),pa.array([4,5,6])
    ]
    return pa.Table.from_arrays(data,names=['column1','column2'])


def create_table_dict():
    keys = pa.array(["x","y","z"],type=pa.utf8())
    data = [
        pa.chunked_array([
            pa.DictionaryArray.from_arrays([0,1,2],keys),pa.DictionaryArray.from_arrays([0,keys)
        ]),pa.chunked_array([
            pa.DictionaryArray.from_arrays([1,1],pa.DictionaryArray.from_arrays([2,keys)
        ])
    ]
    return pa.Table.from_arrays(data,'column2'])

class FlightServer(fl.FlightServerBase):

    def __init__(self,location="grpc://0.0.0.0:8815",**kwargs):
        super(FlightServer,self).__init__(location,**kwargs)

        self.tables = {
            b'table_int': create_table_int(),b'table_dict': create_table_dict(),}

    def do_get(self,context,ticket):
        table = self.tables[ticket.ticket]
        return fl.RecordBatchStream(table)
        # return fl.GeneratorStream(table.schema,table.to_batches(max_chunksize=1024))

def main():
    FlightServer().serve()

if __name__ == '__main__':
    main()

客户端.py

import argparse
import sys

import pyarrow as pa
import pyarrow.flight as fl

def get_by_ticket(args,client):
    ticket_name = args.name
    response = client.do_get(fl.Ticket(ticket_name)).read_all()
    print_response(response)

def get_by_ticket_pandas(args,client):
    ticket_name = args.name
    response = client.do_get(fl.Ticket(ticket_name)).read_pandas()
    print_response(response)

def print_response(data):
    print("=== Response ===")
    print(data)
    print("================")

def main():
    parser = argparse.ArgumentParser()
    subcommands = parser.add_subparsers()

    cmd_get_by_t = subcommands.add_parser('get_by_ticket')
    cmd_get_by_t.set_defaults(action='get_by_ticket')
    cmd_get_by_t.add_argument('-n','--name',type=str,help="Name of the ticket to fetch.")

    cmd_get_by_tp = subcommands.add_parser('get_by_ticket_pandas')
    cmd_get_by_tp.set_defaults(action='get_by_ticket_pandas')
    cmd_get_by_tp.add_argument('-n',help="Name of the ticket to fetch.")

    args = parser.parse_args()
    if not hasattr(args,'action'):
        parser.print_help()
        sys.exit(1)

    commands = {
        'get_by_ticket': get_by_ticket,'get_by_ticket_pandas': get_by_ticket_pandas,}

    client = fl.connect("grpc://0.0.0.0:8815")

    commands[args.action](args,client)


if __name__ == '__main__':
    main()

我在通过服务访问的 k8s 集群中运行服务器,其他各种 pod 调用服务器。这工作正常,除非在第一次调用返回之前对服务器进行第二次调用在这种情况下,我没有从第一次调用中得到正确的响应,但我似乎也没有收到任何错误。我不确定正确的术语是什么,但是有没有办法让服务器“阻塞”,以便它在开始第二个调用之前完成第一个调用的处理,或者其他一些解决方法

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?