quinta-feira, 21 de julho de 2016

AWS - Big Data - Parte 3

Nós já temos os logs no S3. Dessa maneira, não precisamos do EMR rodando a todo momento. Podemos carregar os logs fazer consultas, processamentos, etc e depois desligar o cluster.

Vamos fazer isso agora, vamos carregar as informações e fazer algumas consultas:

Da mesma maneira que fizemos no artigo anterior, vamos nos logar no EMR e após o login rodar o seguinte comando:

spark-sql --driver-java-options "-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties"

Estamos no shell SQL do Spark, copie e cole o seguinte script:

CREATE EXTERNAL TABLE access_log_raw(
  host STRING, identity STRING,
  user STRING, request_time STRING,
  request STRING, status STRING,
  size STRING, referrer STRING,
  agent STRING
)
PARTITIONED BY (year INT, month INT, day INT, hour INT, min INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?"
)
LOCATION 's3://escolha-o-nome-do-seu-bucket/access-log-raw';

msck repair table access_log_raw;

Agora estamos com os dados carregados no Spark e podemos rodar algumas consultas como:

--Primeira linha de log:
SELECT * FROM access_log_raw LIMIT 1;

--Contagem de todos os itens:
SELECT COUNT(1) FROM access_log_raw;

--Top 10 Hosts:
SELECT host, COUNT(1) FROM access_log_raw GROUP BY host ORDER BY 2 DESC LIMIT 10;

Porém a medida que o nosso volume de logs cresce, o nosso cluster (nesse tamanho) não vai dar conta de carregar. Para isso devemos usar um Data Warehouse para armazenar nossos dados e permitir que façamos as consultas. Assim vamos rodar um novo script, que vai deixar os dados prontos para serem utilizados pelo Redshift:

CREATE EXTERNAL TABLE access_log_processed (
  request_time STRING,
  host STRING,
  request STRING,
  status INT,
  referrer STRING,
  agent STRING
)
PARTITIONED BY (hour STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 's3://escolha-o-nome-do-seu-bucket/access-log-processed';

Estamos usando o particionamento por hora, para o Redshift carregar paralelamente os logs e com isso ganharmos tempo.
Esse script criou a nossa tabela, vamos rodar o script que vai escrever para o S3:

-- setup Hive's "dynamic partitioning"
-- this will split output files when writing to Amazon S3
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;


-- compress output files on Amazon S3 using Gzip
SET mapred.output.compress=true;
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec= org.apache.hadoop.io.compress.GzipCodec;
SET io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
-- convert the Apache log timestamp to a UNIX timestamp
-- split files in Amazon S3 by the hour in the log lines
INSERT OVERWRITE TABLE access_log_processed PARTITION (hour) 
  SELECT 
    from_unixtime(unix_timestamp(request_time, 
      '[dd/MMM/yyyy:HH:mm:ss Z]')),
    host,
    request,
    status,
    referrer,
    agent,
    hour(from_unixtime(unix_timestamp(request_time, 
      '[dd/MMM/yyyy:HH:mm:ss Z]'))) as hour
  FROM access_log_raw;

Feito! Temos os nossos dados já estruturados no S3.
Vamos nos conectar no RedShift e rodar mais algumas consultas.
Você pode usar a linha de comando no PostgreSQL ou ferramentas como o Agnity e o SQL WorkBench/J. Aqui vamos usar a linha de comando:

Na tela do Redhisft -> clusters -> demo copie o endereço do EndPoint, no nosso exemplo é: demo.cwfaorbxzfzn.us-east-1.redshift.amazonaws.com



e rode o seguinte comando:

psql -h SEU_ENDPOINT_REDSHIFT -p 8192 -U master demo

Rode o script de criação de tabela
CREATE TABLE accesslogs (
  request_time timestamp,
  host varchar(50),
  request varchar(1024),
  status int,
  referrer varchar(1024),
  agent varchar(1024)
)
DISTKEY(host)
SORTKEY(request_time);

e copie os dados do S3:

COPY accesslogs 
FROM 's3://YOUR-S3-BUCKET/access-log-processed' 
CREDENTIALS 
 'aws_access_key_id=SUA_ACCESS_KEY;aws_secret_access_key=SUA_SECRET_KEY'
DELIMITER '\t' IGNOREHEADER 0 
MAXERROR 0 
GZIP;

Dados carregados! Podemos rodar algumas consultas:

-- Distribuição dos status pelo dia
SELECT TRUNC(request_time),status,COUNT(1) FROM accesslogs GROUP BY 1,2 ORDER BY 1,3 DESC;

-- Status 404
SELECT COUNT(1) FROM accessLogs WHERE status = 404;

-- Todas as consultas que retornaram Page Not Found
SELECT TOP 1 request,COUNT(1) FROM accesslogs WHERE status = 404 GROUP BY 1 ORDER BY 2 DESC;

É isso, espero que tenham gostado. Em outro post, vamos colocar um código hospedado no S3, chamando uma função lambda, que poderá consultar o redshift e retornar alguns gráficos.


Nenhum comentário: