diff --git a/core/translate/aggregation.rs b/core/translate/aggregation.rs
index d23caf3ec..7eec4531f 100644
--- a/core/translate/aggregation.rs
+++ b/core/translate/aggregation.rs
@@ -41,7 +41,7 @@ pub fn emit_ungrouped_aggregation<'a>(
 
     // This always emits a ResultRow because currently it can only be used for a single row result
     // Limit is None because we early exit on limit 0 and the max rows here is 1
-    emit_select_result(program, t_ctx, plan, None)?;
+    emit_select_result(program, t_ctx, plan, None, None)?;
 
     Ok(())
 }
diff --git a/core/translate/delete.rs b/core/translate/delete.rs
index 373a74024..6bbe6270b 100644
--- a/core/translate/delete.rs
+++ b/core/translate/delete.rs
@@ -44,8 +44,8 @@ pub fn prepare_delete_plan(
     // Parse the WHERE clause
     let resolved_where_clauses = parse_where(where_clause, &referenced_tables)?;
 
-    // Parse the LIMIT clause
-    let resolved_limit = limit.and_then(|l| parse_limit(*l));
+    // Parse the LIMIT/OFFSET clause
+    let (resolved_limit, resolved_offset) = limit.map_or(Ok((None, None)), |l| parse_limit(*l))?;
 
     let plan = DeletePlan {
         source: SourceOperator::Scan {
@@ -58,6 +58,7 @@ pub fn prepare_delete_plan(
         where_clause: resolved_where_clauses,
         order_by: None,
         limit: resolved_limit,
+        offset: resolved_offset,
         referenced_tables,
         available_indexes: vec![],
         contains_constant_false_condition: false,
diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs
index 939a287f0..bf585dddc 100644
--- a/core/translate/emitter.rs
+++ b/core/translate/emitter.rs
@@ -68,6 +68,10 @@ pub struct TranslateCtx<'a> {
     pub reg_result_cols_start: Option<usize>,
     // The register holding the limit value, if any.
     pub reg_limit: Option<usize>,
+    // The register holding the offset value, if any.
+    pub reg_offset: Option<usize>,
+    // The register holding the limit+offset value, if any.
+    pub reg_limit_offset_sum: Option<usize>,
     // metadata for the group by operator
     pub meta_group_by: Option<GroupByMetadata>,
     // metadata for the order by operator
@@ -111,6 +115,8 @@ fn prologue<'a>(
         label_main_loop_end: None,
         reg_agg_start: None,
         reg_limit: None,
+        reg_offset: None,
+        reg_limit_offset_sum: None,
         reg_result_cols_start: None,
         meta_group_by: None,
         meta_left_joins: HashMap::new(),
@@ -200,6 +206,14 @@ pub fn emit_query<'a>(
         t_ctx.reg_limit = plan.limit.map(|_| program.alloc_register());
     }
 
+    if t_ctx.reg_offset.is_none() {
+        t_ctx.reg_offset = plan.offset.map(|_| program.alloc_register());
+    }
+
+    if t_ctx.reg_limit_offset_sum.is_none() {
+        t_ctx.reg_limit_offset_sum = plan.offset.map(|_| program.alloc_register());
+    }
+
     // No rows will be read from source table loops if there is a constant false condition eg. WHERE 0
     // however an aggregation might still happen,
     // e.g. SELECT COUNT(*) WHERE 0 returns a row with 0, not an empty result set
@@ -302,7 +316,7 @@ fn emit_delete_insns(
     program: &mut ProgramBuilder,
     t_ctx: &mut TranslateCtx,
     source: &SourceOperator,
-    limit: &Option<usize>,
+    limit: &Option<isize>,
 ) -> Result<()> {
     let cursor_id = match source {
         SourceOperator::Scan {
diff --git a/core/translate/group_by.rs b/core/translate/group_by.rs
index dfb92f20a..1e52a98b5 100644
--- a/core/translate/group_by.rs
+++ b/core/translate/group_by.rs
@@ -394,7 +394,13 @@ pub fn emit_group_by<'a>(
 
     match &plan.order_by {
         None => {
-            emit_select_result(program, t_ctx, plan, Some(label_group_by_end))?;
+            emit_select_result(
+                program,
+                t_ctx,
+                plan,
+                Some(label_group_by_end),
+                Some(group_by_end_without_emitting_row_label),
+            )?;
         }
         Some(_) => {
             order_by_sorter_insert(program, t_ctx, plan)?;
diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs
index 9ce449b9e..4396842e5 100644
--- a/core/translate/main_loop.rs
+++ b/core/translate/main_loop.rs
@@ -705,7 +705,17 @@ fn emit_loop_source(
                 plan.aggregates.is_empty(),
                 "We should not get here with aggregates"
             );
-            emit_select_result(program, t_ctx, plan, t_ctx.label_main_loop_end)?;
+            let loop_labels = *t_ctx
+                .labels_main_loop
+                .get(&plan.source.id())
+                .expect("source has no loop labels");
+            emit_select_result(
+                program,
+                t_ctx,
+                plan,
+                t_ctx.label_main_loop_end,
+                Some(loop_labels.next),
+            )?;
 
             Ok(())
         }
diff --git a/core/translate/order_by.rs b/core/translate/order_by.rs
index 1d02639d2..db6ada538 100644
--- a/core/translate/order_by.rs
+++ b/core/translate/order_by.rs
@@ -17,7 +17,7 @@ use super::{
     emitter::TranslateCtx,
     expr::translate_expr,
     plan::{Direction, ResultSetColumn, SelectPlan},
-    result_row::emit_result_row_and_limit,
+    result_row::{emit_offset, emit_result_row_and_limit},
 };
 
 // Metadata for handling ORDER BY operations
@@ -63,6 +63,7 @@ pub fn emit_order_by(
     let order_by = plan.order_by.as_ref().unwrap();
     let result_columns = &plan.result_columns;
     let sort_loop_start_label = program.allocate_label();
+    let sort_loop_next_label = program.allocate_label();
     let sort_loop_end_label = program.allocate_label();
     let mut pseudo_columns = vec![];
     for (i, _) in order_by.iter().enumerate() {
@@ -117,6 +118,8 @@ pub fn emit_order_by(
     });
 
     program.resolve_label(sort_loop_start_label, program.offset());
+    emit_offset(program, t_ctx, plan, sort_loop_next_label)?;
+
     program.emit_insn(Insn::SorterData {
         cursor_id: sort_cursor,
         dest_reg: reg_sorter_data,
@@ -138,6 +141,7 @@ pub fn emit_order_by(
 
     emit_result_row_and_limit(program, t_ctx, plan, start_reg, Some(sort_loop_end_label))?;
 
+    program.resolve_label(sort_loop_next_label, program.offset());
     program.emit_insn(Insn::SorterNext {
         cursor_id: sort_cursor,
         pc_if_next: sort_loop_start_label,
diff --git a/core/translate/plan.rs b/core/translate/plan.rs
index 6164428fb..3c1fe54f0 100644
--- a/core/translate/plan.rs
+++ b/core/translate/plan.rs
@@ -64,7 +64,9 @@ pub struct SelectPlan {
     /// all the aggregates collected from the result columns, order by, and (TODO) having clauses
     pub aggregates: Vec<Aggregate>,
     /// limit clause
-    pub limit: Option<usize>,
+    pub limit: Option<isize>,
+    /// offset clause
+    pub offset: Option<isize>,
     /// all the tables referenced in the query
     pub referenced_tables: Vec<TableReference>,
     /// all the indexes available
@@ -87,7 +89,9 @@ pub struct DeletePlan {
     /// order by clause
     pub order_by: Option<Vec<(ast::Expr, Direction)>>,
     /// limit clause
-    pub limit: Option<usize>,
+    pub limit: Option<isize>,
+    /// offset clause
+    pub offset: Option<isize>,
     /// all the tables referenced in the query
     pub referenced_tables: Vec<TableReference>,
     /// all the indexes available
diff --git a/core/translate/planner.rs b/core/translate/planner.rs
index 86132fb60..f66c96ce3 100644
--- a/core/translate/planner.rs
+++ b/core/translate/planner.rs
@@ -10,7 +10,7 @@ use crate::{
     vdbe::BranchOffset,
     Result,
 };
-use sqlite3_parser::ast::{self, Expr, FromClause, JoinType, Limit};
+use sqlite3_parser::ast::{self, Expr, FromClause, JoinType, Limit, UnaryOperator};
 
 pub const ROWID: &str = "rowid";
 
@@ -566,19 +566,39 @@ fn parse_join(
     })
 }
 
-pub fn parse_limit(limit: Limit) -> Option<usize> {
+pub fn parse_limit(limit: Limit) -> Result<(Option<isize>, Option<isize>)> {
+    let offset_val = match limit.offset {
+        Some(offset_expr) => match offset_expr {
+            Expr::Literal(ast::Literal::Numeric(n)) => n.parse().ok(),
+            // If OFFSET is negative, the result is as if OFFSET is zero
+            Expr::Unary(UnaryOperator::Negative, expr) => match *expr {
+                Expr::Literal(ast::Literal::Numeric(n)) => n.parse::<isize>().ok().map(|num| -num),
+                _ => crate::bail_parse_error!("Invalid OFFSET clause"),
+            },
+            _ => crate::bail_parse_error!("Invalid OFFSET clause"),
+        },
+        None => Some(0),
+    };
+
     if let Expr::Literal(ast::Literal::Numeric(n)) = limit.expr {
-        n.parse().ok()
+        Ok((n.parse().ok(), offset_val))
+    } else if let Expr::Unary(UnaryOperator::Negative, expr) = limit.expr {
+        if let Expr::Literal(ast::Literal::Numeric(n)) = *expr {
+            let limit_val = n.parse::<isize>().ok().map(|num| -num);
+            Ok((limit_val, offset_val))
+        } else {
+            crate::bail_parse_error!("Invalid LIMIT clause");
+        }
     } else if let Expr::Id(id) = limit.expr {
         if id.0.eq_ignore_ascii_case("true") {
-            Some(1)
+            Ok((Some(1), offset_val))
         } else if id.0.eq_ignore_ascii_case("false") {
-            Some(0)
+            Ok((Some(0), offset_val))
         } else {
-            None
+            crate::bail_parse_error!("Invalid LIMIT clause");
         }
     } else {
-        None
+        crate::bail_parse_error!("Invalid LIMIT clause");
     }
 }
 
diff --git a/core/translate/result_row.rs b/core/translate/result_row.rs
index 5c76f3008..0b4b343d3 100644
--- a/core/translate/result_row.rs
+++ b/core/translate/result_row.rs
@@ -18,7 +18,12 @@ pub fn emit_select_result(
     t_ctx: &mut TranslateCtx,
     plan: &SelectPlan,
     label_on_limit_reached: Option<BranchOffset>,
+    offset_jump_to: Option<BranchOffset>,
 ) -> Result<()> {
+    if let (Some(jump_to), Some(_)) = (offset_jump_to, label_on_limit_reached) {
+        emit_offset(program, t_ctx, plan, jump_to)?;
+    }
+
     let start_reg = t_ctx.reg_result_cols_start.unwrap();
     for (i, rc) in plan.result_columns.iter().enumerate() {
         let reg = start_reg + i;
@@ -71,6 +76,22 @@ pub fn emit_result_row_and_limit(
             dest: t_ctx.reg_limit.unwrap(),
         });
         program.mark_last_insn_constant();
+
+        if let Some(offset) = plan.offset {
+            program.emit_insn(Insn::Integer {
+                value: offset as i64,
+                dest: t_ctx.reg_offset.unwrap(),
+            });
+            program.mark_last_insn_constant();
+
+            program.emit_insn(Insn::OffsetLimit {
+                limit_reg: t_ctx.reg_limit.unwrap(),
+                combined_reg: t_ctx.reg_limit_offset_sum.unwrap(),
+                offset_reg: t_ctx.reg_offset.unwrap(),
+            });
+            program.mark_last_insn_constant();
+        }
+
         program.emit_insn(Insn::DecrJumpZero {
             reg: t_ctx.reg_limit.unwrap(),
             target_pc: label_on_limit_reached.unwrap(),
@@ -78,3 +99,23 @@ pub fn emit_result_row_and_limit(
     }
     Ok(())
 }
+
+pub fn emit_offset(
+    program: &mut ProgramBuilder,
+    t_ctx: &mut TranslateCtx,
+    plan: &SelectPlan,
+    jump_to: BranchOffset,
+) -> Result<()> {
+    match plan.offset {
+        Some(offset) if offset > 0 => {
+            program.add_comment(program.offset(), "OFFSET");
+            program.emit_insn(Insn::IfPos {
+                reg: t_ctx.reg_offset.unwrap(),
+                target_pc: jump_to,
+                decrement_by: 1,
+            });
+        }
+        _ => {}
+    }
+    Ok(())
+}
diff --git a/core/translate/select.rs b/core/translate/select.rs
index 196a1d284..46226abbd 100644
--- a/core/translate/select.rs
+++ b/core/translate/select.rs
@@ -57,6 +57,7 @@ pub fn prepare_select_plan(
                 order_by: None,
                 aggregates: vec![],
                 limit: None,
+                offset: None,
                 referenced_tables,
                 available_indexes: schema.indexes.clone().into_values().flatten().collect(),
                 contains_constant_false_condition: false,
@@ -326,8 +327,9 @@ pub fn prepare_select_plan(
                 plan.order_by = Some(key);
             }
 
-            // Parse the LIMIT clause
-            plan.limit = select.limit.and_then(|l| parse_limit(*l));
+            // Parse the LIMIT/OFFSET clause
+            (plan.limit, plan.offset) =
+                select.limit.map_or(Ok((None, None)), |l| parse_limit(*l))?;
 
             // Return the unoptimized query plan
             Ok(Plan::Select(plan))
diff --git a/core/translate/subquery.rs b/core/translate/subquery.rs
index fd0e6667b..c805f3ca5 100644
--- a/core/translate/subquery.rs
+++ b/core/translate/subquery.rs
@@ -97,6 +97,8 @@ pub fn emit_subquery<'a>(
         result_column_indexes_in_orderby_sorter: HashMap::new(),
         result_columns_to_skip_in_orderby_sorter: None,
         reg_limit: plan.limit.map(|_| program.alloc_register()),
+        reg_offset: plan.offset.map(|_| program.alloc_register()),
+        reg_limit_offset_sum: plan.offset.map(|_| program.alloc_register()),
         resolver: Resolver::new(t_ctx.resolver.symbol_table),
     };
     let subquery_body_end_label = program.allocate_label();
diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs
index 89967608a..490a677b7 100644
--- a/core/vdbe/explain.rs
+++ b/core/vdbe/explain.rs
@@ -957,6 +957,22 @@ pub fn insn_to_str(
                 0,
                 "".to_string(),
             ),
+            Insn::OffsetLimit {
+                limit_reg,
+                combined_reg,
+                offset_reg,
+            } => (
+                "OffsetLimit",
+                *limit_reg as i32,
+                *combined_reg as i32,
+                *offset_reg as i32,
+                OwnedValue::build_text(Rc::new("".to_string())),
+                0,
+                format!(
+                    "if r[{}]>0 then r[{}]=r[{}]+max(0,r[{}]) else r[{}]=(-1)",
+                    limit_reg, combined_reg, limit_reg, offset_reg, combined_reg
+                ),
+            ),
             Insn::OpenWriteAsync {
                 cursor_id,
                 root_page,
diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs
index f17a1a354..354acbf10 100644
--- a/core/vdbe/insn.rs
+++ b/core/vdbe/insn.rs
@@ -473,6 +473,12 @@ pub enum Insn {
         target_pc: BranchOffset,
     },
 
+    OffsetLimit {
+        limit_reg: usize,
+        combined_reg: usize,
+        offset_reg: usize,
+    },
+
     OpenWriteAsync {
         cursor_id: CursorID,
         root_page: PageIdx,
diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs
index 8e2db9189..e7482db21 100644
--- a/core/vdbe/mod.rs
+++ b/core/vdbe/mod.rs
@@ -2290,6 +2290,37 @@ impl Program {
                         state.pc = target_pc.to_offset_int();
                     }
                 }
+                Insn::OffsetLimit {
+                    limit_reg,
+                    combined_reg,
+                    offset_reg,
+                } => {
+                    let limit_val = match state.registers[*limit_reg] {
+                        OwnedValue::Integer(val) => val,
+                        _ => {
+                            return Err(LimboError::InternalError(
+                                "OffsetLimit: the value in limit_reg is not an integer".into(),
+                            ));
+                        }
+                    };
+                    let offset_val = match state.registers[*offset_reg] {
+                        OwnedValue::Integer(val) if val < 0 => 0,
+                        OwnedValue::Integer(val) if val >= 0 => val,
+                        _ => {
+                            return Err(LimboError::InternalError(
+                                "OffsetLimit: the value in offset_reg is not an integer".into(),
+                            ));
+                        }
+                    };
+
+                    let offset_limit_sum = limit_val.overflowing_add(offset_val);
+                    if limit_val <= 0 || offset_limit_sum.1 {
+                        state.registers[*combined_reg] = OwnedValue::Integer(-1);
+                    } else {
+                        state.registers[*combined_reg] = OwnedValue::Integer(offset_limit_sum.0);
+                    }
+                    state.pc += 1;
+                }
                 // this cursor may be reused for next insert
                 // Update: tablemoveto is used to travers on not exists, on insert depending on flags if nonseek it traverses again.
                 // If not there might be some optimizations obviously.
diff --git a/testing/all.test b/testing/all.test
index d70fb3f56..05cd2d196 100755
--- a/testing/all.test
+++ b/testing/all.test
@@ -22,3 +22,4 @@ source $testdir/where.test
 source $testdir/compare.test
 source $testdir/changes.test
 source $testdir/total-changes.test
+source $testdir/offset.test
diff --git a/testing/offset.test b/testing/offset.test
new file mode 100644
index 000000000..2f5be091b
--- /dev/null
+++ b/testing/offset.test
@@ -0,0 +1,47 @@
+#!/usr/bin/env tclsh
+
+set testdir [file dirname $argv0]
+source $testdir/tester.tcl
+
+do_execsql_test select-offset-0 {
+  SELECT id FROM users ORDER BY id LIMIT 1 OFFSET 0;
+} {1}
+
+do_execsql_test select-offset-1 {
+  SELECT id FROM users ORDER BY id LIMIT 1 OFFSET 1;
+} {2}
+
+do_execsql_test select-offset-negative {
+  SELECT id FROM users ORDER BY id LIMIT 1 OFFSET -1;
+} {1}
+
+do_execsql_test select-offset-0-groupby {
+  SELECT COUNT(*) FROM users GROUP BY STATE ORDER BY STATE LIMIT 5 OFFSET 0;
+} {168
+166
+162
+153
+166}
+
+do_execsql_test select-offset-1-groupby {
+  SELECT COUNT(*) FROM users GROUP BY STATE ORDER BY STATE LIMIT 5 OFFSET 1;
+} {166
+162
+153
+166
+170}
+
+do_execsql_test select-offset-subquery {
+    SELECT id, first_name, age 
+    FROM (
+        SELECT id, first_name, age 
+        FROM users 
+        ORDER BY id ASC 
+        LIMIT 5 OFFSET 2
+    )
+    ORDER BY id DESC;
+} {7|Aimee|24
+6|Nicholas|89
+5|Edward|15
+4|Jennifer|33
+3|Tommy|18}
\ No newline at end of file