quinta-feira, 21 de julho de 2016

AWS - Big Data - Parte 3

Nós já temos os logs no S3. Dessa maneira, não precisamos do EMR rodando a todo momento. Podemos carregar os logs fazer consultas, processamentos, etc e depois desligar o cluster.

Vamos fazer isso agora, vamos carregar as informações e fazer algumas consultas:

Da mesma maneira que fizemos no artigo anterior, vamos nos logar no EMR e após o login rodar o seguinte comando:

spark-sql --driver-java-options "-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties"

Estamos no shell SQL do Spark, copie e cole o seguinte script:

CREATE EXTERNAL TABLE access_log_raw(
  host STRING, identity STRING,
  user STRING, request_time STRING,
  request STRING, status STRING,
  size STRING, referrer STRING,
  agent STRING
)
PARTITIONED BY (year INT, month INT, day INT, hour INT, min INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?"
)
LOCATION 's3://escolha-o-nome-do-seu-bucket/access-log-raw';

msck repair table access_log_raw;

Agora estamos com os dados carregados no Spark e podemos rodar algumas consultas como:

--Primeira linha de log:
SELECT * FROM access_log_raw LIMIT 1;

--Contagem de todos os itens:
SELECT COUNT(1) FROM access_log_raw;

--Top 10 Hosts:
SELECT host, COUNT(1) FROM access_log_raw GROUP BY host ORDER BY 2 DESC LIMIT 10;

Porém a medida que o nosso volume de logs cresce, o nosso cluster (nesse tamanho) não vai dar conta de carregar. Para isso devemos usar um Data Warehouse para armazenar nossos dados e permitir que façamos as consultas. Assim vamos rodar um novo script, que vai deixar os dados prontos para serem utilizados pelo Redshift:

CREATE EXTERNAL TABLE access_log_processed (
  request_time STRING,
  host STRING,
  request STRING,
  status INT,
  referrer STRING,
  agent STRING
)
PARTITIONED BY (hour STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 's3://escolha-o-nome-do-seu-bucket/access-log-processed';

Estamos usando o particionamento por hora, para o Redshift carregar paralelamente os logs e com isso ganharmos tempo.
Esse script criou a nossa tabela, vamos rodar o script que vai escrever para o S3:

-- setup Hive's "dynamic partitioning"
-- this will split output files when writing to Amazon S3
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;


-- compress output files on Amazon S3 using Gzip
SET mapred.output.compress=true;
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec= org.apache.hadoop.io.compress.GzipCodec;
SET io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
-- convert the Apache log timestamp to a UNIX timestamp
-- split files in Amazon S3 by the hour in the log lines
INSERT OVERWRITE TABLE access_log_processed PARTITION (hour) 
  SELECT 
    from_unixtime(unix_timestamp(request_time, 
      '[dd/MMM/yyyy:HH:mm:ss Z]')),
    host,
    request,
    status,
    referrer,
    agent,
    hour(from_unixtime(unix_timestamp(request_time, 
      '[dd/MMM/yyyy:HH:mm:ss Z]'))) as hour
  FROM access_log_raw;

Feito! Temos os nossos dados já estruturados no S3.
Vamos nos conectar no RedShift e rodar mais algumas consultas.
Você pode usar a linha de comando no PostgreSQL ou ferramentas como o Agnity e o SQL WorkBench/J. Aqui vamos usar a linha de comando:

Na tela do Redhisft -> clusters -> demo copie o endereço do EndPoint, no nosso exemplo é: demo.cwfaorbxzfzn.us-east-1.redshift.amazonaws.com



e rode o seguinte comando:

psql -h SEU_ENDPOINT_REDSHIFT -p 8192 -U master demo

Rode o script de criação de tabela
CREATE TABLE accesslogs (
  request_time timestamp,
  host varchar(50),
  request varchar(1024),
  status int,
  referrer varchar(1024),
  agent varchar(1024)
)
DISTKEY(host)
SORTKEY(request_time);

e copie os dados do S3:

COPY accesslogs 
FROM 's3://YOUR-S3-BUCKET/access-log-processed' 
CREDENTIALS 
 'aws_access_key_id=SUA_ACCESS_KEY;aws_secret_access_key=SUA_SECRET_KEY'
DELIMITER '\t' IGNOREHEADER 0 
MAXERROR 0 
GZIP;

Dados carregados! Podemos rodar algumas consultas:

-- Distribuição dos status pelo dia
SELECT TRUNC(request_time),status,COUNT(1) FROM accesslogs GROUP BY 1,2 ORDER BY 1,3 DESC;

-- Status 404
SELECT COUNT(1) FROM accessLogs WHERE status = 404;

-- Todas as consultas que retornaram Page Not Found
SELECT TOP 1 request,COUNT(1) FROM accesslogs WHERE status = 404 GROUP BY 1 ORDER BY 2 DESC;

É isso, espero que tenham gostado. Em outro post, vamos colocar um código hospedado no S3, chamando uma função lambda, que poderá consultar o redshift e retornar alguns gráficos.


AWS - Big Data - Parte 2

Agora que o nosso ambiente está configurado. Vamos enviar algum log para o Kinesis. Em um ambiente de produção você pode usar algum plugin como o Appender para o log4j, pode usar a lib KCL entre outras maneiras.

No nosso exemplo vamos usar o appender, para enviar um arquivo de log para o Kinesis.

Vamos fazer o download do appender:

wget http://emr-kinesis.s3.amazonaws.com/publisher/ kinesis-log4j-appender-1.0.0.jar

E do log também:

wget http://elasticmapreduce.s3.amazonaws.com/samples/ pig-apache/input/access_log_1

Na mesma pasta que o appender, vamos criar um arquivo chamado awsCredentials.properties com as credenciais IAM, que tenham permissão para escrita no Stream do Kinesis que criamos. O conteúdo ficará assim:

accessKey=SUA_ACCESS_KEY
secretKey=SUA_SECRET_KEY

Agora vamos rodar o comando que enviará o log para o Kinesis.

java -cp .:kinesis-log4j-appender-1.0.0.jar com.amazonaws.services.kinesis.log4j.FilePublisher access_log_1 &

Em poucos instantes, aparecerão linhas de log contendo:

INFO [main] (FilePublisher.java:58) - Started reading: access_log_1
DEBUG [main] (FilePublisher.java:64) - 100 records written
DEBUG [main] (FilePublisher.java:64) - 200 records written
DEBUG [main] (FilePublisher.java:64) - 300 records written
DEBUG [main] (FilePublisher.java:64) - 400 records written
DEBUG [main] (FilePublisher.java:64) - 500 records written
DEBUG [main] (FilePublisher.java:64) - 600 records written

Continue aguardando até o final, pois ele irá enviar aproximadamente 39000 linhas de log para o Kinesis. Em um ambiente real, teríamos um pool de servidores apache enviando seus logs para o Kinesis.

Quando aparecer
 INFO [main] (FilePublisher.java:68) - Finished publishing 39344 log events from access_log_1, took 6 minutes, 46 seconds and 206 milliseconds to publish
 INFO [main] (FilePublisher.java:70) - DO NOT kill this process immediately, publisher threads will keep on sending buffered logs to Kinesis
DEBUG [pool-1-thread-8] (AsyncPutCallStatsReporter.java:52) - Appender (KINESIS) made 39000 successful put requests out of total 39000 in 6 minutes, 52 seconds and 559 milliseconds since start

Pode aguardar alguns instantes e encerrar.

Agora temos os nossos logs no Kinesis e precisamos que o EMR, pegue essa informação e coloque no S3. Para isso vamos nos logar em nosso EMR. Para pegar o nome do seu nó siga o seguinte caminho: EMR -> Cluster List -> Demo e copie o Master public DNS. Rode o seguinte comando:

ssh -o TCPKeepAlive=yes -o ServerAliveInterval=30 -i SUA_CHAVE_SSH hadoop@MASTER_PUBLIC_DNS
 Após o login, vamos fazer download do cliente do Kinesis:

wget http://repo1.maven.org/maven2/com/amazonaws/amazon-kinesis-client/1.6.0/amazon-kinesis-client-1.6.0.jar
Diminuir o log do console:

sudo sed -i 's/INFO/ERROR/g' /usr/lib/spark/conf/spark-defaults.confsudo sed -i 's/INFO/ERROR/g' /usr/lib/spark/conf/log4j.properties
Rodar o Shell do Spark:

spark-shell --jars /usr/lib/spark/extras/lib/spark-streaming-kinesis-asl.jar,amazon-kinesis-client-1.6.0.jar --driver-java-options "-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties"

A sua tela deve estar semelhante a esta:


Copie e cole o seguinte código, dentro da máquina do EMR, verifique a endpointUrl e o outputDir para ficar compatível com a sua configuração.

/* import required libraries */
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker._
import java.util.Date
import org.apache.hadoop.io.compress._

/* Set up the variables as needed */
val streamName = "AccessLogStream"
val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
val outputDir = "s3://escolha-o-nome-do-seu-bucket/access-log-raw"
val outputInterval = Seconds(60)

/* Reconfigure the spark-shell */
val sparkConf = sc.getConf
sparkConf.setAppName("S3Writer")
sparkConf.remove("spark.driver.extraClassPath")
sparkConf.remove("spark.executor.extraClassPath")
sc.stop
val sc = new SparkContext(sparkConf)

/* Setup the KinesisClient */
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)

/* Determine the number of shards from the stream */
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()

/* Create one worker per Kinesis shard */
val ssc = new StreamingContext(sc, outputInterval)
val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,outputInterval,InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
}
/* Merge the worker Dstreams and translate the byteArray to string */
val unionStreams = ssc.union(kinesisStreams)
val accessLogs = unionStreams.map(byteArray => new String(byteArray))

/* Write each RDD to Amazon S3 */
accessLogs.foreachRDD( (rdd,time) => {
  if (rdd.count > 0) {
    val outPartitionFolder = new java.text.SimpleDateFormat("'year='yyyy/'month='MM/'day='dd/'hour='hh/'min='mm").format(new Date(time.milliseconds))
    rdd.saveAsTextFile("%s/%s".format(outputDir,outPartitionFolder),classOf[GzipCodec])
}})
ssc.start()
ssc.awaitTermination()

Por fim rode o seguinte comando na sua estação:
aws s3 ls s3://escolha-o-nome-do-seu-bucket/access-log-raw/ --recursive
Caso tenha um arquivo chamado _SUCCESS pode fazer um CTRL+C no no terminal do EMR. Caso ele não tenha encerrado automaticamente.
Agora vamos para a terceira parte onde nós faremos o processamento desses dados.



AWS - Big Data - Parte 1


A melhor definição de Big Data que eu ouvi, dizia que Big Data é aquele tipo/volume de dado que não é possível/viável de ser tratado na sua estação de trabalho. E vejo diversas pessoas com dúvidas e se perguntando, como que isso pode ser utilizado no seu dia-a-dia.


Então para trazer um caso de uso e exemplo, vamos falar de logs. Eu que trabalho na área de infra, administro diversos servidores que geram muitas linhas de log. Se pensarmos em ambiente AWS, onde uma máquina pode durar menos de 1 hora, fica difícil você tratar e identificar padrões nesses logs.

Pensando nisso, vamos criar uma solução de Big Data, que irá coletar logs de servidores, convertê-los em um formato SQLizado e carregar em uma solução de Data Warehouse quando necessário. Tudo isso por um custo baixo. Eu diria que isso pode ficar mais barato que um café, mas como isso não vale na nossa realidade tupiniquim, deixo essa afirmação para os nossos amigos gringos.

Esse post está baseado em uma apresentação que ocorreu no re:invent da AWS em 2015 e pode ser assistido aqui. Já existe uma versão mais atualizada, onde é usado o Firehose. Isso será tema de uma nova postagem. Mas caso você queira ver a apresentação atualizada, segue o link.

O post será dividido em 3 partes:
  1. Setup ambiente
  2. Coleta de dados
  3. Processamento e Análise de dados

Serviços:

Primeiro vamos apresentar os serviços da Amazon que vão compor a nossa solução, mais informações cliquem sobre o link:

Processamento de dados:
Armazenamento e coleta de dados:
A solução seguirá o seguinte fluxo:


Kinesis:
Vamos lá, primeiro vamos criar o shard do Kinesis, você pode criar usando linha de comando:

aws kinesis create-stream \
  --stream-name AccessLogStream \
  --shard-count 1
ou pelo console em Kinesis -> Go to Kinesis Stream -> Create Stream:


O kinesis receberá o log dos nossos servidores, os logs ficarão até 24 horas a disposição da aplicação que vai tratá-los. Caso aumente o número de servidores, devemos aumentar o número de shards. 
OBS: Para este exemplo o nome tem que ser AccessLogStream

S3:
Vamos criar um bucket para armazenar os nossos logs:

CLI:
aws s3 mb s3://escolha_o_nome_do_seu_bucket

Console:

S3 -> Create Bucket:


EMR:
O cluster EMR será responsável por pegar os logs, tratá-los e devolvê-los ao Bucket S3. A principal função dele no nosso caso é de estruturar o dado que está sendo recebido.

CLI:
aws emr create-cluster \
  --name "demo" \
  --instance-type m3.xlarge \
  --instance-count 2 \
  --release-label emr-4.6.0 \
  --ec2-attributes KeyName=SUA_CHAVE_SSH \
  --use-default-roles \
  --applications Name=Hive Name=Spark

Console:

EMR -> Create Cluster -> Go to advanced options

Passo 1:
 Passo 2:

Passo 3:

Passo 4:



Redshift:
Um Data Warehouse gerenciado e escalável. Ele receberá os nossos logs já estrutudados, para que os dados sejam analisados.

CLI:
aws redshift create-cluster \  --cluster-identifier demo \  --db-name demo \  --node-type dc1.large \  --cluster-type single-node \  --master-username master \  --master-user-password DEFINA_A_SUA_SENHA \  --publicly-accessible \  --port 8192

Console:
Redshift -> Launch Cluster

Passo 1:
 Passo 2:
 Passo 3:
 Passo 4:

Agora que o nosso ambiente está configurado vamos para a parte 2.


AWS - CloudFormation - Uma outra abordagem

Mais uma tentativa de tirar poeira desse blog. Inclusive pretendo tirá-lo do Blogger e migrar para o S3 com Jekyll, mas isso será motivo de uma nova postagem.

A motivação para essa postagem é a pouca cobertura que o tema (Cloudformation) tem em português. Em inglês temos diversas postagens, e temos os modelos (update: também disponíveis em português) que a Amazon nos oferece, mas eles em geral possuem arquivos gigantes que desanimam quem está tentando aprender. Então lá vai:

Definição oficial de CloudFormation:

"O AWS CloudFormation oferece aos desenvolvedores e administradores de sistemas uma maneira fácil de criar e gerenciar um grupo de recursos relacionados à AWS, e fornecê-los e atualizá-los de uma forma organizada e previsível"

Definição não-oficial:

Infrastructure-as-a-code

Por que usar:
  • Versionamento de Infra
  • Agilidade no deploy
  • Controle de dependências
  • Sem erro nas repetições
  • Outputs

Como Funciona:

O CloudFormation é um serviço da AWS que permite que você crie (através de arquivos no formato JSON ou do AWS Cloudformation Designer) um template com todos os recursos AWS que você precisa.  Ele trata as dependências, relações, ordens e caso algum recurso falhe para ser criado ou provisionado, ele cancela toda a criação.

Você não precisa saber a ordem de provisionamento dos serviços da AWS ou os detalhes para as dependências funcionarem. O CloudFormation cuida disso para você. Após a implantação dos recursos da AWS, você pode modificá-los e atualizá-los de uma forma controlada e previsível, efetivamente aplicando o controle de versão à sua infraestrutura na AWS da mesma forma que faz com o seu software.

Também traz uma maneira segura de undeploy, caso você precise remover ele se encarregará de "desprovisionar" todos os recursos relacionados à aquele CloudFormation.

Não há custo para o uso, somente o custo dos recursos que ele provisionar serão faturados.


Sugestão de abordagem:

Uma das dificuldades no aprendizado do cloudformation é que todas as documentações possuem enormes arquivos JSON com uma pilha completamente configurada. Para alguns casos essa abordagem é interessante, mas para aprendizado sugerimos uma abordagem dividida.

Ao invés de um arquivo contendo todos os recursos, vamos utilizar mais de um arquivo.

Vantagens:
  • Simplicidade
  • Reuso de código
  • Diversas equipes subindo itens em um ambiente já provisionado
Desvantagens:
  • Falta de dependências (falaremos mais sobre isso)
  • Tendência para perda de controle do ambiente
Mão na massa:

Como falado anteriormente, o cloudformation utiliza o formato JSON e no nosso exemplo criaremos um VPC em um arquivo. E depois, em um arquivo adicional, criaremos uma instância EC2.

O primeiro arquivo pode ser baixado aqui e o segundo aqui.

Faça download do primeiro arquivo e no console da amazon acesse:

Cloudformation -> Create Stack -> Upload a template do Amazon S3 -> selecione o arquivo -> next

Nessa tela preencha o nome da sua Stack. Algumas já estão preenchidas, caso queira pode mudar, caso contrário pode clicar em next -> next -> Create

Em alguns instantes você terá um VPC novo e configurado com o seguinte:

  • Internet Gateway
  • Duas subnets públicas com route table para o Internet Gateway
  • Duas subnets privadas
  • Security Group liberando somente as subnet publicas para acesso
  • Security Group liberando porta 22, 3389 e ICMP do IP da sua empresa

Vamos para o segundo arquivo:

Escolha chave SSH, Subnet (alguma das criadas anteriormente) e Security group (algum dos criados anteriormente) que serão utilizados.


Pronto! Temos uma instância EC2 rodando dentro do VPC na Subnet desejada.
Selecione a sua stack e vá até a aba outputs, você deve ter algo semelhante a isso:


Um pouco de teoria:
Os nossos arquivos CloudFormation são compostos por 4 sessões (clique nos nomes, para acessar a documentação):

  1. Parameters: São as informações que precisamos que sejam informados pelo usuário, no nosso caso os endereços de rede de VPC e a chave SSH são alguns exemplos.
  2. Mappings: São as constantes condicionais que estarão presentes. No nosso caso, temos para a instância EC2 o mapeamento da AMI que queremos, para cada região que a instância pode ser lançada.
  3. Resources: Recursos que serão criados pelo template, no nosso caso temos desde a instância em si, até tabela de roteamento e security group
  4. Outputs: Retorno que queremos para acesso, documentação ou automações (Cloudformation também funciona em CLI ;) ).
Casos de uso:

Nesse exemplo podemos ter uma equipe de infra que criou o VPC e dentro dele, outras equipes podem lançar seus EC2, sem impactar no que foi feito pela equipe de infra e seguindo um padrão, previamente definido.

Porém este cenário é didático, para o dia-a-dia um arquivo monolítico pode ser mais seguro. Imagina nesse caso que alguém resolver remover o cloudformation do VPC. Ele vai tentar apagar tudo, mas não conseguirá pela existência de instâncias rodando. Mas pela característica do CloudFormation ele vai conseguir apagar o Internet Gateway antes de perceber isso. Logo suas instâncias ficarão sem conectividade.

Assim é preferível para o cotidiano um arquivo só e as alterações podem ser feitas no Cloudformation, consulte a documentação para ver se haverá algum tipo de indisponibilidade na sua alteração.

Comentando a sessão resources do EC2 (estamos desconsiderando as chaves):
  1. "Ec2Instance" - Nome do recurso que pode ser referenciado em outros recursos e nos Outputs.
  2. "Type" : "AWS::EC2::Instance - Tipo do recurso 
  3. "Properties" : - Propriedades
  4. "ImageId" : { "Fn::FindInMap" : [ "RegionMap", { "Ref" : "AWS::Region" }, "AMI" ]} - Imagem que vai ser usada, ele verifica a região que você está usando (no console ou na CLI), para verificar qual AMI deve ser utilizada.
  5. "KeyName": { "Ref": "KeyName"  - Chave escolhida pelo usuário, na sessão parameters
  6. "InstanceType" : "m1.small" - Tamanho da instância, neste caso padronizamos m1.small
  7. "NetworkInterfaces" : - Sub sessão networkInterfaces
  8. "GroupSet" : [{ "Ref" : "SecurityGroupIds" }] - Security Group escolhido pelo usuário, na sessão parameters
  9. "DeviceIndex" : "0", - Ordem que a interface de rede será anexada. Neste caso será a primeira placa de rede
  10. "DeleteOnTermination" : "true" - Será removida quando a máquina for terminada
  11. "SubnetId" : { "Ref" : "SubnetId" } - Subnet escolhida pelo usuário, na sessão parameters
É isso. Espero que esse exemplo sirva de inspiração para que você comece a utilizar Cloudformation na sua conta AWS. Fique a vontade para copiar e para tirar dúvidas aqui nos comentários.