def get_named_set(lang_codes, feature_set):
    if feature_set == 'id':
        return get_id_set(lang_codes)

    if feature_set not in FEATURE_SETS:
        print("ERROR: Invalid feature set " + feature_set, file=sys.stderr)

    filename, source, prefix = FEATURE_SETS[feature_set]
    feature_database = np.load(filename)
    lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
    lang_indices = [ get_language_index(l, feature_database) for l in lang_codes ]
    feature_names = get_feature_names(prefix, feature_database)
    feature_indices = [ get_feature_index(f, feature_database) for f in feature_names ]
    source_index = get_source_index(source, feature_database)
    feature_values = feature_database["data"][lang_indices,:,:][:,feature_indices,source_index]
    feature_values = feature_values.squeeze(axis=2)
    return feature_names, feature_values
def directory_has_smart_contract(location):
    # returns bool if there is a tsol contract in said directory
    # probably makes more sense to put this inside of the tsol package
    code_path = glob.glob(os.path.join(location, '*.tsol'))
    example = glob.glob(os.path.join(location, '*.json'))

    assert len(code_path) > 0 and len(example) > 0, 'Could not find *.tsol and *.json files in provided directory.'

    # pop off the first file name and turn the code into a file object
    code = open(code_path[0])

    # turn the example into a dict
    with open(example[0]) as e:
        example = json.load(e)

        tsol.compile(code, example)
    except Exception as e:
        return False
    return True
def load_prevIoUs(self, path=None):
        """Load prevIoUs copy of config from disk.

        In normal usage you don't need to call this method directly - it
        is called automatically at object initialization.

        :param path:

            File path from which to load the prevIoUs config. If `None`,
            config is loaded from the default location. If `path` is
            specified,subsequent `save()` calls will write to the same

        self.path = path or self.path
        with open(self.path) as f:
            self._prev_dict = json.load(f)
        for k, v in copy.deepcopy(self._prev_dict).items():
            if k not in self:
                self[k] = v
def info_to_db(db, name):
    cur = db.cursor()
    with open(name) as data_file:
        data = json.load(data_file)
        subject = data['subject']
        code = data['code']
        title = data['title']
        credit = data['credit']
        desc = data['desc']
        geneds = data['geneds']
        if geneds is None:
            geneds = ''
            geneds = ','.join(geneds) 
        url = data['url']
        cmd = 'INSERT INTO `CourseExplorer` (`Id`,`Subject`,`Code`,`Title`,`Credit`,`Description`,`GenEd`,' \
              '`Url`) VALUES(NULL,%s,%s) '
        val = (subject, code, title, credit, desc, geneds, url)
        cur.execute(cmd, val)
        # if subject == 'PHYS':
        #     print cmd % val
def _set_asset_paths(self, app):
        Read in the manifest json file which acts as a manifest for assets.
        This allows us to get the asset path as well as hashed names.

        :param app: aiohttp application
        :return: None
        webpack_stats = app.settings.WEBPACK_MANIFEST_PATH

            with open(webpack_stats, 'r') as stats_json:
                stats = json.load(stats_json)
                if app.settings.WEBPACK_ASSETS_URL:
                    self.assets_url = app.settings.WEBPACK_ASSETS_URL
                    self.assets_url = stats['publicPath']

                self.assets = stats['assets']
        except IOError:
            raise RuntimeError(
                "'WEBPACK_MANIFEST_PATH' is required to be set and "
                "it must point to a valid json file.")
def _get_external_data(url):
    result = {}
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        if not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
def save(self, pypi_version, current_time):
        # Check to make sure that we own the directory
        if not check_path_owner(os.path.dirname(self.statefile_path)):

        # Now that we've ensured the directory is owned by this user,we'll go
        # ahead and make sure that all our directories are created.

        # Attempt to write out our version check file
        with lockfile.LockFile(self.statefile_path):
            if os.path.exists(self.statefile_path):
                with open(self.statefile_path) as statefile:
                    state = json.load(statefile)
                state = {}

            state[sys.prefix] = {
                "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
                "pypi_version": pypi_version,

            with open(self.statefile_path, "w") as statefile:
                json.dump(state, statefile, sort_keys=True,
                          separators=(",", ":"))
def load(self):
        # XXX JSON is not a great database
        for path in load_config_paths('wheel'):
            conf = os.path.join(native(path), self.CONfig_NAME)
            if os.path.exists(conf):
                with open(conf, 'r') as infile:
                    self.data = json.load(infile)
                    for x in ('signers', 'verifiers'):
                        if not x in self.data:
                            self.data[x] = []
                    if 'schema' not in self.data:
                        self.data['schema'] = self.SCHEMA
                    elif self.data['schema'] != self.SCHEMA:
                        raise ValueError(
                            "Bad wheel.json version {0},expected {1}".format(
                                self.data['schema'], self.SCHEMA))
        return self
def __load_layout(self, config):
        var = config.get_value('engine/replace-with-kanji-python', 'layout')
        if var is None or var.get_type_string() != 's':
            path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
            path = os.path.join(path, 'roomazi.json')
            if var:
                config.unset('engine/replace-with-kanji-python', 'layout')
            path = var.get_string()
        logger.info("layout: %s", path)
        layout = roomazi.layout     # Use 'roomazi' as default
            with open(path) as f:
                layout = json.load(f)
        except ValueError as error:
            logger.error("JSON error: %s", error)
        except OSError as error:
            logger.error("Error: %s", error)
            logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
        self.__to_kana = self.__handle_roomazi_layout
        if 'Type' in layout:
            if layout['Type'] == 'Kana':
                self.__to_kana = self.__handle_kana_layout
        return layout
def _get_cache(ttl, cache_path):
    If url contains valid cache,returns it,else returns empty list.
    # Check if we have a valid cached version.
        cached_time = os.path.getmtime(cache_path)
    except OSError:
        return []
    if current_time() - cached_time < ttl:
        log.debug('%s is less than ttl', cache_path)
            with open(cache_path) as json_file:
                loaded_json = json.load(json_file)
                return loaded_json
        except IOError:
            return []
        except ValueError:
            log.error('%s was not json formatted', cache_path)
            return []
        log.debug('%s was older than ttl', cache_path)
        return []
def test_update_text(self):
        with open(os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')) as f:
            final_data = json.load(f)

        original_text = final_data['RTR']['cards'][0]['originalText']
        final_text = final_data['RTR']['cards'][0]['text']

        # copy the data and munge it into its original state.
        original_data = copy.deepcopy(final_data)
        original_data['RTR']['cards'][0]['text'] = original_text

        # Import the original data.
        parse_data(original_data, ['RTR'])
        eyes_in_the_skies = Card.objects.first()
        self.assertEqual(eyes_in_the_skies.text, original_text)

        # Import the final,updated data.
        parse_data(final_data, ['RTR'])
        self.assertEqual(eyes_in_the_skies.text, final_text)
def test_update_types(self):
        with open(os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')) as f:
            final_data = json.load(f)

        # copy the data and munge the types.
        original_data = copy.deepcopy(final_data)
        original_subtype = 'Hound'
        original_data['TMP']['cards'][0]['subtypes'] = [original_subtype]

        # Import the original data.
        parse_data(original_data, ['TMP'])
        jackal_pup = Card.objects.first()
        self.assertEqual(jackal_pup.subtypes.count(), 1)
        self.assertEqual(jackal_pup.subtypes.first().name, original_subtype)

        # Import the final, ['TMP'])
        self.assertEqual(jackal_pup.subtypes.count(), 'Jackal')
        # The Hound subtype has been deleted.
def test_update_loyalty(self):
        Simulates the upgrade process from version 0.2 to version 0.4.
        with open(os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')) as f:
            final_data = json.load(f)

        # copy the data and munge it to remove the loyalty.
        original_data = copy.deepcopy(final_data)
        del original_data['RTR']['cards'][0]['loyalty']

        # Import the original data.
        parse_data(original_data, ['RTR'])
        vraska = Card.objects.first()

        # Import the final, ['RTR'])
        self.assertEqual(vraska.loyalty, 5)
def main():
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
    path = sys.argv[1]

    with open(os.path.join(path, "Metadata.json")) as f:
        Metadata = json.load(f)
        start = date(Metadata["start"][:-1])
        end = date(Metadata["start"][:-1])
        print('open measurement "%s" from "%s" to "%s"', Metadata["name"], start, end)
        for service in Metadata["services"]:
            print('open service "%s"' % service["name"])
            with open(os.path.join(path, service["filename"])) as csvfile:
                r = csv.DictReader(csvfile, dialect=csv.excel_tab)
                for row in r:
项目:consul-pg    作者:adamcstephens    | 项目源码 | 文件源码
        # load config values
            with open(self.configfile) as configfile_contents:
                self.config = json.load(configfile_contents)
            self.config = {}

            self.agent_services = self.api_session.get(self.api_endpoint + '/agent/services?stale').json()
        self.managed_service = self.agent_services[self.service]

        if self.managed_service['Tags'] == None:
            self.managed_service['Tags'] = []

        if self.role_source == "facter":
            print("!! unsupported PG role source !!")
def prompt_pick_backup(message):
    """Prompts the user to pick an existing database,and returns the
       selected choice database ID and its Metadata"""

    # First load all the saved databases (splitting extension and path)
    saved_db = [path.splitext(path.split(f)[1])[0] for f in glob('backups/*.tlo')]

    # Then prompt the user
    print('Available backups databases:')
    for i, db_id in enumerate(saved_db):
        Metadata = get_Metadata(db_id)
        print('{}. {},ID: {}'.format(i + 1,
                                      Metadata.get('peer_name', '???'),

    db_id = saved_db[get_integer(message, 1, len(saved_db)) - 1]
    return db_id, get_Metadata(db_id)
def _get_external_data(url):
    result = {}
        # urlopen might fail if it runs into redirections, e)
    return result
def save(self, ":"))
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def load(self):
        # XXX JSON is not a great database
        for path in load_config_paths('wheel'):
            conf = os.path.join(native(path), 'verifiers'):
                        if x not in self.data:
                            self.data[x] = []
                    if 'schema' not in self.data:
                        self.data['schema'] = self.SCHEMA
                    elif self.data['schema'] != self.SCHEMA:
                        raise ValueError(
                            "Bad wheel.json version {0}, self.SCHEMA))
        return self
def read_store(self):
        Get the store from file.
        with open(self._path,'r',encoding='ascii') as fr:
            outer_data = json.load(fr)

        except ValidationError:
            raise SSError("Invalid outer schema")

        enc_bytes = hex_str_to_bytes(outer_data["enc_blob"])
            inner_data = self._Box.decrypt(enc_bytes)
        except nacl.exceptions.CryptoError:
            raise SSCryptoError("Wrong password")

        return json.loads(inner_data.decode('utf-8'))
def __init__(self, path='config.json'):
        CONfig_PATH = os.path.join(sys.path[0], path)
        self.path = path

            print "Creando fichero de configuracion"
            self['hs_threshold1'] = 100
            self['hs_threshold2'] = 200
            self['hs_apertura'] = 1
            self['hs_blur'] = 3
            self['hs_minLineLength'] = 100
            self['hs_maxLineGap'] = 10
            self['hs_kernel_dilate'] = 3
            self['hs_kernel_erode'] = 3
            self['hs_param1'] = 3
            self['hs_param2'] = 3
            self['hs_param3'] = 3
            self['hs_dilate_iteracciones'] = 1

def test_match_fetching(self):
        """Check if the server handle correctly a normal request."""
        # Forward sample data fetched from the Riot API directly as result
        this_dir = os.path.dirname(os.path.abspath(__file__))
        with open(os.sep.join([this_dir, "samples", self.MATCH_DATA])) as f:
            self.match_data = json.load(f)

        # Setup mocks
        self.service.cache_manager.find_match.return_value = None
        self.service.riot_api_handler.get_match.return_value = self.match_data

        response = self.stub.Match(service_pb2.MatchRequest(id=4242,

        # Check the conversion of the sample is matching the response
        converter = JSONConverter(None)
        expected = converter.json_match_to_match_pb(self.match_data)
        self.assertEqual(response, expected)
def test_match_from_cache(self):
        """Check if the server returns a match from the cache."""
        # Forward sample data fetched from the Riot API directly as result
        this_dir = os.path.dirname(os.path.abspath(__file__))
        with open(os.sep.join([this_dir, self.MATCH_DATA])) as f:
            self.match_data = json.load(f)

        # Setup mocks
        self.service.cache_manager.find_match.return_value = self.match_data

        response = self.stub.Match(service_pb2.MatchRequest(id=4242,

        # Check the conversion of the sample is matching the response
def load(self):
        # Prepare + load directory.

        # Load the files and parse JSON.
        parsed_settings = dict()

            for file_name in self.files:
                file_path = os.path.join(self.directory, file_name)
                with open(file_path, 'r') as file_handle:
        except json.JSONDecodeError as e:
            raise ImproperlyConfigured(
                'Your settings file(s) contain invalid JSON Syntax! Please fix and restart!,{}'.format(str(e))

        # Loop and set in local settings (+ uppercase keys).
        for key, value in parsed_settings.items():
            self.settings[key.upper()] = value
def load_prevIoUs(self, v in copy.deepcopy(self._prev_dict).items():
            if k not in self:
                self[k] = v
def load_endpoints(self):

        for name in listdir(str(BASE_PATH / 'endpoints')):
            if name.endswith('.json'):
                item = QListWidgetItem(name.split('.json')[0], self.choose_endpoint.endpoints)
                item.setFlags(item.flags() & ~Qt.ItemIsEnabled)

                pb_msg_to_endpoints = defaultdict(list)
                with open(str(BASE_PATH / 'endpoints' / name)) as fd:
                    for endpoint in load(fd, object_pairs_hook=OrderedDict):

                for pb_msg, endpoints in pb_msg_to_endpoints.items():
                    item = QListWidgetItem(' ' * 4 + pb_msg, self.choose_endpoint.endpoints)
                    item.setFlags(item.flags() & ~Qt.ItemIsEnabled)

                    for endpoint in endpoints:
                        path_and_qs = '/' + endpoint['request']['url'].split('/', 3).pop()
                        item = QListWidgetItem(' ' * 8 + path_and_qs, self.choose_endpoint.endpoints)
                        item.setData(Qt.UserRole, endpoint)

def load_barcode_dist(filename, barcode_whitelist, gem_group, proportions=True):
    """ Load barcode count distribution from a json file """
    # Input barcode whitelist must be an ordered type;
    # safeguard against it going out of sync with the distribution file
    assert barcode_whitelist is None or isinstance(barcode_whitelist, list)

    if not os.path.isfile(filename):
        return None

    with open(filename, 'r') as f:
        values = json.load(f)

    start = (gem_group-1)*len(barcode_whitelist)
    end = gem_group*len(barcode_whitelist)
    barcode_counts = {bc: value for bc, value in zip(barcode_whitelist, values[start:end])}
    if proportions:
        total_barcode_counts = sum(barcode_counts.values())
        barcode_dist = {bc: tk_stats.robust_divide(float(value), float(total_barcode_counts))
                        for bc, value in barcode_counts.iteritems()}
        return barcode_dist
        return barcode_counts
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def split(args):
    assert len(args.read1s) == len(args.read2s)

    chunks = []

    # Determine the number of buckets required to achieve
    # the given chunk size.
    chunks_per_gem_group = {}
    with open(args.reads_summary) as f:
        reads_summary = json.load(f)
        for gg in args.gem_groups:
            readpairs = reads_summary['%d_total_reads_per_gem_group' % gg]
            chunks_per_gem_group[str(gg)] = max(2,
                                                int(math.ceil(float(readpairs) / \

    for fastq1, fastq2 in itertools.izip(args.read1s, args.read2s):
            'read1s_chunk': fastq1,
            'read2s_chunk': fastq2,
            'chunks_per_gem_group': chunks_per_gem_group,
    return {'chunks': chunks}
def main(args, outs):

    # Write read_chunk for consumption by Rust
    with open("chunk_args.json", "w") as f:
        json.dump(args.read_chunk, f)

    output_path = martian.make_path("")
    prefix = "fastq_chunk"
    chunk_reads_args = ['chunk_reads',  '--reads-per-fastq', str(args.reads_per_file), output_path, prefix, "--martian-args", "chunk_args.json"]
    print "running chunk reads: [%s]" % str(chunk_reads_args)

    with open(os.path.join(output_path, "read_chunks.json")) as f:
        chunk_results = json.load(f)

    outs.out_chunks = []

    # Write out a new chunk entry for each resulting chunk
    for chunk in chunk_results:
        print args.read_chunk
        chunk_copy = args.read_chunk.copy()
        print chunk_copy
        chunk_copy['read_chunks'] = chunk
def load_model(file_path):
        with open(file_path, "r") as f:
            decision_trees, sparse_features, total_feature_count = load(f)

        dts = {}
        for key in decision_trees.keys():
            dts[key] = []
            for dt in decision_trees[key]:
                d = DecisionTree([])
                for k in dt['model'].keys():
                    setattr(d, k, dt['model'][k])
                dt['model'] = d

        return ClassificationEngine(
def requests_with_cache(dir):
    def decorator(func):
        def wrapper(**kwargs):
            cache_key = str(kwargs.get("param", "default.json"))
            cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")
            if os.path.isfile(cache_url):
                with open(cache_url, 'r') as f:
                    return json.load(f)
            with open(cache_url, 'w+') as f:
                ret = func(**kwargs)
                json.dump(ret, f)
                return ret

        return wrapper

    return decorator
def main():
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
        obj = json.load(infile)
    except ValueError, e:
        raise SystemExit(e)
    json.dump(obj, outfile, indent=4)
def import_policy(cb, parser, args):
    p = cb.create(Policy)

    p.policy = json.load(open(args.policyfile, "r"))
    p.description = args.description
    p.name = args.name
    p.priorityLevel = args.prioritylevel
    p.version = 2

    except ServerError as se:
        print("Could not add policy: {0}".format(str(se)))
    except Exception as e:
        print("Could not add policy: {0}".format(str(e)))
        print("Added policy. New policy ID is {0}".format(p.id))
def create_graph():
    logfile = 'result/log'
    xs = []
    ys = []
    ls = []
    f = open(logfile, 'r')
    data = json.load(f)


    for d in data:

    plt.hlines(1, 0, np.max(xs), colors='r', linestyles="dashed")  # y=-1,1??????
    plt.plot(xs, ys, label="accuracy")
    plt.plot(xs, ls, label="loss")
def associate_lambda(group_config, lambda_config):
        Associate the Lambda described in the `lambda_config` with the
        Greengrass Group described by the `group_config`

        :param group_config: `gg_group_setup.GroupConfigFile` to store the group
        :param lambda_config: the configuration describing the Lambda to
            associate with the Greengrass Group

        with open(lambda_config, "r") as f:
            cfg = json.load(f)

        config = GroupConfigFile(config_file=group_config)

        lambdas = config['lambda_functions']
        lambdas[cfg['func_name']] = {
            'arn': cfg['lambda_arn'],
            'arn_qualifier': cfg['lambda_alias']

        config['lambda_functions'] = lambdas
def load_cache(self, file):
        import json

            with open(file, 'r') as f:
                self.cache = json.load(f)
            # Fail silently
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def editPipeline(args, config):
        pipelinedbutils = Pipelinedbutils(config)

        request = json.loads(pipelinedbutils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)

        _, tmp = mkstemp()
        with open(tmp, 'w') as f:
            f.write("{data}".format(data=json.dumps(request, indent=4)))

        if "EDITOR" in os.environ.keys():
            editor = os.environ["EDITOR"]
            editor = "/usr/bin/nano"

        if subprocess.call([editor, tmp]) == 0:
            with open(tmp, 'r') as f:
                request = json.load(f)

            pipelinedbutils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
            print "ERROR: there was a problem editing the request"
def load_messages():
        filename = 'guideline_content.json'
        file_path = ('%s/guideline/%s' % (
        with open(file_path) as json_data:
            data = json.load(json_data)

项目:openhealthalgorithms    作者:openhealthalgorithms    | 项目源码 | 文件源码
def load_guidelines(guideline_key):
        filename = 'guideline_%s.json' % guideline_key
        file_path = ('%s/guideline/%s' % (
        with open(file_path) as json_data:
            data = json.load(json_data)

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _load_ready_file(self):
        if self._ready is not None:
        if os.path.exists(self._ready_file):
            with open(self._ready_file) as fp:
                self._ready = set(json.load(fp))
            self._ready = set()
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
    mm_map = {}
    if os.path.isfile(mm_file):
        with open(mm_file, 'r') as f:
            mm_map = json.load(f)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _git_yaml_load(projects_yaml):
    Load the specified yaml into a dictionary.
    if not projects_yaml:
        return None

项目:lang-reps    作者:chaitanyamalaviya    | 项目源码 | 文件源码
def get_id_set(lang_codes):
    feature_database = np.load("family_features.npz")
    lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
    all_languages = list(feature_database["langs"])
    feature_names = [ "ID_" + l.upper() for l in all_languages ]
    values = np.zeros((len(lang_codes), len(feature_names)))
    for i, lang_code in enumerate(lang_codes):
        feature_index = get_language_index(lang_code, feature_database)
        values[i, feature_index] = 1.0
    return feature_names, values
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def generate(location):
    # cli wizard for creating a new contract from a template
    if directory_has_smart_contract(location):
        example_payload = json.load(open(glob.glob(os.path.join(location, '*.json'))[0]))
        for k, v in example_payload.items():
            value = input(k + ':')
            if value != '':
                example_payload[k] = value

        code_path = glob.glob(os.path.join(location, '*.tsol'))
        tsol.compile(open(code_path[0]), example_payload)
        print('Code compiles with new payload.')
        selection = ''
        while True:
            selection = input('(G)enerate solidity contract or (E)xport implementation:')
            if selection.lower() == 'g':
                output_name = input('Name your contract file without an extension:')
                code = tsol.generate_code(open(code_path[0]).read(), example_payload)
                open(os.path.join(location, '{}.sol'.format(output_name)), 'w').write(code)

            if selection.lower() == 'e':
                output_name = input('Name your implementation file without an extension:')
                json.dump(example_payload, open(os.path.join(location, '{}.json'.format(output_name)), 'w'))
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def __init__(self, path=None, filename=None):
        self.kafka_offset_spec_file = os.path.join(
            (path or "/tmp/"), (filename or 'kafka_offset_specs.json'))

        self._kafka_offsets = {}
        if os.path.exists(self.kafka_offset_spec_file):
                f = open(self.kafka_offset_spec_file)
                kafka_offset_dict = json.load(f)
                for key, value in kafka_offset_dict.items():
                    log.info("Found offset %s: %s", key, value)
                    self._kafka_offsets[key] = OffsetSpec(
            except Exception:
                log.info('Invalid or corrupts offsets file found at %s,'
                         ' starting over' % self.kafka_offset_spec_file)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def load_offset_file_as_json(self, file_path):
        with open(file_path, 'r') as f:
            json_file = json.load(f)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def register(self, name, serializer):
        """Register ``serializer`` object under ``name``.

        Raises :class:`AttributeError` if ``serializer`` in invalid.

        .. note::

            ``name`` will be used as the file extension of the saved files.

        :param name: Name to register ``serializer`` under
        :type name: ``unicode`` or ``str``
        :param serializer: object with ``load()`` and ``dump()``

        # Basic validation
        getattr(serializer, 'load')
        getattr(serializer, 'dump')

项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def load(cls, file_obj):
        """Load serialized object from open JSON file.

        .. versionadded:: 1.8

        :param file_obj: file handle
        :type file_obj: ``file`` object
        :returns: object loaded from JSON file
        return json.load(file_obj)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def load(cls, file_obj):
        """Load serialized object from open pickle file.

        .. versionadded:: 1.8

        :param file_obj: file handle
        :type file_obj: ``file`` object
        :returns: object loaded from pickle file
        :rtype: object

项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def load(cls, file_obj):
        """Load serialized object from open pickle file.

        .. versionadded:: 1.8

        :param file_obj: file handle
        :type file_obj: ``file`` object
        :returns: object loaded from pickle file
        return pickle.load(file_obj)

