quinta-feira, 21 de julho de 2016

AWS - Big Data - Parte 2

Agora que o nosso ambiente está configurado. Vamos enviar algum log para o Kinesis. Em um ambiente de produção você pode usar algum plugin como o Appender para o log4j, pode usar a lib KCL entre outras maneiras.

No nosso exemplo vamos usar o appender, para enviar um arquivo de log para o Kinesis.

Vamos fazer o download do appender:

wget http://emr-kinesis.s3.amazonaws.com/publisher/ kinesis-log4j-appender-1.0.0.jar

E do log também:

wget http://elasticmapreduce.s3.amazonaws.com/samples/ pig-apache/input/access_log_1

Na mesma pasta que o appender, vamos criar um arquivo chamado awsCredentials.properties com as credenciais IAM, que tenham permissão para escrita no Stream do Kinesis que criamos. O conteúdo ficará assim:

accessKey=SUA_ACCESS_KEY
secretKey=SUA_SECRET_KEY

Agora vamos rodar o comando que enviará o log para o Kinesis.

java -cp .:kinesis-log4j-appender-1.0.0.jar com.amazonaws.services.kinesis.log4j.FilePublisher access_log_1 &

Em poucos instantes, aparecerão linhas de log contendo:

INFO [main] (FilePublisher.java:58) - Started reading: access_log_1
DEBUG [main] (FilePublisher.java:64) - 100 records written
DEBUG [main] (FilePublisher.java:64) - 200 records written
DEBUG [main] (FilePublisher.java:64) - 300 records written
DEBUG [main] (FilePublisher.java:64) - 400 records written
DEBUG [main] (FilePublisher.java:64) - 500 records written
DEBUG [main] (FilePublisher.java:64) - 600 records written

Continue aguardando até o final, pois ele irá enviar aproximadamente 39000 linhas de log para o Kinesis. Em um ambiente real, teríamos um pool de servidores apache enviando seus logs para o Kinesis.

Quando aparecer
 INFO [main] (FilePublisher.java:68) - Finished publishing 39344 log events from access_log_1, took 6 minutes, 46 seconds and 206 milliseconds to publish
 INFO [main] (FilePublisher.java:70) - DO NOT kill this process immediately, publisher threads will keep on sending buffered logs to Kinesis
DEBUG [pool-1-thread-8] (AsyncPutCallStatsReporter.java:52) - Appender (KINESIS) made 39000 successful put requests out of total 39000 in 6 minutes, 52 seconds and 559 milliseconds since start

Pode aguardar alguns instantes e encerrar.

Agora temos os nossos logs no Kinesis e precisamos que o EMR, pegue essa informação e coloque no S3. Para isso vamos nos logar em nosso EMR. Para pegar o nome do seu nó siga o seguinte caminho: EMR -> Cluster List -> Demo e copie o Master public DNS. Rode o seguinte comando:

ssh -o TCPKeepAlive=yes -o ServerAliveInterval=30 -i SUA_CHAVE_SSH hadoop@MASTER_PUBLIC_DNS
 Após o login, vamos fazer download do cliente do Kinesis:

wget http://repo1.maven.org/maven2/com/amazonaws/amazon-kinesis-client/1.6.0/amazon-kinesis-client-1.6.0.jar
Diminuir o log do console:

sudo sed -i 's/INFO/ERROR/g' /usr/lib/spark/conf/spark-defaults.confsudo sed -i 's/INFO/ERROR/g' /usr/lib/spark/conf/log4j.properties
Rodar o Shell do Spark:

spark-shell --jars /usr/lib/spark/extras/lib/spark-streaming-kinesis-asl.jar,amazon-kinesis-client-1.6.0.jar --driver-java-options "-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties"

A sua tela deve estar semelhante a esta:


Copie e cole o seguinte código, dentro da máquina do EMR, verifique a endpointUrl e o outputDir para ficar compatível com a sua configuração.

/* import required libraries */
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker._
import java.util.Date
import org.apache.hadoop.io.compress._

/* Set up the variables as needed */
val streamName = "AccessLogStream"
val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
val outputDir = "s3://escolha-o-nome-do-seu-bucket/access-log-raw"
val outputInterval = Seconds(60)

/* Reconfigure the spark-shell */
val sparkConf = sc.getConf
sparkConf.setAppName("S3Writer")
sparkConf.remove("spark.driver.extraClassPath")
sparkConf.remove("spark.executor.extraClassPath")
sc.stop
val sc = new SparkContext(sparkConf)

/* Setup the KinesisClient */
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)

/* Determine the number of shards from the stream */
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()

/* Create one worker per Kinesis shard */
val ssc = new StreamingContext(sc, outputInterval)
val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,outputInterval,InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
}
/* Merge the worker Dstreams and translate the byteArray to string */
val unionStreams = ssc.union(kinesisStreams)
val accessLogs = unionStreams.map(byteArray => new String(byteArray))

/* Write each RDD to Amazon S3 */
accessLogs.foreachRDD( (rdd,time) => {
  if (rdd.count > 0) {
    val outPartitionFolder = new java.text.SimpleDateFormat("'year='yyyy/'month='MM/'day='dd/'hour='hh/'min='mm").format(new Date(time.milliseconds))
    rdd.saveAsTextFile("%s/%s".format(outputDir,outPartitionFolder),classOf[GzipCodec])
}})
ssc.start()
ssc.awaitTermination()

Por fim rode o seguinte comando na sua estação:
aws s3 ls s3://escolha-o-nome-do-seu-bucket/access-log-raw/ --recursive
Caso tenha um arquivo chamado _SUCCESS pode fazer um CTRL+C no no terminal do EMR. Caso ele não tenha encerrado automaticamente.
Agora vamos para a terceira parte onde nós faremos o processamento desses dados.



Nenhum comentário: