# pylint: disable=E1101, dangerous-default-value
"""
Classes to handle alignments in the SAM format.
Reader -> Sam -> Writer
"""
import sys
try:
from collections import OrderedDict
except ImportError: #python 2.6 or 3.6+
if sys.version_info >= (3,6):
OrderedDict = dict
else:
from ordereddict import OrderedDict
import os
from itertools import groupby
from subprocess import Popen, PIPE
from io import TextIOWrapper
import re
from six import PY3, string_types
from pkg_resources import get_distribution
try:
from multiprocessing.dummy.connection import Connection
except ImportError: #python2
from _multiprocessing import Connection
__version__ = get_distribution("simplesam").version
[docs]class DefaultOrderedDict(OrderedDict):
[docs] def __init__(self, default, items=[]):
super(DefaultOrderedDict, self).__init__(items)
self._default = default
def __missing__(self, key):
self[key] = value = self._default()
return value
class GenomicOrder(object):
def __gt__(self, other):
if self.rname != other.rname:
return self.rname > other.rname
return self.pos > other.pos
def __lt__(self, other):
if self.rname != other.rname:
return self.rname < other.rname
return self.pos < other.pos
def __eq__(self, other):
return self.rname == other.rname and self.pos == other.pos
[docs]class Reader(object):
""" Read SAM/BAM format file as an iterable. """
[docs] def __init__(self, f, regions=False, kind=None, samtools_path="samtools"):
ext = None
self.samtools_path = samtools_path
self.spool = None # use this to catch alignment during reader scraping
self.type = 'sam'
try:
self._f_name = f.name
_, ext = os.path.splitext(f.name)
if f.name == '<stdin>': # stdin stream
self._sam_init(f)
elif (ext is not None and ext.lower()) == '.bam' or (kind is not None and kind.lower() == 'bam'):
self._bam_init(f, regions)
self.type = 'bam'
elif (ext is not None and ext.lower()) == '.sam' or (kind is not None and kind.lower() == 'sam'):
self._sam_init(f)
else:
self._sam_init(f)
if (regions and (ext is not None and ext.lower() != '.bam') and kind is None) or (regions and kind is not None and kind.lower() != 'bam'):
self.__exit__()
raise ValueError("Region support requires bam file.")
except AttributeError:
self._f_name = None
if isinstance(f, Connection):
self._pipe_init(f)
else:
self._sam_init(f)
def _pipe_init(self, f):
header = []
for line in iter(f.recv, ''):
if line[0] == '@':
header.append(line.rstrip('\n\r'))
else:
self.spool = line
break
self.header_as_dict(header)
self.f = iter(f.recv, '')
self._conn = 'pipe'
def _sam_init(self, f):
header = []
self.f = f
for line in self.f:
if line[0] == '@':
header.append(line.rstrip('\n\r'))
else:
self.spool = line
break
self.header_as_dict(header)
self._conn = 'file'
def _bam_init(self, f, regions):
pline = [self.samtools_path, 'view', '-H', f.name]
try:
p = Popen(pline, bufsize=-1, stdout=PIPE,
stderr=PIPE)
except OSError:
raise OSError('Samtools must be installed for BAM file support!\n')
self.header_as_dict([line.decode('utf-8').rstrip('\n\r') for line in p.stdout])
p.wait()
if regions:
try:
open(''.join([f.name, '.bai']))
except EnvironmentError:
sys.stderr.write("BAM index not found. Attempting to index file.\n")
index_p = Popen([self.samtools_path, 'index', f.name], stdout=PIPE, stderr=PIPE)
_, err = index_p.communicate()
if index_p.returncode > 0 or re.search("fail", str(err)):
raise OSError("Indexing failed. Is the BAM file sorted?\n")
else:
sys.stderr.write("Index created successfully.\n")
pline = [self.samtools_path, 'view', f.name, regions]
else:
pline = [self.samtools_path, 'view', f.name]
self.p = Popen(pline, bufsize=-1, stdout=PIPE,
stderr=PIPE)
if PY3:
self.f = TextIOWrapper(self.p.stdout)
else:
self.f = self.p.stdout
self._conn = 'proc'
[docs] def next(self):
""" Returns the next :class:`.Sam` object """
try:
if self.spool: # this will be the first alignment in a SAM file or stream
line = self.spool.rstrip('\n\r')
self.spool = None
else:
line = next(self.f).rstrip('\n\r')
if line == '':
raise StopIteration
fields = line.split('\t')
required = fields[:11]
tags = fields[11:]
return Sam(*required, tags=tags)
except StopIteration:
raise StopIteration
def __next__(self):
return self.next()
def __iter__(self):
return self
[docs] def __len__(self):
""" Returns the number of reads in an indexed BAM file.
Not implemented for SAM files. """
if self.type != 'bam':
raise NotImplementedError("len(Reader) is only implemented for BAM files.")
elif self.type == 'bam':
return sum(bam_read_count(self._f_name, self.samtools_path))
[docs] def subsample(self, n):
""" Returns an interator that draws every nth read from
the input file. Returns :class:`.Sam`. """
for i, line in enumerate(self.f):
if i % n == 0:
fields = line.split('\t')
required = fields[:11]
tags = fields[11:]
yield Sam(*required, tags=tags)
@property
def seqs(self):
""" Return just the sequence names from the @SQ library as a generator. """
for key in self.header['@SQ'].keys():
yield key.split(':')[1]
[docs] def tile_genome(self, width):
""" Return a generator of UCSC-style regions tiling ``width``. """
assert isinstance(width, int)
for k, v in self.header['@SQ'].items():
rname = k.split(':')[1]
seqlength = v[0].split(':')[1]
for region in tile_region(rname, 1, int(seqlength), width):
yield region
def close(self):
self.__exit__()
def __enter__(self):
return self
def __exit__(self, *args):
if self._conn == 'file':
self.f.close()
if self._conn == 'proc':
self.f.close()
self.p.terminate()
[docs]class Writer(object):
""" Write SAM/BAM format file from :class:`.Sam` objects. """
[docs] def __init__(self, f, header=None):
try:
_, ext = os.path.splitext(f.name)
if ext == '.bam':
# Why not just pipe to samtools?
raise NotImplementedError("Bam writing support is not implemented.\n")
except AttributeError: # pipe?
pass
self.file = f
if self.file.mode == 'a' and self.file.tell() == 0:
# We're appending to an empty file. Assume we need a header.
self._merge_header(header)
self._header_dict_write()
elif self.file.mode == 'a' and self.file.tell() > 0:
if header:
raise NotImplementedError("Updating headers on existing SAM files is not supported.\n")
else:
self._merge_header(header)
self._header_dict_write()
def _merge_header(self, header):
self.header = DefaultOrderedDict(OrderedDict)
if not header:
self.header['@HD']['VN:1.0'] = ['SO:unknown']
else:
for key, values in header.items():
for k, v in values.items():
self.header[key][k] = v
def _header_dict_write(self):
for key, value in self.header.items():
for k, v in value.items():
tags = '\t'.join(v)
self.file.write('{key}\t{k}\t{tags}\n'.format(**locals()))
[docs] def write(self, sam):
""" Write the string representation of the ``sam`` :class:`.Sam` object. """
self.file.write(str(sam))
def close(self):
self.__exit__()
def __enter__(self):
return self
def __exit__(self, *args):
self.file.close()
[docs]class Sam(GenomicOrder):
""" Object representation of a SAM entry. """
# https://github.com/samtools/hts-specs/blob/da805be01e2ceaaa69fdde9f33c5377bf9ee6369/SAMv1.tex#L383
# operations that consume the reference
_cigar_ref = set(('M', 'D', 'N', '=', 'X', 'EQ'))
# operations that consume the query
_cigar_query = set(('M', 'I', 'S', '=', 'X', 'EQ'))
# operations that do not represent an alignment
_cigar_no_align = set(('H', 'P'))
_valid_cigar = _cigar_ref | _cigar_query | _cigar_no_align
# operations that can be represented as aligned to the reference
_cigar_align = _cigar_ref & _cigar_query
# operations that only consume the reference
_cigar_ref_only = _cigar_ref - _cigar_align
# operations that only consume the query
_cigar_query_only = _cigar_query - _cigar_align
[docs] def __init__(self, qname='', flag=4, rname='*', pos=0, mapq=255, cigar='*', rnext='*', pnext=0, tlen=0, seq='*', qual='*', tags=[]):
self.qname = qname
self.flag = int(flag)
self.rname = rname
self.pos = int(pos)
self.mapq = int(mapq)
self.cigar = cigar
self.rnext = rnext
self.pnext = int(pnext)
self.tlen = int(tlen)
self.seq = seq
self.qual = qual
self._tags = tags
self._cache = dict()
[docs] def __str__(self):
""" Returns the string representation of a SAM entry. Correspondes to one line
in the on-disk format of a SAM file. """
if self.tags:
tag_fields = '\t'.join([encode_tag(tag, self.tags[tag]) for tag in sorted(self.tags.keys())])
else:
tag_fields = '\t'.join(self._tags)
return '{0}\t{1}\t{2}\t{3}\t{4}\t{5}\t{6}\t{7}\t{8}\t{9}\t{10}\t{11}\n'.format(self.qname,
str(self.flag),
self.rname,
str(self.pos),
str(self.mapq),
self.cigar,
self.rnext,
str(self.pnext),
str(self.tlen),
self.seq,
self.qual,
tag_fields)
[docs] def __repr__(self):
return "Sam({0}:{1}:{2})".format(self.rname, self.pos, self.qname)
[docs] def __len__(self):
""" Returns the length of the portion of ``self.seq`` aligned to the reference. Unaligned reads will
have len() == 0. Insertions (I) and soft-clipped portions (S) will not contribute to the aligned length.
>>> x = Sam(cigar='8M2I4M1D3M4S')
>>> len(x)
16
"""
return sum(c[0] for c in self.cigars if c[1] in self._cigar_ref)
[docs] def __getitem__(self, tag):
""" Retreives the SAM tag named "tag" as a tuple: (tag_name, data). The
data type of the tag is interpreted as the proper Python object type.
>>> x = Sam(tags=['NM:i:0', 'ZZ:Z:xyz'])
>>> x['NM']
0
>>> x['ZZ']
'xyz'
"""
return self.tags[tag]
[docs] def __setitem__(self, tag, data):
""" Stores the SAM tag named "tag" with the value "data". The
data type of the tag is interpreted from the Python object type.
>>> x = Sam(tags=[])
>>> x['NM'] = 0
>>> x['NM']
0
"""
self.tags[tag] = data
[docs] def index_of(self, pos):
""" Return the relative index within the alignment from a genomic position 'pos' """
i = pos - self.pos
if i >= 0:
return i
else:
raise IndexError("Position {0:n} not in {1}.".format(pos, self.qname))
def get(self, key, default_value):
try:
return self[key]
except KeyError:
return default_value
def cigar_split(self):
# https://github.com/brentp/bwa-meth
if self.cigar == "*":
yield (0, None)
return
cig_iter = groupby(self.cigar, lambda c: c.isdigit())
for _, n in cig_iter:
op = int("".join(n)), "".join(next(cig_iter)[1])
if op[1] in self._valid_cigar:
yield op
else:
raise ValueError("CIGAR operation %s in record %s is invalid." % (op[1], self.qname))
[docs] def gapped(self, attr, gap_char='-'):
""" Return a :class:`.Sam` sequence attribute or tag with all
deletions in the reference sequence represented as 'gap_char' and all
insertions in the reference sequence removed. A sequence could
be :class:``Sam.seq``, ``Sam.qual``, or any :class:`.Sam` tag that
represents an aligned sequence, such as a methylation tag for bisulfite
sequencing libraries.
>>> x = Sam(*'r001\t99\tref\t7\t30\t8M2I4M1D3M\t=\t37\t39\tTTAGATAAAGGATACTG\t*'.split())
>>> x.gapped('seq')
'TTAGATAAGATA-CTG'
>>> x = Sam(*'r001\t99\tref\t7\t30\t8M2I4M1D3M\t=\t37\t39\tTTAGATAAAGGATACTG\t*'.split(), tags=['ZM:Z:.........M....M.M'])
>>> x.gapped('ZM')
'............-M.M'
"""
try:
ungapped = getattr(self, attr)
except AttributeError:
ungapped = self[attr] # get dictionary key (tag) if attribute is missing
if len(ungapped) != len(self.seq):
raise ValueError("The length of the '%s' attribute is not equal to the length of Sam.seq!" % attr)
gapped = []
i = 0
for n, t in self.cigars:
if t in self._cigar_align:
gapped.extend(ungapped[i:i + n])
i += n
elif t in self._cigar_ref_only:
gapped.extend([gap_char] * n)
elif t in self._cigar_query_only:
i += n
elif t in self._cigar_no_align:
pass
return ''.join(gapped)
[docs] def parse_md(self):
""" Return the ungapped reference sequence from the MD tag, if present.
"""
try:
return self._cache['parse_md']
except KeyError:
pass
try:
md = self['MD']
except KeyError:
raise KeyError('MD tag not found in SAM record.')
ref_seq = list(self.gapped('seq'))
md_match = re.findall(r"([0-9]+)\^?([A-Z]+)?", md)
ref_seq_i = 0
for i, b in md_match:
ref_seq_i += int(i)
for mismatch in b:
try:
ref_seq[ref_seq_i] = mismatch
except IndexError:
raise IndexError(locals())
ref_seq_i += 1
self._cache['parse_md'] = ref_seq
return ref_seq
@property
def cigars(self):
""" Returns the CIGAR string as a tuple.
>>> x = Sam(cigar='8M2I4M1D3M')
>>> x.cigars
((8, 'M'), (2, 'I'), (4, 'M'), (1, 'D'), (3, 'M'))
"""
try:
return self._cache['cigars']
except KeyError:
self._cache['cigars'] = tuple(self.cigar_split())
return self._cache['cigars']
@property
def tags(self):
""" Parses the tags string to a dictionary if necessary.
>>> x = Sam(tags=['XU:Z:cgttttaa', 'XB:Z:cttacgttaagagttaac', 'MD:Z:75', 'NM:i:0', 'NH:i:1', 'RG:Z:1'])
>>> sorted(x.tags.items(), key=lambda x: x[0])
[('MD', '75'), ('NH', 1), ('NM', 0), ('RG', '1'), ('XB', 'cttacgttaagagttaac'), ('XU', 'cgttttaa')]
"""
try:
return self._cache['tags']
except KeyError:
self._cache['tags'] = parse_sam_tags(self._tags)
return self._cache['tags']
@property
def paired(self):
""" Returns True if the read is paired and
each segment properly aligned according to the aligner. """
return bool(self.flag & 0x2)
@property
def mapped(self):
""" Returns True of the read is mapped. """
return not (self.flag & 0x4)
@property
def secondary(self):
""" Returns True if the read alignment is secondary. """
return bool(self.flag & 0x100)
@property
def reverse(self):
""" Returns True if ``Sam.seq`` is being reverse complemented. """
return bool(self.flag & 0x10)
@property
def passing(self):
""" Returns True if the read is passing filters, such as platform/vendor quality controls. """
return not bool(self.flag & 0x200)
@property
def duplicate(self):
""" Returns True if the read is a PCR or optical duplicate. """
return bool(self.flag & 0x400)
@property
def coords(self):
""" Returns a range of genomic coordinates for the query sequence positions in the gapped alignment. """
return range(self.pos, self.pos + len(self))
@property
def safename(self):
"""Return ``Sam.qname`` without paired-end identifier if it exists"""
if self.qname[-2] == '/':
return self.qname[:-2]
else:
return self.qname
[docs]def encode_tag(tag, data):
""" Write a SAM tag in the format ``TAG:TYPE:data``. Infers the data type
from the Python object type.
>>> encode_tag('YM', '#""9O"1@!J')
'YM:Z:#""9O"1@!J'
"""
if isinstance(data, string_types):
data_type = 'Z'
elif isinstance(data, int):
data_type = 'i'
elif isinstance(data, float):
data_type = 'f'
else:
raise NotImplementedError("Data {0} cannot be encoded as string, integer, or float tag.".format(data))
value = ':'.join((tag, data_type, str(data)))
return value
[docs]def decode_tag(tag_string):
""" Parse a SAM format tag to a (tag, type, data) tuple. Python object
types for data are set using the type code. Supported type codes are: A, i, f, Z, H, B
>>> decode_tag('YM:Z:#""9O"1@!J')
('YM', 'Z', '#""9O"1@!J')
>>> decode_tag('XS:i:5')
('XS', 'i', 5)
>>> decode_tag('XF:f:100.5')
('XF', 'f', 100.5)
"""
try:
tag, data_type, data = tag_string.split(':')
except ValueError:
match = re.match(r'([A-Z]{2}):([iZfHB]):(\S+)', tag_string)
tag = match.group(1)
data_type = match.group(2)
data = match.group(3)
if data_type == 'i':
return (tag, data_type, int(data))
elif data_type == 'Z':
return (tag, data_type, data)
elif data_type == 'f':
return (tag, data_type, float(data))
elif data_type == 'A': # this is just a special case of a character
return (tag, data_type, data)
elif data_type == 'H':
raise NotImplementedError("Hex array SAM tags are currently not parsed.")
elif data_type == 'B':
raise NotImplementedError("Byte array SAM tags are currently not parsed.")
else:
raise NotImplementedError("Tag {0} cannot be parsed.".format(tag_string))
[docs]def tile_region(rname, start, end, step):
""" Make non-overlapping tiled windows from the specified region in
the UCSC-style string format.
>>> list(tile_region('chr1', 1, 250, 100))
['chr1:1-100', 'chr1:101-200', 'chr1:201-250']
>>> list(tile_region('chr1', 1, 200, 100))
['chr1:1-100', 'chr1:101-200']
"""
while start + step <= end:
yield '%s:%d-%d' % (rname, start, start + step - 1)
start += step
if start < end:
yield '%s:%d-%d' % (rname, start, end)
[docs]def bam_read_count(bamfile, samtools_path="samtools"):
""" Return a tuple of the number of mapped and unmapped reads in a BAM file """
p = Popen([samtools_path, 'idxstats', bamfile], stdout=PIPE)
mapped = 0
unmapped = 0
for line in p.stdout:
rname, rlen, nm, nu = line.rstrip().split()
mapped += int(nm)
unmapped += int(nu)
return (mapped, unmapped)
if __name__ == "__main__":
import doctest
doctest.testmod()