shell脚本导入
dt=`date +"%Y%m%d" -d "-1 days"`
outpath=/home/etl/test/test.txt
brokerlist='kakfa卡集群地址'
echo $dt $outpath $brokerlist
fileSize=`du -b ${outpath} | awk '{print $1}'`
if [ $fileSize -gt 0 ]
then
cat ${outpath} | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic health_doc_tag_history_test | > out.txt
fi
java代码导入
public class TextProducerTest {
private static final String server= "";
private static final String topic = "demo";
public static void main(String[] args) throws IOException {
Properties prop = new Properties();
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,server);
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
File file = new File("/home/etl/test/test.txt");
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
String line = null;
while ((line =bufferedReader.readLine()) != null ){
ProducerRecord<String,String> producerRecord = new ProducerRecord(topic,line);
producer.send(producerRecord);
}
producer.close();
}
}
|