1 通道

Nextflow基于数据流编程模型,其中流程通过通道进行通信。

通道具有两个主要属性:

发送消息是一个异步操作,无需等待接收过程即可立即完成。

接收数据是一项阻塞操作,它将停止接收过程,直到消息到达为止。

2 通道类型

Nextflow区分两种不同的通道:队列通道和值通道。

2.1 队列通道(queue channel)

队列信道是一个无阻塞的单向FIFO队列,它连接两个进程或操作符。‘

通常使用工厂方法(如fromfromPath等)创建队列通道,或使用mapflatMap等通道操作符将其链接起来。

队列通道也是通过使用into子句的进程输出声明创建的。

在需要将进程输出通道连接到多个进程或操作员operator)的过程中,需要使用into运算符创建同一个通道的两个(或多个)副本,并使用每个副本连接单独的进程。

2.2 值通道(value channel)

根据定义,值通道(又称单例通道)绑定到一个值,并且可以无限制地读取该值,而无需消耗其内容。

因此,一个值通道可以被多个进程用作输入

使用值工厂方法或由操作员返回单个值来创建一个值通道,例如firstlastcollectcountminmaxreducesum

例如:

process foo {
  input:
  val x from 1
  output:
  file 'x.txt' into result

  """ echo $x > x.txt """
}

上面代码段中的进程声明一个输入,它隐含地是一个值通道。因此,result输出是一个可由多个进程读取的值通道。

3 通道工厂(channel factory)

通道可以通过进程输出声明隐式创建,也可以使用以下通道工厂方法显式创建

3.1 创建

创建一个新的通道通过使用create方法,如下所示:

channelObj = Channel.create()

3.2 of

of方法允许您创建一个通道,该通道指定方法参数序列,例如:

ch = Channel.of( 1, 3, 5, 7 )
ch.view { "value: $it" }

本示例中的第一行创建一个变量ch,该变量保存一个通道对象。该通道发出在of方法中指定为参数的值。因此,第二行显示以下内容:

value: 1
value: 3
value: 5
value: 7

3.3 from

该from方法允许您创建一个通道,该通道发出指定为方法参数的任何值序列,例如:

ch = Channel.from( 1, 3, 5, 7 )
ch.subscribe { println "value: $it" }

本示例中的第一行创建一个变量ch,该变量保存一个通道对象。该通道发出在from方法中指定为参数的值。因此,第二行将打印以下内容:

value: 1
value: 3
value: 5
value: 7

以下示例显示了如何根据一系列数字或字符串创建通道:

zeroToNine = Channel.from( 0..9 )
strings = Channel.from( 'A'..'Z' )

请注意,当from参数是实现(Java)Collection接口的对象时 ,结果通道将收集条目作为单独的排放发出。

因此,以下两种声明产生相同的结果,即使在第一种情况下将项指定为多个参数,而在第二种情况下将其指定为单个列表对象参数时,甚至很难。

Channel.from( 1, 3, 5, 7, 9 )
Channel.from( [1, 3, 5, 7, 9] )

但是,当提供了多个论点时,它们总是作为单一排放进行管理。因此,以下示例创建一个通道,该通道发出三个条目,每个条目都是一个包含两个元素的列表:

Channel.from( [1, 2], [5,6], [7,9] )

3.4 value

该值工厂方法用于创建一个值的通道。null可以指定一个可选的not 参数,以将通道绑定到特定值。例如:

expl1 = Channel.value()
expl2 = Channel.value( 'Hello there' )
expl3 = Channel.value( [1,2,3,4,5] )

示例中的第一行创建一个“空”变量。第二行创建一个通道并将一个字符串绑定到该通道。最后,最后一个创建一个通道,并将一个列表对象绑定到该通道,该列表对象将作为唯一的通道发出。

3.5 fromList

fromList方法允许您创建一个通道,该通道发出作为元素列表提供的值,例如:

Channel
    .fromList( ['a', 'b', 'c', 'd'] )
    .view { "value: $it" }

打印:

a 
b 
c 
d

3.6 fromPath

您可以通过使用fromPath方法并指定路径字符串作为参数来创建发出一个或多个文件路径的通道。例如:

myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )

上一行创建一个通道,并将 引用指定文件的Path项绑定到该通道。

注意:它不检查文件是否存在。

只要fromPath变量包含*?通配符,它​​将被解释为全局路径匹配器。例如:

myFileChannel = Channel.fromPath( '/data/big/*.txt' )

本示例创建一个通道,并发出Path与文件夹中带有txt扩展名的/data/big文件一样多的项目。

例如:

files = Channel.fromPath( 'data/**.fa' )
moreFiles = Channel.fromPath( 'data/**/*.fa' )
pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )

第一行返回一个信道发射以后缀结尾的文件.fadata的文件夹和在其所有子文件夹递归。而第二个只发射出具有在相同后缀的文件的任何在子文件夹data路径。最后,最后一个示例发出两个文件:data/file_1.fqdata/file_2.fq

为了包括隐藏文件,您需要以句点字符开头或指定选项。例如:hidden: true

expl1 = Channel.fromPath( '/path/.*' )
expl2 = Channel.fromPath( '/path/.*.fa' )
expl3 = Channel.fromPath( '/path/*', hidden: true )

第一个示例返回指定路径中的所有隐藏文件。第二个返回所有以.fa后缀结尾的隐藏文件。最后,最后一个示例返回该路径中的所有文件(已隐藏和未隐藏)。

默认情况下,全局模式仅查找符合指定条件的常规文件路径,即它不会返回目录路径

您可以使用type指定值的参数filedirany为了定义所需的路径类型。例如:

myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' )
myFileChannel = Channel.fromPath( '/path/a*', type: 'any' )

第一个示例将返回所有以后缀结尾的目录路径b,而第二个示例将返回以a前缀开头的任何文件和目录。

名称 描述
glob 值为true时,解释字符*?[]{}作为水珠通配符,否则处理它们的正常字符(默认值:true
type 键入的返回路径,无论是filedirany(默认值:file
hidden 值为true时,包括在所得到的路径隐藏文件(默认值:false
maxDepth 要访问的最大目录级别数(默认:无限制)
followLinks 值为true时,它遵循在目录树的遍历符号链接,否则会被管理的文件(默认:true
relative 值为true时,返回路径是相对于最顶层的公共目录(默认值:false
checkIfExists 值为true时,抛出指定路径的例外在文件系统中不存在(默认值:false

3.7 fromFilePairs

fromFilePairs方法创建一个通道,该通道发出与用户提供的全局模式匹配的文件对。匹配文件以元组的形式发出,其中第一个元素是匹配对的分组键,第二个元素是文件列表(按字典顺序排序)。例如:

Channel
    .fromFilePairs('/my/data/SRR*_{1,2}.fastq')
    .println()

它将产生类似于以下内容的产出:

[SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
[SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
[SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
[SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
[SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
[SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]

或者,可以实现自定义文件对分组策略,以提供一个闭包,以当前文件为参数,返回分组键。例如:

Channel
    .fromFilePairs('/some/data/*', size: -1) { file -> file.extension }
    .println { ext, files -> "Files with the extension $ext are $files" }

可用的可选参数表:

名称 描述
type 键入的返回路径,无论是filedirany(默认值:file
hidden 值为true时,包括在所得到的路径隐藏文件(默认值:false
maxDepth 要访问的最大目录级别数(默认:无限制)
followLinks 值为true时,它遵循在目录树的遍历符号链接,否则会被管理的文件(默认:true
size 定义每个发出的项目应保留的文件数(默认值:2)。设置-1为任意。
flat 值为true时,发出的元组中将匹配文件作为唯一元素生成时(默认值:false)。
checkIfExists 值为true时,抛出指定路径的例外在文件系统中不存在(默认值:false

3.8 fromSRA

fromSRA方法查询NCBI SRA数据库并返回一个通道,该通道发出与指定标准(即项目或登录号)匹配的FASTQ文件。例如:

Channel
    .fromSRA('SRP043510')
    .println()

它返回:

[SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
[SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
[SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
[SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
[SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
[SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
(remaining omitted)

可以使用列表对象指定多个登录ID:

ids = ['ERR908507', 'ERR908506', 'ERR908505']
Channel
    .fromSRA(ids)
    .println()
[ERR908507, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_2.fastq.gz]]
[ERR908506, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_2.fastq.gz]]
[ERR908505, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_2.fastq.gz]]

可用的可选参数表:

名称 描述
apiKey NCBI用户API密钥。
cache 启用/禁用缓存API请求(默认值:true)。
max 可以重试的最大条目数(默认值:无限制)。

3.9 watchPath

watchPath方法监视文件夹中是否存在与指定模式匹配的一个或多个文件。一旦存在满足指定条件的文件,该文件就会通过该watchPath 方法返回的通道发出。可以通过使用*或?通配符(即通过指定全局路径匹配条件)来指定要监视的文件上的条件。

例如:

Channel
   .watchPath( '/path/*.fa' )
   .subscribe { println "Fasta file: $it" }

默认情况下,它仅监视在指定文件夹中创建的新文件。可选地,可以提供第二个参数来指定要观看的事件。支持的事件是:

名称 描述
create 创建一个新文件(默认)
delete 文件被删除
modify 文件被修改

您可以使用逗号分隔的字符串来指定多个这些事件之一,如下所示:

Channel
   .watchPath( '/path/*.fa', 'create,modify' )
   .subscribe { println "File created or modified: $it" }

参考资料:https://www.nextflow.io/docs/latest/channel.html