Project author: chanshunli

Project description:
Lisp like R (Native support) & statistics, machine learning
Primary language: R
Repository: git://github.com/chanshunli/jim-emacs-fun-r-lisp.git
Created: 2017-06-30T02:29:33Z
Project community: https://github.com/chanshunli/jim-emacs-fun-r-lisp


Visualization with Python & R: Machine Learning, Deep Learning, Reinforcement Learning

First principle

  • Find a common probability distribution in a large amount of data and make predictions based on that distribution: y = f(x).
  • Learning is like recovering a functional relationship; deep learning amounts to reverse-engineering the function (or its inverse) that generated the data. You only know the data follows some pattern and you guess the original function behind it; for example, you can train a neural network that behaves like a calculator.
  • The idea of high-dimensional space: code is embedded into a high-dimensional space and separated by a fine-grained high-dimensional classification, and search happens in that same space. For example, code can be parsed with treesitter and used as training data to learn its logical relationships. Most of NLP is a multi-class classification problem in high-dimensional space.
  • Collect the inputs x and outputs y around you as training data, and mine their mapping f(x) at any time. You can use GPT to generate data for your training needs, or write a crawler to fetch the data you need.

init env

  1. conda create -n emacspy python=3.11
  2. conda activate emacspy
  3. poetry install

least squares method

```python
import numpy as np
import matplotlib.pyplot as plt

# Example data points
X = np.array([1, 2.2, 3, 4, 5])
y = np.array([2, 4, 6.3, 8, 11])

# Add a column of ones to X for the intercept term (bias)
X_b = np.c_[np.ones((X.shape[0], 1)), X]  # X_b is X with a bias column

# Calculate the best fit line parameters using the Normal Equation
theta_best = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y)

# Print the parameters (intercept and slope)
print(f"Intercept: {theta_best[0]}")
print(f"Slope: {theta_best[1]}")

# Predict values using the model
y_pred = X_b.dot(theta_best)

# Plot the data points and the best fit line
plt.scatter(X, y, color='blue', label='Data points')
plt.plot(X, y_pred, color='red', label='Best fit line')
plt.xlabel('X')
plt.ylabel('y')
plt.legend()
plt.show()
```
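
For reference, the closed-form solution that `theta_best` evaluates above is the normal equation, with `X_b` the design matrix that includes the bias column:

$$\hat{\theta} = (X_b^{\top} X_b)^{-1} X_b^{\top} y$$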

least squares method by neural network

```python
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# Goal: show how torch.optim.Adam fits a linear model and track the loss so it can be plotted

# Define a simple linear model
class LinearModel(nn.Module):
    def __init__(self):
        super(LinearModel, self).__init__()
        self.linear = nn.Linear(1, 1)

    def forward(self, x):
        return self.linear(x)

# Initialize the model, loss function, and optimizer
model = LinearModel()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Generate some synthetic data (y = 2x + 1 with some noise)
x_train = torch.linspace(-1, 1, 100).reshape(-1, 1)
y_train = 2 * x_train + 1 + 0.2 * torch.randn(x_train.size())

# List to store the loss values
loss_values = []

# Training loop
for epoch in range(1000):
    model.train()
    optimizer.zero_grad()
    outputs = model(x_train)
    loss = criterion(outputs, y_train)
    loss.backward()
    optimizer.step()
    loss_values.append(loss.item())
```
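
The listing stops after collecting `loss_values`; a minimal continuation (assuming the training loop above has just run) can inspect the fitted parameters and plot the loss curve:

```python
# Fitted parameters should be close to the true slope 2 and intercept 1
w = model.linear.weight.item()
b = model.linear.bias.item()
print(f"Learned: y = {w:.3f} * x + {b:.3f}")

# Plot how the MSE loss decreased during Adam optimization
plt.plot(loss_values)
plt.xlabel('Epoch')
plt.ylabel('MSE loss')
plt.title('Adam optimization of the linear model')
plt.show()
```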

nonlinear fitting

```python
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

# Step 1: Generate a 100-length random sequence
n = 100
x = torch.linspace(1, 10, n).unsqueeze(1)
y = torch.sin(x) + torch.rand(n, 1) * 0.5

# Step 2: Define a simple neural network model for nonlinear fitting
class NonlinearModel(nn.Module):
    def __init__(self):
        super(NonlinearModel, self).__init__()
        self.fc1 = nn.Linear(1, 10)
        self.fc2 = nn.Linear(10, 10)
        self.fc3 = nn.Linear(10, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = NonlinearModel()

# Step 3: Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Step 4: Train the model
epochs = 1000
for epoch in range(epochs):
    model.train()

    # Forward pass
    outputs = model(x)
    loss = criterion(outputs, y)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch+1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# Step 5: Plot the original data and the fitted curve
model.eval()
with torch.no_grad():
    predicted = model(x).numpy()

plt.figure(figsize=(10, 5))
plt.plot(x.numpy(), y.numpy(), 'ro', label='Original data')
plt.plot(x.numpy(), predicted, 'b-', label='Fitted curve')
plt.legend()
plt.show()
```
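
To see how the fitted network behaves between the training points, a small follow-up (an illustrative addition, not part of the original listing) can evaluate it on a denser grid:

```python
# Evaluate the trained model on a denser grid of x values
x_dense = torch.linspace(1, 10, 500).unsqueeze(1)
with torch.no_grad():
    y_dense = model(x_dense)

plt.plot(x.numpy(), y.numpy(), 'ro', markersize=3, label='Training data')
plt.plot(x_dense.numpy(), y_dense.numpy(), 'g-', label='Model on dense grid')
plt.legend()
plt.show()
```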

polar coordinate classification

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Helper function to convert Cartesian to Polar coordinates
def cartesian_to_polar(x, y, z):
    r = torch.sqrt(x**2 + y**2 + z**2)
    theta = torch.atan2(y, x)
    phi = torch.acos(z / r)
    return r, theta, phi

# Example data generation (replace with your actual data)
n_samples = 5000
x = torch.randn(n_samples)
y = torch.randn(n_samples)
z = torch.randn(n_samples)
labels = torch.randint(0, 4, (n_samples,))  # Four classes (0, 1, 2, 3)

# Convert to polar coordinates
r, theta, phi = cartesian_to_polar(x, y, z)

# Combine into a single tensor
data = torch.stack((r, theta, phi), dim=1)

# Create a Dataset and DataLoader
dataset = TensorDataset(data, labels)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# Define a simple feedforward neural network
class PolarNet(nn.Module):
    def __init__(self):
        super(PolarNet, self).__init__()
        self.fc1 = nn.Linear(3, 64)
        self.fc2 = nn.Linear(64, 128)
        self.fc3 = nn.Linear(128, 4)  # Four output classes

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Initialize the model, loss function, and optimizer
model = PolarNet()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(20):  # Number of epochs
    for inputs, targets in train_loader:
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch {epoch+1}/20, Loss: {loss.item()}')

# After training, evaluate the model on the entire dataset for visualization
with torch.no_grad():
    predicted_labels = model(data).argmax(dim=1)

# Plotting the results in 3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

# Convert polar back to Cartesian for plotting
x_cartesian = r * torch.sin(phi) * torch.cos(theta)
y_cartesian = r * torch.sin(phi) * torch.sin(theta)
z_cartesian = r * torch.cos(phi)

# Plot the 3D scatter plot
scatter = ax.scatter(x_cartesian, y_cartesian, z_cartesian, c=predicted_labels, cmap='viridis', marker='o')

# Add color bar and labels
plt.colorbar(scatter, ax=ax)
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.title('3D Visualization of PolarNet Classifications')
plt.show()
```
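
Because the example labels are drawn uniformly at random, the network cannot do better than chance; a quick check (an illustrative addition) makes that visible:

```python
# With uniformly random labels, accuracy should hover around 0.25 (chance level for 4 classes)
with torch.no_grad():
    accuracy = (model(data).argmax(dim=1) == labels).float().mean().item()
print(f"Training-set accuracy: {accuracy:.3f}")
```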

mnist ocr

```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

batch_size = 64
learning_rate = 0.01
epochs = 100

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28 * 28)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

for epoch in range(epochs):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print(f'Epoch: {epoch+1}/{epochs} [Batch: {batch_idx*len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')

    # Evaluate on the test set after each epoch
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'Test set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)')

torch.save(model.state_dict(), "mnist_model.pth")
```

use mnist

```python
import torch
from torchvision import transforms
from PIL import Image

# Assumes the Net class from the "mnist ocr" section above is defined or imported
model = Net()

# Load the trained model weights
model.load_state_dict(torch.load("mnist_model.pth"))
model.eval()  # Set the model to evaluation mode

# Prepare the handwritten input image.
# The image must be preprocessed to match the MNIST format (28x28 pixels, grayscale).
def preprocess_image(image_path):
    transform = transforms.Compose([
        transforms.Grayscale(),                     # Ensure the image is grayscale
        transforms.Resize((28, 28)),                # Resize to 28x28 pixels
        transforms.ToTensor(),                      # Convert to tensor
        transforms.Normalize((0.1307,), (0.3081,))  # Normalize with the same mean and std as MNIST
    ])
    image = Image.open(image_path)
    image = transform(image).unsqueeze(0)  # Add batch dimension
    return image

# Perform inference
def recognize_digit(image_path):
    image = preprocess_image(image_path)
    with torch.no_grad():
        output = model(image)
        prediction = output.argmax(dim=1, keepdim=True)
    return prediction.item()

# Example usage
image_path = 'path_to_your_handwritten_digit_image3.png'
predicted_digit = recognize_digit(image_path)
print(f'Predicted Digit: {predicted_digit}')
```

calculator neural network

```python
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np

# Define the neural network architecture
class CalculatorNN(nn.Module):
    def __init__(self):
        super(CalculatorNN, self).__init__()
        self.fc1 = nn.Linear(3, 128)  # Input: 2 numbers + operation
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)   # Output: the result

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# NOTE: the original listing uses X_train / y_train without defining them.
# A minimal synthetic dataset is assumed here: two operands in [0, 100) and an
# operation code (0 = add, 1 = subtract, 2 = multiply).
ops = {0: lambda a, b: a + b, 1: lambda a, b: a - b, 2: lambda a, b: a * b}
samples = [(random.uniform(0, 100), random.uniform(0, 100), random.randint(0, 2)) for _ in range(10000)]
X_train = torch.tensor(samples, dtype=torch.float32)
y_train = torch.tensor([[ops[int(op)](a, b)] for a, b, op in samples], dtype=torch.float32)

model = CalculatorNN()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 50000  # loss is still too large if this is only 5000
for epoch in range(num_epochs):
    model.train()

    # Forward pass
    predictions = model(X_train)
    loss = criterion(predictions, y_train)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Save the trained weights so they can be reloaded below
torch.save(model.state_dict(), 'calculator_model.pth')

# ---- use
model = CalculatorNN()
model.load_state_dict(torch.load('calculator_model.pth'))
model.eval()

# Perform the prediction
with torch.no_grad():
    # Prepare the input (32 * 3)
    input_data = torch.tensor([[32.0, 3.0, 2]], dtype=torch.float32)  # 2 corresponds to multiplication
    prediction = model(input_data)
    print(f'Prediction for 32 * 3: {prediction.item():.4f}')
```

Data cleaning

  • log clean utils

    ```python
    import re
    import os
    import glob
    import shutil

    # Split a full log file by a pattern, writing each segment to its own file
    def split_log_file(input_file, split_pattern, output_pattern):
        with open(input_file, 'r') as file:
            log_content = file.read()
        pattern = re.compile(split_pattern)
        split_points = [match.start() for match in re.finditer(pattern, log_content)]
        split_points.append(len(log_content))
        for i in range(len(split_points) - 1):
            start = split_points[i]
            end = split_points[i + 1]
            segment = log_content[start:end]
            match = pattern.search(segment)
            if match:
                number = match.group(1)
                output_file = output_pattern.format(number=number)
                with open(output_file, 'w') as file:
                    file.write(segment)
                print(f"Segment saved as {output_file}")

    # Save logs with different patterns: move a matching log to another directory
    def move_patterns_logs(destination_path, patterns):
        current_directory = os.getcwd()
        log_files = glob.glob("*.log")
        for log_file in log_files:
            with open(log_file, 'r') as file:
                if any(re.search(pattern, line) for pattern in patterns for line in file):
                    shutil.move(os.path.join(current_directory, log_file), destination_path)
                    break

    # Filter a log for display or data visualization
    def filter_log_file(log_file_path, exclude_keywords):
        with open(log_file_path, "r") as file:
            lines = file.readlines()
        filtered_lines = [line for line in lines if not any(keyword in line for keyword in exclude_keywords)]
        for line in filtered_lines:
            print(line, end="")
    ```
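
A quick usage sketch for these helpers (the file names and regexes below are made-up placeholders):

```python
# Split boot.log wherever a line like "=== run 42 ===" appears,
# writing each segment to run_42.log, run_43.log, ...
split_log_file('boot.log', r'=== run (\d+) ===', 'run_{number}.log')

# Move a *.log in the current directory whose content matches any of these patterns
move_patterns_logs('/tmp/bad_logs', [r'OutOfMemoryError', r'kernel panic'])

# Print app.log without DEBUG/TRACE noise
filter_log_file('app.log', ['DEBUG', 'TRACE'])
```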

SVM

![](svm_visualization_3d.gif)

```python
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

X, y = make_classification(n_samples=100, n_features=3, n_informative=3, n_redundant=0, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

model = SVC(kernel='linear')
model.fit(X_train, y_train)

def plot_svm_decision_boundary_3d(model, X, y):
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')

    # Plot the training points
    scatter = ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=y, s=30, cmap=plt.cm.coolwarm)

    # Create grid to evaluate model (this defines the 3D space)
    xlim = ax.get_xlim()
    ylim = ax.get_ylim()
    zlim = ax.get_zlim()
    xx = np.linspace(xlim[0], xlim[1], 20)
    yy = np.linspace(ylim[0], ylim[1], 20)
    zz = np.linspace(zlim[0], zlim[1], 20)

    # Create a meshgrid to evaluate the decision function
    YY, ZZ = np.meshgrid(yy, zz)
    # Solve w0*x0 + w1*x1 + w2*x2 + b = 0 for the first feature to get the decision plane
    XX = -(model.coef_[0][1] * YY + model.coef_[0][2] * ZZ + model.intercept_[0]) / model.coef_[0][0]

    # Plot the decision surface
    ax.plot_surface(XX, YY, ZZ, color='gray', alpha=0.3, rstride=100, cstride=100)

    # Highlight support vectors
    ax.scatter(model.support_vectors_[:, 0], model.support_vectors_[:, 1], model.support_vectors_[:, 2],
               s=100, facecolors='none', edgecolors='k', linewidth=1.5, label='Support Vectors')

    ax.set_title('SVM Decision Boundary in 3D')
    ax.set_xlabel('Feature 1')
    ax.set_ylabel('Feature 2')
    ax.set_zlabel('Feature 3')

    # Add color legend
    legend1 = ax.legend(*scatter.legend_elements(), loc="best", title="Classes")
    ax.add_artist(legend1)
    plt.show()

plot_svm_decision_boundary_3d(model, X_train, y_train)
```
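
The gray surface drawn by the helper is the linear SVM's decision plane. With weights $w = (w_0, w_1, w_2)$ and bias $b$, the plane is where the decision function vanishes, and the code solves it for the first feature:

$$w_0 x_0 + w_1 x_1 + w_2 x_2 + b = 0 \quad\Longrightarrow\quad x_0 = -\frac{w_1 x_1 + w_2 x_2 + b}{w_0}$$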

kmeans

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans

def cluster_error_messages(error_messages, num_clusters=5):
    vectorizer = TfidfVectorizer(stop_words='english')
    X = vectorizer.fit_transform(error_messages)
    kmeans = KMeans(n_clusters=num_clusters, random_state=0)
    kmeans.fit(X)
    labels = kmeans.labels_
    clustered_errors = {}
    for i, label in enumerate(labels):
        if label not in clustered_errors:
            clustered_errors[label] = []
        clustered_errors[label].append(error_messages[i])
    return clustered_errors
```
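
A small usage sketch (the error strings below are made-up examples):

```python
errors = [
    "Connection refused to db01:5432",
    "Connection refused to db02:5432",
    "NullPointerException in OrderService.process",
    "NullPointerException in UserService.load",
    "Disk full on /var/log",
    "Disk full on /data",
]
clusters = cluster_error_messages(errors, num_clusters=3)
for label, messages in clusters.items():
    print(f"Cluster {label}:")
    for message in messages:
        print(f"  {message}")
```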

Decision Tree Classifier

```python
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn import metrics

iris = load_iris()
X = iris.data    # Features
y = iris.target  # Labels

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

clf = DecisionTreeClassifier()
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
accuracy = metrics.accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy * 100:.2f}%")

plt.figure(figsize=(12, 8))
plot_tree(clf, feature_names=iris.feature_names, class_names=iris.target_names, filled=True)
plt.show()
```

Reinforcement Learning (DQN)

```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
import gymnasium as gym

# Define a simple fully connected neural network
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

# Initialize the environment and model
env = gym.make("LunarLander-v2", render_mode="human")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# Create the DQN model
model = DQN(input_dim=state_dim, output_dim=action_dim)

# Training loop: how the agent interacts with the environment, how rewards are collected,
# and how the model is updated.

# Parameters
learning_rate = 0.001
gamma = 0.99           # Discount factor
epsilon = 1.0          # Exploration rate
epsilon_decay = 0.995
epsilon_min = 0.01
episodes = 500

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Function to choose action (using epsilon-greedy policy)
def choose_action(state, epsilon):
    if np.random.rand() <= epsilon:
        return np.random.choice(action_dim)  # Random action
    state = torch.FloatTensor(state).unsqueeze(0)
    with torch.no_grad():
        q_values = model(state)
    return torch.argmax(q_values).item()

# Function to train the model
def train_model(memory, batch_size=64):
    if len(memory) < batch_size:
        return

    # Randomly sample a batch from memory
    batch = random.sample(memory, batch_size)

    # Extract states, actions, rewards, next_states, and dones from the batch
    states, actions, rewards, next_states, dones = zip(*batch)

    # Convert them to tensors
    states = torch.FloatTensor(states)
    actions = torch.LongTensor(actions)
    rewards = torch.FloatTensor(rewards)
    next_states = torch.FloatTensor(next_states)
    dones = torch.FloatTensor(dones)

    # Compute Q values for the current states
    q_values = model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # Compute the maximum Q values for the next states (detached so gradients do not flow through the target)
    next_q_values = model(next_states).max(1)[0].detach()

    # Compute the target Q values
    q_targets = rewards + (1 - dones) * gamma * next_q_values

    # Compute the loss
    loss = F.mse_loss(q_values, q_targets)

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Main loop
memory = []
for episode in range(episodes):
    state = env.reset()[0]
    total_reward = 0
    for t in range(1000):
        action = choose_action(state, epsilon)
        next_state, reward, done, truncated, _ = env.step(action)
        memory.append((state, action, reward, next_state, done))
        train_model(memory)
        state = next_state
        total_reward += reward
        if done or truncated:
            break
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

env.close()
```
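
The update computed in `train_model` above is the standard one-step Q-learning (temporal-difference) target, where $d_t$ is the done flag and $\gamma$ the discount factor:

$$y_t = r_t + (1 - d_t)\,\gamma \max_{a'} Q(s_{t+1}, a'), \qquad \mathcal{L} = \big(Q(s_t, a_t) - y_t\big)^2$$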

Flappy bird dqn

```python
import gymnasium as gym
import numpy as np
import pygame
from gymnasium import spaces
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import time

from flappy_bird_cl3_pass_env_to_nn_3 import FlappyBirdEnv

class DQN(nn.Module):
    def __init__(self, input_size, n_actions):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions)
        )

    def forward(self, x):
        return self.fc(x)

class DQNAgent:
    def __init__(self, env, learning_rate=1e-3, gamma=0.99, epsilon_start=1.0, epsilon_final=0.01, epsilon_decay=0.995):
        self.env = env
        self.n_actions = env.action_space.n
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.epsilon = epsilon_start
        self.epsilon_final = epsilon_final
        self.epsilon_decay = epsilon_decay
        self.memory = deque(maxlen=10000)
        self.batch_size = 64
        state_size = len(env.get_state())
        self.model = DQN(state_size, self.n_actions).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.criterion = nn.MSELoss()
        self.gamma = gamma

    def get_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.n_actions - 1)
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            q_values = self.model(state)
            return torch.argmax(q_values).item()

    def update_epsilon(self):
        self.epsilon = max(self.epsilon_final, self.epsilon * self.epsilon_decay)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def train(self):
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)
        current_q_values = self.model(states).gather(1, actions.unsqueeze(1))
        with torch.no_grad():
            next_q_values = self.model(next_states).max(1)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        loss = self.criterion(current_q_values.squeeze(), target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

def train_dqn(env, episodes=2000, max_steps=1000, render_interval=10):
    agent = DQNAgent(env)
    scores = []
    for episode in range(episodes):
        state = env.reset()
        score = 0
        for step in range(max_steps):
            if episode % render_interval == 0:
                env.render()
            action = agent.get_action(state)
            next_state, reward, done, _, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            agent.train()
            state = next_state
            score += reward
            if done:
                break
        if episode % render_interval == 0:
            pygame.event.pump()
        agent.update_epsilon()
        scores.append(score)
        if episode % 10 == 0:
            print(f"Episode: {episode}, Score: {score}, Epsilon: {agent.epsilon:.2f}")
    return agent, scores

if __name__ == "__main__":
    env = FlappyBirdEnv()
    agent, scores = train_dqn(env, episodes=6000, render_interval=50)

    # Test the trained agent
    state = env.reset()
    done = False
    score = 0
    while not done:
        env.render()
        action = agent.get_action(state)
        next_state, reward, done, _, _ = env.step(action)
        state = next_state
        score += reward
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                done = True
        pygame.event.pump()
        time.sleep(0.03)
    print(f"Final Score: {score}")
    env.close()
```

SGD

```python
import torch
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation

# Random 3D surface (loss function)
def loss_function(x, y):
    return torch.sin(x) * torch.cos(y) + 0.1 * (x**2 + y**2)

# Generate a meshgrid for plotting the surface
x = torch.linspace(-5, 5, 100)
y = torch.linspace(-5, 5, 100)
X, Y = torch.meshgrid(x, y)
Z = loss_function(X, Y).detach().numpy()

# Initialize figure and 3D axis for animation
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('SGD Optimization Path on 3D Surface')

# Plot the static 3D surface
ax.plot_surface(X.numpy(), Y.numpy(), Z, cmap='viridis', alpha=0.7)

# SGD starting point
start_point = torch.tensor([4.0, 4.0], requires_grad=True)

# Hyperparameters
learning_rate = 0.1
optimizer = torch.optim.SGD([start_point], lr=learning_rate)

# Number of steps and animation frames
steps = 10
path = np.zeros((steps, 3))

# Plotting the initial point on the surface
point_plot, = ax.plot([], [], [], color='r', marker='o', markersize=5)

# Function to update the frame during animation
def update(i):
    global start_point
    optimizer.zero_grad()

    # Calculate the loss (z value)
    loss = loss_function(start_point[0], start_point[1])

    # Backpropagation to compute gradients
    loss.backward()

    # Perform optimization step
    optimizer.step()

    # Store the (x, y, z) values
    path[i, 0] = start_point[0].item()
    path[i, 1] = start_point[1].item()
    path[i, 2] = loss.item()

    # Update point on the surface
    point_plot.set_data(path[:i+1, 0], path[:i+1, 1])
    point_plot.set_3d_properties(path[:i+1, 2])
    return point_plot,

# Animate SGD for 10 steps
ani = FuncAnimation(fig, update, frames=steps, interval=500, blit=True)

# Show the animation
plt.show()
```
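
Each animation frame applies one plain gradient-descent step to the $(x, y)$ point on the surface:

$$\theta_{t+1} = \theta_t - \eta\,\nabla_{\theta} L(\theta_t), \qquad \eta = 0.1$$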

CNN with Attention

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Attention(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(Attention, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        # Global feature extraction
        global_features = torch.mean(x, dim=(2, 3), keepdim=True)
        attention_map = self.conv(global_features)
        attention_map = self.softmax(attention_map)
        out = x * attention_map
        return out

class CNNWithAttention(nn.Module):
    def __init__(self):
        super(CNNWithAttention, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        # Attention layer
        self.attention = Attention(64, 64)
        # Fully connected layers
        self.fc1 = nn.Linear(64 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Attention mechanism
        x = self.attention(x)
        x = x.view(-1, 64 * 8 * 8)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# --
# Initialize the model, loss function, and optimizer
model = CNNWithAttention()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
# NOTE: `trainloader` is assumed to yield batches of 3x32x32 images with 10 classes
# (e.g. a CIFAR-10 DataLoader); it is not defined in the original listing.
for epoch in range(5):  # Train for 5 epochs
    running_loss = 0.0
    for inputs, labels in trainloader:
        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    print(f"Epoch [{epoch + 1}/5], Loss: {running_loss / len(trainloader)}")
```

LSTM generator

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter

class Vocab:
    def __init__(self, stoi, itos):
        self.stoi = stoi
        self.itos = itos

# Provided corpus (AI history)
corpus = """
The history of artificial intelligence (AI) began in antiquity, with myths, stories and rumors of artificial beings endowed with intelligence or consciousness by master craftsmen.
... ...
"""

# Simple tokenization (splitting by spaces)
corpus = corpus.replace("\n", " ")  # Remove newlines
# Tokenization can be improved using libraries like nltk or spacy, but we'll use a simple split here
tokens = corpus.split()

# Create a vocabulary from the corpus
token_counts = Counter(tokens)
vocab_stoi = {token: idx for idx, (token, count) in enumerate(token_counts.items())}
vocab_itos = {idx: token for token, idx in vocab_stoi.items()}

# Create the Vocab object
vocab = Vocab(stoi=vocab_stoi, itos=vocab_itos)

class RNNModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super(RNNModel, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden):
        x = self.embedding(x)
        out, hidden = self.rnn(x, hidden)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        # Initialize hidden states (h_0) and cell states (c_0) with correct batch size
        weight = next(self.parameters()).data
        return (weight.new_zeros(self.num_layers, batch_size, self.hidden_size),
                weight.new_zeros(self.num_layers, batch_size, self.hidden_size))

class TextDataset(Dataset):
    def __init__(self, text, vocab, sequence_length):
        self.vocab = vocab
        self.sequence_length = sequence_length
        self.data = self.tokenize_and_encode(text)

    def tokenize_and_encode(self, text):
        tokens = text.split()  # Simple tokenization (split by spaces)
        return [self.vocab.stoi[token] for token in tokens if token in self.vocab.stoi]

    def __len__(self):
        return len(self.data) - self.sequence_length

    def __getitem__(self, idx):
        x = self.data[idx:idx + self.sequence_length]
        y = self.data[idx + 1:idx + 1 + self.sequence_length]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# Define sequence length and batch size
sequence_length = 10  # Can be tuned
batch_size = 100

# Create the dataset and dataloader
dataset = TextDataset(corpus, vocab, sequence_length)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Now you're ready to train the model using the provided corpus
# Define model, loss function, and optimizer
vocab_size = len(vocab.stoi)
embed_size = 50    # Adjust as needed
hidden_size = 100  # Adjust as needed
num_layers = 2
num_epochs = 100   # Adjust based on performance
learning_rate = 0.001

model = RNNModel(vocab_size, embed_size, hidden_size, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    for batch in train_loader:
        inputs, targets = batch
        batch_size = inputs.size(0)             # Get the actual batch size for this iteration
        hidden = model.init_hidden(batch_size)  # Initialize hidden state with correct batch size
        outputs, hidden = model(inputs, hidden)
        loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch {epoch+1}, Loss: {loss.item()}')

torch.save(model.state_dict(), 'rnn_model_ai.pth')

def generate_text(model, start_text, max_length=100):
    model.eval()
    hidden = model.init_hidden(1)                     # Start with batch size 1
    input = torch.tensor([[vocab.stoi[start_text]]])  # Convert start_text to input tensor
    result = [start_text]
    for _ in range(max_length):
        output, hidden = model(input, hidden)
        prob = nn.functional.softmax(output[0, -1], dim=0).data
        next_word = torch.multinomial(prob, 1).item()
        result.append(vocab.itos[next_word])  # Convert back to word using vocab
        input = torch.tensor([[next_word]])   # Feed the next word as input
    return ' '.join(result)

start_text = 'AI'  # The starting word
generated_text = generate_text(model, start_text, max_length=100)
print(generated_text)
```

Seq2seq number translator

```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import matplotlib.pyplot as plt

class NumeralTranslationDataset:
    def __init__(self):
        # Comprehensive mapping of Arabic numerals to English words
        self.num_to_words = {
            '0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four',
            '5': 'five', '6': 'six', '7': 'seven', '8': 'eight', '9': 'nine',
            '10': 'ten', '11': 'eleven', '12': 'twelve', '13': 'thirteen',
            '14': 'fourteen', '15': 'fifteen', '16': 'sixteen',
            '17': 'seventeen', '18': 'eighteen', '19': 'nineteen',
            '20': 'twenty', '21': 'twenty one', '22': 'twenty two',
            '23': 'twenty three', '24': 'twenty four', '25': 'twenty five',
            '30': 'thirty', '31': 'thirty one', '32': 'thirty two',
            '33': 'thirty three', '34': 'thirty four', '35': 'thirty five',
            '40': 'forty', '41': 'forty one', '42': 'forty two',
            '43': 'forty three', '44': 'forty four', '45': 'forty five',
            '50': 'fifty', '51': 'fifty one', '52': 'fifty two',
            '53': 'fifty three', '54': 'fifty four', '55': 'fifty five',
            '60': 'sixty', '61': 'sixty one', '62': 'sixty two',
            '63': 'sixty three', '64': 'sixty four', '65': 'sixty five',
            '70': 'seventy', '71': 'seventy one', '72': 'seventy two',
            '73': 'seventy three', '74': 'seventy four', '75': 'seventy five',
            '80': 'eighty', '81': 'eighty one', '82': 'eighty two',
            '83': 'eighty three', '84': 'eighty four', '85': 'eighty five',
            '90': 'ninety', '91': 'ninety one', '92': 'ninety two',
            '93': 'ninety three', '94': 'ninety four', '95': 'ninety five'
        }

    def generate_training_data(self, num_examples=1000):
        """Generate random training data for number translation."""
        input_sequences = []
        target_sequences = []
        # Generate random numbers from 0 to 99 for a total of `num_examples` examples
        for _ in range(num_examples):
            num = random.randint(0, 99)  # Randomly pick a number from 0 to 99
            num_str = str(num)
            # Translate to words
            if num_str in self.num_to_words:
                word = self.num_to_words[num_str]
            elif num < 20:
                # Handle teens
                units = str(num % 10)
                word = self.num_to_words[units]
            else:
                # Handle 21-99
                tens = str((num // 10) * 10)
                units = str(num % 10)
                tens_word = self.num_to_words[tens]
                units_word = self.num_to_words[units] if units != '0' else ''
                word = f"{tens_word} {units_word}".strip()
            input_sequences.append(list(num_str))
            target_sequences.append(list(word))
        return input_sequences, target_sequences

class Encoder(nn.Module):
    def __init__(self, input_size, embedding_dim, hidden_dim):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True, num_layers=2, dropout=0.2)

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, hidden = self.gru(embedded)
        return outputs, hidden

class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)
        self.v = nn.Parameter(torch.rand(hidden_dim))

    def forward(self, hidden, encoder_outputs):
        # hidden = [batch size, hidden dim]
        # encoder_outputs = [batch size, seq len, hidden dim]
        batch_size = encoder_outputs.shape[0]
        src_len = encoder_outputs.shape[1]
        # Repeat hidden state src_len times
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        # Concatenate hidden state with encoder outputs
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=-1)))
        # Compute attention scores
        attention = torch.sum(self.v * energy, dim=-1)
        return torch.softmax(attention, dim=1)

class Decoder(nn.Module):
    def __init__(self, output_size, embedding_dim, hidden_dim):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_size, embedding_dim)
        self.attention = Attention(hidden_dim)
        self.gru = nn.GRU(embedding_dim + hidden_dim, hidden_dim, batch_first=True, num_layers=2, dropout=0.2)
        self.fc_out = nn.Linear(hidden_dim, output_size)

    def forward(self, input, hidden, encoder_outputs):
        # input = [batch size, 1]
        # hidden = [num layers, batch size, hidden dim]
        # encoder_outputs = [batch size, src len, hidden dim]
        # Embed the input
        embedded = self.embedding(input)
        # Compute attention weights
        a = self.attention(hidden[-1], encoder_outputs)
        # Apply attention to encoder outputs
        attended = torch.bmm(a.unsqueeze(1), encoder_outputs).squeeze(1)
        # Concatenate embedded input with attended context
        rnn_input = torch.cat((embedded.squeeze(1), attended), dim=1).unsqueeze(1)
        # GRU step
        output, hidden = self.gru(rnn_input, hidden)
        # Prediction
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden, a

class Seq2SeqTranslator(nn.Module):
    def __init__(self, input_size, output_size, embedding_dim, hidden_dim):
        super(Seq2SeqTranslator, self).__init__()
        self.encoder = Encoder(input_size, embedding_dim, hidden_dim)
        self.decoder = Decoder(output_size, embedding_dim, hidden_dim)

    def forward(self, input_seq, target_seq, teacher_forcing_ratio=0.5):
        batch_size = input_seq.size(0)
        target_len = target_seq.size(1)
        target_vocab_size = self.decoder.fc_out.out_features
        # Tensor to store decoder outputs
        outputs = torch.zeros(batch_size, target_len, target_vocab_size)
        # Encoder
        encoder_outputs, hidden = self.encoder(input_seq)
        # First decoder input
        decoder_input = torch.zeros(batch_size, 1, dtype=torch.long)
        # Decode
        for t in range(target_len):
            decoder_output, hidden, _ = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[:, t:t+1, :] = decoder_output.unsqueeze(1)
            # Teacher forcing
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = decoder_output.argmax(1)
            if teacher_force:
                decoder_input = target_seq[:, t:t+1]
            else:
                decoder_input = top1.unsqueeze(1)
        return outputs

class NumeralTranslator:
    def __init__(self, input_chars, output_chars):
        # Create dataset
        self.dataset = NumeralTranslationDataset()
        # Create character to index mappings
        self.input_char_to_idx = {char: i for i, char in enumerate(input_chars)}
        self.input_idx_to_char = {i: char for char, i in self.input_char_to_idx.items()}
        self.output_char_to_idx = {char: i for i, char in enumerate(output_chars)}
        self.output_idx_to_char = {i: char for char, i in self.output_char_to_idx.items()}
        # Hyperparameters
        self.embedding_dim = 128
        self.hidden_dim = 256
        # Initialize model
        self.model = Seq2SeqTranslator(
            input_size=len(input_chars),
            output_size=len(output_chars),
            embedding_dim=self.embedding_dim,
            hidden_dim=self.hidden_dim
        )
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

    def prepare_sequence(self, seq, char_to_idx):
        """Convert sequence of characters to tensor of indices."""
        return torch.tensor([char_to_idx.get(char, 0) for char in seq], dtype=torch.long)

    def pad_sequences(self, sequences, pad_token):
        """Pad sequences to equal length."""
        # Convert sequences to lists if they are tensors
        sequences = [seq.tolist() if torch.is_tensor(seq) else seq for seq in sequences]
        max_len = max(len(seq) for seq in sequences)
        padded = []
        for seq in sequences:
            padded.append(seq + [pad_token] * (max_len - len(seq)))
        return torch.tensor(padded, dtype=torch.long)

    def train(self, epochs=300, batch_size=32):
        """Train the translation model."""
        # Generate training data
        input_sequences, target_sequences = self.dataset.generate_training_data()
        # Prepare input and target sequences (both are already lists of characters)
        input_chars = [list(seq) for seq in input_sequences]
        target_chars = [list(seq) for seq in target_sequences]
        # Get character sets for input and output
        input_chars_set = sorted(set(''.join([''.join(seq) for seq in input_chars])))
        output_chars_set = sorted(set(''.join([''.join(seq) for seq in target_chars])))
        print("Input characters:", input_chars_set)
        print("Output characters:", output_chars_set)
        # Training loop
        epoch_losses = []
        for epoch in range(epochs):
            total_loss = 0
            # Shuffle data
            combined = list(zip(input_chars, target_chars))
            random.shuffle(combined)
            input_chars, target_chars = zip(*combined)
            for i in range(0, len(input_chars), batch_size):
                batch_input = input_chars[i:i+batch_size]
                batch_target = target_chars[i:i+batch_size]
                # Prepare input sequences
                input_seqs = self.pad_sequences(
                    [self.prepare_sequence(seq, self.input_char_to_idx) for seq in batch_input],
                    pad_token=0
                )
                # Prepare target sequences
                target_seqs = self.pad_sequences(
                    [self.prepare_sequence(seq, self.output_char_to_idx) for seq in batch_target],
                    pad_token=0
                )
                # Zero gradients
                self.optimizer.zero_grad()
                # Forward pass
                outputs = self.model(input_seqs, target_seqs)
                # Compute loss
                loss = self.criterion(
                    outputs.view(-1, outputs.size(-1)),
                    target_seqs.view(-1)
                )
                # Backward pass
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()
            # Record average epoch loss
            avg_loss = total_loss / (len(input_chars) // batch_size)
            epoch_losses.append(avg_loss)
            # Print progress
            if epoch % 10 == 0:
                print(f'Epoch {epoch}, Loss: {avg_loss:.4f}')
        # Visualize training loss
        self.plot_training_loss(epoch_losses)
        return epoch_losses

    def translate(self, input_number):
        """Translate a single number to words."""
        # Prepare input sequence
        input_seq = self.prepare_sequence(list(str(input_number)), self.input_char_to_idx)
        input_seq = input_seq.unsqueeze(0)  # Add batch dimension
        # Create dummy target sequence of zeros
        max_output_length = 10  # Maximum expected word length
        dummy_target = torch.zeros(1, max_output_length, dtype=torch.long)
        # Disable gradient computation
        with torch.no_grad():
            # Get model outputs
            outputs = self.model(input_seq, dummy_target)
        # Get the most likely output characters
        predicted_indices = outputs.argmax(dim=-1)
        # Convert indices back to characters
        predicted_chars = []
        for i in range(predicted_indices.size(1)):
            char_idx = predicted_indices[0, i].item()
            char = self.output_idx_to_char[char_idx]
            if char != '<pad>':  # Skip padding
                predicted_chars.append(char)
        # Join characters to form a word
        return ''.join(predicted_chars).strip()

    def save_model(self, filepath='numeral_translator.pth'):
        """Save model state."""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'input_char_to_idx': self.input_char_to_idx,
            'output_char_to_idx': self.output_char_to_idx
        }, filepath)
        print(f"Model saved to {filepath}")

    def load_model(self, filepath='numeral_translator.pth'):
        """Load model state."""
        checkpoint = torch.load(filepath)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.input_char_to_idx = checkpoint['input_char_to_idx']
        self.output_char_to_idx = checkpoint['output_char_to_idx']
        print(f"Model loaded from {filepath}")

    def plot_training_loss(self, losses):
        """Visualize training loss."""
        plt.figure(figsize=(10, 5))
        plt.plot(losses, label='Training Loss')
        plt.title('Training Loss Over Epochs')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.tight_layout()
        plt.savefig('training_loss.png')
        plt.close()

def main():
    # Define input and output character sets
    input_chars = list('0123456789')
    output_chars = list(' abcdefghijklmnopqrstuvwxyz') + ['<pad>']
    # Initialize translator
    translator = NumeralTranslator(input_chars, output_chars)
    # Train the model
    print("Training model...")
    losses = translator.train(epochs=300, batch_size=32)
    # Save the trained model
    translator.save_model()
    # Test the model with some examples
    test_numbers = ['0', '5', '13', '25', '42', '67', '89', '99']
    print("\nTesting translations:")
    for number in test_numbers:
        translation = translator.translate(number)
        print(f"{number} -> {translation}")
    # Interactive mode
    print("\nEnter a number (0-99) to translate or 'q' to quit:")
    while True:
        user_input = input("> ")
        if user_input.lower() == 'q':
            break
        try:
            number = int(user_input)
            if 0 <= number <= 99:
                translation = translator.translate(user_input)
                print(f"Translation: {translation}")
            else:
                print("Please enter a number between 0 and 99")
        except ValueError:
            print("Invalid input. Please enter a valid number or 'q' to quit")

if __name__ == "__main__":
    main()
```

Transformer generator

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import math
from collections import Counter

class Vocab:
    def __init__(self, stoi, itos):
        self.stoi = stoi
        self.itos = itos

corpus = """
The history of artificial intelligence (AI) began in antiquity, with myths, stories and rumors of artificial beings endowed with intelligence or consciousness by master craftsmen.
...
"""

corpus = corpus.replace("\n", " ")
tokens = corpus.split()

token_counts = Counter(tokens)
vocab_stoi = {token: idx for idx, (token, count) in enumerate(token_counts.items())}
vocab_itos = {idx: token for token, idx in vocab_stoi.items()}
vocab = Vocab(stoi=vocab_stoi, itos=vocab_itos)

class PositionalEncoding(nn.Module):
    def __init__(self, embed_size, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, embed_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * (-math.log(10000.0) / embed_size))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)

    def forward(self, x):
        # x is (seq_len, batch, embed_size); add the encoding along the sequence dimension
        return x + self.encoding[:x.size(0), :].unsqueeze(1).to(x.device)

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.pos_encoder = PositionalEncoding(embed_size)
        encoder_layers = nn.TransformerEncoderLayer(embed_size, num_heads, hidden_size, dropout)
        self.transformer = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(embed_size, vocab_size)

    def forward(self, src, src_mask=None):
        src = self.embedding(src)
        src = src * math.sqrt(src.size(-1))  # scale by sqrt(embed_size)
        src = self.pos_encoder(src)
        output = self.transformer(src, src_mask)
        output = self.fc(output)
        return output

class TextDataset(Dataset):
    def __init__(self, text, vocab, sequence_length):
        self.vocab = vocab
        self.sequence_length = sequence_length
        self.data = self.tokenize_and_encode(text)

    def tokenize_and_encode(self, text):
        tokens = text.split()
        return [self.vocab.stoi[token] for token in tokens if token in self.vocab.stoi]

    def __len__(self):
        return len(self.data) - self.sequence_length

    def __getitem__(self, idx):
        x = self.data[idx:idx + self.sequence_length]
        y = self.data[idx + 1:idx + 1 + self.sequence_length]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

sequence_length = 10
batch_size = 100
dataset = TextDataset(corpus, vocab, sequence_length)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

vocab_size = len(vocab.stoi)
embed_size = 50     # Can be tuned
num_heads = 2       # Number of attention heads
hidden_size = 100   # Hidden layer size in feedforward network
num_layers = 88     # Number of Transformer layers
dropout = 0.1
num_epochs = 100    # Adjust based on performance
learning_rate = 0.001

model = TransformerModel(vocab_size, embed_size, num_heads, hidden_size, num_layers, dropout)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    for batch in train_loader:
        inputs, targets = batch
        inputs = inputs.permute(1, 0)   # (batch_size, sequence_length) -> (sequence_length, batch_size)
        targets = targets.permute(1, 0)
        outputs = model(inputs)
        # Instead of view(), use reshape()
        loss = criterion(outputs.reshape(-1, vocab_size), targets.reshape(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch {epoch+1}, Loss: {loss.item()}')

torch.save(model.state_dict(), 'transformer_model_ai.pth')

def generate_text(model, start_text, max_length=100):
    model.eval()
    input = torch.tensor([[vocab.stoi[start_text]]]).permute(1, 0)  # Convert start_text to input tensor
    result = [start_text]
    for _ in range(max_length):
        output = model(input)
        prob = nn.functional.softmax(output[-1, 0], dim=0).data
        next_word = torch.multinomial(prob, 1).item()
        result.append(vocab.itos[next_word])
        input = torch.cat([input, torch.tensor([[next_word]])], dim=0)
    return ' '.join(result)

start_text = 'AI'
generated_text = generate_text(model, start_text, max_length=100)
print(generated_text)
```