Avro 教程 | Avro 的基本使用
Apache Avro 是一个数据序列化系统。
Avro 提供:
- 丰富的数据结构。
- 一种紧凑、快速的二进制数据格式。
- 一个容器文件,用于存储持久数据。
- 远程过程调用 (RPC)。
- 与动态语言的简单集成。代码生成不需要读取或写入数据文件,也不需要使用或实现 RPC 协议。代码生成作为一种可选的优化,只值得为静态类型语言实现。
Avro 依赖于Schema
。在读取或者写入avro数据时都需要携带Schema信息。这允许在没有每个值开销的情况下写入每个数据,从而使序列化既快速又小。这也便于使用动态脚本语言,因为数据及其模式是完全自描述的。
当 Avro 数据存储在文件中时,它的schema也随之存储,以便以后任何程序都可以处理文件。
Schema
Avro 的 schema 可以用 Json 形式表示, 形如:
1
2
3
4
5
6
7
8
9
10
11
{
"namespace": "com.ehaier.bigdata.avro.schema",
"type": "record",
"name": "User",
"doc": "demo for test",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
}
数据支持简单类型与复杂类型.
简单类型
- null
- boolean
- int
- long
- float
- double
- bytes
- string
Records
相当于 object 类型, 内部支持一下属性。
name
表示这个记录的名字(相当于类名)namespace
名称空间(相当于包名)doc
record 说明文档aliases
别名fields
Record 内部所包含的字段,每个字段可以包含以下部分内容:name
字段名称doc
字段说明type
字段类型default
默认值
1
2
3
4
5
6
7
8
9
{
"type" : "record",
"name" : "testRecord",
"namespace" : "test.schema"
"aliases" : ["test_record"],
"fields" : [
{"name": "value", "type": "long"}
]
}
Enums
枚举类型
name
枚举名称namespace
名称空间aliases
枚举别名doc
说明文档symbols
枚举值(数组表示)default
枚举默认值
1
2
3
4
5
{
"type": "enum",
"name": "suit",
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}
Arrays
数组类型
items
单个类型的数据类型
1
2
3
4
5
{
"type" : "array",
"items" : "string",
"default" : []
}
Maps
映射类型
values
value 的数据类型
1
2
3
4
{
"type" : "map",
"values" : "string"
}
Union
联合数据类型,它使用数组表示, 例如 ["string", "null"]
他表示可以是null,也可以是字符串。
Avro 的常见操作
引入 avro 的依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
schema 的构建
方法一:通过Schema描述文件构建
1
2
3
4
5
6
7
8
9
10
11
{
"namespace": "com.ehaier.bigdata.avro.schema",
"type": "record",
"name": "User",
"doc": "demo for test",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
}
通过schema描述文件的构建方式需要maven编译支持, 添加maven依赖.
在avro-maven-plugin
中引入需要导入的 schema 描述文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<imports>
<import>${basedir}/src/main/avro/user.avsc</import>
</imports>
</configuration>
</execution>
</executions>
</plugin>
mvn 会在编译的时候将目标的描述文件编译成class文件,namespace
为对应的包名。name
为对应的类名。
方法二: 直接加载schema描述性文件
1
2
3
4
private Schema getSchemaByFile() throws IOException {
InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("user.avsc");
return new Schema.Parser().parse(inputStream);
}
方法三:代码构建Schema
使用 SchemaBuilder.builder()
来构建 Avro Schema.
builder.recod()
方法开启一个 record. fields() 开启填充所需要的字段
开启填充所有字段。endRecord()
结束填充字段.
注意在填充字段时,模式是非空的, 如果是需要空的字段,需要明示 nullable()
1
2
3
4
5
6
7
8
9
10
11
private Schema getSchemaByCode(){
SchemaBuilder.TypeBuilder<Schema> builder = SchemaBuilder.builder(); // Schema 构造器
return builder.record("User")
.namespace("com.ehaier.bigdata.avro.schema")
.fields()
.name("name").type(builder.stringType()).noDefault()
.name("favoriteNumber").type(builder.nullable().intType()).noDefault()
.name("favoriteColor").type(builder.nullable().stringType()).noDefault()
.endRecord();
}
基于Schema填充数据
Avro 在程序中使用 GenericRecord
来维护数据实体
对于Schema的数据填充生成
1
2
3
4
GenericRecord record = new GenericData.Record(schema);
record.put("name", "aiden");
record.put("favoriteNumber", 1000);
record.put("favoriteColor", "red");
基于描述文件直接生成的Schema对象
对于直接从描述文件直接生成的目标对象, 则可以直接调用。Record
name 字段及为目标类名.
1
2
3
4
5
User user = User.newBuilder()
.setName("aiden")
.setFavoriteNumber(1000)
.setFavoriteColor("red")
.build();
这里需要说明的是, 生成的目标对象(User
) 为 GenericRecord
的子类。
其中 setter
分别对应Record的字段名
GenericRecord 数据序列化到磁盘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void serializeRecord() throws IOException {
Schema schema = getSchemaByCode();
GenericRecord record = new GenericData.Record(schema);
record.put("name", "aiden");
record.put("favoriteNumber", 1000);
record.put("favoriteColor", "red");
File file = new File("user1.avro");
DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(record);
dataFileWriter.flush();
dataFileWriter.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void serializeRecord2() throws IOException {
User user = User.newBuilder()
.setName("aiden")
.setFavoriteNumber(1000)
.setFavoriteColor("red")
.build();
File file = new File("user2.avro");
DatumWriter datumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter fileWriter = new DataFileWriter<>(datumWriter);
fileWriter.create(user.getSchema(), file);
fileWriter.append(user);
fileWriter.flush();
fileWriter.close();
}
GenericRecord 数据反序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void deserializeRecord1() throws IOException {
Schema schema = getSchemaByCode();
File file = new File("user2.avro");
DatumReader specificDatumReader = new SpecificDatumReader<>(schema);
DataFileReader dataFileReader = new DataFileReader(file, specificDatumReader);
while(dataFileReader.hasNext()){
GenericRecord data = (GenericRecord) dataFileReader.next(schema);
System.out.println(data);
}
dataFileReader.close();
}
1
2
3
4
5
6
7
8
9
10
11
public void deserializeRecord2() throws IOException {
File file = new File("user1.avro");
DatumReader datumReader = new SpecificDatumReader(User.class);
DataFileReader dataFileReader = new DataFileReader(file, datumReader);
while (dataFileReader.hasNext()){
User user = (User)dataFileReader.next();
System.out.println(user);
}
}
参考文档