View Javadoc
1   
2   package org.apache.avro.avsc;
3   
4   
5   import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
6   import java.util.ArrayList;
7   import java.util.IdentityHashMap;
8   import java.util.List;
9   import java.util.function.Function;
10  import org.apache.avro.AvroTypeException;
11  import org.apache.avro.Schema;
12  import org.apache.avro.Schema.Field;
13  import org.apache.avro.SchemaAdapter;
14  
15  /**
16   * this visitor will create a clone of the original Schema and will also resolve all unresolved schemas
17   *
18   * by default. what attributes are copied is customizable.
19   */
20  @SuppressFBWarnings("FCCD_FIND_CLASS_CIRCULAR_DEPENDENCY")
21  public final class ResolvingVisitor implements SchemaVisitor<Schema> {
22  
23    private final IdentityHashMap<Schema, Schema> replace;
24    private final Function<String, Schema> symbolTable;
25    private final boolean  allowUndefinedLogicalTypes;
26    private final Schema root;
27  
28  
29    @SuppressFBWarnings("EI_EXPOSE_REP2")
30    public ResolvingVisitor(final Schema root, final IdentityHashMap<Schema, Schema> replace,
31            final Function<String, Schema> symbolTable, final boolean  allowUndefinedLogicalTypes) {
32      this.replace = replace;
33      this.symbolTable = symbolTable;
34      this.root = root;
35      this.allowUndefinedLogicalTypes = allowUndefinedLogicalTypes;
36    }
37  
38    @Override
39    public SchemaVisitorAction visitTerminal(final Schema terminal) {
40      if (replace.containsKey(terminal)) {
41        return SchemaVisitorAction.CONTINUE;
42      }
43      Schema.Type type = terminal.getType();
44      Schema newSchema;
45      switch (type) {
46        case RECORD: // recursion.
47        case ARRAY:
48        case MAP:
49        case UNION:
50          if (!replace.containsKey(terminal)) {
51            throw new IllegalStateException("Schema " + terminal + " must be already processed");
52          }
53          return SchemaVisitorAction.CONTINUE;
54        case BOOLEAN:
55        case BYTES:
56        case DOUBLE:
57        case FLOAT:
58        case INT:
59        case LONG:
60        case NULL:
61        case STRING:
62          newSchema = Schema.create(type);
63        break;
64        case ENUM:
65          newSchema = Schema.createEnum(terminal.getName(), terminal.getDoc(),
66                  terminal.getNamespace(), terminal.getEnumSymbols(), terminal.getEnumDefault());
67          break;
68        case FIXED:
69          newSchema = Schema.createFixed(terminal.getName(), terminal.getDoc(),
70                  terminal.getNamespace(), terminal.getFixedSize());
71          break;
72        default:
73          throw new IllegalStateException("Unsupported schema " + terminal);
74      }
75      copyAllProperties(terminal, newSchema);
76      materializeLogicalType(newSchema);
77      replace.put(terminal, newSchema);
78      return SchemaVisitorAction.CONTINUE;
79    }
80  
81    private void materializeLogicalType(final Schema schema) {
82      SchemaAdapter.parseLogicalType(schema, allowUndefinedLogicalTypes);
83    }
84  
85    public static void copyAllProperties(final Schema first, final Schema second) {
86      Schemas.copyLogicalTypes(first, second);
87      Schemas.copyAliases(first, second);
88      Schemas.copyProperties(first, second);
89    }
90  
91    public static void copyAllProperties(final Field first, final Field second) {
92      Schemas.copyAliases(first, second);
93      Schemas.copyProperties(first, second);
94    }
95  
96    @Override
97    @SuppressFBWarnings({"LEST_LOST_EXCEPTION_STACK_TRACE", "EXS_EXCEPTION_SOFTENING_NO_CHECKED"})
98    public SchemaVisitorAction visitNonTerminal(final Schema nt) {
99      Schema.Type type = nt.getType();
100     if  (type == Schema.Type.RECORD) {
101         if (replace.containsKey(nt)) {
102           return SchemaVisitorAction.SKIP_SUBTREE;
103         }
104         if (SchemaResolver.isUnresolvedSchema(nt)) {
105           // unresolved schema will get a replacement that we already encountered,
106           // or we will attempt to resolve.
107           final String unresolvedSchemaName = SchemaResolver.getUnresolvedSchemaName(nt);
108           Schema resSchema = symbolTable.apply(unresolvedSchemaName);
109           if (resSchema == null && unresolvedSchemaName.indexOf('.') < 0) { // try parent namespace
110             resSchema = symbolTable.apply(root.getNamespace() + '.' + unresolvedSchemaName);
111           }
112           if (resSchema == null) {
113             throw new AvroTypeException("Unable to resolve " + unresolvedSchemaName);
114           }
115           Schema replacement = replace.get(resSchema);
116           if (replacement == null) {
117             try {
118             replace.put(nt, Schemas.visit(resSchema, new ResolvingVisitor(resSchema,
119                     replace, symbolTable, allowUndefinedLogicalTypes)));
120             } catch (StackOverflowError err) {
121               throw new IllegalStateException("Stack overflow while resolving " + resSchema.getName());
122             }
123           } else {
124             replace.put(nt, replacement);
125           }
126         } else {
127           // create a fieldless clone. Fields will be added in afterVisitNonTerminal.
128           Schema newSchema = Schema.createRecord(nt.getName(), nt.getDoc(), nt.getNamespace(), nt.isError());
129           copyAllProperties(nt, newSchema);
130           replace.put(nt, newSchema);
131         }
132     }
133     return SchemaVisitorAction.CONTINUE;
134   }
135 
136   @Override
137   public SchemaVisitorAction afterVisitNonTerminal(final Schema nt) {
138      Schema.Type type = nt.getType();
139      Schema newSchema;
140      switch (type) {
141        case RECORD:
142          if (!SchemaResolver.isUnresolvedSchema(nt)) {
143             newSchema = replace.get(nt);
144             List<Schema.Field> fields = nt.getFields();
145             List<Schema.Field> newFields = new ArrayList<Schema.Field>(fields.size());
146             for (Schema.Field field : fields) {
147               Schema fieldSchema = field.schema();
148               Schema get = replace.get(fieldSchema);
149               if (get == null) {
150                 throw new RuntimeException("No replacement for " + fieldSchema);
151               }
152              Schema.Field newField = new Schema.Field(field.name(), get,
153                      field.doc(), field.defaultVal(), field.order());
154              copyAllProperties(field, newField);
155              newFields.add(newField);
156             }
157             newSchema.setFields(newFields);
158             materializeLogicalType(newSchema);
159          }
160          return SchemaVisitorAction.CONTINUE;
161        case UNION:
162           if (replace.containsKey(nt)) {
163             return SchemaVisitorAction.CONTINUE;
164           }
165           List<Schema> types = nt.getTypes();
166           List<Schema> newTypes = new ArrayList<Schema>(types.size());
167           for (Schema sch : types) {
168             newTypes.add(replace.get(sch));
169           }
170           newSchema = Schema.createUnion(newTypes);
171           break;
172        case ARRAY:
173         if (replace.containsKey(nt)) {
174             return SchemaVisitorAction.CONTINUE;
175          }
176          newSchema = Schema.createArray(replace.get(nt.getElementType()));
177          break;
178        case MAP:
179          if (replace.containsKey(nt)) {
180             return SchemaVisitorAction.CONTINUE;
181           }
182          newSchema = Schema.createMap(replace.get(nt.getValueType()));
183          break;
184        default:
185          throw new IllegalStateException("Illegal type " + type + ", schema " + nt);
186      }
187      copyAllProperties(nt, newSchema);
188      materializeLogicalType(newSchema);
189      replace.put(nt, newSchema);
190 
191      return SchemaVisitorAction.CONTINUE;
192   }
193 
194   @Override
195   public Schema get() {
196     return replace.get(root);
197   }
198 
199   @Override
200   public String toString() {
201     return "ResolvingVisitor{" + "replace=" + replace + ", symbolTable=" + symbolTable + ", root=" + root + '}';
202   }
203 
204 }