GraphQL subscriptions (and more) on serverless infrastructure
GraphQL subscriptions for AWS Lambda and API Gateway WebSockets.
Have all the functionality of GraphQL subscriptions on a stateful server without the cost.
Note: This project uses the graphql-ws protocol under the hood.
Seriously, read this first before you even think about using this.
import { createInstance } from 'subscriptionless';const instance = createInstance({schema,});
export const gatewayHandler = instance.gatewayHandler;
Set up API Gateway to route WebSocket events to the exported handler.
yaml
functions:
websocket:
name: my-subscription-lambda
handler: ./handler.gatewayHandler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $defaulttf
resource "aws_apigatewayv2_api" "ws" {
name = "websocket-api"
protocol_type = "WEBSOCKET"
route_selection_expression = "$request.body.action"
}
resource "aws_apigatewayv2_route" "default_route" {
api_id = aws_apigatewayv2_api.ws.id
route_key = "$default"
target = "integrations/${aws_apigatewayv2_integration.default_integration.id}"
}
resource "aws_apigatewayv2_route" "connect_route" {
api_id = aws_apigatewayv2_api.ws.id
route_key = "$connect"
target = "integrations/${aws_apigatewayv2_integration.default_integration.id}"
}
resource "aws_apigatewayv2_route" "disconnect_route" {
api_id = aws_apigatewayv2_api.ws.id
route_key = "$disconnect"
target = "integrations/${aws_apigatewayv2_integration.default_integration.id}"
}
resource "aws_apigatewayv2_integration" "default_integration" {
api_id = aws_apigatewayv2_api.ws.id
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.gateway_handler.invoke_arn
}
resource "aws_lambda_permission" "apigateway_invoke_lambda" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.gateway_handler.function_name
principal = "apigateway.amazonaws.com"
}
resource "aws_apigatewayv2_deployment" "ws" {
api_id = aws_apigatewayv2_api.ws.id
triggers = {
redeployment = sha1(join(",", tolist([
jsonencode(aws_apigatewayv2_integration.default_integration),
jsonencode(aws_apigatewayv2_route.default_route),
jsonencode(aws_apigatewayv2_route.connect_route),
jsonencode(aws_apigatewayv2_route.disconnect_route),
])))
}
depends_on = [
aws_apigatewayv2_route.default_route,
aws_apigatewayv2_route.connect_route,
aws_apigatewayv2_route.disconnect_route
]
}
resource "aws_apigatewayv2_stage" "ws" {
api_id = aws_apigatewayv2_api.ws.id
name = "example"
deployment_id = aws_apigatewayv2_deployment.ws.id
}In-flight connections and subscriptions need to be persisted.
tableNames argument to override the default table names.ts
const instance = createInstance({
/* ... */
tableNames: {
connections: 'my_connections',
subscriptions: 'my_subscriptions',
},
});yaml
resources:
Resources:
# Table for tracking connections
connectionsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:provider.environment.CONNECTIONS_TABLE}
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
# Table for tracking subscriptions
subscriptionsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:provider.environment.SUBSCRIPTIONS_TABLE}
AttributeDefinitions:
- AttributeName: id
AttributeType: S
- AttributeName: topic
AttributeType: S
- AttributeName: connectionId
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
- AttributeName: topic
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: ConnectionIndex
KeySchema:
- AttributeName: connectionId
KeyType: HASH
Projection:
ProjectionType: ALL
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
- IndexName: TopicIndex
KeySchema:
- AttributeName: topic
KeyType: HASH
Projection:
ProjectionType: ALL
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1tf
resource "aws_dynamodb_table" "connections-table" {
name = "subscriptionless_connections"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
attribute {
name = "id"
type = "S"
}
ttl {
attribute_name = "ttl"
enabled = true
}
}
resource "aws_dynamodb_table" "subscriptions-table" {
name = "subscriptionless_subscriptions"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
range_key = "topic"
attribute {
name = "id"
type = "S"
}
attribute {
name = "topic"
type = "S"
}
attribute {
name = "connectionId"
type = "S"
}
global_secondary_index {
name = "ConnectionIndex"
hash_key = "connectionId"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
global_secondary_index {
name = "TopicIndex"
hash_key = "topic"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
ttl {
attribute_name = "ttl"
enabled = true
}
}Set up server->client pinging for socket idleness detection.
Note: While not a hard requirement, this is strongly recommended.
ping argument to configure delays and what state machine to invoke.ts
const instance = createInstance({
/* ... */
ping: {
interval: 60, // Rate in seconds to send ping message
timeout: 30, // Threshold for pong response before closing socket
machineArn: process.env.MACHINE_ARN, // State machine to invoke
},
});ts
export const stateMachineHandler = instance.stateMachineHandler;yaml
functions:
machine:
handler: src/handler.stateMachineHandleryaml
stepFunctions:
stateMachines:
ping:
role: !GetAtt IamRoleLambdaExecution.Arn
definition:
StartAt: Wait
States:
Eval:
Type: Task
Resource: !GetAtt machine.Arn
Next: Choose
Wait:
Type: Wait
SecondsPath: '$.seconds'
Next: Eval
Choose:
Type: Choice
Choices:
- Not:
Variable: '$.state'
StringEquals: 'ABORT'
Next: Wait
Default: End
End:
Type: Pass
End: trueyaml
functions:
subscription:
handler: src/handler.gatewayHandler
environment:
PING_STATE_MACHINE_ARN: ${self:resources.Outputs.PingStateMachine.Value}
# ...
resources:
Outputs:
PingStateMachine:
Value:
Ref: PingStepFunctionsStateMachineconnection_init, the state machine will be invoked. Ensure that the websocket handler has the following permissions.yaml
- Effect: Allow
Resource: !GetAtt PingStepFunctionsStateMachine.Arn
Action:
- states:StartExecutionyaml
- Effect: Allow
Resource: !GetAtt connectionsTable.Arn
Action:
- dynamodb:GetItem
- dynamodb:UpdateItem
- Effect: Allow
Resource: '*'
Action:
- execute-api:*tf
resource "aws_lambda_function" "machine" {
function_name = "machine"
runtime = "nodejs14.x"
filename = data.archive_file.handler.output_path
source_code_hash = data.archive_file.handler.output_base64sha256
handler = "example.stateMachineHandler"
role = aws_iam_role.state_machine_function.arn
environment {
variables = {
CONNECTIONS_TABLE = aws_dynamodb_table.connections.id
SUBSCRIPTIONS_TABLE = aws_dynamodb_table.subscriptions.id
}
}
}tf
resource "aws_sfn_state_machine" "ping_state_machine" {
name = "ping-state-machine"
role_arn = aws_iam_role.state_machine.arn
definition = jsonencode({
StartAt = "Wait"
States = {
Wait = {
Type = "Wait"
SecondsPath = "$.seconds"
Next = "Eval"
}
Eval = {
Type = "Task"
Resource = aws_lambda_function.machine.arn
Next = "Choose"
}
Choose = {
Type = "Choice"
Choices = [{
Not = {
Variable = "$.state"
StringEquals = "ABORT"
}
Next = "Wait"
}]
Default = "End"
}
End = {
Type = "Pass"
End = true
}
}
})
}tf
resource "aws_lambda_function" "gateway_handler" {
# ...
environment {
variables = {
# ...
PING_STATE_MACHINE_ARN = aws_sfn_state_machine.ping_state_machine.arn
}
}
}subscriptionless uses it’s own PubSub implementation which loosely implements the Apollo PubSub Interface.
Note: Unlike the Apollo
PubSublibrary, this implementation is (mostly) stateless
subscribe function to associate incoming subscriptions with a topic.ts
import { subscribe } from 'subscriptionless/subscribe';
export const resolver = {
Subscribe: {
mySubscription: {
resolve: (event, args, context) => {/* ... */}
subscribe: subscribe('MY_TOPIC'),
}
}
}subscribe function call in a withFilter to provide filter conditions.ts
import { withFilter, subscribe } from 'subscriptionless/subscribe';
// Subscription agnostic filter
withFilter(subscribe('MY_TOPIC'), {
attr1: '`attr1` must have this value',
attr2: {
attr3: 'Nested attributes work fine',
},
});
// Subscription specific filter
withFilter(subscribe('MY_TOPIC'), (root, args, context, info) => ({
userId: args.userId,
}));concat.tsx
import { concat, subscribe } from 'subscriptionless/subscribe';
concat(subscribe('TOPIC_1'), subscribe('TOPIC_2'));publish on your subscriptionless instance to publish events to active subscriptions.tsx
instance.publish({
type: 'MY_TOPIC',
payload: 'HELLO',
});tsx
// SNS Event
export const snsHandler = (event) =>
Promise.all(
event.Records.map((r) =>
instance.publish({
topic: r.Sns.TopicArn.substring(r.Sns.TopicArn.lastIndexOf(':') + 1), // Get topic name (e.g. "MY_TOPIC")
payload: JSON.parse(r.Sns.Message),
})
)
);
// Manual Invocation
export const invocationHandler = (payload) =>
instance.publish({ topic: 'MY_TOPIC', payload });Context values are accessible in all resolver level functions (resolve, subscribe, onSubscribe and onComplete).
context argument is provided, the default value is an object containing a connectionParams attribute.connection_init.ts
export const resolver = {
Subscribe: {
mySubscription: {
resolve: (event, args, context) => {
console.log(context.connectionParams); // payload from connection_init
},
},
},
};context attribute when calling createInstance.ts
const instance = createInstance({
/* ... */
context: {
myAttr: 'hello',
},
});context attribute when calling createInstance.ts
const instance = createInstance({
/* ... */
context: ({ connectionParams }) => ({
myAttr: 'hello',
user: connectionParams.user,
}),
});Side effect handlers can be declared on subscription fields to handle onSubscribe (start) and onComplete (stop) events.
onSubscribe and onComplete side effects to work, resolvers must first be passed to prepareResolvers prior to schema construction.ts
import { prepareResolvers } from 'subscriptionless/subscribe';
const schema = makeExecutableSchema({
typedefs,
resolvers: prepareResolvers(resolvers),
});ts
export const resolver = {
Subscribe: {
mySubscription: {
resolve: (event, args, context) => {
/* ... */
},
subscribe: subscribe('MY_TOPIC'),
onSubscribe: (root, args) => {
/* Do something on subscription start */
},
onComplete: (root, args) => {
/* Do something on subscription stop */
},
},
},
};Global events can be provided when calling createInstance to track the execution cycle of the lambda.
$connect event.ts
const instance = createInstance({
/* ... */
onConnect: ({ event }) => {
/* */
},
});$disconnect event.ts
const instance = createInstance({
/* ... */
onDisconnect: ({ event }) => {
/* */
},
});connection_init message.onConnectionInit can be used to verify the connection_init payload prior to persistence.ts
const instance = createInstance({
/* ... */
onConnectionInit: ({ message }) => {
const token = message.payload.token;
if (!myValidation(token)) {
throw Error('Token validation failed');
}
// Prevent sensitive data from being written to DB
return {
...message.payload,
token: undefined,
};
},
});subscribe message.ts
const instance = createInstance({
/* ... */
onSubscribe: ({ event, message }) => {
/* */
},
});complete message.ts
const instance = createInstance({
/* ... */
onComplete: ({ event, message }) => {
/* */
},
});ping message.ts
const instance = createInstance({
/* ... */
onPing: ({ event, message }) => {
/* */
},
});pong message.ts
const instance = createInstance({
/* ... */
onPong: ({ event, message }) => {
/* */
},
});ts
const instance = createInstance({
/* ... */
onError: (error, context) => {
/* */
},
});