
Python flask 模块-send_file() 实例源码

项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def get_zip(self, project, ty):
        """Get a ZIP file directly from uploaded directory
        or generate one on the fly and upload it if not existing."""
        # Archive name for this project / export type.
        filename = self.download_name(project, ty)
        if not self.zip_existing(project, ty):
            print "Warning: Generating %s on the fly Now!" % filename
            self._make_zip(project, ty)
        # With a local uploader the archive is sent from the download path.
        if isinstance(uploader, local.LocalUploader):
            filepath = self._download_path(project)
            # NOTE(review): this send_file(...) call is never closed in this
            # excerpt; its remaining arguments are not visible.
            res = send_file(filename_or_fp=safe_join(filepath, filename),
            # fail safe mode for more encoded filenames.
            # It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char
            # res.headers['Content-disposition'] = 'attachment; filename*=%s' % filename
            return res
            # NOTE(review): as written this redirect follows an unconditional
            # return and is itself unterminated; presumably it belongs to a
            # missing `else:` branch - confirm against the upstream source.
            return redirect(url_for('rackspace', filename=filename,
项目:hack-lastfm    作者:CSEC-NITH    | 项目源码 | 文件源码
def collage():
    # GET renders the input form; POST queries config.url for the user's top
    # albums and sends static/<username>.jpg back.
    if request.method=='GET':
        return render_template('collage.html',input=True)
    elif request.method == 'POST':
        username = request.form['lastfm_username']
        duration = request.form['duration']
        # NOTE(review): the payload dict literal is never closed in this
        # excerpt, and `duration` is not used in the visible lines.
        payload = {
            'user': username,
            'api_key': config.LASTFM_API_KEY,
            'method': 'user.gettopalbums',
        r = requests.get(config.url, params = payload)
        # The return value is not used below; the response is built from the
        # username-based path instead.
        filenames = get_album_art(r.text, username)
        return send_file('static/' + username+'.jpg', mimetype='image/jpg')
项目:esdc-shipment    作者:erigones    | 项目源码 | 文件源码
def get_image_file(uuid, image_file_path):
    """GetimageFile (GET /images/:uuid/file)

    Return the image file.

    When ``Nginx_ENABLED`` is truthy an empty response carrying an
    ``x-accel-redirect`` header is returned; otherwise the file itself is
    sent with ``send_file``. In both cases ``Content-Length`` and
    ``Content-MD5`` headers are set on the response.
    """
    encoded_md5 = get_image_file_md5(uuid, image_file_path)

    if Nginx_ENABLED:
        # Empty body: only the redirect header and content type are set here.
        res = make_response('')
        res.headers['Content-Type'] = 'application/octet-stream'
        res.headers['x-accel-redirect'] = '/static/images/%s/file?md5=%s' % (uuid, encoded_md5)
    else:
        # Without this branch the send_file response would overwrite the
        # redirect response built above.
        res = make_response(send_file(image_file_path, add_etags=False, cache_timeout=0))

    res.headers['Content-Length'] = get_image_file_size(image_file_path)
    res.headers['Content-MD5'] = encoded_md5

    return res
项目:encore    作者:statgen    | 项目源码 | 文件源码
def get_job_output(job_id, filename, as_attach=False, mimetype=None, tail=None, head=None, job=None):
    """Return a job output file, or only its first or last lines.

    :param job_id: accepted for the caller's benefit; not used in the body.
    :param filename: name resolved through ``job.relative_path``.
    :param as_attach: when true, the response is marked as an attachment.
    :param mimetype: optional content type for the response.
    :param tail: line count passed to ``tail -n``.
    :param head: line count passed to ``head -n``.
    :param job: object exposing ``relative_path(filename)``.
    :return: a Flask response, or a ``(message, status)`` tuple on error.
    """
    try:
        output_file = job.relative_path(filename)
        if tail or head:
            if tail and head:
                return "Cannot specify tail AND head", 500
            cmd = "head" if head else "tail"
            count = tail or head
            # Popen arguments must be strings; the count may arrive as an int.
            p = subprocess.Popen([cmd, "-n", str(count), output_file],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            tail_data, _ = p.communicate()
            resp = make_response(tail_data)
            if as_attach:
                resp.headers["Content-disposition"] = "attachment; filename={}".format(filename)
            if mimetype:
                resp.headers["Content-Type"] = mimetype
            return resp
        else:
            return send_file(output_file, as_attachment=as_attach, mimetype=mimetype)
    except Exception as e:
        # Any failure (including a missing job) is reported as a 404.
        print(e)
        return "File Not Found", 404
项目:gepify    作者:nvlbg    | 项目源码 | 文件源码
def send_file(filename, attachment_filename, mimetype, **kwargs):
    # Wraps flask_send_file and builds Content-disposition filename options,
    # with a latin-1 form and a UTF-8 `filename*` form for names that cannot
    # be encoded as latin-1.
    response = flask_send_file(filename, mimetype=mimetype)

    # NOTE(review): the `try:` that pairs with the `except` below is not
    # present in this excerpt.
        attachment_filename = attachment_filename.encode('latin-1')
    except UnicodeEncodeError:
        filenames = {
            'filename': unicodedata
                .normalize('NFKD', attachment_filename)
                .encode('latin-1', 'ignore'),
            # NOTE(review): the format(...) call and the dict are never closed.
            'filename*': "UTF-8''{}".format(
        # NOTE(review): presumably the `else:` branch of the try - confirm.
        filenames = {'filename': attachment_filename}

        # NOTE(review): the start of this call (the header-setting method)
        # is missing from the excerpt.
        'Content-disposition', 'attachment', **filenames)
    return response
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_send_file_xsendfile(self):
        """With use_x_sendfile enabled, send_file sets an x-sendfile header.

        The header is expected to hold the path of the file under the
        application root, and the mimetype is still detected.
        """
        app = flask.Flask(__name__)
        app.use_x_sendfile = True
        with app.test_request_context():
            rv = flask.send_file('static/index.html')
            self.assert_in('x-sendfile', rv.headers)
            # The opening of this assertion was missing, leaving a dangling
            # os.path.join(...) line that did not parse.
            self.assert_equal(rv.headers['x-sendfile'],
                os.path.join(app.root_path, 'static/index.html'))
            self.assert_equal(rv.mimetype, 'text/html')
项目:rc-niceties    作者:mjec    | 项目源码 | 文件源码
def serve_static_files(p, index_on_error=True):
    """Securely serve static files for the given path using send_file.

    :param p: requested path, interpreted relative to ``app.static_folder``.
    :param index_on_error: accepted for compatibility; not used in the body.
    :return: the ``send_file`` response for the resolved file. When the path
        escapes the static folder or is not a file, the file named by the
        ``STATIC_FILE_ON_404`` config entry is served instead, and the
        request is aborted with 404 when that entry is unset.
    """
    # Canonicalise both sides so symlinks cannot make the comparison lie.
    static_root = os.path.realpath(app.static_folder)
    full_path = os.path.realpath(os.path.join(static_root, p))

    # os.path.commonprefix compares character by character, so a sibling such
    # as "<static>_private/x" would pass; compare against the root plus a
    # trailing separator instead.
    inside_static = full_path.startswith(os.path.join(static_root, ''))

    # We have a problem if either:
    #   - the path is not a sub-path of app.static_folder; or
    #   - the path does not refer to a real file.
    if not (inside_static and os.path.isfile(full_path)):
        file_to_return = app.config.get('STATIC_FILE_ON_404', None)
        if file_to_return is None:
            return abort(404)
        # Only abort when no fallback is configured; previously the fallback
        # path was computed and then discarded by an unconditional abort.
        full_path = os.path.realpath(os.path.join(static_root, file_to_return))
    return send_file(full_path)
项目:mal    作者:chaosking121    | 项目源码 | 文件源码
def shrug():
    """Serve the configured shrug image (GET) or post it to a channel (POST).

    GET sends the image with a mimetype derived from the file extension.
    POST validates the command and posts the image to the channel named in
    the form, returning an empty body. Other methods fall through and
    return None.
    """
    import os, os.path
    from tools.slack_posting import postimage

    image_path = os.path.join(os.getcwd(), tools.settings.getValue('misc', 'shinoa_shrug'))
    method = request.method

    if method == 'GET':
        extension = image_path.split('.')[-1]
        return send_file(image_path, mimetype='image/{}'.format(extension))

    if method == 'POST':
        if not isValidCommand(request, 'shrug'):
            return "You dun goof'd."
        postimage(image_path, request.form['channel_id'], None, "shrug", "*shrug*")
        return ""
项目:veripress    作者:veripress    | 项目源码 | 文件源码
def pages(page_path):
    """Return a custom page either as a raw file or as a parsed dict.

    :param page_path: requested page path; rejected when
        ``validate_custom_page_path`` returns false.
    :return: a ``send_file`` response when the path resolves to an existing
        file, otherwise the page as a dict with parsed ``content`` (cached
        for two minutes).
    :raises ApiException: when the path is not allowed, cannot be
        recognised, or the page does not exist.
    """
    if not validate_custom_page_path(page_path):
        raise ApiException(error=Error.NOT_ALLOWED, message='The visit of path "{}" is not allowed.'.format(page_path))

    rel_url, exists = storage.fix_relative_url('page', page_path)
    if exists:
        file_path = rel_url
        return send_file(file_path)
    elif rel_url is None:  # pragma: no cover,it seems impossible to make this happen,see code of 'fix_relative_url'
        raise ApiException(error=Error.BAD_PATH, message='The path "{}" cannot be recognized.'.format(page_path))
    else:
        # This branch had lost its `else:` and sat unreachable after the
        # raise above, so non-file pages returned None.
        page_d = cache.get('api-handler.' + rel_url)
        if page_d is not None:
            return page_d  # pragma: no cover,here just get the cached dict

        page = storage.get_page(rel_url, include_draft=False)
        if page is None:
            raise ApiException(error=Error.RESOURCE_NOT_EXISTS)
        page_d = page.to_dict()
        # Replace the raw source with its parsed form before returning.
        del page_d['raw_content']
        page_d['content'] = get_parser(page.format).parse_whole(page.raw_content)

        cache.set('api-handler.' + rel_url, page_d, timeout=2 * 60)
        return page_d
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def report_get(task_id, report_format="json"):
    """Send the stored report of a finished task in the requested format.

    Returns a JSON error for an unknown task (404), a deleted task (404),
    an unfinished task (420) or a missing report file (404).
    """
    task = Task.query.get(task_id)
    if not task:
        return json_error(404, "Task not found")
    if task.status == Task.DELETED:
        return json_error(404, "Task report has been deleted")
    if task.status != Task.FINISHED:
        return json_error(420, "Task not finished yet")

    # reports_directory/<task id>/report.<format>
    path_parts = (settings.reports_directory,
                  "%d" % task_id,
                  "report.%s" % report_format)
    report_path = os.path.join(*path_parts)
    if os.path.isfile(report_path):
        return send_file(report_path)
    return json_error(404, "Report format not found")
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def pcap_get(task_id):
    """Send the dump.pcap of a finished task.

    Returns a JSON error for an unknown task (404), a deleted task (404),
    an unfinished task (420) or a missing pcap file (404), mirroring the
    checks performed by report_get.
    """
    task = Task.query.get(task_id)
    if not task:
        # A task that does not exist was previously reported as deleted.
        return json_error(404, "Task not found")

    if task.status == Task.DELETED:
        return json_error(404, "Task files has been deleted")

    if task.status != Task.FINISHED:
        return json_error(420, "Task not finished yet")

    pcap_path = os.path.join(settings.reports_directory,
                             "%s" % task_id, "dump.pcap")
    if not os.path.isfile(pcap_path):
        return json_error(404, "Pcap file not found")

    return send_file(pcap_path)
项目:shakecast    作者:usgs    | 项目源码 | 文件源码
def shakemap_overlay(shakemap_id):
    # Queries ShakeMap by shakemap_id and sends an image with an image/gif
    # mimetype.
    session = Session()
    # NOTE(review): this query expression is never closed in this excerpt.
    shakemap = (session.query(ShakeMap)
                    .filter(ShakeMap.shakemap_id == shakemap_id)
    if shakemap is not None:
        # NOTE(review): the os.path.join(...) call is left open; its final
        # argument(s) are not visible here.
        img = os.path.join(app.config['EARTHquakeS'],
                           shakemap_id + '-' + str(shakemap.shakemap_version),

        # NOTE(review): presumably the fallback for a missing shakemap (an
        # `else:` seems to have been lost) - confirm.
        img = app.send_static_file('sc_logo.png')

    return send_file(img, mimetype='image/gif')
项目:esper    作者:scanner-research    | 项目源码 | 文件源码
def index():
    # Reads 'path', 'frame' and 'id' from the query string, extracts one
    # frame of the video with ffmpeg and sends the resulting JPEG.
    path = request.args.get('path')
    frame = request.args.get('frame')
    id = request.args.get('id')

    # Only the base name of the requested path is used for the local copy.
    basename = os.path.split(path)[1]
    video_path = '{}/{}'.format(FILE_DIR, basename)

    if not os.path.isfile(video_path):
        blob = bucket.get_blob(path)
        # NOTE(review): the body of this `with` block is missing from the
        # excerpt.
        with open(video_path, 'wb') as f:

    # NOTE(review): video_path and frame come from the request and are
    # interpolated into the command string before shlex.split - confirm they
    # are validated upstream.
    sp.check_call(shlex.split("ffmpeg -y -i {} -vf \"select=eq(n\\,{})\" -frames:v 1 {}/%05d.jpg".format(video_path, frame, FILE_DIR)))
    # `blob` is reassigned here but not used in the visible lines.
    blob = bucket.blob('public/thumbnails/tvnews/frame_{}.jpg'.format(id))
    img_path = '{}/00001.jpg'.format(FILE_DIR)

    return send_file(img_path)
项目:talkback    作者:Axilent    | 项目源码 | 文件源码
def media_file(filename):
    """
    Retrieves a media file.

    The file is fetched from ``media_storage`` by name and sent with an
    ``image/png`` mimetype.
    """
    outfile = media_storage.get_file(filename)
    return send_file(outfile,mimetype='image/png')
项目:apiTest    作者:wuranxu    | 项目源码 | 文件源码
def showHtml(filename):
    """Send the named file from the directory configured as REPORT."""
    report_path = os.path.join(app.config['REPORT'], filename)
    return send_file(report_path)

# ????????
项目:apiTest    作者:wuranxu    | 项目源码 | 文件源码
def download_homework(filename):
    """Send the named file from the directory configured as HOMEWORK."""
    homework_path = os.path.join(app.config['HOMEWORK'], filename)
    return send_file(homework_path)
项目:bock    作者:afreeorange    | 项目源码 | 文件源码
def scripts():
    """Send the cached JavaScript bundle."""
    bundle_path = 'ui/cached_dist/Bock.js'
    return send_file(bundle_path)
项目:bock    作者:afreeorange    | 项目源码 | 文件源码
def styles():
    """Send the cached stylesheet."""
    stylesheet_path = 'ui/cached_dist/Bock.css'
    return send_file(stylesheet_path)
项目:bock    作者:afreeorange    | 项目源码 | 文件源码
def fonts(route):
    """Send the font file named by ``route`` from the cached fonts folder."""
    font_path = 'ui/cached_dist/fonts/{}'.format(route)
    return send_file(font_path)
项目:bock    作者:afreeorange    | 项目源码 | 文件源码
def route(route):
    """Send the cached index page regardless of the requested route."""
    index_page = 'ui/cached_dist/index.html'
    return send_file(index_page)
项目:bock    作者:afreeorange    | 项目源码 | 文件源码
def index():
    """Send the cached index page."""
    index_page = 'ui/cached_dist/index.html'
    return send_file(index_page)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_send_file_regular(self):
        """send_file on a static HTML file yields its mimetype and contents."""
        app = flask.Flask(__name__)
        with app.test_request_context():
            response = flask.send_file('static/index.html')
            self.assert_equal(response.mimetype, 'text/html')
            with app.open_resource('static/index.html') as resource:
                # Passthrough is switched off before the body is read via .data.
                response.direct_passthrough = False
                body = response.data
                self.assert_equal(body, resource.read())
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_attachment(self):
        # Checks the Content-disposition header produced by send_file with
        # as_attachment=True for a file object, a filename and a StringIO.
        app = flask.Flask(__name__)
        with catch_warnings() as captured:
            with app.test_request_context():
                f = open(os.path.join(app.root_path, 'static/index.html'))
                rv = flask.send_file(f, as_attachment=True)
                value, options = parse_options_header(rv.headers['Content-disposition'])
                self.assert_equal(value, 'attachment')
            # mimetypes + etag
            self.assert_equal(len(captured), 2)

        with app.test_request_context():
            # NOTE(review): this assertion runs before `options` is
            # recomputed for this request context - it looks misplaced.
            self.assert_equal(options['filename'], 'index.html')
            rv = flask.send_file('static/index.html', as_attachment=True)
            value, options = parse_options_header(rv.headers['Content-disposition'])
            self.assert_equal(value, 'attachment')
            self.assert_equal(options['filename'], 'index.html')

        with app.test_request_context():
            # NOTE(review): the send_file(...) call below is never closed and
            # the final line is a fragment of a lost assertion.
            rv = flask.send_file(StringIO('Test'), as_attachment=True,
            self.assert_equal(rv.mimetype, 'text/plain')
            value, 'index.txt')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_static_file(self):
        """Cache-Control max_age follows the default, the config and overrides.

        Each scenario is checked twice: through the static file handler and
        through a direct send_file call.
        """
        def check_max_age(app, expected):
            senders = (
                # Static file handler.
                lambda: app.send_static_file('index.html'),
                # Direct use of the send_file utility.
                lambda: flask.send_file('static/index.html'),
            )
            with app.test_request_context():
                for send in senders:
                    rv = send()
                    cc = parse_cache_control_header(rv.headers['Cache-Control'])
                    self.assert_equal(cc.max_age, expected)

        app = flask.Flask(__name__)
        # default cache timeout is 12 hours
        check_max_age(app, 12 * 60 * 60)

        # An explicit config value replaces the default.
        app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 3600
        check_max_age(app, 3600)

        # A subclass can override the max age per file.
        class StaticFileApp(flask.Flask):
            def get_send_file_max_age(self, filename):
                return 10

        check_max_age(StaticFileApp(__name__), 10)
项目:supervising-ui    作者:USCDataScience    | 项目源码 | 文件源码
def document():
    """Send the file whose path is given in the ``url`` query parameter.

    Aborts with 400 when the parameter is missing or the path does not exist.
    """
    # NOTE(review): the path comes straight from the request - confirm this
    # endpoint is only reachable by trusted users.
    url = request.args.get('url')
    if url and os.path.exists(url):
        return send_file(url)
    return abort(400, "File %s not found " % url)
项目:boomerang    作者:EmersonElectricCo    | 项目源码 | 文件源码
def api_request_basic():
    """Establishes the endpoint that is used for the basic fetch mode

    POST body should be json formatted.
    required field(s):
      - url : actual url that is requested to be fetched by the minion

    Optional field(s):
      - user-agent : user agent to be used during the request of the url
                     if this argument is not provided,then the default
                     user agent in the config file will be used.

    Returns the file produced by ``run_job``; a 400 response for a missing
    body, BadRequest or ValueError; a 500 response for any other exception.
    """
    try:
        data = request.get_json()
        if data is None:
            raise BadRequest

        file_path = run_job(data, "basic")
        return send_file(file_path)

    except BadRequest as e:
        return prepare_400("api_request_basic", str(e))
    except ValueError as e:
        return prepare_400("api_request_basic", str(e))
    except Exception as e:
        # print() with a single argument behaves the same on Python 2 and 3.
        print(type(e))
        return prepare_500("api_request_basic", str(e))
项目:boomerang    作者:EmersonElectricCo    | 项目源码 | 文件源码
def api_request_website():
    """Establishes the endpoint that is used for the website fetch mode

    POST body should be json formatted.
    required field(s):
      - url : actual url that is requested to be fetched by the minion

    Optional field(s):
      - user-agent : user agent to be used during the request of the url
                     if this argument is not provided,then the default
                     user agent in the config file will be used.

    Returns the file produced by ``run_job``; a 400 response for a missing
    body, BadRequest or ValueError; a 500 response for any other exception.
    """
    try:
        data = request.get_json()
        if data is None:
            raise BadRequest

        file_path = run_job(data, "website")
        return send_file(file_path)

    except BadRequest as e:
        return prepare_400("api_request_website", str(e))
    except ValueError as e:
        return prepare_400("api_request_website", str(e))
    except Exception as e:
        # print() with a single argument behaves the same on Python 2 and 3.
        print(type(e))
        return prepare_500("api_request_website", str(e))
项目:boomerang    作者:EmersonElectricCo    | 项目源码 | 文件源码
def api_basic_results(job_id):
    """Establishes endpoint for retrieval of historical boomerang results

    :param job_id: id of the specific job that you wish to look up
    :return: the stored result file, or a 400/404/500 response on failure.
        A 400 response is also returned when result storage is disabled
        in the config.
    """
    if minion.config['boomerang_STORE_RESULTS']:
        try:
            file_path = get_job_results(job_id)
            return send_file(file_path)
        except BadRequest as e:
            return prepare_400("api_basic_results", str(e))
        except ValueError:
            return prepare_404('api_basic_results')
        except Exception as e:
            return prepare_500("api_basic_results", str(e))
    else:
        return prepare_400("api_basic_results", error="Config set to not store results")
项目:qmk_compiler_api    作者:qmk    | 项目源码 | 文件源码
def GET_v1_compile_job_id_hex(job_id):
    """Download a compiled firmware

    Returns a 404 error when the job is unknown, the firmware file as an
    octet-stream when the job result has one, and a 422 error otherwise.
    """
    job = get_job_Metadata(job_id)
    if not job:
        return error("Compile job not found", 404)

    if job['result']['firmware']:
        return send_file(job['result']['firmware'], mimetype='application/octet-stream', attachment_filename=job['result']['firmware_filename'])

    return error("Compile job not finished or other error.", 422)
项目:qmk_compiler_api    作者:qmk    | 项目源码 | 文件源码
def GET_v1_compile_job_id_src(job_id):
    """Download a completed compile job.

    Returns a 404 error when the job is unknown, the source archive fetched
    from ``qmk_storage`` when the job result has a firmware, and a 422 error
    otherwise.
    """
    job = get_job_Metadata(job_id)
    if not job:
        return error("Compile job not found", 404)

    if job['result']['firmware']:
        # Storage key is "<id>/<source_archive>", both taken from the result.
        source_zip = qmk_storage.get('%(id)s/%(source_archive)s' % job['result'])
        return send_file(source_zip, attachment_filename=job['result']['source_archive'])

    return error("Compile job not finished or other error.", 422)
项目:pyt    作者:SW10IoT    | 项目源码 | 文件源码
def cat_picture():
    """Send the file named by the ``image_name`` query parameter.

    Every ".." occurrence is removed from the name before it is joined to
    the current working directory.
    """
    requested = request.args.get('image_name')
    cleaned = requested.replace('..', '')
    target = os.path.join(os.getcwd(), cleaned)
    return send_file(target)
项目:pyt    作者:SW10IoT    | 项目源码 | 文件源码
def cat_picture():
    """Send the file named by the ``image_name`` query parameter.

    Returns 404 when the parameter is missing or contains "..", otherwise
    sends the file resolved against the current working directory.
    """
    image_name = request.args.get('image_name')

    # The check was inverted: it rejected every name without ".." and served
    # only names containing it. A missing parameter is rejected as well
    # instead of raising TypeError on the membership test.
    if not image_name or '..' in image_name:
        return 404
    # NOTE(review): an absolute image_name still replaces the cwd prefix in
    # os.path.join - confirm whether that needs guarding too.
    return send_file(os.path.join(os.getcwd(), image_name))
项目:pyt    作者:SW10IoT    | 项目源码 | 文件源码
def cat_picture():
    """Send the file named by the ``image_name`` query parameter.

    Returns 404 when the parameter is missing or empty; the name is
    otherwise joined to the current working directory unchanged.
    """
    image_name = request.args.get('image_name')
    if image_name:
        target = os.path.join(os.getcwd(), image_name)
        return send_file(target)
    return 404
项目:pheweb    作者:statgen    | 项目源码 | 文件源码
def api_top_hits():
    """Send the file registered under 'top-hits-1k' in common_filepaths."""
    top_hits_path = common_filepaths['top-hits-1k']
    return send_file(top_hits_path)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_send_file_regular(self):
        """send_file on a static HTML file yields its mimetype and contents."""
        app = flask.Flask(__name__)
        with app.test_request_context():
            rv = flask.send_file('static/index.html')
            # The mimetype was being compared with f.read() on an undefined
            # `f`; check the mimetype and the body separately.
            self.assert_equal(rv.mimetype, 'text/html')
            with app.open_resource('static/index.html') as f:
                rv.direct_passthrough = False
                self.assert_equal(rv.data, f.read())
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_send_file_xsendfile(self):
        """With use_x_sendfile enabled, send_file sets an x-sendfile header."""
        app = flask.Flask(__name__)
        app.use_x_sendfile = True
        with app.test_request_context():
            rv = flask.send_file('static/index.html')
            # The membership test was run against the literal 'text/html',
            # which can never contain 'x-sendfile'; check the headers.
            self.assert_in('x-sendfile', rv.headers)
            self.assert_equal(rv.headers['x-sendfile'],
                os.path.join(app.root_path, 'static/index.html'))
            self.assert_equal(rv.mimetype, 'text/html')
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_attachment(self):
        # NOTE(review): this excerpt stops inside the first request context;
        # the open(...) call below is never closed and no assertions are
        # visible.
        app = flask.Flask(__name__)
        with catch_warnings() as captured:
            with app.test_request_context():
                f = open(os.path.join(app.root_path, 'index.txt')
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_static_file(self):
        """The static file handler applies the default cache timeout."""
        app = flask.Flask(__name__)
        # default cache timeout is 12 hours
        with app.test_request_context():
            # Test with static file handler.
            rv = app.send_static_file('index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            # The expected value was 10, which contradicts the 12-hour
            # default stated above for an unconfigured app.
            self.assert_equal(cc.max_age, 12 * 60 * 60)
项目:polichombr    作者:ANSSI-FR    | 项目源码 | 文件源码
def api_family_export_sampleszip(family_id, tlp_level):
    # Generates a samples zip for the family at the given TLP level and sends
    # it; returns an empty string when no zip path is produced.
    my_family = api.get_elem_by_type("family", family_id)
    zpath = api.familycontrol.generate_samples_zip_file(my_family, tlp_level)
    if zpath is None:
        return ""
    # NOTE(review): the send_file(...) call is never closed in this excerpt;
    # its remaining arguments are not visible.
    return send_file("../" + zpath,
项目:polichombr    作者:ANSSI-FR    | 项目源码 | 文件源码
def download_family_file(family_id, file_id):
    # NOTE(review): the next line is docstring text that has lost its quotes.
    Family attachment download endpoint.
    attachment = api.get_elem_by_type("family_file", file_id)
    data_file = attachment.filepath
    # NOTE(review): the body of this `if` is missing, and the send_file(...)
    # call below is never closed.
    if not os.path.exists(data_file):
    return send_file('../' + data_file,
项目:polichombr    作者:ANSSI-FR    | 项目源码 | 文件源码
def api_get_sample_file(sid):
    # NOTE(review): the next line is docstring text that has lost its quotes.
        Return the sample binary
    sample = api.get_elem_by_type("sample", sid)
    data_file = sample.storage_file
    # NOTE(review): the send_file(...) call is never closed in this excerpt.
    return send_file('../' + data_file,
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def cache(filename):
    """Serve a thumbnail, using the on-disk image cache when enabled.

    :param filename: cache key of the image; the cached file is stored as
        ``<filename>.jpg`` inside the ``cache`` folder of ``config.data_dir``.
    :return: the raw thumb data when caching is disabled, the cached JPEG
        when it exists or can be created, otherwise the default poster.
    """
    if not config.CACHE_IMAGES:
        return g.plex.get_thumb_data(filename)

    cache_dir = os.path.join(config.data_dir, "cache")
    cache_file = os.path.join(cache_dir, filename)

    # Already cached: serve it. This case previously fell through and
    # returned None because its `else:` branch had been lost.
    if os.path.exists(cache_file + ".jpg"):
        return send_from_directory(cache_dir, filename + ".jpg")

    # Not cached yet: try to fetch it, falling back to the default poster.
    if helper.cache_file(filename, g.plex):
        return send_from_directory(cache_dir, filename + ".jpg")
    return send_file('static/images/poster.png')
项目:pi-robot-rc-android    作者:Grdkly    | 项目源码 | 文件源码
def image_jpg():
    """Send /home/pi/boot/image.jpg under the name image.jpg."""
    image_path = '/home/pi/boot/image.jpg'
    return send_file(image_path, attachment_filename='image.jpg')
项目:rc-niceties    作者:mjec    | 项目源码 | 文件源码
def home():
    """Send index.html from the application's static folder."""
    index_path = os.path.join(app.static_folder, 'index.html')
    return send_file(os.path.realpath(index_path))
项目:rc-niceties    作者:mjec    | 项目源码 | 文件源码
def font():
    """Send SFPixelate-Bold.ttf, resolved against the working directory."""
    font_name = os.path.join('SFPixelate-Bold.ttf')
    font_path = os.path.realpath(font_name)
    return send_file(font_path)
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def index():
    """Send the index template file as-is."""
    index_template = 'templates/index.html'
    return send_file(index_template)
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def get_vxstream_download(sha256, eid, ftype):
    # Fetches result/<sha256> through vxstream.api for the given type and
    # environment id, and sends it back as an attachment.
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream SandBox API Client'}
    params = {'type': ftype, 'environmentId': eid}
    vx = vxstream.api.get('result/{}'.format(sha256),
                          params=params, headers=headers)
    # These types get a '.gz' suffix in the attachment name.
    if ftype in ['xml', 'html', 'bin', 'pcap']:
        ftype += '.gz'
    # NOTE(review): the send_file(...) call is never closed in this excerpt;
    # any remaining arguments are not visible.
    return send_file(BytesIO(vx),
                     attachment_filename='{}.{}'.format(sha256, ftype),
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def get_fireeye_download(sha256, ftype):
    # Always raises ApiException with status 501; everything below the raise
    # is unreachable.
    raise ApiException({}, 501)
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'FireEye SandBox API Client'}
    # NOTE(review): `eid` is not a parameter of this function, and the
    # fireeye.api.get(...) call below is never closed in this excerpt.
    params = {'type': ftype, 'environmentId': eid}
    vx = fireeye.api.get('result/{}'.format(sha256),
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def download_file(file_id):
    """Download file

    **Example request**:

    .. sourcecode:: http

       GET /api/1.0/files/1 HTTP/1.1
       Host: do.cert.europa.eu
       Accept: application/json

    **Example response**:

    .. sourcecode:: http

       HTTP/1.0 200 OK
       Content-Type: application/json
       Content-disposition: attachment; filename=CIMBL-244-EU.zip
       Content-Length: 55277
       Content-Type: application/zip

    :param file_id: file's unique ID

    :reqheader Accept: Content type(s) accepted by the client
    :resheader Content-Type: this depends on `Accept` header or request

    :status 200: File found
    :status 404: Resource not found
    """
    # first_or_404 aborts the request when no row matches the id.
    dfile = DeliverableFile.query.filter_by(id=file_id).first_or_404()
    cfg = current_app.config
    # The stored name is used both for the path under APP_UPLOADS and for
    # the attachment filename.
    return send_file(os.path.join(cfg['APP_UPLOADS'], dfile.name),
                     attachment_filename=dfile.name, as_attachment=True)
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def get_cp_vxstream_download(sha256, ftype):
    Sample.query.filter_by(sha256=sha256, user_id=g.user.id).first_or_404()
    headers = {
        'Accept': 'text/html',

