@@ -161,19 +161,19 @@ public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(final String jarName)
161161 }
162162
163163 public void checkPipePluginExistence (
164- final Map <String , String > extractorAttributes ,
164+ final Map <String , String > sourceAttributes ,
165165 final Map <String , String > processorAttributes ,
166- final Map <String , String > connectorAttributes ) {
167- final PipeParameters extractorParameters = new PipeParameters (extractorAttributes );
168- final String extractorPluginName =
169- extractorParameters .getStringOrDefault (
166+ final Map <String , String > sinkAttributes ) {
167+ final PipeParameters sourceParameters = new PipeParameters (sourceAttributes );
168+ final String sourcePluginName =
169+ sourceParameters .getStringOrDefault (
170170 Arrays .asList (PipeSourceConstant .EXTRACTOR_KEY , PipeSourceConstant .SOURCE_KEY ),
171171 IOTDB_EXTRACTOR .getPipePluginName ());
172- if (!pipePluginMetaKeeper .containsPipePlugin (extractorPluginName )) {
172+ if (!pipePluginMetaKeeper .containsPipePlugin (sourcePluginName )) {
173173 final String exceptionMessage =
174174 String .format (
175175 "Failed to create or alter pipe, the pipe extractor plugin %s does not exist" ,
176- extractorPluginName );
176+ sourcePluginName );
177177 LOGGER .warn (exceptionMessage );
178178 throw new PipeException (exceptionMessage );
179179 }
@@ -191,16 +191,16 @@ public void checkPipePluginExistence(
191191 throw new PipeException (exceptionMessage );
192192 }
193193
194- final PipeParameters connectorParameters = new PipeParameters (connectorAttributes );
195- final String connectorPluginName =
196- connectorParameters .getStringOrDefault (
194+ final PipeParameters sinkParameters = new PipeParameters (sinkAttributes );
195+ final String sinkPluginName =
196+ sinkParameters .getStringOrDefault (
197197 Arrays .asList (PipeSinkConstant .CONNECTOR_KEY , PipeSinkConstant .SINK_KEY ),
198198 IOTDB_THRIFT_CONNECTOR .getPipePluginName ());
199- if (!pipePluginMetaKeeper .containsPipePlugin (connectorPluginName )) {
199+ if (!pipePluginMetaKeeper .containsPipePlugin (sinkPluginName )) {
200200 final String exceptionMessage =
201201 String .format (
202202 "Failed to create or alter pipe, the pipe connector plugin %s does not exist" ,
203- connectorPluginName );
203+ sinkPluginName );
204204 LOGGER .warn (exceptionMessage );
205205 throw new PipeException (exceptionMessage );
206206 }
@@ -212,34 +212,29 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
212212 try {
213213 final PipePluginMeta pipePluginMeta = createPipePluginPlan .getPipePluginMeta ();
214214 final String pluginName = pipePluginMeta .getPluginName ();
215+ final String className = pipePluginMeta .getClassName ();
216+ final String jarName = pipePluginMeta .getJarName ();
215217
216218 // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
217219 dropPipePlugin (new DropPipePluginPlan (pluginName ));
218220
219221 pipePluginMetaKeeper .addPipePluginMeta (pluginName , pipePluginMeta );
220- pipePluginMetaKeeper .addJarNameAndMd5 (
221- pipePluginMeta .getJarName (), pipePluginMeta .getJarMD5 ());
222+ pipePluginMetaKeeper .addJarNameAndMd5 (jarName , pipePluginMeta .getJarMD5 ());
222223
223224 if (createPipePluginPlan .getJarFile () != null ) {
224225 pipePluginExecutableManager .savePluginToInstallDir (
225- ByteBuffer .wrap (createPipePluginPlan .getJarFile ().getValues ()),
226- pluginName ,
227- pipePluginMeta .getJarName ());
228- final String pluginDirPath = pipePluginExecutableManager .getPluginsDirPath (pluginName );
229- final PipePluginClassLoader pipePluginClassLoader =
230- classLoaderManager .createPipePluginClassLoader (pluginDirPath );
231- try {
232- final Class <?> pluginClass =
233- Class .forName (pipePluginMeta .getClassName (), true , pipePluginClassLoader );
234- pipePluginMetaKeeper .addPipePluginVisibility (
235- pluginName , VisibilityUtils .calculateFromPluginClass (pluginClass ));
236- classLoaderManager .addPluginAndClassLoader (pluginName , pipePluginClassLoader );
237- } catch (final Exception e ) {
238- try {
239- pipePluginClassLoader .close ();
240- } catch (final Exception ignored ) {
241- }
242- throw e ;
226+ ByteBuffer .wrap (createPipePluginPlan .getJarFile ().getValues ()), pluginName , jarName );
227+ computeFromPluginClass (pluginName , className );
228+ } else {
229+ final String existed = pipePluginMetaKeeper .getPluginNameByJarName (jarName );
230+ if (Objects .nonNull (existed )) {
231+ pipePluginExecutableManager .linkExistedPlugin (existed , pluginName , jarName );
232+ computeFromPluginClass (pluginName , className );
233+ } else {
234+ throw new PipeException (
235+ String .format (
236+ "The %s's creation has not passed in jarName, which does not exist in other pipePlugins. Please check" ,
237+ pluginName ));
243238 }
244239 }
245240
@@ -255,6 +250,25 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
255250 }
256251 }
257252
253+ private void computeFromPluginClass (final String pluginName , final String className )
254+ throws Exception {
255+ final String pluginDirPath = pipePluginExecutableManager .getPluginsDirPath (pluginName );
256+ final PipePluginClassLoader pipePluginClassLoader =
257+ classLoaderManager .createPipePluginClassLoader (pluginDirPath );
258+ try {
259+ final Class <?> pluginClass = Class .forName (className , true , pipePluginClassLoader );
260+ pipePluginMetaKeeper .addPipePluginVisibility (
261+ pluginName , VisibilityUtils .calculateFromPluginClass (pluginClass ));
262+ classLoaderManager .addPluginAndClassLoader (pluginName , pipePluginClassLoader );
263+ } catch (final Exception e ) {
264+ try {
265+ pipePluginClassLoader .close ();
266+ } catch (final Exception ignored ) {
267+ }
268+ throw e ;
269+ }
270+ }
271+
258272 public TSStatus dropPipePlugin (final DropPipePluginPlan dropPipePluginPlan ) {
259273 final String pluginName = dropPipePluginPlan .getPluginName ();
260274
0 commit comments