#!/usr/bin/python
'''
extract bundles from the log of stringtie verbose mode
stringtie version: v1.2.2

Usage: python <this script> stringtie_log output_file

Every line of the log is split on single spaces.  A bundle "begin" line
and the matching bundle "end" line (same region string in token 2) are
paired up, and one tab-separated row per completed bundle is written to
the output file.
'''

import sys
import re


class Bundle(object):
    '''One bundle reported in the log, built from a begin and an end line.

    The class attribute ``total_bundle_num`` counts the bundles completed
    so far (see ``count``) and supplies the running number printed in the
    first output column.
    '''

    total_bundle_num = 0

    def __init__(self):
        # Token lists (the log line split on ' ') of the begin / end line.
        self._begin_line = None
        self.end_line = None
        # Region string taken from token 2; used as the key that pairs a
        # begin line with its end line.
        self.name = None
        # Fields filled in by get_all_bundle_info(); all are kept as the
        # strings captured by the regular expressions.
        self.__chr = None
        self.__start = None
        self.__end = None
        self.__reads_num = None
        self.__guides_num = None
        self.__read_list_count = None
        self.__potential_trans_num = None
        # Running number assigned by count().
        self.__id = None

    def count(self):
        '''Bump the global bundle counter and use it as this bundle's id.'''
        Bundle.total_bundle_num += 1
        self.__id = Bundle.total_bundle_num

    def update(self, bundle):
        '''Take over the end line of ``bundle`` (the matching BundleEnd).'''
        self.end_line = bundle.end_line

    def get_all_bundle_info(self):
        '''Parse every reported field out of the stored begin / end lines.

        Requires ``name``, ``_begin_line`` and ``end_line`` to be set.

        Raises:
            ValueError: if a token does not match its expected pattern.
            IndexError: if a line has fewer tokens than the indices used.
        '''
        # NOTE(review): the token positions below presumably follow the
        # stringtie v1.2.2 verbose log layout -- confirm against a real log
        # before using another stringtie version.
        region_pattern = r'^(chr.+):(\d+)-(\d+)'
        self.__chr, self.__start, self.__end = ExtractInfoHelper.base(
            self.name, region_pattern)
        # Begin-line token 3 with its leading '(' or '[' removed.
        self.__reads_num = ExtractInfoHelper.extract_half_quote_info(
            self._begin_line[3])
        # Begin-line token 9 is taken as it is.
        whole_token_pattern = r'(.*)'
        self.__guides_num = ExtractInfoHelper.extract_single_value(
            self._begin_line[9], whole_token_pattern)
        # Begin-line token 5 with its leading '(' or '[' removed.
        self.__read_list_count = ExtractInfoHelper.extract_half_quote_info(
            self._begin_line[5])
        # End-line token 4 with its leading '(' or '[' removed, e.g. '(76'.
        self.__potential_trans_num = ExtractInfoHelper.extract_half_quote_info(
            self.end_line[4])

    def __str__(self):
        '''Return the tab-separated row, in the order used by write_header.'''
        return '\t'.join([str(self.__id),
                          self.__chr,
                          self.__start,
                          self.__end,
                          self.__reads_num,
                          self.__guides_num,
                          self.__potential_trans_num,
                          self.__read_list_count])


class BundleBegin(Bundle):
    '''Bundle created from the token list of a bundle begin line.'''

    def __init__(self, l):
        super(BundleBegin, self).__init__()
        self._begin_line = l
        # Token 2 holds the region string.
        self.name = ExtractInfoHelper.extract_bundle_region(self._begin_line[2])


class BundleEnd(Bundle):
    '''Bundle created from the token list of a bundle end line.'''

    def __init__(self, l):
        super(BundleEnd, self).__init__()
        self.end_line = l
        # Token 2 holds the region string.
        self.name = ExtractInfoHelper.extract_bundle_region(self.end_line[2])


class ExtractInfoHelper(object):
    '''Regular-expression helpers that pull values out of single tokens.'''

    @staticmethod
    def base(info, p):
        '''Match pattern ``p`` at the start of ``info``; return its groups.

        Raises:
            ValueError: if ``info`` does not match ``p``.
        '''
        matched = re.match(p, info)
        if not matched:
            raise ValueError('Invalid format: %s' % info)
        return matched.groups()

    @staticmethod
    def extract_single_value(info, p):
        '''Return the first captured group of ``p`` matched against ``info``.'''
        return ExtractInfoHelper.base(info, p)[0]

    @staticmethod
    def extract_quote_info(info):
        '''Return the text inside the parentheses of ``info``.

        Example: 'chr1:16246839-16972979(30883)' gives '30883'.
        '''
        p = r'.*\((.+)\).*'
        return ExtractInfoHelper.extract_single_value(info, p)

    @staticmethod
    def extract_bundle_region(info):
        '''Return ``info`` if it is a bare region such as 'chr1:100-200'.

        The pattern is anchored at both ends, so a token with anything
        after the end coordinate is rejected with ValueError.
        '''
        p = r'^(chr.+:\d+-\d+)$'
        return ExtractInfoHelper.extract_single_value(info, p)

    @staticmethod
    def extract_half_quote_info(info):
        '''Return ``info`` without its leading '(' or '[' (e.g. '(76' -> '76').'''
        p = r'^[\(\[](.+)$'
        return ExtractInfoHelper.extract_single_value(info, p)


def usage():
    '''Print the command-line usage message.'''
    # 'me' is only defined when this file runs as a script; fall back to
    # sys.argv[0] so the function also works after a plain import.
    program = globals().get('me', sys.argv[0])
    # Single-argument call form prints the same text on Python 2 and 3.
    print('Usage:\tpython %s stringtie_log output_file' % program)


def write_header(w):
    '''Write the column header line to the writable object ``w``.'''
    w.write('No.\tchr\tstart\tend\treads_num\tguides_num\tpotential_trans_num\tread_list_num\n')


def line_has_at_least_two_words(l):
    '''Return True if the token list ``l`` holds more than one token.'''
    return len(l) > 1


def is_bundle_begin_corrected(l):
    '''Detect a begin line whose first token already is '>bundle'.

    Such a line has no separate token in front of '>bundle', so every
    token sits one position earlier than in a normal begin line.  An empty
    placeholder is inserted at index 0 (``l`` is modified in place) so the
    token indices used elsewhere line up again.

    Returns:
        True if ``l`` was such a line (and has been corrected), else False.
    '''
    if l[0] != '>bundle':
        return False
    l.insert(0, '')
    return True


def is_bundle_begin(l):
    '''Return True if the token list ``l`` is a bundle begin line.

    A normal begin line has a token 1 ending in '>bundle', for example
    '03:04:39]>bundle'.  A line starting directly with '>bundle' is also
    accepted; in that case ``l`` is corrected in place, see
    is_bundle_begin_corrected.
    '''
    # The short-circuit keeps the in-place correction from touching lines
    # that are already recognised by their token 1.
    return l[1].endswith('>bundle') or is_bundle_begin_corrected(l)


def is_bundle_end(l):
    '''Return True if the token list ``l`` is a bundle end line.

    A bundle end line has a token 1 ending in '^bundle'.
    '''
    return l[1].endswith('^bundle')


def check_bundles_not_ended(d_incomplete_bundles):
    '''Raise if any bundle has begun but never ended.

    Args:
        d_incomplete_bundles: dict of region name -> BundleBegin for the
            bundles that are still waiting for their end line.

    Raises:
        Exception: if the dict is not empty.
    '''
    left_bundle_num = len(d_incomplete_bundles)
    if left_bundle_num > 0:
        raise Exception('%s bundles are not ended: %s\n'
                        % (left_bundle_num, str(d_incomplete_bundles)))


def main(input_file, output_file):
    '''Read the log ``input_file`` and write one row per completed bundle.

    Args:
        input_file: path of the stringtie verbose log.
        output_file: path of the tab-separated table to create.

    Raises:
        KeyError: if an end line is met for a bundle that never began.
        Exception: if bundles are left without an end line at end of file.
        ValueError, IndexError: if a bundle line cannot be parsed.
    '''
    # Bundles that have begun but not ended yet, keyed by region name.
    d_incomplete_bundles = {}
    with open(input_file) as reader, open(output_file, 'w') as writer:
        write_header(writer)
        for line in reader:
            l_line = line.split(' ')
            # The predicates below look at token 1, so shorter lines are
            # skipped.
            if not line_has_at_least_two_words(l_line):
                continue
            if is_bundle_begin(l_line):
                bundle_begin = BundleBegin(l_line)
                d_incomplete_bundles[bundle_begin.name] = bundle_begin
            elif is_bundle_end(l_line):
                bundle_end = BundleEnd(l_line)
                if bundle_end.name not in d_incomplete_bundles:
                    raise KeyError('The bundle %s does not begin.' % bundle_end.name)
                bundle = d_incomplete_bundles[bundle_end.name]
                bundle.update(bundle_end)
                bundle.count()
                bundle.get_all_bundle_info()
                writer.write(str(bundle) + '\n')
                del d_incomplete_bundles[bundle.name]
    check_bundles_not_ended(d_incomplete_bundles)


if __name__ == '__main__':
    me = sys.argv[0]
    input_file = ''
    output_file = ''
    try:
        input_file = sys.argv[1]
        output_file = sys.argv[2]
    except IndexError:
        usage()
        sys.exit(1)
    main(input_file, output_file)