
Ingestion from Kinesis Stream to Amorphic Dataset

Create a stream

  • Click on the 'Streams' widget on the home screen, click on INGESTION --> Streams in the left-side navigation bar, or click on Navigator in the top-right corner and search for Streams.
  • Click on the ➕ icon at the top-right corner.
  • Enter the following details and click on Create Stream.
Stream Name: fraud-detection-events-stream-<your-userid>
Stream Type: Kinesis
Description: This stream receives finance data from a server.
Keywords: Stream
Data Retention Period (Hrs): 24
Shard Count: 1 (One shard provides a capacity of 1MB/sec data input and 2MB/sec data output.)
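The shard count sizes the stream's throughput. The sketch below estimates how many shards a workload would need from the per-shard input limit quoted above; the 1,000 records/sec per-shard limit is the standard Kinesis limit and is an assumption not stated in this guide.

```python
import math

# Per-shard Kinesis ingest limits. The 1 MB/sec figure comes from the form
# hint above; the 1,000 records/sec figure is the documented Kinesis
# per-shard limit (assumed here).
SHARD_IN_MB_PER_SEC = 1.0
SHARD_RECORDS_PER_SEC = 1000

def shards_needed(in_mb_per_sec: float, records_per_sec: int) -> int:
    """Smallest shard count that covers both ingest bandwidth and record rate."""
    by_bytes = math.ceil(in_mb_per_sec / SHARD_IN_MB_PER_SEC)
    by_records = math.ceil(records_per_sec / SHARD_RECORDS_PER_SEC)
    return max(1, by_bytes, by_records)

print(shards_needed(0.2, 100))   # a trickle of data: one shard is enough
print(shards_needed(3.5, 2500))  # bandwidth needs 4 shards, record rate only 3
```

For this workshop's single CSV trickle, the default Shard Count of 1 is more than enough.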

  • Check the status on the details page. Click on 🔄 to refresh the status; it will change from Creating to Active.
  • Once the stream is created, Stream Access Key and Stream Secret Key are available as shown below.

[Screenshot: Create Stream]

Create a consumer

  • Click on Consumers tab.
  • Click on the ➕ icon at the top-right corner.
  • Enter the following details and click on Create Consumer.
Consumer Name: fraud-stream-to-s3-<your_userid>
Buffer Size: 50
Buffer Interval: 300
Dataset Name: stream_2_s3_fraud_detection_<your_userid>
Domain Name: workshop
Description: This Consumer pulls the data from fraud-detection-events-stream-<your-userid> to S3
Target Location: S3
File Type: csv
Data Classifications: None
AreAIServicesEnabled: No
Keywords: stream
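Buffer Size and Buffer Interval control when the consumer flushes accumulated records to S3: delivery happens when either threshold is reached first, Firehose-style. That semantics, and the units (MB for size, seconds for interval), are assumptions based on typical Kinesis delivery behavior; the sketch below shows how the two settings interact:

```python
def first_flush_after(buffer_mb: float, buffer_secs: int, mb_per_sec: float) -> float:
    """Seconds until the first flush to S3: whichever of the size or time
    threshold is hit first (Firehose-style semantics, assumed here)."""
    secs_to_fill = buffer_mb / mb_per_sec if mb_per_sec > 0 else float("inf")
    return min(secs_to_fill, buffer_secs)

# With the workshop settings (50 MB / 300 s) and a slow trickle of data,
# the 300-second interval fires first:
print(first_flush_after(50, 300, 0.01))  # 300
# A fast producer fills the 50 MB buffer before the interval elapses:
print(first_flush_after(50, 300, 1.0))   # 50.0
```

With the slow producer configured later in this workshop, expect roughly one file on S3 every five minutes.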

  • Once the consumer is created, it looks like the following picture in the Consumers tab.

[Screenshot: Consumers tab]

  • Click on the Dataset ID link to go to the newly created dataset.
  • Note the dataset's S3 location. The page links back to the Stream ID as well.

Create a remote server

  • 🛑 If you don't have access to any server, you can skip the following sections and use stream_2_s3_fraud_detection_ankamv as an input dataset in the next 'predict fraud' section.
  • 💡 If you already have a server, jump to configure agent. 💡
  • Let's create an EC2 instance in your AWS account.
  • Login to AWS Account to go to EC2 Service.
  • Click on instances --> Launch instances.
  • Select the Amazon Linux AMI with ID ami-0d5eff06f840b45e9.
  • Select t2.medium or a larger instance type.
  • Select a security group that allows inbound port 22 (SSH) from your machine.
  • Select an existing key pair (.pem file) or create a new one.
  • Once the server is running, SSH to it using the .pem file.

Configure a Kinesis agent

Run the following commands on EC2 instance.

sudo yum install python3-pip -y
sudo yum install aws-kinesis-agent -y
sudo python3 -m pip install pandas
sudo mkdir /ingest_data
sudo chown ec2-user:ec2-user /ingest_data
  • Edit the agent configuration: sudo vi /etc/aws-kinesis/agent.json
  • Copy and paste the following configuration after replacing awsAccessKeyId, awsSecretAccessKey, and kinesisStream. You can find these details on the stream you created above.
{
  "cloudwatch.emitMetrics": true,
  "awsAccessKeyId": "Copy and paste your stream's Access Key Id",
  "awsSecretAccessKey": "Copy and paste your stream's Secret Access Key",
  "kinesis.endpoint": "kinesis.us-east-1.amazonaws.com",
  "flows": [
    {
      "filePattern": "/ingest_data/*.csv",
      "kinesisStream": "fraud-detection-events-stream-<your-userid>"
    }
  ]
}
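A malformed agent.json silently keeps the agent from shipping data, so it can help to sanity-check the file before starting the agent. This hypothetical helper (not part of the Kinesis agent) just validates the JSON and the key names used above:

```python
import json

# Top-level keys used in the agent.json shown above.
REQUIRED_TOP = {"awsAccessKeyId", "awsSecretAccessKey", "kinesis.endpoint", "flows"}

def check_agent_config(path="/etc/aws-kinesis/agent.json"):
    """Fail fast if agent.json is invalid JSON or missing the keys used above."""
    with open(path) as f:
        cfg = json.load(f)  # raises ValueError on malformed JSON
    missing = REQUIRED_TOP - cfg.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    for flow in cfg["flows"]:
        if "filePattern" not in flow or "kinesisStream" not in flow:
            raise ValueError(f"incomplete flow: {flow}")
    return cfg
```

Run it with `python3 -c "from check_config import check_agent_config; check_agent_config()"` (or paste it into a throwaway script) before starting the agent.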
  • Create a Python program.
  • Run vi /home/ec2-user/duplicate_files.py and paste the lines below.
#!/usr/bin/env python3
import datetime

import pandas as pd

def copy_csv(filename):
    # Read the source file; low_memory=False avoids chunked dtype guessing.
    df = pd.read_csv(filename, low_memory=False)
    # Take 5 random rows and write them to a timestamped file in the
    # directory the Kinesis agent is watching.
    df_out = df.sample(n=5)
    out_path = '/ingest_data/' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + '_streams.csv'
    df_out.to_csv(out_path, mode='w', index=False)

copy_csv('/home/ec2-user/fraud.csv')
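To see what each run of the script produces, the sampling and naming steps can be exercised on a synthetic DataFrame (the two column names below are made up for illustration):

```python
import datetime

import pandas as pd

# Stand-in for fraud.csv: 100 rows with hypothetical columns.
df = pd.DataFrame({"txn_id": range(100), "amount": [i * 1.5 for i in range(100)]})

# Each cron run writes a random 5-row sample like this one...
sample = df.sample(n=5, random_state=0)
# ...to a timestamped file such as 20240101-120000_streams.csv.
name = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + "_streams.csv"

print(len(sample))  # 5
print(name.endswith("_streams.csv"))  # True
```

So the agent sees one fresh 5-row CSV per minute, which it tails and pushes to the stream.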

  • Create a schedule to run the program every minute.
  • Run crontab -e and paste the line below.
*/1 * * * * /usr/bin/python3 /home/ec2-user/duplicate_files.py

Run agent and monitor status

  • Run the following commands to start the agent.
sudo service aws-kinesis-agent start
sudo chkconfig aws-kinesis-agent on
  • Click here to download a test file.
  • Rename the file as 'FraudTestData01.csv'.
  • Run this command locally to copy the test file to the server location.
scp -i your-pem-file.pem FraudTestData01.csv  ec2-user@your-server-ip-address:/home/ec2-user/fraud.csv

Note: If you can't 'scp' this file, copy and paste the content of the CSV file on the server. You may copy a larger file, 'fraud_test_with_email_ip.csv', from the 'finance_fraud_detection_data' dataset if needed.

  • The Kinesis producer agent's status can be monitored by running the following command on the server.
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
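The agent periodically logs progress counters. A small parser like the one below can pull out the parsed/sent counts to spot a stalled pipeline; the log line format shown is assumed from typical aws-kinesis-agent output and may differ in your version.

```python
import re

# Assumed shape of the agent's periodic progress line; adjust the pattern
# if your agent version logs a different format.
PROGRESS = re.compile(r"Progress: (\d+) records parsed .*? (\d+) records sent")

def progress_counts(line):
    """Return (parsed, sent) counters from a progress log line, or None."""
    m = PROGRESS.search(line)
    return (int(m.group(1)), int(m.group(2))) if m else None

line = ("com.amazon.kinesis.streaming.agent.Agent: Progress: 120 records parsed "
        "(24000 bytes), and 120 records sent successfully to destinations.")
print(progress_counts(line))  # (120, 120)
```

If "records sent" stays at 0 while "records parsed" grows, check the stream name and keys in agent.json.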

Check output

  • Go to the Amorphic dataset stream_2_s3_fraud_detection_<your_userid>. Use the Navigator to jump to the dataset page quickly: press Ctrl twice in succession and type the dataset name stream_2_s3_fraud_detection_<your_userid>.
  • Click on the Files tab.
  • Click on the three dots ⋮ next to a file to download it.
  • Verify the number of records in each file and how frequently files land on S3.
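A downloaded file can be checked with a few lines of Python. Since each cron run above contributes one 5-row sample, record counts should come out in multiples of 5 (the CSV content below is a synthetic stand-in with hypothetical columns):

```python
import csv
import io

# Stand-in for one downloaded consumer file (header + 5 data rows).
downloaded = "txn_id,amount\n1,9.5\n2,3.0\n3,7.2\n4,1.1\n5,6.8\n"

# DictReader consumes the header row, so len(rows) is the record count.
rows = list(csv.DictReader(io.StringIO(downloaded)))
print(len(rows))           # 5 data records
print(len(rows) % 5 == 0)  # consistent with 5-row samples per minute
```

With a 300-second Buffer Interval, expect a new file roughly every five minutes, each holding about five minutes' worth of samples.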


Congratulations!!!

You have learned how to create a stream, a producer, and a consumer on Amorphic. Now proceed to the 'Predict Fraud' task.