介绍
Transporter是一个开源工具,用于跨不同数据存储区移动数据。 开发人员经常编写一次性脚本来执行任务,例如跨数据库移动数据,将数据从文件移动到数据库,反之亦然,但使用像Transporter这样的工具有几个优点。
在Transporter中,您可以构建管道 ,它定义从源 (读取数据的位置)到接收器 (数据写入位置)的数据流。 源和汇可以是SQL或NoSQL数据库,平面文件或其他资源。 Transporter使用可插拔扩展的适配器与这些资源进行通信,并且该项目默认包含用于常用数据库的多个适配器 。
除了移动数据之外,Transporter还允许您在使用变压器穿过管道时更改数据。 像适配器一样,默认情况下包含几个变压器 。 您也可以编写自己的变压器来自定义数据的修改。
在本教程中,我们将介绍使用Transporter的内置适配器和用JavaScript编写的自定义转换器将数据从MongoDB数据库移动和处理到Elasticsearch的示例。
先决条件
要学习本教程,您需要:
- 遵循此Ubuntu 16.04初始服务器设置教程设置的一台Ubuntu 16.04服务器,其中包括sudo非root用户和防火墙。
- 在Ubuntu 16.04教程或现有的MongoDB安装中遵循此 MongoDB安装了MongoDB。
- 通过在Ubuntu 16.04教程中使用此Elasticsearch或现有Elasticsearch安装来安装Elasticsearch。
运输管道是用JavaScript编写的。 本教程不需要任何JavaScript知识或经验,但您可以在这些JavaScript教程中了解更多信息。
第1步 - 安装运输车
Transporter为大多数常见操作系统提供二进制文件。 Ubuntu的安装过程包括两个步骤:下载Linux二进制文件并使其可执行。
首先,从GitHub的Transporter最新版本页面获取最新版本的链接。 复制以-linux-amd64
结尾的链接。 本教程使用v0.5.2,这是写作时最新的。
将二进制文件下载到您的主目录。
cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64
将其移至/usr/local/bin
或您的首选安装目录。
mv transporter-*-linux-amd64 /usr/local/bin/transporter
然后使其可执行,以便可以运行它。
chmod +x /usr/local/bin/transporter
您可以通过运行二进制文件来测试Transporter是否正确设置。
transporter
您将看到使用帮助输出和版本号:
OutputUSAGE
transporter <command> [flags]
COMMANDS
run run pipeline loaded from a file
. . .
VERSION
0.5.2
为了使用Transporter将数据从MongoDB移动到Elasticsearch,我们需要两件事:我们要移动的MongoDB中的数据以及告诉Transporter如何移动它的管道。 下一步创建一些示例数据,但如果您已经有了一个想要移动的MongoDB数据库,则可以跳过下一步并直接进入第3步。
第2步 - 将示例数据添加到MongoDB(可选)
在这一步中,我们将在MongoDB中创建一个包含单个集合的示例数据库,并为该集合添加一些文档。 然后,在本教程的其余部分中,我们将使用Transporter管道迁移和转换此示例数据。
首先,连接到你的MongoDB数据库。
mongo
这会将您的提示改为mongo>
,表示您正在使用MongoDB shell。
从这里,选择一个数据库来处理。 我们将调用我们的my_application
。
use my_application
在MongoDB
,您不需要显式创建数据库或集合。 一旦开始将数据添加到按名称选择的数据库中,该数据库将自动创建。
因此,要创建my_application
数据库,请将两个文档保存到其users
集合中:其中一个代表Sammy Shark,一个代表Gilly Glowfish。 这将是我们的测试数据。
db.users.save({"firstName": "Sammy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});
添加完文档后,您可以查询users
集合以查看您的记录。
db.users.find().pretty();
输出看起来类似于下面的输出,但_id
列将会不同。 MongoDB自动添加对象ID来唯一标识集合中的文档。
output{
"_id" : ObjectId("59299ac7f80b31254a916456"),
"firstName" : "Sammy",
"lastName" : "Shark"
}
{
"_id" : ObjectId("59299ac7f80b31254a916457"),
"firstName" : "Gilly",
"lastName" : "Glowfish"
}
按CTRL+C
退出MongoDB shell。
接下来,我们创建一个Transporter管道将这些数据从MongoDB移动到Elasticsearch。
第3步 - 创建基本管道
Transporter中的pipeline.js
默认由一个名为pipeline.js
的JavaScript文件定义。 给定源和接收器,内置init
命令会在正确的目录中创建基本配置文件 。
使用MongoDB作为源和Elasticsearch作为接收器初始化一个入门级pipeline.js
。
transporter init mongodb elasticsearch
您将看到以下输出:
OutputWriting pipeline.js...
你不需要为这一步修改pipeline.js
,但让我们看看它是如何工作的。
该文件看起来像这样,但您也可以使用命令cat pipeline.js
, less pipeline.js
(按q
退出)或使用您最喜欢的文本编辑器打开文件来查看文件的内容。
var source = mongodb({
"uri": "${MONGODB_URI}"
// "timeout": "30s",
// "tail": false,
// "ssl": false,
// "cacerts": ["/path/to/cert.pem"],
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}",
// "read_preference": "Primary"
})
var sink = elasticsearch({
"uri": "${ELASTICSEARCH_URI}"
// "timeout": "10s", // defaults to 30s
// "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
// "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
// "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})
t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
以var source
和var sink
开头的行分别为MongoDB和Elasticsearch适配器定义JavaScript变量 。 我们将定义这些适配器稍后在此步骤中需要的MONGODB_URI
和ELASTICSEARCH_URI
环境变量。
以//
开头的行是注释。 它们突出显示了一些可以为流水线设置的常见配置选项,但我们并未将它们用于我们在此创建的基本流水线。
最后一行连接源和接收器。 可变transporter
或t
让我们访问我们的管道。 我们使用.Source()
和.Save()
使用之前在文件中定义的source
变量和sink
变量来添加源和汇。
Source()
和Save()
函数的第三个参数是namespace.
将/.*/
作为最后一个参数传递意味着我们要从MongoDB传输所有数据并将其保存在Elasticsearch中的相同名称空间下。
在我们运行这个管道之前,我们需要为MongoDB URI和Elasticsearch URI设置环境变量 。 在我们使用的示例中,两者都使用默认设置本地托管,但如果您使用现有的MongoDB或Elasticsearch实例,请确保您自定义这些选项。
export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'
现在我们准备好运行管道了。
transporter run pipeline.js
你会看到像这样结束的输出:
Output. . .
INFO[0001] metrics source records: 2 path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878
在第二行和倒数第三行中,此输出表明源中有2条记录,而2条记录已移至接收器。
要确认这两个记录都已处理完毕,可以查询Elasticsearch以查找my_application
数据库的内容,该数据库现在应该存在。
curl $ELASTICSEARCH_URI/_search?pretty=true
?pretty=true
参数使输出更易于阅读:
Output{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sammy",
"lastName" : "Shark"
}
}
]
}
}
MongoDB中的数据库和集合类似于Elasticsearch中的索引和类型。 考虑到这一点,你应该看到:
-
_index
字段设置为my_application,
即原始MongoDB数据库的名称)。 - 为
users,
设置的_type
字段是MongoDB集合的名称。 -
firstName
和lastName
字段分别填写“Sammy”,“Shark”和“Gilly”,“Glowfish”。
这证实来自MongoDB的记录都通过Transporter成功处理并加载到Elasticsearch。 为了建立在这个基本流水线上,我们将添加一个可以转换输入数据的中间处理步骤。
第4步 - 创建一个变压器
顾名思义, 变换器在将源数据加载到接收器之前修改源数据。 例如,它们允许您添加新字段,删除字段或更改字段的数据。 运输车带有一些预定义的变压器以及对定制变压器的支持。
通常,自定义转换器被编写为JavaScript函数并保存在单独的文件中。 要使用它们,可以在pipeline.js
添加对变压器文件的引用。 Transporter包括Otto和Goja JavaScript引擎。 因为Goja更新更快,我们将在这里使用它。 唯一的功能区别是语法。
创建一个名为transform.js
的文件,我们将使用它来编写我们的转换函数。
nano transform.js
以下是我们将使用的函数,它将创建一个名为fullName
的新字段,其值将由一个空格(如Sammy Shark
)分隔的firstName
和lastName
字段连接在一起。
function transform(msg) {
msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
return msg
}
我们来看看这个文件的内容:
-
function transform(msg),
的第一行是 。 -
msg
是一个包含源文档详细信息的JavaScript对象 。 我们使用这个对象来访问通过管道的数据 。 - 该函数的第一行连接两个现有字段并将该值分配给新的
fullName
字段。 - 该函数的最后一行返回要使用的其余管道的新修改的
msg
对象。
保存并关闭文件。
接下来,我们需要修改管道来使用这个变压器。 打开pipeline.js
文件进行编辑。
nano pipeline.js
在最后一行中,我们需要添加对Transform()
函数的调用,以将Transform()
添加到调用Source()
和Save()
之间的管道中,如下所示:
. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")
传递给Transform()
的参数是Transform()
类型,在这种情况下是Goja。 使用goja
函数,我们使用相对路径指定变换器的文件名。
保存并关闭文件。 在重新运行流水线来测试变压器之前,让我们先从前面的测试中清除Elasticsearch中的现有数据。
curl -XDELETE $ELASTICSEARCH_URI
你会看到这个输出确认命令的成功。
Output{"acknowledged":true}
现在重新运行管道。
transporter run pipeline.js
输出看起来与之前的测试非常相似,并且您可以在最后几行看到管道是否像以前一样成功完成。 可以肯定的是,我们可以再次检查Elasticsearch以查看数据是否以我们预期的格式存在。
curl $ELASTICSEARCH_URI/_search?pretty=true
您可以在新输出中看到fullName
字段:
Output{
"took" : 9,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e9c6687d9f638ced4fe",
"_score" : 1.0,
"_source" : {
"firstName" : "Gilly",
"fullName" : "Gilly Glowfish",
"lastName" : "Glowfish"
}
},
{
"_index" : "my_application",
"_type" : "users",
"_id" : "5ac63e986687d9f638ced4fd",
"_score" : 1.0,
"_source" : {
"firstName" : "Sammy",
"fullName" : "Sammy Shark",
"lastName" : "Shark"
}
}
]
}
}
请注意fullName
字段已经在两个文档中添加了正确设置的值。 有了这个,现在我们知道如何将自定义转换添加到传输器管道。
结论
您已经构建了一个带有转换器的基本Transporter管道,用于将数据从MongoDB复制并修改为Elasticsearch。 您可以以相同的方式应用更复杂的转换,在同一管道中链接多个转换,等等。 MongoDB和Elasticsearch只是Transporter支持的两个适配器。 它还支持平面文件,SQL数据库(如Postgres)以及许多其他数据源。
您可以查看GitHub上的Transporter项目,以便随时了解API中的最新变化,并访问Transporter维基以获取有关如何使用适配器,变压器和Transformer其他功能的更多详细信息。