IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kettle8.2升级elasticsearch为7.x -> 正文阅读

[大数据]kettle8.2升级elasticsearch为7.x

成品下载

下载kettle源码

pentaho-kettle-8.2.0.0-R.tar.gz

导入eclipse

导入工程

修改相关pom

1.修改工程根目录的pom.xml文件,在<repositories>节点中添加以下仓库(解决编译时部分依赖包找不到的问题)

<!-- Extra repositories for the root pom.xml. All URLs use https: Maven 3.8.1+ blocks plain-http repositories by default. -->
<repository>
			<id>pentaho-public1</id>
			<name>/</name>
			<url>https://oss.sonatype.org/content/groups/public/</url>
		</repository>
		<repository>
			<id>pentaho-public2</id>
			<name>/</name>
			<url>https://nexus.pentaho.org/repository/proxy-public-release/</url>
		</repository>
		<repository>
			<id>pentaho-public3</id>
			<name>/</name>
			<url>https://nexus.pentaho.org/repository/proxy-public-snapshot/</url>
		</repository>

2.plugins目录,修改pom.xml文件内容

  <modules>
    <module>elasticsearch-bulk-insert</module>
  </modules>

3.plugins\elasticsearch-bulk-insert\core目录,修改pom.xml文件内容

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.pentaho.di.plugins</groupId>
    <artifactId>elasticsearch-bulk-insert</artifactId>
    <version>8.2.0.0-342</version>
  </parent>

  <artifactId>elasticsearch-bulk-insert-core</artifactId>
  <version>8.2.0.0-342</version>

  <name>PDI Elasticsearch Bulk Insert Plugin Core</name>
  <description>Elasticsearch Bulk Insert Plugin</description>

  <properties>
    <pdi.version>8.2.0.0-342</pdi.version>
    <build.revision>${project.version}</build.revision>
    <timestamp>${maven.build.timestamp}</timestamp>
    <build.description>${project.description}</build.description>
    <!-- HH = 24-hour clock; the previous "hh" was a 12-hour clock without an AM/PM marker, which is ambiguous -->
    <maven.build.timestamp.format>yyyy/MM/dd HH:mm</maven.build.timestamp.format>
    <!-- Single source of truth for every Elasticsearch artifact version below -->
    <elasticsearch.version>7.3.0</elasticsearch.version>
  </properties>

  <dependencies>
    <dependency>
	    <groupId>org.elasticsearch</groupId>
	    <artifactId>elasticsearch</artifactId>
	    <version>${elasticsearch.version}</version>
	    <scope>compile</scope>
    </dependency>
    
    <!-- REST client must stay on the same version as the elasticsearch core artifact, so reuse the shared property -->
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>${elasticsearch.version}</version>
    </dependency>

     <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>${elasticsearch.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>pentaho-kettle</groupId>
      <artifactId>kettle-engine</artifactId>
      <version>${pdi.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>pentaho-kettle</groupId>
      <artifactId>kettle-core</artifactId>
      <version>${pdi.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>pentaho-kettle</groupId>
      <artifactId>kettle-ui-swt</artifactId>
      <version>${pdi.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.eclipse.swt</groupId>
      <artifactId>org.eclipse.swt.gtk.linux.x86_64</artifactId>
      <version>4.6</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.eclipse</groupId>
      <artifactId>jface</artifactId>
      <version>3.3.0-I20070606-0010</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>pentaho-kettle</groupId>
      <artifactId>kettle-engine</artifactId>
      <classifier>tests</classifier>
      <version>${pdi.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>pentaho-kettle</groupId>
      <artifactId>kettle-core</artifactId>
      <version>${pdi.version}</version>
      <classifier>tests</classifier>
      <scope>test</scope>
    </dependency>  
  </dependencies>

  <build>
    <resources>
      <resource>
        <filtering>true</filtering>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
  </build>
</project>

修改源码

1.修改ElasticSearchBulk.java

package org.pentaho.di.trans.steps.elasticsearchbulk;

import java.io.IOException;

import java.net.UnknownHostException;

import java.util.ArrayList;

import java.util.Date;

import java.util.List;

import java.util.Map;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;

import org.apache.http.HttpHost;

import org.elasticsearch.action.DocWriteRequest.OpType;

import org.elasticsearch.action.bulk.BulkItemResponse;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.client.transport.NoNodeAvailableException;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.xcontent.XContentBuilder;

import org.elasticsearch.common.xcontent.XContentFactory;

import org.elasticsearch.common.xcontent.XContentType;

import org.pentaho.di.core.exception.KettleException;

import org.pentaho.di.core.exception.KettleStepException;

import org.pentaho.di.core.row.RowDataUtil;

import org.pentaho.di.core.row.RowMetaInterface;

import org.pentaho.di.core.row.ValueMetaInterface;

import org.pentaho.di.i18n.BaseMessages;

import org.pentaho.di.trans.Trans;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStep;

import org.pentaho.di.trans.step.StepDataInterface;

import org.pentaho.di.trans.step.StepInterface;

import org.pentaho.di.trans.step.StepMeta;

import org.pentaho.di.trans.step.StepMetaInterface;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta.Server;

/**

 * Does bulk insert of data into ElasticSearch

 *

 * @author webdetails

 * @since 16-02-2011

 */

public class ElasticSearchBulk extends BaseStep implements StepInterface {
  // Error code handed to putError() in rejectRow(); currently null.
  private static final String INSERT_ERROR_CODE = null;

  private static Class<?> PKG = ElasticSearchBulkMeta.class; // for i18n

  // Step configuration and runtime data; both are (re)assigned in init() and dispose().
  private ElasticSearchBulkMeta meta;

  private ElasticSearchBulkData data;

//  private Client client;

  // REST client used for bulk calls; assigned in initClient() and closed in disposeClient().
  private RestHighLevelClient client;

  // Target index and type, resolved from the step metadata in initFromMeta().
  private String index;

  private String type;

//  BulkRequestBuilder currentRequest;

  // Bulk request that indexRow() appends to and processBatch() sends.
  BulkRequest currentRequest = new BulkRequest();

  // Number of queued actions that triggers a send; the initial value is replaced in initFromMeta().
  private int batchSize = 2;

  // When true, the document source is taken from a single JSON field (index jsonFieldIdx) of the row.
  private boolean isJsonInsert = false;

  private int jsonFieldIdx = 0;

  // Name of the output field for generated ids; addIdToRow() is only called when this is non-null.
  private String idOutFieldName = null;

  // Index of the input field used as document id, or null when no id field is configured.
  private Integer idFieldIndex = null;

  // Populated in initFromMeta(); NOTE(review): only the commented-out transport code in this file reads
  // timeout/timeoutUnit - confirm whether they should be applied to the REST request.
  private Long timeout = null;

  private TimeUnit timeoutUnit = TimeUnit.MILLISECONDS;

  // Running total of failed items, reported through setErrors().
  private int numberOfErrors = 0;

//  private List<IndexRequestBuilder> requestsBuffer;

  // Index requests of the batch being built; filled in indexRow(), cleared in processBatch().
  private List<IndexRequest> requestsBuffer;

  private boolean stopOnError = true;

  // When true, successfully indexed rows are passed on to the next step (see handleResponse()).
  private boolean useOutput = true;

  // Mapping from input column name to JSON field name; hasFields is true when the mapping is non-empty.
  private Map<String, String> columnsToJson;

  private boolean hasFields;

  // CREATE by default; initFromMeta() selects INDEX or CREATE from the step configuration.
  private IndexRequest.OpType opType = org.elasticsearch.action.DocWriteRequest.OpType.CREATE;

  /**
   * Creates the step instance; every argument is forwarded unchanged to the {@link BaseStep} constructor.
   */
  public ElasticSearchBulk( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
                            TransMeta transMeta, Trans trans ) {
    super( stepMeta, stepDataInterface, copyNr, transMeta, trans );
  }

  /**
   * Consumes one input row and queues it for bulk indexing.
   *
   * <p>When the input is exhausted, any queued actions are flushed and the output is marked done.
   * On the first row the row buffers and field indexes are initialised.
   *
   * @return <code>false</code> when there are no more rows; otherwise the result of indexing the row,
   *         forced to <code>true</code> when the step is configured not to stop on error
   * @throws KettleException if indexing fails; all buffered rows are rejected first for unexpected errors
   */
  @Override
  public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
    Object[] incoming = getRow();

    if ( incoming == null ) {
      // End of input: send whatever is still queued before signalling completion.
      boolean hasPendingActions = currentRequest != null && currentRequest.numberOfActions() > 0;
      if ( hasPendingActions ) {
        processBatch( false );
      }
      setOutputDone();
      return false;
    }

    if ( first ) {
      first = false;
      setupData();
      requestsBuffer = new ArrayList<IndexRequest>( this.batchSize );
      initFieldIndexes();
    }

    try {
      // Remember the row so it can later be echoed or rejected by its position in the batch.
      data.inputRowBuffer[data.nextBufferRowIdx++] = incoming;
      boolean indexed = indexRow( data.inputRowMeta, incoming );
      return indexed || !stopOnError;
    } catch ( KettleStepException stepFailure ) {
      throw stepFailure;
    } catch ( Exception unexpected ) {
      rejectAllRows( unexpected.getLocalizedMessage() );
      String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", unexpected.getLocalizedMessage() );
      logError( msg );
      throw new KettleStepException( msg, unexpected );
    }
  }

  /**
   * Prepares <code>this.data</code> for a new run: resets the row buffer and derives the
   * output row metadata from a copy of the input row metadata.
   *
   * @throws KettleStepException if the step metadata cannot describe its output fields
   */
  private void setupData() throws KettleStepException {
    // The input row meta is only available after the first getRow() call.
    RowMetaInterface inputMeta = getInputRowMeta().clone();

    data.nextBufferRowIdx = 0;
    data.inputRowMeta = inputMeta;
    data.inputRowBuffer = new Object[batchSize][];

    RowMetaInterface outputMeta = inputMeta.clone();
    data.outputRowMeta = outputMeta;
    meta.getFields( outputMeta, getStepname(), null, null, this, repository, metaStore );
  }

  /**
   * Resolves the positions of the JSON source field (JSON-insert mode only) and of the optional
   * id input field within the input row, and resolves the id output field name.
   *
   * @throws KettleStepException if a configured field cannot be found in the input row
   */
  private void initFieldIndexes() throws KettleStepException {
    if ( isJsonInsert ) {
      Integer jsonIdx = getFieldIdx( data.inputRowMeta, environmentSubstitute( meta.getJsonField() ) );
      if ( jsonIdx == null ) {
        throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoJsonField" ) );
      }
      jsonFieldIdx = jsonIdx.intValue();
    }

    idOutFieldName = environmentSubstitute( meta.getIdOutField() );

    if ( !StringUtils.isNotBlank( meta.getIdInField() ) ) {
      // No id field configured.
      idFieldIndex = null;
      return;
    }

    idFieldIndex = getFieldIdx( data.inputRowMeta, environmentSubstitute( meta.getIdInField() ) );
    if ( idFieldIndex == null ) {
      throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.InvalidIdField" ) );
    }
  }

  /**
   * Looks up a field by exact name.
   *
   * @param rowMeta   row metadata to search
   * @param fieldName name to look for; may be <code>null</code>
   * @return position of the first field whose name equals <code>fieldName</code>, or
   *         <code>null</code> when the name is <code>null</code> or not present
   */
  private static Integer getFieldIdx( RowMetaInterface rowMeta, String fieldName ) {
    if ( fieldName != null ) {
      int pos = 0;
      while ( pos < rowMeta.size() ) {
        if ( fieldName.equals( rowMeta.getValueMeta( pos ).getName() ) ) {
          return pos;
        }
        pos++;
      }
    }
    return null;
  }

  /**
   * Builds an index request for one row, queues it on the current bulk request and sends the
   * batch once <code>batchSize</code> actions have accumulated.
   *
   * @param rowMeta The metadata for the row to be indexed
   * @param row     The data for the row to be indexed
   * @return <code>true</code> if the row was only queued, otherwise the result of sending the batch
   * @throws KettleStepException if the request cannot be built or the batch cannot be sent
   */
  private boolean indexRow( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException {
    try {
      IndexRequest indexRequest = new IndexRequest( index );

      // Only set an explicit mapping type when one is configured: a blank type would be
      // rejected by request validation, whereas an unset type lets the client use its default.
      if ( StringUtils.isNotBlank( type ) ) {
        indexRequest.type( type );
      }

      indexRequest.opType( this.opType );

      if ( idFieldIndex != null ) {
        indexRequest.id( "" + row[idFieldIndex] ); // "" just in case field isn't string
      }

      if ( isJsonInsert ) {
        addSourceFromJsonString( row, indexRequest );
      } else {
        addSourceFromRowFields( indexRequest, rowMeta, row );
      }

      currentRequest.add( indexRequest );
      requestsBuffer.add( indexRequest );

      if ( currentRequest.numberOfActions() >= batchSize ) {
        return processBatch( true );
      }
      return true;
    } catch ( KettleStepException e ) {
      throw e;
    } catch ( NoNodeAvailableException e ) {
      // Keep the original exception as cause so the stack trace is not lost.
      throw new KettleStepException(
              BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoNodesFound" ), e );
    } catch ( Exception e ) {
      throw new KettleStepException(
              BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage() ), e );
    }
  }

//  /**

//   * @param row

//   * @param requestBuilder

//   */

//  private void addSourceFromJsonString( Object[] row, IndexRequestBuilder requestBuilder ) throws KettleStepException {
//    Object jsonString = row[jsonFieldIdx];

//    if ( jsonString instanceof byte[] ) {
//      requestBuilder.setSource( (byte[]) jsonString, XContentType.JSON );

//    } else if ( jsonString instanceof String ) {
//      requestBuilder.setSource( (String) jsonString, XContentType.JSON );

//    } else {
//      throw new KettleStepException( BaseMessages.getString( "ElasticSearchBulk.Error.NoJsonFieldFormat" ) );

//    }

//  }

  /**
   * Uses the row's JSON field (at <code>jsonFieldIdx</code>) as the document source.
   *
   * @param row          the input row
   * @param indexRequest the request whose source is set
   * @throws KettleStepException if the JSON field is neither a <code>String</code> nor a <code>byte[]</code>
   */
  private void addSourceFromJsonString( Object[] row, IndexRequest indexRequest ) throws KettleStepException {
    Object jsonString = row[jsonFieldIdx];

    if ( jsonString instanceof byte[] ) {
      indexRequest.source( (byte[]) jsonString, XContentType.JSON );
    } else if ( jsonString instanceof String ) {
      indexRequest.source( (String) jsonString, XContentType.JSON );
    } else {
      // Pass PKG like every other lookup in this class so the key is resolved against the plugin's bundle.
      throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoJsonFieldFormat" ) );
    }
  }

//  /**

//   * @param requestBuilder

//   * @param rowMeta

//   * @param row

//   * @throws IOException

//   */

//  private void addSourceFromRowFields( IndexRequestBuilder requestBuilder, RowMetaInterface rowMeta, Object[] row )

//          throws IOException {
//    XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();

//

//    for ( int i = 0; i < rowMeta.size(); i++ ) {
//      if ( idFieldIndex != null && i == idFieldIndex ) { // skip id

//        continue;

//      }

//

//      ValueMetaInterface valueMeta = rowMeta.getValueMeta( i );

//      String name = hasFields ? columnsToJson.get( valueMeta.getName() ) : valueMeta.getName();

//      Object value = row[i];

//      if ( value instanceof Date && value.getClass() != Date.class ) {
//        Date subDate = (Date) value;

//        // create a genuine Date object, or jsonBuilder will not recognize it

//        value = new Date( subDate.getTime() );

//      }

//      if ( StringUtils.isNotBlank( name ) ) {
//        jsonBuilder.field( name, value );

//      }

//    }

//

//    jsonBuilder.endObject();

//    requestBuilder.setSource( jsonBuilder );

//  }

  /**
   * Builds a JSON document from the row's fields and sets it as the request source.
   * The id field (if any) and fields whose resolved JSON name is blank are left out.
   *
   * @param indexRequest the request whose source is set
   * @param rowMeta      metadata describing the row
   * @param row          the row values
   * @throws IOException if the JSON builder fails
   */
  private void addSourceFromRowFields( IndexRequest indexRequest, RowMetaInterface rowMeta, Object[] row )
          throws IOException {
    XContentBuilder document = XContentFactory.jsonBuilder().startObject();

    for ( int col = 0; col < rowMeta.size(); col++ ) {
      boolean isIdColumn = idFieldIndex != null && col == idFieldIndex;
      if ( !isIdColumn ) {
        ValueMetaInterface columnMeta = rowMeta.getValueMeta( col );

        // With an explicit field mapping the JSON name comes from the map, otherwise the column name is used.
        String jsonName;
        if ( hasFields ) {
          jsonName = columnsToJson.get( columnMeta.getName() );
        } else {
          jsonName = columnMeta.getName();
        }

        Object cell = row[col];
        if ( cell instanceof Date && cell.getClass() != Date.class ) {
          // create a genuine Date object, or jsonBuilder will not recognize it
          cell = new Date( ( (Date) cell ).getTime() );
        }

        if ( StringUtils.isNotBlank( jsonName ) ) {
          document.field( jsonName, cell );
        }
      }
    }

    document.endObject();
    indexRequest.source( document );
  }

  /**
   * Initialises the step: stores meta/data, reads the configuration and creates the client.
   *
   * @return <code>true</code> only when the base step initialised and both the configuration and
   *         the client were set up without error; <code>false</code> otherwise, so the
   *         transformation does not start processing rows with a missing client
   */
  @Override
  public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
    meta = (ElasticSearchBulkMeta) smi;
    data = (ElasticSearchBulkData) sdi;

    if ( !super.init( smi, sdi ) ) {
      return false;
    }

    try {
      numberOfErrors = 0;
      initFromMeta();
      initClient();
      return true;
    } catch ( Exception e ) {
      // Report the failure (with stack trace) and fail initialisation instead of pretending success.
      logError( BaseMessages.getString( PKG, "ElasticSearchBulk.Log.ErrorOccurredDuringStepInitialize" )
              + e.getMessage(), e );
      return false;
    }
  }

  /**
   * Copies the step configuration from <code>meta</code> into the fields of this step,
   * applying variable substitution to index, type and timeout.
   */
  private void initFromMeta() {
    index = environmentSubstitute( meta.getIndex() );
    type = environmentSubstitute( meta.getType() );
    batchSize = meta.getBatchSizeInt( this );

    // A timeout that is not a valid number is treated as "no timeout".
    Long parsedTimeout;
    try {
      parsedTimeout = Long.parseLong( environmentSubstitute( meta.getTimeOut() ) );
    } catch ( NumberFormatException notANumber ) {
      parsedTimeout = null;
    }
    timeout = parsedTimeout;
    timeoutUnit = meta.getTimeoutUnit();

    isJsonInsert = meta.isJsonInsert();
    useOutput = meta.isUseOutput();
    stopOnError = meta.isStopOnError();

    columnsToJson = meta.getFieldsMap();
    this.hasFields = !columnsToJson.isEmpty();

    // Overwriting is only possible when an id field is configured and overwrite is enabled.
    boolean overwriteById = StringUtils.isNotBlank( meta.getIdInField() ) && meta.isOverWriteIfSameId();
    if ( overwriteById ) {
      this.opType = OpType.INDEX;
    } else {
      this.opType = OpType.CREATE;
    }
  }

  /**
   * Sends the current bulk request exactly once and processes the response.
   *
   * <p>After sending, the batch state is reset: with <code>makeNew</code> a fresh, empty
   * {@link BulkRequest} and row buffer are created so that already-sent documents are not
   * submitted again with the next batch; without it the request and buffer are released.
   *
   * @param makeNew <code>true</code> to prepare a new batch, <code>false</code> after the final flush
   * @return <code>true</code> if the response reported no failures
   * @throws KettleStepException if the bulk call fails with an I/O error; all buffered rows are
   *                             rejected before the exception is thrown
   */
  private boolean processBatch( boolean makeNew ) throws KettleStepException {
    BulkResponse response;
    try {
      response = client.bulk( currentRequest, RequestOptions.DEFAULT );
    } catch ( IOException e ) {
      rejectAllRows( e.getLocalizedMessage() );
      String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage() );
      logError( msg );
      throw new KettleStepException( msg, e );
    }

    boolean responseOk = false;
    if ( response != null ) {
      responseOk = handleResponse( response );
      requestsBuffer.clear();
    } else { // have to assume all failed
      numberOfErrors += currentRequest.numberOfActions();
      setErrors( numberOfErrors );
    }

    if ( makeNew ) {
      // Start an empty request; reusing the old one would re-send every previous document.
      currentRequest = new BulkRequest();
      data.nextBufferRowIdx = 0;
      data.inputRowBuffer = new Object[batchSize][];
    } else {
      currentRequest = null;
      data.inputRowBuffer = null;
    }

    return responseOk;
  }

  /**
   * Evaluates a bulk response: logs failures, routes failed rows to error handling, passes
   * successful rows on when output is enabled, and updates the error and line counters.
   *
   * @param response the bulk response to evaluate
   * @return <code>true</code> if no errors
   */
  private boolean handleResponse( BulkResponse response ) {
    final boolean anyFailure = response.hasFailures();
    if ( anyFailure ) {
      logError( response.buildFailureMessage() );
    }

    int failedItems = 0;

    // Individual items only need to be inspected when something failed or rows must be echoed.
    if ( anyFailure || useOutput ) {
      for ( BulkItemResponse item : response ) {
        if ( !item.isFailed() ) {
          if ( useOutput ) {
            if ( idOutFieldName != null ) {
              addIdToRow( item.getId(), item.getItemId() );
            }
            echoRow( item.getItemId() );
          }
          continue;
        }

        // log
        logDetailed( item.getFailureMessage() );
        failedItems++;
        if ( getStepMeta().isDoingErrorHandling() ) {
          rejectRow( item.getItemId(), item.getFailureMessage() );
        }
      }
    }

    numberOfErrors += failedItems;
    setErrors( numberOfErrors );

    int succeeded = currentRequest.numberOfActions() - failedItems;
    if ( useOutput ) {
      setLinesOutput( getLinesOutput() + succeeded );
    } else {
      setLinesWritten( getLinesWritten() + succeeded );
    }

    return !anyFailure;
  }

  /**
   * Appends the document id as an extra trailing value to a buffered input row.
   *
   * @param id       the id to store
   * @param rowIndex position of the row in the input row buffer
   */
  private void addIdToRow( String id, int rowIndex ) {
    Object[] widened = RowDataUtil.resizeArray( data.inputRowBuffer[rowIndex], getInputRowMeta().size() + 1 );
    data.inputRowBuffer[rowIndex] = widened;
    // The id goes into the slot right after the last input field.
    widened[getInputRowMeta().size()] = id;
  }

  /**
   * Forwards a buffered input row to the step output. Failures are logged, not propagated.
   *
   * @param rowIndex position of the row in the input row buffer
   */
  private void echoRow( int rowIndex ) {
    try {
      putRow( data.outputRowMeta, data.inputRowBuffer[rowIndex] );
    } catch ( KettleStepException | ArrayIndexOutOfBoundsException problem ) {
      logError( problem.getLocalizedMessage() );
    }
  }

  /**
   * Routes a buffered input row to the step's error stream. Failures are logged, not propagated.
   *
   * @param index    position of the row in the input row buffer
   * @param errorMsg description of why the row is rejected
   */
  private void rejectRow( int index, String errorMsg ) {
    try {
      putError( getInputRowMeta(), data.inputRowBuffer[index], 1, errorMsg, null, INSERT_ERROR_CODE );
    } catch ( KettleStepException | ArrayIndexOutOfBoundsException problem ) {
      logError( problem.getLocalizedMessage() );
    }
  }

  /**
   * Rejects every row currently held in the input row buffer with the same message.
   *
   * @param errorMsg description of why the rows are rejected
   */
  private void rejectAllRows( String errorMsg ) {
    int slot = 0;
    while ( slot < data.nextBufferRowIdx ) {
      rejectRow( slot, errorMsg );
      slot++;
    }
  }

  /**
   * Creates the single REST client used by this step, connected to every configured server.
   *
   * <p>All servers are passed to one {@link RestClient} builder so that only one client (and one
   * underlying connection pool) exists and is later closed by <code>disposeClient()</code>.
   * Variable substitution is applied to each server address.
   *
   * <p>NOTE(review): the entries of <code>meta.getSettingsMap()</code> are not applied here; they
   * were only ever passed to the removed transport client - confirm whether any of them need a
   * REST-client equivalent.
   *
   * @throws UnknownHostException kept for signature compatibility with the transport-client version
   * @throws IllegalArgumentException presumably raised by the REST client builder when no server
   *                                  is configured - TODO confirm
   */
  private void initClient() throws UnknownHostException {
    List<HttpHost> hosts = new ArrayList<HttpHost>();

    for ( Server server : meta.getServers() ) {
      hosts.add( new HttpHost( environmentSubstitute( server.getAddress() ), server.getPort(), "http" ) );
    }

    client = new RestHighLevelClient( RestClient.builder( hosts.toArray( new HttpHost[0] ) ) );
  }

  /**
   * Closes the REST client if one was created.
   *
   * @throws IOException if closing the client fails
   */
  private void disposeClient() throws IOException {
    if ( client == null ) {
      return;
    }
    client.close();
  }

  /**
   * Releases the client and then lets the base step clean up. A failure while closing the
   * client is logged and does not prevent the base-class disposal.
   */
  @Override
  public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
    meta = (ElasticSearchBulkMeta) smi;
    data = (ElasticSearchBulkData) sdi;

    try {
      disposeClient();
    } catch ( Exception closeFailure ) {
      logError( closeFailure.getLocalizedMessage(), closeFailure );
    }

    super.dispose( smi, sdi );
  }

}
 

2.修改ElasticSearchBulkDialog.java

package org.pentaho.di.ui.trans.steps.elasticsearchbulk;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import org.apache.http.HttpHost;

import org.eclipse.swt.SWT;

import org.eclipse.swt.custom.CTabFolder;

import org.eclipse.swt.custom.CTabItem;

import org.eclipse.swt.events.FocusListener;

import org.eclipse.swt.events.ModifyEvent;

import org.eclipse.swt.events.ModifyListener;

import org.eclipse.swt.events.SelectionAdapter;

import org.eclipse.swt.events.SelectionEvent;

import org.eclipse.swt.events.SelectionListener;

import org.eclipse.swt.events.ShellAdapter;

import org.eclipse.swt.events.ShellEvent;

import org.eclipse.swt.layout.FormAttachment;

import org.eclipse.swt.layout.FormData;

import org.eclipse.swt.layout.FormLayout;

import org.eclipse.swt.widgets.Button;

import org.eclipse.swt.widgets.Composite;

import org.eclipse.swt.widgets.Control;

import org.eclipse.swt.widgets.Display;

import org.eclipse.swt.widgets.Event;

import org.eclipse.swt.widgets.Group;

import org.eclipse.swt.widgets.Label;

import org.eclipse.swt.widgets.Listener;

import org.eclipse.swt.widgets.MessageBox;

import org.eclipse.swt.widgets.Shell;

import org.eclipse.swt.widgets.Text;

import org.elasticsearch.action.admin.indices.get.GetIndexRequest;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.client.core.MainResponse;

import org.elasticsearch.client.transport.NoNodeAvailableException;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.discovery.MasterNotDiscoveredException;

import org.pentaho.di.core.Const;

import org.pentaho.di.core.Props;

import org.pentaho.di.core.exception.KettleException;

import org.pentaho.di.core.row.RowMetaInterface;

import org.pentaho.di.i18n.BaseMessages;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStepMeta;

import org.pentaho.di.trans.step.StepDialogInterface;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta.Server;

import org.pentaho.di.ui.core.dialog.ErrorDialog;

import org.pentaho.di.ui.core.widget.ColumnInfo;

import org.pentaho.di.ui.core.widget.LabelComboVar;

import org.pentaho.di.ui.core.widget.LabelTextVar;

import org.pentaho.di.ui.core.widget.TableView;

import org.pentaho.di.ui.core.widget.TextVar;

import org.pentaho.di.ui.trans.step.BaseStepDialog;

public class ElasticSearchBulkDialog extends BaseStepDialog implements StepDialogInterface {
  // Step metadata edited by this dialog; assigned from the constructor argument.
  private ElasticSearchBulkMeta model;

  // Class handed to BaseMessages.getString() for the message lookups in this dialog.
  private static Class<?> PKG = ElasticSearchBulkMeta.class;

  // Tab folder that parents the General, Servers, Fields and Settings tab items.
  private CTabFolder wTabFolder;

  private FormData fdTabFolder;

  // "General" tab item and the composite set as its control.
  private CTabItem wGeneralTab;

  private Composite wGeneralComp;

  private FormData fdGeneralComp;

  // Batch size label and input (options group).
  private Label wlBatchSize;

  private TextVar wBatchSize;

  // Output ID field name; enabled only while wUseOutput is selected.
  private LabelTextVar wIdOutField;

  // Group holding the index name, the type and the "test index" button.
  private Group wIndexGroup;

  private FormData fdIndexGroup;

  // Group holding the options widgets of the General tab.
  private Group wSettingsGroup;

  private FormData fdSettingsGroup;

  // Field names of the previous step, cached by getInputFieldNames(); null until loaded.
  private String[] fieldNames;

  private CTabItem wFieldsTab;

  // Index name and type inputs.
  private LabelTextVar wIndex;

  private LabelTextVar wType;

  // Modify listener that calls model.setChanged(); shared by the text widgets and tables.
  private ModifyListener lsMod;

  // "Is JSON" check box and its label; toggles wJsonField versus the fields table.
  private Button wIsJson;

  private Label wlIsJson;

  // "Use output" check box and its label; toggles wIdOutField.
  private Label wlUseOutput;

  private Button wUseOutput;

  // Combo for the JSON field, filled with the previous step's field names.
  private LabelComboVar wJsonField;

  // Table of the Fields tab (source field name / target name).
  private TableView wFields;

  private CTabItem wServersTab;

  // Table of the Servers tab (address / port).
  private TableView wServers;

  private CTabItem wSettingsTab;

  // Table of the Settings tab (property / value).
  private TableView wSettings;

  // Timeout value together with its time unit.
  private LabelTimeComposite wTimeOut;

  // "Stop on error" check box and its label.
  private Label wlStopOnError;

  private Button wStopOnError;

  // Button that runs test( TestType.INDEX ).
  private Button wTest;

  // Button that runs test( TestType.CLUSTER ).
  private Button wTestCl;

  // Combo for the input ID field, filled with the previous step's field names.
  private LabelComboVar wIdInField;

  // "Overwrite" check box and its label; stored via setOverWriteIfSameId().
  private Button wIsOverwrite;

  private Label wlIsOverwrite;

  /**
   * Creates the dialog for one Elasticsearch bulk insert step.
   *
   * @param parent    shell passed on to the base dialog
   * @param in        step metadata; cast to {@link BaseStepMeta} for the base dialog and to
   *                  {@link ElasticSearchBulkMeta} for this dialog's model
   * @param transMeta transformation metadata passed on to the base dialog
   * @param sname     step name passed on to the base dialog
   */
  public ElasticSearchBulkDialog( Shell parent, Object in, TransMeta transMeta, String sname ) {
    super( parent, (BaseStepMeta) in, transMeta, sname );
    this.model = (ElasticSearchBulkMeta) in;
  }

  /**
   * Builds the shell with the step name line, the four tabs and the OK/Cancel buttons, loads the
   * model into the widgets and then runs the SWT event loop until the shell is disposed.
   *
   * @return the value of {@code stepname} at the time the shell has been disposed
   */
  public String open() {
    final Shell parentShell = getParent();
    final Display display = parentShell.getDisplay();

    shell = new Shell( parentShell, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN );
    props.setLook( shell );
    setShellImage( shell, model );

    // Every text modification flags the model as changed.
    lsMod = event -> model.setChanged();
    // Remember the initial flag so that it can be restored after the widgets are populated.
    changed = model.hasChanged();

    final FormLayout shellLayout = new FormLayout();
    shellLayout.marginHeight = Const.FORM_MARGIN;
    shellLayout.marginWidth = Const.FORM_MARGIN;
    shell.setLayout( shellLayout );
    shell.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.DialogTitle" ) );

    final int middlePct = props.getMiddlePct();
    final int gap = Const.MARGIN;

    // Step name: label on the left half ...
    wlStepname = new Label( shell, SWT.RIGHT );
    wlStepname.setText( BaseMessages.getString( PKG, "System.Label.StepName" ) );
    props.setLook( wlStepname );
    fdlStepname = new FormData();
    fdlStepname.top = new FormAttachment( 0, gap );
    fdlStepname.left = new FormAttachment( 0, 0 );
    fdlStepname.right = new FormAttachment( middlePct, -gap );
    wlStepname.setLayoutData( fdlStepname );

    // ... and the text field on the right half.
    wStepname = new Text( shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
    wStepname.setText( stepname );
    props.setLook( wStepname );
    wStepname.addModifyListener( lsMod );
    fdStepname = new FormData();
    fdStepname.top = new FormAttachment( 0, gap );
    fdStepname.left = new FormAttachment( middlePct, 0 );
    fdStepname.right = new FormAttachment( 100, 0 );
    wStepname.setLayoutData( fdStepname );

    // Tabs, in display order: General, Servers, Fields, Settings.
    wTabFolder = new CTabFolder( shell, SWT.BORDER );
    props.setLook( wTabFolder, Props.WIDGET_STYLE_TAB );
    addGeneralTab();
    addServersTab();
    addFieldsTab();
    addSettingsTab();

    // Buttons at the bottom of the shell.
    wOK = new Button( shell, SWT.PUSH );
    wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) );
    wCancel = new Button( shell, SWT.PUSH );
    wCancel.setText( BaseMessages.getString( PKG, "System.Button.Cancel" ) );
    setButtonPositions( new Button[]{wOK, wCancel}, gap, null );

    // The tab folder fills the space between the step name line and the buttons.
    fdTabFolder = new FormData();
    fdTabFolder.top = new FormAttachment( wStepname, gap );
    fdTabFolder.bottom = new FormAttachment( wOK, -gap );
    fdTabFolder.left = new FormAttachment( 0, 0 );
    fdTabFolder.right = new FormAttachment( 100, 0 );
    wTabFolder.setLayoutData( fdTabFolder );

    addStandardListeners();
    wTabFolder.setSelection( 0 );

    // Set the shell size, based upon previous time...
    setSize();

    getData( model );
    // Populating the widgets fires the modify listener; put the original flag back.
    model.setChanged( changed );

    shell.open();
    while ( !shell.isDisposed() ) {
      if ( display.readAndDispatch() ) {
        continue;
      }
      display.sleep();
    }
    return stepname;
  }

  /**
   * Wires the OK and Cancel buttons, the default-selection handler of the step name field and
   * the window-close handler.
   *
   * <p>The shared modify listener {@code lsMod} is intentionally not re-created here: open()
   * installs it before the tabs are built, so a second instance assigned at this point would
   * never be attached to any widget.
   */
  private void addStandardListeners() {
    lsOK = event -> ok();
    lsCancel = event -> cancel();
    wOK.addListener( SWT.Selection, lsOK );
    wCancel.addListener( SWT.Selection, lsCancel );

    // Default selection on the step name field behaves like pressing OK.
    lsDef = new SelectionAdapter() {
      @Override
      public void widgetDefaultSelected( SelectionEvent e ) {
        ok();
      }
    };
    wStepname.addSelectionListener( lsDef );

    // Closing the window behaves like pressing Cancel.
    shell.addShellListener( new ShellAdapter() {
      @Override
      public void shellClosed( ShellEvent e ) {
        cancel();
      }
    } );
  }

  /**
   * Creates the "General" tab: a composite that contains the index group followed by the
   * options group, set as the control of the tab item.
   */
  private void addGeneralTab() {
    wGeneralTab = new CTabItem( wTabFolder, SWT.NONE );
    wGeneralTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.General.Tab" ) );

    wGeneralComp = new Composite( wTabFolder, SWT.NONE );
    props.setLook( wGeneralComp );

    final FormLayout tabLayout = new FormLayout();
    tabLayout.marginHeight = 3;
    tabLayout.marginWidth = 3;
    wGeneralComp.setLayout( tabLayout );

    // The groups are created top to bottom: index first, options below it.
    fillIndexGroup( wGeneralComp );
    fillOptionsGroup( wGeneralComp );

    final FormData compData = new FormData();
    compData.top = new FormAttachment( wStepname, Const.MARGIN );
    compData.bottom = new FormAttachment( 100, 0 );
    compData.left = new FormAttachment( 0, 0 );
    compData.right = new FormAttachment( 100, 0 );
    fdGeneralComp = compData;
    wGeneralComp.setLayoutData( fdGeneralComp );

    wGeneralComp.layout();
    wGeneralTab.setControl( wGeneralComp );
  }

  /**
   * Builds the index group: the index name and type inputs plus the button that runs the
   * index test.
   *
   * @param parentTab composite that becomes the parent of the group
   */
  private void fillIndexGroup( Composite parentTab ) {
    wIndexGroup = new Group( parentTab, SWT.SHADOW_NONE );
    props.setLook( wIndexGroup );
    wIndexGroup.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IndexGroup.Label" ) );

    final FormLayout groupLayout = new FormLayout();
    groupLayout.marginHeight = 10;
    groupLayout.marginWidth = 10;
    wIndexGroup.setLayout( groupLayout );

    // Index name
    final String indexLabel = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Index.Label" );
    final String indexTooltip = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Index.Tooltip" );
    wIndex = new LabelTextVar( transMeta, wIndexGroup, indexLabel, indexTooltip );
    wIndex.addModifyListener( lsMod );

    // Type
    final String typeLabel = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Type.Label" );
    final String typeTooltip = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Type.Tooltip" );
    wType = new LabelTextVar( transMeta, wIndexGroup, typeLabel, typeTooltip );
    wType.addModifyListener( lsMod );

    // Button that runs the index test
    wTest = new Button( wIndexGroup, SWT.PUSH );
    wTest.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.Label" ) );
    wTest.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.Tooltip" ) );
    wTest.addListener( SWT.Selection, event -> test( TestType.INDEX ) );

    // Stack the inputs vertically and put the button below the last one.
    placeControls( wIndexGroup, new Control[]{wIndex, wType} );
    BaseStepDialog.positionBottomButtons( wIndexGroup, new Button[]{wTest}, Const.MARGIN, wType );

    final FormData groupData = new FormData();
    groupData.top = new FormAttachment( wStepname, Const.MARGIN );
    groupData.left = new FormAttachment( 0, Const.MARGIN );
    groupData.right = new FormAttachment( 100, -Const.MARGIN );
    fdIndexGroup = groupData;
    wIndexGroup.setLayoutData( fdIndexGroup );
  }

  /**
   * Creates the "Servers" tab: an address/port table above the button that runs the cluster test.
   */
  private void addServersTab() {
    wServersTab = new CTabItem( wTabFolder, SWT.NONE );
    wServersTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.TabTitle" ) );

    final FormLayout tabLayout = new FormLayout();
    tabLayout.marginHeight = Const.FORM_MARGIN;
    tabLayout.marginWidth = Const.FORM_MARGIN;

    final Composite serversComp = new Composite( wTabFolder, SWT.NONE );
    serversComp.setLayout( tabLayout );
    props.setLook( serversComp );

    // Button that runs the cluster test; created before the table, which is attached above it.
    wTestCl = new Button( serversComp, SWT.PUSH );
    wTestCl.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.Label" ) );
    wTestCl.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.Tooltip" ) );
    wTestCl.addListener( SWT.Selection, event -> test( TestType.CLUSTER ) );
    setButtonPositions( new Button[]{wTestCl}, Const.MARGIN, null );

    // Column 1: address (variables enabled); column 2: port.
    final ColumnInfo addressColumn = new ColumnInfo(
      BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.Address.Column" ),
      ColumnInfo.COLUMN_TYPE_TEXT, false );
    addressColumn.setUsingVariables( true );
    final ColumnInfo portColumn = new ColumnInfo(
      BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.Port.Column" ),
      ColumnInfo.COLUMN_TYPE_TEXT, true );
    final ColumnInfo[] columns = new ColumnInfo[]{addressColumn, portColumn};

    wServers = new TableView( transMeta, serversComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columns, 1,
      lsMod, props );

    final FormData tableData = new FormData();
    tableData.top = new FormAttachment( 0, Const.MARGIN );
    tableData.bottom = new FormAttachment( wTestCl, -Const.MARGIN );
    tableData.left = new FormAttachment( 0, Const.MARGIN );
    tableData.right = new FormAttachment( 100, -Const.MARGIN );
    wServers.setLayoutData( tableData );

    final FormData compData = new FormData();
    compData.top = new FormAttachment( 0, 0 );
    compData.bottom = new FormAttachment( 100, 0 );
    compData.left = new FormAttachment( 0, 0 );
    compData.right = new FormAttachment( 100, 0 );
    serversComp.setLayoutData( compData );

    serversComp.layout();
    wServersTab.setControl( serversComp );
  }

  /**
   * Creates the "Settings" tab: a single property/value table that fills the whole tab.
   */
  private void addSettingsTab() {
    wSettingsTab = new CTabItem( wTabFolder, SWT.NONE );
    wSettingsTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.TabTitle" ) );

    final FormLayout tabLayout = new FormLayout();
    tabLayout.marginHeight = Const.FORM_MARGIN;
    tabLayout.marginWidth = Const.FORM_MARGIN;

    final Composite settingsComp = new Composite( wTabFolder, SWT.NONE );
    settingsComp.setLayout( tabLayout );
    props.setLook( settingsComp );

    // Column 1: property name; column 2: value (variables enabled).
    final ColumnInfo propertyColumn = new ColumnInfo(
      BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.Property.Column" ),
      ColumnInfo.COLUMN_TYPE_TEXT, false );
    final ColumnInfo valueColumn = new ColumnInfo(
      BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.Value.Column" ),
      ColumnInfo.COLUMN_TYPE_TEXT, false );
    valueColumn.setUsingVariables( true );
    final ColumnInfo[] columns = new ColumnInfo[]{propertyColumn, valueColumn};

    wSettings = new TableView( transMeta, settingsComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columns, 1,
      lsMod, props );

    final FormData tableData = new FormData();
    tableData.top = new FormAttachment( 0, Const.MARGIN );
    tableData.bottom = new FormAttachment( 100, -Const.MARGIN );
    tableData.left = new FormAttachment( 0, Const.MARGIN );
    tableData.right = new FormAttachment( 100, -Const.MARGIN );
    wSettings.setLayoutData( tableData );

    final FormData compData = new FormData();
    compData.top = new FormAttachment( 0, 0 );
    compData.bottom = new FormAttachment( 100, 0 );
    compData.left = new FormAttachment( 0, 0 );
    compData.right = new FormAttachment( 100, 0 );
    settingsComp.setLayoutData( compData );

    settingsComp.layout();
    wSettingsTab.setControl( settingsComp );
  }

  /**
   * Creates the "Fields" tab: a table mapping source field names to target names, above a
   * "get fields" button that fills the table from the previous step.
   */
  private void addFieldsTab() {
    wFieldsTab = new CTabItem( wTabFolder, SWT.NONE );
    wFieldsTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FieldsTab.TabTitle" ) );

    final FormLayout tabLayout = new FormLayout();
    tabLayout.marginHeight = Const.FORM_MARGIN;
    tabLayout.marginWidth = Const.FORM_MARGIN;

    final Composite fieldsComp = new Composite( wTabFolder, SWT.NONE );
    fieldsComp.setLayout( tabLayout );
    props.setLook( fieldsComp );

    // "Get fields" button; created before the table, which is attached above it.
    wGet = new Button( fieldsComp, SWT.PUSH );
    wGet.setText( BaseMessages.getString( PKG, "System.Button.GetFields" ) );
    wGet.setToolTipText( BaseMessages.getString( PKG, "System.Tooltip.GetFields" ) );
    lsGet = event -> getPreviousFields( wFields );
    wGet.addListener( SWT.Selection, lsGet );
    setButtonPositions( new Button[]{wGet}, Const.MARGIN, null );

    final int initialRows = model.getFields().size();

    // Choices of the name combo: the cached input field names, or one empty entry if none are cached.
    final String[] comboNames;
    if ( this.fieldNames == null ) {
      comboNames = new String[]{""};
    } else {
      comboNames = this.fieldNames;
    }

    final ColumnInfo nameColumn = new ColumnInfo(
      BaseMessages.getString( PKG, "ElasticSearchBulkDialog.NameColumn.Column" ),
      ColumnInfo.COLUMN_TYPE_CCOMBO, comboNames, false );
    final ColumnInfo targetColumn = new ColumnInfo(
      BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TargetNameColumn.Column" ),
      ColumnInfo.COLUMN_TYPE_TEXT, false );
    final ColumnInfo[] columns = new ColumnInfo[]{nameColumn, targetColumn};

    wFields = new TableView( transMeta, fieldsComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columns,
      initialRows, lsMod, props );

    final FormData tableData = new FormData();
    tableData.top = new FormAttachment( 0, Const.MARGIN );
    tableData.bottom = new FormAttachment( wGet, -Const.MARGIN );
    tableData.left = new FormAttachment( 0, Const.MARGIN );
    tableData.right = new FormAttachment( 100, -Const.MARGIN );
    wFields.setLayoutData( tableData );

    final FormData compData = new FormData();
    compData.top = new FormAttachment( 0, 0 );
    compData.bottom = new FormAttachment( 100, 0 );
    compData.left = new FormAttachment( 0, 0 );
    compData.right = new FormAttachment( 100, 0 );
    fieldsComp.setLayoutData( compData );

    fieldsComp.layout();
    wFieldsTab.setControl( fieldsComp );
  }

  /**
   * Builds the options group of the General tab: timeout, batch size, stop-on-error, input ID
   * field, overwrite flag, output flag with its ID field, and the JSON flag with its field.
   * Widgets are created in the same order as before; placeControls() then lays them out in the
   * order given by the control array at the end of the method.
   *
   * @param parentTab composite that becomes the parent of the group
   */
  private void fillOptionsGroup( Composite parentTab ) {
    final int gap = Const.MARGIN;

    wSettingsGroup = new Group( parentTab, SWT.SHADOW_NONE );
    props.setLook( wSettingsGroup );
    wSettingsGroup.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsGroup.Label" ) );

    final FormLayout groupLayout = new FormLayout();
    groupLayout.marginHeight = 10;
    groupLayout.marginWidth = 10;
    wSettingsGroup.setLayout( groupLayout );

    // Shared by the check boxes whose only effect is to flag the model as changed.
    final SelectionListener flagChanged = new SelectionListener() {
      @Override
      public void widgetSelected( SelectionEvent event ) {
        model.setChanged();
      }

      @Override
      public void widgetDefaultSelected( SelectionEvent event ) {
        widgetSelected( event );
      }
    };

    // Timeout
    final String timeOutLabel = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TimeOut.Label" );
    final String timeOutTooltip = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TimeOut.Tooltip" );
    wTimeOut = new LabelTimeComposite( wSettingsGroup, timeOutLabel, timeOutTooltip );
    props.setLook( wTimeOut );
    wTimeOut.addModifyListener( lsMod );

    // Batch size
    wlBatchSize = new Label( wSettingsGroup, SWT.RIGHT );
    wlBatchSize.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.BatchSize.Label" ) );
    props.setLook( wlBatchSize );
    wBatchSize = new TextVar( transMeta, wSettingsGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
    props.setLook( wBatchSize );
    wBatchSize.addModifyListener( lsMod );

    // Stop on error
    wlStopOnError = new Label( wSettingsGroup, SWT.RIGHT );
    wlStopOnError.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.StopOnError.Label" ) );
    wStopOnError = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );
    wStopOnError.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.StopOnError.Tooltip" ) );
    wStopOnError.addSelectionListener( flagChanged );

    // Input ID field: editable combo, refreshed with the input field names whenever it gains focus
    final String idInLabel = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IdField.Label" );
    final String idInTooltip = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IdField.Tooltip" );
    wIdInField = new LabelComboVar( transMeta, wSettingsGroup, idInLabel, idInTooltip );
    props.setLook( wIdInField );
    wIdInField.getComboWidget().setEditable( true );
    wIdInField.addModifyListener( lsMod );
    wIdInField.addFocusListener( new FocusListener() {
      @Override
      public void focusGained( org.eclipse.swt.events.FocusEvent e ) {
        getPreviousFields( wIdInField );
      }

      @Override
      public void focusLost( org.eclipse.swt.events.FocusEvent e ) {
      }
    } );
    getPreviousFields( wIdInField );

    // Overwrite flag
    wlIsOverwrite = new Label( wSettingsGroup, SWT.RIGHT );
    wlIsOverwrite.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Overwrite.Label" ) );
    wIsOverwrite = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );
    wIsOverwrite.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Overwrite.Tooltip" ) );
    wIsOverwrite.addSelectionListener( flagChanged );

    // Output rows flag: also switches the output ID field on and off
    wlUseOutput = new Label( wSettingsGroup, SWT.RIGHT );
    wlUseOutput.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.UseOutput.Label" ) );
    wUseOutput = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );
    wUseOutput.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.UseOutput.Tooltip" ) );
    wUseOutput.addSelectionListener( new SelectionListener() {
      @Override
      public void widgetSelected( SelectionEvent event ) {
        wIdOutField.setEnabled( wUseOutput.getSelection() );
        model.setChanged();
      }

      @Override
      public void widgetDefaultSelected( SelectionEvent event ) {
        widgetSelected( event );
      }
    } );

    // Output ID field
    final String idOutLabel = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IdOutField.Label" );
    final String idOutTooltip = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IdOutField.Tooltip" );
    wIdOutField = new LabelTextVar( transMeta, wSettingsGroup, idOutLabel, idOutTooltip );
    props.setLook( wIdOutField );
    wIdOutField.setEnabled( wUseOutput.getSelection() );
    wIdOutField.addModifyListener( lsMod );

    // JSON flag: enables the JSON field and disables/hides the fields table and its button
    wlIsJson = new Label( wSettingsGroup, SWT.RIGHT );
    wlIsJson.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IsJson.Label" ) );
    wIsJson = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );
    wIsJson.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IsJson.Tooltip" ) );
    wIsJson.addSelectionListener( new SelectionListener() {
      @Override
      public void widgetSelected( SelectionEvent event ) {
        wJsonField.setEnabled( wIsJson.getSelection() );
        wFields.setEnabled( !wIsJson.getSelection() );
        wFields.setVisible( !wIsJson.getSelection() );
        wGet.setEnabled( !wIsJson.getSelection() );
        model.setChanged();
      }

      @Override
      public void widgetDefaultSelected( SelectionEvent event ) {
        widgetSelected( event );
      }
    } );

    // JSON field: editable combo, refreshed with the input field names whenever it gains focus
    final String jsonLabel = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.JsonField.Label" );
    final String jsonTooltip = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.JsonField.Tooltip" );
    wJsonField = new LabelComboVar( transMeta, wSettingsGroup, jsonLabel, jsonTooltip );
    wJsonField.getComboWidget().setEditable( true );
    props.setLook( wJsonField );
    wJsonField.addModifyListener( lsMod );
    wJsonField.addFocusListener( new FocusListener() {
      @Override
      public void focusGained( org.eclipse.swt.events.FocusEvent e ) {
        getPreviousFields( wJsonField );
      }

      @Override
      public void focusLost( org.eclipse.swt.events.FocusEvent e ) {
      }
    } );
    getPreviousFields( wJsonField );
    wJsonField.setEnabled( wIsJson.getSelection() );

    // Layout order of the rows inside the group (differs from the creation order above).
    final Control[] rows = new Control[]{
      wlBatchSize, wBatchSize,
      wlStopOnError, wStopOnError,
      wTimeOut,
      wIdInField,
      wlIsOverwrite, wIsOverwrite,
      wlUseOutput, wUseOutput,
      wIdOutField,
      wlIsJson, wIsJson,
      wJsonField};
    placeControls( wSettingsGroup, rows );

    final FormData groupData = new FormData();
    groupData.top = new FormAttachment( wIndexGroup, gap );
    groupData.left = new FormAttachment( 0, gap );
    groupData.right = new FormAttachment( 100, -gap );
    fdSettingsGroup = groupData;
    wSettingsGroup.setLayoutData( fdSettingsGroup );
  }

  /**
   * Replaces the items of the combo with the input field names while keeping the text that is
   * currently entered.
   *
   * @param combo combo whose item list is refreshed
   */
  private void getPreviousFields( LabelComboVar combo ) {
    final String currentText = combo.getText();

    combo.removeAll();
    combo.setItems( getInputFieldNames() );

    if ( currentText == null ) {
      return;
    }
    combo.setText( currentText );
  }

  /**
   * Returns the field names of the previous step, loading and caching them in
   * {@code fieldNames} on first use.
   *
   * <p>The result is never {@code null}: when the lookup fails (an error dialog is shown) or
   * yields no row metadata, an empty array is returned so that callers can pass the result
   * straight to a combo's {@code setItems}. In those cases nothing is cached, so the next call
   * tries the lookup again.
   *
   * @return the input field names, or an empty array if none are available
   */
  private String[] getInputFieldNames() {
    if ( this.fieldNames == null ) {
      try {
        RowMetaInterface previous = transMeta.getPrevStepFields( stepname );
        if ( previous != null ) {
          fieldNames = previous.getFieldNames();
        }
      } catch ( KettleException ke ) {
        new ErrorDialog( shell, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FailedToGetFields.DialogTitle" ),
          BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FailedToGetFields.DialogMessage" ), ke );
        return new String[0];
      }
    }
    // Without row metadata the cache is still null; hand out an empty list instead of null.
    return fieldNames != null ? fieldNames : new String[0];
  }

  /**
   * Fills the given table from the fields of the previous step; the field name is written to
   * columns 1 and 2. A failed lookup is reported in an error dialog.
   *
   * @param table table that receives the rows
   */
  private void getPreviousFields( TableView table ) {
    final RowMetaInterface previous;
    try {
      previous = transMeta.getPrevStepFields( stepname );
    } catch ( KettleException ke ) {
      final String title = BaseMessages.getString( PKG, "System.Dialog.GetFieldsFailed.Title" );
      final String message = BaseMessages.getString( PKG, "System.Dialog.GetFieldsFailed.Message" );
      new ErrorDialog( shell, title, message, ke );
      return;
    }
    if ( previous == null ) {
      return;
    }
    BaseStepDialog.getFieldsFromPrevious( previous, table, 1, new int[]{1, 2}, null, 0, 0, null );
  }

  /**
   * Lays the controls out as rows inside the group. A {@link Label} starts a row and is placed
   * below the previous row; any other control finishes the row, placed to the right of the
   * pending label if there is one, otherwise relative to the group itself.
   *
   * @param group    group that contains the controls
   * @param controls controls in layout order
   */
  private void placeControls( Group group, Control[] controls ) {
    Control rowAbove = group;
    Control leftNeighbour = group;

    for ( int i = 0; i < controls.length; i++ ) {
      final Control current = controls[i];
      if ( !( current instanceof Label ) ) {
        addWidgetAfter( current, rowAbove, leftNeighbour );
        // The row is complete: the next row hangs below this widget and starts at the group again.
        rowAbove = current;
        leftNeighbour = group;
        continue;
      }
      addLabelAfter( current, rowAbove );
      leftNeighbour = current;
    }
  }

  /**
   * Applies the look and attaches the widget below {@code widgetAbove}, to the right of
   * {@code widgetLeft}, stretching to the right edge of the parent (minus the margin).
   *
   * @param widget      control to position
   * @param widgetAbove control the top edge is attached to
   * @param widgetLeft  control the left edge is attached to
   */
  private void addWidgetAfter( Control widget, Control widgetAbove, Control widgetLeft ) {
    props.setLook( widget );

    final FormData position = new FormData();
    position.top = new FormAttachment( widgetAbove, Const.MARGIN );
    position.left = new FormAttachment( widgetLeft, Const.MARGIN );
    position.right = new FormAttachment( 100, -Const.MARGIN );
    widget.setLayoutData( position );
  }

  /**
   * Applies the look and attaches the label below {@code widgetAbove}, with its right edge at
   * {@code Const.MIDDLE_PCT} percent of the parent (minus the margin).
   *
   * @param widget      label to position
   * @param widgetAbove control the top edge is attached to
   */
  private void addLabelAfter( Control widget, Control widgetAbove ) {
    props.setLook( widget );

    final FormData position = new FormData();
    position.right = new FormAttachment( Const.MIDDLE_PCT, -Const.MARGIN );
    position.top = new FormAttachment( widgetAbove, Const.MARGIN );
    widget.setLayoutData( position );
  }

  /**
   * Read the data from the ElasticSearchBulkMeta object and show it in this dialog.
   *
   * <p>All values, including the fields, servers and settings tables, are taken from
   * {@code in}. Because the check boxes are set programmatically here, the enabled/visible
   * state their selection listeners would normally apply is set by hand.
   *
   * @param in The ElasticSearchBulkMeta object to obtain the data from.
   */
  public void getData( ElasticSearchBulkMeta in ) {
    wIndex.setText( Const.NVL( in.getIndex(), "" ) );
    wType.setText( Const.NVL( in.getType(), "" ) );

    wBatchSize.setText( Const.NVL( in.getBatchSize(), "" + ElasticSearchBulkMeta.DEFAULT_BATCH_SIZE ) );
    wStopOnError.setSelection( in.isStopOnError() );

    wTimeOut.setText( Const.NVL( in.getTimeOut(), "" ) );
    wTimeOut.setTimeUnit( in.getTimeoutUnit() );

    wIdInField.setText( Const.NVL( in.getIdInField(), "" ) );
    wIsOverwrite.setSelection( in.isOverWriteIfSameId() );

    wIsJson.setSelection( in.isJsonInsert() );
    wJsonField.setText( Const.NVL( in.getJsonField(), "" ) );

    // The selection listener is not triggered here, so mirror everything it does:
    // JSON mode enables the JSON field and disables/hides the fields table and its button.
    final boolean jsonInsert = wIsJson.getSelection();
    wJsonField.setEnabled( jsonInsert );
    wFields.setEnabled( !jsonInsert );
    wFields.setVisible( !jsonInsert );
    wGet.setEnabled( !jsonInsert );

    wUseOutput.setSelection( in.isUseOutput() );
    wIdOutField.setText( Const.NVL( in.getIdOutField(), "" ) );
    wIdOutField.setEnabled( wUseOutput.getSelection() ); // listener not triggered here either

    // Fields
    mapToTableView( in.getFieldsMap(), wFields );

    // Servers
    for ( ElasticSearchBulkMeta.Server server : in.getServers() ) {
      wServers.add( server.address, "" + server.port );
    }
    wServers.removeEmptyRows();
    wServers.setRowNums();

    // Settings
    mapToTableView( in.getSettingsMap(), wSettings );

    wStepname.selectAll();
    wStepname.setFocus();
  }

  /**
   * Appends one table row per map entry (key, value), then removes empty rows and renumbers
   * the table.
   *
   * @param map   entries to show
   * @param table table that receives the rows
   */
  private void mapToTableView( Map<String, String> map, TableView table ) {
    for ( Map.Entry<String, String> entry : map.entrySet() ) {
      table.add( entry.getKey(), entry.getValue() );
    }
    table.removeEmptyRows();
    table.setRowNums();
  }

  /**
   * Closes the dialog without applying the widgets to the model: restores the changed flag
   * remembered when the dialog was opened and clears {@code stepname}, which open() returns.
   */
  private void cancel() {
    model.setChanged( changed );
    stepname = null;
    dispose();
  }

  /**
   * Copies the widget values into the model and closes the dialog.
   *
   * <p>The dialog stays open when the step name is blank, or when copying the values fails; in
   * the latter case an error dialog is shown so that the input can be corrected instead of
   * being silently discarded.
   */
  private void ok() {
    // A step cannot be saved under a blank name; leave the dialog open.
    if ( StringUtils.isBlank( wStepname.getText() ) ) {
      return;
    }

    try {
      toModel( model );
    } catch ( KettleException e ) {
      new ErrorDialog( shell, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ErrorValidateData.DialogTitle" ),
        BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ErrorValidateData.DialogMessage" ), e );
      // Keep the dialog open after a validation error.
      return;
    }

    dispose();
  }

  /**
   * Copies the values entered in the dialog into the given ElasticSearchBulkMeta.
   *
   * <p>The dialog's own {@code stepname} is updated only when the target is the dialog's model.
   * Copying into a temporary meta, as test() does, must not rename the step the dialog keeps
   * using for its previous-step field lookups.
   *
   * <p>Rows with a blank first column are skipped in all three tables. A port that cannot be
   * parsed as an integer falls back to {@code ElasticSearchBulkMeta.DEFAULT_PORT}.
   *
   * @param in meta object that receives the values
   * @throws KettleException declared for callers that validate the copied data
   */
  private void toModel( ElasticSearchBulkMeta in ) throws KettleException {
    if ( in == model ) {
      stepname = wStepname.getText();
    }

    in.setType( wType.getText() );
    in.setIndex( wIndex.getText() );

    in.setBatchSize( wBatchSize.getText() );
    in.setTimeOut( Const.NVL( wTimeOut.getText(), null ) );
    in.setTimeoutUnit( wTimeOut.getTimeUnit() );

    in.setIdInField( wIdInField.getText() );
    // Overwriting by ID is only stored when an ID field has been entered.
    in.setOverWriteIfSameId( StringUtils.isNotBlank( wIdInField.getText() ) && wIsOverwrite.getSelection() );
    in.setStopOnError( wStopOnError.getSelection() );

    in.setJsonInsert( wIsJson.getSelection() );
    in.setJsonField( wIsJson.getSelection() ? wJsonField.getText() : null );

    in.setIdOutField( wIdOutField.getText() );
    in.setUseOutput( wUseOutput.getSelection() );

    // Field mappings are only kept when the document is not taken from a JSON field.
    in.clearFields();
    if ( !wIsJson.getSelection() ) {
      for ( int i = 0; i < wFields.getItemCount(); i++ ) {
        String[] row = wFields.getItem( i );
        if ( StringUtils.isNotBlank( row[0] ) ) {
          in.addField( row[0], row[1] );
        }
      }
    }

    in.clearServers();
    for ( int i = 0; i < wServers.getItemCount(); i++ ) {
      String[] row = wServers.getItem( i );
      if ( StringUtils.isNotBlank( row[0] ) ) {
        try {
          // Surrounding whitespace must not turn a valid port into the default one.
          in.addServer( row[0], Integer.parseInt( Const.NVL( row[1], "" ).trim() ) );
        } catch ( NumberFormatException nfe ) {
          in.addServer( row[0], ElasticSearchBulkMeta.DEFAULT_PORT );
        }
      }
    }

    in.clearSettings();
    for ( int i = 0; i < wSettings.getItemCount(); i++ ) {
      String[] row = wSettings.getItem( i );
      // Same rule as for fields and servers: a row without a property name is ignored.
      if ( StringUtils.isNotBlank( row[0] ) ) {
        in.addSetting( row[0], row[1] );
      }
    }
  }

  /** Selects which check test() performs. */
  private enum TestType {
    /** Check that the configured index exists. */
    INDEX,
    /** Query the cluster for its name and version. */
    CLUSTER
  }

  /**
   * Runs a connectivity test with the values currently entered in the dialog and reports the
   * outcome in a message box.
   *
   * <p>A single REST client is created for all configured servers and is always closed again.
   * Server addresses go through variable substitution. Entries of the Settings tab are not
   * applied here; the client is built from the hosts only.
   *
   * <ul>
   *   <li>{@code INDEX}: fails when no index is entered or the index does not exist.</li>
   *   <li>{@code CLUSTER}: shows the cluster name and version returned by the info request.</li>
   * </ul>
   *
   * @param testType which check to perform
   */
  private void test( TestType testType ) {
    try {
      ElasticSearchBulkMeta tempMeta = new ElasticSearchBulkMeta();
      toModel( tempMeta );

      if ( tempMeta.getServers().isEmpty() ) {
        showError( "Servers is null" );
        return;
      }

      // One HttpHost per configured server, so that every server is part of the same client.
      HttpHost[] hosts = new HttpHost[tempMeta.getServers().size()];
      int next = 0;
      for ( Server server : tempMeta.getServers() ) {
        hosts[next++] =
          new HttpHost( transMeta.environmentSubstitute( server.getAddress() ), server.getPort(), "http" );
      }

      // try-with-resources: the client is released on every path.
      try ( RestHighLevelClient rclient = new RestHighLevelClient( RestClient.builder( hosts ) ) ) {
        switch ( testType ) {
          case INDEX:
            if ( StringUtils.isBlank( tempMeta.getIndex() ) ) {
              showError( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoIndex" ) );
              break;
            }

            // Only the index test asks the server whether the index (or comma separated indices) exists.
            GetIndexRequest request = new GetIndexRequest();
            request.indices( tempMeta.getIndex().split( "," ) );
            request.local( false );
            request.humanReadable( true );

            if ( !rclient.indices().exists( request, RequestOptions.DEFAULT ) ) {
              showError( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoIndex" ) );
              break;
            }

            showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.TestOK", "true" ) );
            break;

          case CLUSTER:
            MainResponse response = rclient.info( RequestOptions.DEFAULT );
            showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.TestOK",
              response.getClusterName(), response.getVersion() ) );
            break;

          default:
            break;
        }
      }
    } catch ( NoNodeAvailableException | MasterNotDiscoveredException e ) {
      showError( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoNodesFound" ) );
    } catch ( Exception e ) {
      // Some exceptions carry no message; the message box does not accept null.
      String message = e.getLocalizedMessage();
      showError( message != null ? message : e.toString() );
    }
  }

  /**
   * Shows a modal error box with an OK button and the error icon.
   *
   * <p>A {@code null} message is replaced by an empty string. The connection test
   * passes {@code e.getLocalizedMessage()} here, which may be {@code null}, and
   * {@code MessageBox.setMessage} rejects {@code null} with an
   * {@link IllegalArgumentException}; without the guard the error dialog itself
   * would fail while reporting the original problem.
   *
   * @param message text to display in the box; may be {@code null}
   */
  private void showError( String message ) {
    MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_ERROR );
    // Guard against exceptions that carry no message.
    mb.setMessage( message == null ? "" : message );
    mb.setText( BaseMessages.getString( PKG, "System.Dialog.Error.Title" ) );
    mb.open();
  }

  /**
   * Shows a modal information box with an OK button and the information icon.
   * The box title is looked up from the message bundle under
   * {@code ElasticSearchBulkDialog.Test.TestOKTitle}.
   *
   * @param message text to display in the box
   */
  private void showMessage( String message ) {
    final int boxStyle = SWT.OK | SWT.ICON_INFORMATION;
    final MessageBox infoBox = new MessageBox( shell, boxStyle );
    infoBox.setMessage( message );
    final String title = BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Test.TestOKTitle" );
    infoBox.setText( title );
    infoBox.open();
  }

  /**
   * Returns the name of this object's runtime class, as reported by
   * {@link Class#getName()}.
   *
   * @return the runtime class name
   */
  @Override
  public String toString() {
    final Class<?> runtimeType = getClass();
    return runtimeType.getName();
  }

}

打包

打包完成后，会在 elasticsearch-bulk-insert\assemblies\plugin\target 目录下生成插件的 ZIP 文件。

运行

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-15 23:47:47  更:2021-07-15 23:48:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/7 19:32:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码