Pipeline Code
This documents Hich’s technical implementation and is aimed at developers rather than users.
Common Patterns
One -> One
Use case: processes that take individual samples as input and return new or updated attributes.
Filter for the required samples, extract the sample attributes needed (usually including id), call the process, call the process, label the outputs, and pack the outputs as new attributes of the same by using id to link the new outputs to the correct sample.
Here is an example from parse.nf.
samples
| filter{!skip("parse") && it.datatype in ["fastq", "sambam"]}
| map{tuple(it.id, it.sambam, it.chromsizes, it.assembly, it.pairtoolsParse2Params, it.reshapeParams, it.subMap("minMapq"))}
| PairtoolsParse2
| map{[id:it[0], pairs:it[1], latest:it[1], latestPairs:it[1]]}
| set{result}
pack(samples, result) | set{samples}
Many -> One
groupRowsToColumnFormat groups the samples by common values of biorep, condition and aggregationPlanName to yield a channel with one column-format item per group. Then coalesce replaces any constant vectors with a constant value (i.e. [val: [1, 1, 1]] becomes [val: 1]).
Example:
// Filter samples
levelSamples
| branch {
yesMerge: it.includeInMerge && it.mergeTechrepsToBioreps
noMerge: true
}
| set{samples}
// Merge the pairs files.
groupRowsToColumnFormat(samples.yesMerge, ["biorep", "condition", "aggregationPlanName"], ["dropNull": true])
| map{coalesce(it)}
| map{tuple(makeID(it, columns = true), it.latestPairs)}
| mergeTechrepsToBioreps
| map{[id:it[0], pairs:it[1], latest:it[1], latestPairs:it[1]]}
| set{mergedTechrepAttributes}
// Group the merged result by the mergeGroupIdentifiers, then coalesce common values
// to a single value, dropping any null or heterogeneous values. The other
// common values are kept as inherited merge attributes. Then add an ID
// for the merge.
groupRowsToColumnFormat(samples.yesMerge, ["biorep", "condition", "aggregationPlanName"], ["dropNull": true])
| map{coalesce(it, '_drop')}
| map{it += [id:makeID(it)]}
| set{inheritedMergeAttributes}
util/rowsCols.nf
columns format
A channel in columns format has one item, which is a single hashmap with keys corresponding to column names and values being lists of row values. Example:
columnsChannel = channel.of(["odd": [1, 3, 5], "even": [2, 4, 6]])
In rows format, we instead have one channel item per row, with keys being column names and values the row entry for that row.
rowsChannel = channel.of([["odd": 1, "even": 2], ["odd": 3 "even": 4], ["odd": 5, "even": 6]])
columns
Converts a channel from rows format to columns format.
def columns (mapList, options = [:]) {
// Get set of all keys from all maps
def allKeys = mapList.collectMany{
it.keySet()
}.toSet()
// Extract parameters to list
def transposed = [:]
// Iterate through all maps in the mapList
mapList.each {
map ->
// Iterate through all keys from all maps
allKeys.each {
key ->
/* For each key, get the current map's value or a default if supplied.
Verify the value is non-null/missing or is OK to be null.
Add it to the previous value list and update the transposed map.
*/
def value = map.get(key, options.defaults?[(key)])
if (value != null || !options.dropNull) {
def previous = transposed.get(key, [])
assert value != null || options.nullOK?.contains(key), "In call to 'columns' without dropNull, '${key}' is not in nullOK, and is missing/null for:\n${map}"
def valueList = previous + [value]
def updatedItem = [(key): valueList]
transposed += updatedItem
}
}
}
// Return the transposed map
transposed
}
rows
Converts a channel from columns format to rows format.
def rows (columnsMap) {
/*
Arguments
columnsMap: map[str, ArrayList | Any] -- A channel containing key names
associated with lists of per-sample values to be converted to a row format
*/
// Get the keys and the transposed values
def keys = columnsMap.keySet().toList()
def values = columnsMap.values().toList()
// Ensure all values are lists
def formattedValues = values.collect({it instanceof ArrayList ? it : [it]})
// This converts the column format of the values to row format
// https://docs.groovy-lang.org/latest/html/groovy-jdk/java/util/List.html#transpose()
def transposed = formattedValues.transpose()
// Create N hashmaps, each containing the values at index i for the corresponding keys
def result = transposed.collect {
row ->
// Keys and row are equal-length lists which are transposed into a list of individual
// key value pairs like [[key1: value1], [key2: value2]]. collectEntries converts
// this to a single map like [key1: value1, key2: value2]
[keys, row].transpose().collectEntries()
}
result
}
rowHashmapToColumnChannel
Convert a Groovy hashmap to a column channel so that its keys are in the [keyCol] channel and values in the [valueCol] channel so the index of each key and its corresponding value are at the same index.
def rowHashmapToColumnChannel (hashMap, keyCol, valueCol) {
// Treat key-value pairs of hashMap as row entries in a two-column table and emit in column format
(
channel.of(params.aggregationPlans)
| map {
[
(keyCol): hashMap.keySet().toList(),
(valueCol): hashMap.values().toList()
]
}
)
}
rowHashmapToRowChannel
Convert a Groovy hashmap to a row channel structured as [keyCol]: [key], [valueCol]: [value], where [key] is a particular key from the original hashmap and [value] is the corresponding value.
def rowHashmapToRowChannel (hashMap, keyCol, valueCol) {
// Treat key-value pairs of hashMap as row entries in a two-column table and emit in row format
rowHashmapToColumnChannel(hashMap, keyCol, valueCol) | map{rows(it)} | flatten
}
util/group.nf
groupHashMap
Essentially a groupby operation on a channel of hashmaps.
Example input:
ch = [[param1: 1, param2: 1], [param1: 1, param2: 2], [param1: 2, param2: 3], [param1: 2, param2: 4]]
by = ["param1"]
sortBy = ["param1"]
Output:
[[[param1: 1, param2: 1], [param1: 1, param2: 2]], [[param1: 2, param2: 3], [param1: 2, param2: 4]]]
Arguments:
ch – the sample hashmap channel.
by – the list of attributes to group by.
sortBy – sort output by these keys for reproducible workflows.
def groupRowsToColumnFormat(ch, by, sortBy = ["id"], columnsOptions = [:]) {
/* Essentially a groupby operation on a channel of hashmaps
Example:
[[param1: 1, param2: 1], [param1: 1, param2: 2], [param1: 2, param2: 3], [param1: 2, param2: 4]]
This is grouped to:
[param1: [1, 1], param2: [1, 2]]
[param1: [2, 2], param2: [3, 4]]
Arguments:
ch -- the sample hashmap channel
by -- the list of attributes to group by
sortBy -- typically just use id. This function is often used to convert samples to columnar format where a single
hashmap containing lists of sample attributes under each attribute name is provided to the process. Changes in order
typically should not affect process behavior, but will trigger a Nextflow rerun unnecessarily. We therefore use sortBy
to sort the samples to prevent these unnecessary reruns.
*/
ch
// Group maps to the format [[bySubmap], [list of samples]] and extract [list of samples]
// Note that because the order in which samples are returned is unpredictable, this will result in a different
// order of samples on different runs, which is why samples are sorted later.
| map{tuple(it.subMap(by), it)}
| groupTuple
| map{it[1]}
// Sort the hashmaps if requested
| map{
mapList ->
// Sort the hashmaps to ensure same output for same set of samples on repeated runs
if (sortBy) {
mapList.sort {
map1, map2 ->
// sortBy is a list of keys to sort by, in descending order of priority
// We collect all the comparisons for each sortBy key, then take the first nonzero as the sort order
sortBy.collect {
key ->
// -1, 0, 1 depending on comparison outcome
map1[key] <=> map2[key]
}.findResult {it != 0 ? it : null} ?: 0
}
} else {
mapList
}
}
| map{columns(it, columnsOptions)}
}
coalesce
Combine lists of values that are identical into a single non-list value
Example input:
[param1: [1, 1, 1], params: [2, 3, 4]]
Output:
[param1: 1, params2: [2, 3, 4]]
Arguments:
transposedSample – The sample in columnar format
defaultWhenDifferent: – Determines how values that are different are treated
_shrink -> take unique values in original order of first apperance
_drop -> drop the key:value pair
_nullify -> keep the key, set value to null
_error -> throw an exception
anything else -> replace the value
empty values are retained unchanged
whenDifferent: – Key-specific behavior when different. The values can be any of the appropriate values for defaultWhenDifferent and are applied to specific keys in transposedSample. For a particular key, whenDifferent’s value will be used if the key is present in whenDifferent, and defaultWhenDifferent otherwise.
// Called from within map function on transposedSamples
def coalesce (transposedSample, defaultWhenDifferent = "_unchanged", whenDifferent = [:]) {
def result = [:]
// Iterate through values in the transposed sample
transposedSample.each {
key, value ->
// LinkedHashSet preserves the original order of the ArrayList
def valueSet = value instanceof ArrayList ? value as LinkedHashSet : [value] as LinkedHashSet
if (valueSet.size() == 1) {
// If there's only one value, just use that.
result += [(key):valueSet.first()]
} else if (valueSet.size() > 1) {
// Determine strategy for this key when value is non-homogeneous
def todo = whenDifferent.get(key, defaultWhenDifferent)
// Error if specified
assert todo != 'error', "In coalesce, ${key} is set to error when values are non unique. Values are ${value} for sample:\n${sample}"
// Otherwise, use the 'todo' strategy to decide how to alter the value
switch (todo) {
case "_unchanged":
result += [(key): value] // Leave unaltered
break
case "_shrink":
result += [(key): valueSet] // Keep first instance of each distinct item in value in original order of first appearance
break
case "_drop": // Drop it from the result
break
default:
result += [(key): todo] // Replace with value of 'todo'
}
} else if (valueSet.size() == 0) {
result += [(key): value] // If there is nothing in value, keep the empty list as the value
}
}
return result
}
sqljoin
workflow sqljoin {
take:
left
right
condition
main:
/*
MOTIVATION: HashMap-based traceability
A traceable workflow tracks every file and attribute produced for each
sample in a single per-sample HashMap under meaningful keys. For example,
the initial fastq files are under sample.fastq1 and sample.fastq2. When
alignment producs a bam file, the file handle is stored at sample.sambam.
CONTEXT:
Nextflow poses a few challenges for traceable workflows.
If the entire sample HashMap is passed in as a process input,
Nextflow will yield a different task cache for the process any time
irrelevant Hashmap attributes change, breaking the -resume feature.
In order to be staged, Nextflow requires paths to be extracted from
the HashMap and passed in the format path(pathVariable) in the input
and output sections.
Hich solves this by assigning each sample or resource file a unique ID.
This, along with the necessary process inputs, are extracted from the
HashMap, then the outputs (inluding the ID) are built into a new HashMap,
and the ID can be used as a join key to associate the process outputs
with the original per-sample HashMap.
The problem is that Nextflow's standard 'join' operator does not really
implement a true SQL inner join (contrary to the documentation's claims)
and does not work directly on HashMap keys.
PURPOSE:
Implement a true SQL-like inner, full, left, and right join.
IMPLEMENTATION:
ARGUMENTS:
left
right
condition
by
suffix
how
*/
/* Similar to a SQL inner join
Tables are represented as channels emitting LinkedHashMap 'rows'
Arguments:
left -- left channel
right -- right channel
by -- Key or list of keys to join on
suffix -- suffix to add to right if necessary to avoid non-key clashes
Notes:
1. If there is a conflict (i.e. the left table has a non-key 'data'
and the right has a non-key 'data' and 'data_right' with the suffix
'_right', then the right keys with higher index have more suffix copies
appended to them.
2. sqljoin takes key type into account, so even if key1 == key2,
they will not be joined (and will not be treated as conflicting and
thus distinguished with a suffix) unless their class is the same. One
way this can be an issue is if their types are GString and String.
*/
/*
Motivation and implementation logic:
When Nextflow channel items are LinkedHashMaps ("hashmaps"), they are
similar to un-normalized tables. Items are rows, keys represent the
column name for that row, and values represent cell entries.
We can therefore do an SQL-like join on them. A left join, for example,
keeps all 'left' rows, adding 'right' rows where there is a matching]
key and null or default values where there is no matching 'right' key.
If two or more 'right' rows match one 'left' row, then multiple
rows are produced containing one copy of the 'left' items and one
copy of one of the matching 'right' rows' items.
Nextflow's 'join' is similar but importantly different from an SQL
inner join. Other join types are not available. None operate on named
keys from hashmaps. We implement a closer approximation to the SQL
full, left, right and inner joins on Nextflow channels containing
a single LinkedHashMap item here.
'keyset': the values in the by keys of a particular channel item (i.e. table row)
The implementation idea is as follows:
1. Depending on join type, determine missing keysets that would
inappropriately lead to rows being dropped and add in empty placeholders.
For example, for a left join all left rows should be kept. So any keysets
in the left table not present in the right table must be added as
placeholders to the right table.
2. Group the left channel by keyset, giving a [keyset: [list of items]]
channel. This is necessary because the Nextflow join operator returns
only the first item from the left channel for each specific keyset.
3. Use Nextflow cross operator to get pairwise combinations of left
channel items and right channel items with matching keysets. For example,
if there are two left items and two right items with a particular
keyset we obtain:
[[keyset, [left_1, left_2]], [keyset, right_1]]
[[keyset, [left_1, left_2]], [keyset, right_2]]
4. Reformat to produce hashmap-formatted elements:
[left_1 + right_1]
[left_1 + right_1]
[left_1 + right_1]
[left_1 + right_1]
Note: the matching keyset will be represented once in the
resulting hashmaps
Where there are non-keyset matching columsn in the left and right
tables, we add a suffix to avoid a clash or (if a blank suffix
is provided) we overwrite.
*/
/*
SETUP: Extract and format by, suffix, and how
*/
// Get the BY-values to join on and convert to an ArrayList if it's not already
by = condition.get("by")
by = by instanceof List ? by : [by]
// Get the suffix for non-BY values that are found in the left and right channels.
suffix = condition.get("suffix", "_right")
// Determine what type of join to do, defaulting to a left join.
how = condition.get("how", "left")
/*
1. Depending on join type, determine missing keysets that would
inappropriately lead to rows being dropped and add in empty placeholders.
For example, for a left join all left rows should be kept. So any keysets
in the left table not present in the right table must be added as
placeholders to the right table.
*/
if (how == "full" || how == "left") {
// Add any keys in left that are missing in right to right.
missing = keydiff(left, right, by, "right")
right = right | concat(missing)
}
if (how == "full" || how == "right") {
// Add any keys in left that are missing in right to right.
missing = keydiff(left, right, by, "left")
left = left | concat(missing)
}
/*
2.
PURPOSE:
Extract BY values in preparation for cross, grouping the LEFT hashmaps
that have the same BY value since this is a requirement of
Nextflow's cross operator.
IMPLEMENTATION:
This leaves the channels formatted as follows:
LEFT:
channel([BY1, [HASHMAP1, HASHMAP2, ...]], [BY2, [HASHMAP3, HASHMAP4, ...]])
RIGHT:
channel([BY1, HASHMAP1], [BY2, HASHMAP2])
*/
left = left | map{[it.subMap(by), it]} | groupTuple
right = right | map{[it.subMap(by), it]}
dominant = condition.dominant ?: "left"
assert dominant in ["left", "right"], "In sqljoin, condition.dominant must be 'left' (default) or 'right' but was ${dominant}"
/*
CONTEXT:
At this point, LEFT items with a matching BY value are grouped as a
single LEFT channel item (see above). We need to associate all samples
with matching BY values and reshape them into single, joined HashMap
items.
IMPLEMENTATION:
Nextflow's cross operator will associate each LEFT item with any RIGHT
items having the same BY value. For example, if there are two LEFT and
two RIGHT samples having the BY value BY1, cross yields:
channel(
[[BY1, [LEFT_MAP1, LEFT_MAP2]], [BY1, RIGHT_MAP1]],
[[BY1, [LEFT_MAP1, LEFT_MAP2]], [BY1, RIGHT_MAP2]]
)
For each item, we loosely want to emit items in the format:
[LEFT_MAP1 + RIGHT_MAP1]
[LEFT_MAP2 + RIGHT_MAP1]
[LEFT_MAP1 + RIGHT_MAP2]
[LEFT_MAP2 + RIGHT_MAP2]
The caveat is that we have to deal with a situation where a LEFT_MAP
and its RIGHT_MAP share non-BY keys. To handle this, we declare one of
the channels "dominant" (left by default) and a suffix to add one or
more times to the non-dominant channel. If the suffix is blank, then
the non-dominant channels' non-BY items are replaced by the dominant
channel's non-BY items at the same key. Otherwise, the suffix is added
to the non-dominant non-BY keys until they don't match any keys in the
dominant-channel HashMap.
*/
left
| cross(right)
| map{
joinedMapList = []
leftMapList = it[0][1]
rightMap = it[1][1]
// Iterate through each leftrow and combine with rightMap
leftMapList.each() {
leftMap ->
joinedMap = dominant == "left" ? leftMap : rightMap
// Iterate through each key in rightMap,
// appending suffix if needed to avoid clashes with leftrow
// and adding it to the joinedMap.
(dominant == "left" ? rightMap : leftMap).each() {
key, value ->
// Add non-keyset keys from right to left.
if (!by.contains(key)) {
// Add as many suffixes as necessary to avoid clash
while (suffix != "" && joinedMap.containsKey(key)) {
key = key + suffix
}
/* Add the right key value if either criterion holds:
- There is no name clash, or
- The left table is not declared dominant
*/
if (!joinedMap.containsKey(key)) {
joinedMap += [(key):value]
}
}
}
// Add the new hashmap to the list of hashmaps
joinedMapList += [joinedMap]
}
// List of per-keyset joins
joinedMapList
}
| collect // Currently, channel items are lists of per-keyset joins.
| flatMap // Collects to a single list of lists, then flattens
| set{joined} // Yielding the desired result of single-hashmap join items.
emit:
joined
}
pack
def pack(addTo, addMe, by = "id") {
/*
A big feature of Hich is the ability to store sample attributes in hashmaps.
This requires extracting just the features specifically relevant to a particular
process and passing them in. Otherwise, irrelevant changes to the hashmap would trigger
reruns of the process. However, this then requires rejoining the outputs from the process
to the original hashmap. We therefore pass in a unique id for each sample. The sqljoin
function does most of the heavy lifting but is more general than required, so "pack"
provides a simpler gloss over this needed functionality.
Update the "addTo" channel of hashmaps with the "addMe" channel of hashmaps
dominant is "right" because this makes addMe overwrite corresponding keys
in addTo rather than the reverse. Suffix is blank so we overwrite rather than
adding new keys when addTo and addMe share keys.
*/
sqljoin(addTo, addMe, [by:by, suffix: "", dominant:"right"])
}
keydiff
workflow keydiff {
take:
left
right
by
how
main:
/*
CONTEXT:
Facilitate sql-like joins on Nextflow channels containing LinkedHashMaps
PURPOSE:
During a left or right join, identify missing keys that should be replaced
by default values.
IMPLEMENTATION:
Take two channels LEFT and RIGHT joined on a subset of keys BY
Channel items should be HashMaps
Return:
LEFT KEYS not in RIGHT
RIGHT KEYS not in LEFT
*/
/*
PURPOSE:
Extract unique BY values from LEFT and RIGHT channels
IMPLEMENTATION:
Add a 'true' value as this will help keep track of whether a BY value
was left-only, right-only, or in both.
*/
left_keys = left
| map{it.subMap(by)}
| unique
| map{[it, true]}
right_keys = right
| map{it.subMap(by)}
| unique
| map{[it, true]}
/*
PURPOSE:
Split which BY-values are missing in the left (i.e. right-only), missing
in the right (i.e. left-only), or missing in neither (i.e. found in both)
IMPLEMENTATION:
unique operator above means left_keys and right_keys are non-redundant
remainder: true emits [left, null] or [null, right] when a BY value is
left-only or right-only.
join will result in a 3-element tuple [BY-value, inLeft, inRight]
where inLeft and inRight are either true or null (null is the default
Nextflow substitutes for joins when 'remainder' is true).
*/
missing =
left_keys
| join(right_keys, remainder: true)
| branch {
neither: it[1] && it[2] //key present in both
right: it[1] && !it[2] //key in left only
left: !it[1] && it[2] //key in right only
}
/*
CONTEXT:
For left joins, return any right-only values
For right joins, return any left-only values
PURPOSE:
Return the missing BY-values that are needed depending on the join type.
IMPLEMENTATION:
*/
if (how == "right") {
missing.right | map{it[0]} | set{result}
} else if (how == "left") {
missing.left | map{it[0]} | set{result}
}
emit:
result
}
util/samples.nf
def label(map, lbl) {
// Return whether map.lbl contains a non-empty string, used below to determine
// if the techrep, biorep and condition keys are present and specified
return map?[(lbl)]?.toString()?.length() > 0
}
/*
Determine if the techrep, biorep, condition fields are uniquely specified
For isSingleCell, the user has to specify a cellBarcodeField (which tag in a sam/bam
file holds the cell barcode in order to extract it to the pairs cellID field)
or has to specify isSingleCell (which can be used for pairs files where the cellID field
is already extracted).
*/
def isTechrep(map) {return label(map, "techrep") && label(map, "biorep") && label(map, "condition")}
def isBiorep(map) {return (!label(map, "techrep")) && label(map, "biorep") && label(map, "condition")}
def isCondition(map) {return (!label(map, "techrep")) && (!label(map, "biorep")) && label(map, "condition")}
def isSingleCell(map) {return map.cellBarcodeField || map.isSingleCell}
/*
Hich depends on each sample having a unique sample.id attribute for joining process results to the
appropriate sample hashmap. A legible name is also convenient for troubleshooting. If the user
wants to let Hich build unique ids automatically, they should specify unique conditions, bioreps and techreps
and not use the _ character in order to ensure that all ids will be unique. The aggregateProfileName is also
included because new copies of the input samples are produced for each aggregateProfile.
*/
def constructIdentifier(map) {
return map.subMap("condition", "biorep", "techrep", "aggregateProfileName").values().join("_")
}
util/analysisPlans.nf
def createCompositeStrategy(strategyKeys, strategyMap, combineHow = [:]) {
/* A composite strategy is a hashmap in which keys are sample attributes and values are lists of permitted sample attribute values. It is created by combining one or more individual strategies specified in params.sampleSelectionStrategies.
strategyKeys -- keys in strategyMap for the sub-strategies to combine (i.e. analysisPlan)
strategyMap -- [strategyKey: selectionStrategy] map-of-maps, typically params.sampleSelectionStrategies
combineHow -- not currently used, but permits defining how to combine strategies when
there are conflicts by passing a [key: closure] map where
closure(oldVals, newVals) outputs the updated value for the key. By
default, the later-specified strategy has precedence.
Returns empty map if no keys or strategies are supplied.
*/
def compositeStrategy = [:]
// Extract the values associated with individual selected strategies to form the composite strategy
def subStrategies = strategyKeys ? strategyMap.subMap(strategyKeys).values() : []
subStrategies.each {
subStrategy ->
subStrategy.each {
key, val ->
// Converts val from a single element or ArrayList into a HashSet of elements
def newVals = asHashSet(val)
// Handle situations where the same key is defined in more than one composite strategy
// combineHow may contain a per-key method to define how to do the replacement.
// The default behavior is to replace values from earlier-specified keys with
// newly-specified keys. For example, if the keys are ["strategy1", "strategy2"]
// and both strategies have a value "v1", then v1 will take the value for strategy2 by default.
def oldVals = compositeStrategy.get(key, new HashSet())
def updated = combineHow.get(key, {a, b -> b})(oldVals, newVals)
// Add the value to the composite strategy
compositeStrategy += [(key): updated]
}
}
return compositeStrategy
}
def filterSamplesByStrategy(samples, strategy) {
/* After a composite strategy is built, filter for samples for which all sample attributes are present and are in the list of permitted values specified by the composite strategy.
samples - a channel of sample hashmaps
strategy - a hashmap as [attributeName: [permittedValues]]
*/
// Return all samples if no strategy is specified
if (!strategy) return samples
// Remove reserved keywords from the set of sample-specific strategies
def reservedKeywords = ["same", "different"]
def sampleAttributeFilter = strategy.findAll {key, value -> !(key in reservedKeywords)}
def filtered = samples | filter {
sample ->
def passesFilter = sampleAttributeFilter.every {key, select ->
// Check that the sample attribute value is in the whitelisted values specified in the composite strategy
sample.get(key) in select
}
passesFilter
}
// For each sample, collect into a single list and ensure that at least one sample was selected
filtered
| collect
| {
assert it.size() > 0, "Error: In filterSamplesByStrategy with sample selection strategy ${strategy}, no samples matched this filter."
}
return filtered
}
def groupSamplesByStrategy(samples, strategy) {
/* Get all samples having matching values of strategy.same
*/
return samples
| map{tuple(it.subMap(strategy.get("same", [])), it)}
| groupTuple
| map{it[1]}
}
def pairSamplesByStrategy(samples, strategy) {
/*
The diffloops workflow has to define a way to pair up samples, especially
by enforcing that certain attributes are the same or different.
samples -- channel of sample hashmaps
strategy -- a composite sample selection strategy [attribute: permittedValues]
two possible attributes are
same: a list of attributes which must be the same to pair two samples
different: a list of attributes which must differ to pair two samples
other attributes are used for filtering individual samples
*/
// Ensure there's no conflict between "same" and "different"
def same = strategy.get("same", [])
def different = strategy.get("different", [])
def sameAndDifferent = same.intersect(different)
if (!sameAndDifferent.isEmpty()) {
System.err.println("Warning: In filterSamplesByStrategy, comparisons on ${sameAndDifferent} are required to be same and different, so no result is obtained")
return channel.empty()
}
// Filter individual samples before forming pairs of samples
def filtered = filterSamplesByStrategy(samples, strategy)
// Obtain pairs of samples matching the "same" and "different" criteria
def combined = filtered
| combine(filtered)
| filter{it[0].id <= it[1].id}
| unique
| filter {
s1, s2 ->
def sameOK = same.every {key -> s1.get(key) == s2.get(key)}
def differentOK = different.every {key -> s1.get(key) != s2.get(key)}
sameOK && differentOK
}
combined
| collect
| map{
assert it.size() > 0, "In pairSamplesByStrategy with sample selection strategy ${strategy}, no samples were paired, likely due to 'same' or 'different' filter."
}
return combined
}
util/workflowControl.nf
def emptyOnLastStep(step, samples) {
def isExplicitLastStep = (params.containsKey("lastStep") && params.get("lastStep") == step)
def isLastStep = (step == "End") || isExplicitLastStep
def hasViewLastStep = params.containsKey("viewLastStep") && params.get("viewLastStep")
if (isLastStep && hasViewLastStep) {
samples
| map {
sample ->
params.viewLastStep instanceof Boolean ? sample : sample.subMap(params.viewLastStep.split())}
| view
}
return isExplicitLastStep ? channel.empty() : samples
}
def skip(step) {
/*
Users may want to skip some steps, such as QC or forming a particular kind of contact matrix,
or run only certain steps. This uses both params to define a list of steps to be skipped
(the intersection of skip and runOnly's complement).
*/
def excluded = params.containsKey("runOnly") && !params.runOnly.split().contains(step)
def skipped = params.containsKey("skip") && params.skip.split().contains(step)
return excluded || skipped
}
util/cli.nf
def parsePattern(String str, String parsePattern) {
/*
Used to extract sample attributes from filenames, such as condition, biorep, and techrep,
via a syntax similar to that offered by Python's parse library. In parse, users can
extract substrings into a map with patterns like: "{condition}_{biorep}_{techrep}.fastq",
which would take a string like "cond1_br1_tr1.fastq" and return ["condition": "cond1", "biorep": "br1", "techrep": "tr1"].
This is easier to specify at the command line than a regex but AFAIK has no Groovy equivalent.
This function implements this parsing functionality, returning the extracted map.
*/
def patternPlaceholders = []
// This regex searches the parsePattern string (i.e. "{condition}_{biorep}_{techrep}.fastq")
// for the placeholders between braces (i.e. condition, biorep, techrep) and adds them
// to the list of patternPlaceholders to become keys in the output map.
// It also yields in "pattern" the list of matchers to look for in the input string "str"
def pattern = parsePattern.replaceAll(/\{([^{}]*)\}/) { match ->
if (match[1].trim()) {
patternPlaceholders << match[1] // Track the placeholder name
"(?<${match[1]}>.+?)"
} else {
"" // Ignore empty placeholders
}
}
// This extracts the patterns from str
def matcher = str =~ pattern
// Combine the patternPlaceholders with the corresponding matches from "str" into an output map "result"
def result = [:]
if (matcher) {
patternPlaceholders.eachWithIndex { placeholder, index ->
result[placeholder] = matcher.group(index + 1) // Retrieve group by index
}
}
return result ?: null
}
def formatArg(pattern, object, sep) {
/*
Some processes receive either a list of values or a single non-list element
as parameter values, but need to call a CLI command passing a delimiter-separated
list of the received values. Other times they get nothing and should
not pass an argument for that parameter at all. This facilitates this interconversion
and returns an empty string if the object passed was falsey.
NOTE this is a potential issue if the goal is to pass a boolean false
to the CLI command, but I don't think Hich currently does this...
pattern -- the string pattern to format the results into, like "--numbers {commaSeparatedNums}"
object -- the element or list of elements to join (where necessary) into a delimiter-separated list
sep -- the delimiter, like ","
*/
// Put non-lists into a list so when join is called, it has something to (silently) operate on
def listed = (object instanceof List || object == null) ? object : [object]
def joined = listed ? listed.join(sep) : listed
// Format the string with the result or return
return joined ? String.format(pattern, joined) : ""
}
util/hashmaps.nf
def asHashSet(val) {
// Convert a non-HashSet val into a HashSet
if (val instanceof ArrayList) return new HashSet(val)
if (val instanceof HashSet) return val
return new HashSet([val])
}
util/logs.nf
def prepForJson(obj) {
def updated = obj
if (obj instanceof Map) {
updated = [:]
obj.each {
k, v ->
kStr = k.toString()
updated[(kStr)] = prepForJson(v)
}
} else if (obj.getClass() in [List, ArrayList]) {
updated = []
obj.eachWithIndex {it, idx -> updated[idx] = prepForJson(it) }
} else {
updated = obj.toString()
}
return updated
}
def withLog(cmdArg, mapArg) {
def map = mapArg + [command: cmdArg.toString()]
// Convert to a map to avoid cyclic entries
def preppedMap = prepForJson(map)
def json = JsonOutput.toJson(preppedMap)
return "${cmdArg} && cat <<'EOF' > hich_output.json\n${json}\nEOF"
}
def stubLog(stubArg, cmdArg, mapArg) {
def map = mapArg + [command: cmdArg.toString(), stub: stubArg.toString()]
// Convert to a map to avoid cyclic entries
def preppedMap = prepForJson(map)
def json = JsonOutput.toJson(preppedMap)
return "${stubArg} && cat <<'EOF' > hich_output.json\n${json}\nEOF"
}
util/files.nf
def isExistingFile(it) {
// Type-agnostic way to check if file exists for any file class having an exists() method.
return it && it.metaClass.respondsTo(it, 'exists') && it.exists()
}
def datatypeFromExtension(path) {
/*
Look for various known extensions to extract the datatype implicitly from the
input file so that Hich can ingest intermediate file formats appropriately
without explicit specification by the user. This is especially helpful in
permitting the user to use globs at the command line to feed files into Hich.
*/
def extensions = [".fastq": "fastq",
".fq": "fastq",
".sam": "sambam",
".bam": "sambam",
".pairs": "pairs",
".mcool": "mcool",
".hic": "hic"]
def pathString = path.toString()
def foundExtension = extensions.keySet().find {
ext ->
pathString.endsWith(ext) || pathString.contains("${ext}.")
}
return foundExtension ? extensions[foundExtension] : null
}