基于 Apache Avro™ 1.10.2 版本的 Java 实现
环境准备
- IDEA 2020.1
- JDK1.8
- Maven 3.3.9
工程目录
src/main/avro 文件夹用来存放 schema 定义文件(.avsc)和序列化结果文件(.avro);com.donny.avro 目录下是采用"为 .avsc 生成 Java 类"的方式实现的代码;com.donny.schema 目录下是直接使用 schema、不需要为 .avsc 生成 Java 类的方式实现的代码。
pom.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Avro demo: one runtime dependency (avro) plus code generation and compiler settings. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>avrodemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Avro core library; version is kept identical to the avro-maven-plugin version below. -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Runs the plugin's "schema" goal in the generate-sources phase, reading from src/main/avro. -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<!-- NOTE(review): output goes straight into src/main/java, next to hand-written sources;
     a directory under target/ is the more common choice for generated code. Confirm this is intended. -->
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- Compiles with Java 1.8 source/target level. -->
<!-- NOTE(review): no <version> is declared for this plugin, so the version Maven resolves by default is used. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
avro-maven-plugin 插件可以根据 user.avsc 生成 User.java 类文件。
com.donny.avro 目录下的文件
User.java 文件
建议不拷贝,直接通过插件生成。
package com.donny.avro;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;
/**
 * Avro specific record for the {@code User} schema in namespace {@code com.donny.avro}.
 *
 * <p>The schema embedded in {@link #SCHEMA$} declares three fields, in this order:
 * {@code name} (string), {@code favorite_number} (union of int and null) and
 * {@code favorite_color} (union of string and null).
 *
 * <p>The class carries {@code @AvroGenerated}; presumably it is produced by the
 * avro-maven-plugin and should be regenerated rather than edited by hand (TODO confirm).
 */
@org.apache.avro.specific.AvroGenerated
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 820790187725012134L;
// Schema parsed once from its JSON text; field positions 0, 1, 2 used below follow the order in this JSON.
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.donny.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
/** Returns the schema shared by all instances of this class. */
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
// Data model shared by the encoder, decoder, writer and reader created below.
private static SpecificData MODEL$ = new SpecificData();
// Shared single-object encoder/decoder, both bound to MODEL$ and SCHEMA$.
private static final BinaryMessageEncoder<User> ENCODER =
new BinaryMessageEncoder<User>(MODEL$, SCHEMA$);
private static final BinaryMessageDecoder<User> DECODER =
new BinaryMessageDecoder<User>(MODEL$, SCHEMA$);
/** Returns the shared {@link BinaryMessageEncoder} for this class. */
public static BinaryMessageEncoder<User> getEncoder() {
return ENCODER;
}
/** Returns the shared {@link BinaryMessageDecoder} for this class. */
public static BinaryMessageDecoder<User> getDecoder() {
return DECODER;
}
/**
 * Creates a new decoder for this class that is additionally given the supplied {@link SchemaStore}.
 *
 * @param resolver schema store passed through to the decoder
 * @return a new decoder instance (not the shared one)
 */
public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder<User>(MODEL$, SCHEMA$, resolver);
}
/**
 * Encodes this record with the shared encoder.
 *
 * @return the encoded bytes
 * @throws java.io.IOException if encoding fails
 */
public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
}
/**
 * Decodes a record from the given buffer with the shared decoder.
 *
 * @param b buffer to decode
 * @return the decoded record
 * @throws java.io.IOException if decoding fails
 */
public static User fromByteBuffer(
java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
}
// Record fields; names mirror the schema field names (hence the snake_case).
private java.lang.CharSequence name;
private java.lang.Integer favorite_number;
private java.lang.CharSequence favorite_color;
/** Creates a record with all fields left at {@code null}. */
public User() {}
/**
 * Creates a record with every field assigned directly from the arguments (no validation is done here).
 *
 * @param name value for {@code name}
 * @param favorite_number value for {@code favorite_number}
 * @param favorite_color value for {@code favorite_color}
 */
public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
this.name = name;
this.favorite_number = favorite_number;
this.favorite_color = favorite_color;
}
public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
/**
 * Positional read access: 0 = name, 1 = favorite_number, 2 = favorite_color.
 *
 * @throws IndexOutOfBoundsException for any other index
 */
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return name;
case 1: return favorite_number;
case 2: return favorite_color;
default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
}
}
/**
 * Positional write access using the same indices as {@link #get(int)}; the value is cast to the field's Java type.
 *
 * @throws IndexOutOfBoundsException for an index other than 0, 1 or 2
 */
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: name = (java.lang.CharSequence)value$; break;
case 1: favorite_number = (java.lang.Integer)value$; break;
case 2: favorite_color = (java.lang.CharSequence)value$; break;
default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
}
}
// Plain accessors: each getter/setter reads or assigns the corresponding field without validation.
public java.lang.CharSequence getName() {
return name;
}
public void setName(java.lang.CharSequence value) {
this.name = value;
}
public java.lang.Integer getFavoriteNumber() {
return favorite_number;
}
public void setFavoriteNumber(java.lang.Integer value) {
this.favorite_number = value;
}
public java.lang.CharSequence getFavoriteColor() {
return favorite_color;
}
public void setFavoriteColor(java.lang.CharSequence value) {
this.favorite_color = value;
}
/** Creates an empty builder. */
public static com.donny.avro.User.Builder newBuilder() {
return new com.donny.avro.User.Builder();
}
/** Creates a builder copied from another builder; a {@code null} argument yields an empty builder. */
public static com.donny.avro.User.Builder newBuilder(com.donny.avro.User.Builder other) {
if (other == null) {
return new com.donny.avro.User.Builder();
} else {
return new com.donny.avro.User.Builder(other);
}
}
/** Creates a builder copied from an existing record; a {@code null} argument yields an empty builder. */
public static com.donny.avro.User.Builder newBuilder(com.donny.avro.User other) {
if (other == null) {
return new com.donny.avro.User.Builder();
} else {
return new com.donny.avro.User.Builder(other);
}
}
/**
 * Builder for {@link User}. Setters call {@code validate} before storing a value and mark the
 * field as set; {@link #build()} falls back to {@code defaultValue(field)} for fields never set.
 */
@org.apache.avro.specific.AvroGenerated
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
implements org.apache.avro.data.RecordBuilder<User> {
private java.lang.CharSequence name;
private java.lang.Integer favorite_number;
private java.lang.CharSequence favorite_color;
private Builder() {
super(SCHEMA$);
}
// Copy from another builder: a field is deep-copied only when isValidValue accepts it,
// and its "set" flag is taken over from the source builder.
private Builder(com.donny.avro.User.Builder other) {
super(other);
if (isValidValue(fields()[0], other.name)) {
this.name = data().deepCopy(fields()[0].schema(), other.name);
fieldSetFlags()[0] = other.fieldSetFlags()[0];
}
if (isValidValue(fields()[1], other.favorite_number)) {
this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
fieldSetFlags()[1] = other.fieldSetFlags()[1];
}
if (isValidValue(fields()[2], other.favorite_color)) {
this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
fieldSetFlags()[2] = other.fieldSetFlags()[2];
}
}
// Copy from an existing record: a field is deep-copied only when isValidValue accepts it,
// and is then unconditionally marked as set.
private Builder(com.donny.avro.User other) {
super(SCHEMA$);
if (isValidValue(fields()[0], other.name)) {
this.name = data().deepCopy(fields()[0].schema(), other.name);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.favorite_number)) {
this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.favorite_color)) {
this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
fieldSetFlags()[2] = true;
}
}
// Per-field builder API (get / set / has / clear). The same four-method pattern repeats for each field:
// set* validates, stores and flags the field; has* reports the flag; clear* nulls the value and resets the flag.
public java.lang.CharSequence getName() {
return name;
}
public com.donny.avro.User.Builder setName(java.lang.CharSequence value) {
validate(fields()[0], value);
this.name = value;
fieldSetFlags()[0] = true;
return this;
}
public boolean hasName() {
return fieldSetFlags()[0];
}
public com.donny.avro.User.Builder clearName() {
name = null;
fieldSetFlags()[0] = false;
return this;
}
public java.lang.Integer getFavoriteNumber() {
return favorite_number;
}
public com.donny.avro.User.Builder setFavoriteNumber(java.lang.Integer value) {
validate(fields()[1], value);
this.favorite_number = value;
fieldSetFlags()[1] = true;
return this;
}
public boolean hasFavoriteNumber() {
return fieldSetFlags()[1];
}
public com.donny.avro.User.Builder clearFavoriteNumber() {
favorite_number = null;
fieldSetFlags()[1] = false;
return this;
}
public java.lang.CharSequence getFavoriteColor() {
return favorite_color;
}
public com.donny.avro.User.Builder setFavoriteColor(java.lang.CharSequence value) {
validate(fields()[2], value);
this.favorite_color = value;
fieldSetFlags()[2] = true;
return this;
}
public boolean hasFavoriteColor() {
return fieldSetFlags()[2];
}
public com.donny.avro.User.Builder clearFavoriteColor() {
favorite_color = null;
fieldSetFlags()[2] = false;
return this;
}
/**
 * Builds a new {@link User}: each field takes the builder's value when its flag is set,
 * otherwise the result of {@code defaultValue(field)}.
 *
 * @throws org.apache.avro.AvroMissingFieldException rethrown unchanged
 * @throws org.apache.avro.AvroRuntimeException wrapping any other exception raised while building
 */
@Override
@SuppressWarnings("unchecked")
public User build() {
try {
User record = new User();
record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
return record;
} catch (org.apache.avro.AvroMissingFieldException e) {
throw e;
} catch (java.lang.Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
// Writer/reader created once from MODEL$ for SCHEMA$; used by writeExternal/readExternal below.
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumWriter<User>
WRITER$ = (org.apache.avro.io.DatumWriter<User>)MODEL$.createDatumWriter(SCHEMA$);
/** Writes this record to {@code out} through the encoder obtained from {@code SpecificData.getEncoder(out)}. */
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
WRITER$.write(this, SpecificData.getEncoder(out));
}
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumReader<User>
READER$ = (org.apache.avro.io.DatumReader<User>)MODEL$.createDatumReader(SCHEMA$);
/** Reads into this instance from {@code in} through the decoder obtained from {@code SpecificData.getDecoder(in)}. */
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
READER$.read(this, SpecificData.getDecoder(in));
}
// Presumably signals to the Avro runtime that customEncode/customDecode below should be used (TODO confirm).
@Override protected boolean hasCustomCoders() { return true; }
/**
 * Writes the three fields in schema order. For the two union fields the branch index is written first:
 * index 1 followed by a null when the value is {@code null}, index 0 followed by the value otherwise,
 * matching the branch order (value type first, then null) declared in {@link #SCHEMA$}.
 */
@Override public void customEncode(org.apache.avro.io.Encoder out)
throws java.io.IOException
{
out.writeString(this.name);
if (this.favorite_number == null) {
out.writeIndex(1);
out.writeNull();
} else {
out.writeIndex(0);
out.writeInt(this.favorite_number);
}
if (this.favorite_color == null) {
out.writeIndex(1);
out.writeNull();
} else {
out.writeIndex(0);
out.writeString(this.favorite_color);
}
}
/**
 * Reads the three fields from a resolving decoder. When {@code readFieldOrderIfDiff()} returns
 * {@code null} the fields are read in this class's own order; otherwise they are read in the order
 * given by the returned array, dispatching on each field's {@code pos()}.
 *
 * @throws java.io.IOException if decoding fails or a field position other than 0, 1 or 2 is reported
 */
@Override public void customDecode(org.apache.avro.io.ResolvingDecoder in)
throws java.io.IOException
{
org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
if (fieldOrder == null) {
// An existing Utf8 value is handed to readString, presumably so the decoder can reuse it.
this.name = in.readString(this.name instanceof Utf8 ? (Utf8)this.name : null);
// Union branch 0 carries the value; any other index is treated as the null branch.
if (in.readIndex() != 0) {
in.readNull();
this.favorite_number = null;
} else {
this.favorite_number = in.readInt();
}
if (in.readIndex() != 0) {
in.readNull();
this.favorite_color = null;
} else {
this.favorite_color = in.readString(this.favorite_color instanceof Utf8 ? (Utf8)this.favorite_color : null);
}
} else {
// Same per-field logic as above, but driven by the order in fieldOrder.
for (int i = 0; i < 3; i++) {
switch (fieldOrder[i].pos()) {
case 0:
this.name = in.readString(this.name instanceof Utf8 ? (Utf8)this.name : null);
break;
case 1:
if (in.readIndex() != 0) {
in.readNull();
this.favorite_number = null;
} else {
this.favorite_number = in.readInt();
}
break;
case 2:
if (in.readIndex() != 0) {
in.readNull();
this.favorite_color = null;
} else {
this.favorite_color = in.readString(this.favorite_color instanceof Utf8 ? (Utf8)this.favorite_color : null);
}
break;
default:
throw new java.io.IOException("Corrupt ResolvingDecoder.");
}
}
}
}
}
Serialize.java 文件
package com.donny.avro;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import java.io.File;
import java.io.IOException;
/**
 * Writes a few sample {@link User} records to an Avro container file using the
 * generated specific-record class.
 */
public class Serialize {
    /**
     * Output file, relative to the working directory. Forward slashes are used because
     * they are accepted on Windows as well as on Unix-like systems, whereas a backslash
     * is an ordinary file-name character outside Windows.
     */
    private static final String USERS_FILE = "src/main/avro/users.avro";

    /**
     * Creates three users (via setters, the all-args constructor and the builder) and
     * writes them to {@code src/main/avro/users.avro}, replacing any previous content.
     *
     * @throws IOException if the file cannot be created or a record cannot be written
     */
    public static void serializeUser() throws IOException {
        // Setter style: favorite_color is intentionally left unset (null).
        User user1 = new User();
        user1.setName("顧棟");
        user1.setFavoriteNumber(6);

        // Constructor style: every field supplied.
        User user2 = new User("Ben", 7, "red");

        // Builder style: favorite_number explicitly set to null.
        User user3 = User.newBuilder()
                .setName("Charlie")
                .setFavoriteColor("blue")
                .setFavoriteNumber(null)
                .build();

        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
        // try-with-resources closes (and flushes) the writer even if create/append throws.
        try (DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
            dataFileWriter.create(user1.getSchema(), new File(USERS_FILE));
            dataFileWriter.append(user1);
            dataFileWriter.append(user2);
            dataFileWriter.append(user3);
        }
    }
}
Deserialize.java 文件
package com.donny.avro;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import java.io.File;
import java.io.IOException;
/**
 * Reads {@link User} records back from the Avro container file and prints them.
 */
public class Deserialize {
    /**
     * Input file, relative to the working directory. Forward slashes are used because
     * they are accepted on Windows as well as on Unix-like systems.
     */
    private static final String USERS_FILE = "src/main/avro/users.avro";

    /**
     * Prints every record stored in {@code src/main/avro/users.avro} to standard output.
     *
     * <p>The method name is capitalised like the class; it is kept as-is because
     * existing callers invoke {@code Deserialize.Deserialize()}.
     *
     * @throws IOException if the file cannot be opened or read
     */
    public static void Deserialize() throws IOException {
        DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class);
        // try-with-resources releases the underlying file handle even if reading fails.
        try (DataFileReader<User> dataFileReader =
                     new DataFileReader<>(new File(USERS_FILE), userDatumReader)) {
            // The same User instance is passed back to next() so it can be reused across iterations.
            User user = null;
            while (dataFileReader.hasNext()) {
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }
    }
}
Client.java 文件
package com.donny.avro;
import java.io.IOException;
/**
 * Demo entry point for the generated-class approach: writes the sample users,
 * then reads them back and prints them.
 */
public class Client {

    /**
     * Runs serialization followed by deserialization.
     *
     * @param args ignored
     * @throws IOException if writing or reading the Avro file fails
     */
    public static void main(String[] args) throws IOException {
        // Write first so the data file exists before it is read.
        Serialize.serializeUser();

        Deserialize.Deserialize();
    }
}
结果
{"name": "顧棟", "favorite_number": 6, "favorite_color": null}
{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
{"name": "Charlie", "favorite_number": null, "favorite_color": "blue"}
com.donny.schema 目录下的文件
SerializeUser.java 文件
package com.donny.schema;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import java.io.File;
import java.io.IOException;
/**
 * Writes sample user records to an Avro container file using the generic API,
 * i.e. with the schema parsed at runtime instead of a generated Java class.
 */
public class SerializeUser {
    /**
     * Schema definition and output file, relative to the working directory. Forward
     * slashes are used because they are accepted on Windows as well as on Unix-like systems.
     */
    private static final String SCHEMA_FILE = "src/main/avro/user.avsc";
    private static final String USERS_FILE = "src/main/avro/users.avro";

    /**
     * Parses {@code user.avsc}, builds two {@link GenericRecord}s and writes them to
     * {@code users.avro}, replacing any previous content.
     *
     * @throws IOException if the schema cannot be read or the data file cannot be written
     */
    public static void serializeUser() throws IOException {
        Schema schema = new Schema.Parser().parse(new File(SCHEMA_FILE));

        // favorite_color is intentionally left unset for the first user.
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Alyssa");
        user1.put("favorite_number", 256);

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Ben");
        user2.put("favorite_number", 7);
        user2.put("favorite_color", "red");

        File file = new File(USERS_FILE);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        // try-with-resources closes (and flushes) the writer even if create/append throws.
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
            dataFileWriter.create(schema, file);
            dataFileWriter.append(user1);
            dataFileWriter.append(user2);
        }
    }
}
DeserializeUser.java 文件
package com.donny.schema;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import java.io.File;
import java.io.IOException;
/**
 * Reads user records back from the Avro container file with the generic API and prints them.
 */
public class DeserializeUser {
    /**
     * Schema definition and input file, relative to the working directory. Forward
     * slashes are used because they are accepted on Windows as well as on Unix-like systems.
     */
    private static final String SCHEMA_FILE = "src/main/avro/user.avsc";
    private static final String USERS_FILE = "src/main/avro/users.avro";

    /**
     * Prints every record stored in {@code users.avro} to standard output, using the
     * schema parsed from {@code user.avsc} for the reader.
     *
     * <p>The capitalised method name is kept because existing callers invoke
     * {@code DeserializeUser.Deserialize()}.
     *
     * @throws IOException if the schema or the data file cannot be read
     */
    public static void Deserialize() throws IOException {
        Schema schema = new Schema.Parser().parse(new File(SCHEMA_FILE));
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        // try-with-resources releases the underlying file handle even if reading fails.
        try (DataFileReader<GenericRecord> dataFileReader =
                     new DataFileReader<GenericRecord>(new File(USERS_FILE), datumReader)) {
            // The same record instance is passed back to next() so it can be reused across iterations.
            GenericRecord user = null;
            while (dataFileReader.hasNext()) {
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }
    }
}
Client.java 文件
package com.donny.schema;
import java.io.IOException;
/**
 * Demo entry point for the schema-only (generic) approach: writes the sample
 * records, then reads them back and prints them.
 */
public class Client {

    /**
     * Runs serialization followed by deserialization.
     *
     * @param args ignored
     * @throws IOException if writing or reading the Avro file fails
     */
    public static void main(String[] args) throws IOException {
        // Write first so the data file exists before it is read.
        SerializeUser.serializeUser();

        DeserializeUser.Deserialize();
    }
}
结果
{"name": "Alyssa", "favorite_number": 256, "favorite_color": null}
{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
|